项目实践·Docker RocketMQ

作者 : jamin 本文共8230个字,预计阅读时间需要21分钟 发布时间: 2020-10-18 共981人阅读

Docker RocketMQ

RocketMQ 简介

Message Queue(MQ,消息队列中间件)作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:

  • 削峰填谷:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
  • 系统解耦:解决不同重要程度、不同能力级别系统之间依赖导致一死全死
  • 提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
  • 蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测

消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ 里同样有这两个概念,消息生产者负责创建消息并发送到 RocketMQ 服务器,RocketMQ 服务器会将消息持久化到磁盘,消息消费者从 RocketMQ 服务器拉取消息并提交给应用消费。

RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:

  • 支持严格的消息顺序
  • 支持 Topic 与 Queue 两种模式
  • 亿级消息堆积能力
  • 比较友好的分布式特性
  • 同时支持 Push 与 Pull 方式消费消息

RocketMQ 优势:

  • 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
  • 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
  • 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
  • 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
  • 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
  • 支持重复消费(RabbitMQ 不支持,Kafka 支持)

RocketMQ 有 namesrv 和 broker 组成。

namesrv 的功能如下:

  • 接收 broker 的请求注册 broker 路由信息(master 和 slave)
  • 接收 client 的请求根据某个 topic 获取所有到 broker 的路由信息

broker 则是 RocketMQ 真正存储消息的地方,broker 消息存储主要包括 3 个部分,分别 commitLog 的存储、consumeQueue 的存储、index 的存储。

Producer 和 Consumer 都是通过 namesrv 获取 broker 路由信息,连接到 broker 生产消费消息,namesrv 和 broker 可以分别集群部署,生产者消费者同样可以分别集群部署,物理部署架构图如下:

RocketMQ

Docker 安装 RocketMQ

docker-compose.yml

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
    networks:
      rmq:
        aliases:
          - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
      - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: 'rmqnamesrv:9876'
      JAVA_OPTS: ' -Duser.home=/opt'
      JAVA_OPT_EXT: '-server -Xms128m -Xmx128m -Xmn128m'
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8080:8080
    environment:
      JAVA_OPTS: '-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false'
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

需要注意 rocketmq-broker 与 rokcetmq-console 都需要与 rokcetmq-nameserver 连接,需要知道 nameserver ip。使用 docker-compose 之后,上面三个 docker 容器将会一起编排,可以直接使用容器名代替容器 ip,如这里 nameserver 容器名 rmqnamesrv

broker.conf

RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,在 ./data/brokerconf/ 目录下创建一个名为 broker.conf 的配置文件,内容如下:

censed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.


# 所属集群名字
brokerClusterName=DefaultCluster

# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a

# 0 表示 Master,> 0 表示 Slave
brokerId=0

# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192.168.205.10

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true

# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

# Broker 对外服务的监听端口
listenPort=10911

# 删除文件时间点,默认凌晨4点
deleteWhen=04

# 文件保留时间,默认48小时
fileReservedTime=120

# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128

精简配置文件:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.205.10

配置完成,启动:

docker-compose up -d

访问 http://rmqIP:8080 进入控制台。

RocketMQ控制台

Spring Cloud Stream

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。

Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念(Kafka、RabbitMQ、RocketMQ 都是消息中间件)。

Spring Cloud Stream 内部有两个概念:BinderBinding

Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。

比如 Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。

Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

RocketMQ中间件

使用

这里只展示最简单 Demo,更多栗子可以参考 spring cloud alibaba rocketmq

接入

  1. 首先,修改 pom.xml 文件,引入 RocketMQ Stream Starter。
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
  1. 配置 Input 和 Output 的 Binding 信息并配合 @EnableBinding 注解使其生效
spring:
  cloud:
    stream:
      # 配置 rocketmq 的 nameserver 地址
      rocketmq:
        binder:
          name-server: 192.168.205.4:9876
      # 配置 input 和 output binding
      bindings:
        input:
          content-type: text/plain
          destination: test-topic
          group: test-group
        output:
          content-type: text/plain
          destination: test-topic
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class })
public class AdminApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketMQApplication.class, args);
    }
}

消息生产者

消息发送服务 SenderService:

@Service
public class SenderService {

    @Autowired
    private MessageChannel output;

    public void send(String msg) throws Exception {
        output.send(MessageBuilder.withPayload(msg).build());
    }

    public <T> void sendWithTags(T msg, String tag) throws Exception {
        Message message = MessageBuilder.createMessage(msg,
                new MessageHeaders(Stream.of(tag).collect(Collectors
                        .toMap(str -> MessageConst.PROPERTY_TAGS, String::toString))));
        output.send(message);
    }

    public <T> void sendObject(T msg, String tag) throws Exception {
        Message message = MessageBuilder.withPayload(msg)
                .setHeader(MessageConst.PROPERTY_TAGS, tag)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        output.send(message);
    }

    public <T> void sendTransactionalMsg(T msg, int num) throws Exception {
        MessageBuilder builder = MessageBuilder.withPayload(msg)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
        builder.setHeader("test", String.valueOf(num));
        builder.setHeader(RocketMQHeaders.TAGS, "binder");
        Message message = builder.build();
        output.send(message);
    }
}

发送消息:

@Bean
public CustomRunner customRunner() {
    return new CustomRunner("output");
}

public static class CustomRunner implements CommandLineRunner {

    private final String bindingName;

    public CustomRunner(String bindingName) {
        this.bindingName = bindingName;
    }

    @Autowired
    private SenderService senderService;

    @Override
    public void run(String... args) throws Exception {
        if (this.bindingName.equals("output")) {
            int count = 5;
            for (int index = 1; index <= count; index++) {
                String msgContent = "msg-" + index;
                if (index % 3 == 0) {
                    senderService.send(msgContent);
                }
                else if (index % 3 == 1) {
                    senderService.sendWithTags(msgContent, "tagStr");
                }
                else {
                    senderService.sendObject(new Foo(index, "foo"), "tagObj");
                }
            }
        }
    }
}

@Data
public static class Foo {
    private Integer inx;
    private String msg;

    Foo(Integer inx, String msg) {
        this.inx = inx;
        this.msg = msg;
    }
}

消息消费者

消息接收服务 ReceiveService:

@Service
public class ReceiveService {

    @StreamListener("input")
    public void receiveInput1(String receiveMsg) {
        System.out.println("input receive: " + receiveMsg);
    }
}

参考文章:
基于 Docker 安装 RocketMQ
rocketmq 部署启动指南-Docker 版
Spring Alibaba RocketMQ

本站所提供的部分资源来自于网络,版权争议与本站无关,版权归原创者所有!仅限用于学习和研究目的,不得将上述内容资源用于商业或者非法用途,否则,一切后果请用户自负。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源。如果上述内容资对您的版权或者利益造成损害,请提供相应的资质证明,我们将于3个工作日内予以删除。本站不保证所提供下载的资源的准确性、安全性和完整性,源码仅供下载学习之用!如用于商业或者非法用途,与本站无关,一切后果请用户自负!本站也不承担用户因使用这些下载资源对自己和他人造成任何形式的损失或伤害。如有侵权、不妥之处,请联系站长以便删除!
金点网络-全网资源,一网打尽 » 项目实践·Docker RocketMQ

常见问题FAQ

免费下载或者VIP会员专享资源能否直接商用?
本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。
是否提供免费更新服务?
持续更新,永久免费
是否经过安全检测?
安全无毒,放心食用

提供最优质的资源集合

立即加入 友好社区
×