700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > java实现rabbitmq简单队列模型 生产者 消费者 消息队列

java实现rabbitmq简单队列模型 生产者 消费者 消息队列

时间:2023-06-05 01:18:56

相关推荐

java实现rabbitmq简单队列模型 生产者  消费者  消息队列

生产者向队列发送消息,随机消费者从队列中接收消息

创建用户和虚拟主机

通过rabbitmq提供的用户管理界面可以很轻松的创建用户和虚拟主机,并且需要将用户绑定到对应的虚拟主机。自带有guest用户和/虚拟主机,也可以直接用这两个既有信息。我们创建了名为wuwl的用户和/vh的虚拟主机,注意虚拟主机需要以/开头。

导入依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency>

创建连接,这里把ConnectionUtils 视为一个工具类,生产者消费者都需要连接对象

public class ConnectionUtils {public static Connection getConnection() throws IOException, TimeoutException {// 创建连接mq的连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();// 设置连接rabbitmq主机connectionFactory.setHost("192.168.20.128");// 设置端口号connectionFactory.setPort(5672);// 设置虚拟主机connectionFactory.setVirtualHost("/vh");// 设置用户名和密码connectionFactory.setUsername("wuwl");connectionFactory.setPassword("123456");// 获取连接对象return connectionFactory.newConnection();}}

创建生产者

public class Provider {public void send() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();/** 通道绑定对应的消息队列* 参数一:队列名称,如果队列不存在,自动创建队列* 参数二:定义队列是否持久化* 参数三:是否独占队列* 参数四:是否在消费完成后自动删除队列*/channel.queueDeclare("wuwl",true,false,false,null);/*** 发布消息* 参数一: 交换机名称* 参数二:队列名称* 参数三:传递消息额外设置* 参数四:消息的具体内容*/channel.basicPublish("","wuwl",null,"hello rabbitmq".getBytes());}finally {if (channel !=null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}}}public static void main(String[] args) throws IOException, TimeoutException {Provider provider = new Provider();provider.send();}}

创建消费者,消费者的通道和连接对象不关闭则一直处于监听状态,只有生产者发送新的消息至消息队列,消费者会立即消费掉

public class Consumer {public void consume() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();/** 通道绑定对应的消息队列* 参数一:队列名称,如果队列不存在,自动创建队列* 参数二:定义队列是否持久化* 参数三:是否独占队列* 参数四:是否在消费完成后自动删除队列*/channel.queueDeclare("wuwl", true, false, false, null);/*** 消费* 参数一: 交换机名称* 参数二:队列名称* 参数三:传递消息额外设置* 参数四:消息的具体内容*/channel.basicConsume("wuwl",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));}});} finally {// if (channel != null && channel.isOpen()) {//channel.close();// }// if (connection != null && connection.isOpen()) {//connection.close();// }}}public static void main(String[] args) throws IOException, TimeoutException {Consumer consumer = new Consumer();consumer.consume();}}

测试

单独运行生产者,我们可以在管理界面看到我们定义的queue,以及未被消费的效益一条

启动消费者后,消息被消费,持续监听中

此时队列中待消费的消息数量为0

再次通过生产者发送一条消息至消息队列,消费者直接消费,队列中待消费消息数量为0

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。