grizzly框架的作者曾经提到NIO框架不应该使用selection key的attach功能(链接)。理由是如果attach到了selection key上,而这个selection key对应的操作迟迟不能就绪(被select出来)。那么这些selection key所attach的附件都是被强引用的,从而无法被gc。如果有大量这样的selection key累积,程序就好像发生了内存泄漏了一样。
但我认为这其实是一个超时处理问题。框架应该支持设置超时,并且可以在超时之后调用框架用户预先设置的处理逻辑,并且释放掉对应的资源。问题是,原生的NIO 1是没有超时支持的。它提供的是selector,可以注册,可以select,可以cancel。但是超时需要自己做记录,程序自己判断超时了,也就是select了老半天了仍然没有就绪,那么就需要去调用cancel把selection key注销掉。如果使用netty这样的封装库,它是把selector的api转成回调的形式,同时也添加了超时的支持。NIO 2除了windows的proactor(OICP)部分之外,对于selector基本上就是一个官方版的netty,也是回调的形式,也支持了超时。
基于协程来封装selector的话,支持超时处理自然也不在话下(代码在此)。如果是回调性质的api,一般的做法是正常就绪给一个callback,超时给另外一个callback。框架根据实际情况决定调用哪个callback。如果是协程的api,最自然的方式自然是抛异常了。
private SocketChannel tryAccept(ServerSocketChannel serverSocketChannel) throws IOException, Pausable {
while(true) {
try {
return scheduler.accept(serverSocketChannel);
} catch (TimeoutException e) {
System.out.println("time out, try again");
continue;
}
}
}
scheduler.accept会有两个路径的返回。一个路径是正常的return一个socket channel,这表明accept阻塞等待成功,拿到了一个socket channel。另外一个返回是抛出了TimeoutException异常,这表明等待超时了。框架要做的就是要在超时的时候抛出这个异常,同时要确保相关的资源这个时候已经释放掉了,不会引起内存泄漏。
首先,需要在做阻塞调用之前说明超时时间的长度。
scheduler.timeout = 5000;
SocketChannel socketChannel = tryAccept(serverSocketChannel);
这里设置的是5秒之后超时。根据这个超时时间可以计算一个dead line:
booking.acceptBlocked(getCurrentTimeMillis() + timeout);
然后拿一个小本子记着这个dead line:
public void acceptBlocked(long deadline) throws Pausable, TimeoutException {
if (null != acceptTask) {
throw new RuntimeException("multiple accept blocked on same channel");
}
acceptDeadline = deadline;
updateDeadline();
acceptTask = Task.getCurrentTask();
Task.pause(this);
if (acceptDeadline == -1) {
acceptUnblocked();
throw new TimeoutException();
}
}
这个dead line会用来计算整个selector booking四个操作的earliest dead line:
public void updateDeadline() {
earliestDeadline = Long.MAX_VALUE;
if (readDeadline > 0 && readDeadline < earliestDeadline) {
earliestDeadline = readDeadline;
}
if (writeDeadline > 0 && writeDeadline < earliestDeadline) {
earliestDeadline = writeDeadline;
}
if (acceptDeadline > 0 && acceptDeadline < earliestDeadline) {
earliestDeadline = acceptDeadline;
}
if (connectDeadline > 0 && connectDeadline < earliestDeadline) {
earliestDeadline = connectDeadline;
}
bookings.remove(this); // when timed out, the booking might be removed already
if (earliestDeadline != Long.MAX_VALUE) {
// add back in case read timed out, but write is still blocking
if (!bookings.offer(this)) {
throw new RuntimeException("update booking failed");
}
}
}
也就是说每个selector booking通过这样的设置都会有一个自己的时间戳(earliestDeadline)。用这个时间戳可以对booking进行一个时间上的排序:
@Override
public int compareTo(SelectorBooking that) {
if (that.earliestDeadline > this.earliestDeadline) {
return -1;
} else if (that.earliestDeadline < this.earliestDeadline) {
return 1;
}
return 0;
}
因为可以排序,所以也就可以用一个PriorityQueue来维护一个链表以记录哪个booking是最近会到期的booking。因为PriorityQueue的排序是发生在插入时的,所以在这个booking的时间戳发生变更的时候,需要从链表中删除然后二次插入已达到更新排序的目的。有了这个排序的链表之后,就可以用来做两个事情:决定selector的select等待时间,以及哪些booking的哪些task是超时了的:
protected int doSelect() throws IOException {
SelectorBooking booking = selectorBookings.peek();
if (null == booking) {
return selector.select();
} else {
long delta = booking.getEarliestDeadline() - getCurrentTimeMillis();
if (delta > 0) {
return selector.select(delta);
} else {
return selector.selectNow();
}
}
}
boolean loopOnce() {
try {
executeReadyTasks();
doSelect();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
ioUnblocked(iterator);
while (hasDeadSelectorBooking()) {
SelectorBooking booking = selectorBookings.poll();
booking.cancelDeadTasks(getCurrentTimeMillis());
}
return true;
} catch (Exception e) {
LOGGER.error("loop died", e);
return false;
}
}
最后就是一件事情了,如果协程所阻塞的io操作确实超时了,如何在超时的调用处抛出异常,以达到走不通业务逻辑路径的目的:
public void cancelDeadTasks(long currentTimeMillis) {
// ...
if (null != acceptTask && currentTimeMillis > acceptDeadline) {
selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_ACCEPT);
acceptDeadline = -1;
updateDeadline();
acceptTask.resume();
if (-1 == acceptDeadline) {
throw new RuntimeException("accept deadline unhandled");
}
}
// ...
if (0 == selectionKey.interestOps()) {
selectionKey.cancel();
}
}
public void acceptBlocked(long deadline) throws Pausable, TimeoutException {
if (null != acceptTask) {
throw new RuntimeException("multiple accept blocked on same channel");
}
acceptDeadline = deadline;
updateDeadline();
acceptTask = Task.getCurrentTask();
Task.pause(this);
if (acceptDeadline == -1) {
acceptUnblocked();
throw new TimeoutException();
}
}
这里是两方面的配合。一方面是在io循环的地方设置一个-1为标志位。然后去唤醒协程。协程唤醒了之后立即去检查-1这个标志位有没有设置,如果设置了,则认为自己被唤醒是因为超时,而不是io操作就绪了。于是TimeoutException被抛出了。特别注意这行:
if (0 == selectionKey.interestOps()) {
selectionKey.cancel();
}
通过在超时之后取消了interestOps,然后在所有interestOps都没有之后自动cancel对应的selection key。这个时候对应的附件也会被垃圾回收给干掉了。只要time out时间合理,grizzly作者之前所说的attach会引发的问题并不会出现。