深入理解GCD之dispatch_queue

更新于2020-12-13 更新异步执行任务的源码分析

GCD队列是我们在使用GCD中经常接触的技术点,分析dispatch_queue部分的源码能更好得理解多线程时的处理。但是libdispatch的源码相对来说比较复杂,综合考虑下,使用了libdispatch-187.9进行分析。

队列和线程的关系

Concurrent Programming: APIs and Challenges中的一张图片可以很直观地描述GCD与线程之间的关系:GCDandThread@2x

线程和队列并不是一对一的关系,一个线程内可能有多个队列,这些队列可能是串行的或者是并行的,按照同步或者异步的方式工作。

对于主线程和主队列来说,主队列是主线程上的一个串行队列,是系统自动为我们创建的,换言之,主线程是可以执行除主队列之外其他队列的任务。我们可以用下面一段代码进行测试:

1
2
3
4
5
6
7
8
// 测试代码
override func viewDidLoad() {
super.viewDidLoad()

let sQueue = DispatchQueue(label: "sQueue")
// 串行队列同步不会产生新线程,任务在当前线程下执行,因此Thread.current必然是主线程
sQueue.sync { print(Thread.current) }
}

队列的定义

dispatch_queue_s

dispatch_queue_s是队列的结构体,可以说我们在GCD中接触最多的结构体了。GCD中使用了很多的宏,不利于我们理解代码,我们用对应的结构替换掉定义的宏。

为了方便后续的分析,先列出一些函数方便后面的理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct dispatch_queue_s {
// 第一部分:DISPATCH_STRUCT_HEADER(dispatch_queue_s, dispatch_queue_vtable_s)
const struct dispatch_queue_vtable_s *do_vtable; // 该类型的结构体包含了对dispatch_queue_s的操作函数
struct dispatch_queue_s *volatile do_next; //链表的next
unsigned int do_ref_cnt; // 引用计数
unsigned int do_xref_cnt; // 外部引用计数
unsigned int do_suspend_cnt; // 暂停标志,比如延时处理中,在任务到时后,计时器处理将会将该标志位修改,然后唤醒队列调度
struct dispatch_queue_s *do_targetq; // 目标队列,GCD允许我们将一个队列放在另一个队列里执行任务
void *do_ctxt; // 上下文,用来存储线程池相关数据,比如用于线程挂起和唤醒的信号量、线程池尺寸等
void *do_finalizer;

// 第二部分:DISPATCH_QUEUE_HEADER
uint32_t volatile dq_running; // 队列运行的任务数量
uint32_t dq_width; // 最大并发数:主队列/串行队列的最大并发数为1
struct dispatch_object_s *volatile dq_items_tail; // 队列尾结点
struct dispatch_object_s *volatile dq_items_head; // 队列头结点
unsigned long dq_serialnum; // 队列序列号
dispatch_queue_t dq_specific_q; // specific队列

char dq_label[DISPATCH_QUEUE_MIN_LABEL_SIZE]; // 队列名,队列名要少于64个字符
char _dq_pad[DISPATCH_QUEUE_CACHELINE_PAD]; // for static queues only
};

dispatch_queue_vtable_s

在GCD队列中,dispatch_queue_vtable_s这个结构体内包含了dispatch_object_s的操作函数,而且针对这些操作函数,定义了相对简短的宏,方便调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
// dispatch_queue_vtable_s结构体,声明了一些函数用于操作dispatch_queue_s结构体
struct dispatch_queue_vtable_s {
// DISPATCH_VTABLE_HEADER(dispatch_queue_s);
unsigned long const do_type;
const char *const do_kind;
size_t (*const do_debug)(struct dispatch_queue_s *, char *, size_t);
// 唤醒队列的方法,全局队列和主队列此项为NULL
struct dispatch_queue_s *(*const do_invoke)(struct dispatch_queue_s);
// 用于检测传入对象中的一些值是否满足条件
bool (*const do_probe)(struct dispatch_queue_s *);
// 销毁队列的方法,通常内部会调用这个对象的finalizer函数
void (*const do_dispose)(struct dispatch_queue_s *)
};

在queue.c中定义三个关于dispatch_queue_vtable_s的静态常量,分别是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 用于主队列和自定义队列
const struct dispatch_queue_vtable_s _dispatch_queue_vtable = {
.do_type = DISPATCH_QUEUE_TYPE,
.do_kind = "queue",
.do_dispose = _dispatch_queue_dispose,
.do_invoke = NULL,
.do_probe = (void *)dummy_function_r0,
.do_debug = dispatch_queue_debug,
};

// 用于全局队列
static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
.do_kind = "global-queue",
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_queue_wakeup_global,
};

// 用于管理队列
static const struct dispatch_queue_vtable_s _dispatch_queue_mgr_vtable = {
.do_type = DISPATCH_QUEUE_MGR_TYPE,
.do_kind = "mgr-queue",
.do_invoke = _dispatch_mgr_thread,
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_mgr_wakeup,
};

队列的类型

队列的类型可以分为主队列管理队列自定义队列全局队列4种类型。

主队列

使用dispatch_get_main_queue()可获取主队列,它的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define dispatch_get_main_queue() (&_dispatch_main_q)

struct dispatch_queue_s _dispatch_main_q = {
#if !DISPATCH_USE_RESOLVERS
.do_vtable = &_dispatch_queue_vtable,
.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY], // root queue中的其中一个
#endif
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.dq_label = "com.apple.main-thread",
.dq_running = 1,
.dq_width = 1, // 说明主队列是一个串行队列
.dq_serialnum = 1, // 主队列序列号
};

do_vtable

主队列的do_vtable_dispatch_queue_vtable

do_targetq

do_targetq即目标队列,关于目标队列的意义,在分析全局队列的do_targetq中会给一个比较具体的总结。

主队列的目标队列定义:

1
2
3
4
5
6
7
8
9
10
11
12
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.default-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 7,
}

do_ref_cnt、do_xref_cnt

do_ref_cntdo_xref_cnt是引用计数,它们和GCD对象的内存管理相关。主队列的这两个值为DISPATCH_OBJECT_GLOBAL_REFCNT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void dispatch_retain(dispatch_object_t dou) {
if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return; // global object
}
// ...
}

