QQ讨论群:953553560 目录 1.新建.netcore console项目,并引入RabbitMQ.Client的Nuget包 2.创建Exchange 3.创建Queue 4.发送消息 5.消费消息 6.手动释放消息 7.让失败的消息回到队列中 8.监听消息 正文 Producer:消息的生产者,也就是创建消息的对象 Exchange:消息的接受者,也就是用来接收消息的对象,Exchange接收到消息后将消息按照规则发送到与他绑定的Queue中。下面我们来定义一个Producer与Exchange。 回到顶部 1.新建.netcore console项目,并引入RabbitMQ.Client的Nuget包 回到顶部 2.创建Exchange 复制代码 using RabbitMQ.Client; namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "root"; var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false); //创建Exchange } } } } } 复制代码 可以看到Echange的参数有: type:可选项为,fanout,direct,topic,headers。区别如下:     fanout:发送到所有与当前Exchange绑定的Queue中     direct:发送到与消息的routeKey相同的Rueue中     topic:fanout的模糊版本     headers:发送到与消息的header属性相同的Queue中 durable:持久化 autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。 运行程序,可以在可视化界面看到change2 接下来我们可以创建与change2绑定的queue 回到顶部 3.创建Queue 复制代码 using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); #创建queue2 channel.QueueBind(queue, exchange, route);  #将queue2绑定到exchange2 } 复制代码 可以看到Echange的参数有: durable:持久化 exclusive:如果为true,则queue只在channel存在时存在,channel关闭则queue消失 autoDelete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。 去可视化界面看Queue 回到顶部 4.发送消息 复制代码 using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue, exchange, route); var props = channel.CreateBasicProperties(); props.Persistent = true; #持久化 channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit")); } 复制代码 回到顶部 5.消费消息 复制代码 using RabbitMQ.Client; using System; using System.Text; namespace RabbitMQClient { class Program { private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = "39.**.**.**", Port = 5672, UserName = "root", Password = "root", VirtualHost = "/" }; static void Main(string[] args) { var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (IConnection conn = rabbitMqFactory.CreateConnection()) using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue, exchange, route); while (true) { var message = channel.BasicGet(queue, true); #第二个参数说明自动释放消息,如为false需手动释放消息 if(message!=null) { var msgBody = Encoding.UTF8.GetString(message.Body); Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); } System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); } } } } } 复制代码 运行查看结果 查看可视化界面 回到顶部 6.手动释放消息 复制代码 while (true) { var message = channel.BasicGet(queue, false);#设置为手动释放 if(message!=null) { var msgBody = Encoding.UTF8.GetString(message.Body); Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); } channel.BasicAck(message.DeliveryTag, false); #手动释放 System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); } 复制代码 我们再发一条消息,然后开始消费,加个断点调试一下 查看一下Queue中消息状态 然后直接取消调试,不让程序走到释放的那一步,再查看一下消息状态 这么说来只要不走到 channel.BasicAck(message.DeliveryTag, false);这一行,消息就不会被释放掉,我们让程序直接走到这一行代码,查看一下消息的状态 如图已经被释放了 回到顶部 7.让失败的消息回到队列中 复制代码 while (true) { var message = channel.BasicGet(queue, false); if(message!=null) { var msgBody = Encoding.UTF8.GetString(message.Body); Console.WriteLine(string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); Console.WriteLine(message.DeliveryTag);    #当前消息被处理的次序数 if (1==1) channel.BasicReject(message.DeliveryTag, true); } System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); } 复制代码 重新发送4条消息 开始消费 我们可以看到消息一直没有没消费,因为消息被处理之后又放到了队尾 回到顶部 8.监听消息 复制代码 using (IConnection conn = rabbitMqFactory.CreateConnection()) using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false); channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue, exchange, route); channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false); #一次接受10条消息,否则rabbit会把所有的消息一次性推到client,会增大client的负荷 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Byte[] body = ea.Body; String message = Encoding.UTF8.GetString(body); Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer); Console.ReadLine(); } https://www.cnblogs.com/chenyishi/p/10233629.html