官方文档
https://rocketmq.apache.org/zh/docs/quickStart/02quickstart
下载安装启动
下载
下载地址
https://rocketmq.apache.org/zh/download/
zip包下载地址:
https://dlcdn.apache.org/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
下载完成之后得到rocketmq-all-5.0.0-bin-release.zip,使用unzip命令进行解压,得到rocketmq-all-5.0.0-bin-release目录
unzip rocketmq-all-5.0.0-bin-release.zip
给目录改名为rocketmq
mv rocketmq-all-5.0.0-bin-release rocketmq
进入rocketmq目录:
cd rocketmq
启动nameServer
启动mqnamesrv前可以调整一下jvm的分配内存参数(机器内存大的可以忽略)
vi bin/runserver.sh
choose_gc_options中的内存配置有2处,if和else里面的都修改一下
默认内存配置是:JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改成为:JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"
启动nameServer
sh bin/mqnamesrv &
这里有可能会报错:
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
提示说JAVA_HOME变量没有配置,在环境变量中配置JAVA_HOME即可。
安装jdk我选择手动下载jdk19并手动配置环境变量的方式,当然也可以yum和dnf安装。
下载jdk19https://download.oracle.com/java/19/latest/jdk-19_linux-x64_bin.tar.gz
wget https://download.oracle.com/java/19/latest/jdk-19_linux-x64_bin.tar.gz
解压之后得到jdk-19.0.1:
tar xzvf jdk-19_linux-x64_bin.tar.gz
配置当前用户的环境变量添加如下内容:
vi ~/.bash_profile
# User specific environment and startup programs
export JAVA_HOME=/app/soft/jdk-19.0.1
export PATH=$JAVA_HOME/bin:$PATH
执行一下source ~/.bash_profile使配置生效,并测试一下:
[wangxianfeng@ns soft]$ source ~/.bash_profile
[wangxianfeng@ns soft]$ echo $JAVA_HOME
/app/soft/jdk-19.0.1
[wangxianfeng@ns soft]$ java -version
java version "19.0.1" 2022-10-18
Java(TM) SE Runtime Environment (build 19.0.1+10-21)
Java HotSpot(TM) 64-Bit Server VM (build 19.0.1+10-21, mixed mode, sharing)
JAVA_HOME配置完成之后再启动nameserver
[wangxianfeng@ns soft]$ cd rocketmq/
[wangxianfeng@ns rocketmq]$ sh bin/mqnamesrv &
[1] 85365
查看启动日志:
tail -f ~/logs/rocketmqlogs/namesrv.log
2023-01-14 09:49:54 INFO main - RemotingServer started, listening 0.0.0.0:9876
2023-01-14 09:49:54 INFO main - name server address updated. NEW : [192.168.2.6:9876] , OLD: null
2023-01-14 09:49:54 INFO NettyEventExecutor - NettyEventExecutor service started
2023-01-14 09:49:54 INFO main - Try to start service thread:FileWatchService started:false lastThread:null
2023-01-14 09:49:54 INFO main - Try to start service thread:org.apache.rocketmq.namesrv.routeinfo.BatchUnregistrationService started:false lastThread:null
2023-01-14 09:49:54 INFO main - The Name Server boot success. serializeType=JSON
启动Broker+Proxy
NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。
我们只是一个快速开始,仅用于测试使用,因此仅最简单启动:
先启动broker
nohup sh bin/mqbroker -n 192.168.2.6:9876 --enable-proxy &
如果不是本地测试,此处不要使用localhost形式,如下:
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
这种形式如果使用192.168.2.6:8081访问代理无法访问,使用ip配置启动可以连接。
第二次启动的时候报错:
[wangxianfeng@ns rocketmq]$ more nohup.out
java.lang.IllegalAccessError: class org.apache.rocketmq.common.UtilAll (in unnamed module @0x4a499116) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x4a499116
at org.apache.rocketmq.common.UtilAll.viewed(UtilAll.java:720)
at org.apache.rocketmq.common.UtilAll.cleanBuffer(UtilAll.java:684)
at org.apache.rocketmq.store.logfile.DefaultMappedFile.cleanup(DefaultMappedFile.java:470)
at org.apache.rocketmq.store.ReferenceResource.release(ReferenceResource.java:63)
at org.apache.rocketmq.store.ReferenceResource.shutdown(ReferenceResource.java:47)
at org.apache.rocketmq.store.logfile.DefaultMappedFile.destroy(DefaultMappedFile.java:481)
at org.apache.rocketmq.store.index.IndexFile.destroy(IndexFile.java:97)
at org.apache.rocketmq.store.index.IndexService.load(IndexService.java:72)
at org.apache.rocketmq.store.DefaultMessageStore.load(DefaultMessageStore.java:287)
at org.apache.rocketmq.broker.BrokerController.initialize(BrokerController.java:754)
at org.apache.rocketmq.broker.BrokerStartup.createBrokerController(BrokerStartup.java:224)
at org.apache.rocketmq.proxy.ProxyStartup.createBrokerController(ProxyStartup.java:212)
at org.apache.rocketmq.proxy.ProxyStartup.createMessagingProcessor(ProxyStartup.java:171)
at org.apache.rocketmq.proxy.ProxyStartup.main(ProxyStartup.java:79)
暂不知道如何解决………………,怀疑是jdk版本问题,先使用jdk1.8吧,jdk版本切换回1.8之后启动成功。
java -version
java version "1.8.0_351"
Java(TM) SE Runtime Environment (build 1.8.0_351-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.351-b10, mixed mode)
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Sat Jan 14 12:32:06 CST 2023 rocketmq-proxy startup successfully
验证broker是否启动成功, 比如, broker的ip是192.168.2.6 然后名字是broker-a
tail -f ~/logs/rocketmqlogs/broker_default.log
2023-01-14 09:55:42 INFO main - The broker[broker-a, 192.168.2.6:10911] boot success. serializeType=JSON and name server is localhost:9876
至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们可以利用脚本进行简单的消息收发。
常用管理命令使用
官方文档地址:Admin Tool
其中-n选项为制定nameserver地址
查看集群
$ bin/mqadmin clusterList -n 192.168.2.6:9876
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #Timer(Progress) #PCWait(ms) #Hour #SPACE #ACTIVATED
DefaultCluster broker-a 0 192.168.2.6:10911 V5_0_0 0.00(0,0ms) 0.10(0,0ms) 0-0(0.0w, 0.0, 0.0) 0 464908.77 0.0200 true
查看topic列表
$ bin/mqadmin topicList -n 192.168.2.6:9876
TestTopic
RMQ_SYS_TRANS_HALF_TOPIC
%RETRY%please_rename_unique_group_name_4
BenchmarkTest
OFFSET_MOVED_EVENT
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
DefaultCluster
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_broker-a
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
broker-a
创建topic
创建topic的时候除了需要指定nameserver外,还需要指定broker或者集群名称,如:
sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
指定broker地址:
sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -b 192.168.2.6:10911
工具测试消息收发
在进行工具测试消息收发之前,我们需要告诉客户端NameServer的地址,RocketMQ有多种方式在客户端中设置NameServer地址,这里我们利用环境变量NAMESRV_ADDR
$ export NAMESRV_ADDR=localhost:9876
[wangxianfeng@ns rocketmq]$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=FD437AE6E5090000001132FFFE2BA23A4F037A4F0F2945570CD80000, offsetMsgId=C0A8020600002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]……
$ [wangxianfeng@ns rocketmq]$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Consumer Started.
ConsumeMessageThread_please_rename_unique_group_name_4_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=263, queueOffset=0, sysFlag=0, bornTimestamp=1673665733239, bornHost=/192.168.2.6:47676, storeTimestamp=1673665733247, storeHost=/192.168.2.6:10911, msgId=C0A8020600002A9F0000000000000315, commitLogOffset=789, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=250, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1673665806825, UNIQ_KEY=FD437AE6E5090000001132FFFE2BA23A4F037A4F0F2945570E770003, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}]]……
SDK测试消息收发
工具测试完成后,我们可以尝试使用 SDK 收发消息。这里用java客户端测试:
1. 在IDEA中创建一个Java工程
2. 在 pom.xml 文件中添加以下依赖引入Java依赖库
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>${rocketmq-client-java-version}</version>
</dependency>
rocketmq-client-java-version当前最新版本为5.0.4,添加properties中的配置:
<rocketmq-client-java-version>5.0.4</rocketmq-client-java-version>
- 通过mqadmin创建 Topic
sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
查看topic是否创建成功,出现TestTopic代表创建成功:
[wangxianfeng@ns rocketmq]$ sh bin/mqadmin topicList -n localhost:9876
TestTopic
RMQ_SYS_TRANS_HALF_TOPIC
%RETRY%please_rename_unique_group_name_4
BenchmarkTest
OFFSET_MOVED_EVENT
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
DefaultCluster
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_broker-a
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
broker-a
- 在已创建的Java工程中,创建发送普通消息程序并运行,示例代码如下
package org.example;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
//接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoint = "192.168.2.6:8081";
//消息发送的目标Topic名称,需要提前创建。
String topic = "TestTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
//初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//消息体。
.setBody("messageBody2".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
运行程序,发送成功:
"D:\Program Files\Java\jdk1.8.0_331\bin\java.exe" "-javaagent:……" org.example.ProducerExample
017E2A31F52F1C564403D3CAC600000000
- 在已创建的Java工程中,创建订阅普通消息程序并运行。Apache RocketMQ 支持SimpleConsumer和PushConsumer两种消费者类型,您可以选择以下任意一种方式订阅消息。示例代码如下:
package org.example;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
public class PushConsumerExample {
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
//接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "192.168.2.6:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
//订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "HelloConsumerGroup";
//指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "TestTopic";
//初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//设置消费者分组。
.setConsumerGroup(consumerGroup)
//设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//设置消费监听器。
.setMessageListener(messageView -> {
//处理消息并返回消费结果。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume message!!" + StandardCharsets.UTF_8.decode(messageView.getBody()).toString());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
//如果不需要再使用PushConsumer,可关闭该进程。
//pushConsumer.close();
}
}
运行结果如下:
"D:\Program Files\Java\jdk1.8.0_331\bin\java.exe" "-javaagent:……" org.example.PushConsumerExample
Consume message!!messageBody2
关闭服务器
完成实验后,我们可以通过以下方式关闭服务
$ sh bin/mqshutdown broker
The mqbroker with proxy enable is running(5263)...
Send shutdown request to mqbroker with proxy enable OK(5263)
No mqbroker running.
$ sh bin/mqshutdown namesrv
The mqnamesrv(5209) is running...
Send shutdown request to mqnamesrv(5209) OK
查看还有没有rocketmq进程:
$ ps -ef|grep rocketmq
wangxia+ 5587 5145 0 12:42 pts/0 00:00:00 grep --color=auto rocketmq
[1]- Exit 143 sh bin/mqnamesrv
[2]+ Exit 143 nohup sh bin/mqbroker -n 192.168.2.6:9876 --enable-proxy
表示rockemq进程结束成功。
文章评论