void _dispatch_retain(dispatch_object_t dou) {
if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return; // global object
}
// ...
}

void dispatch_release(dispatch_object_t dou) {
if (slowpath(dou._do->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return;
}
// 调用_dispatch_release函数;
// ...
}

void _dispatch_release(dispatch_object_t dou) {
if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
return; // global object
}
// 调用dx_dispose宏即调用do_dispose
// ...
}

从上面这几个函数可以看出:

  • 主队列的生命周期是伴随着应用的,不会受retain和release的影响。
  • do_ref_cntdo_xref_cnt这两个值同时为0的时候,对象才会被释放。

管理队列

管理队列是GCD的内部队列,不对外公开,这个队列应该是用来扮演管理的角色,GCD定时器就用到了管理队列。

1
2
3
4
5
6
7
8
9
10
11
struct dispatch_queue_s _dispatch_mgr_q = {
.do_vtable = &_dispatch_queue_mgr_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.libdispatch-manager",
.dq_width = 1,
.dq_serialnum = 2, // 管理队列序列号
};

do_vtable

管理队列的do_vtable_dispatch_queue_mgr_vtable

do_targetq

管理队列的目标队列:

1
2
3
4
5
6
7
8
9
10
11
12
[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dq_label = "com.apple.root.high-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 9,
}

do_ref_cnt、do_xref_cnt

管理队列的这两个值为DISPATCH_OBJECT_GLOBAL_REFCNT,所以和主队列的生命周期应该是一样的。

自定义队列

使用dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)创建一个自定义的队列。它的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
dispatch_queue_t dispatch_queue_create(const char *label, dispatch_queue_attr_t attr) {
dispatch_queue_t dq;
size_t label_len;

if (!label) {
label = "";
}

label_len = strlen(label);
if (label_len < (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1)) {
label_len = (DISPATCH_QUEUE_MIN_LABEL_SIZE - 1);
}

// XXX switch to malloc()
dq = calloc(1ul, sizeof(struct dispatch_queue_s) -
DISPATCH_QUEUE_MIN_LABEL_SIZE - DISPATCH_QUEUE_CACHELINE_PAD +
label_len + 1);
if (slowpath(!dq)) {
return dq;
}

// _dispatch_queue_init(dq);
// 队列初始化展开如下
dq->do_vtable = &_dispatch_queue_vtable;
dq->do_next = DISPATCH_OBJECT_LISTLESS;
dq->do_ref_cnt = 1;
dq->do_xref_cnt = 1;
// Default target queue is overcommit!
// 使用的目标队列:_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY]
dq->do_targetq = _dispatch_get_root_queue(0, true);
dq->dq_running = 0;
dq->dq_width = 1;
dq->dq_serialnum = dispatch_atomic_inc(&_dispatch_queue_serial_numbers) - 1;

strcpy(dq->dq_label, label);

if (fastpath(!attr)) {
return dq;
}
// 如果是并发队列,设置最大并发数,UINT32_MAX可以看成不限制最大并发数
if (fastpath(attr == DISPATCH_QUEUE_CONCURRENT)) {
dq->dq_width = UINT32_MAX;
// 设置目标队列,对于并发队列_dispatch_get_root_queue函数中的overcommit传的是false,获取的值: _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY]
dq->do_targetq = _dispatch_get_root_queue(0, false);
} else {
dispatch_debug_assert(!attr, "Invalid attribute");
}
return dq;
}

slowpath(x)、fastpath(x)

1
2
#define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l)) // ~0l就是1
#define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l))

fastpath(x)表示x的值极大概率为1,即多数情况下会发生。slowpath(x)表示x的值极大概率为0,即多数情况下不会发生。
__builtin_expect来帮助程序员处理分支预测,优化程序,这个函数的语义是:我期望表达式的值等于常量C,编译器应当根据我提供的期望值进行优化。

do_vtable

与主队列一样,自定义队列的do_vtable也是_dispatch_queue_vtable

do_targetq

自定义队列的目标队列有两种:

  1. 如果是串行队列,则使用_dispatch_get_root_queue(0, true)函数获取目标队列,获取到的目标队列是_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY]
  2. 如果是并发队列,则使用_dispatch_get_root_queue(0, false)函数获取目标队列,获取到的目标队列是_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY]

_dispatch_get_root_queue(long priority, bool overcommit)函数的overcommit参数代表队列在执行block时,无论系统多忙都会新开一个线程,这样做的目的是不会造成某个线程过载。

dq_serialnum

dq_serialnum是在_dispatch_queue_serial_numbers基础上进行原子操作加1,即从12开始累加。1到11被保留的序列号定义如下:

1
2
3
4
5
6
// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - _unused_
// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned

其中1用于主队列,2用于管理队列,3暂时没有被使用,4~11是用于全局队列的。由于看的源码版本比较老了,后面苹果有新增了几个队列。

全局队列

上面说了很多全局队列,现在我们来看一下全局队列是如何定义的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
dispatch_queue_t dispatch_get_global_queue(long priority, unsigned long flags) {
if (flags & ~DISPATCH_QUEUE_OVERCOMMIT) {
return NULL;
}
return _dispatch_get_root_queue(priority, flags & DISPATCH_QUEUE_OVERCOMMIT);
}

static inline dispatch_queue_t _dispatch_get_root_queue(long priority, bool overcommit) {
if (overcommit) switch (priority) {
case DISPATCH_QUEUE_PRIORITY_LOW:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_DEFAULT:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_HIGH:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY];
}

switch (priority) {
case DISPATCH_QUEUE_PRIORITY_LOW:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_DEFAULT:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_HIGH:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY];
case DISPATCH_QUEUE_PRIORITY_BACKGROUND:
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY];
default:
return NULL;
}
}

struct dispatch_queue_s _dispatch_root_queues[] = {
[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],

.dq_label = "com.apple.root.low-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 4,
},
[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],

.dq_label = "com.apple.root.low-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 5,
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],

