资讯专栏INFORMATION COLUMN

【JAVA新生】echo server的第n种写法

Luosunce / 2259人阅读

摘要:基本上所有的网络应用都会示范一个的写法。除了这些操作的主体是而不是,操作的是,而不是。以为例其过程是这样的这段代码就是创建一个,并注册一个,并把附着到上。关键之一显然是利用了协程的和,把回调转换成顺序的逻辑执行。

基本上所有的网络应用都会示范一个tcp的echo写法。前面我们已经看到了如何使用协程和异步io来做tcp服务器的第一步,accept。下面是一个完整的echo server的实现(完整代码):

package org.github.taowen.daili;

import kilim.Pausable;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Main {
    public static void main(String[] args) throws Exception {
        Scheduler scheduler = new Scheduler();
        DailiTask task = new DailiTask(scheduler) {
            @Override
            public void execute() throws Pausable, Exception {
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.socket().bind(new InetSocketAddress(9090));
                serverSocketChannel.configureBlocking(false);
                System.out.println("listening...");
                scheduler.timeout = 5000;
                SocketChannel socketChannel = scheduler.accept(serverSocketChannel);
                socketChannel.configureBlocking(false);
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                while (scheduler.read(socketChannel, byteBuffer) > 0) {
                    byteBuffer.flip();
                    scheduler.write(socketChannel, byteBuffer);
                    byteBuffer.clear();
                }
            }
        };
        scheduler.callSoon(task);
        scheduler.loop();
    }
}

从上面的代码来看,完全没有异步IO的感觉,代码写出来和传统Java同步网络编码是一样的。除了scheduler.accept,scheduler.read这些操作的主体是scheduler而不是socket,操作的是byte buffer,而不是input/output stream。
这段代码中最关键的是其中的那个task,是一个协程。scheduler.accept,read和accept三处会引起task的跳出执行,跳出的时候会把task当前在做的IO等待记录到内部的一个叫SelectorBooking的身上。以readBlocked为例其过程是这样的:

public int read(SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException, Pausable {
    int bytesCount = socketChannel.read(byteBuffer);
    if (bytesCount > 0) {
        return bytesCount;
    }
    SelectionKey selectionKey = socketChannel.keyFor(selector);
    if (null == selectionKey) {
        selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
        SelectorBooking booking = addSelectorBooking(selectionKey);
        selectionKey.attach(booking);
    } else {
        selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
    }
    SelectorBooking booking = (SelectorBooking) selectionKey.attachment();
    booking.readBlocked(getCurrentTimeMillis() + timeout);
    return socketChannel.read(byteBuffer);
}

这段代码就是创建一个SelectorBooking,并注册一个SelectionKey,并把booking附着到selection key上。这样selection key被select出来之后,就可以根据booking找到对应唤醒的task。注意的是selection key是一个socket一个的,但是可能对应的有四个操作(accept/connect/read/write),所以booking上可能会有四个被阻塞挂起的task分别对应不同的操作。
而booking和task的交互发生在booking.readBlocked这个调用内部:

public void readBlocked(long deadline) throws Pausable {
    if (null != readTask) {
        throw new RuntimeException("multiple read blocked on same channel");
    }
    readDeadline = deadline;
    updateDeadline();
    readTask = Task.getCurrentTask();
    Task.pause(this);
    if (readDeadline == -1) {
        readUnblocked();
        throw new RuntimeException("timeout");
    }
}

其中 Task.getCurrentTask 是一个神奇的调用。它可以得到当前的“协程”。得到的这个协程可以在挂起之后调用resume重新唤醒。
Task.pause 是另外一处神奇的调用。它使得当前执行的协程挂起。等到下面那行if被执行到,已经是别的地方调用resume之后的事情了。
通过这样的一些列操作,就完成一个协程的挂起,并把协程和异步io等信息注册到selector上的过程。
主循环只需要调用selector,找到就绪了的selection key,然后根据之前attach的附件找到booking,通过booking找到需要唤醒的协程,然后调用resume就可以让协程上的业务逻辑继续往下执行了:

public void loop() {
    while (loopOnce()) {
    }
}

boolean loopOnce() {
    try {
        executeReadyTasks();
        doSelect();
        Iterator iterator = selector.selectedKeys().iterator();
        ioUnblocked(iterator);
        while (hasDeadSelectorBooking()) {
            SelectorBooking booking = selectorBookings.poll();
            booking.cancelDeadTasks(getCurrentTimeMillis());
        }
        return true;
    } catch (Exception e) {
        LOGGER.error("loop died", e);
        return false;
    }
}

这种做法非常经典。关键之一显然是利用了协程的pause和resume,把回调转换成顺序的逻辑执行。关键之二就是利用了selection key的附件功能,把协程附着到了selection key上从而在select出来之后可以迅速恢复到阻塞之前的程序状态(resume是一个局部上下文恢复的过程)。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64153.html

相关文章

  • JAVA新生echo server

    摘要:基于的和的,比和之流好看实在太多了。而且同样是异步实现的,应该性能不差的。支持多个客户端同时连接的。这个是配套的文件特别提一下,使用非常方便,直接可以打开的项目。 现代的Java开发真的和我当年认识的很不一样了,这三篇文章非常值得一读: http://blog.paralleluniverse.co/2014/05/01/modern-java/http://blog.paralle...

    laznrbfe 评论0 收藏0
  • [译]GC专家系列5-Java应用性能优化的原则

    摘要:在本文中我将会介绍应用性能优化的一般原则。性能优化的流程图摘取自和合著的性能,描述了应用性能优化的处理流程。例如,对每台服务器,你面临着为单个分配堆内存和运行个并为每个分配堆内存的选择。不过位能使用堆内存最大理论值只有。 原文链接:http://www.cubrid.org/blog/dev-platform/the-principles-of-java-application-per...

    lufficc 评论0 收藏0
  • jvm垃圾回收三部曲

    摘要:强引用中最常见的引用,引用计数算法的就是典型的强引用,只要强引用还存在,垃圾收集器永远不会回收掉被引用的对象。 概述 早在半个世纪以前,第一个使用了内存动态分配和垃圾收集技术的语言Lisp就已经诞生了,从那时,人们就在思考关于gc需要完成的三件事请: 哪些内存需要回收 什么时候回收 如何回收 直到今天已经有越来越多的语言开始内置内存动态分配和垃圾收集技术。经过长时间的发展,这些技术...

    wanghui 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<