spring boot 使用rabbitMQ
需要导入相关的相关依赖如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
AMQP默认实现为rabbitMQ。
为了定制化的配置放弃了boot的自动配置,So,以下为RabbitMQ的配置文件:
package com.fengbaogu.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitmq 的配置类 * @author LiaoLe */ @Configuration public class RabbitMQConfig { /** 消息交换机的名字*/ public static final String EXCHANGE = "delayed_exchange"; /** 队列key1*/ public static final String ROUTING = "notify"; /** 队列名*/ public static final String QUEUE = "delayed_queue"; /** rabbit MQ 账号*/ public static final String USERNAME = "liaoke"; /**rabbit MQ 密码*/ public static final String PASSWORD = "198461lk"; /**rabbit MQ 地址*/ public static final String RABBITMQ_ADDRESS="localhost"; /**rabbit MQ 端口*/ public static final Integer RABBITMQ_PORT=5672; /** * 配置链接 * @return */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(RABBITMQ_ADDRESS,RABBITMQ_PORT); connectionFactory.setUsername(USERNAME); connectionFactory.setPassword(PASSWORD); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); /**需要设置 接受回调信息*/ return connectionFactory; } /** * 交换机设置 * 开启了延时消息 * @return */ @Bean public DirectExchange defaultExchange() { DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false); directExchange.setDelayed(true);/**只需简单一步开启延时消息,md找了好久,原来这么简单*/ return directExchange; } /** * 队列设置 * @return */ @Bean public Queue notifyQueue() { return new Queue(QUEUE,true);/**消息持久化*/ } /** * 绑定队列到交换机 * @return */ @Bean public Binding bindingNotify() { return BindingBuilder.bind(notifyQueue()).to(defaultExchange()).with(ROUTING); } }
rabbitMQ默认是不开启delayed功能的哟,所以同学们还需要到官网或者githup上面下载插件:
命令行: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#安装插件 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
#插件是有版本要求的哦
下面贴发送的代码,随便用了个http入口:
package com.fengbaogu.web; import com.fengbaogu.config.RabbitMQConfig; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.text.SimpleDateFormat; import java.util.Date; import java.util.UUID; /** * Created by LiaoKe on 2017/5/2. */ @RestController public class TestController implements RabbitTemplate.ConfirmCallback{ /** * 使用RabbitTemplate发送消息 */ private RabbitTemplate rabbitTemplate; /** * 构造方法 * @param rabbitTemplate */ public TestController(RabbitTemplate rabbitTemplate){ this.rabbitTemplate = rabbitTemplate; //设置消费回调 this.rabbitTemplate.setConfirmCallback(this); } private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 向延迟队列中发送消息 * @param msg * @return */ @RequestMapping("send") public String send3(String msg){ String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,RabbitMQConfig.ROUTING , msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(20000);/**延时20秒发送*/ message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);/**消息持久化*/ System.out.println(sdf.format(new Date()) + " Delay sent."); return message; } }, correlationId); return null; } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回调id:" + correlationData); if (ack) { System.out.println("消息成功消费"); } else { System.out.println("消息消费失败:" + cause+"\n重新发送"); } } }
启动后可以在RabbitMQ管理界面上有:
这样的交换机。
Queue同理也可以在管理界面上看到,此处不贴图了。
现在来看 Consumer 端,代码异常简洁:
package com.fengbaogu.listener; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; /** * Created by LiaoKe on 2017/5/3. */ @Component @RabbitListener(queues = {"delayed_queue"}) public class Receiver { @RabbitHandler public void process(String msg) { System.out.println("start one work and now data:"+new Date().toString()); System.out.println("This msg is:"+msg); } }
开启消费端,访问send入口:
服务端打印:
2017-05-03 20:24:14 Delay sent.
回调id:CorrelationData [id=277ce017-af15-4571-b556-762fec62720d]
消息成功消费
消费端打印:
start one work and now data:Wed May 03 20:24:34 CST 2017
This msg is:我是消息2
So,相差20秒。延时队列Over。
相关推荐
该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...
Spring Boot 已经对RabbitMQ Client API进行了包装,使用起来简洁很多,下面详细介绍一下如何利用rabbitmq_delayed_message_exchange 插件和Spring Boot来实现延迟消息。
rabbitmq延时队列和四种交换机模式下队列的简单实现,需要自己配置一下属性文件。
Linux 安装 RabbitMQ 应用 / RabbitMQ 延时队列
RabbitMQ+Erlang+RabbitMq延时队列插件
springboot+rabbitmq实现延时队列,包括消息发送和消费确认,消费者端使用策略模式处理业务
spring boot rabbitmq框架组合练习学习做的一个完整的精简的小demo项目源码
1. 什么是延时队列? 2. 如何实现一个高效的延时队列? 3. DelayQueue的实现原理 ...4. RabbitMQ实现延时队列的基本原理 5. Redis实现延时队列的基本原理 6. 时间轮(Time Wheel) 7. 几种方案的对比
使用RabbitMQ+延迟队列实现分布式事务的最终一致性方案,demo以典型的订单+库存系统为例
springcloud bus rabbitmq 分布式队列 http://knight-black-bob.iteye.com/blog/2356839
本篇文章主要介绍了Spring Boot RabbitMQ 延迟消息实现完整版示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
本篇文章主要介绍了Spring Boot与RabbitMQ结合实现延迟队列的示例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
Spring boot整合消息队列RabbitMQ
支付状态一致性-RabbitMQ死信队列
spring集成rabbitmq 通俗易懂的demo,保证可以使用,具体的配置,可以参考我的文章
采用python编写的批量删除rabbitmq的队列或交换机。 1.修改rabbitmq_delete.py中rabbitmq的配置; 2.执行以下命令: 删除队列: python3 rabbitmq_delete.py -k ‘udata.climb’ -d 1 删除交换机: python3 rabbitmq_...
本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持...
NULL 博文链接:https://dwj147258.iteye.com/blog/2430847
主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下