博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ(4)--工作队列(work queues)
阅读量:4087 次
发布时间:2019-05-25

本文共 4536 字,大约阅读时间需要 15 分钟。

工作队列模式,如下图:

同simple queues相比,多了一个或一些消费者,即多个消费者共同消费同一个队列中的消息

应用场景:对于任务过重的情况下,使用工作队列可以提高任务处理的速度。

第1步、创建生产者生产消息

package com.wzy.product;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作队列:生产者 * */public class Work_Producer {    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接        Connection connection= ConnectionUtil.getConn();        //2.创建频道        Channel channel=connection.createChannel();        /**         * 3.创建队列         * 参数1:队列名称;参数2:是否为持久化队列         * 参数3:是否独占本次连接;参数4:是在在不使用的时候自动删除队列         * 参数5:队列其它参数         * */        channel.queueDeclare(Constant.WORK_QUEUE_NAME,true,false,false,null);        //4.循环发送消息        for(int i=1;i<=30;i++){            //消息            String message="hello,rabbitMQ! work模式:"+i;            channel.basicPublish("",Constant.WORK_QUEUE_NAME,null,message.getBytes());            System.out.println("已发送消息:"+message);        }        //5、关闭资源        channel.close();        connection.close();    }}

第2步、创建消费者1

package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作队列:消费者1 * */public class Work_Consumer1 {    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接        Connection connection= ConnectionUtil.getConn();        //2.创建频道        Channel channel=connection.createChannel();        //3.创建队列        channel.queueDeclare(Constant.WORK_QUEUE_NAME,true,false,false,null);        //4.一次只能接收并处理一个消息        channel.basicQos(1);        //5.创建消费者,并设置消息处理        DefaultConsumer consumer=new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                try{                    //路由key                    System.out.println("路由key为:"+envelope.getRoutingKey());                    //交换机                    System.out.println("交换机为:"+envelope.getExchange());                    //消息id                    System.out.println("消息id为:"+envelope.getDeliveryTag());                    //收到的消息                    System.out.println("消费者1接收到的消息为:"+new String(body,"utf-8"));                    Thread.sleep(5000);                    //确认消息                    channel.basicAck(envelope.getDeliveryTag(),false);                }catch(Exception e){                    e.printStackTrace();                }            }        };        //监听消息        channel.basicConsume(Constant.WORK_QUEUE_NAME,false,consumer);    }}

第3步、创建消费者2

package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.com.wzy.utils.ConnectionUtil;import com.wzy.com.wzy.utils.Constant;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 工作队列:消费者1 * */public class Work_Consumer2 {    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接        Connection connection= ConnectionUtil.getConn();        //2.创建频道        Channel channel=connection.createChannel();        //3.创建队列        channel.queueDeclare(Constant.WORK_QUEUE_NAME,true,false,false,null);        //4.一次只能接收并处理一个消息        channel.basicQos(1);        //5.创建消费者,并设置消息处理        DefaultConsumer consumer=new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                try{                    //路由key                    System.out.println("路由key为:"+envelope.getRoutingKey());                    //交换机                    System.out.println("交换机为:"+envelope.getExchange());                    //消息id                    System.out.println("消息id为:"+envelope.getDeliveryTag());                    //收到的消息                    System.out.println("消费者2接收到的消息为:"+new String(body,"utf-8"));                    Thread.sleep(5000);                    //确认消息                    channel.basicAck(envelope.getDeliveryTag(),false);                }catch(Exception e){                    e.printStackTrace();                }            }        };        //监听消息        channel.basicConsume(Constant.WORK_QUEUE_NAME,false,consumer);    }}

执行结果:

总结:消费者1和消费者2对同一个消息的关系是竞争的关系。都在相互竞争争夺消息.....

转载地址:http://imuii.baihongyu.com/

你可能感兴趣的文章
hbase(5)---API示例
查看>>
SSM-CRUD(1)---环境搭建
查看>>
SSM-CRUD(2)---查询
查看>>
SSM-CRUD (3)---查询功能改造
查看>>
Nginx(2)---安装与启动
查看>>
springBoot(5)---整合servlet、Filter、Listener
查看>>
C++ 模板类型参数
查看>>
C++ 非类型模版参数
查看>>
设计模式 依赖倒转原则 & 里氏代换原则
查看>>
DirectX11 光照
查看>>
图形学 图形渲染管线
查看>>
DirectX11 计时和动画
查看>>
DirectX11 光照与材质的相互作用
查看>>
DirectX11 法线向量
查看>>
DirectX11 兰伯特余弦定理(Lambert)
查看>>
DirectX11 漫反射光
查看>>
DirectX11 环境光
查看>>
DirectX11 镜面光
查看>>
DirectX11 三种光照组成对比
查看>>
DirectX11 指定材质
查看>>