.dq_label = "com.apple.root.default-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 6,
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],

.dq_label = "com.apple.root.default-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 7,
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],

.dq_label = "com.apple.root.high-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 8,
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],

.dq_label = "com.apple.root.high-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 9,
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],

.dq_label = "com.apple.root.background-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 10,
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
.do_vtable = &_dispatch_queue_root_vtable,
.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
.do_ctxt = &_dispatch_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],

.dq_label = "com.apple.root.background-overcommit-priority",
.dq_running = 2,
.dq_width = UINT32_MAX,
.dq_serialnum = 11,
},
};

do_vtable

全局队列的do_vtable_dispatch_queue_root_vtable。前面提到_dispatch_queue_root_vtable的检测函数(do_probe)为_dispatch_queue_wakeup_global,这个函数用来唤醒全局队列,具体的后面分析队列唤醒的时候再讲。

do_targetq

无论是主队列、管理队列还是自定义队列,它们都使用了全局队列(就是从root queue中获取的)作为目标队列,但是全局队列并没有设置do_targetq

Concurrent Programming: APIs and Challenges提到:

While custom queues are a powerful abstraction, all blocks you schedule on them will ultimately trickle down to one of the system’s global queues and its thread pool(s).

虽然自定义队列是一个强大的抽象,但你在队列上安排的所有Block最终都会落到系统的某一个全局队列及其线程池中。那也就是说GCD用到的queue,无论是自定义队列,或是获取系统的主队列、全局队列、管理队列,其最终都是落脚于GCD root queue中。GCD管理的也不过这些root queue。

do_ref_cnt、do_xref_cnt

管理队列的这两个值为DISPATCH_OBJECT_GLOBAL_REFCNT,所以和主队列的生命周期应该是一样的。

do_ctxt

全局队列中有一个上下文的属性,用来存储线程池相关数据,比如用于线程挂起和唤醒的信号量、线程池尺寸等。它的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_thread_mediator = &_dispatch_thread_mediator[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
.dgq_thread_pool_size = MAX_THREAD_COUNT,
#endif
},
};

队列的同步:dispatch_sync分析

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 串行队列
let sQueue = DispatchQueue(label: "sQueue")
print(1)
sQueue.sync { print("\(2):\(Thread.current)") }
print(3)
sQueue.sync { print("\(4):\(Thread.current)") }
print(5)

// 并行队列
let cQueue = DispatchQueue(label: "cQueue", attributes: [.concurrent])
print(1)
cQueue.sync { print("\(2):\(Thread.current)") }
print(3)
cQueue.sync { print("\(4):\(Thread.current)") }
print(5)

执行结果

1
2
3
4
5
6
7
8
9
10
1
2:<NSThread: 0x600002478980>{number = 1, name = main}
3
4:<NSThread: 0x600002478980>{number = 1, name = main}
5
1
2:<NSThread: 0x600002478980>{number = 1, name = main}
3
4:<NSThread: 0x600002478980>{number = 1, name = main}
5

虽然省略主队列和全局队列的测试,但是结果是一样的。队列同步执行任务的过程,是不会开辟新的线程,所有任务在当前线程中执行,且会阻塞线程。

入口函数:dispatch_sync

dispatch_sync的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
void dispatch_sync(dispatch_queue_t dq, void (^work)(void)) {
// DISPATCH_COCOA_COMPAT是Mac OS下才会走的
#if DISPATCH_COCOA_COMPAT
// 是否是主队列
if (slowpath(dq == &_dispatch_main_q)) {
// 内部也是执行dispatch_sync_f函数
return _dispatch_sync_slow(dq, work);
}
#endif
struct Block_basic *bb = (void *)work;
dispatch_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}
#endif

_dispatch_sync_slow函数内部也是执行dispatch_sync_f函数,所以dispatch_sync的调用本质即dispatch_sync_f函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
// 串行队列包括主队列
if (fastpath(dq->dq_width == 1)) {
return dispatch_barrier_sync_f(dq, ctxt, func);
}
// 全局队列,全局队列是没有do_targetq的,主队列/管理队列/自定义队列都有
if (slowpath(!dq->do_targetq)) {
// the global root queues do not need strict ordering
(void)dispatch_atomic_add2o(dq, dq_running, 2);
return _dispatch_sync_f_invoke(dq, ctxt, func);
}
// 其他队列
_dispatch_sync_f2(dq, ctxt, func);
}

同步执行任务的时候分成了三种情况:

  1. 如果是串行队列,执行dispatch_barrier_sync_f即栅栏同步函数;
  2. 如果是全局队列,执行_dispatch_sync_f_invoke
  3. 如果是其他队列,执行_dispatch_sync_f2

重点函数:dispatch_barrier_sync_f

在分析dispatch_barrier_sync_f这个函数前,我们看一下dispatch_barrier_sync函数即同步栅栏函数。它的实现如下:

1
2
3
4
5
6
7
8
9
10
void dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void)) {
#if DISPATCH_COCOA_COMPAT
if (slowpath(dq == &_dispatch_main_q)) {
// 内部调用dispatch_barrier_sync_f函数
return _dispatch_barrier_sync_slow(dq, work);
}
#endif
struct Block_basic *bb = (void *)work;
dispatch_barrier_sync_f(dq, work, (dispatch_function_t)bb->Block_invoke);
}

它的底层也是调用dispatch_barrier_sync_f函数。如果是串行队列压入同步任务,那么当前任务就必须等待前面的任务执行完成后才能执行,源代码就会调用dispatch_barrier_sync_f函数完成上面的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
// 1) ensure that this thread hasn't enqueued anything ahead of this call
// 2) the queue is not suspended
// 1) 确保此线程在此调用之前没有入队
// 2) 队列未挂起

// 第1步:如果串行队列中存在其他任务或者队列被挂起,进入_dispatch_barrier_sync_f_slow,
// 等待这个队列中的其他任务完成(用信号量的方式通知),然后执行这个任务。
// 多数情况下不会发生
if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
}

