信号量Semaphore

Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时分能够指定一个值,但是不需求晓得需求同步的线程个数,只需求在同步的中央调用acquire办法时指定需求同步的线程个数;

一.简单运用
同步两个子线程,只要其中两个子线程执行终了,主线程才会执行:
packagecom.example.demo.study;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.Semaphore;
publicclassStudy0217{
//创立一个信号量的实例,信号量初始值为0
staticSemaphoresemaphore=newSemaphore(0);
publicstaticvoidmain(String[]args)throwsException{
ExecutorServicepool=Executors.newFixedThreadPool(3);
pool.submit(()->{
System.out.println(“Thread1—start”);
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println(“Thread2—start”);
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println(“Thread3—start”);
//信号量加一
semaphore.release();
});
//等候两个子线程执行终了就放过,必需要信号量等于2才放过
semaphore.acquire(2);
System.out.println(“两个子线程执行终了”);
//关闭线程池,正在执行的任务继续执行
pool.shutdown();
}
}
这个信号量也能够复用,相似CyclicBarrier:
packagecom.example.demo.study;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.Semaphore;
publicclassStudy0217{
//创立一个信号量的实例,信号量初始值为0
staticSemaphoresemaphore=newSemaphore(0);
publicstaticvoidmain(String[]args)throwsException{
ExecutorServicepool=Executors.newFixedThreadPool(3);
pool.submit(()->{
System.out.println(“Thread1—start”);
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println(“Thread2—start”);
//信号量加一
semaphore.release();
});
//等候两个子线程执行终了就放过,必需要信号量等于2才放过
semaphore.acquire(2);
System.out.println(“子线程1,2执行终了”);
pool.submit(()->{
System.out.println(“Thread3—start”);
//信号量加一
semaphore.release();
});
pool.submit(()->{
System.out.println(“Thread4—start”);
//信号量加一
semaphore.release();
});
semaphore.acquire(2);
System.out.println(“子线程3,4执行终了”);
//关闭线程池,正在执行的任务继续执行
pool.shutdown();
}
}
二.信号量原理
看看下面这个图,能够晓得信号量Semaphore还是依据AQS完成的,内部有个Sync工具类操作AQS,还分为公平战略和非公平战略;
结构器:
//默许是非公平战略
publicSemaphore(intpermits){
sync=newNonfairSync(permits);
}
//能够依据第二个参数选择是公平战略还是非公平战略
publicSemaphore(intpermits,booleanfair){
sync=fair?newFairSync(permits):newNonfairSync(permits);
}
acquire(intpermits)办法:
publicvoidacquire(intpermits)throwsInterruptedException{
if(permits<0)thrownewIllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
//AQS中的办法
publicfinalvoidacquireSharedInterruptibly(intarg)
throwsInterruptedException{
if(Thread.interrupted())thrownewInterruptedException();
//这里依据子类是公平战略还是非公平战略
if(tryAcquireShared(arg)<0)
//获取失败会进入这里,将线程放入阻塞队列,然后再尝试,还是失败的话就调用park办法挂起当前线程
doAcquireSharedInterruptibly(arg);
}
//非公平战略
protectedinttryAcquireShared(intacquires){
returnnonfairTryAcquireShared(acquires);
}
finalintnonfairTryAcquireShared(intacquires){
//一个无限循环,获取state剩余的信号量,由于每调用一次release()办法的话,信号量就会加一,这里将
//最新的信号量减去传进来的参数比拟,比方有两个线程,其中一个线程曾经调用了release办法,然后调用acquire(2)办法,那么
//这里remaining的值就是-1,返回-1,然后当前线程就会被丢到阻塞队列中去了;假如另外一个线程也调用了release办法,
//那么此时的remaining==0,所以在这里的if中会调用CAS将0设置到state
//
for(;;){
intavailable=getState();
intremaining=available-acquires;
if(remaining<0||compareAndSetState(available,remaining))
returnremaining;
}
}
//公平战略
//和上面非公平差不多,只不过这里会查看阻塞队列中当前节点前面有没有前驱节点,有的话直接返回-1,
//就会把当前线程丢到阻塞队列中阻塞去了,没有前驱节点的话,就跟非公平形式一样的了
protectedinttryAcquireShared(intacquires){
for(;;){
if(hasQueuedPredecessors())
return-1;
intavailable=getState();
intremaining=available-acquires;
if(remaining<0||compareAndSetState(available,remaining))
returnremaining;
}
}
再看看release(intpermits)办法:
//这个办法的作用就是将信号量加一
publicvoidrelease(intpermits){
if(permits<0)thrownewIllegalArgumentException();
sync.releaseShared(permits);
}
//AQS中办法
publicfinalbooleanreleaseShared(intarg){
//tryReleaseShared尝试释放资源
if(tryReleaseShared(arg)){
//释放资源胜利就调用park办法唤醒唤醒AQS队列中最前面的节点中的线程
doReleaseShared();
returntrue;
}
returnfalse;
}
protectedfinalbooleantryReleaseShared(intreleases){
//一个无限循环,获取state,然后加上传进去的参数,假如新的state的值小于旧的state,阐明曾经超越了state的最大值,溢出了
//没有溢出的话,就用CAS更新state的值
for(;;){
intcurrent=getState();
intnext=current+releases;
if(next<current)//overflow
thrownewError(“Maximumpermitcountexceeded”);
if(compareAndSetState(current,next))
returntrue;
}
}
privatevoiddoReleaseShared(){
for(;;){
Nodeh=head;
if(h!=null&&h!=tail){
intws=h.waitStatus;
//ws==Node.SIGNAL表示节点中线程需求被唤醒
if(ws==Node.SIGNAL){
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
continue;//looptorecheckcases
//调用阻塞队列中线程的unpark办法唤醒线程
unparkSuccessor(h);
}
//ws==0表示节点中线程是初始状态
elseif(ws==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))
continue;//looponfailedCAS
}
if(h==head)//loopifheadchanged
break;
}
}

© 版权声明
好牛新坐标
版权声明:
1、IT大王遵守相关法律法规,由于本站资源全部来源于网络程序/投稿,故资源量太大无法一一准确核实资源侵权的真实性;
2、出于传递信息之目的,故IT大王可能会误刊发损害或影响您的合法权益,请您积极与我们联系处理(所有内容不代表本站观点与立场);
3、因时间、精力有限,我们无法一一核实每一条消息的真实性,但我们会在发布之前尽最大努力来核实这些信息;
4、无论出于何种目的要求本站删除内容,您均需要提供根据国家版权局发布的示范格式
《要求删除或断开链接侵权网络内容的通知》:https://itdw.cn/ziliao/sfgs.pdf,
国家知识产权局《要求删除或断开链接侵权网络内容的通知》填写说明: http://www.ncac.gov.cn/chinacopyright/contents/12227/342400.shtml
未按照国家知识产权局格式通知一律不予处理;请按照此通知格式填写发至本站的邮箱 wl6@163.com

相关文章