无论在哪个平台,并发编程都是一个让人头疼的问题。庆幸的是,相对于服务端,客户端的并发编程简单了许多。这篇文章主要讲述一些基于 iOS 平台的一些并发编程相关东西,我写博客习惯于先介绍原理,后介绍用法,毕竟对于 API 的使用,官网有更好的文档。

一些原理性的东西

为了便于理解,这里先解释一些相关概念。如果你对这些概念已经很熟悉,可以直接跳过。

1.进程

从操作系统定义上来说,进程就是系统进行资源分配和调度的基本单位,系统创建一个线程后,会为其分配对应的资源。在 iOS 系统中,进程可以理解为就是一个 App。iOS 并没有提供可以创建进程的 API,即使你调用 fork() 函数,也不能创建新的进程。所以,本文所说的并发编程,都是针对线程来说的

2.线程

线程是程序执行流的最小单元。一般情况下,一个进程会有多个线程,或者至少有一个线程。一个线程有创建、就绪、运行、阻塞和死亡五种状态。线程可以共享进程的资源,所有的问题也是因为共享资源引起的。

3.并发

操作系统引入线程的概念,是为了使过个 CPU 更好的协调运行,充分发挥他们的并行处理能力。例如在 iOS 系统中,你可以在主线程中进行 UI 操作,然后另启一些线程来处理与 UI 操作无关的事情,两件事情并行处理,速度比较快。这就是并发的大致概念。

4.时间片

按照 wiki 上面解释:是分时操作系统分配给每个正在运行的进程微观上的一段CPU时间(在抢占内核中是:从进程开始运行直到被抢占的时间)。线程可以被认为是 ”微进程“,因此这个概念也可以用到线程方面。

一般操作系统使用时间片轮转算法进行调度,即每次调度时,总是选择就绪队列的队首进程,让其在CPU上运行一个系统预先设置好的时间片。一个时间片内没有完成运行的进程,返回到绪队列末尾重新排队,等待下一次调度。不同的操作系统,时间片的范围不一致,一般都是毫秒(ms)级别。

4.死锁

死锁是由于多个线程(进程)在执行过程中,因为争夺资源而造成的互相等待现象,你可以理解为卡主了。产生死锁的必要条件有四个:

  • 互斥条件 : 指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。
  • 请求和保持条件 : 指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。
  • 不可剥夺条件 : 指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。
  • 环路等待条件 : 指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合{P0,P1,P2,···,Pn}中的P0正在等待一个P1占用的资源;P1正在等待P2占用的资源,……,Pn正在等待已被P0占用的资源。

为了便于理解,这里举一个例子:一座桥,同一时间只允许一辆车经过(互斥)。两辆车 A,B 从桥的两端开上桥,走到桥的中间。此时 A 车不肯退(不可剥夺),又想占用 B 车所占据的道路;B 车此时也不肯退,又想占用 A 车所占据的道路(请求和保持)。此时,A 等待 B 占用的资源,B 等待 A 占用的资源(环路等待),两车僵持下去,就形成了死锁现象

5.线程安全

当多个线程同时访问一块共享资源(例如数据库),因为时序性问题,会导致数据错乱,这就是线程不安全。例如数据库中某个整形字段的 value 为 0,此时两个线程同时对其进行写入操作,线程 A 拿到原值为 0,加一后变为 1;线程 B 并不是在 A 加完后拿的,而是和 A 同时拿的,加完后也是 1,加了两次,理想值应该为 2,但是数据库中最终值却是 1。实际开发场景可能要比这个复杂的多。

所谓的线程安全,可以理解为在多个线程操作(例如读写操作)这部分数据时,不会出现问题。

Lock

因为线程共享进程资源,在并发情况下,就会出现线程安全问题。为了解决此问题,就出现了锁这个概念。在多线程环境下,当你访问一些共享数据时,拿到访问权限,给数据加锁,在这期间其他线程不可访问,直到你操作完之后进行解锁,其他线程才可以对其进行操作。

iOS 提供了多种锁,ibireme 大神的 这篇文章 对这些锁进行了性能分析,我这里直接把图 cp 过来了:

lock_benchmark

下面针对这些锁,逐一分析。

1.OSSpinLock

ibireme 大神的文章也说了,虽然这个锁性能最高,但是已经不安全了,建议不再使用,这里简单说一下。

OSSpinLock 是一种自旋锁,主要提供了加锁(OSSpinLockLock)、尝试枷锁(OSSpinLockTry)和解锁(OSSpinLockUnlock)三个方法。对一块资源进行加锁时,如果尝试加锁失败,不会进入睡眠状态,而是一直进行询问(自旋),占用 CPU资源,不适用于较长时间的任务。在自旋期间,因为占用 CPU 导致低优先级线程拿不到 CUP 资源,无法完成任务并释放锁,从而形成了优先级反转

so,虽然性能很高,但是不要用了。而且 Apple 也已经将这个类比较为 deprecate 了。

自旋锁 & 互斥锁
两者大体类似,区别在于:自旋锁属于 busy-waiting 类型锁,尝试加锁失败,会一直处于询问状态,占用 CPU 资源,效率高;互斥锁属于 sleep-waiting 类型锁,在尝试失败之后,会被阻塞,然后进行上下文切换置于等待队列,因为有上下文切换,效率较低。
在 iOS 中 NSLock 属于互斥锁。

优先级反转 :当一个高优先级任务访问共享资源时,该资源已经被一个低优先级任务抢占,阻塞了高优先级任务;同时,该低优先级任务被一个次高优先级的任务所抢先,从而无法及时地释放该临界资源。最终使得任务优先级被倒置,发生阻塞。(引用自 wiki

关于自旋锁的原理,bestswifter 的文章 深入理解 iOS 开发中的锁 这篇文章讲得很好,我这里大部分锁的知识引用于此,建议读一下原文。

自旋锁是加不上就一直尝试,也就是一个循环,直到尝试加上锁,伪代码如下:

1
2
3
4
5
6
7
bool lock = false; // 一开始没有锁上,任何线程都可以申请锁  
do {
while(test_and_set(&lock); // test_and_set 是一个原子操作,尝试加锁
Critical section // 临界区
lock = false; // 相当于释放锁,这样别的线程可以进入临界区
Reminder section // 不需要锁保护的代码
}

使用 :

1
2
3
4
OSSpinLock spinLock = OS_SPINLOCK_INIT;
OSSpinLockLock(&spinLock);
// 被锁住的资源
OSSpinLockUnlock(&spinLock);

2.dispatch_semaphore

dispatch_semaphore 并不属于锁,而是信号量。两者的区别如下:

  • 锁是用于线程互斥操作,一个线程锁住了某个资源,其他线程都无法访问,直到整个线程解锁;信号量用于线程同步,一个线程完成了某个动作通过信号量告诉别的线程,别的线程再进行操作。
  • 锁的作用域是线程之间;信号量的作用域是线程和进程之间。
  • 信号量有时候可以充当锁的作用,初次之前还有其他作用。
  • 如果转化为数值,锁可以认为只有 0 和 1;信号量可以大于零和小于零,有多个值。

dispatch_semaphore 使用分为三步:create、wait 和 signal。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// create
dispatch_semaphore_t semaphore = dispatch_semaphore_create(1);

// thread A
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
// execute task A
NSLog(@"task A");
sleep(10);
dispatch_semaphore_signal(semaphore);
});

// thread B
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
// execute task B
NSLog(@"task B");
dispatch_semaphore_signal(semaphore);
});

执行结果:

1
2
2018-05-03 21:40:09.068586+0800 ConcurrencyTest[44084:1384262] task A
2018-05-03 21:40:19.072951+0800 ConcurrencyTest[44084:1384265] task B

thread A,B 是两个异步线程,一般情况下,各自执行自己的事件,互不干涉。但是根据 console 输出,B 是在 A 执行完了 10s 执行之后才执行的,显然受到阻塞。使用 dispatch_semaphore 大致执行过程这样:创建 semaphore 时,信号量值为 1;执行到线程 A 的 dispatch_semaphore_wait 时,信号量值减 1,变为 0;然后执行任务 A,执行完毕后 sleep 方法阻塞当前线程 10s;与此同时,线程 B 执行到了 dispatch_semaphore_wait,由于信号量此时为 0,且线程 A 中设置的为 DISPATCH_TIME_FOREVER,因此需要等到线程 A sleep 10s 之后,执行 dispatch_semaphore_signal 将信号量置为 1,线程 B 的任务才开始执行。

根据上面的描述,dispatch_semaphore 的原理大致也就了解了。GCD 源码 对这些方法定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
long value = dispatch_atomic_dec2o(dsema, dsema_value);
dispatch_atomic_acquire_barrier();
if (fastpath(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}

static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;

again:
// Mach semaphores appear to sometimes spuriously wake up. Therefore,
// we keep a parallel count of the number of times a Mach semaphore is
// signaled (6880961).
while ((orig = dsema->dsema_sent_ksignals)) {
if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,
orig - 1)) {
return 0;
}
}

struct timespec _timeout;
int ret;

