女人久久久,最近更新中文字幕在线,成人国内精品久久久久影院vr,中文字幕亚洲综合久久综合,久久精品秘?一区二区三区美小说

原創(chuàng)生活

國(guó)內(nèi) 商業(yè) 滾動(dòng)

基金 金融 股票

期貨金融

科技 行業(yè) 房產(chǎn)

銀行 公司 消費(fèi)

生活滾動(dòng)

保險(xiǎn) 海外 觀察

財(cái)經(jīng) 生活 期貨

當(dāng)前位置:滾動(dòng) >

微服務(wù)進(jìn)階避坑策略 微服務(wù)異步架構(gòu)MQ之RocketMQ方案

文章來(lái)源:財(cái)金網(wǎng)  發(fā)布時(shí)間: 2019-04-11 11:42:28  責(zé)任編輯:cfenews.com
+|-

【原標(biāo)題:微服務(wù)進(jìn)階避坑策略 微服務(wù)異步架構(gòu)MQ之RocketMQ方案】“我們大家都知道把一個(gè)微服務(wù)架構(gòu)變成一個(gè)異步架構(gòu)只需要加一個(gè)MQ,現(xiàn)在市面上有很多MQ的開(kāi)源框架。到底選擇哪一個(gè)MQ的開(kāi)源框架才合適呢?”

一、什么是MQ?MQ的原理是什么?

MQ就是消息隊(duì)列,是Message Queue的縮寫(xiě)。消息隊(duì)列是一種通信方式。消息的本質(zhì)就是一種數(shù)據(jù)結(jié)構(gòu)。因?yàn)镸Q把項(xiàng)目中的消息集中式的處理和存儲(chǔ),所以MQ主要有解耦,并發(fā),和削峰的功能。

1,解耦:

MQ的消息生產(chǎn)者和消費(fèi)者互相不關(guān)心對(duì)方是否存在,通過(guò)MQ這個(gè)中間件的存在,使整個(gè)系統(tǒng)達(dá)到解耦的作用。

如果服務(wù)之間用RPC通信,當(dāng)一個(gè)服務(wù)跟幾百個(gè)服務(wù)通信時(shí),如果那個(gè)服務(wù)的通信接口改變,那么幾百個(gè)服務(wù)的通信接口都的跟著變動(dòng),這是非常頭疼的一件事。

但是采用MQ之后,不管是生產(chǎn)者或者消費(fèi)者都可以單獨(dú)改變自己。他們的改變不會(huì)影響到別的服務(wù)。從而達(dá)到解耦的目的。為什么要解耦呢?說(shuō)白了就是方便,減少不必要的工作量。

2,并發(fā)

MQ有生產(chǎn)者集群和消費(fèi)者集群,所以客戶端是億級(jí)用戶時(shí),他們都是并行的。從而大大提升響應(yīng)速度。

3,削峰

因?yàn)镸Q能存儲(chǔ)的消息量很大,所以他可以把大量的消息請(qǐng)求先存下了,然后再并發(fā)的方式慢慢處理。

如果采用RPC通信,每一次請(qǐng)求用調(diào)用RPC接口,當(dāng)請(qǐng)求量巨大的時(shí)候,因?yàn)镽PC的請(qǐng)求是很耗資源的,所以巨大的請(qǐng)求一定會(huì)壓垮服務(wù)器。

削峰的目的是用戶體驗(yàn)變好,并且使整個(gè)系統(tǒng)穩(wěn)定。能承受大量請(qǐng)求消息。

二、現(xiàn)在市面上有什么MQ,

重點(diǎn)介紹RocketMQ

現(xiàn)在市面上的MQ有很多,主要有RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ,Kafka等等,這些都是開(kāi)源的MQ產(chǎn)品。以前很多人推薦使用RabbitMQ,他也是非常好用的MQ產(chǎn)品,這里不做過(guò)多的介紹。Kafka也是高吞吐量的老大,我們這里也不介紹。

