基本上所有的网络应用都会示范一个tcp的echo写法。前面我们已经看到了如何使用协程和异步io来做tcp服务器的第一步,accept。下面是一个完整的echo server的实现(完整代码):
package org.github.taowen.daili;
import kilim.Pausable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class Main {
public static void main(String[] args) throws Exception {
Scheduler scheduler = new Scheduler();
DailiTask task = new DailiTask(scheduler) {
@Override
public void execute() throws Pausable, Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9090));
serverSocketChannel.configureBlocking(false);
System.out.println("listening...");
scheduler.timeout = 5000;
SocketChannel socketChannel = scheduler.accept(serverSocketChannel);
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
while (scheduler.read(socketChannel, byteBuffer) > 0) {
byteBuffer.flip();
scheduler.write(socketChannel, byteBuffer);
byteBuffer.clear();
}
}
};
scheduler.callSoon(task);
scheduler.loop();
}
}
从上面的代码来看,完全没有异步IO的感觉,代码写出来和传统Java同步网络编码是一样的。除了scheduler.accept,scheduler.read这些操作的主体是scheduler而不是socket,操作的是byte buffer,而不是input/output stream。
这段代码中最关键的是其中的那个task,是一个协程。scheduler.accept,read和accept三处会引起task的跳出执行,跳出的时候会把task当前在做的IO等待记录到内部的一个叫SelectorBooking的身上。以readBlocked为例其过程是这样的:
public int read(SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException, Pausable {
int bytesCount = socketChannel.read(byteBuffer);
if (bytesCount > 0) {
return bytesCount;
}
SelectionKey selectionKey = socketChannel.keyFor(selector);
if (null == selectionKey) {
selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
SelectorBooking booking = addSelectorBooking(selectionKey);
selectionKey.attach(booking);
} else {
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
}
SelectorBooking booking = (SelectorBooking) selectionKey.attachment();
booking.readBlocked(getCurrentTimeMillis() + timeout);
return socketChannel.read(byteBuffer);
}
这段代码就是创建一个SelectorBooking,并注册一个SelectionKey,并把booking附着到selection key上。这样selection key被select出来之后,就可以根据booking找到对应唤醒的task。注意的是selection key是一个socket一个的,但是可能对应的有四个操作(accept/connect/read/write),所以booking上可能会有四个被阻塞挂起的task分别对应不同的操作。
而booking和task的交互发生在booking.readBlocked这个调用内部:
public void readBlocked(long deadline) throws Pausable {
if (null != readTask) {
throw new RuntimeException("multiple read blocked on same channel");
}
readDeadline = deadline;
updateDeadline();
readTask = Task.getCurrentTask();
Task.pause(this);
if (readDeadline == -1) {
readUnblocked();
throw new RuntimeException("timeout");
}
}
其中 Task.getCurrentTask 是一个神奇的调用。它可以得到当前的“协程”。得到的这个协程可以在挂起之后调用resume重新唤醒。
Task.pause 是另外一处神奇的调用。它使得当前执行的协程挂起。等到下面那行if被执行到,已经是别的地方调用resume之后的事情了。
通过这样的一些列操作,就完成一个协程的挂起,并把协程和异步io等信息注册到selector上的过程。
主循环只需要调用selector,找到就绪了的selection key,然后根据之前attach的附件找到booking,通过booking找到需要唤醒的协程,然后调用resume就可以让协程上的业务逻辑继续往下执行了:
public void loop() {
while (loopOnce()) {
}
}
boolean loopOnce() {
try {
executeReadyTasks();
doSelect();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
ioUnblocked(iterator);
while (hasDeadSelectorBooking()) {
SelectorBooking booking = selectorBookings.poll();
booking.cancelDeadTasks(getCurrentTimeMillis());
}
return true;
} catch (Exception e) {
LOGGER.error("loop died", e);
return false;
}
}
这种做法非常经典。关键之一显然是利用了协程的pause和resume,把回调转换成顺序的逻辑执行。关键之二就是利用了selection key的附件功能,把协程附着到了selection key上从而在select出来之后可以迅速恢复到阻塞之前的程序状态(resume是一个局部上下文恢复的过程)。