switch (timeout) {
default:
do {
uint64_t nsec = _dispatch_timeout(timeout);
_timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
_timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
ret = slowpath(sem_timedwait(&dsema->dsema_sem, &_timeout));
} while (ret == -1 && errno == EINTR);

if (ret == -1 && errno != ETIMEDOUT) {
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
while ((orig = dsema->dsema_value) < 0) {
if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) {
errno = ETIMEDOUT;
return -1;
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
do {
ret = sem_wait(&dsema->dsema_sem);
} while (ret != 0);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
break;
}

goto again;
}

以上时对 wait 方法的定义,如果你不想看代码,可以直接听我说:

  • 调用 dispatch_semaphore_wait 方法时,如果信号量大于 0,直接返回;否则进入后续步骤。
  • _dispatch_semaphore_wait_slow 方法根据传入 timeout 参数不同,使用 switch-case 处理。
  • 如果传入的是 DISPATCH_TIME_NOW 参数,将信号量加 1 并立即返回。
  • 如果传入的是一个超时时间,调用系统的 semaphore_timedwait 方法进行等待,直至超时。
  • 如果传入的是 DISPATCH_TIME_FOREVER 参数,调用系统的 semaphore_wait 进行等待,直到收到 singal 信号。

至于 dispatch_semaphore_signal 就比较简单了,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
dispatch_atomic_release_barrier();
long value = dispatch_atomic_inc2o(dsema, dsema_value);
if (fastpath(value > 0)) {
return 0;
}
if (slowpath(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}
  • 现将信号量加 1,大于 0 直接返回。
  • 小于 0 返回 _dispatch_semaphore_signal_slow,这个方法的作用是调用内核的 semaphore_signal 函数唤醒信号量,然后返回 1。

3.pthread_mutex

Pthreads 是 POSIX Threads 的缩写。pthread_mutex 属于互斥锁,即尝试加锁失败后悔阻塞线程并睡眠,会进行上下文切换。锁的类型主要有三种:PTHREAD_MUTEX_NORMALPTHREAD_MUTEX_ERRORCHECKPTHREAD_MUTEX_RECURSIVE

  • PTHREAD_MUTEX_NORMAL,普通锁,当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。
  • PTHREAD_MUTEX_ERRORCHECK,检错锁,如果同一个线程请求同一个锁,则返回 EDEADLK。否则和 PTHREAD_MUTEX_NORMAL 相同。
  • PTHREAD_MUTEX_RECURSIVE,递归锁,允许一个线程进行递归申请锁。

使用如下:

1
2
3
4
5
6
7
8
9
pthread_mutex_t mutex;   // 定义锁
pthread_mutexattr_t attr; // 定义 mutexattr_t 变量
pthread_mutexattr_init(&attr); // 初始化attr为默认属性
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); // 设置锁的属性
pthread_mutex_init(&mutex, &attr); // 创建锁

pthread_mutex_lock(&mutex); // 申请锁
// 临界区
pthread_mutex_unlock(&mutex); // 释放锁

4.NSLock

NSLock 属于互斥锁,是 Objective-C 封装的一个对象。虽然我们不知道 Objective-C 是如何实现的,但是我们可以在 swift 源码 中找到他的实现 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
internal var mutex = _PthreadMutexPointer.allocate(capacity: 1)
...
open func lock() {
pthread_mutex_lock(mutex)
}

open func unlock() {
pthread_mutex_unlock(mutex)
#if os(macOS) || os(iOS)
// Wakeup any threads waiting in lock(before:)
pthread_mutex_lock(timeoutMutex)
pthread_cond_broadcast(timeoutCond)
pthread_mutex_unlock(timeoutMutex)
#endif
}

可以看出他只是将 pthread_mutex 封装了一下。只因为比 pthread_mutex 慢一些,难道是因为方法层级之间的调用,多了几次压栈操作???

常规使用:

1
2
3
4
NSLock *mutexLock = [NSLock new];
[mutexLock lock];
// 临界区
[muteLock unlock];

4.NSCondition & NSConditionLock

NSCondition 可以同时起到 lock 和条件变量的作用。同样你可以在 swift 源码 中找到他的实现 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
open class NSCondition: NSObject, NSLocking {
internal var mutex = _PthreadMutexPointer.allocate(capacity: 1)
internal var cond = _PthreadCondPointer.allocate(capacity: 1)

public override init() {
pthread_mutex_init(mutex, nil)
pthread_cond_init(cond, nil)
}

deinit {
pthread_mutex_destroy(mutex)
pthread_cond_destroy(cond)
mutex.deinitialize(count: 1)
cond.deinitialize(count: 1)
mutex.deallocate()
cond.deallocate()
}

open func lock() {
pthread_mutex_lock(mutex)
}

open func unlock() {
pthread_mutex_unlock(mutex)
}

open func wait() {
pthread_cond_wait(cond, mutex)
}

open func wait(until limit: Date) -> Bool {
guard var timeout = timeSpecFrom(date: limit) else {
return false
}
return pthread_cond_timedwait(cond, mutex, &timeout) == 0
}

open func signal() {
pthread_cond_signal(cond)
}

open func broadcast() {
pthread_cond_broadcast(cond)
}

open var name: String?
}

可以看出,它还是遵循 NSLocking 协议,lock 方法同样还是使用的 pthread_mutex,wait 和 signal 使用的是 pthread_cond_waitpthread_cond_signal

使用 NSCondition 是,先对要操作的临界区加锁,然后因为条件不满足,使用 wait 方法阻塞线程;待条件满足之后,使用 signal 方法进行通知。下面是一个 生产者-消费者的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
NSCondition *condition = [NSCondition new];
NSMutableArray *products = [NSMutableArray array];

// consume
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[condition lock];
while (products.count == 0) {
[condition wait];
}
[products removeObjectAtIndex:0];
[condition unlock];
});

// product
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[condition lock];
[products addObject:[NSObject new]];
[condition signal];
[condition unlock];
});

NSConditionLock 是通过使用 NSCondition 来实现的,遵循 NSLocking 协议,然后这是 swift 源码 (源码比较占篇幅,我这里简化一下):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
open class NSConditionLock : NSObject, NSLocking {
internal var _cond = NSCondition()

...

open func lock(whenCondition condition: Int) {
let _ = lock(whenCondition: condition, before: Date.distantFuture)
}

open func `try`() -> Bool {
return lock(before: Date.distantPast)
}

open func tryLock(whenCondition condition: Int) -> Bool {
return lock(whenCondition: condition, before: Date.distantPast)
}

open func unlock(withCondition condition: Int) {
_cond.lock()
_thread = nil
_value = condition
_cond.broadcast()
_cond.unlock()
}

open func lock(before limit: Date) -> Bool {
_cond.lock()
while _thread != nil {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}

open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
_cond.lock()
while _thread != nil || _value != condition {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}

...
}

可以看出它使用了一个 NSCondition 全局变量来实现 lock 和 unlock 方法,都是一些简单的代码逻辑,就不详细说了。

使用 NSConditionLock 注意:

  • 初始化 NSConditionLock 会设置一个 condition,只有满足这个 condition 才能加锁。
  • -[unlockWithCondition:] 并不是满足条件时解锁,而是解锁后,修改 condition 值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

typedef NS_ENUM(NSInteger, CTLockCondition) {
CTLockConditionNone = 0,
CTLockConditionPlay,
CTLockConditionShow
};

- (void)testConditionLock {
NSConditionLock *conditionLock = [[NSConditionLock alloc] initWithCondition:CTLockConditionPlay];

// thread one
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[conditionLock lockWhenCondition:CTLockConditionNone];
NSLog(@"thread one");
sleep(2);
[conditionLock unlock];
});

// thread two
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
sleep(1);
if ([conditionLock tryLockWhenCondition:CTLockConditionPlay]) {
NSLog(@"thread two");
[conditionLock unlockWithCondition:CTLockConditionShow];
NSLog(@"thread two unlocked");
} else {
NSLog(@"thread two try lock failed");
}
});

// thread three
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
sleep(2);
if ([conditionLock tryLockWhenCondition:CTLockConditionPlay]) {
NSLog(@"thread three");
[conditionLock unlock];
NSLog(@"thread three locked success");
} else {
NSLog(@"thread three try lock failed");
}
});
}

// thread four
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
sleep(4);
if ([conditionLock tryLockWhenCondition:CTLockConditionShow]) {
NSLog(@"thread four");
[conditionLock unlock];
NSLog(@"thread four unlocked success");
} else {
NSLog(@"thread four try lock failed");
}
});
}

然后看输出结果 :

1
2
3
4
5
2018-05-05 16:34:33.801855+0800 ConcurrencyTest[97128:3100768] thread two
2018-05-05 16:34:33.802312+0800 ConcurrencyTest[97128:3100768] thread two unlocked
2018-05-05 16:34:34.804384+0800 ConcurrencyTest[97128:3100776] thread three try lock failed
2018-05-05 16:34:35.806634+0800 ConcurrencyTest[97128:3100778] thread four
2018-05-05 16:34:35.806883+0800 ConcurrencyTest[97128:3100778] thread four unlocked success

可以看出,thread one 因为条件和初始化不符,加锁失败,未输出 log; thread two 条件相符,解锁成功,并修改加锁条件;thread three 使用原来的加锁条件,显然无法加锁,尝试加锁失败; thread four 使用修改后的条件,加锁成功。

5. NSRecursiveLock

NSRecursiveLock 属于递归锁。然后这是 swift 源码,只贴一下关键部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
open class NSRecursiveLock: NSObject, NSLocking {
...
public override init() {
super.init()
#if CYGWIN
var attrib : pthread_mutexattr_t? = nil
#else
var attrib = pthread_mutexattr_t()
#endif
withUnsafeMutablePointer(to: &attrib) { attrs in
pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
pthread_mutex_init(mutex, attrs)
}
}

...
}

它是使用 PTHREAD_MUTEX_RECURSIVE 类型的 pthread_mutex_t 初始化的。递归所可以在一个线程中重复调用,然后底层会记录加锁和解锁次数,当二者次数相同时,才能正确解锁,释放这块临界区。

使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- (void)testRecursiveLock {
NSRecursiveLock *recursiveLock = [NSRecursiveLock new];

int (^__block fibBlock)(int) = ^(int num) {
[recursiveLock lock];

if (num < 0) {
[recursiveLock unlock];
return 0;
}

if (num == 1 || num == 2) {
[recursiveLock unlock];
return num;
}
int newValue = fibBlock(num - 1) + fibBlock(num - 2);
[recursiveLock unlock];
return newValue;
};

int value = fibBlock(10);
NSLog(@"value is %d", value);
}

6. @synchronized

@synchronized 是牺牲性能来换取语法上的简洁。如果你想深入了解,建议你去读 这篇文章。这里说一下他的大概原理:

@synchronized 的加锁过程,大概是这个样子:

1
2
3
4
5
6
@try {
objc_sync_enter(obj); // lock
// 临界区
} @finally {
objc_sync_exit(obj); // unlock
}

@synchronized 的存储结构,是使用哈希表来实现的。当你传入一个对象后,会为这个对象分配一个锁。锁和对象打包成一个对象,然后和一个锁在进行二次打包成一个对象,可以理解为 value;通过一个算法,根据对象的地址得到一个值,作为 key。然后以 key-value 的形式写入哈希表。结构大概是这个样子:

synchronized结构

存储的时候,是以哈希表结构存储,不是我上面画的顺序存储,上面只是一个节点而已。

@synchronized 的使用就很简单了 :

1
2
3
4
5
NSMutableArray *elementArray = [NSMutableArray array];

@synchronized(elementArray) {
[elementArray addObject:[NSObject new]];
}

Pthreads

前面也说了,pthreads 是 POSIX Threads 的缩写。这个东西一般我们用不到,这里简单介绍一下。Pthreads 是POSIX的线程标准,定义了创建和操纵线程的一套API。实现POSIX 线程标准的库常被称作Pthreads,一般用于Unix-like POSIX 系统,如Linux、 Solaris。

NSThread

NSThread 是对内核 mach kernel 中的 mach thread 的封装,一个 NSThread 对象就是一个线程。使用频率比较低,除了 API 的使用,没什么可讲的。如果你已经熟悉这些 API,可以跳过这一节了。

1.初始化线程执行一个 task

使用初始化方法初始化一个 NSTherad 对象,调用 -[cancel]-[start-[main] 方法对线程进行操作,一般线程执行完即销毁,或者因为某种异常退出。

1
2
3
4
5
6
7
8
9
/** 使用 target 对象的中的方法作为执行主体,可以通过 argument 传递一些参数。
- (instancetype)initWithTarget:(id)target selector:(SEL)selector object:(nullable id)argument;

/** 使用 block 对象作为执行主体 */
- (instancetype)initWithBlock:(void (^)(void))block;

/** 类方法,上面对象方法需要调用 -[start] 方法启动线程,下面两个方法不需要手动启动 */
+ (void)detachNewThreadWithBlock:(void (^)(void))block;
+ (void)detachNewThreadSelector:(SEL)selector toTarget:(id)target withObject:(nullable id)argument;

2.在主线程执行一个 task

1
2
3
4
/** 说一下最后一个参数,这里你至少指定一个 mode 执行 selector,如果你传 nil 或者空数组,selector 不会执行,虽然方法定义写了 nullable */
- (void)performSelectorOnMainThread:(SEL)aSelector withObject:(nullable id)arg waitUntilDone:(BOOL)wait modes:(nullable NSArray<NSString *> *)array;

- (void)performSelectorOnMainThread:(SEL)aSelector withObject:(nullable id)arg waitUntilDone:(BOOL)wait;

3.在其他线程执行一个 task

1
2
3
4
/** modes 参数同上一个 */
- (void)performSelector:(SEL)aSelector onThread:(NSThread *)thr withObject:(nullable id)arg waitUntilDone:(BOOL)wait modes:(nullable NSArray<NSString *> *)array;

- (void)performSelector:(SEL)aSelector onThread:(NSThread *)thr withObject:(nullable id)arg waitUntilDone:(BOOL)wait

4.在后台线程执行一个 task

1
- (void)performSelectorInBackground:(SEL)aSelector withObject:(nullable id)arg;

5.获取当前线程

1
@property (class, readonly, strong) NSThread *currentThread;

使用线程相关方法时,记得设置好 name,方便后面调试。同时也设置好优先级等其他参数。

performSelector: 系列方法已经不太安全,慎用。

Grand Central Dispatch (GCD)

GCD 是基于 C 实现的一套 API,而且是开源的,如果有兴趣,可以在 这里 down 一份源码研究一下。GCD 是由系统帮我们处理多线程调度,很是方便,也是使用频率最高的。这一章节主要讲解一下 GCD 的原理和使用。

在讲解之前,我们先有个概览,看一下 GCD 为我们提供了那些东西:

GCD总体结构

系统所提供的 API,完全可以满足我们日常开发需求了。下面就根据这些模块分别讲解一下。

1. Dispatch Queue

GCD 为我们提供了两类队列,串行队列并行队列。两者的区别是:

  • 串行队列中,按照 FIFO 的顺序执行任务,前面一个任务执行完,后面一个才开始执行。
  • 并行队列中,也是按照 FIFO 的顺序执行任务,只要前一个被拿去执行,继而后面一个就开始执行,后面的任务无需等到前面的任务执行完再开始执行。

除此之外,还要解释一个容易混淆的概念,并发并行

  • 并发:是指单独部分可以同时执行,但是需要系统决定怎样发生。
  • 并行:两个任务互不干扰,同时执行。单核设备,系统需要通过切换上下文来实现并发;多核设备,系统可以通过并行来执行并发任务。

最后,还有一个概念,同步异步

  • 同步 : 同步执行的任务会阻塞当前线程。
  • 异步 : 异步执行的任务不会阻塞当前线程。是否开启新的线程,由系统管理。如果当前有空闲的线程,使用当前线程执行这个异步任务;如果没有空闲的线程,而且线程数量没有达到系统最大,则开启新的线程;如果线程数量已经达到系统最大,则需要等待其他线程中任务执行完毕。
队列

我们使用时,一般使用这几个队列:

  • 主队列 - dispatch_get_main_queue :一个特殊的串行队列。在 GCD 中,方法主队列中的任务都是在主线程执行。当我们更新 UI 时想 dispatch 到主线程,可以使用这个队列。

    1
    2
    3
    4
    5
    6
    	- (void)viewDidLoad {
    [super viewDidLoad];
    dispatch_async(dispatch_get_main_queue(), ^{
    // UI 相关操作
    });
    }
  • 全局并行队列 - dispatch_get_global_queue : 系统提供的一个全局并行队列,我们可以通过指定参数,来获取不同优先级的队列。系统提供了四个优先级,所以也可以认为系统为我们提供了四个并行队列,分别为 :

    • DISPATCH_QUEUE_PRIORITY_HIGH
    • DISPATCH_QUEUE_PRIORITY_DEFAULT
    • DISPATCH_QUEUE_PRIORITY_LOW
    • DISPATCH_QUEUE_PRIORITY_BACKGROUND

      1
      2
      3
      4
      dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
      dispatch_async(queue, ^{
      // 相关操作
      });
  • 自定义队列 :你可以自己定义串行或者并行队列,来执行一些相关的任务,平时开发中也建议用自定义队列。创建自定义队列时,需要两个参数。一个是队列的名字,方便我们再调试时查找队列使用,命名方式采用的是反向 DNS 命名规则;一个是队列类型,传 NULL 或者 DISPATCH_QUEUE_SERIAL 代表串行队列,传 DISPATCH_QUEUE_CONCURRENT 代表并行队列,通常情况下,不要传 NULL,会降低可读性。
    DISPATCH_QUEUE_SERIAL_INACTIVE 代表串行不活跃队列,DISPATCH_QUEUE_CONCURRENT_INACTIVE 代表并行不活跃队列,在执行 block 任务时,需要被激活。

    1
    dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch",DISPATCH_QUEUE_SERIAL);
  • 你可以使用 dispatch_queue_set_specificdispatch_queue_get_specificdispatch_get_specific 方法,为 queue 设置关联的 key 或者根据 key 找到关联对象等操作。

可以说,系统为我们提供了 5 中不同的队列,运行在主线程中的 main queue;3 个不同优先级的 global queue; 一个优先级更低的 background queue。除此之外,开发者可以自定义一些串行和并行队列,这些自定义队列中被调度的所有 block 最终都会被放到系统全局队列和线程池中,后面会讲这部分原理。盗用一张经典图:

gcd-queues

同步 VS 异步

我们大多数情况下,都是使用 dispatch_asyn() 做异步操作,因为程序本来就是顺序执行,很少用到同步操作。有时候我们会把 dispatch_syn() 当做锁来用,以达到保护的作用。

系统维护的是一个队列,根据 FIFO 的规则,将 dispatch 到队列中的任务一一执行。有时候我们想把一些任务延后执行以下,例如 App 启动时,我想让主线程中一个耗时的工作放在后,可以尝试用一下 dispatch_asyn(),相当于把任务重新追加到了队尾。

1
2
3
dispatch_async(dispatch_get_main_queue(), ^{
// 想要延后的任务
});

通常情况下,我们使用 dispatch_asyn() 是不会造成死锁的。死锁一般出现在使用 dispatch_syn() 的时候。例如:

1
2
3
dispatch_sync(dispatch_get_main_queue(), ^{
NSLog(@"dead lock");
});

想上面这样写,启动就会报错误。以下情况也如此:

1
2
3
4
5
6
7
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_async(queue, ^{
NSLog(@"dispatch asyn");
dispatch_sync(queue, ^{
NSLog(@"dispatch asyn -> dispatch syn");
});
});

在上面的代码中,dispatch_asyn() 整个 block(称作 blcok_asyn) 当做一个任务追加到串行队列队尾,然后开始执行。在 block_asyn 内部中,又进行了 dispatch_syn(),想想要执行 block_syn。因为是串行队列,需要前一个执行完(block_asyn),再执行后面一个(block_syn);但是要执行完 block_asyn,需要执行内部的 block_syn。互相等待,形成死锁。

现实开发中,还有更复杂的死锁场景。不过现在编译器很友好,我们能在编译执行时就检测到了。

基本原理

针对下面这几行代码,我们分析一下它的底层过程:

1
2
3
4
5
6
7
- (void)viewDidLoad {
[super viewDidLoad];
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_async(queue, ^{
NSLog(@"dispatch asyn test");
});
}

创建队列

源码很长,但实际只有一个方法,逻辑比较清晰,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/** 开发者调用的方法 */
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return _dispatch_queue_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

/** 内部实际调用方法 */
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
// 1.初步判断
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}

// 2.配置队列参数
dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
qos = DISPATCH_QOS_USER_INITIATED;
}
if (qos == DISPATCH_QOS_MAINTENANCE) {
qos = DISPATCH_QOS_BACKGROUND;
}
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS

_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
if (tq->do_targetq) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
"a non-global target queue");
}
}

if (tq && !tq->do_targetq &&
tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
// Handle discrepancies between attr and target queue, attributes win
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
overcommit = _dispatch_queue_attr_overcommit_enabled;
} else {
overcommit = _dispatch_queue_attr_overcommit_disabled;
}
}
if (qos == DISPATCH_QOS_UNSPECIFIED) {
dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
tq = _dispatch_get_root_queue(tq_qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
} else {
tq = NULL;
}
} else if (tq && !tq->do_targetq) {
// target is a pthread or runloop root queue, setting QoS or overcommit
// is disallowed
if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
"and use this kind of target queue");
}
if (qos != DISPATCH_QOS_UNSPECIFIED) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
"and use this kind of target queue");
}
} else {
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
// Serial queues default to overcommit!
overcommit = dqa->dqa_concurrent ?
_dispatch_queue_attr_overcommit_disabled :
_dispatch_queue_attr_overcommit_enabled;
}
}
if (!tq) {
tq = _dispatch_get_root_queue(
qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
if (slowpath(!tq)) {
DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
}
}

// 3. 初始化队列
if (legacy) {
// if any of these attributes is specified, use non legacy classes
if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
legacy = false;
}
}

const void *vtable;
dispatch_queue_flags_t dqf = 0;
if (legacy) {
vtable = DISPATCH_VTABLE(queue);
} else if (dqa->dqa_concurrent) {
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
switch (dqa->dqa_autorelease_frequency) {
case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
dqf |= DQF_AUTORELEASE_NEVER;
break;
case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
dqf |= DQF_AUTORELEASE_ALWAYS;
break;
}
if (legacy) {
dqf |= DQF_LEGACY;
}
if (label) {
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
dqf |= DQF_LABEL_NEEDS_FREE;
label = tmp;
}
}

dispatch_queue_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
_dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

dq->dq_label = label;
#if HAVE_PTHREAD_WORKQUEUE_QOS
dq->dq_priority = dqa->dqa_qos_and_relpri;
if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
}
#endif
_dispatch_retain(tq);
if (qos == QOS_CLASS_UNSPECIFIED) {
// legacy way of inherithing the QoS from the target
_dispatch_queue_priority_inherit_from_target(dq, tq);
}
if (!dqa->dqa_inactive) {
_dispatch_queue_inherit_wlh_from_target(dq, tq);
}
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_introspection_queue_create(dq);
}

根据代码生成的流程图,不想看代码直接看图,下同:

Create_Queue

根据流程图,这个方法的步骤如下:

  • 开发者调用 dispatch_queue_create() 方法之后,内部会调用 _dispatch_queue_create_with_target() 方法。
  • 然后进行初步判断,多数情况下,我们是不会传队列类型的,都是穿 NULL,所以这里是个 slowpath。如果传了参数,但是不是规定的队列类型,系统会认为你是个智障,并抛出错误。
  • 然后初始化一些配置项。主要是 target_queue,overcommit 项和 qos。target_queue 是依赖的目标队列,像任何队列提交的任务(block),最终都会放到目标队列中执行;支持 overcommit 时,每当想队列提交一个任务时,都会开一个新的线程处理,这样是为了避免单一线程任务太多而过载;qos 是队列优先级,之前已经说过。
  • 然后进入判断分支。普通的串行队列的目标队列,就是一个支持 overcommit 的全局队列(对应 else 分支);当前 tq 对象的引用计数为 DISPATCH_OBJECT_GLOBAL_REFCNT (永远不会释放)时,且还没有目标队列时,才可以设置 overcommit 项,而且当优先级为 DISPATCH_QOS_UNSPECIFIED 时,需要重置 tq (对应 if 分支);其他情况(else if 分支)。
  • 然后配置队列的标识,以方便在调试时找到自己的那个队列。
  • 使用 _dispatch_object_alloc 方法申请一个 dispatch_queue_t 对象空间,dq。
  • 根据传入的信息(并行 or 串行;活跃 or 非活跃)来初始化这个队列。并行队列的 width 会设置为 DISPATCH_QUEUE_WIDTH_MAX 即最大,不设限;串行的会设置为 1。
  • 将上面获得配置项,目标队列,是否支持 overcommit,优先级和 dq 绑定。
  • 返回这个队列。返回去还输出了一句信息,便于调试。

异步执行

这个版本异步执行的代码,因为方法拆分很多,所以显得很乱。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/** 开发者调用 */
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT;

_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
_dispatch_continuation_async(dq, dc);
}

/** 内部调用,包一层,再深入调用 */
DISPATCH_NOINLINE
void
_dispatch_continuation_async(dispatch_queue_t dq, dispatch_continuation_t dc)
{
_dispatch_continuation_async2(dq, dc,
dc->dc_flags & DISPATCH_OBJ_BARRIER_BIT);
}

