RocketMQ的安装与基本使用

RocketMQ介绍

RocketMQ 是阿里巴巴开源的消息中间件,现已捐献给 Apache。RocketMQ官网
RocketMQ 在某些功能或者概念上面和 Kafka 是类似的,理解了其中一个在理解另外一个会简单的很多。和 Kafka 的部分区别如下:

  1. 采用零拷贝的原理,顺序写,随机读(Kafka也是采用了这种)
  2. 底层通信框架采用Netty NIO
  3. 将 NameServer 代替 Zookeeper,实现服务寻址和服务协调
  4. 消息失败重试机制、消息可查询
  5. 集群部署无单点,可扩展(Kafka采用的分区+副本的机制)

我们来看下 RocketMQ 中的几部分

  1. NameService 提供服务之间的发现和路由,等待 Broker、Producer、Consumer 来连接,并提供各种查询服务。(类似 Kafka 中使用 Zookeeper 的实现)
  2. broker 负责存储消息以及转发消息的,此角色分别 master 和 slave,一个 master 可以对应多个 slave ,master 与 slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为0表示 master,非0表示 slave。master 也可以部署多个(类似 Kafka 中的 broker)
    broker 每隔30秒定时向所有的 NameService 发送心跳包,信息包括自己的状态和存储 topic 信息。NameService 收到 broker 心跳包之后,更新相关信息,并且每隔10s检查上次 broker 发送心跳的时间,若超过120s就判定 broker 下线,并移除此 broker 所有信息。
  3. producer 生产者,拥有相同 Producer Group 的 Producer 组成一个集群,与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 topic 路由信息,并提供 topic 服务的 master 建立长连接,且定时向 master 发送心跳。(类似 Kafka 中的 Producer)
  4. consumer 消费者,接收消息并进行消费,拥有相同 Consumer Group 的 Consumer 组成一个集群,与 NameServer 集群中的其中一个节点(随机选择)建立长连接。
    定期从 NameServer 取 topic 路由信息,并向提供 topic 服务的 master、slave 建立长连接,且定时向 master、slave 发送心跳。Consumer 既可以从 master 订阅消息,也可以从 slave 订阅消息,订阅规则由 Broker 配置决定。

既然 NameServer 代替了 Zookeeper 那 NameServer 也需要启动,所以想要启动 RockerMQ 至少需要启动两个进程,分别为:NameServer、broker,前者是各种 topic 注册中心,后者是真正的 broker。
可参考文章 https://www.jianshu.com/p/a0b22c1e9f82

单机版的 RockerMQ 安装

我是基于 Centos7 安装的,下载是4.6.0的以版本,在安装之前要确保服务上面安装了 JDK。下载好之后按照我步骤安装

  1. unzip rocketmq-all-4.6.0-bin-release.zip
  2. 启动 NameServer,进入到 bin 目录下,运行命令nohup sh mqnamesrv &表示在后台运行默认情况下,NameServer监听的是9876端口。
  3. 启动 broker,nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c /conf/broker.conf & 【-c可以指定broker.conf配置文件】。默认情况下会加载conf/broker.conf。
    【-n ${namesrvIp}】 -n表示指定当前broker对应的命名服务地址: 默认情况下,broker 监听的是 10911 端口。一般情况下使用命令nohup sh mqbroker -n localhost:9876 &启动 broker。

在启动的时候可能遇到 Cannot allocate memory【无法分配内存】,我们在bin目录下可以适当的修改 runbroker.sh 和 runbroker.sh 的值
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g

停止服务的时候需要注意,要先停止 broker,其次停止 NameServer。
停止 broker 在bin目录下执行命令sh mqshutdown broker,停止 NameServer 执行命令sh mqshutdown namesrv
让我们来测试一下发送/接受消息是否正常。添加jar包依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.0</version>
</dependency>