我們重點(diǎn)介紹一下RocketMQ,RocketMQ是阿里巴巴在2012年開(kāi)源的分布式消息中間件,目前已經(jīng)捐贈(zèng)給Apache軟件基金會(huì),并于并于2017年9月25日成為 Apache 的頂級(jí)項(xiàng)目。

作為經(jīng)歷過(guò)多次阿里巴巴雙十一這種“超級(jí)工程”的洗禮并有穩(wěn)定出色表現(xiàn)的國(guó)產(chǎn)中間件,以其高性能、低延時(shí)和高可靠等特性近年來(lái)已經(jīng)也被越來(lái)越多的國(guó)內(nèi)企業(yè)使用。

功能概覽圖

可以看見(jiàn)RocketMQ支持定時(shí)和延時(shí)消息,這是RabbitMQ所沒(méi)有的能力。

RocketMQ的物理結(jié)構(gòu)

從這里可以看出,RocketMQ涉及到四大集群,producer,Name Server,Consumer,Broker。

Producer集群:

是生產(chǎn)者集群,負(fù)責(zé)產(chǎn)生消息,向消費(fèi)者發(fā)送由業(yè)務(wù)應(yīng)用程序系統(tǒng)生成的消息,RocketMQ提供三種方式發(fā)送消息:同步,異步,單向。

一,普通消息

1,同步原理圖

同步消息關(guān)鍵代碼