/** 根据 barrier 关键字区别串行还是并行,分两支 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_async2(dispatch_queue_t dq, dispatch_continuation_t dc,
bool barrier)
{
if (fastpath(barrier || !DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
// 串行
return _dispatch_continuation_push(dq, dc);
}

// 并行
return _dispatch_async_f2(dq, dc);
}

/** 并行又多了一层调用,就是这个方法 */
DISPATCH_NOINLINE
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
if (slowpath(dq->dq_items_tail)) {// 少路径
return _dispatch_continuation_push(dq, dc);
}

if (slowpath(!_dispatch_queue_try_acquire_async(dq))) {// 少路径
return _dispatch_continuation_push(dq, dc);
}
// 多路径
return _dispatch_async_f_redirect(dq, dc,
_dispatch_continuation_override_qos(dq, dc));
}

/** 主要用来重定向 */
DISPATCH_NOINLINE
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
dispatch_object_t dou, dispatch_qos_t qos)
{
if (!slowpath(_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dq, dou);
}
dq = dq->do_targetq;

// Find the queue to redirect to
while (slowpath(DISPATCH_QUEUE_USES_REDIRECTION(dq->dq_width))) {
if (!fastpath(_dispatch_queue_try_acquire_async(dq))) {
break;
}
if (!dou._dc->dc_ctxt) {
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dq);
}
dq = dq->do_targetq;
}

// 同步异步最终都是调用的这个方法,将任务追加到队列中
dx_push(dq, dou, qos);
}

... 省略一些调用层级,

/** 核心方法,通过 dc_flags 参数区分了是 group,还是串行,还是并行 */
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou, voucher_t ov,
dispatch_invoke_flags_t flags)
{
dispatch_continuation_t dc = dou._dc, dc1;
dispatch_invoke_with_autoreleasepool(flags, {
uintptr_t dc_flags = dc->dc_flags;
_dispatch_continuation_voucher_adopt(dc, ov, dc_flags);
if (dc_flags & DISPATCH_OBJ_CONSUME_BIT) { // 并行
dc1 = _dispatch_continuation_free_cacheonly(dc);
} else {
dc1 = NULL;
}
if (unlikely(dc_flags & DISPATCH_OBJ_GROUP_BIT)) { // group
_dispatch_continuation_with_group_invoke(dc);
} else { // 串行
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_introspection_queue_item_complete(dou);
}
if (unlikely(dc1)) {
_dispatch_continuation_free_to_cache_limit(dc1);
}
});
_dispatch_perfmon_workitem_inc();
}

不想看代码,直接看图:

Dispatch_Asyn

根据流程图描述一下过程:

  • 首先开发者调用 dispatch_async() 方法,然后内部创建了一个 _dispatch_continuation_init 队列,将 queue、block 这些信息和这个 dc 绑定起来。这过程中 copy 了 block。
  • 然后经过了几个层次的调用,主要为了区分并行还是串行。
  • 如果是串行(这种情况比较常见,所以是 fastpath),直接就 dx_push 了,其实就是讲任务追加到一个链表里面。
  • 如果是并行,需要做重定向。之前我们说过,放到队列中的任务,最终都会以各种形式追加到目标队列里面。在 _dispatch_async_f_redirect 方法中,重新寻找依赖目标队列,然后追加过去。
  • 经过一系列调用,我们会在 _dispatch_continuation_invoke_inline 方法里区分串行还是并行。因为这个方法会被频繁调用,所以定义成了内联函数。对于串行队列,我们使用信号量控制,执行前信号量置为 wait,执行完毕后发送 singal;对于调度组,我们会在执行完之后调用 dispatch_group_leave
  • 底层的线程池,是使用 pthread 维护的,所以最终都会使用 pthread 来处理这些任务。

同步执行

同步执行,相对来说比较简单,源码如下 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** 开发者调用 */
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_private_data(dq, work, 0);
}
dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}

/** 内部调用 */
DISPATCH_NOINLINE
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
if (likely(dq->dq_width == 1)) {
return dispatch_barrier_sync_f(dq, ctxt, func);
}

// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dq))) {
return _dispatch_sync_f_slow(dq, ctxt, func, 0);
}

_dispatch_introspection_sync_begin(dq);
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dq, ctxt, func, 0);
}
_dispatch_sync_invoke_and_complete(dq, ctxt, func);
}

同步执行,相对来说简单些,大体逻辑差不多。偷懒一下,就不画图了,直接描述:

  • 开发者使用 dispatch_sync() 方法,大多数路径,都会调用 dispatch_sync_f() 方法。
  • 如果是串行队列,则通过 dispatch_barrier_sync_f() 方法来保证原子操作。
  • 如果不是串行的(一般很少),我们使用 _dispatch_introspection_sync_begin_dispatch_sync_invoke_and_complete 来保证同步。
dispatch_after

dispatch_after 一般用于延后执行一些任务,可以用来代替 NSTimer,因为有时候 NSTimer 问题太多了。在后面的一章里,我会总体讲一下多线程中的问题,这里就不详细说了。一般我们这样来使用 dispatch_after :

1
2
3
4
5
6
7
- (void)viewDidLoad {
[super viewDidLoad];
dispatch_queue_t queue = dispatch_queue_create("com.bool.dispatch", DISPATCH_QUEUE_SERIAL);
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(NSEC_PER_SEC * 2.0f)),queue, ^{
// 2.0 second execute
});
}

在做页面过渡时,刚进入到新的页面我们并不会立即更新一些 view,为了引起用户注意,我们会过会儿再进行更新,可以中此 API 来完成。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
void *ctxt, void *handler, bool block)
{
dispatch_timer_source_refs_t dt;
dispatch_source_t ds;
uint64_t leeway, delta;

if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
return;
}

delta = _dispatch_timeout(when);
if (delta == 0) {
if (block) {
return dispatch_async(queue, handler);
}
return dispatch_async_f(queue, ctxt, handler);
}
leeway = delta / 10; // <rdar://problem/13447496>

if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

// this function can and should be optimized to not use a dispatch source
ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, queue);
dt = ds->ds_timer_refs;

dispatch_continuation_t dc = _dispatch_continuation_alloc();
if (block) {
_dispatch_continuation_init(dc, ds, handler, 0, 0, 0);
} else {
_dispatch_continuation_init_f(dc, ds, ctxt, handler, 0, 0, 0);
}
// reference `ds` so that it doesn't show up as a leak
dc->dc_data = ds;
_dispatch_trace_continuation_push(ds->_as_dq, dc);
os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);

if ((int64_t)when < 0) {
// wall clock
when = (dispatch_time_t)-((int64_t)when);
} else {
// absolute clock
dt->du_fflags |= DISPATCH_TIMER_CLOCK_MACH;
leeway = _dispatch_time_nano2mach(leeway);
}
dt->dt_timer.target = when;
dt->dt_timer.interval = UINT64_MAX;
dt->dt_timer.deadline = when + leeway;
dispatch_activate(ds);
}

dispatch_after() 内部会调用 _dispatch_after() 方法,然后先判断延迟时间。如果为 DISPATCH_TIME_FOREVER(永远不执行),则会出现异常;如果为 0 则立即执行;否则的话会创建一个 dispatch_timer_source_refs_t 结构体指针,将上下文相关信息与之关联。然后使用 dispatch_source 相关方法,将定时器和 block 任务关联起来。定时器时间到时,取出 block 任务开始执行。

dispatch_once

如果我们有一段代码,在 App 生命周期内最好只初始化一次,这时候使用 dispatch_once 最好不过了。例如我们单例中经常这样用:

1
2
3
4
5
6
7
8
9
+ (instancetype)sharedManager {
static BLDispatchManager *sharedInstance = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
sharedInstance = [[BLDispatchManager alloc] initPrivate];
});

return sharedInstance;
}

还有在定义 NSDateFormatter 时使用:

1
2
3
4
5
6
7
8
9
10
11
12
- (NSString *)todayDateString {
static NSDateFormatter *formatter = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
formatter = [NSDateFormatter new];
formatter.locale = [NSLocale localeWithLocaleIdentifier:@"en_US_POSIX"];
formatter.timeZone = [NSTimeZone timeZoneForSecondsFromGMT:8 * 3600];
formatter.dateFormat = @"yyyyMMdd";
});

return [formatter stringFromDate:[NSDate date]];
}

因为这是很常用的一个代码片段,所以被加在了 Xcode 的 code snippet 中。

它的源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/** 一个结构体,里面为当前的信号量、线程端口和指向下一个节点的指针 */
typedef struct _dispatch_once_waiter_s {
volatile struct _dispatch_once_waiter_s *volatile dow_next;
dispatch_thread_event_s dow_event;
mach_port_t dow_thread;
} *_dispatch_once_waiter_t;

/** 我们调用的方法 */
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}

/** 实际执行的方法 */
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
#if !DISPATCH_ONCE_INLINE_FASTPATH
if (likely(os_atomic_load(val, acquire) == DLOCK_ONCE_DONE)) {
return;
}
#endif // !DISPATCH_ONCE_INLINE_FASTPATH
return dispatch_once_f_slow(val, ctxt, func);
}

DISPATCH_ONCE_SLOW_INLINE
static void
dispatch_once_f_slow(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
#if DISPATCH_GATE_USE_FOR_DISPATCH_ONCE
dispatch_once_gate_t l = (dispatch_once_gate_t)val;

if (_dispatch_once_gate_tryenter(l)) {
_dispatch_client_callout(ctxt, func);
_dispatch_once_gate_broadcast(l);
} else {
_dispatch_once_gate_wait(l);
}
#else
_dispatch_once_waiter_t volatile *vval = (_dispatch_once_waiter_t*)val;
struct _dispatch_once_waiter_s dow = { };
_dispatch_once_waiter_t tail = &dow, next, tmp;
dispatch_thread_event_t event;

if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
dow.dow_thread = _dispatch_tid_self();
_dispatch_client_callout(ctxt, func);

next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
while (next != tail) {
tmp = (_dispatch_once_waiter_t)_dispatch_wait_until(next->dow_next);
event = &next->dow_event;
next = tmp;
_dispatch_thread_event_signal(event);
}
} else {
_dispatch_thread_event_init(&dow.dow_event);
next = *vval;
for (;;) {
if (next == DISPATCH_ONCE_DONE) {
break;
}
if (os_atomic_cmpxchgv(vval, next, tail, &next, release)) {
dow.dow_thread = next->dow_thread;
dow.dow_next = next;
if (dow.dow_thread) {
pthread_priority_t pp = _dispatch_get_priority();
_dispatch_thread_override_start(dow.dow_thread, pp, val);
}
_dispatch_thread_event_wait(&dow.dow_event);
if (dow.dow_thread) {
_dispatch_thread_override_end(dow.dow_thread, val);
}
break;
}
}
_dispatch_thread_event_destroy(&dow.dow_event);
}
#endif
}

不想看代码直接看图 (emmm… 根据逻辑画完图才发现,其实这个图也挺乱的,所以我将两个主分支用不同颜色标记处理):

Dispatch_Once

根据这个图,我来表述一下主要过程:

  • 我们调用 dispatch_once() 方法之后,内部多数情况下会调用 dispatch_once_f_slow() 方法,这个方法才是真正的执行方法。
  • os_atomic_cmpxchg(vval, NULL, tail, acquire) 这个方法,执行过程实际是这个样子

    1
    2
    3
    4
    5
    6
    if (*vval == NULL) {
    *vval = tail = &dow;
    return true;
    } else {
    return false
    }
