大家千万不要被文章的标题给迷惑了,他两在本篇文章是没有关系的, 今天给大家讲讲最近2个有意思的issue,分享一下我学到的
-
mysql DuplicateUpdate的用法要注意的点 -
java的threadpool使用不当会造成“死锁”问题
mysql DuplicateUpdate的用法要注意的点
有个issue说遇到了一个这样的问题,
这个朋友使用我开源的job调度框架 https://github.com/yuzd/Hangfire.HttpJob
存储用的是mysql,采用的实现是 https://github.com/arnoldasgudas/Hangfire.MySqlStorage
set表的id是自增主键,正常理解 都是慢慢自增上去的,但是发现是大幅度跳跃式的自增, 真相是什么?
首先针对这个问题,首先我们搞清楚在hangfire中和storage相关的部分如下
-
hangfire server调度依赖storage -
storage抽象出来一层api(解耦) -
第三方扩展(不关心具体的storage实现) -
不同的storage具体实现(比如mysql,sqlserver等)
Hangfire.Httpjob其实只是依赖了storage api那一层,也没有能力去直接写sql去执行, 只能用api去操作hangfire的那几张表(比如set表)
那么问题肯定不是在扩展层,而是得去看看mysqlstorage的实现源码,针对set表的处理逻辑
https://github.com/arnoldasgudas/Hangfire.MySqlStorage/blob/0bd1016f715c8c6617ce22fb7b2ce5b6c328d2fb/Hangfire.MySql/MySqlWriteOnlyTransaction.cs#L155
publicoverridevoidAddToSet(stringkey,stringvalue,doublescore)
{
Logger.TraceFormat("AddToSetkey={0}value={1}",key,value);
AcquireSetLock();
QueueCommand(x=>x.Execute(
$"INSERTINTO`{_storageOptions.TablesPrefix}Set`(`Key`,`Value`,`Score`)"+
"VALUES(@Key,@Value,@Score)"+
"ONDUPLICATEKEYUPDATE`Score`=@Score",
new{key,value,score}));
}
这里是用了ON DUPLICATE KEY UPDATE 的语句
这个语法是在mysql 4.1(2005)引入的,意思是 insert的时候遇到主键已存在 就执行后面 的update
但是就是这个功能 会造成自增主键成跳跃式增长,增长跨度和SQL的执行次数成正比
根据朋友提供的截图
虽说是会跳跃,但是这个增长也太夸张了
打上断点调试发现
是hangfire server 不断的在调用,目的是把下一次执行时间(秒级别的时间戳)写到set表中
打上日志可以看到有非常多相同值的调用,这仅仅是一个job,这个自增速度得再乘以job的个数,难怪了
既然找到原因了,就提个PR 修改下
publicoverridevoidAddToSet(stringkey,stringvalue,doublescore)
{
Logger.TraceFormat("AddToSetkey={0}value={1}",key,value);
AcquireSetLock();
QueueCommand(x=>
{
varsql="";
if(key=="recurring-jobs")//只发现这个key存在这个问题
{
//key+value是uniq改成先update如果没有成功再insert
sql=$"UPDATE`{_storageOptions.TablesPrefix}Set`set`Score`=@scorewhere`Key`=@keyand`Value`=@value";
varupdateRt=x.Execute(sql,new{score=score,key=key,value=value});
if(updateRt<1)
{
sql=$"INSERTINTO`{_storageOptions.TablesPrefix}Set`(`Key`,`Value`,`Score`)"+
"VALUES(@Key,@Value,@Score)";
x.Execute(
sql,
new{key,value,score});
}
}
else
{
sql=$"INSERTINTO`{_storageOptions.TablesPrefix}Set`(`Key`,`Value`,`Score`)"+
"VALUES(@Key,@Value,@Score)"+
"ONDUPLICATEKEYUPDATE`Score`=@Score";
x.Execute(
sql,
new{key,value,score});
}
//Console.WriteLine(sql+"==>"+key+"@"+value+"@"+score);
});
}
改完之后测试,id自增一切正常:
java的threadpool使用不当会造成“死锁”问题
这个原因先说出来: threadpool的线程被占用完后,再来的task会往queue里面丢,如果这个时候在这个pool的线程里面 future.get()的话会导致task runner(执行器)被堵住,没人从队列里面取任务了~
(简单来说就是 线程在wait future返回,而这个future在queue里面苦苦等待新释放的线程去执行,就像死锁一样,我在等你的结果,而结果在等待着被执行)
好家伙,这个场景有点熟悉,因为我在项目中也用过Future.get()// 虽说有设置timeout
但是这个问题的重要一点是,这种花式“死锁” jvm是检测不出来的,下面有测试
模拟一下这个场景:
我搞了2个线程池,分别是nio线程池和业务线程池,模拟并发20个请求, 注意看process2方法里的注释,如果去掉那里的代码的话 就不会有这个死锁问题
/**
*@authoryuzd
*/
publicclassPoolTest{
//模拟nio线程池
staticThreadPoolExecutornioExecutor=newThreadPoolExecutor(20,20,60,TimeUnit.SECONDS,newLinkedBlockingQueue<>(100),
newCustomerNamedThreadFactory("nio",false),
newThreadPoolExecutor.AbortPolicy());
//业务线程池
staticThreadPoolExecutorbuExecutor=newThreadPoolExecutor(20,20,60,TimeUnit.SECONDS,newLinkedBlockingQueue<>(100),
newCustomerNamedThreadFactory("bu",true),
newThreadPoolExecutor.AbortPolicy());
publicstaticvoidmain(String[]args){
//模拟是http请求并发20个
IntStream.rangeClosed(1,20).parallel().forEach((index)->{
//交给nio线程池处理
nioExecutor.execute(()->{
try{
httpHandler(index);
}catch(Exceptione){
e.printStackTrace();
}
});
});
}
staticvoidhttpHandler(intindex)throwsExecutionException,InterruptedException{
System.out.println(Thread.currentThread().getName()+"requestindex:"+index+"staring");
//交给业务线程池处理
Future<String>parentFuture=buExecutor.submit(()->process1(index));
Stringp1Rt=parentFuture.get();//nio线程在wait
System.out.println(Thread.currentThread().getName()+"requestindex:"+p1Rt+"ending");
}
//future1
staticStringprocess1(intindex)throwsExecutionException,InterruptedException{
System.out.println(Thread.currentThread().getName()+"process1index:"+index+"staring");
Future<String>childFuture=buExecutor.submit(()->process2(index));
Stringp2Rt=childFuture.get();//这里是bu线程在wait这里会发生死锁
System.out.println(Thread.currentThread().getName()+"process1index:"+index+"ending");
returnp2Rt;
}
//future2
staticStringprocess2(intindex)throwsInterruptedException,ExecutionException{
System.out.println(Thread.currentThread().getName()+"process2index:"+index+"staring");
//加上就会死锁
//只要不一下子产生足够数量的task(把core全部占掉)就不会死锁加了这里就会把core全部占据导致task进入到queue,core线程在waitfuture.get无法被释放,而queue的任务在等待它释放产生新的线程
Future<String>submit=buExecutor.submit(()->{
try{
Thread.sleep(1000);
returnString.valueOf(index);
}catch(InterruptedExceptione){
thrownewRuntimeException(e);
}
});
submit.get();
System.out.println(Thread.currentThread().getName()+"process2index:"+index+"ending");
returnString.valueOf(index);
}
}
用visualvm分析线程dump,很难直接发现有异常,异步的很难检测,排查起来比较复杂,只看到是在wait
用jstack没有发现deadlock
在实际项目中我也看到过一个项目中共用一个线程池,线程池被封装成一个util方法,要执行异步的都用它,这个场景尤其要注意这个场景,也建议大家用带有超时的方式 Future.get(xxxx)
1、IT大王遵守相关法律法规,由于本站资源全部来源于网络程序/投稿,故资源量太大无法一一准确核实资源侵权的真实性;
2、出于传递信息之目的,故IT大王可能会误刊发损害或影响您的合法权益,请您积极与我们联系处理(所有内容不代表本站观点与立场);
3、因时间、精力有限,我们无法一一核实每一条消息的真实性,但我们会在发布之前尽最大努力来核实这些信息;
4、无论出于何种目的要求本站删除内容,您均需要提供根据国家版权局发布的示范格式
《要求删除或断开链接侵权网络内容的通知》:https://itdw.cn/ziliao/sfgs.pdf,
国家知识产权局《要求删除或断开链接侵权网络内容的通知》填写说明: http://www.ncac.gov.cn/chinacopyright/contents/12227/342400.shtml
未按照国家知识产权局格式通知一律不予处理;请按照此通知格式填写发至本站的邮箱 wl6@163.com