本文共 4536 字,大约阅读时间需要 15 分钟。
工作队列模式,如下图:
同simple queues相比,多了一个或一些消费者,即多个消费者共同消费同一个队列中的消息
应用场景:对于任务过重的情况下,使用工作队列可以提高任务处理的速度。
第1步、创建生产者生产消息
package com.wzy.product;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作队列:生产者 * */public class Work_Producer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接 Connection connection= ConnectionUtil.getConn(); //2.创建频道 Channel channel=connection.createChannel(); /** * 3.创建队列 * 参数1:队列名称;参数2:是否为持久化队列 * 参数3:是否独占本次连接;参数4:是在在不使用的时候自动删除队列 * 参数5:队列其它参数 * */ channel.queueDeclare(Constant.WORK_QUEUE_NAME,true,false,false,null); //4.循环发送消息 for(int i=1;i<=30;i++){ //消息 String message="hello,rabbitMQ! work模式:"+i; channel.basicPublish("",Constant.WORK_QUEUE_NAME,null,message.getBytes()); System.out.println("已发送消息:"+message); } //5、关闭资源 channel.close(); connection.close(); }}
第2步、创建消费者1
package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作队列:消费者1 * */public class Work_Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接 Connection connection= ConnectionUtil.getConn(); //2.创建频道 Channel channel=connection.createChannel(); //3.创建队列 channel.queueDeclare(Constant.WORK_QUEUE_NAME,true,false,false,null); //4.一次只能接收并处理一个消息 channel.basicQos(1); //5.创建消费者,并设置消息处理 DefaultConsumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try{ //路由key System.out.println("路由key为:"+envelope.getRoutingKey()); //交换机 System.out.println("交换机为:"+envelope.getExchange()); //消息id System.out.println("消息id为:"+envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者1接收到的消息为:"+new String(body,"utf-8")); Thread.sleep(5000); //确认消息 channel.basicAck(envelope.getDeliveryTag(),false); }catch(Exception e){ e.printStackTrace(); } } }; //监听消息 channel.basicConsume(Constant.WORK_QUEUE_NAME,false,consumer); }}
第3步、创建消费者2
package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作队列:消费者1 * */public class Work_Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接 Connection connection= ConnectionUtil.getConn(); //2.创建频道 Channel channel=connection.createChannel(); //3.创建队列 channel.queueDeclare(Constant.WORK_QUEUE_NAME,true,false,false,null); //4.一次只能接收并处理一个消息 channel.basicQos(1); //5.创建消费者,并设置消息处理 DefaultConsumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try{ //路由key System.out.println("路由key为:"+envelope.getRoutingKey()); //交换机 System.out.println("交换机为:"+envelope.getExchange()); //消息id System.out.println("消息id为:"+envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者2接收到的消息为:"+new String(body,"utf-8")); Thread.sleep(5000); //确认消息 channel.basicAck(envelope.getDeliveryTag(),false); }catch(Exception e){ e.printStackTrace(); } } }; //监听消息 channel.basicConsume(Constant.WORK_QUEUE_NAME,false,consumer); }}
执行结果:
总结:消费者1和消费者2对同一个消息的关系是竞争的关系。都在相互竞争争夺消息.....
转载地址:http://imuii.baihongyu.com/