我们初始化的 once_token,也就是 *vval 实际是 0,所以第一次执行时是返回 true 的。if() 中的这个方法是原子操作,也就是说,如果多个线程同时调用这个方法,只有一个线程会进入 true 的分支,其他都进入 else 分支。
  • 这里先说进入 true 分支。进入之后,会执行对应的 block,也就是对应的任务。然后 next 指向 vval, vval 标记为 DISPATCH_ONCE_DONE,即执行的是这样一个过程:

    1
    2
    3
    4
    next = (_dispatch_once_waiter_t)_dispatch_once_xchg_done(val);
    // 实际执行时这样的
    next = *vval;
    *vval = DISPATCH_ONCE_DONE;
  • 然后 tail = &dow。此时我们发现,原来的 *vval = &dow -> next = *vval,实际则是 next = &dow如果没有其他线程(或者调用)进入 else 分支,&dow 实际没有改变,即 tail == tmp。此时 while (tail != tmp) 是不会执行的,分支结束。

  • 如果有其他线程(或者调用)进入了 else 分支,那么就已经生成了一个等待响应的链表。此时进入 &dow 已经改变,成为了链表尾部,*vval 是链表头部。进入 while 循环后,开始遍历链表,依次发送信号进行唤起。
  • 然后说进入 else 分支的这些调用。进入分支后,随即进入一个死循环,直到发现 *vval 已经标记为了 DISPATCH_ONCE_DONE 才跳出循环。
  • 发现 *vval 不是 DISPATCH_ONCE_DONE 之后,会将这个节点追加到链表尾部,并调用信号量的 wait 方法,等待被唤起。

以上为全部的执行过程。通过源码可以看出,使用的是 原子操作 + 信号量来保证 block 只会被执行多次,哪怕是在多线程情况下。

这样一个关于 dispatch_once 递归调用会产生死锁的现象,也就很好解释了。看下面代码:

1
2
3
4
5
6
- (void)dispatchOnceTest {
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
[self dispatchOnceTest];
});
}

通过上面分析,在 block 执行完,并将 *vval 置为 DISPATCH_ONCE_DONE 之前,其他的调用都会进入 else 分支。第二次递归调用,信号量处于等待状态,需要等到第一个 block 执行完才能被唤起;但是第一个 block 所执行的内容就是进行第二次调用,这个任务被 wait 了,也即是说 block 永远执行不完。死锁就这样发生了。

dispatch_apply

有时候没有时序性依赖的时候,我们会用 dispatch_apply 来代替 for loop。例如我们下载一组图片:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** 使用 for loop */
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
for (NSURL *imageURL in imageURLs) {
[self downloadImageWithURL:imageURL];
}
}

/** dispatch_apply */
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
dispatch_queue_t downloadQueue = dispatch_queue_create("com.bool.download", DISPATCH_QUEUE_CONCURRENT);
dispatch_apply(imageURLs.count, downloadQueue, ^(size_t index) {
NSURL *imageURL = imageURLs[index];
[self downloadImageWithURL:imageURL];
});
}

进行替换是需要注意几个问题:

  • 任务之间没有时序性依赖,谁先执行都可以。
  • 一般在并发队列,并发执行任务时,才替换。串行队列替换没有意义。
  • 如果数组中数据很少,或者每个任务执行时间很短,替换也没有意义。强行进行并发的消耗,可能比使用 for loop 还要多,并不能得到优化。

至于原理,就不大篇幅讲了。大概是这个样子:这个方法是同步的,会阻塞当前线程,直到所有的 block 任务都完成。如果提交到并发队列,每个任务执行顺序是不一定的。

更多时候,我们执行下载任务,并不希望阻塞当前线程,这时我们可以使用 dispatch_group

dispatch_group

当处理批量异步任务时,dispatch_group 是一个很好的选择。针对上面说的下载图片的例子,我们可以这样做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (void)downloadImages:(NSArray <NSURL *> *)imageURLs {
dispatch_group_t taskGroup = dispatch_group_create();
dispatch_queue_t queue = dispatch_queue_create("com.bool.group", DISPATCH_QUEUE_CONCURRENT);
for (NSURL *imageURL in imageURLs) {
dispatch_group_enter(taskGroup);
// 下载方法是异步的
[self downloadImageWithURL:imageURL withQueue:queue completeHandler:^{
dispatch_group_leave(taskGroup);
}];
}

dispatch_group_notify(taskGroup, queue, ^{
// all task finish
});

/** 如果使用这个方法,内部执行异步任务,会立即到 dispatch_group_notify 方法中,因为是异步,系统认为已经执行完了。所以这个方法使用不多。
*/
dispatch_group_async(taskGroup, queue, ^{

})
}

关于原理方面,和 dispatch_async() 方法类似,前面也提到。这里只说一段代码:

1
2
3
4
5
6
7
8
9
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc)
{
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc);
}

这段代码中,调用了 dispatch_group_enter(dg) 方法进行标记,最终都会和 dispatch_async() 走到同样的方法里 _dispatch_continuation_invoke_inline()。在里面判断类型为 group,执行 task,执行结束后调用 dispatch_group_leave((dispatch_group_t)dou),和之前的 enter 对应。

以上是 Dispatch Queues 内容的介绍,我们平时使用 GCD 的过程中,60% 都是使用的以上内容。

2. Dispatch Block

在 iOS 8 中,Apple 为我们提供了新的 API,Dispatch Block 相关。虽然之前我们可以向 dispatch 传递 block 参数,作为任务,但是这里和之前的不一样。之前经常说,使用 NSOperation 创建的任务可以 cancel,使用 GCD 不可以。但是在 iOS 8 之后,可以 cancel 任务了。

基本使用
  • 创建一个 block 并执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    	- (void)dispatchBlockTest {
    // 不指定优先级
    dispatch_block_t dsBlock = dispatch_block_create(0, ^{
    NSLog(@"test block");
    });

    // 指定优先级
    dispatch_block_t dsQosBlock = dispatch_block_create_with_qos_class(0, QOS_CLASS_USER_INITIATED, -1, ^{
    NSLog(@"test block");
    });

    dispatch_async(dispatch_get_main_queue(), dsBlock);
    dispatch_async(dispatch_get_main_queue(), dsQosBlock);

    // 直接创建并执行
    dispatch_block_perform(0, ^{
    NSLog(@"test block");
    });
    }
  • 阻塞当前任务,等 block 执行完在继续执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    	- (void)dispatchBlockTest {
    dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
    dispatch_block_t dsBlock = dispatch_block_create(0, ^{
    NSLog(@"test block");
    });
    dispatch_async(queue, dsBlock);
    // 等到 block 执行完
    dispatch_block_wait(dsBlock, DISPATCH_TIME_FOREVER);
    NSLog(@"block was finished");
    }
  • block 执行完后,收到通知,执行其他任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    	- (void)dispatchBlockTest {
    dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
    dispatch_block_t dsBlock = dispatch_block_create(0, ^{
    NSLog(@"test block");
    });
    dispatch_async(queue, dsBlock);
    // block 执行完收到通知
    dispatch_block_notify(dsBlock, queue, ^{
    NSLog(@"block was finished,do other thing");
    });
    NSLog(@"execute first");
    }
  • 对 block 进行 cancel 操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    	- (void)dispatchBlockTest {
    dispatch_queue_t queue = dispatch_queue_create("com.bool.block", DISPATCH_QUEUE_SERIAL);
    dispatch_block_t dsBlock1 = dispatch_block_create(0, ^{
    NSLog(@"test block1");
    });
    dispatch_block_t dsBlock2 = dispatch_block_create(0, ^{
    NSLog(@"test block2");
    });
    dispatch_async(queue, dsBlock1);
    dispatch_async(queue, dsBlock2);

    // 第二个 block 将会被 cancel,不执行
    dispatch_block_cancel(dsBlock2);
    }

3. Dispatch Barriers

Dispatch Barriers 可以理解为调度屏障,常用于多线程并发读写操作。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@interface ViewController ()
@property (nonatomic, strong) dispatch_queue_t imageQueue;
@property (nonatomic, strong) NSMutableArray *imageArray;
@end

@implementation ViewController

- (void)viewDidLoad {
[super viewDidLoad];

self.imageQueue = dispatch_queue_create("com.bool.image", DISPATCH_QUEUE_CONCURRENT);
self.imageArray = [NSMutableArray array];
}

/** 保证写入时不会有其他操作,写完之后到主线程更新 UI */
- (void)addImage:(UIImage *)image {
dispatch_barrier_async(self.imageQueue, ^{
[self.imageArray addObject:image];
dispatch_async(dispatch_get_main_queue(), ^{
// update UI
});
});
}

/** 这里的 dispatch_sync 起到了 lock 的作用 */
- (NSArray <UIImage *> *)images {
__block NSArray *imagesArray = nil;
dispatch_sync(self.imageQueue, ^{
imagesArray = [self.imageArray mutableCopy];
});
return imagesArray;
}
@end

转化成图可能好理解一些:

Dispatch_Barrier

dispatch_barrier_async() 的原理和 dispatch_async() 差不多,只不过设置的 flags 不一样:

1
2
3
4
5
6
7
8
9
10
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
// 在 dispatch_async() 中只设置了 DISPATCH_OBJ_CONSUME_BIT
uintptr_t dc_flags = DISPATCH_OBJ_CONSUME_BIT | DISPATCH_OBJ_BARRIER_BIT;

_dispatch_continuation_init(dc, dq, work, 0, 0, dc_flags);
_dispatch_continuation_push(dq, dc);
}