// 第2步:检查队列的dq_running状态,如果没有运行,进入_dispatch_barrier_sync_f_slow,等待激活。
// bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
// 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回true
// dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1)相当于dq->dq_running为0的时候将
// dq->dq_running设置为1,并返回true
if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
// global queues and main queue bound to main thread always falls into
// the slow case
// 全局队列和绑定到主线程的主队列始终属于慢速情况即会进入_dispatch_barrier_sync_f_slow函数
return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
}

// 第3步:有多重队列,寻找真正的目标队列,其实还是回到了dispatch_sync_f方法
if (slowpath(dq->do_targetq->do_targetq)) {
return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
}

// 第4步:队列无任务执行,调用_dispatch_barrier_sync_f_invoke执行任务。
// 内部调用_dispatch_function_invoke去执行任务
_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}

这里涉及到三个函数:

  1. _dispatch_barrier_sync_f_slow函数内部使用了线程对应的信号量并且调用wait 方法
  2. _dispatch_barrier_sync_f_recurse函数内部调用了dispatch_sync_f函数,还是在寻找真正的目标队列
  3. 如果队列无任务执行,调用_dispatch_barrier_sync_f_invoke执行任务。执行任务的时候会调用_dispatch_function_invoke函数。

_dispatch_barrier_sync_f_invoke

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
// _dispatch_function_invoke的实现
// 将当前线程的dispatch_queue_key设置为dq,然后执行任务,
// 执行完之后再恢复到之前的old_dq
dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
_dispatch_thread_setspecific(dispatch_queue_key, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_workitem_inc();
_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

// 如果队列中存在其他任务,用信号量的方法唤醒,然后继续执行下一个任务
if (slowpath(dq->dq_items_tail)) {
return _dispatch_barrier_sync_f2(dq);
}

// dispatch_atomic_dec2o这个宏,会调用GCC内置的函数 __sync_sub_and_fetch,实现减法的原子性操作。因此这一行的意思是将dq_running的值减1,然后判断是否与0相等。
// _dispatch_wakeup为唤醒队列函数
if (slowpath(dispatch_atomic_dec2o(dq, dq_running) == 0)) {
_dispatch_wakeup(dq);
}
}

GCD死锁

看了上面的代码注释后,我们来想一下死锁是怎么产生的?先看下示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#import "DeadLock.h"

@implementation DeadLock

- (instancetype)init {
if (self = [super init]) {
// [self _mianQueueDeadLock];
[self _serialQueueDeadLock];
}

return self;
}

#pragma mark - Private

- (void)_mianQueueDeadLock {
dispatch_sync(dispatch_get_main_queue(), ^(void){
NSLog(@"这里死锁了");
});
}

- (void)_serialQueueDeadLock {
dispatch_queue_t queue1 = dispatch_queue_create("1serialQueue", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t queue2 = dispatch_queue_create("2serialQueue", DISPATCH_QUEUE_SERIAL);

dispatch_sync(queue1, ^{
NSLog(@"11111");

dispatch_sync(queue1, ^{
// 如果使用queue2就不会发生死锁,使用queue1就会死锁
NSLog(@"22222");
});
});
}

@end

_serialQueueDeadLock为例:当第一次执行串行队列任务的时候,跳到第4步,直接开始执行任务,在运行第二个dispatch_sync时候,在任务里面通过执行第1步(队列在运行)向这个同步队列中压入信号量,然后等待信号量,进入死锁。
_mianQueueDeadLock为例:主队列则会跳转到第2步进入死锁。

_dispatch_sync_f_invoke

如果当前队列是全局队列的话,就会调用_dispatch_sync_f_invoke函数。

1
2
3
4
5
6
7
8
9
static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
// 执行任务
_dispatch_function_invoke(dq, ctxt, func);
// dq->dq_running减2后判断是否等于0,是就唤醒队列
if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2) == 0)) {
_dispatch_wakeup(dq);
}
}

这个函数的作用:通过_dispatch_function_invoke函数执行传入的任务,然后根据dq_running检测任务队列有没有激活,没有激活就执行激活函数。关于激活函数_dispatch_wakeup(dq)放在队列的异步中讲解。

重点函数:_dispatch_sync_f2

根据前面讲到的,如果是其他队列,执行_dispatch_sync_f2。这个其他队列我们可以认为就是自定义的并行队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
// 1) ensure that this thread hasn't enqueued anything ahead of this call
// 2) the queue is not suspended

// 第1步:队列中有其他任务或者队列被挂起,压入信号量开始等待
if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
return _dispatch_sync_f_slow(dq, ctxt, func);
}
// 第2步:队列没有激活,激活队列后执行任务,最终还是调用了_dispatch_sync_f_slow函数,只是多了一个_dispatch_wakeup函数
if (slowpath(dispatch_atomic_add2o(dq, dq_running, 2) & 1)) {
return _dispatch_sync_f_slow2(dq, ctxt, func);
}
// 第3步:队列有多重队列,寻找真正的目标队列
if (slowpath(dq->do_targetq->do_targetq)) {
return _dispatch_sync_f_recurse(dq, ctxt, func);
}
// 第4步:队列无任务执行,调用_dispatch_sync_f_invoke执行任务。
// 内部调用_dispatch_function_invoke去执行任务
_dispatch_sync_f_invoke(dq, ctxt, func);
}

这里涉及到三个函数:

  1. _dispatch_sync_f_slow函数内部使用了线程对应的信号量并且调用wait方法。
  2. _dispatch_sync_f_recurse函数内部调用了dispatch_sync_f函数,还是在寻找真正的目标队列。
  3. 如果队列无任务执行,调用_dispatch_sync_f_invoke执行任务。执行任务的时候会调用_dispatch_function_invoke函数。

通过上面的代码,队列的同步执行是顺序执行的。这种顺序执行跟操作队列是串行还是并发是没有关系的。这些操作按着FIFO的方式进入队列中,每一个操作都会被等待执行直到前一个操作完成8,造成了这种顺序执行的现象。

现在我们整理一下队列同步执行的流程,如下图:

gcd_queue_synchronization

队列的异步:dispatch_async分析

测试代码

串行队列测试

