微信网站公司,it外包公司可以进吗,长春广告公司网站建设,工业网站模板文章目录1. 事务机制2. Confirm模式2.1 生产者2.1.1 普通Confirm模式2.1.2 批量Confirm模式2.1.3 异步Confirm模式2.2 消费者3. 其他消费者如何确保消息一定能够消费成功呢#xff1f;由于在前面工作队列模式里面我们了解了应答模式#xff0c;所以我们可以很自信的回答如上题…
文章目录1. 事务机制2. Confirm模式2.1 生产者2.1.1 普通Confirm模式2.1.2 批量Confirm模式2.1.3 异步Confirm模式2.2 消费者3. 其他消费者如何确保消息一定能够消费成功呢由于在前面工作队列模式里面我们了解了应答模式所以我们可以很自信的回答如上题目。
通过应答形式默认自动应答可以修改为手动应答来保证消息消费成功。
其实应答形式就是 RabbitMQ 消息确认机制的一种体现我们再来看看问题的产生背景: 生产者发送消息出去之后不知道到底有没有发送到 RabbitMQ 服务器 默认是不知道的。而且有的时候我们在发送消息之后后面的逻辑出问题了我们不想要发送之前的消息了需要撤回该怎么做。 两种解决方案:
AMQP 事务机制Confirm 模式
1. 事务机制
事务机制分为三部分开启事务提交事务事务回滚如下
txSelect 将当前 channel 通道设置为 transaction 模式开启事务txCommit 提交当前事务txRollback 事务回滚
我们通过一个例子模拟消息生产者发送消息过程发生异常进行事务回滚的过程。
public class Producer {/** 队列名称 */private static final String QUEUE_NAME test_queue;public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.发送消息 */try {/** 4.1 开启事务 */channel.txSelect();String msg 我是生产者生成的消息;System.out.println(生产者发送消息msg);channel.basicPublish(, QUEUE_NAME, null, msg.getBytes());/** 4.2 提交事务 - 模拟异常 */int i 1/0;channel.txCommit();}catch (Exception e){e.printStackTrace();System.out.println(发生异常我要进行事务回滚了);/** 4.3 事务回滚 */channel.txRollback();}finally {channel.close();newConnection.close();}}}打印结果 生产者发送消息我是生产者生成的消息 java.lang.ArithmeticException: / by zero at club.sscai.producer.Producer.main(Producer.java:37) 发生异常我要进行事务回滚了 2. Confirm模式
像上方这种采用 AMQP 事务机制来保证消息的准确到达在一定程度上是消耗了性能的所以我们再来看看 Confirm 模式。
Confirm 模式分为两块一是生产者的 Confirm 模式再就是消费者的 Confirm 模式。
2.1 生产者
通过生产者的确认模式我们是要保证消息准确达到客户端而与 AMQP 事务不同的是 Confirm 是针对一条消息的而事务是可以针对多条消息的。
Confirm 模式最大的好处在于它是异步的一旦发布一条消息生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息当消息最终得到确认之后生产者应用便可以通过回调方法来处理该确认消息。
Confirm 的三种实现方式
channel.waitForConfirms() 普通发送方确认模式channel.waitForConfirmsOrDie() 批量确认模式channel.addConfirmListener() 异步监听发送方确认模式
2.1.1 普通Confirm模式
public class Producer11 {/** 队列名称 */private static final String QUEUE_NAME test_queue;public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();/** 5.发送消息 */for (int i 0; i 5; i) {channel.basicPublish(, QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, ( Confirm模式 第 (i 1) 条消息).getBytes());try {if (channel.waitForConfirms()) {System.out.println(发送成功);}else{System.out.println(进行消息重发);}} catch (InterruptedException e) {e.printStackTrace();}}/** 5.关闭通道、连接 */channel.close();newConnection.close();}
}在推送消息之前channel.confirmSelect() 声明开启发送方确认模式再使用channel.waitForConfirms() 等待消息被服务器确认即可。 2.1.2 批量Confirm模式
public class Producer22 {/** 队列名称 */private static final String QUEUE_NAME test_queue;public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {/** 1.获取连接 */Connection newConnection MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();/** 5.发送消息 */for (int i 0; i 5; i) {channel.basicPublish(, QUEUE_NAME, null, ( Confirm模式 第 (i 1) 条消息).getBytes());}/** 6.直到所有信息都发布只要有一个未确认就会IOException */channel.waitForConfirmsOrDie();System.out.println(全部执行完成);/** 5.关闭通道、连接 */channel.close();newConnection.close();}
}channel.waitForConfirmsOrDie() 使用同步方式等所有的消息发送之后才会执行后面代码只要有一个消息未被确认就会抛出 IOException 异常。 2.1.3 异步Confirm模式
public class Producer33 {/** 队列名称 */private static final String QUEUE_NAME test_queue;public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();for (int i 0; i 10; i) {String message 我是生产者生成的消息 i;channel.basicPublish(, QUEUE_NAME, null, message.getBytes(UTF-8));}/** 5.发送消息 异步监听确认和未确认的消息 */channel.addConfirmListener(new ConfirmListener() {Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println(未确认消息标识 deliveryTag);}Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format(已确认消息标识%d多个消息%b, deliveryTag, multiple));}});/** 6.关闭通道、连接 *//** channel.close();*//** newConnection.close();*/}}异步模式的优点就是执行效率高不需要等待消息执行完只需要监听消息即可以上异步返回的信息如下 可以看出代码是异步执行的消息确认有可能是批量确认的是否批量确认在于返回的 multiple 的参数此参数为 bool 值如果 true 表示批量执行了 deliveryTag 这个值以前的所有消息如果为 false 的话表示单条确认。 维持异步调用要求我们不能断掉连接因此注释掉第6步。 2.2 消费者
为了保证消息从队列可靠地到达消费者RabbitMQ 提供消息确认机制(message acknowledgment)。消费者在声明队列时可以指定 noAck 参数当 noAckfalse 时 RabbitMQ 会等待消费者显式发回ack信号后才从内存(和磁盘如果是持久化消息的话)中移去消息。否则RabbitMQ 会在队列中消息被消费后立即删除它。
在消费者中 Confirm 模式又分为手动确认和自动确认。
关于两者的介绍
自动确认 在自动确认模式下消息在发送后立即被认为是发送成功。 这种模式可以提高吞吐量只要消费者能够跟上不过会降低投递和消费者处理的安全性。 这种模式通常被称为“发后即忘”。 与手动确认模式不同如果消费者的TCP连接或信道在成功投递之前关闭该消息则会丢失。
手动确认 使用自动确认模式时需要考虑的另一件事是消费者过载。 手动确认模式通常与有限的信道预取一起使用限制信道上未完成“进行中”传送的数量。 然而对于自动确认根据定义没有这样的限制。 因此消费者可能会被交付速度所压倒可能积压在内存中堆积如山或者被操作系统终止。 某些客户端库将应用TCP反压直到未处理的交付积压下降超过一定的限制时才停止从套接字读取。 因此只建议当消费者可以有效且稳定地处理投递时才使用自动投递方式。 综上尽量选择手动确认方式。 主要实现代码
// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);// 关闭自动确认
boolean autoAck false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);3. 其他
1、如果 RabbitMQ 服务器宕机了消息会丢失吗 不会丢失RabbitMQ 服务器支持消息持久化机制会把消息持久化到硬盘上。 2、如何确保消息正确地发送至RabbitMQ RabbitMQ 使用发送方确认模式确保消息正确地发送到 RabbitMQ。 发送方确认模式将信道设置成 confirm 模式发送方确认模式则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后或者消息被写入磁盘后可持久化的消息信道会发送一个确认给生产者包含消息唯一ID。如果RabbitMQ发生内部错误从而导致消息丢失会发送一条nacknot acknowledged未确认消息。 发送方确认模式是异步的生产者应用程序在等待确认的同时可以继续发送消息。当确认消息到达生产者应用程序生产者应用程序的回调方法就会被触发来处理确认消息。 我创建了一个java相关的公众号用来记录自己的学习之路感兴趣的小伙伴可以关注一下微信公众号哈niceyoo