```java
package netty.webScket;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.time.LocalDateTime;
/**
* TODO:<p> <p/>
*
* @package: netty.webScket
* @Author mac
* @Date 2020/4/12 8:42 下午
* @Version V1.0
**/
public class WebsocketServer {
private static Log log= LogFactory.get();
public static void main(String[] args) {
//初始化主线程池(boss 线程池-连接请求)
NioEventLoopGroup bossGroup=new NioEventLoopGroup();
//初始化从线程池(worker 线程池-处理任务数据)
NioEventLoopGroup workGroup=new NioEventLoopGroup();
try {
// 创建服务器启动器
ServerBootstrap bootstrap= new ServerBootstrap();
// 指定使用主线程池和从线程池
bootstrap.group(bossGroup,workGroup)
// 指定使用 Nio 通道类型
.channel(NioServerSocketChannel.class)
// 指定通道初始化器加载通道处理器
.childHandler(new WsServerInitializer());
// 绑定端口号启动服务器,并等待服务器启动
// ChannelFuture 是 Netty 的回调消息
ChannelFuture future = bootstrap.bind(5555).sync();
// 等待服务器 socket 关闭
future.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 优雅关闭 boos 线程池和 worker 线程池
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private static class WsServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/** 其中通道(pipeline)中添加的类相当于拦截器,而通道类似于队列,即每个走该通道的请求都会经过拦截器对其进行处理,原理和tcp/ip协议中的数据包封装和拆解类似*/
ChannelPipeline pipeline=ch.pipeline();
// 用于支持 Http 协议
// websocket 基于 http 协议,需要有 http 的编解码器
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 添加对 HTTP 请求和响应的聚合器:只要使用 Netty 进行 Http 编程都需要使用
// 对 HttpMessage 进行聚合,聚合成 FullHttpRequest 或者 FullHttpResponse
// 在 netty 编程中都会使用到 Handler
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
// ---------支持 Web Socket -----------------
// websocket 服务器处理的协议,用于指定给客户端连接访问的路由: /ws
// 本 handler 会帮你处理一些握手动作: handshaking(close, ping, pong) ping +pong = 心跳
// 对于 websocket 来讲,都是以 frames 进行传输的,不同的数据类型对应的 frames 也不同
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 添加自定义的 handler
pipeline.addLast(new ChatHandler());
}
}
private static class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//用来记录和管理连接进来的客户端。类似于bio中的map(存放socket流和客户端socket)
private ChannelGroup clients=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
//获取从客户端传输过来的消息
String text = textWebSocketFrame.text();
log.info("接收到的数据: {} ",text);
//将接收到消息发送到所有客户端
for (Channel client : clients) {
//注意所有的 websocket 数据都应该以 TextWebSocketFrame 进行封装
client.writeAndFlush(new TextWebSocketFrame("服务器收到的消息 :"+ LocalDateTime.now()+",消息为 :"+text));
}
}
/**
* 当客户端连接服务端之后(打开连接)
* 获取客户端的 channel,并且放入到 ChannelGroup 中去进行管理
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("ehllo");
clients.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 当触发 handlerRemoved, ChannelGroup 会自动移除对应客户端的 channel
//clients.remove(ctx.channel());
// asLongText()——唯一的 ID
// asShortText()——短 ID(有可能会重复)
log.info("客户端断开,channel对应的长id为 :{} ",ctx.channel().id().asLongText());
log.info("客户端断开,channel对应的短id为 :{} ",ctx.channel().id().asShortText());
}
}
}