博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊rocketmq的NettyEncoder及NettyDecoder
阅读量:6273 次
发布时间:2019-06-22

本文共 4889 字,大约阅读时间需要 16 分钟。

本文主要研究一下rocketmq的NettyEncoder及NettyDecoder

NettyEncoder

org/apache/rocketmq/remoting/netty/NettyEncoder.java

public class NettyEncoder extends MessageToByteEncoder
{ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } }}
  • 这里继承MessageToByteEncoder,类型是RemotingCommand,先写入header再写入body

RemotingCommand.encodeHeader

org/apache/rocketmq/remoting/protocol/RemotingCommand.java

public ByteBuffer encodeHeader() {        return encodeHeader(this.body != null ? this.body.length : 0);    }    public ByteBuffer encodeHeader(final int bodyLength) {        // 1> header length size        int length = 4;        // 2> header data length        byte[] headerData;        headerData = this.headerEncode();        length += headerData.length;        // 3> body data length        length += bodyLength;        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);        // length        result.putInt(length);        // header length        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));        // header data        result.put(headerData);        result.flip();        return result;    }
  • 这里先写数据总长度,再写入header长度,最后写入header的数据,在写入body数据
  • 整体结构为:Length | Header length | Header data | Body

NettyDecoder

org/apache/rocketmq/remoting/netty/NettyDecoder.java

public class NettyDecoder extends LengthFieldBasedFrameDecoder {    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);    private static final int FRAME_MAX_LENGTH =        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));    public NettyDecoder() {        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);    }    @Override    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {        ByteBuf frame = null;        try {            frame = (ByteBuf) super.decode(ctx, in);            if (null == frame) {                return null;            }            ByteBuffer byteBuffer = frame.nioBuffer();            return RemotingCommand.decode(byteBuffer);        } catch (Exception e) {            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);            RemotingUtil.closeChannel(ctx.channel());        } finally {            if (null != frame) {                frame.release();            }        }        return null;    }}
  • 这里继承LengthFieldBasedFrameDecoder,maxFrameLength为FRAME_MAX_LENGTH,lengthFieldOffset为0,lengthFieldLength为4,lengthAdjustment为0,initialBytesToStrip为4
  • 获取ByteBuffer之后调用RemotingCommand.decode(byteBuffer)

RemotingCommand.decode

org/apache/rocketmq/remoting/protocol/RemotingCommand.java

public static RemotingCommand decode(final ByteBuffer byteBuffer) {        int length = byteBuffer.limit();        int oriHeaderLen = byteBuffer.getInt();        int headerLength = getHeaderLength(oriHeaderLen);        byte[] headerData = new byte[headerLength];        byteBuffer.get(headerData);        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));        int bodyLength = length - 4 - headerLength;        byte[] bodyData = null;        if (bodyLength > 0) {            bodyData = new byte[bodyLength];            byteBuffer.get(bodyData);        }        cmd.body = bodyData;        return cmd;    }    public static int getHeaderLength(int length) {        return length & 0xFFFFFF;    }    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {        switch (type) {            case JSON:                RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);                resultJson.setSerializeTypeCurrentRPC(type);                return resultJson;            case ROCKETMQ:                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);                resultRMQ.setSerializeTypeCurrentRPC(type);                return resultRMQ;            default:                break;        }        return null;    }
  • 这里先获取header长度,获取header的数据,decode出来RemotingCommand,之后获取body

小结

rocketmq的netty编码解码,使用的是RemotingCommand对象,编码继承MessageToByteEncoder,解码采用的是LengthFieldBasedFrameDecoder

doc

转载地址:http://kelpa.baihongyu.com/

你可能感兴趣的文章
hive join 数据倾斜 真实案例
查看>>
Object-C代码练习【文件管理练习(每秒写入一个时间到文件)】
查看>>
Redis列表
查看>>
文件查找工具之find命令详解
查看>>
linux命令 — lsof 查看进程打开那些文件 或者 查看文件给那个进程使用
查看>>
PHP+Swoole及时通讯
查看>>
centos安装图形
查看>>
SpringCloud(第 012 篇)电影微服务接入 Feign 进行客户端负载均衡,通过 FeignClient 调用远程 Http 微服务...
查看>>
mysql tomcat redis nginx 版本的查看方法
查看>>
php判断ajax请求
查看>>
C语言中函数strcpy ,strncpy ,strlcpy的用法
查看>>
RedHat Linux卸载rpm包遇到error: specifies multiple packages
查看>>
centos6.4 http dhcp nfs pxe kickstart 自动化安装系统
查看>>
漫游配置文件
查看>>
数据的存储介质-固态存储SSD
查看>>
oracle PL/SQL 流程控制
查看>>
Linux下/proc目录简介
查看>>
jenkins自动同步配置文件
查看>>
Windows系统清理 只需做到五大方面
查看>>
我的友情链接
查看>>