Introduction
Apache RocketMQ is a high-performance, high-throughput distributed messaging middleware open-sourced by Alibaba. Compared with Kafka, it offers better real-time delivery and stronger message-reliability guarantees, which makes it a better fit for money-related systems. It supports the following features:
Publish/subscribe messaging
Supports consumer-group (clustering) consumption: within one consumer group, only a single instance receives any given message.
Delayed messages
Only fixed delay levels are supported; the default table is "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h", 18 levels in total. A delayed message is first written to the corresponding delay queue, and a background thread moves it into the real topic once the delay expires. A producer-side sketch follows.
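For illustration, a minimal producer sketch using the standard 4.x client API (the group name, topic, and NameServer address are placeholders):

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class DelayedProducerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group"); // placeholder group
        producer.setNamesrvAddr("127.0.0.1:9876"); // placeholder NameServer address
        producer.start();

        Message msg = new Message("TestTopic", "TagA", "hello".getBytes());
        // Level 3 maps to 10s in the default "1s 5s 10s 30s ..." table
        msg.setDelayTimeLevel(3);
        SendResult result = producer.send(msg);
        System.out.printf("%s%n", result);

        producer.shutdown();
    }
}
```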
Ordered messages
Ordering is only guaranteed within a single queue on a single broker; there is no global ordering. This is the same as in Kafka. A sending sketch follows.
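To get per-key ordering in practice, the producer routes all messages sharing a key to the same queue via a MessageQueueSelector. A minimal sketch (group name, topic, and NameServer address are placeholders):

```java
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public class OrderedProducerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group"); // placeholder
        producer.setNamesrvAddr("127.0.0.1:9876"); // placeholder
        producer.start();

        long orderId = 1001L; // all messages of one order must land in the same queue
        Message msg = new Message("OrderTopic", "create", ("order-" + orderId).getBytes());
        producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message m, Object arg) {
                long id = (Long) arg;
                return mqs.get((int) (id % mqs.size())); // same key -> same queue -> ordered
            }
        }, orderId);

        producer.shutdown();
    }
}
```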
Message persistence
The [master-slave replication + synchronous flush] mode keeps persisted data safe. Since version 4.5, DLedger additionally supports automatic master/slave failover.
Message filtering
Consumers can filter messages by tag, and filtering on custom properties is also supported. Filtering is currently performed on the broker side, which avoids shipping useless messages over the network. A consumer-side sketch follows.
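On the consumer side this looks like the sketch below; the tag expression is the common case, while SQL92 property filtering additionally requires enablePropertyFilter=true in the broker configuration (topic and group names are placeholders):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;

public class FilterExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumer_group"); // placeholder
        consumer.setNamesrvAddr("127.0.0.1:9876"); // placeholder

        // Tag filter: deliver only messages tagged TagA or TagB
        consumer.subscribe("TestTopic", "TagA || TagB");

        // Alternatively, a SQL92 filter on custom properties
        // (requires enablePropertyFilter=true on the broker):
        // consumer.subscribe("TestTopic", MessageSelector.bySql("region = 'hz' AND price > 100"));
    }
}
```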
Message replay
Messages that have already been consumed can be re-consumed, selected by time or by key. This is very useful when recovering from an environment failure; see the sketch below.
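Operationally, replay is usually triggered with the mqadmin resetOffsetByTime command; on the client side, a consumer group can also start from a timestamp, as in this sketch (only honored when the group has no committed offset on the broker yet; group name is a placeholder):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class ReplayExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("replay_consumer_group"); // placeholder
        // Start consuming from a point in time; only takes effect when the
        // group has no committed offset yet
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        consumer.setConsumeTimestamp("20230101000000"); // yyyyMMddHHmmss
    }
}
```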
Transactional messages
The message is first sent to a special system topic; a 2PC-plus-transaction-check-back mechanism then decides whether to move it to the real topic or discard it.
Dead-letter queue
A message that still fails after the configured number of retries is moved to the dead-letter queue (a topic named %DLQ%+consumerGroup) and must be re-sent manually.
Message retry
Each consumer group has a retry queue named "%RETRY%+consumerGroup". A message awaiting retry is first placed into the "SCHEDULE_TOPIC_XXXX" delay queue according to its retry backoff, and only afterwards is it moved into the "%RETRY%+consumerGroup" retry queue.
At-Least-Once
Consumption has an ACK mechanism: the ACK response is only returned once consumption has finished (see the listener sketch below). This differs from Kafka, which is geared toward fast bulk processing: by default it produces and consumes asynchronously, in batches.
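The ACK is simply the status the message listener returns, as in this sketch (topic and group names are placeholders):

```java
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class AckExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ack_consumer_group"); // placeholder
        consumer.setNamesrvAddr("127.0.0.1:9876"); // placeholder
        consumer.subscribe("TestTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                // process the messages here ...
                // CONSUME_SUCCESS acks the batch; RECONSUME_LATER sends it to the retry queue
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
```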
Overall, RocketMQ can roughly be divided into three parts: the NameServer, the Broker Server, and the Client (Producer and Consumer). The physical deployment topology is as follows:
NameServer: holds topic routing information; NameServer instances do not communicate with each other
Broker: brokers come in master/slave roles; brokers sharing the same Broker Name form one group, and the one with BrokerId 0 is the master. Every broker registers with all NameServers and re-sends its registration periodically
Client (Consumer & Producer): periodically fetches routing information from the NameServer and periodically sends heartbeats to all brokers
NameServer
The NameServer plays a role somewhat like ZooKeeper's in Kafka: it acts as a routing registry that maintains the routing information for all brokers and topics. Unlike ZooKeeper, however, the NameServer instances in a cluster do not communicate with one another. Every broker registers with all NameServer instances at startup and then sends heartbeats periodically, so each NameServer holds the complete broker and topic routing information and is a self-contained unit.
A quick look at the NameServer startup flow [based on code version 4.5.2]:
Build the NamesrvController -> initialize the NamesrvController -> start the NamesrvController
NamesrvController initialization

```java
public boolean initialize() {
    this.kvConfigManager.load();

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // Every 10s: scan for inactive brokers and remove them
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // Every 10 minutes: print all KV configuration
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // TLS setup (elided in this excerpt)
    }

    return true;
}
```
As the code shows, the main steps are:
Start the RemotingServer used for network communication
Register the RequestProcessor that handles specific incoming requests
Start the scheduled tasks: scan for inactive brokers and remove them, and periodically print the configuration
The data managed by RouteInfoManager:

```java
public class RouteInfoManager {
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
```
The NameServer's RequestProcessor

```java
public class NamesrvController {
    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            this.remotingServer.registerDefaultProcessor(
                new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor);
        } else {
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }
}

public class DefaultRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

        if (ctx != null) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }

        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request);
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request);
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            case RequestCode.UPDATE_NAMESRV_CONFIG:
                return this.updateConfig(ctx, request);
            case RequestCode.GET_NAMESRV_CONFIG:
                return this.getConfig(ctx, request);
            default:
                break;
        }
        return null;
    }
}
```
As the switch above shows, the NameServer handles the following kinds of requests, which together define its role:
NameServer configuration management
UPDATE_NAMESRV_CONFIG
GET_NAMESRV_CONFIG
kvConfig management
PUT_KV_CONFIG
GET_KV_CONFIG
DELETE_KV_CONFIG
GET_KVLIST_BY_NAMESPACE
Broker management
QUERY_DATA_VERSION
REGISTER_BROKER
UNREGISTER_BROKER
GET_BROKER_CLUSTER_INFO
WIPE_WRITE_PERM_OF_BROKER
Topic routing information management
GET_ROUTEINTO_BY_TOPIC
GET_ALL_TOPIC_LIST_FROM_NAMESERVER
DELETE_TOPIC_IN_NAMESRV
GET_TOPICS_BY_CLUSTER
GET_SYSTEM_TOPIC_LIST_FROM_NS
GET_UNIT_TOPIC_LIST
GET_HAS_UNIT_SUB_TOPIC_LIST
GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST
How the NameServer handles the REGISTER_BROKER request
Broker registration with the NameServer also serves as a heartbeat. As BrokerController shows, a broker sends a REGISTER_BROKER request to every NameServer at startup to register its information, and then re-registers every 30s by default.
Combined with the NameServer's scanNotActiveBroker scheduled task, this lets the NameServer maintain a reasonably fresh list of live brokers (the expiry check is excerpted after the broker-side code below).
```java
public class BrokerController {
    public void start() throws Exception {
        // Re-register with all NameServers periodically;
        // the period is clamped to [10s, 60s]
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    }
}
```
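On the NameServer side, the expiry check in RouteInfoManager.scanNotActiveBroker is straightforward; lightly abridged from the 4.5.x source:

```java
public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // No heartbeat for BROKER_CHANNEL_EXPIRED_TIME (2 minutes): close the
        // channel and purge the broker from the routing tables
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
```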
Now for how the NameServer processes this request. The rough flow:
Parse the request header and body from the request
The header contains: brokerName, brokerAddr, clusterName, haServerAddr, brokerId, compressed
The body contains: filterServerList and a TopicConfigSerializeWrapper [ConcurrentMap<String, TopicConfig> topicConfigTable, DataVersion dataVersion]
Use the header and body to update the clusterAddrTable, brokerAddrTable, brokerLiveTable, filterServerTable, and related data the NameServer maintains locally
The main behavior:
```java
public class RouteInfoManager {
    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                this.lock.writeLock().lockInterruptibly();

                // 1. Record the broker name under its cluster
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;

                // 2. Record the broker address under its broker name / id
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                // The same IP:PORT must have only one record: drop stale entries
                // for this address registered under a different brokerId
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                        it.remove();
                    }
                }

                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);

                // 3. For a master: create/update the queue data of its topics
                if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                // 4. Refresh the live-broker record (the heartbeat state)
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                // 5. Update the filter server list for this broker
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }

                // 6. For a slave: return its master's address and HA server address
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }

        return result;
    }
}
```
Client
A RocketMQ client fetches routing information from the NameServer and periodically sends heartbeats to all brokers.
Producer
Startup
For the producer, the story starts with constructing DefaultMQProducerImpl and then starting it. The constructor mainly initializes an executor, so we skip it and go straight to what the start method does:
Register the producer group
Call MQClientInstance's start method:
If no NameServer address is configured, fetch the NameServer address first
Start the network request-response channel
Start a series of periodic scheduled tasks:
Fetch the NameServer address every 2 minutes
Fetch topic routing information from the NameServer every 30s by default
Send heartbeats to all brokers every 30s by default
Persist the consume-offset records every 5s by default
Adjust the thread pool once per minute
Start the message-pulling service thread
Start the rebalance service thread
Send heartbeats to all brokers and upload the corresponding filter class information

```java
public class DefaultMQProducerImpl implements MQProducerInner {
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.mQClientFactory = MQClientManager.getInstance()
                    .getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                boolean registerOK = mQClientFactory.registerProducer(
                    this.defaultMQProducer.getProducerGroup(), this);

                if (startFactory) {
                    mQClientFactory.start();
                }
                break;
            // ... other states elided in this excerpt
        }
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }
}

public class MQClientInstance {
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If no NameServer address was configured, fetch it first
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start the request-response channel
                    this.mQClientAPIImpl.start();
                    // Start the various scheduled tasks
                    this.startScheduledTask();
                    // Start the pull service
                    this.pullMessageService.start();
                    // Start the rebalance service
                    this.rebalanceService.start();
                    // Start the internal default producer
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId()
                        + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
}
```
Producing messages
A few points about message production deserve attention [the code is lengthy, so it is not pasted here]:
How the broker and one of its queues are chosen when sending
Roughly: look up the routing information by topic -> take a random number (subsequent sends use the previous number + 1) modulo the queue count to pick a MessageQueue -> check it against LatencyFaultTolerance; if it is usable, pick it. Otherwise pick a broker from LatencyFaultTolerance; if that broker has writeQueueNums > 0, pick one of its queues by modulo; otherwise run the whole procedure again. The core of LatencyFaultTolerance is backing off from previously failed requests for a period of time, i.e. not selecting them as send targets again in the short term. A sketch of the round-robin part follows.
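For intuition, here is a hypothetical, heavily simplified stand-in for the round-robin part of the selection (the real logic lives in TopicPublishInfo.selectOneMessageQueue and MQFaultStrategy; the class and field names here are abbreviations, and the fault-tolerance bookkeeping is omitted):

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.message.MessageQueue;

class SimpleQueueSelector {
    // Random starting point, then +1 per send, as described above
    private final AtomicInteger sendWhichQueue =
        new AtomicInteger(ThreadLocalRandom.current().nextInt(1000));

    MessageQueue select(List<MessageQueue> queues, String lastFailedBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            int index = Math.abs(sendWhichQueue.getAndIncrement()) % queues.size();
            MessageQueue mq = queues.get(index);
            // On retry, skip the broker that just failed
            if (!mq.getBrokerName().equals(lastFailedBrokerName)) {
                return mq;
            }
        }
        // Every queue sits on the failed broker: fall back to plain round-robin
        return queues.get(Math.abs(sendWhichQueue.getAndIncrement()) % queues.size());
    }
}
```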
How sending to a not-yet-created topic is handled
If the topic has not been created yet, neither the local cache nor the NameServer can resolve routing information for that topic name.
RocketMQ can, however, create topics automatically (enabling this in production is not recommended).
In that case the producer falls back to the routing information of the default topic TBW102 on every broker that has autoCreateTopic enabled (with autoCreateTopic=true, that topic is registered with the NameServer when the broker starts).
This can, however, skew the message distribution. Take sending to UNKNOWN_TOPIC as an example (with autoCreateTopic enabled):
1. Fetch the routing information of UNKNOWN_TOPIC from the NameServer; nothing is found
2. Fetch the routing information of TBW102
3. Pick a broker & queue and send the message to that broker
4. The broker stores the message and registers UNKNOWN_TOPIC with the NameServer (the NameServer now has routing information for UNKNOWN_TOPIC, but only for this single broker)
5. The client's background task fetches the routing information from the NameServer and updates its local cache
If only one message was sent to one broker before step 5 runs, then from that point on all messages for this topic can only be sent to that single broker, and load balancing is lost.
How transactional messages are handled
In short, the flow is a 2PC-like pattern, extended with a check-back mechanism that resolves the unknown-state problem of network requests.
The first message sent is a "half" message: it carries all the message content but is not delivered to the target topic; instead it goes to the system topic RMQ_SYS_TRANS_HALF_TOPIC. A broker-side scheduled task scans this topic for unresolved messages and checks back on the transaction state to decide whether to commit (move the message to the real topic queue) or roll back. The check-back is not unlimited: by default, a message with no result after 15 checks is rolled back.
How, then, does the broker know whether a message has been resolved? RocketMQ introduces an extra "Op" message: for every half message that is committed or rolled back, a corresponding Op message is written; if no Op message exists, the half message is still pending.
(Why not represent this with an extended property field instead?)
For details, see the design document on the RocketMQ GitHub: 设计(design). A producer-side sketch follows.
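From the application's point of view, the whole mechanism is driven through TransactionMQProducer. A minimal sketch (group name, topic, NameServer address, and the local-transaction bodies are placeholders):

```java
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionProducerExample {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("trans_producer_group"); // placeholder
        producer.setNamesrvAddr("127.0.0.1:9876"); // placeholder
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // Run the local transaction (e.g. a DB update) after the half message is stored
                return LocalTransactionState.UNKNOW; // let the broker check back later
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // The check-back: inspect local state and decide commit or rollback
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        producer.sendMessageInTransaction(new Message("TxTopic", "pay".getBytes()), null);
    }
}
```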
Consumer
Consumer startup is broadly similar to producer startup. The part that deserves the most attention is the internal Rebalance implementation: a background scheduled task that is started along with the MQClientInstance.
Rebalance
Here we focus on rebalancing under the clustering consumption mode. In short, a RocketMQ background thread runs the rebalance task periodically, every 20000 ms by default. Rebalancing is performed per topic; for a single topic the flow is roughly:
Get the topic's current MessageQueues and consumer IDs
Run the configured allocation strategy over that data to compute the list of MessageQueues this consumer should consume. The available strategies are:
AllocateMachineRoomNearby
AllocateMessageQueueAveragely (default)
AllocateMessageQueueAveragelyByCircle
AllocateMessageQueueByConfig
AllocateMessageQueueByMachineRoom
AllocateMessageQueueConsistentHash
For a walkthrough of these strategies, see the blog post: RocketMQ-负载均衡
Adjust the local allocation data processQueueTable (ConcurrentMap<MessageQueue, ProcessQueue>) according to the new allocation result
If the local allocation cache changed, adjust the local pull tasks: remove pull tasks for MessageQueues that are no longer assigned, and add pull tasks for newly assigned ones
The code:
```java
public abstract class RebalanceImpl {
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // ... broadcasting mode elided in this excerpt
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    // Sort both sides so every consumer computes the same allocation
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                            strategy.getName(), e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    // Update the local processQueueTable and adjust pull tasks accordingly
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
        }
    }
}
```
Broker Server
Overall architecture
The broker is the core of RocketMQ, so we first look at its overall architecture. All of its modules are built on top of the Remoting Module, the network communication layer.
Client Manager: manages clients and maintains consumers' topic subscription information
Store Service: handles message storage
HA Service: handles data replication between master and slave brokers
Index Service: builds key-based indexes over messages
Message files:
CommitLog: the actual storage of message data
ConsumeQueue: the consume queue, which speeds up message consumption. Stored under $HOME/store/consumequeue/{topic}/{queueId}/{fileName}; each entry records the starting physical offset in the CommitLog of a message in the given topic queue, the message size, and the hash code of the message tag
IndexFile: the message index file, enabling message lookup by key or by time range. Stored under $HOME\store\index${fileName}
Both the ConsumeQueue and the IndexFile are generated asynchronously from the CommitLog by background threads; a decoding sketch of the ConsumeQueue entry follows.
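Each ConsumeQueue entry has a fixed 20-byte layout (CQ_STORE_UNIT_SIZE in the RocketMQ source). A minimal decoding sketch:

```java
import java.nio.ByteBuffer;

// Decodes one fixed-size ConsumeQueue entry; the 20-byte layout matches
// ConsumeQueue.CQ_STORE_UNIT_SIZE in the RocketMQ source.
final class ConsumeQueueEntry {
    static final int CQ_STORE_UNIT_SIZE = 20;

    static void decode(ByteBuffer buf) {
        long commitLogOffset = buf.getLong(); // 8 bytes: physical offset of the message in the CommitLog
        int size = buf.getInt();              // 4 bytes: total message size
        long tagsCode = buf.getLong();        // 8 bytes: hash code of the message tag, used for tag filtering
        System.out.printf("offset=%d size=%d tagsCode=%d%n", commitLogOffset, size, tagsCode);
    }
}
```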
Startup
initialize
Load topics.json, consumerOffset.json, subscriptionGroup.json, and consumerFilter.json to restore previously persisted data
Initialize the ThreadPoolExecutors
Register a processor for each kind of request:
SendMessageProcessor: SEND_MESSAGE, SEND_MESSAGE_V2, SEND_BATCH_MESSAGE, CONSUMER_SEND_MSG_BACK
PullMessageProcessor: PULL_MESSAGE
QueryMessageProcessor: QUERY_MESSAGE, VIEW_MESSAGE_BY_ID
ClientManageProcessor: HEART_BEAT, UNREGISTER_CLIENT, CHECK_CLIENT_CONFIG
ConsumerManageProcessor: GET_CONSUMER_LIST_BY_GROUP, UPDATE_CONSUMER_OFFSET, QUERY_CONSUMER_OFFSET
EndTransactionProcessor: END_TRANSACTION
AdminBrokerProcessor: a large set of commands, mostly for reading and adjusting broker configuration and state
Start background periodic threads that record broker runtime information: consumerOffset persistence (every 5s by default), consumerFilter persistence (every 10s), and the broker-protection probe (disabled by default; when enabled, consumers that consume too slowly are banned from consuming)
Fetch the NameServer address, and initialize SSL, ACL, DLedger, transaction handling, and so on
start
Start the module services: messageStore, remotingServer, filterServerManager, brokerOuterAPI, brokerStatsManager, etc.
Start the periodic task (every 30s by default; interval clamped to a minimum of 10s and a maximum of 60s) that registers this broker's topic and other information with all NameServers
Request handling
As noted above, every request has a corresponding processor. Let's take the SEND_MESSAGE request as the example and look only at the simplest path, handling a single-message send:
```java
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }

    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        final RemotingCommand request,
        final SendMessageContext sendMessageContext,
        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

        response.setOpaque(request.getOpaque());

        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

        log.debug("receive SendMessage request command, {}", request);

        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startTimstamp) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
            return response;
        }

        response.setCode(-1);
        // Check the topic and the broker/topic write permissions
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return response;
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        PutMessageResult putMessageResult = null;
        Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            // Transactional (half) message
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }

        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
    }
}

public class CommitLog {
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time and the body CRC
        msg.setStoreTimestamp(System.currentTimeMillis());
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delayed delivery: redirect to the schedule topic, remembering the real topic/queue
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Back up the real topic and queueId in the message properties
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        putMessageLock.lock(); // spin lock or ReentrantLock, depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // The store timestamp is set inside the lock to keep it globally ordered
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    // The current file is full: roll to a new mapped file and retry the append
                    unlockMappedFile = mappedFile;
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        // Flush to disk (sync or async) and replicate to the slave
        handleDiskFlush(result, putMessageResult, msg);
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }
}
```
The overall flow:
Use the RequestCode to distinguish a consumer send-back (retry) message from a normal send, and a batch send from a transactional or single message
Build and validate the message data: does the topic exist, does it clash with a system default topic, is the topic writable, is the broker writable, and so on
Depending on whether the message is transactional, call transactionalMessageService.prepareMessage or messageStore.putMessage
messageStore.putMessage checks the message size, whether this broker is a master, and so on, then calls commitLog.putMessage(msg)
commitLog finally appends the msg to a MappedFile
commitLog.handleDiskFlush performs the flush according to the FlushDiskType (synchronous vs. asynchronous flush); see the excerpt below
commitLog.handleHA replicates according to the master/slave synchronization mode
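The flush handling is worth a closer look; lightly abridged from CommitLog.handleDiskFlush in the 4.5.x source:

```java
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronous flush: block until the GroupCommitService has flushed this offset, or time out
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush: just wake the flush thread and return immediately
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
```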
Reference: Apache RocketMQ Developer Guide (开发者指南)