try {        SendResult sendResult = producer.send(msg);        // 同步發(fā)送消息,只要不拋異常就是成功        if (sendResult != null) {        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());    }    catch (Exception e) {        System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());        e.printStackTrace();    }}

2,異步原理圖

異步消息關(guān)鍵代碼

producer.sendAsync(msg, new SendCallback() {@Overridepublic void onSuccess(final SendResult sendResult) {       // 消費(fèi)發(fā)送成功      System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); }@Overridepublic void onException(OnExceptionContext context) {    System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());}});

3,單向(Oneway)發(fā)送原理圖

單向只發(fā)送,不等待返回,所以速度最快,一般在微秒級(jí),但可能丟失

單向(Oneway)發(fā)送消息關(guān)鍵代碼

producer.sendOneway(msg);

三種發(fā)送消息具體代碼請(qǐng)參考文檔:https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.6.566.7e49793fuueSlB[1]

二,定時(shí)消息和延時(shí)消息

發(fā)送定時(shí)消息關(guān)鍵代碼

try {     // 定時(shí)消息,單位毫秒(ms),在指定時(shí)間戳(當(dāng)前時(shí)間之后)進(jìn)行投遞,例如 2016-03-07 16:21:00 投遞。如果被設(shè)置成當(dāng)前時(shí)間戳之前的某個(gè)時(shí)刻,消息將立刻投遞給消費(fèi)者。    long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();    msg.setStartDeliverTime(timeStamp);    // 發(fā)送消息,只要不拋異常就是成功    SendResult sendResult = producer.send(msg);    System.out.println("MessageId:"+sendResult.getMessageId());}catch (Exception e) {    // 消息發(fā)送失敗,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());    e.printStackTrace(); }

發(fā)送延時(shí)消息關(guān)鍵代碼

try {    // 延時(shí)消息,單位毫秒(ms),在指定延遲時(shí)間(當(dāng)前時(shí)間之后)進(jìn)行投遞,例如消息在 3 秒后投遞    long delayTime = System.currentTimeMillis() + 3000;    // 設(shè)置消息需要被投遞的時(shí)間 msg.setStartDeliverTime(delayTime);     SendResult sendResult = producer.send(msg);     // 同步發(fā)送消息,只要不拋異常就是成功     if (sendResult != null) {        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());      }} catch (Exception e) {   // 消息發(fā)送失敗,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());    e.printStackTrace(); }

注意事項(xiàng)

1,定時(shí)和延時(shí)消息的 msg.setStartDeliverTime 參數(shù)需要設(shè)置成當(dāng)前時(shí)間戳之后的某個(gè)時(shí)刻(單位毫秒)。如果被設(shè)置成當(dāng)前時(shí)間戳之前的某個(gè)時(shí)刻,消息將立刻投遞給消費(fèi)者。

2,定時(shí)和延時(shí)消息的 msg.setStartDeliverTime 參數(shù)可設(shè)置40天內(nèi)的任何時(shí)刻(單位毫秒),超過(guò)40天消息發(fā)送將失敗。

3,StartDeliverTime 是服務(wù)端開(kāi)始向消費(fèi)端投遞的時(shí)間。 如果消費(fèi)者當(dāng)前有消息堆積,那么定時(shí)和延時(shí)消息會(huì)排在堆積消息后面,將不能嚴(yán)格按照配置的時(shí)間進(jìn)行投遞。

4,由于客戶端和服務(wù)端可能存在時(shí)間差,消息的實(shí)際投遞時(shí)間與客戶端設(shè)置的投遞時(shí)間之間可能存在偏差。

5,設(shè)置定時(shí)和延時(shí)消息的投遞時(shí)間后,依然受 3 天的消息保存時(shí)長(zhǎng)限制。例如,設(shè)置定時(shí)消息 5 天后才能被消費(fèi),如果第 5 天后一直沒(méi)被消費(fèi),那么這條消息將在第8天被刪除。

6,除 Java 語(yǔ)言支持延時(shí)消息外,其他語(yǔ)言都不支持延時(shí)消息。

發(fā)布消息原理圖

三,事務(wù)消息

RocketMQ提供類似X/Open XA的分布式事務(wù)功能來(lái)確保業(yè)務(wù)發(fā)送方和MQ消息的最終一致性,其本質(zhì)是通過(guò)半消息的方式把分布式事務(wù)放在MQ端來(lái)處理。

原理圖

其中:

1,發(fā)送方向消息隊(duì)列 RocketMQ 服務(wù)端發(fā)送消息。

2,服務(wù)端將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。

3,發(fā)送方開(kāi)始執(zhí)行本地事務(wù)邏輯。

4,發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)(Commit 或是 Rollback),服務(wù)端收到 Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;服務(wù)端收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會(huì)接受該消息。

5,在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟 4 提交的二次確認(rèn)最終未到達(dá)服務(wù)端,經(jīng)過(guò)固定時(shí)間后服務(wù)端將對(duì)該消息發(fā)起消息回查。

6,發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。

7,發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟 4 對(duì)半消息進(jìn)行操作。

RocketMQ的半消息機(jī)制的注意事項(xiàng)是

1,根據(jù)第六步可以看出他要求發(fā)送方提供業(yè)務(wù)回查接口。

2,不能保證發(fā)送方的消息冪等,在ack沒(méi)有返回的情況下,可能存在重復(fù)消息

3,消費(fèi)方要做冪等處理。

核心代碼

final BusinessService businessService = new BusinessService(); // 本地業(yè)務(wù)

TransactionProducer producer = ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl());producer.start();Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());try {    SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {    @Override    public TransactionStatus execute(Message msg, Object arg) {        // 消息 ID(有可能消息體一樣,但消息 ID 不一樣,當(dāng)前消息 ID 在控制臺(tái)無(wú)法查詢)        String msgId = msg.getMsgID();        // 消息體內(nèi)容進(jìn)行 crc32,也可以使用其它的如 MD5        long crc32Id = HashUtil.crc32Code(msg.getBody());        // 消息 ID 和 crc32id 主要是用來(lái)防止消息重復(fù)        // 如果業(yè)務(wù)本身是冪等的,可以忽略,否則需要利用 msgId 或 crc32Id 來(lái)做冪等        // 如果要求消息絕對(duì)不重復(fù),推薦做法是對(duì)消息體 body 使用 crc32 或 MD5 來(lái)防止重復(fù)消息        Object businessServiceArgs = new Object();        TransactionStatus transactionStatus =TransactionStatus.Unknow;        try {        boolean isCommit = businessService.execbusinessService(businessServiceArgs);        if (isCommit) {        // 本地事務(wù)成功則提交消息 transactionStatus = TransactionStatus.CommitTransaction;        } else {        // 本地事務(wù)失敗則回滾消息 transactionStatus = TransactionStatus.RollbackTransaction;        }        } catch (Exception e) {log.error("Message Id:{}", msgId, e);        }        System.out.println(msg.getMsgID());log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());         return transactionStatus;    }    }, null);    }catch (Exception e) {  // 消息發(fā)送失敗,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理   System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());   e.printStackTrace();}

具體代碼參考文檔:https://help.aliyun.com/document_detail/29548.html?spm=a2c4g.11186623.6.570.5d5738a49FJl1t[2]

所有消息發(fā)布原理圖

producer完全無(wú)狀態(tài),可以集群部署。

Name Server集群:

NameServer是一個(gè)幾乎無(wú)狀態(tài)的節(jié)點(diǎn),可集群部署,節(jié)點(diǎn)之間無(wú)任何信息同步,NameServer很像注冊(cè)中心的功能。

聽(tīng)說(shuō)阿里之前的NameServer 是用ZooKeeper做的,可能因?yàn)閆ookeeper不能滿足大規(guī)模并發(fā)的要求,所以之后NameServer 是阿里自研的。

NameServer其實(shí)就是一個(gè)路由表,他管理Producer和Comsumer之間的發(fā)現(xiàn)和注冊(cè)。

Broker集群:

Broker部署相對(duì)復(fù)雜,Broker分為Master與Slave,一個(gè)Master可以對(duì)應(yīng)多個(gè)Slaver,但是一個(gè)Slaver只能對(duì)應(yīng)一個(gè)Master,Master與Slaver的對(duì)應(yīng)關(guān)系通過(guò)指定相同的BrokerName。

不同的BrokerId來(lái)定義,BrokerId為0表示Master,非0表示Slaver。Master可以部署多個(gè)。每個(gè)Broker與NameServer集群中的所有節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)注冊(cè)Topic信息到所有的NameServer。

Consumer集群:

訂閱方式

消息隊(duì)列 RocketMQ 支持以下兩種訂閱方式:

集群訂閱:同一個(gè) Group ID 所標(biāo)識(shí)的所有 Consumer 平均分?jǐn)傁M(fèi)消息。 例如某個(gè) Topic 有 9 條消息,一個(gè) Group ID 有 3 個(gè) Consumer 實(shí)例,那么在集群消費(fèi)模式下每個(gè)實(shí)例平均分?jǐn)?,只消費(fèi)其中的 3 條消息。

// 集群訂閱方式設(shè)置(不設(shè)置的情況下,默認(rèn)為集群訂閱方式)properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

廣播訂閱:同一個(gè) Group ID 所標(biāo)識(shí)的所有 Consumer 都會(huì)各自消費(fèi)某條消息一次。 例如某個(gè) Topic 有 9 條消息,一個(gè) Group ID 有 3 個(gè) Consumer 實(shí)例,那么在廣播消費(fèi)模式下每個(gè)實(shí)例都會(huì)各自消費(fèi) 9 條消息。

// 廣播訂閱方式設(shè)置properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BR

OADCASTING);

訂閱消息關(guān)鍵代碼:

Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe("TopicTestMQ", "TagA||TagB", **new** MessageListener() { //訂閱多個(gè) Tagpublic Action consume(Message message, ConsumeContext context) {   System.out.println("Receive: " + message);   return Action.CommitMessage;}});//訂閱另外一個(gè) Topicconsumer.subscribe("TopicTestMQ-Other", "*", **new** MessageListener() { //訂閱全部 Tagpublic Action consume(Message message, ConsumeContext context) {    System.out.println("Receive: " + message);    return Action.CommitMessage;}});consumer.start();

專題首頁(yè)|財(cái)金網(wǎng)首頁(yè)

原創(chuàng)
新聞

精彩
互動(dòng)

獨(dú)家
觀察

京ICP備2021034106號(hào)-38   營(yíng)業(yè)執(zhí)照公示信息  財(cái)金網(wǎng)  版權(quán)所有  cfenews.com  投稿郵箱:362293157@qq.com  業(yè)務(wù)QQ:362293157立即發(fā)帖