1
2
3
4
5
6
let sQueue = DispatchQueue(label: "sQueue")
print(1)
sQueue.async { print("\(2):\(Thread.current)") }
print(3)
sQueue.async { print("\(4):\(Thread.current)") }
print(5)

执行结果

1
2
3
4
5
1
3
5
2:<NSThread: 0x600000b884c0>{number = 4, name = (null)}
4:<NSThread: 0x600000b884c0>{number = 4, name = (null)}

并发队列测试

1
2
3
4
5
6
let cQueue = DispatchQueue(label: "cQueue", attributes: [.concurrent])
print(1)
cQueue.async { print("\(2):\(Thread.current)") }
print(3)
cQueue.async { print("\(4):\(Thread.current)") }
print(5)

执行结果

1
2
3
4
5
1
3
5
4:<NSThread: 0x600002bc69c0>{number = 6, name = (null)}
2:<NSThread: 0x600002bc84c0>{number = 5, name = (null)}

通过上面的测试代码我们可以知道:

  1. 队列异步执行任务的过程中,具备开辟新线程的能力。
  2. 非主队列的串行队列,会开辟一个新的线程,不会阻塞当前线程,所有任务有序执行。
  3. 并发队列会开辟多个线程,具体线程的个数有体统决定。所有任务是无序执行的。

入口函数:dispatch_async

dispatch_async的源码如下:

1
2
3
void dispatch_async(dispatch_queue_t dq, void (^work)(void)) {
dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}

dispatch_async主要将block从栈copy到堆上,或者增加引用计数,保证block在执行之前不会被销毁,另外_dispatch_call_block_and_release用于销毁block。然后调用dispatch_async_f

dispatch_async_f函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
dispatch_continuation_t dc;

// No fastpath/slowpath hint because we simply don't know
// 串行队列,执行dispatch_barrier_async_f,其实最后还是执行任务入队的操作
if (dq->dq_width == 1) {
return dispatch_barrier_async_f(dq, ctxt, func);
}

// 从线程私有数据中获取一个dispatch_continuation_t的结构体
dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
return _dispatch_async_f_slow(dq, ctxt, func);
}

dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
dc->dc_func = func;
dc->dc_ctxt = ctxt;

// No fastpath/slowpath hint because we simply don't know
// 有目标队列,调用_dispatch_async_f2函数进行转发。
if (dq->do_targetq) {
return _dispatch_async_f2(dq, dc);
}

// 全局队列直接进行入队操作
_dispatch_queue_push(dq, dc);
}

从上面的源代码中我们可以看出dispatch_async_f大致分为三种情况:

  1. 如果是串行队列,调用dispatch_barrier_async_f函数;
  2. 其他队列且有目标队列,调用_dispatch_async_f2函数;
  3. 如果是全局队列的话,直接调用_dispatch_queue_push函数进行入队操作。

由于队列的异步执行任务的过程比较复杂,我们用一张图描述一下dispatch_async_f这个函数执行过程:

dispatch_async_f函数

虽然上面分三种情况,它们最后执行都是_dispatch_queue_push_dispatch_async_f2函数。另外_dispatch_async_f2函数其实也是在进行入队的操作。所以dispatch_async_f的本质就是执行_dispatch_queue_push函数来任务入队。

dispatch_continuation_t结构体

在看上述过程的源码时会涉及到dispatch_continuation_t这样的结构体,这个结构体的作用就是封装我们传入的异步block的任务。以dispatch_barrier_async_f函数的实现为例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
dispatch_continuation_t dc;
// 从线程私有数据中获取一个dispatch_continuation_t的结构体。
dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
// _dispatch_barrier_async_f_slow内部也是在进行入队操作
return _dispatch_barrier_async_f_slow(dq, ctxt, func);
}

// DISPATCH_OBJ_BARRIER_BIT,用于阻塞标识
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
// 将_dispatch_call_block_and_release作为func方法
dc->dc_func = func;
// 将传入的block作为上下文
dc->dc_ctxt = ctxt;
// 入队操作
_dispatch_queue_push(dq, dc);
}

另外还需要注意下dispatch_continuation_tdo_vtable的赋值情况。

1
2
3
4
5
// 串行队列异步或者使用dispatch_barrier_async函数会有一个DISPATCH_OBJ_BARRIER_BIT的barrier标记
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);

// not barrier
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;

libdispatch全部标识符有四种:

1
2
3
4
#define DISPATCH_OBJ_ASYNC_BIT		0x1		//异步
#define DISPATCH_OBJ_BARRIER_BIT 0x2 //阻塞
#define DISPATCH_OBJ_GROUP_BIT 0x4 //组
#define DISPATCH_OBJ_SYNC_SLOW_BIT 0x8 //同步慢

从上面我们可以知道串行队列异步执行任务的时候,通过DISPATCH_OBJ_BARRIER_BIT这个标识符实现阻塞等待的。

任务入队:_dispatch_queue_push

_dispatch_queue_push是一个宏定义,它最后会变成执行_dispatch_queue_push_list函数。

1
2
3
#define _dispatch_queue_push(x, y) _dispatch_queue_push_list((x), (y), (y))

#define _dispatch_queue_push_list _dispatch_trace_queue_push_list

_dispatch_trace_queue_push_list

1
2
3
4
5
6
7
8
9
10
11
12
void _dispatch_trace_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head, dispatch_object_t _tail) {
// 是否可以入队
if (slowpath(DISPATCH_QUEUE_PUSH_ENABLED())) {
struct dispatch_object_s *dou = _head._do;
do {
// 主要是对dispatch_continuation_s结构体的处理,确保后面的使用。
_dispatch_trace_continuation(dq, dou, DISPATCH_QUEUE_PUSH);
} while (dou != _tail._do && (dou = dou->do_next));
}

_dispatch_queue_push_list(dq, _head, _tail);
}

_dispatch_queue_push_list

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void _dispatch_queue_push_list(dispatch_queue_t dq, dispatch_object_t _head, dispatch_object_t _tail) {
struct dispatch_object_s *prev, *head = _head._do, *tail = _tail._do;

tail->do_next = NULL;
dispatch_atomic_store_barrier();
// dispatch_atomic_xchg2o实质是调用((typeof(*(p)))__sync_swap((p), (n))),它的定义是将p设为n并返回p操作之前的值。
// dispatch_atomic_xchg2o(dq, dq_items_tail, tail)相当于dq->dq_items_tail = tail,重新设置了队列的尾指针
prev = fastpath(dispatch_atomic_xchg2o(dq, dq_items_tail, tail));
if (prev) {
// if we crash here with a value less than 0x1000, then we are at a
// known bug in client code for example, see _dispatch_queue_dispose
// or _dispatch_atfork_child
// prev是原先的队尾,如果队列中有其他的元素,就将压入的对象加在队列的尾部。
prev->do_next = head;
} else {
// 如果队列为空
_dispatch_queue_push_list_slow(dq, head);
}
}

_dispatch_queue_push_list_slow

1
2
3
4
5
6
7
8
9
_dispatch_queue_push_list_slow(dispatch_queue_t dq,
struct dispatch_object_s *obj)
{
//dq->dq_items_head设置为dc,然后唤醒这个队列。因为此时队列为空,没有任务在执行,处于休眠状态,所以需要唤醒
_dispatch_retain(dq);
dq->dq_items_head = obj;
_dispatch_wakeup(dq);
_dispatch_release(dq);
}

通过对异步任务入队的分析,我们可以知道,入队只是将任务一个一个以FIFO的顺序添加到队列中,那就是需要一个时间点去执行这些任务。

唤醒队列:_dispatch_wakeup

无论是同步还是异步中都调用了_dispatch_wakeup这个函数,这个函数的作用就是唤醒当前队列。

_dispatch_wakeup的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
dispatch_queue_t _dispatch_wakeup(dispatch_object_t dou) {
dispatch_queue_t tq;
if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
return NULL;
}

if (!dx_probe(dou._do) && !dou._dq->dq_items_tail) {
return NULL;
}

// 如果dou._do->do_suspend_cnt == 0,返回YES,否则返回NO;
// 同时将DISPATCH_OBJECT_SUSPEND_LOCK赋值给dou._do->do_suspend_cnt
if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0, DISPATCH_OBJECT_SUSPEND_LOCK)) {
#if DISPATCH_COCOA_COMPAT
// 主队列的任务调用_dispatch_queue_wakeup_main唤醒主队列
if (dou._dq == &_dispatch_main_q) {
_dispatch_queue_wakeup_main();
}
#endif
return NULL;
}

// 放到目标队列中,重新走_dispatch_queue_push方法
_dispatch_retain(dou._do);
tq = dou._do->do_targetq;
_dispatch_queue_push(tq, dou._do);

return tq;
}

上面的代码中我们只看到了主队列和其他自定义队列的操作情况,但是没有全局队列的操作的情况,关于全局队列的唤醒的比较隐晦,针对全局队列的dx_probe(dou._do)的调用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define dx_probe(x) (x)->do_vtable->do_probe(x)

// dx_probe(dou._do) 相当于 (dou.do)->do_vtable->do_probe(dou.do)

// 全局队列的do_vtable:_dispatch_queue_root_vtable

// _dispatch_queue_root_vtable的定义
static const struct dispatch_queue_vtable_s _dispatch_queue_root_vtable = {
.do_type = DISPATCH_QUEUE_GLOBAL_TYPE,
.do_kind = "global-queue",
.do_debug = dispatch_queue_debug,
.do_probe = _dispatch_queue_wakeup_global,
};

// 全局队列:
// globalQueue -> _dispatch_queue_root_vtable -> _dispatch_queue_wakeup_global

从上面的代码可以看出_dispatch_wakeup分为四种情况:

  1. 主队列调用_dispatch_queue_wakeup_main
  2. 全局队列调用_dispatch_queue_wakeup_global
  3. 其他队列向目标队列压入这个队列,继续做入队操作;
  4. 管理队列调用_dispatch_mgr_wakeup,这里主要是为了dispatch_source而服务的。

_dispatch_queue_wakeup_main

_dispatch_main_queue_wakeup函数来唤醒主线程的Runloop,之前在《重拾RunLoop原理》中提到:

使用GCD异步操作的时候,我们在一个子线程处理完一些事情后,要返回主线程处理事情的时候,这时候需要依赖于RunLoop。

之前是控制台打印验证,现在我们在源码中亲自验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void _dispatch_queue_wakeup_main(void) {
kern_return_t kr;
// 主要看_dispatch_main_q_port_init的实现
dispatch_once_f(&_dispatch_main_q_port_pred, NULL,
_dispatch_main_q_port_init);
// 关于主线程的唤醒主要靠mach_port和在runloop中注册相对应的source1
kr = _dispatch_send_wakeup_main_thread(main_q_port, 0);

switch (kr) {
case MACH_SEND_TIMEOUT:
case MACH_SEND_TIMED_OUT:
case MACH_SEND_INVALID_DEST:
break;
default:
(void)dispatch_assume_zero(kr);
break;
}

_dispatch_safe_fork = false;
}

// _dispatch_main_q_port_init的实现,RunLoop的唤醒需要依赖于mach port
void _dispatch_main_q_port_init(void *ctxt DISPATCH_UNUSED) {
kern_return_t kr;

kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
&main_q_port);
DISPATCH_VERIFY_MIG(kr);
(void)dispatch_assume_zero(kr);
kr = mach_port_insert_right(mach_task_self(), main_q_port, main_q_port,
MACH_MSG_TYPE_MAKE_SEND);
DISPATCH_VERIFY_MIG(kr);
(void)dispatch_assume_zero(kr);

_dispatch_program_is_probably_callback_driven = true;
_dispatch_safe_fork = false;
}

