netty介绍

优点

性能高、易用、java中几乎所有框架的网络通信都使用netty。

沾包、拆包

沾包、拆包问题的产生是因为TCP协议是基于数据流的协议,就像流水一样,没有严格的分界。发送数据的时候,可能有多条数据合并在一起,读取数据的时候也可能是多条数据,或者不完整的数据。

沾包:多条数据合并在一起

拆包:将读取的数据进行拆解,获得单独的一条完整数据

常见解决方案

  • 定长:预估数据长度,定义一个可以容纳所有数据的buffer长度,缺点是浪费带宽,适用于数据大小比较恒定的场景。
  • 换行符符:数据包以换行符结尾(\r or \r\n,例如netty中的LineBasedFrameDecoder)
  • 自定义分隔符:自定义一个分隔符作为数据结尾,缺点是可能会跟正常数据冲突。
  • 自定义消息格式:定义消息头和消息体,在消息头中包含消息长度信息。

编解码

网络通信的底层数据都是以字节传输,在接收到数据之后我们需要将字节码进行编码,得到业务数据,例如字符串,对象,httpReauest等。

netty已经内置了很多编解码器,例如:StringDecoder/StringEncoder

自定义编解码器,只需要实现MessageToMessageDecoder(解码)和MessageToMessageEncoder(编码)接口即可。

在很多RPC框架中都会自己实现一套编解码器,将数据反序列化成对象或者将对象序列化成字节。

使用

示例代码

jdk为了兼容各平台操作系统,对nio做了很多兼容处理,如果确定程序是在linux上运行,则可以直接使用epoll的实现。只需要将NioEventLoopGroup替换为EpollEventLoopGroup,并且将 NioServerSocketChannel.class 替换为 EpollServerSocketChannel.class 即可

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.ReferenceCountUtil;

public class NettyServer {

    private final int port;

    public NettyServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(5);
        EventLoopGroup worker = new NioEventLoopGroup(20);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) {
                            sc.pipeline().addLast(new StringDecoder());
                            sc.pipeline().addLast(new StringEncoder());
                            sc.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.childOption(ChannelOption.SO_KEEPALIVE, true);
            System.out.println("server start , port : " + port);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyServer(6666).start();
    }

    static class EchoServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            try {
                String data = (String) msg;
                if (data == null || data.trim().length() == 0) {
                    return;
                }
                System.out.println("Server received: " + data);
                ctx.writeAndFlush(Unpooled.copiedBuffer(("received: " + data).getBytes()));
            } finally {
                ReferenceCountUtil.release(msg);
            }

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.31.Final</version>
</dependency>