rabbitmq客户端开发
This commit is contained in:
parent
531498f486
commit
8a5e7afc76
@ -1,20 +1,25 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
|
||||
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
|
||||
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.heibaiying</groupId>
|
||||
<artifactId>rabbitmq-basis</artifactId>
|
||||
<version>1.0</version>
|
||||
<groupId>com.heibaiying</groupId>
|
||||
<artifactId>rabbitmq-basis</artifactId>
|
||||
<version>1.0</version>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.rabbitmq</groupId>
|
||||
<artifactId>amqp-client</artifactId>
|
||||
<version>5.6.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.rabbitmq</groupId>
|
||||
<artifactId>amqp-client</artifactId>
|
||||
<version>5.7.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-nop</artifactId>
|
||||
<version>1.7.26</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -1,45 +0,0 @@
|
||||
package com.heibaiying;
|
||||
|
||||
import com.rabbitmq.client.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 消费者
|
||||
*/
|
||||
public class Consumer {
|
||||
|
||||
private static final String QUEUE_NAME = "queue-hello";
|
||||
private static final String USER_NAME = "root";
|
||||
private static final String PASSWORD = "root";
|
||||
private static final String IP_ADDRESS = "192.168.200.229";
|
||||
/* RabbitMQ 服务默认的连接端口号 */
|
||||
private static final int PORT = 5672;
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost(IP_ADDRESS);
|
||||
factory.setPort(PORT);
|
||||
factory.setUsername(USER_NAME);
|
||||
factory.setPassword(PASSWORD);
|
||||
Connection connection = factory.newConnection();
|
||||
final Channel channel = connection.createChannel();
|
||||
// 设置客户端最多接收多少个未被 Ack 的消息, 0 代表没有限制
|
||||
channel.basicQos(64);
|
||||
DefaultConsumer consumer = new DefaultConsumer(channel) {
|
||||
@Override
|
||||
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
|
||||
System.out.println("收到消息:" + new String(body));
|
||||
// 消息 Ack
|
||||
channel.basicAck(envelope.getDeliveryTag(), false);
|
||||
}
|
||||
};
|
||||
channel.basicConsume(QUEUE_NAME, consumer);
|
||||
// 等到回调函数执行后再关闭连接
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
channel.close();
|
||||
connection.close();
|
||||
|
||||
}
|
||||
}
|
@ -1,45 +0,0 @@
|
||||
package com.heibaiying;
|
||||
|
||||
import com.rabbitmq.client.*;
|
||||
|
||||
/**
|
||||
* 生产者
|
||||
*/
|
||||
public class Producer {
|
||||
|
||||
private static final String EXCHANGE_NAME = "exchange-hello";
|
||||
private static final String ROUTING_KEY = "routingkey-hello";
|
||||
private static final String QUEUE_NAME = "queue-hello";
|
||||
private static final String USER_NAME = "root";
|
||||
private static final String PASSWORD = "root";
|
||||
private static final String IP_ADDRESS = "192.168.200.229";
|
||||
/* RabbitMQ 服务默认的连接端口号 */
|
||||
private static final int PORT = 5672;
|
||||
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
// 创建连接工厂
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost(IP_ADDRESS);
|
||||
factory.setPort(PORT);
|
||||
factory.setUsername(USER_NAME);
|
||||
factory.setPassword(PASSWORD);
|
||||
// 获取新的连接
|
||||
Connection connection = factory.newConnection();
|
||||
// 创建信道
|
||||
Channel channel = connection.createChannel();
|
||||
// 创建一个 type="direct"、持久化的、非自动删除的交换器
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
|
||||
// 创建一个持久化、非排他的、非自动删除的队列
|
||||
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
|
||||
// 将交换器与队列通过路由键绑定
|
||||
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
|
||||
String message = "hello world";
|
||||
// 发送一条持久化的文本消息
|
||||
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
|
||||
// 关闭信道
|
||||
channel.close();
|
||||
// 关闭连接
|
||||
connection.close();
|
||||
}
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
package com.heibaiying.base;
|
||||
|
||||
import com.rabbitmq.client.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class Consumer {
|
||||
|
||||
public static void main(String[] args) throws IOException, TimeoutException {
|
||||
// 1.创建连接工厂
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
// 2.配置服务器地址和连接信息
|
||||
factory.setHost("localhost");
|
||||
factory.setUsername("guest");
|
||||
factory.setPassword("guest");
|
||||
factory.setVirtualHost("/");
|
||||
// 3.获取连接
|
||||
Connection connection = factory.newConnection();
|
||||
// 4.创建信道
|
||||
final Channel channel = connection.createChannel();
|
||||
// 5.声明一个类型为 type 的持久化的、非自动删除的交换器
|
||||
String exchangeName = "hello-exchange";
|
||||
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
|
||||
// 6.声明一个具名的、持久化的、非排他的、不自动删除的队列
|
||||
String queueName = "hello-queue";
|
||||
channel.queueDeclare(queueName, true, false, false, null);
|
||||
// 7.建立绑定关系
|
||||
String bindingKey = "my-key";
|
||||
channel.queueBind(queueName, exchangeName, bindingKey);
|
||||
// 8.订阅并消费消息
|
||||
channel.basicConsume(queueName, false, "myConsumerTag", new DefaultConsumer(channel) {
|
||||
@Override
|
||||
public void handleDelivery(String consumerTag,
|
||||
Envelope envelope,
|
||||
AMQP.BasicProperties properties,
|
||||
byte[] body) throws IOException {
|
||||
// 9.处理收到的消息
|
||||
System.out.println("Received:" + new String(body));
|
||||
// 10.手动ACK
|
||||
long deliveryTag = envelope.getDeliveryTag();
|
||||
channel.basicAck(deliveryTag, false);
|
||||
}
|
||||
});
|
||||
|
||||
// 11.这里为了观察结果,先不关闭连接
|
||||
// channel.close();
|
||||
// connection.close();
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
package com.heibaiying.base;
|
||||
|
||||
import com.rabbitmq.client.BuiltinExchangeType;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class Producer {
|
||||
|
||||
|
||||
public static void main(String[] args) throws IOException, TimeoutException {
|
||||
// 1.创建连接工厂
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
// 2.配置服务器地址和连接信息
|
||||
factory.setHost("localhost");
|
||||
factory.setUsername("guest");
|
||||
factory.setPassword("guest");
|
||||
factory.setVirtualHost("/");
|
||||
// 3.获取连接
|
||||
Connection connection = factory.newConnection();
|
||||
// 4.创建信道
|
||||
Channel channel = connection.createChannel();
|
||||
// 5.声明一个类型为 type 的持久化的、非自动删除的交换器
|
||||
String exchangeName = "hello-exchange";
|
||||
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
|
||||
// 6.指明路由键
|
||||
String routingKey = "my-key";
|
||||
// 7.需要传递的数据
|
||||
byte[] messageBodyBytes = "Hello RabbiMQ!".getBytes();
|
||||
// 8.将消息发布到指定的交换机上
|
||||
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
|
||||
// 9.关闭信道
|
||||
channel.close();
|
||||
// 10.关闭连接
|
||||
connection.close();
|
||||
}
|
||||
}
|
@ -68,7 +68,7 @@ RabbitMQ 与 AMQP 遵循相同的模型架构,其架构示例图如下:
|
||||
|
||||
### 6. Queue(消息队列)
|
||||
|
||||
用于存储路由过来的消息。多个消费者可以订阅同一个消息队列,此时候队列中的消息会被平均分配给多个消费者进行处理,而不是每个消费者都收到所有消息并处理。
|
||||
用于存储路由过来的消息。多个消费者可以订阅同一个消息队列,此时队列会将收到的消息将以轮询 (round-robin) 的方式分发给所有消费者。即每条消息只会发送给一个消费者,不会出现一条消息被多个消费者重复消费的情况。
|
||||
|
||||
### 7. Consumer(消费者)
|
||||
|
||||
|
19
notes/RabbitMQ_客户端开发.md
Normal file
19
notes/RabbitMQ_客户端开发.md
Normal file
@ -0,0 +1,19 @@
|
||||
# RabbitMQ 客户端开发
|
||||
|
||||
## 一、基本 API
|
||||
|
||||
### 1.1 连接服务
|
||||
|
||||
### 1.2 声明交换器
|
||||
|
||||
#### 1.3 声明队列
|
||||
|
||||
### 1.4 建立绑定关系
|
||||
|
||||
二、生产者
|
||||
|
||||
三、消费者
|
||||
|
||||
四、过期时间(TTL)
|
||||
|
||||
四、
|
@ -174,9 +174,7 @@ public class Rabbit {
|
||||
#### 2.1.1 生产者和消费者
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/LearningNotes/blob/master/pictures/RabbitMQ模型架构.png"/> </div></br>
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/LearningNotes/blob/master/pictures/消息队列运转过程.png"/> </div></br>
|
||||
|
||||
#### 2.1.2 队列
|
||||
|
||||
**Queue(队列)** 是 RabbitMQ的内部对象,用于**存储消息**。多个消费者可以订阅同一个消息,这时候队列中的消息会被平均分摊(Round-Robin,即轮训)给多个消费者进行处理,而不是每个消费者都收到所有消费并处理。
|
||||
@ -196,7 +194,6 @@ public class Rabbit {
|
||||
**2. direct**: 把消息路由到那些BindingKey和RountingKey 完全匹配的队列中。
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/LearningNotes/blob/master/pictures/direct-exchange.png"/> </div></br>
|
||||
|
||||
**官方原文:**
|
||||
|
||||
In this setup, we can see the direct exchange X with two queues bound to it. The first queue is bound with binding key orange, and the second has two bindings, one with binding key black and the other one with green.
|
||||
@ -213,7 +210,6 @@ In such a setup a message published to the exchange with a routing key orange wi
|
||||
- \# (hash) can substitute for zero or more words.
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/LearningNotes/blob/master/pictures/topic-exchange.png"/> </div></br>
|
||||
|
||||
官网原文对这一模式的说明比较清晰,摘抄如下:
|
||||
|
||||
We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#".
|
||||
@ -285,7 +281,7 @@ Connection conn = factory.newConnection();
|
||||
```java
|
||||
// 创建一个 type="direct"、持久化的、非自动删除的交换器
|
||||
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
|
||||
// 创建一个持久化、非排他的、非自动删除的交换器
|
||||
// 创建一个持久化、非排他的、非自动删除的队列
|
||||
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
|
||||
// 将交换器与队列通过路由键绑定
|
||||
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
|
||||
@ -698,7 +694,6 @@ channel.queueDeclare("myqueue " , false , false , false , args) ;
|
||||
利用死信队列来实现:
|
||||
|
||||
<div align="center"> <img src="https://github.com/heibaiying/LearningNotes/blob/master/pictures/延迟队列.png"/> </div></br>
|
||||
|
||||
### 4.5 优先级队列
|
||||
|
||||
**设置队列优先级**:
|
||||
|
Loading…
x
Reference in New Issue
Block a user