后面都是 push 到队列中,然后,获取任务时一个死循环,在从队列中获取任务一个一个执行,如果判断 flag 为 barrier,终止循环,则单独执行这个任务。它后面的任务放入一个队列,等它执行完了再开始执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
DISPATCH_ALWAYS_INLINE
static dispatch_queue_wakeup_target_t
_dispatch_queue_drain(dispatch_queue_t dq, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags, uint64_t *owned_ptr, bool serial_drain)
{
...

for (;;) {
...
first_iteration:
dq_state = os_atomic_load(&dq->dq_state, relaxed);
if (unlikely(_dq_state_is_suspended(dq_state))) {
break;
}
if (unlikely(orig_tq != dq->do_targetq)) {
break;
}

if (serial_drain || _dispatch_object_is_barrier(dc)) {
if (!serial_drain && owned != DISPATCH_QUEUE_IN_BARRIER) {
if (!_dispatch_queue_try_upgrade_full_width(dq, owned)) {
goto out_with_no_width;
}
owned = DISPATCH_QUEUE_IN_BARRIER;
}
next_dc = _dispatch_queue_next(dq, dc);
if (_dispatch_object_is_sync_waiter(dc)) {
owned = 0;
dic->dic_deferred = dc;
goto out_with_deferred;
}
} else {
if (owned == DISPATCH_QUEUE_IN_BARRIER) {
// we just ran barrier work items, we have to make their
// effect visible to other sync work items on other threads
// that may start coming in after this point, hence the
// release barrier
os_atomic_xor2o(dq, dq_state, owned, release);
owned = dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
} else if (unlikely(owned == 0)) {
if (_dispatch_object_is_sync_waiter(dc)) {
// sync "readers" don't observe the limit
_dispatch_queue_reserve_sync_width(dq);
} else if (!_dispatch_queue_try_acquire_async(dq)) {
goto out_with_no_width;
}
owned = DISPATCH_QUEUE_WIDTH_INTERVAL;
}

next_dc = _dispatch_queue_next(dq, dc);
if (_dispatch_object_is_sync_waiter(dc)) {
owned -= DISPATCH_QUEUE_WIDTH_INTERVAL;
_dispatch_sync_waiter_redirect_or_wake(dq,
DISPATCH_SYNC_WAITER_NO_UNLOCK, dc);
continue;
}

...
}

4. Dispatch Source

关于 dispatch_source 我们使用的少之又少,他是 BSD 系统内核功能的包装,经常用来监测某些事件发生。例如监测断点的使用和取消。[这里][https://developer.apple.com/documentation/dispatch/dispatch_source_type_constants?language=objc] 介绍了可以监测的事件:

  • DISPATCH_SOURCE_TYPE_DATA_ADD : 自定义事件
  • DISPATCH_SOURCE_TYPE_DATA_OR : 自定义事件
  • DISPATCH_SOURCE_TYPE_MACH_RECV : MACH 端口接收事件
  • DISPATCH_SOURCE_TYPE_MACH_SEND : MACH 端口发送事件
  • DISPATCH_SOURCE_TYPE_PROC : 进程相关事件
  • DISPATCH_SOURCE_TYPE_READ : 文件读取事件
  • DISPATCH_SOURCE_TYPE_SIGNAL : 信号相关事件
  • DISPATCH_SOURCE_TYPE_TIMER : 定时器相关事件
  • DISPATCH_SOURCE_TYPE_VNODE : 文件属性修改事件
  • DISPATCH_SOURCE_TYPE_WRITE : 文件写入事件
  • DISPATCH_SOURCE_TYPE_MEMORYPRESSURE : 内存压力事件

例如我们可以通过下面代码,来监测断点的使用和取消:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@interface ViewController ()
@property (nonatomic, strong) dispatch_source_t signalSource;
@property (nonatomic, assign) dispatch_once_t signalOnceToken;
@end

@implementation ViewController

- (void)viewDidLoad {
dispatch_once(&_signalOnceToken, ^{
dispatch_queue_t queue = dispatch_get_main_queue();
self.signalSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL, SIGSTOP, 0, queue);

if (self.signalSource) {
dispatch_source_set_event_handler(self.signalSource, ^{
// 点击一下断点,再取消断点,便会执行这里。
NSLog(@"debug test");
});
dispatch_resume(self.signalSource);
}
});
}

还有 diapatch_after() 就是依赖 dispatch_source() 来实现的。我们可以自己实现一个类似的定时器:

1
2
3
4
5
6
7
8
9
10
- (void)customTimer {
dispatch_source_t timerSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, DISPATCH_TARGET_QUEUE_DEFAULT);
dispatch_source_set_timer(timerSource, dispatch_time(DISPATCH_TIME_NOW, 5.0 * NSEC_PER_SEC), 2.0 * NSEC_PER_SEC, 5);
dispatch_source_set_event_handler(timerSource, ^{
NSLog(@"dispatch source timer");
});

self.signalSource = timerSource;
dispatch_resume(self.signalSource);
}
基本原理

使用 dispatch_source 时,大致过程是这样的:我们创建一个 source,然后加到队列中,并调用 dispatch_resume() 方法,便会从队列中唤起 source,执行对应的 block。下面是一个详细的流程图,我们结合这张图来说一下:

Dispatch_Source

  • 创建一个 source 对象,过程和创建 queue 类似,所以后面一些操作,和操作 queue 很类似。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    dispatch_source_t
    dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
    unsigned long mask, dispatch_queue_t dq)
    {
    dispatch_source_refs_t dr;
    dispatch_source_t ds;

    dr = dux_create(dst, handle, mask)._dr;
    if (unlikely(!dr)) {
    return DISPATCH_BAD_INPUT;
    }

    // 申请内存空间
    ds = _dispatch_object_alloc(DISPATCH_VTABLE(source),
    sizeof(struct dispatch_source_s));
    // 初始化一个队列,然后配置参数,完全被当做一个 queue 来处理
    _dispatch_queue_init(ds->_as_dq, DQF_LEGACY, 1,
    DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER);
    ds->dq_label = "source";
    ds->do_ref_cnt++; // the reference the manager queue holds
    ds->ds_refs = dr;
    dr->du_owner_wref = _dispatch_ptr2wref(ds);

    if (slowpath(!dq)) {
    dq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
    } else {
    _dispatch_retain((dispatch_queue_t _Nonnull)dq);
    }
    ds->do_targetq = dq;
    if (dr->du_is_timer && (dr->du_fflags & DISPATCH_TIMER_INTERVAL)) {
    _dispatch_source_set_interval(ds, handle);
    }
    _dispatch_object_debug(ds, "%s", __func__);
    return ds;
    }
  • 设置 event_handler。从源码中看出,用的是 dispatch_continuation_t 进行绑定,和之前绑定 queue 一样,将 block copy 了一份。后面执行的时候,拿出来用。然后将这个任务 push 到队列里。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    	void
    dispatch_source_set_event_handler(dispatch_source_t ds,
    dispatch_block_t handler)
    {
    dispatch_continuation_t dc;
    // 这里实际就是在初始化 dispatch_continuation_t
    dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, true);
    // 经过一顿操作,将任务 push 到队列中。
    _dispatch_source_set_handler(ds, DS_EVENT_HANDLER, dc);
    }
  • 调用 resume 方法,执行 source。一般新创建的都是暂停状态,这里判断是暂停状态,就开始唤起。

    1
    2
    3
    4
    5
    6
    7
    8
    	void
    dispatch_resume(dispatch_object_t dou)
    {
    DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
    if (dx_vtable(dou._do)->do_suspend) {
    dx_vtable(dou._do)->do_resume(dou._do, false);
    }
    }
  • 最后一步,是最核心的异步,唤起任务开始执行。之前的 queue 最终也是走到这样类似的一步,可以看返回类型都是 dispatch_queue_wakeup_target_t,基本是沿着 queue 的逻辑一路 copy 过来。这个方法,经过一系列判断,保证所有的 source 都会在正确的队列上面执行;如果队列和任务不对应,那么就返回正确的队列,重新派发让任务在正确的队列上执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    	DISPATCH_ALWAYS_INLINE
    static inline dispatch_queue_wakeup_target_t
    _dispatch_source_invoke2(dispatch_object_t dou, dispatch_invoke_context_t dic,
    dispatch_invoke_flags_t flags, uint64_t *owned)
    {
    dispatch_source_t ds = dou._ds;
    dispatch_queue_wakeup_target_t retq = DISPATCH_QUEUE_WAKEUP_NONE;
    // 获取当前 queue
    dispatch_queue_t dq = _dispatch_queue_get_current();
    dispatch_source_refs_t dr = ds->ds_refs;
    dispatch_queue_flags_t dqf;

    ...

    // timer 事件处理
    if (dr->du_is_timer &&
    os_atomic_load2o(ds, ds_timer_refs->dt_pending_config, relaxed)) {
    dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
    if (!(dqf & (DSF_CANCELED | DQF_RELEASED))) {
    // timer has to be configured on the kevent queue
    if (dq != dkq) {
    return dkq;
    }
    _dispatch_source_timer_configure(ds);
    }
    }

    // 是否安装 source
    if (!ds->ds_is_installed) {
    // The source needs to be installed on the kevent queue.
    if (dq != dkq) {
    return dkq;
    }
    _dispatch_source_install(ds, _dispatch_get_wlh(),
    _dispatch_get_basepri());
    }

    // 是否暂停,因为之前判断过,一般不可能走到这里
    if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
    // Source suspended by an item drained from the source queue.
    return ds->do_targetq;
    }

    // 是否在
    if (_dispatch_source_get_registration_handler(dr)) {
    // The source has been registered and the registration handler needs
    // to be delivered on the target queue.
    if (dq != ds->do_targetq) {
    return ds->do_targetq;
    }
    // clears ds_registration_handler
    _dispatch_source_registration_callout(ds, dq, flags);
    }

    ...

    if (!(dqf & (DSF_CANCELED | DQF_RELEASED)) &&
    os_atomic_load2o(ds, ds_pending_data, relaxed)) {
    // 有些 source 还有未完成的数据,需要通过目标队列上的回调进行传送;有些 source 则需要切换到管理队列上去。
    if (dq == ds->do_targetq) {
    _dispatch_source_latch_and_call(ds, dq, flags);
    dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
    prevent_starvation = dq->do_targetq ||
    !(dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
    if (prevent_starvation &&
    os_atomic_load2o(ds, ds_pending_data, relaxed)) {
    retq = ds->do_targetq;
    }
    } else {
    return ds->do_targetq;
    }
    }

    if ((dqf & (DSF_CANCELED | DQF_RELEASED)) && !(dqf & DSF_DEFERRED_DELETE)) {
    // 已经被取消的 source 需要从管理队列中卸载。卸载完成后,取消的 handler 需要交付到目标队列。
    if (!(dqf & DSF_DELETED)) {
    if (dr->du_is_timer && !(dqf & DSF_ARMED)) {
    // timers can cheat if not armed because there's nothing left
    // to do on the manager queue and unregistration can happen
    // on the regular target queue
    } else if (dq != dkq) {
    return dkq;
    }
    _dispatch_source_refs_unregister(ds, 0);
    dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
    if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
    if (!(dqf & DSF_ARMED)) {
    goto unregister_event;
    }
    // we need to wait for the EV_DELETE
    return retq ? retq : DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT;
    }
    }
    if (dq != ds->do_targetq && (_dispatch_source_get_event_handler(dr) ||
    _dispatch_source_get_cancel_handler(dr) ||
    _dispatch_source_get_registration_handler(dr))) {
    retq = ds->do_targetq;
    } else {
    _dispatch_source_cancel_callout(ds, dq, flags);
    dqf = _dispatch_queue_atomic_flags(ds->_as_dq);
    }
    prevent_starvation = false;
    }

    if (_dispatch_unote_needs_rearm(dr) &&
    !(dqf & (DSF_ARMED|DSF_DELETED|DSF_CANCELED|DQF_RELEASED))) {
    // 需要在管理队列进行 rearm 的
    if (dq != dkq) {
    return dkq;
    }
    if (unlikely(dqf & DSF_DEFERRED_DELETE)) {
    // 如果我们可以直接注销,不需要 resume
    goto unregister_event;
    }
    if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(ds))) {
    // 如果 source 已经暂停,不需要在管理队列 rearm
    return ds->do_targetq;
    }
    if (prevent_starvation && dr->du_wlh == DISPATCH_WLH_ANON) {
    return ds->do_targetq;
    }
    if (unlikely(!_dispatch_source_refs_resume(ds))) {
    goto unregister_event;
    }
    if (!prevent_starvation && _dispatch_wlh_should_poll_unote(dr)) {
    _dispatch_event_loop_drain(KEVENT_FLAG_IMMEDIATE);
    }
    }
    return retq;
    }
还有一些其他的方法,这里就不介绍了。有兴趣的可以看源码,太多了。

5. Dispatch I/O

我们可以使用 Dispatch I/O 快速读取一些文件,例如这样 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- (void)readFile {
NSString *filePath = @"/.../青花瓷.m";
dispatch_queue_t queue = dispatch_queue_create("com.bool.readfile", DISPATCH_QUEUE_SERIAL);
dispatch_fd_t fd = open(filePath.UTF8String, O_RDONLY,0);
dispatch_io_t fileChannel = dispatch_io_create(DISPATCH_IO_STREAM, fd, queue, ^(int error) {
close(fd);
});

NSMutableData *fileData = [NSMutableData new];
dispatch_io_set_low_water(fileChannel, SIZE_MAX);
dispatch_io_read(fileChannel, 0, SIZE_MAX, queue, ^(bool done, dispatch_data_t _Nullable data, int error) {
if (error == 0 && dispatch_data_get_size(data) > 0) {
[fileData appendData:(NSData *)data];
}

if (done) {
NSString *str = [[NSString alloc] initWithData:fileData encoding:NSUTF8StringEncoding];
NSLog(@"read file completed, string is :\n %@",str);
}
});
}

