本文最后更新于370 天前,其中的信息可能已经过时,如有错误请发送邮件到3368129372@qq.com
交换机
- direct
路由键(RoutingKey)与交换机完全匹配交换机,完全匹配、单播的形式.
一个key可以绑定多个队列 - fanout
扇出型交换机,Routingkey无效,消息发送给所有绑定了了交换机的队列 - topic
跟direct类似,但是支持模糊匹配(*代表一个部分,#代表0个或多个部分) - header
与direct类似,性能差,很少用到
死信队列
- 原因
- nack或者reject的信息
- 超过ttl而过期的队列
- 队列满了放不下
demo代码(未封装的)
生产者:
public class TopicProducer {
private static final String EXCHANGE_NAME = "topic-exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String userInput = scanner.nextLine();
String[] strings = userInput.split(" ");
if (strings.length < 1) {
continue;
}
String message = strings[0];
String routingKey = strings[1];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");
}
}
}
消费者:
public class TopicConsumer {
private static final String EXCHANGE_NAME = "topic-exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 创建队列
String queueName = "frontend_queue";
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "#.前端.#");
// 创建队列
String queueName2 = "backend_queue";
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueBind(queueName2, EXCHANGE_NAME, "#.后端.#");
// 创建队列
String queueName3 = "product_queue";
channel.queueDeclare(queueName3, true, false, false, null);
channel.queueBind(queueName3, EXCHANGE_NAME, "#.产品.#");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback xiaoaDeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [xiaoa] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
DeliverCallback xiaobDeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [xiaob] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
DeliverCallback xiaocDeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [xiaoc] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, xiaoaDeliverCallback, consumerTag -> {
});
channel.basicConsume(queueName2, true, xiaobDeliverCallback, consumerTag -> {
});
channel.basicConsume(queueName3, true, xiaocDeliverCallback, consumerTag -> {
});
}
}