久久青草精品A片狠狠,日韩欧美视频一区二区,亚洲国码AV日韩,国产精品黄在

幫助中心 >  行業資訊 >  開發 >  從代碼分析事務消息&消息延時

從代碼分析事務消息&消息延時

2021-04-26 15:07:30 9511

代碼分析事務消息

分布式事務模型圖如下:



1.png


我們先看事務消息客戶端的實現。即上圖的MQClient.

我們先看代碼的整體封裝。

下面代碼段做了兩件事:

1.本地數據庫寫入 2.事務消息客戶端發送消息。

@Transactional表示開啟事務。在注釋下,開啟一個新的Order。用OrderDao將數據寫入本地數據庫。然后調用transactionMsgClient.sendMsg將消息發出去(這是靜態方法調用,transactionMsgClient是一個類,sendMsg是它的方法)。這樣,本地數據庫寫入和發消息這兩件事,就是個原子事務,也就是說,兩件事要么一起成功,要么一起失敗。


2.png




上面代碼用到了MybatisTransactionMsgClient。MyBatis是一個Java持久化框架,它通過XML描述符或注解把對象與存儲過程或SQL語句關聯起來,映射成數據庫內對應的記錄。

上面提到過,transactionMsgClient.sendMsg是個類,這個類繼承了TransactionMsgClient。下面代碼中,transactionMsgClient.sendMsg調用了其父類的TransactionsendClient的sendMsg方法,寫事務消息表,并且發送消息。


3.png




我們接下來看sendMsg這個方法到底做了什么:

1、消息內容落數據庫  2、發送消息

下面代碼會先做一個判斷,在if字段里:con.getAutoCommit。也就是說,只有當沒有開啟自動commit的時候(有自動提交就破壞了事務的原子性),把信息寫在數據庫表里,然后構造一個messages,發消息。而發消息的方法是將消息放到消息隊列中。


4.png




在事務消息設計中,后臺發送消息隊列設計,如下圖所示:


5.png



參照上圖,我們可以看到后臺發送消息隊列有兩個:


SendMsg隊列的消息消費很快,基本上放進去很快就會被消費掉。這樣重試才能有效,否則一直重試,沒有意義。


6.png



下面代碼段的的作用:是發送消息時,從隊列里中取消息,看是否到期,將到期的消息取出來:


7.png



后臺優先隊列的維護。


8.png



最后,事務消息表的設計;


9.png



代碼分析事務消息

我們知道,RocketMQ支持延時消息。我們先看一下延時消息的應用場景。

延時需求是在當前時間后的某一時間點觸發指定的業務邏輯或操作。

  • 例如微信發消息,過一段時間沒發送成功的話,提示重新發送(如下圖的小紅點提示)。



  • 訂單狀態流轉:如延時支付。京東上超過24小時的訂單會被自動取消。

實際上,我們在分布式事務的終極模型中,也用到了延時消息。

RocketMQ支持18個級別的延時等級:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 12h。也就是說,消息延時發送有這18個級別。如果業務的延時消息需求與這18個級別不匹配,就需要自行基于RocketMQ進行二次開發。


10.png



接下來,我們看一下18個延時的實現原理。

RocketMQ的18個延時級別,每個級別對應一個Queue,根據Level參數,將消息放到對應的18個隊列中的等級。每個隊列都對應了到時出隊。例如1s隊列就是1s出隊,2小時隊就是2小時出隊。消息出隊以后,當成正常消息投遞。然后被投遞到了消費隊列,被消費者消費掉。


11.png




提交成功!非常感謝您的反饋,我們會繼續努力做到更好!

這條文檔是否有幫助解決問題?

非常抱歉未能幫助到您。為了給您提供更好的服務,我們很需要您進一步的反饋信息:

在文檔使用中是否遇到以下問題: