RabbitMQ topic mode
Published: 2019-06-10


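The previous two chapters covered RabbitMQ's direct and fanout modes; this chapter introduces the topic mode. If you already understand how direct mode matches messages by routing key, topic mode is easy to follow. Put simply, direct matches the routing key exactly, while topic matches it against a pattern.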

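Topic mode supports two special characters in a binding pattern:

  • * (asterisk) matches exactly one word.

  • # (hash) matches zero or more words.

Note: these wildcards match whole words, not characters. Words are the parts of the routing key separated by ".".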
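To make the word-based matching concrete, here is a minimal sketch of how such a pattern could be evaluated. It is an illustration only, not the broker's actual implementation; the names TopicMatcher and Matches are hypothetical, and the example keys are chosen for illustration.

```csharp
using System;

// Hypothetical helper: a simplified model of topic-pattern matching.
static class TopicMatcher
{
    // Returns true when routingKey matches the binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        string[] p = pattern.Split('.');
        string[] k = routingKey.Split('.');
        return Match(p, 0, k, 0);
    }

    private static bool Match(string[] p, int i, string[] k, int j)
    {
        // Pattern exhausted: succeed only if the key is exhausted too.
        if (i == p.Length) return j == k.Length;

        if (p[i] == "#")
        {
            // '#' absorbs zero or more words: try every possible length.
            for (int next = j; next <= k.Length; next++)
            {
                if (Match(p, i + 1, k, next)) return true;
            }
            return false;
        }

        // '*' and literal words each consume exactly one word.
        if (j == k.Length) return false;
        if (p[i] == "*" || p[i] == k[j]) return Match(p, i + 1, k, j + 1);
        return false;
    }
}

class Demo
{
    static void Main()
    {
        // '*' stands for exactly one word.
        Console.WriteLine(TopicMatcher.Matches("*.*.high", "black.critical.high")); // True
        Console.WriteLine(TopicMatcher.Matches("*.high", "black.critical.high"));   // False
        // '#' stands for zero or more words.
        Console.WriteLine(TopicMatcher.Matches("#.high", "black.critical.high"));   // True
        Console.WriteLine(TopicMatcher.Matches("black.#", "black"));                // True
    }
}
```

Note that "*.high" does not match a three-word key, because each * consumes exactly one word, while "#.high" does, because # can stand for any number of words.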

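As the figure shows, RabbitMQ's direct mode matches on the routing key exactly: messages with the routing key red are delivered to Queue1, and messages with the routing key black or white are delivered to Queue2.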

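Now consider a scenario: we are building a logging module that collects and processes different kinds of logs. Each log is classified along three dimensions: module, urgency, and importance. The modules are red, black, and white; urgency is critical or normal; importance is high, medium, or low. In the routing key, the three dimensions are joined by two "." characters, for example black.critical.high.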

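We want logs from the black module with urgency critical and importance high to go to queue 1 and be printed to the screen. Logs of importance high from all modules, and logs of urgency critical from the white module, go to queue 2 and are persisted to disk. For example:

  • A log with the routing key "black.critical.high" is delivered to both queue1 and queue2.

  • A log with the routing key "red.critical.high" is delivered only to queue2.

  • A log with the routing key "white.critical.high" is delivered to queue2. Although it matches both of queue2's binding rules, queue2 receives only one copy.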
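The bindings implied by this scenario could be declared roughly as follows. This is a sketch under several assumptions: the queue names queue1 and queue2, the patterns "*.*.high" and "white.critical.*" (inferred from the description, not stated in the original), a broker on localhost with default credentials, and the synchronous IModel API of RabbitMQ.Client 5.x/6.x. It needs a running broker to execute.

```csharp
using RabbitMQ.Client;

class ScenarioBindings
{
    static void Main()
    {
        // Assumption: a local broker with default credentials and vhost.
        var factory = new ConnectionFactory { HostName = "localhost" };

        using (IConnection connection = factory.CreateConnection())
        using (IModel channel = connection.CreateModel())
        {
            const string exchange = "wytExchangeTopic";
            channel.ExchangeDeclare(exchange: exchange, type: "topic",
                durable: true, autoDelete: false, arguments: null);

            // Hypothetical named queues standing in for queue 1 and queue 2.
            channel.QueueDeclare(queue: "queue1", durable: true,
                exclusive: false, autoDelete: false, arguments: null);
            channel.QueueDeclare(queue: "queue2", durable: true,
                exclusive: false, autoDelete: false, arguments: null);

            // queue1: only black / critical / high.
            channel.QueueBind(queue: "queue1", exchange: exchange,
                routingKey: "black.critical.high", arguments: null);

            // queue2: importance high from any module and urgency ...
            channel.QueueBind(queue: "queue2", exchange: exchange,
                routingKey: "*.*.high", arguments: null);
            // ... plus anything critical from the white module.
            channel.QueueBind(queue: "queue2", exchange: exchange,
                routingKey: "white.critical.*", arguments: null);
        }
    }
}
```

A queue may carry several bindings to the same exchange. A message that satisfies more than one of them is still enqueued once, which is the behavior described for "white.critical.high".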

Create TopicProduct to publish messages with three different routing keys.

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicProduct
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchangeTopic";
            String routeKeyName1 = "black.critical.high";
            String routeKeyName2 = "red.critical.high";
            String routeKeyName3 = "white.critical.high";

            String message1 = "black-critical-high!";
            String message2 = "red-critical-high!";
            String message3 = "white-critical-high!";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // Declare a durable topic exchange.
                    channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);

                    // Mark the messages as persistent.
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    Byte[] body1 = Encoding.UTF8.GetBytes(message1);
                    Byte[] body2 = Encoding.UTF8.GetBytes(message2);
                    Byte[] body3 = Encoding.UTF8.GetBytes(message3);

                    // Publish the messages, one per routing key.
                    channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName1, basicProperties: properties, body: body1);
                    channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName2, basicProperties: properties, body: body2);
                    channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName3, basicProperties: properties, body: body3);

                    Console.WriteLine(" [x] Sent {0}", message1);
                    Console.WriteLine(" [x] Sent {0}", message2);
                    Console.WriteLine(" [x] Sent {0}", message3);
                }
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

  


Create TopicCustomerA, which receives one kind of message.

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicCustomerA
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchangeTopic";
            String routeKeyName1 = "black.critical.high";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);

                    // Server-named, exclusive queue that lives only while this consumer is connected.
                    String queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        // ea.Body is a byte[] in RabbitMQ.Client 5.x; on 6.x use ea.Body.ToArray().
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);

                        // Acknowledge manually, because autoAck is false below.
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
}

  


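Create TopicCustomerB, which binds its queue with two patterns, red.critical.* and white.critical.*, and so receives two kinds of messages.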

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicCustomerB
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchangeTopic";
            String routeKeyName1 = "red.critical.*";
            String routeKeyName2 = "white.critical.*";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);

                    // One queue, two bindings: a message matching either pattern is delivered here.
                    String queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null);
                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName2, arguments: null);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        // ea.Body is a byte[] in RabbitMQ.Client 5.x; on 6.x use ea.Body.ToArray().
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);

                        // Acknowledge manually, because autoAck is false below.
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
}

  


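Run TopicCustomerA and TopicCustomerB first and keep them subscribed, then run TopicProduct to publish the messages. The order matters because each consumer declares a temporary queue that exists only while it is connected, so messages published before the queues are bound are not delivered to them. With the bindings in the code above, TopicCustomerA receives the black.critical.high message, and TopicCustomerB receives the red.critical.high and white.critical.high messages.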

Reposted from: https://www.cnblogs.com/wlzhang/p/10921369.html