_dispatch_queue_wakeup_global

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
static bool
_dispatch_queue_wakeup_global(dispatch_queue_t dq) {
static dispatch_once_t pred;
struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
int r;

if (!dq->dq_items_tail) return false;

_dispatch_safe_fork = false;

dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

// 上下文以及根队列的初始化,根队列内部会初始化线程池
dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

// _dispatch_queue_wakeup_global支持两种实现的任务唤醒pthread_workqueue和thread pool
// 1.支持pthread_workqueue
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_ENABLE_THREAD_POOL
// 如果队列的dgq_kworkqueue存在,则调用pthread_workqueue_additem_np函数
// dgq_kworkqueue是一个用于创建内核线程的接口,通过它创建的内核线程来执行内核其他模块排列到队列里的工作。
// 不同优先级的dispatch queue对应着对应优先级的workqueue。
// _dispatch_root_queues_init初始化的时候,使用pthread_workqueue_create_np创建pthread_workqueue
if (qc->dgq_kworkqueue)
#endif
{
if (dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, 1)) {
pthread_workitem_handle_t wh;
unsigned int gen_cnt;
_dispatch_debug("requesting new worker thread");
// 该函数使用workq_kernreturn系统调用,通知workqueue增加应当执行的项目。
// 根据该通知,XNU内核基于系统状态判断是否要生成线程,如果是overcommit优先级的队列,workqueue则始终生成线程。
r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
(void)dispatch_assume_zero(r);
} else {
_dispatch_debug("work thread request still pending on global "
"queue: %p", dq);
}
goto out;
}
#endif // HAVE_PTHREAD_WORKQUEUES
// 2. 支持thread pool
#if DISPATCH_ENABLE_THREAD_POOL
// 通过发送一个信号量使线程保活
if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
goto out;
}

// 计算线程池可用长度,如果线程池已满则跳转到out即return false,否则执行线程池-1操作
pthread_t pthr;
int t_count;
do {
t_count = qc->dgq_thread_pool_size;
if (!t_count) {
_dispatch_debug("The thread pool is full: %p", dq);
goto out;
}
} while (!dispatch_atomic_cmpxchg2o(qc, dgq_thread_pool_size, t_count, t_count - 1));
// qc->dgq_thread_pool_size的值与t_counts是否相等,是就减1,并返回ture

// 这里说明线程池不够用了,使用pthread创建一个线程,
// 并执行_dispatch_worker_thread,_dispatch_worker_thread最终会调用到_dispatch_worker_thread2
while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
sleep(1);
}
// 保证pthr能够被回收
r = pthread_detach(pthr);
(void)dispatch_assume_zero(r);
#endif // DISPATCH_ENABLE_THREAD_POOL

out:
return false;
}

队列的任务调度

主队列的任务调度:_dispatch_main_queue_callback_4CF

通过上面的源码分析,我们知道主队列在唤醒过程中会调用_dispatch_send_wakeup_main_thread函数,但是该函数的实现并没有开源这个函数的相关实现,似乎我们无法看出主队列的任务调度。通过打印函数调用栈我们可以看到主队列的任务调度是依赖_dispatch_main_queue_callback_4CF这个函数。

__CFRUNLOOP_IS_SERVICING_THE_MAIN_DISPATCH_QUEUE__之后调用_dispatch_main_queue_callback_4CF这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
// 处理主队列任务
void _dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED) {
if (main_q_is_draining) {
return;
}
// 正在处理任务,设置状态为true
_dispatch_queue_set_mainq_drain_state(true);
// 调度处理任务
_dispatch_main_queue_drain();
// 处理任务完成任务,恢复状态为false
_dispatch_queue_set_mainq_drain_state(false);
}

主队列是一个串行队列,按顺序执行,因此没有并发的逻辑。主队列的任务调度就是顺序遍历,主队列唤起本次需要执行的dc,并进行任务执行,对于之后入队的任务,将放在下一轮的主队列唤醒中执行。这也是_dispatch_main_queue_drain函数的大致实现。

全局队列的任务调度:_dispatch_worker_thread2

全局队列通过_dispatch_queue_wakeup_global函数,将任务入队。然后调用 _dispatch_worker_thread2函数处理对应queue中的任务。

_dispatch_worker_thread2的实现中有两个函数比较重要

  1. _dispatch_queue_concurrent_drain_one函数;
  2. _dispatch_continuation_pop函数;
_dispatch_queue_concurrent_drain_one

_dispatch_queue_concurrent_drain_one函数主要处理了以下几件事情:

  • 多线程竞争下的边界处理;
  • 获取出队dc
  • 再次唤醒全局队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
static struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq) {
struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

// The mediator value acts both as a "lock" and a signal
head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator);

// 1. 检查队列是否为空,是返回NULL
if (slowpath(head == NULL)) {
(void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL);
_dispatch_debug("no work on global work queue");
return NULL;
}

if (slowpath(head == mediator)) {
// 该线程在现线程竞争中失去了对队列的拥有权,这意味着libdispatch的效率很糟糕,
// 这种情况意味着在线程池中有太多的线程,这个时候应该创建一个pengding线程,
// 然后退出该线程,内核会在负载减弱的时候创建一个新的线程
_dispatch_debug("Contention on queue: %p", dq);
_dispatch_queue_wakeup_global(dq);
#if DISPATCH_PERF_MON
dispatch_atomic_inc(&_dispatch_bad_ratio);
#endif
return NULL;
}

// 在返回之前将head指针的do_next保存下来,如果next为NULL,这意味着item是最后一个
next = fastpath(head->do_next);

if (slowpath(!next)) {
dq->dq_items_head = NULL;

if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL)) {
// head和tail头尾指针均为空
goto out;
}

// 此时一定有item,该线程不会等待太久。
while (!(next = head->do_next)) {
_dispatch_hardware_pause();
}
}

dq->dq_items_head = next;
// 再次唤醒全局队列
_dispatch_queue_wakeup_global(dq);
out:
// 返回需要处理的dc
return head;
}

这里解释下我对再次调用_dispatch_queue_wakeup_global唤醒全局队列的理解:
我们知道并发队列中的dc执行是并发的,所以每一次出队dc后检查一下全局队列,是否还有dc在队列中。如果有就再次通知需要再创建一个work queue处理队列中剩余的dc,然后重复上面的步骤,类似于一种递归的过程。当多个work queue同时处理多个dc的时候,就是我们看到异步效果。

_dispatch_continuation_pop

