消息中间件RocketMQ原理解析 - 斩秋一:consumer 启动流程 指定 group 订阅 topic 注册消息监听处理器,当消息到来时消费消息 消费端 Start 复制订阅关系 初始化 rebalance 变量 构建 offsetStore 消费进度存储对象 启动消费消息服务 向 恢复写入消息时,消费记录队列的 offset (2) 恢复每个队列的最小 offset 5. 初始化通信层 6. 初始化线程池 7. 注册 broker 端处理器用来接收 client 请求后选择处理器处理 8. 启动每天凌晨 00:00:00 统计消费量任务 9. 启动定时刷消费进度任务 10. 启动扫描数据被删除了的 topic,offset 记录也对应删除任务 11 callback, 让后从缓存中 移除再释放请求 5 processRequestCommand 接收请求处理 根据请求 code 查找对应的处理器线程池 pair, 没有用默认的 有处理器处理请求返回 RemotingCommand 对象的响应 response 若不是 onewayRpc 给 response 设置 opaque0 码力 | 57 页 | 2.39 MB | 1 年前3
Apache RocketMQ 从入门到实战名称,使用的队列 数量为 DefaultMQProducer#defaultTopicQueueNums,即默认为 4。 Step4:Broker 端收到消息后的处理流程 服务端收到消息发送的处理器为:SendMessageProcessor,在处理消息发送时, 会调用 super.msgCheck 方法: AbstractSendMessageProcessor#msgCheck 本文 网络处理机制概述 RocketMQ 的网络设计非常值得我们学习与借鉴,首先在客户端端将不同的请求定义 不同的请求命令 CODE,服务端会将客户端请求进行分类,每个命令或每类请求命令定义 一个处理器(NettyRequestProcessor),然后每一个 NettyRequestProcessor 绑定到 一个单独的线程池,进行命令处理,不同类型的请求将使用不同的线程池进行处理,实现线 程隔离。 busy、broker busy 原因分析与解决方案坑 NettyRequestProcessor RocketMQ 服务端请求处理器,例如 SendMessageProcessor 是消息发送处理器、 PullMessageProcessor 是消息拉取命令处理器。 RequestCode 请求 CODE,用来区分请求的类型,例如 SEND_MESSAGE:表示该请求为消息发 送,0 码力 | 165 页 | 12.53 MB | 1 年前3
共 2 条
- 1













