Donnie

不积跬步无以至千里

Netty Demo

服务端:

    public class Server {
    
        /**
         * 开启服务的方法
         */
        public void StartNetty(){
    
            /**
             *创建两个EventLoop的组,EventLoop 这个相当于一个处理线程,
             是Netty接收请求和处理IO请求的线程。
    
            相关资料:NioEventLoopGroup是一个处理I/O操作的多线程事件循环。
            Netty为不同类型的传输提供了各种EventLoopGroup实现。
            在本例中,我们正在实现一个服务器端应用程序,因此将使用两个NioEventLoopGroup。
            第一个,通常称为“boss”,接受传入的连接。
            第二个,通常称为“worker”,当boss接受连接并注册被接受的连接到worker时,处理被接受连接的流量。
            使用了多少线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
            */
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                //1、创建启动类
                ServerBootstrap bootstrap = new ServerBootstrap();
                //2,配置启动参数等
                /**
                 * 设置循环线程组,前者用于处理客户端连接事件,后者用于处理网络IO(server使用两个参数这个)
                 */
                bootstrap.group(bossGroup, workerGroup);
                /**
                 * 设置选项
                 * 参数:Socket的标准参数(key,value)
                 */
                bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    
                //用于构造socketchannel工厂
                bootstrap.channel(NioServerSocketChannel.class);
    
                /**
                 * 传入自定义客户端Handle(服务端在这里搞事情)
                 */
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 注册handler
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });
    
                /**
                 * 绑定端口,开始接收进来的连接
                 */
                ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 9999).sync();
    
                // 等待服务器 socket 关闭 。
                channelFuture.channel().closeFuture().sync();
    
            }catch (Exception e){
                e.printStackTrace();
            }finally {
    
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
    
    
    
        }
    
    
        public static void main(String[] args) {
    
    
            new Server().StartNetty();
        }
    
    
    }

服务端Handler:

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 本方法用于读取客户端发送的信息
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            ByteBuf result = (ByteBuf) msg;
            byte[] result1 = new byte[result.readableBytes()];
            // msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中
            result.readBytes(result1);
            String resultStr = new String(result1);
            // 接收并打印客户端的信息
            System.out.println("Client said:" + resultStr);
            // 释放资源,这行很关键
            result.release();
    
            // 向客户端发送消息
            String response = "hello client! I am Server";
            // 在当前场景下,发送的数据必须转换成ByteBuf数组
            ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
            encoded.writeBytes(response.getBytes());
            ctx.write(encoded);
            ctx.flush();
        }
    
        /**
         * 信息获取完毕后操作
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        /**
         * 本方法用作处理异常
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 当出现异常就关闭连接
            cause.printStackTrace();
            ctx.close();
        }
    }

客户端:

    public class Client {
    
    
        public void connect(String ip, int port) throws Exception {
            EventLoopGroup worker = new NioEventLoopGroup();
    
            try {
                Bootstrap bootstrap = new Bootstrap();
    
                //EventLoop的组
                bootstrap.group(worker);
                //用于构造socketchannel工厂
                bootstrap.channel(NioSocketChannel.class);
    
                //设置选项
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
    
                /** 开启客户端监听*/
                ChannelFuture f = bootstrap.connect(ip, port).sync();
    
                /**等待数据直到客户端关闭*/
                f.channel().closeFuture().sync();
    
            } catch (Exception e) {
    
            }finally {
                worker.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Client client=new Client();
            client.connect("127.0.0.1", 9999);
        }
    
    }

客户端handler:

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 本方法用于接收服务端发送过来的消息
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            ByteBuf result = (ByteBuf) msg;
            byte[] result1 = new byte[result.readableBytes()];
            result.readBytes(result1);
            System.out.println("Server said:" + new String(result1));
            result.release();
    
        }
    
        /**
         * 本方法用于向服务端发送信息
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String msg = "hello Server,I am client";
            ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());
            encoded.writeBytes(msg.getBytes());
            ctx.write(encoded);
            ctx.flush();
        }
    
        /**
         * 本方法用于处理异常
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    
            // 当出现异常就关闭连接
            cause.printStackTrace();
    
            ctx.close();
        }
    }
赞赏支持