CentOS9快速开始使用rocketmq

2023-01-14 1106点热度 0人点赞 0条评论

官方文档

https://rocketmq.apache.org/zh/docs/quickStart/02quickstart

下载安装启动

下载

下载地址
https://rocketmq.apache.org/zh/download/
zip包下载地址:
https://dlcdn.apache.org/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
下载完成之后得到rocketmq-all-5.0.0-bin-release.zip,使用unzip命令进行解压,得到rocketmq-all-5.0.0-bin-release目录

unzip rocketmq-all-5.0.0-bin-release.zip

给目录改名为rocketmq

mv rocketmq-all-5.0.0-bin-release rocketmq

进入rocketmq目录:

cd rocketmq

启动nameServer

启动mqnamesrv前可以调整一下jvm的分配内存参数(机器内存大的可以忽略)
vi bin/runserver.sh
choose_gc_options中的内存配置有2处,if和else里面的都修改一下
默认内存配置是:JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改成为:JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"
启动nameServer

sh bin/mqnamesrv &

这里有可能会报错:

ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

提示说JAVA_HOME变量没有配置,在环境变量中配置JAVA_HOME即可。
安装jdk我选择手动下载jdk19并手动配置环境变量的方式,当然也可以yum和dnf安装。
下载jdk19https://download.oracle.com/java/19/latest/jdk-19_linux-x64_bin.tar.gz

wget https://download.oracle.com/java/19/latest/jdk-19_linux-x64_bin.tar.gz

解压之后得到jdk-19.0.1:

tar xzvf jdk-19_linux-x64_bin.tar.gz

配置当前用户的环境变量添加如下内容:
vi ~/.bash_profile

# User specific environment and startup programs
export JAVA_HOME=/app/soft/jdk-19.0.1
export PATH=$JAVA_HOME/bin:$PATH

执行一下source ~/.bash_profile使配置生效,并测试一下:

[wangxianfeng@ns soft]$ source ~/.bash_profile
[wangxianfeng@ns soft]$ echo $JAVA_HOME
/app/soft/jdk-19.0.1
[wangxianfeng@ns soft]$ java -version
java version "19.0.1" 2022-10-18
Java(TM) SE Runtime Environment (build 19.0.1+10-21)
Java HotSpot(TM) 64-Bit Server VM (build 19.0.1+10-21, mixed mode, sharing)

JAVA_HOME配置完成之后再启动nameserver

[wangxianfeng@ns soft]$ cd rocketmq/
[wangxianfeng@ns rocketmq]$ sh bin/mqnamesrv &
[1] 85365

查看启动日志:

tail -f ~/logs/rocketmqlogs/namesrv.log

2023-01-14 09:49:54 INFO main - RemotingServer started, listening 0.0.0.0:9876
2023-01-14 09:49:54 INFO main - name server address updated. NEW : [192.168.2.6:9876] , OLD: null
2023-01-14 09:49:54 INFO NettyEventExecutor - NettyEventExecutor service started
2023-01-14 09:49:54 INFO main - Try to start service thread:FileWatchService started:false lastThread:null
2023-01-14 09:49:54 INFO main - Try to start service thread:org.apache.rocketmq.namesrv.routeinfo.BatchUnregistrationService started:false lastThread:null
2023-01-14 09:49:54 INFO main - The Name Server boot success. serializeType=JSON

启动Broker+Proxy

NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。
我们只是一个快速开始,仅用于测试使用,因此仅最简单启动:
先启动broker

nohup sh bin/mqbroker -n 192.168.2.6:9876 --enable-proxy &

如果不是本地测试,此处不要使用localhost形式,如下:

nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

这种形式如果使用192.168.2.6:8081访问代理无法访问,使用ip配置启动可以连接。

第二次启动的时候报错:

[wangxianfeng@ns rocketmq]$ more nohup.out 
java.lang.IllegalAccessError: class org.apache.rocketmq.common.UtilAll (in unnamed module @0x4a499116) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x4a499116
        at org.apache.rocketmq.common.UtilAll.viewed(UtilAll.java:720)
        at org.apache.rocketmq.common.UtilAll.cleanBuffer(UtilAll.java:684)
        at org.apache.rocketmq.store.logfile.DefaultMappedFile.cleanup(DefaultMappedFile.java:470)
        at org.apache.rocketmq.store.ReferenceResource.release(ReferenceResource.java:63)
        at org.apache.rocketmq.store.ReferenceResource.shutdown(ReferenceResource.java:47)
        at org.apache.rocketmq.store.logfile.DefaultMappedFile.destroy(DefaultMappedFile.java:481)
        at org.apache.rocketmq.store.index.IndexFile.destroy(IndexFile.java:97)
        at org.apache.rocketmq.store.index.IndexService.load(IndexService.java:72)
        at org.apache.rocketmq.store.DefaultMessageStore.load(DefaultMessageStore.java:287)
        at org.apache.rocketmq.broker.BrokerController.initialize(BrokerController.java:754)
        at org.apache.rocketmq.broker.BrokerStartup.createBrokerController(BrokerStartup.java:224)
        at org.apache.rocketmq.proxy.ProxyStartup.createBrokerController(ProxyStartup.java:212)
        at org.apache.rocketmq.proxy.ProxyStartup.createMessagingProcessor(ProxyStartup.java:171)
        at org.apache.rocketmq.proxy.ProxyStartup.main(ProxyStartup.java:79)

暂不知道如何解决………………,怀疑是jdk版本问题,先使用jdk1.8吧,jdk版本切换回1.8之后启动成功。

java -version
java version "1.8.0_351"
Java(TM) SE Runtime Environment (build 1.8.0_351-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.351-b10, mixed mode)

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Sat Jan 14 12:32:06 CST 2023 rocketmq-proxy startup successfully

验证broker是否启动成功, 比如, broker的ip是192.168.2.6 然后名字是broker-a

tail -f ~/logs/rocketmqlogs/broker_default.log

2023-01-14 09:55:42 INFO main - The broker[broker-a, 192.168.2.6:10911] boot success. serializeType=JSON and name server is localhost:9876

至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们可以利用脚本进行简单的消息收发。

常用管理命令使用

官方文档地址:Admin Tool

其中-n选项为制定nameserver地址

查看集群

$ bin/mqadmin clusterList -n 192.168.2.6:9876
#Cluster Name           #Broker Name            #BID  #Addr                  #Version              #InTPS(LOAD)     #OutTPS(LOAD)  #Timer(Progress)        #PCWait(ms)  #Hour         #SPACE    #ACTIVATED
DefaultCluster          broker-a                0     192.168.2.6:10911      V5_0_0                 0.00(0,0ms)       0.10(0,0ms)  0-0(0.0w, 0.0, 0.0)               0  464908.77     0.0200          true

查看topic列表

$ bin/mqadmin topicList -n 192.168.2.6:9876
TestTopic
RMQ_SYS_TRANS_HALF_TOPIC
%RETRY%please_rename_unique_group_name_4
BenchmarkTest
OFFSET_MOVED_EVENT
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
DefaultCluster
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_broker-a
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
broker-a

创建topic

创建topic的时候除了需要指定nameserver外,还需要指定broker或者集群名称,如:

sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster

指定broker地址:

sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -b 192.168.2.6:10911

工具测试消息收发

在进行工具测试消息收发之前,我们需要告诉客户端NameServer的地址,RocketMQ有多种方式在客户端中设置NameServer地址,这里我们利用环境变量NAMESRV_ADDR

$ export NAMESRV_ADDR=localhost:9876
[wangxianfeng@ns rocketmq]$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=FD437AE6E5090000001132FFFE2BA23A4F037A4F0F2945570CD80000, offsetMsgId=C0A8020600002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]……

