作者:崔晓兵
从一个线上问题说起
最近在线上遇到了一些[HMDConfigManager remoteConfigWithAppID:]
卡死
初步分析
观察了下主线程堆栈,用到的锁是读写锁
随后又去翻了下持有着锁的子线程,有各种各样的情况,且基本都处于正常的执行状态,例如有的处于打开文件状态,有的处于read
状态,有的正在执行NSUserDefaults
的方法…
通过观察发现,出问题的线程都有QOS:BACKGROUND
标记。整体看起来持有锁的子线程仍然在执行,只是留给主线程的时间不够了。为什么这些子线程在持有锁的情况下,需要执行这么久,直到主线程的8s卡死?一种情况就是真的如此耗时,另一种则是出现了优先级反转。
解决办法
在这个案例里面,持有读写锁且优先级低的线程迟迟得不到调度(又或者得到调度的时候又被抢占了,或者得到调度的时候时间已然不够了),而具有高优先级的线程由于拿不到读写锁,一直被阻塞,所以互相死锁。iOS8
之后引入了QualityOfService
的概念,类似于线程的优先级,设置不同的QualityOfService
的值后系统会分配不同的CPU
时间、网络资源和硬盘资源等,因此我们可以通过这个设置队列的优先级 。
方案一:去除对NSOperationQueue
的优先级设置
在 Threading Programming Guide 文档中,苹果给出了提示:
Important: It is generally a good idea to leave the priorities of your threads at their default values. Increasing the priorities of some threads also increases the likelihood of starvation among lower-priority threads. If your application contains high-priority and low-priority threads that must interact with each other, the starvation of lower-priority threads may block other threads and create performance bottlenecks.
苹果的建议是不要随意修改线程的优先级,尤其是这些高低优先级线程之间存在临界资源竞争的情况。所以删除相关优先级设置代码即可解决问题。
方案二:临时修改线程优先级
在 pthread_rwlock_rdlock(3pthread) 发现了如下提示:
Realtime applications may encounter priority inversion when using read-write locks. The problem occurs when a high priority thread “locks” a read-write lock that is about to be “unlocked” by a low priority thread, but the low priority thread is preempted by a medium priority thread. This scenario leads to priority inversion; a high priority thread is blocked by lower priority threads for an unlimited period of time. During system design, realtime programmers must take into account the possibility of this kind of priority inversion.They can deal with it in a number of ways, such as by having critical sections that are guarded by read-write locks execute at a high priority, so that a thread cannot be preempted while executing in its critical section.
尽管针对的是实时系统,但是还是有一些启示和帮助。按照提示,对有问题的代码进行了修改:在线程通过pthread_rwlock_wrlock
拿到_rwlock
的时候,临时提升其优先级,在释放_rwlock
之后,恢复其原先的优先级。
- (id)remoteConfigWithAppID:(NSString *)appID
{
.......
pthread_rwlock_rdlock(&_rwlock);
HMDHeimdallrConfig *result = ....... // get existing config
pthread_rwlock_unlock(&_rwlock);
if(result == nil) {
result = [[HMDHeimdallrConfig alloc] init]; // make a new config
pthread_rwlock_wrlock(&_rwlock);
qos_class_t oldQos = qos_class_self();
BOOL needRecover = NO;
// 临时提升线程优先级
if (_enablePriorityInversionProtection && oldQos < QOS_CLASS_USER_INTERACTIVE) {
int ret = pthread_set_qos_class_self_np(QOS_CLASS_USER_INTERACTIVE, 0);
needRecover = (ret == 0);
}
......
pthread_rwlock_unlock(&_rwlock);
// 恢复线程优先级
if (_enablePriorityInversionProtection && needRecover) {
pthread_set_qos_class_self_np(oldQos, 0);
}
}
return result;
}
值得注意的是,这里只能使用
pthread
的api
,NSThread
提供的API
是不可行的
Demo 验证
为了验证上述的手动调整线程优先级是否有一定的效果,这里通过demo
进行本地实验:定义了2000
个operation
(目的是为了CPU
繁忙),优先级设置NSQualityOfServiceUserInitiated
,且对其中可以被100
整除的operation
的优先级调整为NSQualityOfServiceBackground
,在每个operation
执行相同的耗时任务,然后对这被选中的10
个operation
进行耗时统计。
for (int j = 0; j < 2000; ++j) {
NSOperationQueue *operation = [[NSOperationQueue alloc] init];
operation.maxConcurrentOperationCount = 1;
operation.qualityOfService = NSQualityOfServiceUserInitiated;
// 模块1
// if (j % 100 == 0) {
// operation.qualityOfService = NSQualityOfServiceBackground;
// }
// 模块1
[operation addOperationWithBlock:^{
// 模块2
// qos_class_t oldQos = qos_class_self();
// pthread_set_qos_class_self_np(QOS_CLASS_USER_INITIATED, 0);
// 模块2
NSTimeInterval start = CFAbsoluteTimeGetCurrent();
double sum = 0;
for (int i = 0; i < 100000; ++i) {
sum += sin(i) + cos(i) + sin(i*2) + cos(i*2);
}
start = CFAbsoluteTimeGetCurrent() - start;
if (j % 100 == 0) {
printf("%.8f\n", start * 1000);
}
// 模块2
// pthread_set_qos_class_self_np(oldQos, 0);
// 模块2
}];
}
统计信息如下图所示
A | B | C |
---|---|---|
(注释模块1和模块2代码) | (只打开模块1代码) | (同时打开模块1和模块2代码) |
11.8190561 | 94.70210189 | 15.04005137 |
可以看到
- 正常情况下,每个任务的平均耗时为:11.8190561;
- 当
operation
被设置为低优先级时,其耗时大幅度提升为:94.70210189; - 当
operation
被设置为低优先级时,又在Block
中手动恢复其原有的优先级,其耗时已经大幅度降低:15.04005137(耗时比正常情况高,大家可以思考下为什么)
通过Demo
可以发现,通过手动调整其优先级,低优先级任务的整体耗时得到大幅度的降低,这样在持有锁的情况下,可以减少对主线程的阻塞时间。
上线效果
该问题的验证过程分为2
个阶段:
- 第一个阶段如第1个红框所示,从
3
月6
号开始在版本19.7
上有较大幅度的下降,主要原因:堆栈中被等待的队列信息由QOS:BACKGROUND
变为了com.apple.root.default-qos
,队列的优先级从QOS_CLASS_BACKGROUND
提升为QOS_CLASS_DEFAULT
,相当于实施了方案一,使用了默认优先级。 - 第二个阶段如第
2
个红框所示,从4
月24
号在版本20.3
上开始验证。目前看起来效果暂时不明显,推测一个主要原因是:demo
中是把优先级从QOS_CLASS_BACKGROUND
提升为QOS_CLASS_USER_INITIATED
,而线上相当于把队列的优先级从默认的优先级QOS_CLASS_DEFAULT
提升为QOS_CLASS_USER_INITIATED
所以相对来说,线上的提升相对有限。-
QOS_CLASS_BACKGROUND
的Mach
层级优先级数是4; -
QOS_CLASS_DEFAULT
的Mach
层级优先级数是31; -
QOS_CLASS_USER_INITIATED
的Mach
层级优先级数是37;
-
深刻理解优先级反转
那么是否所有锁都需要像上文一样,手动提升持有锁的线程优先级?系统是否会自动调整线程的优先级?如果有这样的机制,是否可以覆盖所有的锁?要理解这些问题,需要深刻认识优先级反转。
什么是优先级反转?
优先级反转,是指某同步资源被较低优先级的进程/线程所拥有,较高优先级的进程/线程竞争该同步资源未获得该资源,而使得较高优先级进程/线程反而推迟被调度执行的现象。根据阻塞类型的不同,优先级反转又被分为Bounded priority inversion
和Unbounded priority inversion
。这里借助Introduction to RTOS – Solution to Part 11 (Priority Inversion)的图进行示意。
Bounded priority inversion
如图所示,高优先级任务(Task H
)被持有锁的低优先级任务(Task L
)阻塞,由于阻塞的时间取决于低优先级任务在临界区的时间(持有锁的时间),所以被称为bounded priority inversion
。只要Task L
一直持有锁,Task H
就会一直被阻塞,低优先级的任务运行在高优先级任务的前面,优先级被反转。
这里的任务也可以理解为线程
Unbounded priority inversion
在Task L
持有锁的情况下,如果有一个中间优先级的任务(Task M
)打断了Task L
,前面的bounded
就会变为unbounded
,因为Task M
只要抢占了Task L
的CPU
,就可能会阻塞Task H
任意多的时间(Task M
可能不止1
个)
优先级反转常规解决思路
目前解决Unbounded priority inversion
有2
种方法:一种被称作优先权极限(priority ceiling protocol
),另一种被称作优先级继承(priority inheritance
)。
Priority ceiling protocol
在优先权极限方案中,系统把每一个临界资源与1个极限优先权相关联。当1个任务进入临界区时,系统便把这个极限优先权传递给这个任务,使得这个任务的优先权最高;当这个任务退出临界区后,系统立即把它的优先权恢复正常,从而保证系统不会出现优先权反转的情况。该极限优先权的值是由所有需要该临界资源的任务的最大优先级来决定的。
如图所示,锁的极限优先权是3。当Task L
持有锁的时候,它的优先级将会被提升到3,和Task H
一样的优先级。这样就可以阻止Task M
(优先级是2)的运行,直到Task L
和Task H
不再需要该锁。
Priority inheritance
在优先级继承方案中,大致原理是:高优先级任务在尝试获取锁的时候,如果该锁正好被低优先级任务持有,此时会临时把高优先级线程的优先级转移给拥有锁的低优先级线程,使低优先级线程能更快的执行并释放同步资源,释放同步资源后再恢复其原来的优先级。
priority ceiling protocol
和priority inheritance
都会在释放锁的时候,恢复低优先级任务的优先级。同时要注意,以上2
种方法只能阻止Unbounded priority inversion
,而无法阻止Bounded priority inversion
(Task H
必须等待Task L
执行完毕才能执行,这个反转是无法避免的)。
可以通过以下几种发生来避免或者转移Bounded priority inversion
:
- 减少临界区的执行时间,减少
Bounded priority inversion
的反转耗时; - 避免使用会阻塞高优先级任务的临界区资源;
- 专门使用一个队列来管理资源,避免使用锁。
优先级继承必须是可传递的。举个栗子:当
T1
阻塞在被T2
持有的资源上,而T2
又阻塞在T3
持有的一个资源上。如果T1
的优先级高于T2
和T3
的优先级,T3
必须通过T2
继承T1
的优先级。否则,如果另外一个优先级高于T2
和T3
,小于T1
的线程T4
,将抢占T3
,引发相对于T1
的优先级反转。因此,线程所继承的优先级必须是直接或者间接阻塞的线程的最高优先级。
如何避免优先级反转?
QoS 传递
iOS 系统主要使用以下两种机制来在不同线程(或queue
)间传递QoS
:
- 机制1:
dispatch_async
-
dispatch_async()
automatically propagates the QoS from the calling thread, though it will translate User Interactive to User Initiated to avoid assigning that priority to non-main threads. - Captured at time of block submission, translate user interactive to user initiated. Used if destination queue does not have a QoS and does not lower the QoS (ex dispatch_async back to the main thread)
-
- 机制2:基于 XPC 的进程间通信(
IPC
)
系统的 QoS 传递规则比较复杂,主要参考以下信息:
- 当前线程的
QoS
- 如果是使用
dispatch_block_create
() 方法生成的dispatch_block
,则考虑生成block
时所调用的参数 -
dispatch_async
或IPC
的目标queue
或线程的QoS
调度程序会根据这些信息决定block
以什么优先级运行。
- 如果没有其他线程同步地等待此
block
,则block
就按上面所说的优先级来运行。 如果出现了线程间同步等待的情况,则调度程序会根据情况调整线程的运行优先级。
如何触发优先级反转避免机制?
如果当前线程因等待某线程(线程1)上正在进行的操作(如block1
)而受阻,而系统知道block1
所在的目标线程(owner
),系统会通过提高相关线程的优先级来解决优先级反转的问题。反之如果系统不知道block1
所在目标线程,则无法知道应该提高谁的优先级,也就无法解决反转问题;
记录了持有者信息(owner
)的系统 API 如下:
-
pthread mutex
、os_unfair_lock
、以及基于这二者实现的上层 API-
dispatch_once
的实现是基于os_unfair_lock
的 -
NSLock
、NSRecursiveLock
、@synchronized
等的实现是基于pthread mutex
-
-
dispatch_sync
、dispatch_wait
xpc_connection_send_with_message_sync
使用以上这些API
能够在发生优先级反转时使系统启用优先级反转避免机制。
基础API验证
接下来对前文提到的各种「基础系统API
」进行验证
测试验证环境:模拟器 iOS15.2
pthread mutex
pthread mutex
的数据结构pthread_mutex_s
其中有一个m_tid
字段,专门来记录持有该锁的线程Id
。
//types_internal.h
structpthread_mutex_s{
longsig;
_pthread_locklock;
union{
uint32_tvalue;
structpthread_mutex_options_soptions;
}mtxopts;
int16_tprioceiling;
int16_tpriority;
#ifdefined(__LP64__)
uint32_t_pad;
#endif
union{
struct{
uint32_tm_tid[2];//threadidofthreadthathasmutexlocked
uint32_tm_seq[2];//mutexsequenceid
uint32_tm_mis[2];//formisalignedlocksm_tid/m_seqwillspan intohere
}psynch;
struct_pthread_mutex_ulock_sulock;
};
#ifdefined(__LP64__)
uint32_t_reserved[4];
#else
uint32_t_reserved[1];
#endif
};
代码来验证一下:线程优先级是否会被提升?
//printThreadPriority用来打印线程的优先级信息
voidprintThreadPriority(){
thread_tcur_thread=mach_thread_self();
mach_port_deallocate(mach_task_self(),cur_thread);
mach_msg_type_number_tthread_info_count=THREAD_INFO_MAX;
thread_info_data_tthinfo;
kern_return_tkr=thread_info(cur_thread,THREAD_EXTENDED_INFO,(thread_info_t)thinfo,&thread_info_count);
if(kr!=KERN_SUCCESS){
return;
}
thread_extended_info_textend_info=(thread_extended_info_t)thinfo;
printf("pth_priority:%d,pth_curpri:%d,pth_maxpriority:%d\n",extend_info->pth_priority,extend_info->pth_curpri,extend_info->pth_maxpriority);
}
先在子线程上锁并休眠,然后主线程请求该锁
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
printf("begin : \n");
printThreadPriority();
printf("queue before lock \n");
pthread_mutex_lock(&_lock); //确保 backgroundQueue 先得到锁
printf("queue lock \n");
printThreadPriority();
dispatch_async(dispatch_get_main_queue(), ^{
printf("before main lock\n");
pthread_mutex_lock(&_lock);
printf("in main lock\n");
pthread_mutex_unlock(&_lock);
printf("after main unlock\n");
});
sleep(10);
printThreadPriority();
printf("queue unlock\n");
pthread_mutex_unlock(&_lock);
printf("queue after unlock\n");
});
begin :
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue before lock
queue lock
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
before main lock
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
queue unlock
in main lock
after main unlock
queue after unlock
可以看到,低优先级子线程先持有锁,当时的优先级为4
,而该锁被主线程请求的时候,子线程的优先级被提升为47
os_unfair_lock
os_unfair_lock
用来替换OSSpinLock
,解决优先级反转问题。等待os_unfair_lock
锁的线程会处于休眠状态,从用户态切换到内核态,而并非忙等。os_unfair_lock
将线程ID
保存到了锁的内部,锁的等待者会把自己的优先级让出来,从而避免优先级反转。验证一下:
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
printf("begin : \n");
printThreadPriority();
printf("queue before lock \n");
os_unfair_lock_lock(&_unfair_lock); //确保 backgroundQueue 先得到锁
printf("queue lock \n");
printThreadPriority();
dispatch_async(dispatch_get_main_queue(), ^{
printf("before main lock\n");
os_unfair_lock_lock(&_unfair_lock);
printf("in main lock\n");
os_unfair_lock_unlock(&_unfair_lock);
printf("after main unlock\n");
});
sleep(10);
printThreadPriority();
printf("queue unlock\n");
os_unfair_lock_unlock(&_unfair_lock);
printf("queue after unlock\n");
});
begin :
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue before lock
queue lock
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
before main lock
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
queue unlock
in main lock
after main unlock
queue after unlock
结果和pthread mutex
一致
pthread_rwlock_t
在pthread_rwlock_init有如下提示:
Caveats: Beware ofpriority inversionwhen using read-write locks. A high-priority thread may be blocked waiting on a read-write lock locked by a low-priority thread. The microkernel has no knowledge of read-write locks, and therefore can’t boost the low-priority thread to prevent the priority inversion.
大意是内核不感知读写锁,无法提升低优先级线程的优先级,从而无法避免优先级反转。通过查询定义发现:pthread_rwlock_s
包含了字段rw_tid
,专门来记录持有写锁的线程,这不由令人好奇:为什么pthread_rwlock_s
有owner
信息却仍然无法避免优先级反转?
structpthread_rwlock_s{
longsig;
_pthread_locklock;
uint32_t
unused:29,
misalign:1,
pshared:2;
uint32_trw_flags;
#ifdefined(__LP64__)
uint32_t_pad;
#endif
uint32_trw_tid[2];//threadidofthreadthathasexclusive(write)lock
uint32_trw_seq[4];//rwsequenceid(at128-bitalignedboundary)
uint32_trw_mis[4];//formisalignedlocksrw_seqwillspan intohere
#ifdefined(__LP64__)
uint32_t_reserved[34];
#else
uint32_t_reserved[18];
#endif
};
https://news.ycombinator.com/item?id=21751269链接中提到:
xnu supports priority inheritance through “turnstiles”, a kernel-internal mechani** which is used by default by a number of locking primitives (list at [1]), including normal pthread mutexes (though not read-write locks [2]), as well as the os_unfair_lock API (via the ulock syscalls). With pthread mutexes, you can actually explicitly request priority inheritance by calling pthread_mutexattr_setprotocol [3] with PTHREAD_PRIO_INHERIT; the Apple implementation supports it, but currently ignores the protocol setting and just gives all mutexes priority inheritance.
大意是:XNU
使用turnstiles
内核机制进行优先级继承,这种机制被应用在pthread mutex
和os_unfair_lock
上。
顺藤摸瓜,在ksyn_wait
方法中找到了_kwq_use_turnstile
的调用,其中的注释对读写锁解释的比较委婉,添加了at least sometimes
pthread mutexes andrwlocks both (at least sometimes)know their owner and can use turnstiles. Otherwise, we pass NULL as the tstore to the shims so they wait on the global waitq.
//libpthread/kern/kern_synch.c
int
ksyn_wait(ksyn_wait_queue_tkwq,kwq_queue_type_tkqi,uint32_tlockseq,
intfit,uint64_tabstime,uint16_tkwe_flags,
thread_continue_tcontinuation,block_hint_tblock_hint)
{
thread_tth=current_thread();
uthread_tuth=pthread_kern->get_bsdthread_info(th);
structturnstile**tstore=NULL;
intres;
assert(continuation!=THREAD_CONTINUE_NULL);
ksyn_waitq_element_tkwe=pthread_kern->uthread_get_uukwe(uth);
bzero(kwe,sizeof(*kwe));
kwe->kwe_count=1;
kwe->kwe_lockseq=lockseq&PTHRW_COUNT_MASK;
kwe->kwe_state=KWE_THREAD_INWAIT;
kwe->kwe_uth=uth;
kwe->kwe_thread=th;
kwe->kwe_flags=kwe_flags;
res=ksyn_queue_insert(kwq,kqi,kwe,lockseq,fit);
if(res!=0){
//panic("psynch_rw_wrlock:failedtoenqueue\n");//XXXksyn_wqunlock(kwq);
returnres;
}
PTHREAD_TRACE(psynch_mutex_kwqwait,kwq->kw_addr,kwq->kw_inqueue,
kwq->kw_prepost.count,kwq->kw_intr.count);
if(_kwq_use_turnstile(kwq)){
//pthreadmutexesandrwlocksboth(atleastsometimes)knowtheir
//ownerandcanuseturnstiles.Otherwise,wepassNULLasthe
//tstoretotheshimssotheywaitontheglobalwaitq.
tstore=&kwq->kw_turnstile;
}
......
}
再去查看_kwq_use_turnstile
的定义,代码还是很诚实的,只有在KSYN_WQTYPE_MTX
才会启用turnstile
进行优先级反转保护,而读写锁的类型为KSYN_WQTYPE_RWLOCK
,这说明读写锁不会使用_kwq_use_turnstile
,所以无法避免优先级反转。
#defineKSYN_WQTYPE_MTX0x01
#defineKSYN_WQTYPE_CVAR0x02
#defineKSYN_WQTYPE_RWLOCK0x04
#defineKSYN_WQTYPE_SEMA0x08
staticinlinebool
_kwq_use_turnstile(ksyn_wait_queue_tkwq)
{
//Ifwehadwriter-ownerinformationfromthe
//rwlockthenwecouldusetheturnstiletopushonit.Fornow,only
//plainmutexesuseit.
return(_kwq_type(kwq)==KSYN_WQTYPE_MTX);
}
另外在_pthread_find_owner
也可以看到,读写锁的owner
是0
void
_pthread_find_owner(thread_tthread,
structstackshot_thread_waitinfo*waitinfo)
{
ksyn_wait_queue_tkwq=_pthread_get_thread_kwq(thread);
switch(waitinfo->wait_type){
casekThreadWaitPThreadMutex:
assert((kwq->kw_type&KSYN_WQTYPE_MASK)==KSYN_WQTYPE_MTX);
waitinfo->owner=thread_tid(kwq->kw_owner);
waitinfo->context=kwq->kw_addr;
break;
/*Ownerofrwlocknotstoredinkernelspaceduetoraces.Punt
*andhopethattheuserspaceaddressishelpfulenough.*/
casekThreadWaitPThreadRWLockRead:
casekThreadWaitPThreadRWLockWrite:
assert((kwq->kw_type&KSYN_WQTYPE_MASK)==KSYN_WQTYPE_RWLOCK);
waitinfo->owner=0;
waitinfo->context=kwq->kw_addr;
break;
/*Condvarsdon'thaveowners,sojustgivetheuserspaceaddress.*/
casekThreadWaitPThreadCondVar:
assert((kwq->kw_type&KSYN_WQTYPE_MASK)==KSYN_WQTYPE_CVAR);
waitinfo->owner=0;
waitinfo->context=kwq->kw_addr;
break;
casekThreadWaitNone:
default:
waitinfo->owner=0;
waitinfo->context=0;
break;
}
}
把锁更换为读写锁,验证一下前面的理论是否正确:
pthread_rwlock_init(&_rwlock, NULL);
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
printf("begin : \n");
printThreadPriority();
printf("queue before lock \n");
pthread_rwlock_rdlock(&_rwlock); //确保 backgroundQueue 先得到锁
printf("queue lock \n");
printThreadPriority();
dispatch_async(dispatch_get_main_queue(), ^{
printf("before main lock\n");
pthread_rwlock_wrlock(&_rwlock);
printf("in main lock\n");
pthread_rwlock_unlock(&_rwlock);
printf("after main unlock\n");
});
sleep(10);
printThreadPriority();
printf("queue unlock\n");
pthread_rwlock_unlock(&_rwlock);
printf("queue after unlock\n");
});
begin :
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue before lock
queue lock
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
before main lock
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue unlock
queue after unlock
in main lock
after main unlock
可以看到读写锁不会发生优先级提升
dispatch_sync
这个API
都比较熟悉了,这里直接验证:
// 当前线程为主线程
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_BACKGROUND, 0);
_queue = dispatch_queue_create("com.demo.test", qosAttribute);
printThreadPriority();
dispatch_async(_queue, ^{
printf("dispatch_async before dispatch_sync : \n");
printThreadPriority();
});
dispatch_sync(_queue, ^{
printf("dispatch_sync: \n");
printThreadPriority();
});
dispatch_async(_queue, ^{
printf("dispatch_async after dispatch_sync: \n");
printThreadPriority();
});
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
dispatch_async before dispatch_sync :
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
dispatch_sync:
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
dispatch_async after dispatch_sync:
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
_queue
是一个低优先级队列(QOS_CLASS_BACKGROUND
),可以看到dispatch_sync
调用压入队列的任务,以及在这之前dispatch_async
压入的任务,都被提升到较高的优先级47
(和主线程一致),而最后一个dispatch_async
的任务则以优先级4
来执行。
dispatch_wait
// 当前线程为主线程
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_BACKGROUND, 0);
_queue = dispatch_queue_create("com.demo.test", qosAttribute);
printf("main thread\n");
printThreadPriority();
dispatch_block_t block = dispatch_block_create(DISPATCH_BLOCK_INHERIT_QOS_CLASS, ^{
printf("sub thread\n");
sleep(2);
printThreadPriority();
});
dispatch_async(_queue, block);
dispatch_wait(block, DISPATCH_TIME_FOREVER);
_queue
是一个低优先级队列(QOS_CLASS_BACKGROUND
),当在当前主线程使用dispatch_wait
进行等待时,输出如下,低优先级的任务被提升到优先级47
main thread
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
sub thread
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
而如果将dispatch_wait(block, DISPATCH_TIME_FOREVER)
注释掉之后,输出如下:
main thread
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
sub thread
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
值得注意的是,
dispatch_wait
是一个宏(C11
的泛型),或者是一个入口函数,它可以接受dispatch_block_t
,dispatch_group_t
,dispatch_semaphore_t
3
种类型的参数,但是这里的具体含义应该是指dispatch_block_wait
,只有dispatch_block_wait
会调整优先级,避免优先级反转。
intptr_t
dispatch_wait(void*object,dispatch_time_ttimeout);
#if__has_extension(c_generic_selections)
#definedispatch_wait(object,timeout)\
_Generic((object),\
dispatch_block_t:dispatch_block_wait,\
dispatch_group_t:dispatch_group_wait,\
dispatch_semaphore_t:dispatch_semaphore_wait\
)((object),(timeout))
#endif
神秘的信号量
dispatch_semaphore
之前对dispatch_semaphore
的认知非常浅薄,经常把二值信号量和互斥锁划等号。但是通过调研后发现:dispatch_semaphore
没有QoS
的概念,没有记录当前持有信号量的线程(owner
),所以有高优先级的线程在等待锁时,内核无法知道该提高哪个线程的调试优先级(QoS
)。如果锁持有者优先级比其他线程低,高优先级的等待线程将一直等待。Mutex vs Semaphore: What’s the Difference?一文详细比对了Mutex
和Semaphore
之间的区别。
Semaphores are for signaling (sames a condition variables, events) while mutexes are for mutual exclusion.Technically, you can also use semaphores for mutual exclusion (a mutex can be thought as a binary semaphore) but you really shouldn’t.Right, but libdispatch doesn’t have a mutex. It has semaphores and queues.So if you’re trying to use libdispatch and you don’t want the closure-based aspect of queues, you might be tempted to use a semaphore instead. Don’t do that, use os_unfair_lock or pthread_mutex(or a higher-level construct like NSLock) instead.
这些是一些警示,可以看到dispatch_semaphore
十分危险,使用需要特别小心。
这里通过苹果官方提供的demo进行解释:
__block NSString *taskName = nil;
dispatch_semaphore_t sema = dispatch_semaphore_create(0);
[self.connection.remoteObjectProxy requestCurrentTaskName:^(NSString *task) {
taskName = task;
dispatch_semaphore_signal(sema);
}];
dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);
return taskName;
- 假设在主线程执行这段代码,那么当前线程的优先级是
QOS_CLASS_USER_INTERACTIVE
; - 由于从主线程进行了异步,异步任务队列的
QoS
将会被提升为QOS_CLASS_USER_INITIATED
; - 主线程被信号量
sema
阻塞,而负责释放该信号量的异步任务的优先级QOS_CLASS_USER_INITIATED
低于主线程的优先级QOS_CLASS_USER_INTERACTIVE
,因此可能会发生优先级反转。
值得一提的是,Clang
专门针对这种情况进行了静态检测:
https://github.com/llvm-mirror/clang/blob/master/lib/StaticAnalyzer/Checkers/GCDAntipatternChecker.cpp
staticautofindGCDAntiPatternWithSemaphore()->decltype(compoundStmt()){
constchar*SemaphoreBinding="semaphore_name";
autoSemaphoreCreateM=callExpr(allOf(
callsName("dispatch_semaphore_create"),
hasArgument(0,ignoringParenCasts(integerLiteral(equals(0))))));
autoSemaphoreBindingM=anyOf(
forEachDescendant(
varDecl(hasDescendant(SemaphoreCreateM)).bind(SemaphoreBinding)),
forEachDescendant(binaryOperator(bindAssignmentToDecl(SemaphoreBinding),
hasRHS(SemaphoreCreateM))));
autoHasBlockArgumentM=hasAnyArgument(hasType(
hasCanonicalType(blockPointerType())
));
autoArgCallsSignalM=hasAnyArgument(stmt(hasDescendant(callExpr(
allOf(
callsName("dispatch_semaphore_signal"),
equalsBoundArgDecl(0,SemaphoreBinding)
)))));
autoHasBlockAndCallsSignalM=allOf(HasBlockArgumentM,ArgCallsSignalM);
autoHasBlockCallingSignalM=
forEachDescendant(
stmt(anyOf(
callExpr(HasBlockAndCallsSignalM),
objcMessageExpr(HasBlockAndCallsSignalM)
)));
autoSemaphoreWaitM=forEachDescendant(
callExpr(
allOf(
callsName("dispatch_semaphore_wait"),
equalsBoundArgDecl(0,SemaphoreBinding)
)
).bind(WarnAtNode));
returncompoundStmt(
SemaphoreBindingM,HasBlockCallingSignalM,SemaphoreWaitM);
}
如果想使用该功能,只需要打开xcode
设置即可:
另外,
dispatch_group
跟semaphore
类似,在调用enter()
方法时,无法预知谁会调用leave()
,所以系统也无法知道其owner
是谁,所以同样不会有优先级提升的问题。
信号量卡死现身说法
dispatch_semaphore
给笔者的印象非常深刻,之前写过一段这样的代码:使用信号量在主线程同步等待相机授权结果。
__blockBOOLauth=NO;
dispatch_semaphore_tsemaphore=dispatch_semaphore_create(0);
[KTAuthorizeServicerequestAuthorizationWithType:KTPermissionsTypeCameracompletionHandler:^(BOOLallow){
auth=allow;
dispatch_semaphore_signal(semaphore);
}];
dispatch_semaphore_wait(semaphore,DISPATCH_TIME_FOREVER);
上线后长期占据卡死top1
,当时百思不得其解,在深入了解到信号量无法避免优先级反转后,终于豁然开朗,一扫之前心中的阴霾。这类问题一般通过2
种方式来解决:
- 使用同步
API
BOOL auth = [KTAuthorizeService authorizationWithType:KTPermissionsTypeCamera];
// do something next
- 异步回调,不要在当前线程等待
[KTAuthorizeService requestAuthorizationWithType:KTPermissionsTypeCamera completionHandler:^(BOOL allow) {
BOOL auth = allow;
// do something next via callback
}];
几个概念
turnstile
前文提到XNU
使用turnstile
进行优先级继承,这里对turnstile
机制进行简单的描述和理解。在XNU
内核中,存在着大量的同步对象(例如lck_mtx_t
),为了解决优先级反转的问题,每个同步对象都必须对应一个分离的数据结构来维护大量的信息,例如阻塞在这个同步对象上的线程队列。可以想象一下,如果每个同步对象都要分配一个这样的数据结构,将造成极大的内存浪费。为了解决这个问题,XNU
采用了turnstile
机制,一种空间利用率很高的解决方案。该方案的提出依据是同一个线程在同一时刻不能同时阻塞于多个同步对象上。这一事实允许所有同步对象只需要保留一个指向turnstile
的指针,且在需要的时候去分配一个turnstile
即可,而turnstile
则包含了操作一个同步对象需要的所有信息,例如阻塞线程的队列、拥有这个同步对象的线程指针。turnstile
是从池中动态分配的,这个池的大小会随着系统中已分配的线程数目增加而增加,所以turnstile
总数将始终低于或等于线程数,这也决定了turnstile
的数目是可控的。turnstile
由阻塞在该同步对象上的第一个线程负责分配,当没有更多线程阻塞在该同步对象上,turnstile
会被释放,回收到池中。turnstile
的数据结构如下:
structturnstile{
structwaitqts_waitq;/*waitqembeddedinturnstile*/
turnstile_inheritor_tts_inheritor;/*thread/turnstileinheritingthepriority(IL,WL)*/
union{
structturnstile_listts_free_turnstiles;/*turnstilefreelist(IL)*/
SLIST_ENTRY(turnstile)ts_free_elm;/*turnstilefreelistelement(IL)*/
};
structpriority_queue_sched_maxts_inheritor_queue;/*Queueofturnstilewithusasaninheritor(WL)*/
union{
structpriority_queue_entry_schedts_inheritor_links;/*Inheritorqueuelinks*/
structmpsc_queue_chaints_deallocate_link;/*threaddeallocatelink*/
};
SLIST_ENTRY(turnstile)ts_htable_link;/*linkageforturnstileinglobalhashtable*/
uintptr_tts_proprietor;/*hashkeylookupturnstile(IL)*/
os_refcnt_tts_refcount;/*referencecountforturnstiles*/
_Atomicuint32_tts_type_gencount;/*gencountusedforprioritychaining(IL),typeofturnstile(IL)*/
uint32_tts_port_ref;/*numberofexplicitrefsfromportsonsendturnstile*/
turnstile_update_flags_tts_inheritor_flags;/*flagsforturnstileinheritor(IL,WL)*/
uint8_tts_priority;/*priorityofturnstile(WL)*/
#ifDEVELOPMENT||DEBUG
uint8_tts_state;/*currentstateofturnstile(IL)*/
queue_chain_tts_global_elm;/*globalturnstilechain*/
thread_tts_thread;/*threadtheturnstileisattachedto*/
thread_tts_prev_thread;/*threadtheturnstilewasattachedbeforedonation*/
#endif
};
优先级数值
在验证环节有一些优先级数值,这里借助「Mac OS X and iOS Internals 」解释一下:实验中涉及到的优先级数值都是相对于Mach
层而言的,且都是用户线程数值
- 用户线程的优先级是0~63;
-
NSQualityOfServiceBackground
的Mach
层级优先级数是4; -
NSQualityOfServiceUtility
的Mach
层级优先级数是20; -
NSQualityOfServiceDefault
的Mach
层级优先级数是31; -
NSQualityOfServiceUserInitiated
的Mach
层级优先级数是37; -
NSQualityOfServiceUserInteractive
的Mach
层级优先级是47;
-
- 内核线程的优先级是80~95;
- 实时系统线程的优先级是96~127;
- 64~79被保留给系统使用;
总结
本文主要阐述了优先级反转的一些概念和解决思路,并结合iOS
平台的几种锁进行了详细的调研。通过深入的理解,可以去规避一些不必要的优先级反转,从而进一步避免卡死异常。字节跳动APM
团队也针对线程的优先级做了监控处理,进而达到发现和预防优先级反转的目的。
加入我们
字节跳动 APM 中台致力于提升整个集团内全系产品的性能和稳定性表现,技术栈覆盖iOS/Android/Server/Web/Hybrid/PC/游戏/小程序等,工作内容包括但不限于性能稳定性监控,问题排查,深度优化,防劣化等。长期期望为业界输出更多更有建设性的问题发现和深度优化手段。
欢迎对字节APM团队职位感兴趣的同学投递简历到邮箱xushuangqing@bytedance.com。
参考文档
- WWDC18 What’ s New in LLVM – actorsfit
- https://developer.apple.com/videos/play/wwdc2015/718
- https://developer.apple.com/forums/thread/124155
- https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.html
- https://developer.apple.com/library/archive/documentation/Performance/Conceptual/EnergyGuide-iOS/PrioritizeWorkWithQoS.html
- https://github.com/llvm-mirror/clang/blob/google/stable/lib/StaticAnalyzer/Checkers/GCDAntipatternChecker.cpp
- Don’t use dispatch semaphores where mutexes (or dispatch queues) would suffice
- Concurrency Problems Written by Scott Grosch
- https://www.jianshu.com/p/af64e05de503
- https://pubs.opengroup.org/onlinepubs/7908799/xsh/pthread_rwlock_wrlock.html
- iOS中各种“锁”的理解及应用
- 不再安全的 OSSpinLock
- https://blog.actorsfit.com/a?ID=00001-499b1c8e-8a7f-4960-a1c1-c8e2f42c08c6
- https://objccn.io/issue-2-1/#Priority-Inversion
- Introduction to RTOS – Solution to Part 11 (Priority Inversion)
- https://threadreaderapp.com/thread/1229999590482444288.html#
- 深入理解iOS中的锁
- Threads can infect each other with their low priority
1、IT大王遵守相关法律法规,由于本站资源全部来源于网络程序/投稿,故资源量太大无法一一准确核实资源侵权的真实性;
2、出于传递信息之目的,故IT大王可能会误刊发损害或影响您的合法权益,请您积极与我们联系处理(所有内容不代表本站观点与立场);
3、因时间、精力有限,我们无法一一核实每一条消息的真实性,但我们会在发布之前尽最大努力来核实这些信息;
4、无论出于何种目的要求本站删除内容,您均需要提供根据国家版权局发布的示范格式
《要求删除或断开链接侵权网络内容的通知》:https://itdw.cn/ziliao/sfgs.pdf,
国家知识产权局《要求删除或断开链接侵权网络内容的通知》填写说明: http://www.ncac.gov.cn/chinacopyright/contents/12227/342400.shtml
未按照国家知识产权局格式通知一律不予处理;请按照此通知格式填写发至本站的邮箱 wl6@163.com