输出结果:

1
2
3
ConcurrencyTest[41479:5357296] read file completed, string is :
天青色等烟雨 而我在等你
月色被打捞起 晕开了结局

如果读取大文件,我们可以进行切片读取,将文件分割多个片,放在异步线程中并发执行,这样会比较快一些。

关于源码,简单看了一下,调度逻辑和之前的任务类似。然后读写操作,是调用的一些底层接口实现,这里就偷懒一下不详细说了。使用 Dispatch I/O,多数情况下是为了并发读取一个大文件,提高读取速度。

6. Other

上面已经讲了概览图中的大部分东西,还有一些未讲述,这里简单描述一下:

  • dispatch_object。GCD 用 C 函数实现的对象,不能通过集成 dispatch 类实现,也不能用 alloc 方法初始化。GCD 针对 dispatch_object 提供了一些接口,我们使用这些接口可以处理一些内存事件、取消和暂停操作、定义上下文和处理日志相关工作。dispatch_object 必须要手动管理内存,不遵循垃圾回收机制。

  • dispatch_time。在 GCD 中使用的时间对象,可以创建自定义时间,也可以使用 DISPATCH_TIME_NOWDISPATCH_TIME_FOREVER 这两个系统给出的时间。

以上为 GCD 相关知识,这次使用的源码版本为最新版本 —— 912.30.4.tar.gz,和之前看的版本代码差距很大,因为代码量的增加,新版本代码比较乱,不过基本原理还是差不多的。曾经我一度认为,最上面的是最新版本…

Operations

Operations 也是我们在并发编程中常用的一套 API,根据 官方文档 划分的结构如下图:

Operations 结构图

其中 NSBlockOperationNSInvocationOperation 是基于 NSOperation 的子类化实现。相对于 GCD,Operations 的原理要稍微好理解一些,下面就将用法和原理介绍一下。

1. NSOperation

基本使用

每一个 operation 可以认为是一个 task。NSOperation 本事是一个抽象类,使用前需子类化。幸运的是,Apple 为我们实现了两个子类:NSInvocationOperationNSBlockOperation。我们也可以自己去定义一个 operation。下面介绍一下基本使用:

  • 创建一个 NSInvocationOperation 对象并在当前线程执行.

    1
    2
    NSInvocationOperation *invocationOperation = [[NSInvocationOperation alloc] initWithTarget:self selector:@selector(log) object:nil];
    [invocationOperation start];
  • 创建一个 NSBlockOperation 对象并执行 (每个 block 不一定会在当前线程,也不一定在同一线程执行).

    1
    2
    3
    4
    5
    6
    7
    8
    9
    NSBlockOperation *blockOpeartion = [NSBlockOperation blockOperationWithBlock:^{
    NSLog(@"block operation");
    }];

    // 可以添加多个 block
    [blockOpeartion addExecutionBlock:^{
    NSLog(@"other block opeartion");
    }];
    [blockOpeartion start];
  • 自定义一个 Operation。当我们不需要操作状态的时候,只需要实现 main() 方法即可。需要操作状态的后面再说.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @interface BLOpeartion : NSOperation
    @end

    @implementation BLOpeartion
    - (void)main {
    NSLog(@"BLOperation main method");
    }
    @end

    - (void)viewDidLoad {
    [super viewDidLoad];
    BLOperation *blOperation = [BLOperation new];
    [blOperation start];
    }
  • 每个 operation 之间设置依赖.

    1
    2
    3
    4
    5
    6
    7
    8
    NSBlockOperation *blockOpeartion1 = [NSBlockOperation blockOperationWithBlock:^{
    NSLog(@"block operation1");
    }];
    NSBlockOperation *blockOpeartion2 = [NSBlockOperation blockOperationWithBlock:^{
    NSLog(@"block operation2");
    }];
    // 2 需要在 1 执行完之后再执行。
    [blockOpeartion2 addDependency:blockOpeartion1];
  • 与队列相关的使用,后面再说.

基本原理

NSOperation 内置了一个强大的状态机,一个 operation 从初始化到执行完毕这一生命周期,对应了各种状态。下面是在 WWDC 2015 Advanced NSOperations 出现的一张图:

opeartion_status

operation 一开始是 Pending 状态,代表即将进入 Ready;进入 Ready 之后,代表任务可以执行;然后进入 Executing 状态;最后执行完成,进入 Finished 状态。过程中,除了 Finished 状态,在其他几个状态中都可以进行 Cancelled。

NSOperation 并没有开源。但是 swift 开源了,在 swift 中它叫 Opeartion,我们可以在 这里 找到他的源码。我这里 copy 了一份:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
open class Operation : NSObject {
let lock = NSLock()
internal weak var _queue: OperationQueue?

// 默认几个状态都是 false
internal var _cancelled = false
internal var _executing = false
internal var _finished = false
internal var _ready = false

// 用一个集合来保存依赖它的对象
internal var _dependencies = Set<Operation>()

// 初始化一些 dispatch_group 对象,来管理 operation 以及其依赖对象的 执行。
#if DEPLOYMENT_ENABLE_LIBDISPATCH
internal var _group = DispatchGroup()
internal var _depGroup = DispatchGroup()
internal var _groups = [DispatchGroup]()
#endif

public override init() {
super.init()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_group.enter()
#endif
}

internal func _leaveGroups() {
// assumes lock is taken
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_groups.forEach() { $0.leave() }
_groups.removeAll()
_group.leave()
#endif
}

// 默认实现的 start 方法中,执行 main 方法,线程安全,下同。执行前后设置 _executing。
open func start() {
if !isCancelled {
lock.lock()
_executing = true
lock.unlock()
main()
lock.lock()
_executing = false
lock.unlock()
}
finish()
}

// 默认实现的 finish 方法中,标记 _finished 状态。
internal func finish() {
lock.lock()
_finished = true
_leaveGroups()
lock.unlock()
if let queue = _queue {
queue._operationFinished(self)
}
...
}

// main 方法默认空,需要子类去实现。
open func main() { }


// 调用 cancel 方法后,只是标记状态,具体操作在 main 中,调用 cancel 后也被认为是 finish。
open func cancel() {
lock.lock()
_cancelled = true
lock.unlock()
}

/** 几个状态的 get 方法,省略 */
...


// 是否为异步任务,默认为 false。这个方法在 OC 中永远不会去实现
open var isAsynchronous: Bool {
return false
}

// 设置依赖,即将 operation 放到集合中
open func addDependency(_ op: Operation) {
lock.lock()
_dependencies.insert(op)
op.lock.lock()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_depGroup.enter()
op._groups.append(_depGroup)
#endif
op.lock.unlock()
lock.unlock()
}

...

// 默认队列优先级为 normal
open var queuePriority: QueuePriority = .normal

public var completionBlock: (() -> Void)?
open func waitUntilFinished() {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_group.wait()
#endif
}

// 线程优先级
open var threadPriority: Double = 0.5

/// - Note: Quality of service is not directly supported here since there are not qos class promotions available outside of darwin targets.
open var qualityOfService: QualityOfService = .default

open var name: String?

internal func _waitUntilReady() {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
_depGroup.wait()
#endif
_ready = true
}
}

代码很简单,具体过程可以直接看注释,就不另说了。除此之外,我们可以看出,Operation 总很多方法造作都加了锁,说明这个类是线程安全的,当我们对 NSOperation 进行子类化时,重写方法要注意线程暗转问题。

2. NSOperationQueue

NSOperation 的很多花式操作,都是结合着 NSOperationQueue 进行的。我们在使用的时候,也是两者结合着使用。下面对其进行详细分析。

基本用法
  • operation 放到 queue 中不用在手动调用 start 方法去执行,operation 会自动执行。
  • queue 可以设置最大并发数,当并发数量设置为 1 时,为串行队列;默认并发数量为无限大。
  • queue 可以通过设置 suspended 属性来暂停或者启动还未执行的 operation
  • queue 可以通过调用 -[cancelAllOperations] 方法来取消队列中的任务。
  • queue 可以通过 mainQueue 方法来回到主队列(主线程);可以通过 currentQueue 方法来获取当前队列。
  • 更多方法,请参考 官方文档

使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- (void)testOperationQueue {
NSOperationQueue *operationQueue = [NSOperationQueue new];
// 设置最大并发数量为 3
[operationQueue setMaxConcurrentOperationCount:3];

NSInvocationOperation *invocationOpeartion = [[NSInvocationOperation alloc] initWithTarget:self selector:@selector(log) object:nil];
[operationQueue addOperation:invocationOpeartion];

[operationQueue addOperationWithBlock:^{
NSLog(@"block operation");
// 回到主线程执行任务
[[NSOperationQueue mainQueue] addOperationWithBlock:^{
NSLog(@"execute in main thread");
}];
}];

// 暂停还未开始执行的任务
operationQueue.suspended = YES;

// 取消所有任务
[operationQueue cancelAllOperations];
}

有一个问题要特别说明一下:

NSOperationQueue 和 GCD 中的队列不同。GCD 中的队列是遵循 FIFO 原则,先加入队列的先执行;NSOperationQueue 中的任务,根据谁先进入到 Ready 状态,谁先执行。如果有多个任务同时达到 Ready 状态,那么根据优先级来执行。

例如下面的任务中,4 先到达了 Ready 状态,4 先执行。并不是按照 1,2,3… 顺序执行。

Operation_Queue

基本原理

我们依然是在 swift 中找到相关源码,然后来进行分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// 默认最大并发数量为 int 最大值
public extension OperationQueue {
public static let defaultMaxConcurrentOperationCount: Int = Int.max
}

// 使用一个 list 来保存各个优先级的 operation。调用其中的方法对 operation 进行增删等操作。
internal struct _OperationList {
var veryLow = [Operation]()
var low = [Operation]()
var normal = [Operation]()
var high = [Operation]()
var veryHigh = [Operation]()
var all = [Operation]()

mutating func insert(_ operation: Operation) { ... }
mutating func remove(_ operation: Operation) { ... }
mutating func dequeue() -> Operation? { ... }
var count: Int {
return all.count
}
func map<T>(_ transform: (Operation) throws -> T) rethrows -> [T] {
return try all.map(transform)
}
}

