RabbitMQ一个简单可靠的方案(.Net Core实现)

前言   最近需要使用到消息队列相关技术,于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题,归纳了一下,大概有以下几种:   1. 临时异常,如数据库网络闪断、http请求临时失效等;   2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行;   3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理;   4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等;   5. 非法异常,一些伪造、攻击类型的消息。   针对这些异常,我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。 方案   1. 消息均使用Exchange进行通讯,方式可以是direct或topic,不建议fanout。   2. 根据业务在Exchange下分配一个或多个Queue,同时设置一个审计线程(Audit)监听所有Queue,用于记录消息到MongoDB,同时又不阻塞正常业务处理。   3. 生产者(Publisher)在发布消息时,基于AMQP协议,生成消息标识MessageId和时间戳Timestamp,根据消息业务添加头信息Headers便于跟踪。      4. 消费者(Comsumer)消息处理失败时,则把消息发送到重试交换机(Retry Exchange),并设置过期(重试)时间及更新重试次数;如果超过重试次数则删除消息。   5. 重试交换机Exchange设置死信交换机(Dead Letter Exchange),消息过期后自动转发到业务交换机(Exchange)。   6. WebApi可以根据消息标识MessageId、时间戳Timestamp以及头信息Headers在MongoDB中对消息进行检索或重试。      注:选择MongoDB作为存储介质的主要原因是其对头信息(headers)的动态查询支持较好,同等的替代产品还可以是Elastic Search这些。 生产者(Publisher)   1. 设置断线自动恢复 复制代码   var factory = new ConnectionFactory   {   Uri = new Uri("amqp://guest:guest@192.168.132.137:5672"),   AutomaticRecoveryEnabled = true   }; 复制代码   2. 定义Exchange,模式为direct   channel.ExchangeDeclare("Exchange", "direct");   3. 根据业务定义QueueA和QueueB 复制代码   channel.QueueDeclare("QueueA", true, false, false);   channel.QueueBind("QueueA", "Exchange", "RouteA");   channel.QueueDeclare("QueueB", true, false, false);   channel.QueueBind("QueueB", "Exchange", "RouteB"); 复制代码   4. 启动消息发送确认机制,即需要收到RabbitMQ服务端的确认消息   channel.ConfirmSelect();   5. 设置消息持久化 复制代码   var properties = channel.CreateBasicProperties();   properties.Persistent = true; 复制代码   6. 生成消息标识MessageId、时间戳Timestamp以及头信息Headers 复制代码   properties.MessageId = Guid.NewGuid().ToString("N");   properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());   properties.Headers = new Dictionary   {   { "key", "value" + i}   }; 复制代码   7. 发送消息,偶数序列发送到QueueA(RouteA),奇数序列发送到QueueB(RouteB)   channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);   8. 确定收到RabbitMQ服务端的确认消息 复制代码   var isOk = channel.WaitForConfirms();   if (!isOk)   {   throw new Exception("The message is not reached to the server!");   } 复制代码   完整代码 View Code   效果:QueueA和QueueB各一条消息,QueueAudit两条消息     注:Exchange下必须先声明Queue才能接收到消息,上述代码并没有QueueAudit的声明;需要手动声明,或者先执行下面的消费者程序进行声明。 正常消费者(ComsumerA)   1. 设置预取消息,避免公平轮训问题,可以根据需要设置预取消息数,这里是1   _channel.BasicQos(0, 1, false);      2. 声明Exchange和Queue 复制代码   _channel.ExchangeDeclare("Exchange", "direct");   _channel.QueueDeclare("QueueA", true, false, false);   _channel.QueueBind("QueueA", "Exchange", "RouteA"); 复制代码   3. 编写回调函数 复制代码   var consumer = new EventingBasicConsumer(_channel);   consumer.Received += (model, ea) =>   {   //The QueueA is always successful.   try   {   _channel.BasicAck(ea.DeliveryTag, false);   }   catch (AlreadyClosedException ex)   {   _logger.LogCritical(ex, "RabbitMQ is closed!");   }   };   _channel.BasicConsume("QueueA", false, consumer); 复制代码   注:设置了RabbitMQ的断线恢复机制,当RabbitMQ连接不可用时,与MQ通讯的操作会抛出AlreadyClosedException的异常,导致主线程退出,哪怕连接恢复了,程序也无法恢复,因此,需要捕获处理该异常。 异常消费者(ComsumerB)   1. 设置预取消息   _channel.BasicQos(0, 1, false);   2. 声明Exchange和Queue 复制代码   _channel.ExchangeDeclare("Exchange", "direct");   _channel.QueueDeclare("QueueB", true, false, false);   _channel.QueueBind("QueueB", "Exchange", "RouteB"); 复制代码   3. 设置死信交换机(Dead Letter Exchange) 复制代码   var retryDic = new Dictionary   {   {"x-dead-letter-exchange", "Exchange"},   {"x-dead-letter-routing-key", "RouteB"}   };   _channel.ExchangeDeclare("Exchange_Retry", "direct");   _channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);   _channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry"); 复制代码   4. 重试设置,3次重试;第一次1秒,第二次10秒,第三次30秒 复制代码   _retryTime = new List   {   1 * 1000,   10 * 1000,   30 * 1000   }; 复制代码   5. 获取当前重试次数 复制代码   var retryCount = 0;   if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))   {   retryCount = (int)ea.BasicProperties.Headers["retryCount"];   _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...");   } 复制代码   6. 发生异常,判断是否可以重试 复制代码   private bool CanRetry(int retryCount)   {   return retryCount <= _retryTime.Count - 1;   } 复制代码   7. 可以重试,则启动重试机制 复制代码   private void SetupRetry(int retryCount, string retryExchange, string retryRoute, BasicDeliverEventArgs ea)   {   var body = ea.Body;   var properties = ea.BasicProperties;   properties.Headers = properties.Headers ?? new Dictionary();   properties.Headers["retryCount"] = retryCount;   properties.Expiration = _retryTime[retryCount].ToString();   try   {   _channel.BasicPublish(retryExchange, retryRoute, properties, body);   }   catch (AlreadyClosedException ex)   {   _logger.LogCritical(ex, "RabbitMQ is closed!");   }   } 复制代码   完整代码 View Code 审计消费者(Audit Comsumer)   1. 声明Exchange和Queue 复制代码   _channel.ExchangeDeclare("Exchange", "direct");   _channel.QueueDeclare("QueueAudit", true, false, false);   _channel.QueueBind("QueueAudit", "Exchange", "RouteA");   _channel.QueueBind("QueueAudit", "Exchange", "RouteB"); 复制代码   2. 排除死信Exchange转发过来的重复消息 复制代码   if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death"))   {   ...   } 复制代码   3. 生成消息实体 复制代码   var message = new Message   {   MessageId = ea.BasicProperties.MessageId,   Body = ea.Body,   Exchange = ea.Exchange,   Route = ea.RoutingKey   }; 复制代码   4. RabbitMQ会用bytes来存储字符串,因此,要把头中bytes转回字符串 复制代码   if (ea.BasicProperties.Headers != null)   {   var headers = new Dictionary();   foreach (var header in ea.BasicProperties.Headers)   {   if (header.Value is byte[] bytes)   {   headers[header.Key] = Encoding.UTF8.GetString(bytes);   }   else   {   headers[header.Key] = header.Value;   }   }   message.Headers = headers;   } 复制代码   5. 把Unix格式的Timestamp转成UTC时间 复制代码   if (ea.BasicProperties.Timestamp.UnixTime > 0)   {   message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;   var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);   message.Timestamp = offset.UtcDateTime;   } 复制代码   6. 消息存入MongoDB   _mongoDbContext.Collection().InsertOne(message, cancellationToken: cancellationToken);   MongoDB记录:      重试记录:    消息检索及重发(WebApi)   1. 通过消息Id检索消息      2. 通过头消息检索消息         3. 消息重发,会重新生成MessageId       Ack,Nack,Reject的关系   1. 消息处理成功,执行Ack,RabbitMQ会把消息从队列中删除。   2. 消息处理失败,执行Nack或者Reject:   a) 当requeue=true时,消息会重新回到队列,然后当前消费者会马上再取回这条消息;   b) 当requeue=false时,如果Exchange有设置Dead Letter Exchange,则消息会去到Dead Letter Exchange;   c) 当requeue=false时,如果Exchange没设置Dead Letter Exchange,则消息从队列中删除,效果与Ack相同。   3. Nack与Reject的区别在于:Nack可以批量操作,Reject只能单条操作。    RabbitMQ自动恢复 连接(Connection)恢复   1. 重连(Reconnect)   2. 恢复连接监听(Listeners)   3. 重新打开通道(Channels)   4. 恢复通道监听(Listeners)   5. 恢复basic.qos,publisher confirms以及transaction设置    拓扑(Topology)恢复   1. 重新声明交换机(Exchanges)   2. 重新声明队列(Queues)   3. 恢复所有绑定(Bindings)   4. 恢复所有消费者(Consumers) 异常处理机制   1. 临时异常,如数据库网络闪断、http请求临时失效等   通过短时间重试(如1秒后)的方式处理,也可以考虑Nack/Reject来实现重试(时效性更高)。   2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行   通过长时间重试(如1分钟、30分钟、1小时、1天等),等待B任务先执行完的方式处理。      3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理   等系统修正后,通过消息重发的方式处理。   4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等   等系统恢复后,通过消息重发的方式处理。   5. 非法异常,一些伪造、攻击类型的消息   多次重试失败后,消息从队列中被删除,也可以针对此业务做进一步处理。 源码地址 https://github.com/ErikXu/RabbitMesage 分类: .Net Core,ASP.NET Web API,RabbitMQ,架构 标签: rabbit, rabbitmq, 队列, 消息队列, .Net Core 好文要顶 关注我 收藏该文 编程玩家 关注 - 7 粉丝 - 139 +加关注 17 0 « 上一篇:Helm - Kubernetes服务编排的利器 posted @ 2018-08-27 02:16 编程玩家 阅读(1181) 评论(15) 编辑 收藏 评论列表 #1楼 2018-08-27 09:00 大漠孤阳 mark 支持(0)反对(0) #2楼 2018-08-27 09:14 Faror mark 支持(0)反对(0) #3楼 2018-08-27 09:23 找不到一个满意的昵称 mark 支持(0)反对(0) #4楼 2018-08-27 09:39 Alin- mark 支持(0)反对(0) #5楼 2018-08-27 09:48 自由的鱼 mark 支持(0)反对(0) #6楼 2018-08-27 09:58 孤独影 支持mark 支持(0)反对(0) #7楼 2018-08-27 09:59 Chaunce 好东西 支持(0)反对(0) #8楼 2018-08-27 09:59 yi.shion mark 支持(0)反对(0) #9楼 2018-08-27 10:11 龙之魂 mark 支持(0)反对(0) #10楼 2018-08-27 10:25 酷呆呆 学习学习 支持(0)反对(0) #11楼 2018-08-27 10:43 禅道 mmm 支持(0)反对(0) #12楼 2018-08-27 10:55 高海东 看来细节还是不少的 支持(0)反对(0) #13楼 2018-08-27 11:25 幻影gool 博主,你们rabbitmq connection是用单例吗,我看官方驱动不支持异步操作 支持(0)反对(0) #14楼[楼主] 2018-08-27 12:26 编程玩家 @ 幻影gool 是单例,什么场景的异步操作? 支持(0)反对(0) #15楼 2018-08-27 13:16 Maxima-Go 很厉害,很优秀。 支持(0)反对(0) 刷新评论刷新页面返回顶部 注册用户登录后才能发表评论,请 登录 或 注册,访问网站首页。 【推荐】超50万VC++源码: 大型组态工控、电力仿真CAD与GIS源码库! 【推荐】企业SaaS应用开发实战,快速构建企业运营/运维系统 【推荐】ActiveReports 报表控件,全面满足 .NET开发需求 qcloud0814 最新IT新闻: · 要推拍照旗舰了?诺基亚手机新收获:PureView商标回归 · QQ宠物下月将关停 打败腾讯QQ的从来不是微信 · 宜家线上快闪店小程序今日开售,微信助力打造完整品牌服务体验 · WPS Office套件上线Win10应用商店:免费+内购 · 惠普坐稳PC全球第一后寻找新增长点:混合现实 » 更多新闻... 华为CH0822 最新知识库文章: · 如何招到一个靠谱的程序员 · 一个故事看懂“区块链” · 被踢出去的用户 · 成为一个有目标的学习者 · 历史转折中的“杭派工程师” » 更多知识库文章... 公告 https://www.cnblogs.com/Erik_Xu/p/9515208.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信