_dispatch_continuation_pop函数实现了对任务的处理。这些任务可能是异步任务、group任务、栅栏任务甚至可能就是队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static inline void
_dispatch_continuation_pop(dispatch_object_t dou) {
dispatch_continuation_t dc = dou._dc;
dispatch_group_t dg;

_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
// 检测是不是队列,如果是,就进入_dispatch_queue_invoke处理队列
// dispatch_barrier_async的任务会进入以下分支,以保证barrier任务和其他任务隔离,
// 并通过dispath_semaphore_t实现通知barrier任务执行
if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
return _dispatch_queue_invoke(dou._dq);
}

// Add the item back to the cache before calling the function. This
// allows the 'hot' continuation to be used for a quick callback.
//
// The ccache version is per-thread.
// Therefore, the object has not been reused yet.
// This generates better assembly.
// 是否是异步任务
if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
_dispatch_continuation_free(dc);
}

// 判断是否是group任务
if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
dg = dc->dc_group;
} else {
dg = NULL;
}

// 是任务封装的dispatch_continuation_t结构体(dc),直接执行任务。
// 这也是异步的block被调用的时机
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
if (dg) {
// 如果是group执行dispatch_group_leave
dispatch_group_leave(dg);
_dispatch_release(dg);
}
}
_dispatch_queue_invoke
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void
_dispatch_queue_invoke(dispatch_queue_t dq) {
if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) &&
fastpath(dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1))) {
dispatch_atomic_acquire_barrier();
dispatch_queue_t otq = dq->do_targetq, tq = NULL;
_dispatch_queue_drain(dq);
if (dq->do_vtable->do_invoke) {
// Assume that object invoke checks it is executing on correct queue
tq = dx_invoke(dq);
} else if (slowpath(otq != dq->do_targetq)) {
// An item on the queue changed the target queue
tq = dq->do_targetq;
}
// We do not need to check the result.
// When the suspend-count lock is dropped, then the check will happen.
dispatch_atomic_release_barrier();
//dq_running减1,因为任务要么被直接执行了,要么被压到target队列了
(void)dispatch_atomic_dec2o(dq, dq_running);
if (tq) {
return _dispatch_queue_push(tq, dq);
}
}

dq->do_next = DISPATCH_OBJECT_LISTLESS;
if (!dispatch_atomic_sub2o(dq, do_suspend_cnt,
DISPATCH_OBJECT_SUSPEND_LOCK)) {
// 队列处于空闲状态,需要唤醒
if (dq->dq_running == 0) {
_dispatch_wakeup(dq); // verify that the queue is idle
}
}
// 释放队列
_dispatch_release(dq); // added when the queue is put on the list
}

现在我们整理一下队列异步执行的流程,如下图:dispatch_async

总结

  1. 队列与线程可以是多对一关系,一个线程上可以执行不同队列的任务,在主线程上一样适用。
  2. 队列操作与开启线程的关系:
    • 串行队列同步执行任务,任务在当前线程中有序执行,如果前面的任务没有完成则可能会阻塞当前线程。
    • 串行队列异步执行任务,开启一个线程,任务在新线程中有序执行,不会阻塞当前线程,新线程中的任务有序执行。
    • 并发队列同步执行任务,不会开启新线程,任务在当前线程中有序执行,如果前面的任务没有完成则可能会阻塞当前线程。
    • 并发队列异步执行任务,开启多个线程,具体数量由系统自己决定,任务在新开辟的线程中执行,不会阻塞当前线程,所有任务为无序执行。
    • 主队列同步执行任务,死锁。
    • 主队列异步执行任务,任务在主线程中有序执行,如果前面的任务没有完成则可能会阻塞当前线程。
  3. 队列的同步/异步决定是否具备开启线程的能力,队列的串行/并发决定处理任务的个数。
  4. dispatch_queue通过结构体和链表,被实现为FIFO(先进先出)队列,无论串行队列和并发队列,都是符合FIFO的原则。两者的主要区别是:执行顺序不同,以及开启线程数不同。
  5. dispatch_sync函数一般都在当前线程执行,利用与线程绑定的信号量来实现串行。
  6. dispatch_async分发到主队列的任务由Runloop处理,而分发到其他队列的任务由线程池处理。
  7. Block并不是直接添加到队列上,而是先构成一个dispatch_continuation结构体。结构体包含了这个Block还有一些上下文信息。队列会将这些dispatch_continuation结构体添加队列的链表中。无论这些队列是什么类型的,最终都是和全局队列相关的。在全局队列执行Block的时候,libdispatch从全局队列中取出dispatch_continuation,调用pthread_workqueue_additem_np函数,将该全局队列自身、符合其优先级的workqueue信息以及dispatch_continuation结构体的回调函数传递给参数。pthread_workqueue_additem_np函数使用workq_kernreturn系统调用,通知workqueue增加应当执行的项目。根据该同志,XNU内核基于系统状态判断是否要生成线程。如果是overcommit优先级的全局队列workqueue则会始终生成线程。workqueue的线程执行pthread_workqueue函数,该函数调用libdispatch的回调函数。在该函数中执行加入到dispatch_continuation的Block

  8. GCD死锁是队列导致的而不是线程导致,原因是_dispatch_barrier_sync_f_slow函数中使用了线程对应的信号量并且调用wait方法,从而导致线程死锁。

  9. 关于栅栏函数

    • dispatch_barrier_async适用的场景队列必须是用DISPATCH_QUEUE_CONCURRENT属性创建的队列,而使用全局并发队列的时候,其表现就和dispatch_async一样。原因:dispatch_barrier_async如果传入的是全局队列,在唤醒队列时会执行_dispatch_queue_wakeup_global函数,其执行效果同dispatch_async一致,而如果是自定义的队列的时候,_dispatch_continuation_pop中会执行dispatch_queue_invoke。在while循环中依次取出任务并调用_dispatch_continuation_redirect函数,使得block并发执行。当遇到DISPATCH_OBJ_BARRIER_BIT标记时,会修改do_suspend_cnt标志以保证后续while循环时直接goto out。barrier block的任务执行完之后_dispatch_queue_class_invoke会将do_suspend_cnt重置回去,所以barrier block之后的任务会继续执行。