环境需求:

64位操作系统,建议使用Linux / Unix /

  • CentOs7.3

  • 64bit JDK 1.8+

  • Maven 3.2.x

一、安装Maven

参考链接:

二、安装RocketMQ 

1、关闭防火墙

systemctl stop firewalld.service

2、下载和构建

wget http://mirrors.hust.edu.cn/apache/rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip
unzip rocketmq-all-4.2.0-source-release.zip
cd rocketmq-all-4.2.0
mvn -Prelease-all -DskipTests clean install -U
mv distribution/target/apache-rocketmq /usr/local/apache-rocketmq
cd /usr/local/apache-rocketmq/

编译成功的响应

3、配置rocketmq的环境变量,在/etc/profile最后添加

export ROCKETMQ_HOME=/usr/local/apache-rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH

4、使rocketmq的环境变量生效

source /etc/profile

三、内存问题:

注意:
启动NameServer 和Broker的时候可能会出现错误,请留意对应的日志文件。在测试环境中常见的错误是内存不足的错误,这时候可以修改NameSever和Broker的启动脚本。Xms\Mmx不小于1g。

另外:

#mqbroker.xml和mqnamesrv.xml的内存不要超过runbroker.sh 和runserver.sh的内存,不然会引起内存不够导致奔溃。
bin/mqnamesrv.xml
bin/mqbroker.xml

四、启动Name Server
 

1、修改runserver,默认 RocketMQ Server 内存需要很大的

vim bin/runserver.sh
--------------------------------------------------------------
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

2、启动 Name Server

#nohup来启动
nohup sh bin/mqnamesrv >/dev/null 2>&1 &
#查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log

启动成功信息如下

The Name Server boot success. serializeType=JSON

五、启动broker

1、修改runbroker,默认 RocketMQ Broker 内存需要很大的。

vim bin/runbroker.sh
--------------------------------------------------------------
JAVA_OPT="${JAVA_OPT} -server -Xms11g -Xmx1g -Xmn512m"

2、启动Broker

#nohup来启动
nohup sh bin/mqbroker -n localhost:9876 >/dev/null 2>&1 &
#查看日志
tail -f ~/logs/rocketmqlogs/broker.log 

启动成功信息如下

2021-07-07 15:26:06 INFO main - Set user specified name server address: 192.168.133.116:9876
2021-07-07 15:26:06 INFO PullRequestHoldService - PullRequestHoldService service started
2021-07-07 15:26:06 INFO main - register broker to name server 192.168.133.116:9876 OK
2021-07-07 15:26:06 INFO main - The broker[env1, 192.168.133.116:10911] boot success. serializeType=JSON and name server is 192.168.133.116:9876

六、查看进程

[root@rich apache-rocketmq]# jps
3441 BrokerStartup
3606 Jps
3383 NamesrvStartup
[root@rich apache-rocketmq]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      1/systemd
tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      1270/dnsmasq
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      991/sshd
tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      993/cupsd
tcp        0      0 0.0.0.0:3306            0.0.0.0:*               LISTEN      1126/mysqld
tcp6       0      0 :::111                  :::*                    LISTEN      1/systemd
tcp6       0      0 :::9876                 :::*                    LISTEN      3383/java
tcp6       0      0 :::22                   :::*                    LISTEN      991/sshd
tcp6       0      0 ::1:631                 :::*                    LISTEN      993/cupsd
tcp6       0      0 :::10909                :::*                    LISTEN      3441/java
tcp6       0      0 :::10911                :::*                    LISTEN      3441/java
tcp6       0      0 :::10912                :::*                    LISTEN      3441/java

七、发送和收取消息

在发送和收取消息之前,我们需要告诉客户端Name Server的位置。RocketMQ有多种办法来实现,在这里我们使用最简单的环境变量 NAMESRV_ADDR

export NAMESRV_ADDR=localhost:9876
#生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
#消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

八、关闭服务

#关闭nameserver 
[root@rich bin]# ./mqshutdown namesrv

#关闭broker
[root@rich bin]# ./mqshutdown broker

九、生产者与消费者代码

(1)同步发送消息

可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等。

public class SyncProducer {

    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.133.117:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

(2)异步发送消息

异步传输一般用于响应时间敏感的业务场景

public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        //使用生产者组名称进行实例化。
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 指定服务器地址。
        producer.setNamesrvAddr("192.168.133.117:9876");
        //启动实例。
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

(3)单向发送消息

单向传输用于需要中等可靠性的情况,例如日志收集。

public class OnewayProducer {

    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.133.117:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("wu ----- Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);
        producer.shutdown();
    }
}

(4)消费消息

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.133.117:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                byte[] bt = msgs.get(0).getBody();
                try {
                    System.out.println("-------------" + new String(bt,"UTF-8")+ "------------");
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

以上代码来自rocketmq官网。

十、Rocketmq-console可视化的监控管理平台

安装流程如下:

(1)在NameServer的机器中选择一台,然后在里面执行如下命令拉取RocketMQ运维工作台的源码:

git clone https://github.com/apache/rocketmq-externals.git

(2)然后进入rocketmq-console的目录:

cd rocketmq-externals/rocketmq-console/

(3)然后进入target目录下,可以看到一个jar包,接着执行下面的命令启动工作台:

java -jar target/rocketmq-console-ng-2.0.0.jar

或者

java -jar target/rocketmq-console-ng-2.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=127.0.0.1:9876

这里的2.0.0版本号不是固定的,第一次执行 mvn 打包操作应该是 1.0.0 ,然后多次打包版本号会有提升。

另外,如果配置文件有执行namesrvAddr的ip地址,就使用第一种执行方式,否则就选择第二种显式的去指定namesrvAddr所在的服务器地址。

这里在设置NameServer的地址时,如果有多个地址可以用分号隔开,接着就会看到工作台启动了,然后就通过浏览器访问那台机器的8080端口就可以了。

(4)布局说明

默认全英文页面,右上角有一个按钮是“ChangeLanguage” , 可以支持切换语言的,选择切换成简体中文就行了。

在这个界面里可以让你看到Broker的答题消息复制,还有各个Topic的消息负载,另外还可以选择日期要看哪一天的监控数据,都可以看到。

点击导航栏里的“集群” , 就会进入集群的一个监控界面。

 这里可以看到很多的信息,包括可以看到各个Broker的分组,哪些是Master,哪些是Slave,他们各自的机器地址和端口号,还有版本号。

还有每台机器的生产小学TPS和消费消息TPS,还有消息总数。

通过这个TPS统计,就是每秒写入或者被消费的消息数量,就可以看出RocketMQ集群的TPS和并发访问量。

此外还有“消费者”、“生产者”、“消息” 等界面,都有各自的作用,这里不做赘述。

(5)注意事项

rocketmq-console是访问的服务器10911端口,所以需要每个namesrvAddr所在服务器都开发该端口,或者关闭防火墙,不然会运维平台会请求失败,拿不到MQ的Broker数据。

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