Selector
Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低.
使用 Selector 的图解如下:
为了使用 Selector, 我们首先需要将 Channel 注册到 Selector 中, 随后调用 Selector 的 select()方法, 这个方法会阻塞, 直到注册在 Selector 中的 Channel 发送可读写事件. 当这个方法返回后, 当前的这个线程就可以处理 Channel 的事件了.
创建选择器
通过 Selector.open()方法, 我们可以创建一个选择器:
Selector selector = Selector.open();
将 Channel 注册到选择器中
为了使用选择器管理 Channel, 我们需要将 Channel 注册到选择器中:
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
注意
, 如果一个 Channel 要注册到 Selector 中, 那么这个 Channel 必须是非阻塞的, 即channel.configureBlocking(false);
因为 Channel 必须要是非阻塞的, 因此 FileChannel 是不能够使用选择器的, 因为 FileChannel 都是阻塞的.
注意到, 在使用 Channel.register()方法时, 第二个参数指定了我们对 Channel 的什么类型的事件感兴趣, 这些事件有:
Connect, 即连接事件(TCP 连接), 对应于SelectionKey.OP_CONNECT
Accept, 即确认事件, 对应于SelectionKey.OP_ACCEPT
Read, 即读事件, 对应于SelectionKey.OP_READ, 表示 buffer 可读.
Write, 即写事件, 对应于SelectionKey.OP_WRITE, 表示 buffer 可写.
一个 Channel发出一个事件也可以称为 对于某个事件, Channel 准备好了. 因此一个 Channel 成功连接到了另一个服务器也可以被称为 connect ready.
我们可以使用或运算|来组合多个事件, 例如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
注意, 一个 Channel 仅仅可以被注册到一个 Selector 一次, 如果将 Channel 注册到 Selector 多次, 那么其实就是相当于更新 SelectionKey 的 interest set
. 例如:
channel.register(selector, SelectionKey.OP_READ);
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
上面的 channel 注册到同一个 Selector 两次了, 那么第二次的注册其实就是相当于更新这个 Channel 的 interest set 为 SelectionKey.OP_READ | SelectionKey.OP_WRITE.
关于 SelectionKey
如上所示, 当我们使用 register 注册一个 Channel 时, 会返回一个 SelectionKey 对象, 这个对象包含了如下内容:
interest set, 即我们感兴趣的事件集, 即在调用 register 注册 channel 时所设置的 interest set.
ready set
channel
selector
attached object, 可选的附加对象
interest set
我们可以通过如下方式获取 interest set:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
ready set
代表了 Channel 所准备好了的操作.
我们可以像判断 interest set 一样操作 Ready set, 但是我们还可以使用如下方法进行判断:
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel 和 Selector
我们可以通过 SelectionKey 获取相对应的 Channel 和 Selector:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
Attaching Object
我们可以在selectionKey中附加一个对象:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
或者在注册时直接附加:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
通过 Selector 选择 Channel
我们可以通过 Selector.select()方法获取对某件事件准备好了的 Channel, 即如果我们在注册 Channel 时, 对其的可写事件感兴趣, 那么当 select()返回时, 我们就可以获取 Channel 了.
注意
, select()方法返回的值表示有多少个 Channel 可操作.
获取可操作的 Channel
如果 select()方法返回值表示有多个 Channel 准备好了, 那么我们可以通过 Selected key set 访问这个 Channel:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
注意, 在每次迭代时, 我们都调用 "keyIterator.remove()" 将这个 key 从迭代器中删除, 因为 select() 方法仅仅是简单地将就绪的 IO 操作放到 selectedKeys 集合中, 因此如果我们从 selectedKeys 获取到一个 key, 但是没有将它删除, 那么下一次 select 时, 这个 key 所对应的 IO 事件还在 selectedKeys 中.
例如此时我们收到 OP_ACCEPT 通知, 然后我们进行相关处理, 但是并没有将这个 Key 从 SelectedKeys 中删除, 那么下一次 select() 返回时 我们还可以在 SelectedKeys 中获取到 OP_ACCEPT 的 key.注意, 我们可以动态更改 SekectedKeys 中的 key 的 interest set.
例如在 OP_ACCEPT 中, 我们可以将 interest set 更新为 OP_READ, 这样 Selector 就会将这个 Channel 的 读 IO 就绪事件包含进来了.
Selector 的基本使用流程
通过 Selector.open() 打开一个 Selector.
将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)
-
不断重复:
调用 select() 方法
调用 selector.selectedKeys() 获取 selected keys
-
迭代每个 selected key:
*从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)
*判断是哪些 IO 事件已经就绪了, 然后处理它们.
如果是 OP_ACCEPT 事件, 则调用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.
*根据需要更改 selected key 的监听事件.
*将已经处理过的 key 从 selected keys 集合中删除.
关闭 Selector
当调用了 Selector.close()方法时, 我们其实是关闭了 Selector 本身并且将所有的 SelectionKey 失效, 但是并不会关闭 Channel.
完整的 Selector 例子
public class NioEchoServer {
private static final int BUF_SIZE = 256;
private static final int TIMEOUT = 3000;
public static void main(String args[]) throws Exception {
// 打开服务端 Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 打开 Selector
Selector selector = Selector.open();
// 服务端 Socket 监听8080端口, 并配置为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 将 channel 注册到 selector 中.
// 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
// 注册到 Selector 中.
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
if (selector.select(TIMEOUT) == 0) {
System.out.print(".");
continue;
}
// 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
// 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
keyIterator.remove();
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
// 代表客户端的连接
// 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
// 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
//在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
// 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
}
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = clientChannel.read(buf);
if (bytesRead == -1) {
clientChannel.close();
} else if (bytesRead > 0) {
key.interestOps(OP_READ | SelectionKey.OP_WRITE);
System.out.println("Get data length: " + bytesRead);
}
}
if (key.isValid() && key.isWritable()) {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel clientChannel = (SocketChannel) key.channel();
clientChannel.write(buf);
if (!buf.hasRemaining()) {
key.interestOps(OP_READ);
}
buf.compact();
}
}
}
}
}
本文由 yongshun 发表于个人博客, 采用署名-非商业性使用-相同方式共享 3.0 中国大陆许可协议.
非商业转载请注明作者及出处. 商业转载请联系作者本人
Email: yongshun1228@gmail.com
本文标题为: Java NIO 的前生今世 之四 NIO Selector 详解
本文链接为: segmentfault.com/a/1190000006824196