$ [wangxianfeng@ns rocketmq]$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Consumer Started.
ConsumeMessageThread_please_rename_unique_group_name_4_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=263, queueOffset=0, sysFlag=0, bornTimestamp=1673665733239, bornHost=/192.168.2.6:47676, storeTimestamp=1673665733247, storeHost=/192.168.2.6:10911, msgId=C0A8020600002A9F0000000000000315, commitLogOffset=789, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=250, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1673665806825, UNIQ_KEY=FD437AE6E5090000001132FFFE2BA23A4F037A4F0F2945570E770003, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}]]……

SDK测试消息收发

工具测试完成后,我们可以尝试使用 SDK 收发消息。这里用java客户端测试:
1. 在IDEA中创建一个Java工程
2. 在 pom.xml 文件中添加以下依赖引入Java依赖库

   <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client-java</artifactId>
      <version>${rocketmq-client-java-version}</version>
    </dependency>

rocketmq-client-java-version当前最新版本为5.0.4,添加properties中的配置:

<rocketmq-client-java-version>5.0.4</rocketmq-client-java-version>
  1. 通过mqadmin创建 Topic
sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster

查看topic是否创建成功,出现TestTopic代表创建成功:

[wangxianfeng@ns rocketmq]$ sh bin/mqadmin topicList -n localhost:9876
TestTopic
RMQ_SYS_TRANS_HALF_TOPIC
%RETRY%please_rename_unique_group_name_4
BenchmarkTest
OFFSET_MOVED_EVENT
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
DefaultCluster
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_broker-a
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
broker-a
  1. 在已创建的Java工程中,创建发送普通消息程序并运行,示例代码如下
package org.example;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        //接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoint = "192.168.2.6:8081";
        //消息发送的目标Topic名称,需要提前创建。
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        //初始化Producer时需要设置通信配置以及预绑定的Topic。
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        //普通消息发送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //消息体。
                .setBody("messageBody2".getBytes())
                .build();
        try {
            //发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

运行程序,发送成功:

"D:\Program Files\Java\jdk1.8.0_331\bin\java.exe" "-javaagent:……" org.example.ProducerExample
017E2A31F52F1C564403D3CAC600000000
  1. 在已创建的Java工程中,创建订阅普通消息程序并运行。Apache RocketMQ 支持SimpleConsumer和PushConsumer两种消费者类型,您可以选择以下任意一种方式订阅消息。示例代码如下:
package org.example;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class PushConsumerExample {
    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        //接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "192.168.2.6:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        //订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = "HelloConsumerGroup";
        //指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = "TestTopic";
        //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //设置消费者分组。
                .setConsumerGroup(consumerGroup)
                //设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                //设置消费监听器。
                .setMessageListener(messageView -> {
                    //处理消息并返回消费结果。
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume message!!" + StandardCharsets.UTF_8.decode(messageView.getBody()).toString());
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        //如果不需要再使用PushConsumer,可关闭该进程。
        //pushConsumer.close();
    }
}

运行结果如下:

"D:\Program Files\Java\jdk1.8.0_331\bin\java.exe" "-javaagent:……" org.example.PushConsumerExample
Consume message!!messageBody2

关闭服务器

完成实验后,我们可以通过以下方式关闭服务

$ sh bin/mqshutdown broker
The mqbroker with proxy enable is running(5263)...
Send shutdown request to mqbroker with proxy enable OK(5263)
No mqbroker running.

$ sh bin/mqshutdown namesrv
The mqnamesrv(5209) is running...
Send shutdown request to mqnamesrv(5209) OK

查看还有没有rocketmq进程:

$ ps -ef|grep rocketmq
wangxia+    5587    5145  0 12:42 pts/0    00:00:00 grep --color=auto rocketmq
[1]-  Exit 143                sh bin/mqnamesrv
[2]+  Exit 143                nohup sh bin/mqbroker -n 192.168.2.6:9876 --enable-proxy

表示rockemq进程结束成功。

王显锋

激情工作,快乐生活!

文章评论