背景
成都創(chuàng)新互聯(lián)公司堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:網(wǎng)站設(shè)計(jì)、做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的保康網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
在開發(fā)中,往往會(huì)遇到一些關(guān)于延時(shí)任務(wù)的需求。例如
•生成訂單30分鐘未支付,則自動(dòng)取消
•生成訂單60秒后,給用戶發(fā)短信
對(duì)上述的任務(wù),我們給一個(gè)專業(yè)的名字來(lái)形容,那就是延時(shí)任務(wù)。
最近需要做一個(gè)延時(shí)處理的功能,主要是從kafka中消費(fèi)消息后根據(jù)消息中的某個(gè)延時(shí)字段來(lái)進(jìn)行延時(shí)處理,在實(shí)際的實(shí)現(xiàn)過(guò)程中有一些需要注意的地方,記錄如下。
實(shí)現(xiàn)過(guò)程
說(shuō)到j(luò)ava中的定時(shí)功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下幾點(diǎn):
1、ScheduledThreadPoolExecutor決定了用ScheduledThreadPoolExecutor來(lái)進(jìn)行實(shí)現(xiàn),接下來(lái)就是代碼編寫啦(大體流程代碼)。
主要的延時(shí)實(shí)現(xiàn)如下:
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new ThreadPoolExecutor.AbortPolicy()); //從消息中取出延遲時(shí)間及相關(guān)信息的代碼略 int delayTime = 0; executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { //具體操作邏輯 }},0,delayTime, TimeUnit.SECONDS);
其中NamedThreadFactory是我自定義的一個(gè)線程工廠,主要給線程池定義名稱及相關(guān)日志打印便于后續(xù)的問(wèn)題分析,這里就不多做介紹了。拒絕策略也是采用默認(rèn)的拒絕策略。
然后測(cè)試了一下,滿足目標(biāo)需求的功能,可以做到延遲指定時(shí)間后執(zhí)行,至此似乎功能就被完成了。
大家可能疑問(wèn),這也太簡(jiǎn)單了有什么好說(shuō)的,但是這種方式實(shí)現(xiàn)簡(jiǎn)單是簡(jiǎn)單但是存在一個(gè)潛在的問(wèn)題,問(wèn)題在哪呢,讓我們看一下ScheduledThreadPoolExecutor的源碼:
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
ScheduledThreadPoolExecutor由于它自身的延時(shí)和周期的特性,默認(rèn)使用了DelayWorkQueue,而并不像我們平時(shí)使用的SingleThreadExecutor等構(gòu)造是可以使用自己定義的LinkedBlockingQueue并且設(shè)置隊(duì)列大小,問(wèn)題就出在這里。
DelayWrokQueue是一個(gè)無(wú)界隊(duì)列,而我們的目標(biāo)數(shù)據(jù)源是kafka,也就是一個(gè)高并發(fā)高吞吐的消息隊(duì)列,很大可能在某一時(shí)間段有大量的消息過(guò)來(lái)從而導(dǎo)致OOM,在使用多線程時(shí)我們是肯定要考慮到OOM的可能性的,因?yàn)镺OM帶來(lái)的后果往往比較嚴(yán)重,系統(tǒng)OOM臨時(shí)的解決辦法一般只能是重啟,可能會(huì)導(dǎo)致用戶數(shù)據(jù)丟失等不可能挽回的問(wèn)題,所以從編碼設(shè)計(jì)階段要采用盡可能穩(wěn)妥的手段來(lái)避免這些問(wèn)題。
2、采用redis和線程結(jié)合
這一次換了思路,采用redis來(lái)幫助我們做緩沖,從而避免消息過(guò)多OOM的問(wèn)題。
相關(guān)redis zset api:
//添加元素 ZADD key score member [[score member] [score member] …] //根據(jù)分值及限制數(shù)量查詢 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] //從zset中刪除指定成員 ZREM key member [member …]
我們采用redis基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)的zset結(jié)構(gòu),采用score來(lái)存儲(chǔ)我們目標(biāo)發(fā)送時(shí)間的數(shù)值,整體處理流程如下:
public void onMessage(String topic, String message) { String orderId; int delayTime = 0; try { Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() { }.getType()); if (msgMap.isEmpty()) { return; } LOGGER.info("onMessage kafka content:{}", msgMap.toString()); orderId = msgMap.get("orderId"); if(StringUtils.isNotEmpty(orderId)){ delayTime = Integer.parseInt(msgMap.get("delayTime")); Calendar calendar = Calendar.getInstance(); //計(jì)算出預(yù)計(jì)發(fā)送時(shí)間 calendar.add(Calendar.MINUTE, delayTime); long sendTime = calendar.getTimeInMillis(); RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId); LOGGER.info("orderId:{}---放入redis中等待發(fā)送---sendTime:{}", ---orderId:{}, sendTime); } } catch (Exception e) { LOGGER.info("onMessage 延時(shí)發(fā)送異常:{}", e); } }
public void run(){ //獲取批量大小 int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100")); try { //批量獲取離發(fā)送時(shí)間最近的orderNum條數(shù)據(jù) Calendar calendar = Calendar.getInstance(); long now = calendar.getTimeInMillis(); //獲取無(wú)限早到現(xiàn)在的事件key(防止上次批量數(shù)量小于放入數(shù)量,存在歷史數(shù)據(jù)未消費(fèi)情況) Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum); LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds)); if (CollectionUtils.isNotEmpty(orders)){ //刪除key 防止重復(fù)發(fā)送 for (String orderId : orderIds) { RedisUtils.getInstance().zrem(Constant.DELAY, orderId); } //接下來(lái)執(zhí)行發(fā)送等業(yè)務(wù)邏輯 } } catch (Exception e) { LOGGER.warn("task.run exception:{}", e); } }
至此完成了依賴redis和線程完成了延時(shí)發(fā)送的功能。
結(jié)語(yǔ)
那么對(duì)上面兩種不同的實(shí)現(xiàn)方式進(jìn)行一下優(yōu)缺點(diǎn)比較:
綜上是本人這次延時(shí)功能的實(shí)現(xiàn)過(guò)程的兩種實(shí)現(xiàn)方式的總結(jié),具體采用哪種方式還需大家根據(jù)實(shí)際情況選擇,希望能給大家?guī)?lái)幫助。ps:由于本人的技術(shù)能力有限,文章中可能出現(xiàn)技術(shù)描述不準(zhǔn)確或者錯(cuò)誤的情況懇請(qǐng)各位大佬指出,我立馬進(jìn)行改正,避免誤導(dǎo)大家,謝謝!
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)創(chuàng)新互聯(lián)的支持。
新聞名稱:利用Redis實(shí)現(xiàn)延時(shí)處理的方法實(shí)例
當(dāng)前路徑:http://muchs.cn/article46/ighohg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、品牌網(wǎng)站制作、App設(shè)計(jì)、云服務(wù)器、企業(yè)網(wǎng)站制作、網(wǎng)站設(shè)計(jì)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)