国产精品免费嫩草研究院|无遮羞动漫在线观看AV|国产麻豆精品传媒AV国产在线|村在线观看|寂寞情人1正版|韩国床震韩国床震古|精品系列专区久久

.Net Core&RabbitMQ限制循環(huán)消費(fèi)

前言當(dāng)消費(fèi)者端接收消息處理業(yè)務(wù)時,如果出現(xiàn)異常或是拒收消息將消息又變更為等待投遞再次推送給消費(fèi)者,這樣一來,則形成循環(huán)的條件 。

.Net Core&RabbitMQ限制循環(huán)消費(fèi)

文章插圖
循環(huán)場景生產(chǎn)者發(fā)送100條消息到RabbitMQ中,消費(fèi)者設(shè)定讀取到第50條消息時,設(shè)置拒收,同時設(shè)定是否還留存在當(dāng)前隊(duì)列中(當(dāng)requeue為false時,設(shè)置了死信隊(duì)列則進(jìn)入死信隊(duì)列,否則移除消息) 。
consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine("拒收");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};當(dāng)?shù)?0條消息拒收,則仍在隊(duì)列中且處在隊(duì)列頭部,重新推送給消費(fèi)者,再次拒收,再次推送,反反復(fù)復(fù) 。
.Net Core&RabbitMQ限制循環(huán)消費(fèi)

文章插圖
最終其他消息全部消費(fèi)完畢,僅剩第50條消息往復(fù)間不斷消費(fèi),拒收,消費(fèi),這將可能導(dǎo)致RabbitMQ出現(xiàn)內(nèi)存泄漏問題 。
.Net Core&RabbitMQ限制循環(huán)消費(fèi)

文章插圖
解決方案RabbitMQ及AMQP協(xié)議本身沒有提供這類重試功能,但可以利用一些已有的功能來間接實(shí)現(xiàn)重試限定(以下只考慮基于手動確認(rèn)模式情況) 。此處只想到或是只查到了如下幾種方案解決消息循環(huán)消費(fèi)問題 。
  • 一次消費(fèi)
    • 無論成功與否,消費(fèi)者都對外返回ack,將拒收原因或是異常信息catch存入本地或是新隊(duì)列中另作重試 。
    • 消費(fèi)者拒絕消息或是出現(xiàn)異常,返回Nack或Reject,消息進(jìn)入死信隊(duì)列或丟棄(requeue設(shè)定為false) 。
  • 限定重試次數(shù)
    • 在消息的頭中添加重試次數(shù),并將消息重新發(fā)送出去,再每次重新消費(fèi)時從頭中判斷重試次數(shù),遞增或遞減該值,直到達(dá)到限制,requeue改為false,最終進(jìn)入死信隊(duì)列或丟棄 。
    • 可以在Redis、Memcache或其他存儲中存儲消息唯一鍵(例如Guid、雪花Id等,但必須在發(fā)布消息時手動設(shè)置它),甚至在mysql中連同重試次數(shù)一起存儲,然后在每次重新消費(fèi)時遞增/遞減該值,直到達(dá)到限制,requeue改為false,最終進(jìn)入死信隊(duì)列或丟棄 。
    • 隊(duì)列使用Quorum類型,限制投遞次數(shù),超過次數(shù)消息被刪除 。
  • 隊(duì)列消息過期
    • 設(shè)置過期時間,給隊(duì)列或是消息設(shè)置TTL,重試一定次數(shù)消息達(dá)到過期時間后進(jìn)入死信隊(duì)列或丟棄(requeue設(shè)定為true) 。
  • 也許還有更多好的方案...
一次消費(fèi)對外總是Ack消息到達(dá)了消費(fèi)端,可因某些原因消費(fèi)失敗了,對外可以發(fā)送Ack,而在內(nèi)部走額外的方式去執(zhí)行補(bǔ)償操作,比如將消息轉(zhuǎn)發(fā)到內(nèi)部的RabbitMQ或是其他處理方式,終歸是只消費(fèi)一次 。
var queueName = "alwaysack_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    try    {        var message = ea.Body;        Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))        {            throw new Exception("模擬異常");        }    }    catch (Exception ex)    {        Console.WriteLine(ex.Message);    }    finally    {        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);    }};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

經(jīng)驗(yàn)總結(jié)擴(kuò)展閱讀