看源码的一些准备工作,以下逻辑会出现多次。
RemotingServer
rocketMq采用NettyRemotingServer来处理各个组件之间的网络通信。
处理消息的方式
NettyRemotingServer的channelRead0()–>NettyServerHandler.processMessageReceived–>NettyServerHandler.processRequestCommand的processorTable处理,其中processorTable的方法是通过下面的方式注册进去
- 注册消息的:通过registerProcessor方法,每种消息根据requestcode,来制定消息处理方式。
- 维护了processorTable对象,HashMap<Integer/ request code /, Pair<NettyRequestProcessor, ExecutorService>>
- 每一种request指定一个ExecutorService和一个registerProcessor来处理
- 处理消息:通过调用processMessageReceived来处理。
NettyClient三种发送消息的方式
见信息的发送
invokeOneway:单向发送无返回值
1 |
|
invokeAsync:异步发送方式,invokeCallback中处理返回值信息
1 |
|
invokeSync:同步发送,等待返回值
1 | public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) |