发送端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.rocketmq.demo;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
* 消息发送端示例
*/
public class RocketMQProducer {
public static void main(String[] args) throws MQClientException {
//创建一个名为 my_group_name 的组
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("my_group_name");
//指定namesrv服务地址,获取broker相关信息
defaultMQProducer.setNamesrvAddr("192.168.32.137:9876");
defaultMQProducer.start();
for (int i = 0; i < 100; i++) {
try{
//创建一个名为 myTopic 的主题,名为 myTag 的tag,并将消息转换未 byte[] 数据
Message message = new Message("myTopic","myTag",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult send = defaultMQProducer.send(message);
//发送消息并回去发送结果
System.out.println("发送消息返回结果:"+send);
}catch (Exception e){
e.printStackTrace();
}
}
}
}

消费端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

/**
* 消息消费端示例
*/
public class RocketMQConsumer {
public static void main(String[] args) throws MQClientException {
//创建一个名为 my_consumer_group_name 的组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group_name");
//指定namesrv服务地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.32.137:9876");
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅消息生产者中的 myTopic,* 代表消费该 topic 下所有的 tag,*表示不过滤,可以通过tag来过滤,比如:”myTag”
consumer.subscribe("myTopic","*");
/**
* 注册消息监听回调
* 有两种回调 MessageListenerConcurrently 为普通监听,MessageListenerOrderly 为顺序监听
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt me : msgs) {
System.out.printf("消费消息:%s %n",new String(me.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}

控制台的安装

我觉得任何一个中间件都有一个图形化的界面,RocketMQ也不例外。来看下 RocketMQ 图形化界面的安装。
下载地址
步骤1,2 这几个步骤我在 Windows 下面进行的

  1. 进到目录 rocketmq-externals-master/rocketmq-console/src/main/resources 并修改 application.properties 文件,配置 namesrvAddr 地址,指向目标服务的ip和端口:rocketmq.config.namesrvAddr=192.168.32.137:9876,(IP 地址请填写自己的IP地址)
  2. 进入到rocketmq-externals-master/rocketmq-console目录执行命令mvn clean package -DskipTests,执行完毕之后会生成一个target的目录,在该目录下把rocketmq-console-ng-1.0.1.jar包上传指服务器并使用命令java -jar rocketmq-console-ng-1.0.1.jar启动控制台(默认端口8080)

在控制台中我们也可以创建Topic,入下图所示


写队列数量表示 producer 发送到的 MessageQueue 的队列个数
读队列数量表示 consumer 读取消息的 MessageQueue 队列个数
这两个值需要相等,在集群模式下如果不相等,如果 写队列数量=6,读队列数量=3, 那每个 broker 上会有3个queue的消息是无法消费的。
perm 代表队列的权限,2表示 w(写),4表示 r(读),6表示rw(可读可写)

消息的发送的几种方式

第一种是同步发送,在上文中的代码都同步发送的,producer 发送消息之后需要等待 broker 回应之后才能继续发下一条消息。
异步发送是指 producer 发送消息后,不需要等待 broker 的回应,紧接着就能发送下条消息。 RocketMQ 的异步发送,需要用户实现异步发送回调接口。异步发送如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
* 异步发送消息示例
*/
public class AsyncProducer {
public static void main(String[] args) throws MQClientException {
//创建一个名为 my_group_name 的组
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("my_group_name");
//指定namesrv服务地址,多个地址以 ; 隔开
defaultMQProducer.setNamesrvAddr("192.168.32.137:9876");
defaultMQProducer.start();
for (int i = 0; i < 100; i++) {
try{
//创建一个名为 myTopic 的主题,名为 myTag 的tag,并将消息转换未 byte[] 数据
Message message = new Message("myTopic","myTag",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
defaultMQProducer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("异步发送消息返回结果:%s %n",sendResult.getSendStatus());
}

@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
}
}

在 Kafka 中也是有同步发送以及异步发送消息概念。另外 RocketMQ 还支持一种单向发送消息,producer 只负责发送消息,不需要等待 broker 回应且没有回调函数触发,即只发送消息不等待回应,效率最高。如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
* 单向发送消息示例
*/
public class OneWayProducer {
public static void main(String[] args) throws MQClientException {
//创建一个名为 my_group_name 的组
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("my_group_name");
//指定namesrv服务地址,多个地址以 ; 隔开
defaultMQProducer.setNamesrvAddr("192.168.32.137:9876");
defaultMQProducer.start();
for (int i = 0; i < 100; i++) {
try{
//创建一个名为 myTopic 的主题,名为 myTag 的tag,并将消息转换未 byte[] 数据
Message message = new Message("myTopic","myTag",("Hello OneWay RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
defaultMQProducer.sendOneway(message);
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
}
}