Java进阶之高并发核心Selector详解
什么是Selector
Selector 是 Java NIO 中的一部分,它是一个可以通过单个线程处理多个 Channel 的组件。
在传统的 IO 模型中,每个连接都需要独立的线程进行处理,而使用 Selector 后,可以使用一个线程来处理多个连接,从而提高了程序的并发处理能力。
Selector 的使用
以下是 Selector 的基本使用过程:
1. 打开一个 Selector。
2. 将 Channel 注册到 Selector 中,并指定需要监听的事件类型。
3. 不断地轮询 Selector,当其中注册的 Channel 发生需要监听的事件时,Selector 返回对应的 SelectionKey,通过 SelectionKey 可以获取到需要处理的 Channel,进而进行业务逻辑处理。
示例1:使用 Selector 实现 TCP 服务器
下面是一个使用 Selector 实现 TCP 服务器的简单示例,其中使用到了 ServerSocketChannel 和 SocketChannel,具体代码如下:
public static void main(String[] args) throws IOException {
// 创建 Selector
Selector selector = Selector.open();
// 创建 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 轮询 Selector
while (true) {
if (selector.select() > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = socketChannel.read(buf);
if (len > 0) {
buf.flip();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
String receivedMsg = new String(bytes);
System.out.println("received message: " + receivedMsg);
}
}
selectionKeys.remove(selectionKey);
}
}
}
}
该代码段可以实现一个简单的 TCP 服务器,其中通过 ServerSocketChannel 和 SocketChannel 分别实现服务器端和客户端的连接与通信,并使用 Selector 实现了多连接的并发处理。
示例2:使用 Selector 实现 UDP 服务器
下面是一个使用 Selector 实现 UDP 服务器的示例,具体代码如下:
public static void main(String[] args) throws IOException {
// 创建 Selector
Selector selector = Selector.open();
// 创建 DatagramChannel
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
datagramChannel.bind(new InetSocketAddress(8080));
datagramChannel.register(selector, SelectionKey.OP_READ);
// 轮询 Selector
while (true) {
if (selector.select() > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
if (selectionKey.isReadable()) {
DatagramChannel channel = (DatagramChannel) selectionKey.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.clear();
channel.receive(buf);
buf.flip();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
String receivedMsg = new String(bytes);
System.out.println("received message: " + receivedMsg);
// 回复客户端
channel.send(ByteBuffer.wrap(("hello, " + receivedMsg).getBytes()), new InetSocketAddress("localhost", 8081));
}
selectionKeys.remove(selectionKey);
}
}
}
}
该代码段可以实现一个简单的 UDP 服务器,通过 DatagramChannel 完成服务器的启动和数据收发,并使用 Selector 实现了多连接并发处理。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java进阶之高并发核心Selector详解 - Python技术站