open class OperationQueue: NSObject {
...
// 使用一个信号量的来控制并发数量
var __concurrencyGate: DispatchSemaphore?
var __underlyingQueue: DispatchQueue? {
didSet {
let key = OperationQueue.OperationQueueKey
oldValue?.setSpecific(key: key, value: nil)
__underlyingQueue?.setSpecific(key: key, value: Unmanaged.passUnretained(self))
}
}

...

internal var _underlyingQueue: DispatchQueue {
lock.lock()
if let queue = __underlyingQueue {
lock.unlock()
return queue
} else {
...

// 信号量的值根据最大并发数量来确定。每当执行一个任务,wait 信号量减一,signal 信号量加一,当信号量为0时,一直等待,直接大于 0 才会正常执行。
if maxConcurrentOperationCount == 1 {
attr = []
__concurrencyGate = DispatchSemaphore(value: 1)
} else {
attr = .concurrent
if maxConcurrentOperationCount != OperationQueue.defaultMaxConcurrentOperationCount {
__concurrencyGate = DispatchSemaphore(value:maxConcurrentOperationCount)
}
}
let queue = DispatchQueue(label: effectiveName, attributes: attr)
if _suspended {
queue.suspend()
}
__underlyingQueue = queue
lock.unlock()
return queue
}
}
#endif

...

// 出队列,每个任务执行时拿出队列执行
internal func _dequeueOperation() -> Operation? {
lock.lock()
let op = _operations.dequeue()
lock.unlock()
return op
}

open func addOperation(_ op: Operation) {
addOperations([op], waitUntilFinished: false)
}

// 主要执行方法。先判断 operation 是否 ready,处于 ready 后判断是否 cancel。没有 cancel 则执行。
internal func _runOperation() {
if let op = _dequeueOperation() {
if !op.isCancelled {
op._waitUntilReady()
if !op.isCancelled {
op.start()
}
}
}
}

// 将任务加到队列中。如果不指定任务优先级,执行的还快一些。否则需要对不同优先级进行划分,然后执行
open func addOperations(_ ops: [Operation], waitUntilFinished wait: Bool) {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
var waitGroup: DispatchGroup?
if wait {
waitGroup = DispatchGroup()
}
#endif
lock.lock()
// 将 operation 依依加入 list,根据优先级保存到不同数组中
ops.forEach { (operation: Operation) -> Void in
operation._queue = self
_operations.insert(operation)
}
lock.unlock()

// 遍历执行,使用了 diapatch group,控制 enter 和 leave
ops.forEach { (operation: Operation) -> Void in
#if DEPLOYMENT_ENABLE_LIBDISPATCH
if let group = waitGroup {
group.enter()
}

// 通过信号量来控制并发数量
let block = DispatchWorkItem(flags: .enforceQoS) { () -> Void in
if let sema = self._concurrencyGate {
sema.wait()
self._runOperation()
sema.signal()
} else {
self._runOperation()
}
if let group = waitGroup {
group.leave()
}
}
_underlyingQueue.async(group: queueGroup, execute: block)
#endif
}
#if DEPLOYMENT_ENABLE_LIBDISPATCH
if let group = waitGroup {
group.wait()
}
#endif
}

internal func _operationFinished(_ operation: Operation) { ... }
open func addOperation(_ block: @escaping () -> Swift.Void) { ... }

// 返回值不一定准确
open var operations: [Operation] { ... }
// 返回值不一定准确
open var operationCount: Int { ... }
open var maxConcurrentOperationCount: Int = OperationQueue.defaultMaxConcurrentOperationCount

// suppend 属性的 get & set 方法。默认不暂停
internal var _suspended = false
open var isSuspended: Bool { ... }

...

// operation 在获取系统资源时的优先级
open var qualityOfService: QualityOfService = .default

// 依次调用每个 operation 的 cancel 方法
open func cancelAllOperations() { ... }

open func waitUntilAllOperationsAreFinished() {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
queueGroup.wait()
#endif
}

static let OperationQueueKey = DispatchSpecificKey<Unmanaged<OperationQueue>>()
// 通过使用 GCD 中的 getSpecific 方法获取当前队列
open class var current: OperationQueue? {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
guard let specific = DispatchQueue.getSpecific(key: OperationQueue.OperationQueueKey) else {
if _CFIsMainThread() {
return OperationQueue.main
} else {
return nil
}
}

return specific.takeUnretainedValue()
#else
return nil
#endif
}

// 定义主队列,最大并发数量为 1,获取主队列时将这个值返回
private static let _main = OperationQueue(_queue: .main, maxConcurrentOperations: 1)
open class var main: OperationQueue { ... }
}

代码很长,但是简单,可以直接通过注释来理解了。这里屡一下:

  • 将每个 operation 加入到队列时,会根据优先级将 operation 分类存入 list 中,根据优先级执行。如果都不设置优先级,执行起来比较快一些。
  • 加入到队列,会遍历每个 operation,取出进入 Ready 状态且没被 Cancel 的依次执行。
  • 通过 concurrencyGate 这个信号量来控制并发数量。每当执行一个任务,wait 信号量减一,signal 信号量加一,当信号量为0时,一直等待,直接大于 0 才会正常执行。
  • 每个方法中基本都加了锁,来保证线程安全。
自定义 NSOperation

之前说了自定义普通的 NSOperation,只需要重写 main 方法就可以了,但是因为我们没有处理并发情况,线程执行结束操作,KVO 机制,所以这种普通的不建议用来做并发任务。下面讲一下如何自定义并行的 NSOperation

必须要实现的一些方法:

  • start 方法,在你想要执行的线程中调用此方法。不需要调用 super 方法
  • main 方法,在 start 方法中调用,任务主体。
  • isExecuting 方法,是否正在执行,要实现 KVO 机制。
  • isConcurrent 方法,已经弃用,由 isAsynchronous 来代替。
  • isAsynchronous 方法,在并发任务中,需要返回 YES。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@interface BLOperation ()
@property (nonatomic, assign) BOOL executing;
@property (nonatomic, assign) BOOL finished;
@end

@implementation BLOperation
@synthesize executing;
@synthesize finished;

- (instancetype)init {
self = [super init];
if (self) {
executing = NO;
finished = NO;
}
return self;
}

- (void)start {
if ([self isCancelled]) {
[self willChangeValueForKey:@"isFinished"];
finished = YES;
[self didChangeValueForKey:@"isFinished"];
return;
}

[self willChangeValueForKey:@"isExecuting"];
[NSThread detachNewThreadSelector:@selector(main) toTarget:self withObject:nil];
executing = YES;
[self didChangeValueForKey:@"isExecuting"];
}

- (void)main {
NSLog(@"main begin");
@try {
@autoreleasepool {
NSLog(@"custom operation");
NSLog(@"currentThread = %@", [NSThread currentThread]);
NSLog(@"mainThread = %@", [NSThread mainThread]);

[self willChangeValueForKey:@"isFinished"];
[self willChangeValueForKey:@"isExecuting"];
executing = NO;
finished = YES;
[self didChangeValueForKey:@"isExecuting"];
[self didChangeValueForKey:@"isFinished"];
}
} @catch (NSException *exception) {
NSLog(@"exception is %@", exception);
}
NSLog(@"main end");
}

- (BOOL)isExecuting {
return executing;
}

- (BOOL)isFinished {
return finished;
}

- (BOOL)isAsynchronous {
return YES;
}
@end

关于 NSBlockOpeartion,主要实现了 main 方法,然后用一个数组保存加进来的其他 block,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
open class BlockOperation: Operation {
typealias ExecutionBlock = () -> Void
internal var _block: () -> Void
internal var _executionBlocks = [ExecutionBlock]()

public init(block: @escaping () -> Void) {
_block = block
}

override open func main() {
lock.lock()
let block = _block
let executionBlocks = _executionBlocks
lock.unlock()
block()
executionBlocks.forEach { $0() }
}

open func addExecutionBlock(_ block: @escaping () -> Void) {
lock.lock()
_executionBlocks.append(block)
lock.unlock()
}

open var executionBlocks: [() -> Void] {
lock.lock()
let blocks = _executionBlocks
lock.unlock()
return blocks
}
}

关于 NSOperation 的相关东西,到此结束。

在开发中的一些问题

相对于 API 的使用和基本原理的了解,我认为最重要的还是这一部分。毕竟我们还是要拿这些东西来开发的。并发编程中有很多坑,这里简单介绍一些。

1. NSNotification 与多线程问题

我们都知道,NSNotification 在哪个线程 post,最终就会在哪个线程执行。如果我们不是在主线程 post 的,但是却在主线程接收的,而且我们期望 selector 在主线程执行。这时候我们需要注意下,在 selector 需要 dispatch 到主线程才可以。当然你也可以使用 addObserverForName:object:queue:usingBlock: 来指定执行 block 的 queue。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@implementation BLPostNotification

- (void)postNotification {
dispatch_queue_t queue = dispatch_queue_create("com.bool.post.notification", DISPATCH_QUEUE_SERIAL);
dispatch_async(queue, ^{
// 从非主线程发送通知 (通知名字最好定义成一个常量)
[[NSNotificationCenter defaultCenter] postNotificationName:@"downloadImage" object:nil];
});
}
@end

@implementation ImageViewController

- (void)viewDidLoad {
[super viewDidLoad];
[[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(show) name:@"downloadImage" object:nil];
}

- (void)showImage {
// 需要 dispatch 到主线程更新 UI
dispatch_async(dispatch_get_main_queue(), ^{
// update UI
});
}
@end

2. NSTimer 与多线程问题

使用 NSTimer 时,在哪个线程生成的 timer,就在哪个线程销毁,否则会有意想不到的结果。官方这样描述的:

However, for a repeating timer, you must invalidate the timer object yourself by calling its invalidate method. Calling this method requests the removal of the timer from the current run loop; as a result, you should always call the invalidate method from the same thread on which the timer was installed.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@interface BLTimerTest ()
@property (nonatomic, strong) dispatch_queue_t queue;
@property (nonatomic, strong) NSTimer *timer;
@end

@implementation BLTimerTest
- (instancetype)init {
self = [super init];
if (self) {
_queue = dispatch_queue_create("com.bool.timer.test", DISPATCH_QUEUE_SERIAL);
}
return self;
}

- (void)installTimer {
dispatch_async(self.queue, ^{
self.timer = [NSTimer scheduledTimerWithTimeInterval:3.0f repeats:YES block:^(NSTimer * _Nonnull timer) {
NSLog(@"test timer");
}];
});
}

- (void)clearTimer {
dispatch_async(self.queue, ^{
if ([self.timer isValid]) {
[self.timer invalidate];
self.timer = nil;
}
});
}
@end

3. Dispatch Once 死锁问题

在开发中,我们经常使用 dispatch_once,但是递归调用会造成死锁。例如下面这样:

1
2
3
4
5
6
- (void)dispatchOnceTest {
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
[self dispatchOnceTest];
});
}

至于为什么会死锁,上文介绍 Dispatch Once 的时候已经说明了,这里就不多做介绍了。提醒一下使用的时候要注意,不要造成递归调用。

4. Dispatch Group 问题

在使用 dispatch_group 的时候,dispatch_group_enter(taskGroup)dispatch_group_leave(taskGroup) 一定要成对,否则也会出现崩溃。大多数情况下我们都会注意,但是有时候可能会疏忽。例如多层 for loop 时 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- (void)testDispatchGroup {
NSString *path = @"";
NSFileManager *fileManager = [NSFileManager defaultManager];
NSArray *folderList = [fileManager contentsOfDirectoryAtPath:path error:nil];
dispatch_group_t taskGroup = dispatch_group_create();

for (NSString *folderName in folderList) {
dispatch_group_enter(taskGroup);
NSString *folderPath = [@"path" stringByAppendingPathComponent:folderName];
NSArray *fileList = [fileManager contentsOfDirectoryAtPath:folderPath error:nil];
for (NSString *fileName in fileList) {
dispatch_async(_queue, ^{
// 异步任务
dispatch_group_leave(taskGroup);
});
}
}
}

上面的 dispatch_group_enter(taskGroup) 在第一层 for loop 中,而 dispatch_group_leave(taskGroup) 在第二层 for loop 中,两者的关系是一对多,很容造成崩溃。有时候嵌套层级太多,很容易忽略这个问题。

总结

关于 iOS 并发编程,就总结到这里。后面如果有一些 best practices 我会更新进来。另外,因为文章比较长,可能会出现一个错误,欢迎指正,我会对此加以修改。

参考文献

  1. 深入理解 iOS 开发中的锁
  2. 关于 @synchronized,这儿比你想知道的还要多
  3. 深入理解 GCD
  4. GCD源码分析2 —— dispatch_once篇
  5. GCD源码分析6 —— dispatch_source篇
  6. Dispatch
  7. Task Management - Operation
  8. swift-corelibs-foundation
  9. Advanced NSOperations