面试必问:RabbitMQ 有哪几种消息模式?-全球热点评
原文:juejin cn post 6998363970037874724 **前言**Rabbitmq是使用Erlang语言开发的开源消息队列系统,基
原文:juejin.cn/post/6998363970037874724
Rabbitmq 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 实现,是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。消息传递指的是应用程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此通信,直接调用通常是指远程过程调用的技术。
【资料图】
Simple 模式是最简单的一个模式,由一个生产者,一个队列,一个消费者组成,生产者将消息通过交换机(此时,图中并没有交换机的概念,如不定义交换机,会使用默认的交换机)把消息存储到队列,消费者从队列中取出消息进行处理。
用 Java demo 实现此模式,推荐一个开源免费的 Spring Boot 最全教程:
https://github.com/javastacks/spring-boot-best-practice
Productor
public class Send { private final static String QUEUE_NAME = "queue1"; public static void main(String[] args) { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、创建连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String message = "Hello world"; // 4、发送消息到指定队列 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent "" + message + """); } catch (TimeoutException | IOException e) { e.printStackTrace(); } finally { // 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } // 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } }}
Customer
public class Recv { private final static String QUEUE_NAME = "queue1"; public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setVirtualHost("/"); // 2、获取 Connection和 Channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }}
观察可视化界面,会看到消息先会被写入到队列中,随后又被消费者消费了。
Fanout——发布订阅模式,是一种广播机制。
此模式包括:一个生产者、一个交换机 (exchange)、多个队列、多个消费者。生产者将消息发送到交换机,交换机不存储消息,将消息存储到队列,消费者从队列中取消息。如果生产者将消息发送到没有绑定队列的交换机上,消息将丢失。
用 Java demo 实现此模式
Productor
public class Productor { private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 消息内容 String message = "hello fanout mode"; // 指定路由key String routeKey = ""; String type = "fanout"; // 3、声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, type); // 4、声明队列 channel.queueDeclare("queue1", true, false, false, null); channel.queueDeclare("queue2", true, false, false, null); channel.queueDeclare("queue3", true, false, false, null); channel.queueDeclare("queue4", true, false, false, null); // 5、绑定 channel 与 queue channel.queueBind("queue1", EXCHANGE_NAME, routeKey); channel.queueBind("queue2", EXCHANGE_NAME, routeKey); channel.queueBind("queue3", EXCHANGE_NAME, routeKey); channel.queueBind("queue4", EXCHANGE_NAME, routeKey); // 6、发布消息 channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8")); System.out.println("消息发送成功!"); } catch (IOException | TimeoutException e) { e.printStackTrace(); System.out.println("消息发送异常"); }finally { // 关闭通道和连接...... } }}
Customer
public class Customer { private static Runnable runnable = new Runnable() { @Override public void run() { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); Channel finalChannel = channel; finalChannel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println(delivery.getEnvelope().getDeliveryTag()); System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }); System.out.println(queueName + ":开始接收消息"); } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { // 关闭通道和连接...... } } }; public static void main(String[] args) throws IOException, TimeoutException { // 创建线程分别从四个队列中获取消息 new Thread(runnable, "queue1").start(); new Thread(runnable, "queue2").start(); new Thread(runnable, "queue3").start(); new Thread(runnable, "queue4").start(); }}
执行完 Productor 发现四个队列中分别增加了一条消息,而执行完 Customer 后四个队列中的消息都被消费者消费了。
Direct 模式是在 Fanout 模式基础上添加了 routing key,Fanout(发布/订阅)模式是交换机将消息存储到所有绑定的队列中,而 Direct 模式是在此基础上,添加了过滤条件,交换机只会将消息存储到满足 routing key 的队列中。
在上图中,我们可以看到交换机绑定了两个队列,其中队列 Q1绑定的 routing key 为 “orange” ,队列Q2绑定的routing key 为 “black” 和 “green”。在这样的设置中,发布 routing key 为 “orange” 的消息将被路由到 Q1,routing key 为 “black” 或 “green” 的消息将被路由到 Q2
在 rabbitmq 中给队列绑定 routing_key,routing_key 必须是单词列表
用 Java demo 实现此模式
Productor
public class Productor { private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 消息内容 String message = "hello direct mode"; // 指定路由key String routeKey = "email"; String type = "direct"; // 3、声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, type); // 4、声明队列 channel.queueDeclare("queue1", true, false, false, null); channel.queueDeclare("queue2", true, false, false, null); channel.queueDeclare("queue3", true, false, false, null); // 5、绑定 channel 与 queue channel.queueBind("queue1", EXCHANGE_NAME, "email"); channel.queueBind("queue2", EXCHANGE_NAME, "sms"); channel.queueBind("queue3", EXCHANGE_NAME, "vx"); // 6、发布消息 channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8")); System.out.println("消息发送成功!"); } catch (IOException | TimeoutException e) { e.printStackTrace(); System.out.println("消息发送异常"); } finally { // 关闭通道和连接...... } }}
可以通过可视化页面查看,各队列绑定的 routing_key
由于设置的 routing_key为 “email”,所以,应该只有 queue1 存储了一条消息。
Customer 与上述 fanout 示例一致。
Topic 模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的 routing key 的值进行通配符匹配,如果匹配通过,消息将被存储到该队列,如果 routing key 的值匹配到了多个队列,消息将会被发送到多个队列;如果一个队列也没匹配上,该消息将丢失。
routing_key 必须是单词列表,用点分隔,其中 * 和 # 的含义为:
用Java demo 实现此模式
Productor
public class Productor { private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 消息内容 String message = "hello topic mode"; // 指定路由key String routeKey = "com.order.test.xxx"; String type = "topic"; // 3、声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, type); // 4、声明队列 channel.queueDeclare("queue5",true,false,false,null); channel.queueDeclare("queue6",true,false,false,null); // 5、绑定 channel 与 queue channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#"); channel.queueBind("queue6", EXCHANGE_NAME, "#.test.*"); // 6、发布消息 channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8")); System.out.println("消息发送成功!"); } catch (IOException | TimeoutException e) { e.printStackTrace(); System.out.println("消息发送异常"); } finally { // 关闭通道和连接...... } }}
执行完 Productor 后,通过可视化页面查看到,queue 绑定的 routing_key
由于上述例子中,routing_key为:“com.order.test.xxx”,那么 queue5 和 queue6 都将接收到消息。
Customer 与上述实例一样,执行完 Customer 后,再次查看队列信息,queue5 和 queue6 的消息都被消费了。
当有多个消费者时,如何均衡消息者消费消息的多少,主要有两种模式:
在这种模式下,rabbitmq 采用轮询的方式将任务分配给多个消费者,但可能出现一种情况,当分配给某一个消费者的任务很复杂时,而有些消费者接收的任务较轻量,会出现有的消费者很忙,而有的消费者处于空闲的状态,而 rabbitmq 不会感知到这种情况的发生,rabbitmq 不考虑消费者未确认消息的数量,只是盲目的分配任务。
用 Java demo 实现此模式
Productor
public class Productor { public static void main(String[] args) { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 2、获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 3、向 Queue1 发布20个消息 for (int i = 0; i < 20; i++) { String msg = "feiyangyang: " + i; channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8)); } System.out.println("消息发送成功!"); } catch (IOException | TimeoutException e) { e.printStackTrace(); System.out.println("消息发送异常"); } finally { // 关闭通道和连接...... } }}
Worker1
public class Worker1 { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.96.109"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 获取连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); Channel finalChannel = channel; finalChannel.basicConsume("queue1", true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }); System.out.println("Worker1 开始接收消息"); System.in.read(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { // 关闭通道和连接...... } }}
Worker2 与 Worker1 相同
我们看下消息分发结果:
Worker1 开始接收消息Worker1:收到消息是:feiyangyang: 0Worker1:收到消息是:feiyangyang: 2Worker1:收到消息是:feiyangyang: 4Worker1:收到消息是:feiyangyang: 6Worker1:收到消息是:feiyangyang: 8Worker1:收到消息是:feiyangyang: 10Worker1:收到消息是:feiyangyang: 12Worker1:收到消息是:feiyangyang: 14Worker1:收到消息是:feiyangyang: 16Worker1:收到消息是:feiyangyang: 18Worker2 开始接收消息Worker2:收到消息是:feiyangyang: 1Worker2:收到消息是:feiyangyang: 3Worker2:收到消息是:feiyangyang: 5Worker2:收到消息是:feiyangyang: 7Worker2:收到消息是:feiyangyang: 9Worker2:收到消息是:feiyangyang: 11Worker2:收到消息是:feiyangyang: 13Worker2:收到消息是:feiyangyang: 15Worker2:收到消息是:feiyangyang: 17Worker2:收到消息是:feiyangyang: 19
可以看出,轮询分发模式就是将消息均衡的分配所有消费者。
为了解决 Work 轮询分发模式 这个问题,rabbitmq 使用带有 perfetchCount = 1 设置的 basicQos 方法。当消费者接受处理并确认前一条消息前,不向此消费者发送新消息,会分配给其他空闲的消费者。
Productor 代码与上述轮询模式相同,而 Customer 中稍作修改
Worker1
// Channel 使用 Qos 机制finalChannel.basicQos(1);finalChannel.basicConsume("queue1", false, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8")); try { Thread.sleep(1000); // 改成手动应答 finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (InterruptedException e) { e.printStackTrace(); } }}, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { }});
上述实例相较于轮询分发模式,添加了 Qos 机制,设置值为1,代表消费者每次从队列中获取几条消息,将 Worker1 的 sleep 时间设置为 1s,将 Worker2 的 sleep 时间设置为 2s,查看消息分发结果
Worker1 开始接收消息Worker1:收到消息是:feiyangyang: 0Worker1:收到消息是:feiyangyang: 2Worker1:收到消息是:feiyangyang: 4Worker1:收到消息是:feiyangyang: 5Worker1:收到消息是:feiyangyang: 7Worker1:收到消息是:feiyangyang: 8Worker1:收到消息是:feiyangyang: 10Worker1:收到消息是:feiyangyang: 11Worker1:收到消息是:feiyangyang: 13Worker1:收到消息是:feiyangyang: 14Worker1:收到消息是:feiyangyang: 16Worker1:收到消息是:feiyangyang: 17Worker1:收到消息是:feiyangyang: 19Worker2 开始接收消息Worker2:收到消息是:feiyangyang: 1Worker2:收到消息是:feiyangyang: 3Worker2:收到消息是:feiyangyang: 6Worker2:收到消息是:feiyangyang: 9Worker2:收到消息是:feiyangyang: 12Worker2:收到消息是:feiyangyang: 15Worker2:收到消息是:feiyangyang: 18
当使用 Work 公平分发模式时,要设置消费者为手动应答,并且开启 Qos 机制。
消费者完成一项任务可能需要几秒钟,如果其中一个消费者开始了一项长期任务并且只完成了部分任务而死亡,如果将 autoAck 设置为 true ,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除,在这种情况下,我们将丢失所有已分派给该特定消费者但尚未处理的消息。
如果其中一个消费者宕了,rabbitmq 可以将其消息分配给其他消费者。为了确保消息不会丢失,rabbitmq 采用消息确认,消费者发回确认消息,告诉 rabbitmq 消息已经被接收并处理,此时,rabbitmq 可以放心的删除这条消息。
如果消费者在没有发送 ack 的情况下宕了,rabbitmq 将理解为该条消息未被消费者处理完,如果有其他消费者在线,将迅速重新交付给其他消费者,这样就可以确保不会丢失消息了。
默认情况下rabbitmq 会启用手动消息确认,也就是 autoAck 默认为 false,一旦我们完成了一项任务,需要手动的进行消息确认,所以 autoAck 需要保持为默认值 false,并使用如下方法进行手动应答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
rabbitmq 的消息确认机制可以保证消息不会丢失,但是如果 rabbitmq 服务器停止,我们的任务仍然会丢失。
当 rabbitmq 退出或崩溃时,如果不进行持久化,队列和消息都会消失。需要做两件事来确保消息不会丢失,将队列和消息都标记为持久的。
boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
将消息标记为持久性并不能完全保证消息不会丢失,当 rabbitmq 接收到消息并且还没保存时,仍然有很短的时间窗口会使消息丢失,如果需要更强的保证,可以使用发布者确认机制。
解耦、削峰、异步
解耦
在微服务架构体系中,微服务A需要与微服务B进行通信,传统的做法是A调用B的接口。但这样做如果系统B无法访问或连接超时,系统A需要等待,直到系统B做出响应,并且A与B存在严重的耦合现象。如果引入消息队列进行系统AB的通信,流程是这样的:
系统A将消息放到队列中,就不用关心系统B是否可以获取等其他事情了,实现了两个系统间的解耦。
使用场景:
削峰
系统A每秒请求100个,系统可以稳定运行,但如果在秒杀活动中,每秒并发达到1w个,但系统最大处理能力只能每秒处理 1000 个,所以,在秒杀活动中,系统服务器会出现宕机的现象。如果引入 MQ ,可以解决这个问题。每秒 1w个请求会导致系统崩溃,那我们让用户发送的请求都存储到队列中,由于系统最大处理能力是每秒1000个请求,让系统A每秒只从队列中拉取1000个请求,保证系统能稳定运行,在秒杀期间,请求大量进入到队列,积压到MQ中,而系统每秒只从队列中取1000个请求处理。这种短暂的高峰期积压是没问题的,因为高峰期一旦过去,每秒请求数迅速递减,而系统每秒还是从队列中取1000个请求进行处理,系统会快速将积压的消息消费掉。
使用场景:
异步
用户注册,需要发送注册邮件和注册短信,传统的做法有两种:串行、并行。
使用场景:
近期热文推荐:
1.1,000+ 道 Java面试题及答案整理(2022最新版)
2.劲爆!Java 协程要来了。。。
3.Spring Boot 2.x 教程,太全了!
4.别再写满屏的爆爆爆炸类了,试试装饰器模式,这才是优雅的方式!!
5.《Java开发手册(嵩山版)》最新发布,速速下载!
觉得不错,别忘了随手点赞+转发哦!
关键词:
[ 相关文章 ]
原文:juejin cn post 6998363970037874724 **前言**Rabbitmq是使用Erlang语言开发的开源消息队列系统,基
5月24日江西地区氢氟酸市场行情暂稳,目前无水氢氟酸含税价格主流报10000-10500元 吨,场内货源供应正常,
同花顺数据中心显示,天能重工5月23日获融资买入685 28万元,占当日买入金额的13 75%,当前融资余额1 07亿
2021 年,北京单向街公益基金会和甲骨文工作室联合发起了“第一届雅努斯翻译资助计划”,雅努斯计划将资助
在逾期后如何给银行协商停息挂账?首先持卡人可以通过拨打发卡发卡银行的客服热线申请办理停息挂账;也可以带
中科曙光:公司不存在断货的相关情况
1、《民法案例分析教程(第四版)》是2018年01月中国人民大学出版社出版的图书。2、作者是杨立新。文章到此
1、协警不需要政审。2、协警必须在在编民警的带领下开展各项工作,一般属于合同工,不需要政审。3、在涉及
5月23日,东方红战略精选混合A最新单位净值为1 3043元,累计净值为1 3543元,较前一交易日下跌0 21%。历史
1、水果的话多吃一点猕猴桃牛油果苹果之类的。2、如果可以选择。3、样式多样一点。本文到此分享完毕,希望
Wind统计显示,截至5月22日,共有1291只个股获陆股通增仓。其中,持股量环比增幅在100%以上的共有18只,环比增
1、用塑料管专用连接件和塑料管专用熔接机。2、把管件和管头用熔接机加热后插到一起就欧啦。本文就为大家分
1、面部拨筋的好处:做面部拔筋可以疏通经络、畅通气血、加速新陈代谢、深层排除面部毒素、美白淡斑、提升
1、1 城隍庙—普宁八百余乡共同拥有2 南岩古寺—世界最大的白玉卧佛3 德安里—中国古村落。2、国内罕见府第
鞭牛士5月23日消息,金山云发布公告称,一季度总收入达18 644亿元,同比下滑14 2%;净亏损为6 088亿元,上
活动安排本次大赛采取团体竞赛的方式,设社会专业组和院校学生组两个组别。每队4名选手(按团队岗位分工,设
跑酷比赛怎么下载?想要比别人更加抢先抢快的玩到这款游戏,那么你获取游戏开测消息是关键,能够获取到第一
2023年05月23日关于贵州省发布新型储能项目管理暂行办法(征求意见稿)的最新消息:5月23日,贵州省能源局
近日,广陵区商务局公布一组数据:1-4月,广陵限上零售业销售额总体规模为66 9亿元,同比增长16 9%,较去年
1、现在河流源头确定的方法还未得到统一。2、现在认定嘉陵江源头为陕西省宝鸡市凤县代王山。本文就为大家分
[ 相关新闻 ]
Copyright 2015-2022 华东医院网 版权所有 备案号:京ICP备2022016840号-41 联系邮箱:2 913 236 @qq.com