<del id="d4fwx"><form id="d4fwx"></form></del>
      <del id="d4fwx"><form id="d4fwx"></form></del><del id="d4fwx"><form id="d4fwx"></form></del>

            <code id="d4fwx"><abbr id="d4fwx"></abbr></code>
          • RabbitMQ如何保證隊列里的消息99.99%被消費?-創(chuàng)新互聯(lián)

            1. 本篇概要

            其實,還有1種場景需要考慮:當消費者接收到消息后,還沒處理完業(yè)務邏輯,消費者掛掉了,那消息也算丟失了?,比如用戶下單,訂單中心發(fā)送了1個消息到RabbitMQ里的隊列,積分中心收到這個消息,準備給這個下單的用戶增加20積分,但積分還沒增加成功呢,積分中心自己掛掉了,導致數(shù)據(jù)出現(xiàn)問題。

            建華網(wǎng)站建設公司創(chuàng)新互聯(lián)公司,建華網(wǎng)站設計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為建華成百上千家提供企業(yè)網(wǎng)站建設服務。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站建設公司要多少錢,請找那個售后服務好的建華做網(wǎng)站的公司定做!

            那么如何解決這種問題呢?

            為了保證消息被消費者成功的消費,RabbitMQ提供了消息確認機制(message acknowledgement),本文主要講解RabbitMQ中,如何使用消息確認機制來保證消息被消費者成功的消費,避免因為消費者突然宕機而引起的消息丟失。

            2. 開啟顯式Ack模式

            我們開啟一個消費者的代碼是這樣的:

            // 創(chuàng)建隊列消費者
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received Message '" + message + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);

            這里的重點是channel.basicConsume(QUEUE_NAME, true, consumer);方法的第2個參數(shù),讓我們先看下basicConsume()的源碼:

            public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
                return this.basicConsume(queue, autoAck, "", callback);
            }

            這里的autoAck參數(shù)指的是是否自動確認,如果設置為ture,RabbitMQ會自動把發(fā)送出去的消息置為確認,然后從內存(或者磁盤)中刪除,而不管消費者接收到消息是否處理成功;如果設置為false,RabbitMQ會等待消費者顯式的回復確認信號后才會從內存(或者磁盤)中刪除。

            建議將autoAck設置為false,這樣消費者就有足夠的時間處理消息,不用擔心處理消息過程中消費者宕機造成消息丟失。

            此時,隊列里的消息就分成了2個部分:

            1. 等待投遞給消費者的消息(下圖中的Ready部分)
            2. 已經(jīng)投遞給消費者,但是還沒有收到消費者確認信號的消息(下圖中的Unacked部分)

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            如果RabbitMQ一直沒有收到消費者的確認信號,并且消費此消息的消費者已經(jīng)斷開連接,則RabbitMQ會安排該消息重新進入隊列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。

            RabbitMQ不會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的唯一依據(jù)是消費該消息的消費者連接是否已經(jīng)斷開,這么設計的原因是RabbitMQ允許消費者消費一條消息的時間可以很久很久。

            為了便于理解,我們舉個具體的例子,生產(chǎn)者的話的我們延用上文中的DurableProducer:

            package com.zwwhnly.springbootaction.rabbitmq.durable;
            
            import com.rabbitmq.client.AMQP;
            import com.rabbitmq.client.Channel;
            import com.rabbitmq.client.Connection;
            import com.rabbitmq.client.ConnectionFactory;
            
            import java.io.IOException;
            import java.util.concurrent.TimeoutException;
            
            public class DurableProducer {
                private final static String EXCHANGE_NAME = "durable-exchange";
                private final static String QUEUE_NAME = "durable-queue";
            
                public static void main(String[] args) throws IOException, TimeoutException {
                    // 創(chuàng)建連接
                    ConnectionFactory factory = new ConnectionFactory();
                    // 設置 RabbitMQ 的主機名
                    factory.setHost("localhost");
                    // 創(chuàng)建一個連接
                    Connection connection = factory.newConnection();
                    // 創(chuàng)建一個通道
                    Channel channel = connection.createChannel();
                    // 創(chuàng)建一個Exchange
                    channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
                    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            
                    // 發(fā)送消息
                    String message = "durable exchange test";
                    AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
                    channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
            
                    // 關閉頻道和連接
                    channel.close();
                    connection.close();
                }
            }

            然后新建一個消費者AckConsumer類:

            package com.zwwhnly.springbootaction.rabbitmq.ack;
            
            import com.rabbitmq.client.*;
            
            import java.io.IOException;
            import java.util.concurrent.TimeoutException;
            
            public class AckConsumer {
                private final static String QUEUE_NAME = "durable-queue";
            
                public static void main(String[] args) throws IOException, TimeoutException {
                    // 創(chuàng)建連接
                    ConnectionFactory factory = new ConnectionFactory();
                    // 設置 RabbitMQ 的主機名
                    factory.setHost("localhost");
                    // 創(chuàng)建一個連接
                    Connection connection = factory.newConnection();
                    // 創(chuàng)建一個通道
                    Channel channel = connection.createChannel();
                    // 創(chuàng)建隊列消費者
                    com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope,
                                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
                            String message = new String(body, "UTF-8");
                            int result = 1 / 0;
                            System.out.println("Received Message '" + message + "'");
                        }
                    };
                    channel.basicConsume(QUEUE_NAME, true, consumer);
                }
            }

            我們先將autoAck參數(shù)設置為ture,即自動確認,并在消費消息時故意寫個異常,然后先運行生產(chǎn)者客戶端將消息寫入隊列中,然后運行消費者客戶端,發(fā)現(xiàn)消息未消費成功但是卻消失了:

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            然后我們將autoAck設置為false:

            channel.basicConsume(QUEUE_NAME, false, consumer);

            再次運行生產(chǎn)者客戶端將消息寫入隊列中,然后運行消費者客戶端,此時雖然消費者客戶端仍然代碼異常,但是消息仍然在隊列中:

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            然后我們刪除掉消費者客戶端中的異常代碼,重新啟動消費者客戶端,發(fā)現(xiàn)消息消費成功了,但是消息一直未Ack:

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            手動停掉消費者客戶端,發(fā)現(xiàn)消息又到了Ready狀態(tài),準備重新投遞:

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            之所以消費掉消息,卻一直還是Unacked狀態(tài),是因為我們沒在代碼中添加顯式的Ack代碼:

            String message = new String(body, "UTF-8");
            //int result = 1 / 0;
            System.out.println("Received Message '" + message + "'");
            
            long deliveryTag = envelope.getDeliveryTag();
            channel.basicAck(deliveryTag, false);

            deliveryTag可以看做消息的編號,它是一個64位的長×××值。

            此時運行消費者客戶端,發(fā)現(xiàn)消息消費成功,并且在隊列中被移除:

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            文末彩蛋

            Java學習、面試;文檔、視頻資源免費獲取

            RabbitMQ如何保證隊列里的消息99.99%被消費?

            創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國云服務器,動態(tài)BGP最優(yōu)骨干路由自動選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡助力業(yè)務部署。公司持有工信部辦法的idc、isp許可證, 機房獨有T級流量清洗系統(tǒng)配攻擊溯源,準確進行流量調度,確保服務器高可用性。佳節(jié)活動現(xiàn)已開啟,新人活動云服務器買多久送多久。

            本文標題:RabbitMQ如何保證隊列里的消息99.99%被消費?-創(chuàng)新互聯(lián)
            標題路徑:http://www.jbt999.com/article44/dseiee.html

            成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、標簽優(yōu)化、服務器托管、網(wǎng)站維護、網(wǎng)站策劃、微信小程序

            廣告

            聲明:本網(wǎng)站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:[email protected]。內容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

            微信小程序開發(fā)

              <del id="d4fwx"><form id="d4fwx"></form></del>
              <del id="d4fwx"><form id="d4fwx"></form></del><del id="d4fwx"><form id="d4fwx"></form></del>

                    <code id="d4fwx"><abbr id="d4fwx"></abbr></code>
                  • 欧美特级AAAAAA | 欧美亚洲日韩性爱 | 免费久久女人-级毛片视频 | 白峰美羽无码在线观看 | 91精品国产综合久久蜜芽解析速度 |