RocketMQ

RocketMQ

Producer

  消息生产者,负责产生消息,一般由业务系统负责产生消息。

Producer Group

  生产者组,一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。

Consumer

  消息消费者,消费MQ上的消息的应用程序就是消费者,根据业务的需要对消息进行不同的处理,一般是后台系统负责异步消费。

Push Consumer

  Consumer的一种,应用通常向consumer对象注册一个listener接口,一旦收到消息,consumer对象立刻回调listener接口方法。

Pull Consumber

  Consumer的一种,应用通常主动调用consumer的拉消息方法从broker拉消息,主动权由应用控制。

Consumer Group

  消费者组,和生产者组类似,消费同一类消息的多个consumer示例组成的一个消费者组,消费逻辑一致。

Topic

  Topic是一种消息的逻辑分类,比如在电商中订单、商品、活动等产生的消息,都需要进行分类。订单的消息放到订单的topic,活动的消息放到活动的topic。

Message

  Message是消息的载体。一个message必须指定topic,相当于寄信的地址。Message还可以设置一个可选的tag设置,以便消费端可以基于tag进行消息过滤。

Tag

  Tag可以被认为是topic的进一步细化。一般在相同的业务模块中通过引入标签来标记不同用途的消息。

Broker

  Broker是RocketMQ系统的主要角色,broker接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。一般也称为server,在JMS规范中称为Provider。

Name Server

  Name Server为producer和consumer提供路由信息。作用和zookeeper一样,轻量级的。

广播消费

  一条消息被多个consumer消费,即使这些consumer属于同一个consumer group,消息也会被consumer group中的每个consumer都消费一次,广播消费中的consumer group概念可以认为在消息划分方面无意义。
  在CORBA Notification规范中,消费方式都属于广播消费。在JMS规范中,相当于JMS publish/subscribe model。

集群消费

  一个consumer group中的consumer实例平均分摊消费信息。例如某个topic有9条消息,其中一个consumer group有3个实例(可能是三个进程,也可能是三台机器),那么每个实例消费其中的3条消息。

顺序消息

  消费消息的顺序要同发送消息的顺序一致,在rocketmq中,主要指的是局部顺序,即一类消息为满足顺序性,必须producer单线程顺序发送,且发送到同一个队列,这样consumer就可以按照producer发送的顺序去消费消息。

普通顺序消息

  顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,broker重启,由于队列总数发生变化,哈希取模后定位的队列会发生变化,产生短暂的消息顺序不一致。如果业务能容忍在集群异常情况下(如某个broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。

严格顺序消息

  顺序消息的一种,无论正常异常情况下都能保证顺序,但是牺牲了分布式failover特性,即broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。如果服务器部署为同步双写模式,此缺陷可通过备机自主切换为主避免,不过仍然会存在几分钟的服务不可用。

  目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。

message queue

  在rocketmq中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以认为是长度无限的,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。

  也可以认为message queue是一个长度无限的数组,offset就是下标。

RocketMQ特点

  • 是一个队列模型的消息中间件,具有高性能、高可靠性、高实时、分布式特点。
  • Producer、Consumer、队列都可以是分布式。
  • Producer向一些队列轮流发送消息,队列集合称为Topic。Consumer如果做广播消费,则一个consumer示例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
  • 能够保证严格的消息顺序。
  • 提供丰富的消息拉取模式。
  • 高效的订阅者水平拓展能力。
  • 实时的消息订阅机制。
  • 亿级消息堆积能力。
  • 较少的依赖。

RocketMQ物理部署结构

image

  • Name Server是一个几乎无状态节点,可集群部署,节点间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有的Name Server。
  • Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与Name Server集群中的一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

RocketMQ逻辑部署结构

image

Producer Group

  用来表示一个发送消息的应用,一个Producer Group下包含多个Producer实例,可以使多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象。一个Producer Group可以发送多个Topic消息,Producer Group的作用如下:

  • 标识一类Producer。
  • 可以通过运维工具查询这个发送消息应用下有多个Producer实例。
  • 发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调Producer Group内任意一台机器来确认事务状态。

Consumer Group

  用来表示一个消费消息应用,一个Consumer Group下包含多个consumer实例,可以是多台机器,也可以是多个进程,或者是一个进程下的多个consumer对象。一个Consumer Group下的多个consumer以均摊方式消费消息,如果设置为广播方式,那么这个Consumer Group下的每个实例都消费全量数据。

RocketMQ数据存储结构

image

  RocketMQ采取一种数据与索引分离的存储方法。有效降低文件资源、IO资源,内存资源的损耗。对于海量数据、高并发场景也能有效降低端到端延迟,并具备较强的横向拓展能力。

RocketMQ分布式消息系统

消息顺序

  例如订单创建、订单付款、订单完成,需要保证消费的顺序才有意义。所以可以将这几条消息发送到同一个topic队列中,并让同一个消费者来消费,这样就能保证消息的顺序消费。

producer:

1
2
3
4
5
6
7
8
9
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId
);

consumer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
consumer.registerMessageListener(new MessageListenerOrderly() {

AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;

}
});

消息重复

  • 消费端处理消息的业务逻辑保持幂等性
  • 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

  RocketMQ不保证消息不重复,如果业务需要保证严格的不重复消息,需要自己在业务端去重。


参考