Netty分布式Server启动流程服务端初始化源码分析
概述
Netty是一个高性能的基于Java NIO的网络编程框架,可以实现异步的、事件驱动的网络应用程序。
本文将对Netty分布式Server启动流程的服务端初始化源码进行详细分析,从源码实现的角度解析Netty分布式Server启动流程,并提供两个具体的示例来说明。
Netty分布式Server启动流程服务端初始化源码分析
Netty分布式Server启动流程
Netty分布式Server启动流程如下:
- ServerBootstrap:启动Netty的Server端,包括配置Server的参数、设置Server线程、设置ChannelHandler等。
- group:创建线程池,来处理任务的执行。Server的Channel接收到客户端的连接后,将会产生一个Channel实例。这个Channel实例会被提交给一个EventLoopGroup实例中的一个EventLoop来处理,实现任务的异步处理。
- ServerChannel:是一个特殊的Channel类型,主要作用是接收客户端的连接请求,并为每个连接创建一个对应的SocketChannel。
- ChannelInitializer:用于初始化Channel,配置Channel的ChannelHandler,定义 ChannelInitializer 的时候可以添加多个ChannelHandler。
- ChannelHandler:处理业务逻辑,例如编解码、数据传输、心跳检测等。
服务端初始化源码分析
创建连接线程池
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
以上代码创建了两个线程池,一个是用于处理 ServerSocketChannel 的 boss 线程组,另一个是用于处理 SocketChannel 的 work 线程组。
创建ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
通过 ServerBootstrap 类创建服务器端启动引导类。
配置Server参数
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加处理器
ch.pipeline().addLast(new MyHandler());
}
});
bootstrap.group(bossGroup, workGroup)
:设置EventLoopGroup。bootstrap.channel(NioServerSocketChannel.class)
:指定使用NioServerSocketChannel作为服务器的通道实现。bootstrap.option(ChannelOption.SO_BACKLOG, 1024)
:设置TCP参数,即最大等待客户端连接数。bootstrap.option(ChannelOption.TCP_NODELAY, true)
:设置TCP参数,禁用 Nagle 算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为 true。bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {...})
:添加处理器,用于初始化 SocketChannel。
启动服务器
ChannelFuture future = bootstrap.bind(8888).sync();
通过 bind 方法绑定公网 IP 和端口,等待客户端连接,启动服务器。
到此为止,Netty分布式Server的服务端初始化源码分析结束。
示例说明
示例1
在这个示例中,我们将启动一个基于Netty的Server端,并监听指定的IP和端口。每当有客户端连接上来后,就向客户端发送消息。
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// bossGroup 用于处理连接请求
EventLoopGroup bossGroup = new NioEventLoopGroup();
// workGroup 用于数据处理
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 创建 ServerBootstrap 对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置 EventLoopGroup
bootstrap.group(bossGroup, workGroup)
// 设置 Channel 类型
.channel(NioServerSocketChannel.class)
// 设置 ChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
// 初始化 Channel
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 打印收到的消息
System.out.println("Server receive message:" + msg);
// 向客户端发送消息
ctx.writeAndFlush("Server reply message:" + msg);
}
});
}
});
// 绑定端口
ChannelFuture future = bootstrap.bind(8080).sync();
// 阻塞线程,知道服务器关闭
future.channel().closeFuture().sync();
} finally {
// 关闭 EventLoopGroup
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
以上代码启动的Server端监听的是本机的8080端口,将客户端发来的消息逆发回去(加上"Server reply message:"前缀)。
示例2
在这个示例中,我们将启动一个基于Netty的Server端,并监听指定的IP和端口。每当有客户端连接上来后,就向客户端随机发送一个Emoji表情。
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// bossGroup 用于处理连接请求
EventLoopGroup bossGroup = new NioEventLoopGroup();
// workGroup 用于数据处理
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 创建 ServerBootstrap 对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置 EventLoopGroup
bootstrap.group(bossGroup, workGroup)
// 设置 Channel 类型
.channel(NioServerSocketChannel.class)
// 设置 ChannelOption
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 设置 ChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
// 初始化 Channel
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EmojiEncoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
private Random random = new Random();
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// 向客户端发送随机Emoji表情
ChannelFuture future = ctx.writeAndFlush(new Emoji(random.nextInt(10)));
future.addListener(ChannelFutureListener.CLOSE);
}
});
}
});
// 绑定端口
ChannelFuture future = bootstrap.bind(8080).sync();
// 阻塞线程,知道服务器关闭
future.channel().closeFuture().sync();
} finally {
// 关闭 EventLoopGroup
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private static class EmojiEncoder extends MessageToMessageEncoder<CharSequence> {
@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
for (int i = 0; i < msg.length(); i++) {
out.add(new Emoji(msg.charAt(i)));
}
}
}
private static class Emoji {
private int code;
public Emoji(int code) {
this.code = code;
}
@Override
public String toString() {
return new String(Character.toChars(code));
}
}
}
以上代码启动的Server端监听的是本机的8080端口,每当客户端连接上来后就随机发送一个Emoji表情回去。在此代码中,我们自定义了两个类 EmojiEncoder 和 Emoji,其中 EmojiEncoder 继承了 MessageToMessageEncoder 并对 CharSequence 进行编码,Emoji 模拟了一个 Emoji 表情。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Netty分布式Server启动流程服务端初始化源码分析 - Python技术站