ChengDuXiuu
4/23/2020 - 12:53 PM

基于Netty的WebSocket服务器(Java-简单)

```java
package netty.webScket;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.time.LocalDateTime;

/**
 * TODO:<p>  <p/>
 *
 * @package: netty.webScket
 * @Author mac
 * @Date 2020/4/12 8:42 下午
 * @Version V1.0
 **/
public class WebsocketServer {
    private static Log log= LogFactory.get();
    public static void main(String[] args) {
        //初始化主线程池(boss 线程池-连接请求)
        NioEventLoopGroup bossGroup=new NioEventLoopGroup();
        //初始化从线程池(worker 线程池-处理任务数据)
        NioEventLoopGroup workGroup=new NioEventLoopGroup();

        try {
            // 创建服务器启动器
            ServerBootstrap bootstrap= new ServerBootstrap();
            // 指定使用主线程池和从线程池
            bootstrap.group(bossGroup,workGroup)
                    // 指定使用 Nio 通道类型
                    .channel(NioServerSocketChannel.class)
                    // 指定通道初始化器加载通道处理器
                    .childHandler(new WsServerInitializer());

            // 绑定端口号启动服务器,并等待服务器启动
            // ChannelFuture 是 Netty 的回调消息
            ChannelFuture future = bootstrap.bind(5555).sync();
            // 等待服务器 socket 关闭
            future.channel().closeFuture().sync();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 优雅关闭 boos 线程池和 worker 线程池
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }


    }

    private static class WsServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            /** 其中通道(pipeline)中添加的类相当于拦截器,而通道类似于队列,即每个走该通道的请求都会经过拦截器对其进行处理,原理和tcp/ip协议中的数据包封装和拆解类似*/
            ChannelPipeline pipeline=ch.pipeline();
            // 用于支持 Http 协议
            // websocket 基于 http 协议,需要有 http 的编解码器
            pipeline.addLast(new HttpServerCodec());
            // 对写大数据流的支持
            pipeline.addLast(new ChunkedWriteHandler());
            // 添加对 HTTP 请求和响应的聚合器:只要使用 Netty 进行 Http 编程都需要使用
            // 对 HttpMessage 进行聚合,聚合成 FullHttpRequest 或者 FullHttpResponse
            // 在 netty 编程中都会使用到 Handler
            pipeline.addLast(new HttpObjectAggregator(1024 * 64));
            // ---------支持 Web Socket -----------------
            // websocket 服务器处理的协议,用于指定给客户端连接访问的路由: /ws
            // 本 handler 会帮你处理一些握手动作: handshaking(close, ping, pong) ping +pong = 心跳
            // 对于 websocket 来讲,都是以 frames 进行传输的,不同的数据类型对应的 frames 也不同
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
            // 添加自定义的 handler
            pipeline.addLast(new ChatHandler());

        }

    }

    private static class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        //用来记录和管理连接进来的客户端。类似于bio中的map(存放socket流和客户端socket)
        private ChannelGroup clients=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
            //获取从客户端传输过来的消息
            String text = textWebSocketFrame.text();
            log.info("接收到的数据: {} ",text);
            //将接收到消息发送到所有客户端
            for (Channel client : clients) {
                //注意所有的 websocket 数据都应该以 TextWebSocketFrame 进行封装
                client.writeAndFlush(new TextWebSocketFrame("服务器收到的消息 :"+ LocalDateTime.now()+",消息为 :"+text));
            }
        }
        /**
         * 当客户端连接服务端之后(打开连接)
         * 获取客户端的 channel,并且放入到 ChannelGroup 中去进行管理
         * @param ctx
         * @throws Exception
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("ehllo");
            clients.add(ctx.channel());
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            // 当触发 handlerRemoved, ChannelGroup 会自动移除对应客户端的 channel
            //clients.remove(ctx.channel());
            // asLongText()——唯一的 ID
            // asShortText()——短 ID(有可能会重复)
            log.info("客户端断开,channel对应的长id为 :{} ",ctx.channel().id().asLongText());
            log.info("客户端断开,channel对应的短id为 :{} ",ctx.channel().id().asShortText());

        }
    }
}
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="initial-scale=1.0, maximum-scale=1.0, user-scalable=no" />
    <title></title>
    <script type="text/javascript">

        document.addEventListener('plusready', function(){
            //console.log("所有plus api都应该在此事件发生后调用,否则会出现plus is undefined。")

        });

    </script>
</head>
<body>
<div>发送消息</div>
<input type="text" id="msgContent" />
<input type="button" value="点击发送" onclick="CHAT.chat()"/>
<div>接收消息:</div>
<div id="recMsg" style="background-color: gainsboro;"></div>
<script type="application/javascript">
    window.CHAT = {
        socket: null,
        init: function() {
            // 判断浏览器是否支持 websocket
            if(window.WebSocket) {
                // 支持 WebScoekt
                // 连接创建 socket,注意要添加 ws 后缀
                CHAT.socket = new WebSocket("ws://127.0.0.1:5555/ws");
                CHAT.socket.onopen = function() {
                    console.log("连接建立成功");
                };
                CHAT.socket.onclose = function() {
                    console.log("连接关闭")
                };
                CHAT.socket.onerror = function() {
                    console.log("发生错误");
                };
                CHAT.socket.onmessage = function(e) {
                    console.log("浏览器接收到消息:" + e.data);
                    var recMsg = document.getElementById("recMsg");
                    var html = recMsg.innerHTML;
                    recMsg.innerHTML = html + "<br/>" + e.data;
                };
            }
            else {
                alert("浏览器不支持 websocket 协议");
            }
        },
        chat: function() {
            var msg = document.getElementById("msgContent");
            CHAT.socket.send(msg.value);
        }
    }
    CHAT.init();
</script>
</body>
</html>