本文共 6834 字,大约阅读时间需要 22 分钟。
前提:使用 simple 队列的时候
生产者发送消息
public class Send { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { // 消息内容 String message = "." + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 10); } channel.close(); connection.close(); }}
@SuppressWarnings("deprecation")public class Recv1 { private final static String QUEUE_NAME = "test_queue_wor1k"; public static void main(String[] args) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义一个消息的消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [1] Received '" + message + "'"); try { doWork(message); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); } } }; boolean autoAck = true; //消息的确认模式自动应答 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) throws InterruptedException { Thread.sleep(1000); } }
public class Recv2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] args) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义一个消息的消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [2] Received '" + message + "'"); try { Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); } } }; boolean autoAck = true; // channel.basicConsume(QUEUE_NAME, autoAck, consumer); }}
[1] Received '.0'[x] Done[1] Received '.2'[x] Done[1] Received '.4'[x] Done[1] Received '.6'……….
消费者 1 中将偶数部分处理掉了
[2] Received '.1'[x] Done[2] Received '.3'[x] Done[2] Received '.5'[x] Done…… .. . . .
消费者2中将基数部分处理掉了
我想要的是 1 处理的多,而 2 处理的少
测试结果:
结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个的分
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。
public class Send { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); // 创建一个频道 Channel channel = connection.createChannel(); // 指定一个队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); int prefetchCount = 1; //每个消费者发送确认信号之前,消息队列不发送下一个消息过来,一次只处理一个消息 //限制发给同一个消费者不得超过1条消息 channel.basicQos(prefetchCount); // 发送的消息 for (int i = 0; i < 50; i++) { String message = "." + i; // 往队列中发出一条消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 10); } // 关闭频道和连接 channel.close(); connection.close(); }}
public class Recv1 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] args) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1);//保证一次只分发一个 //定义一个消息的消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [1] Received '" + message + "'"); try { doWork(message); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; //手动确认消息 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) throws InterruptedException { Thread.sleep(1000); }}
@SuppressWarnings("deprecation")public class Recv2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] args) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1);//保证一次只分发一个 //定义一个消息的消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [2] Received '" + message + "'"); try { doWork(message); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; //关闭自动 确认 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) throws InterruptedException { Thread.sleep(2000); } }
这时候现象就是消费者 1 速度大于消费者 2
转载地址:http://vhonn.baihongyu.com/