资讯专栏INFORMATION COLUMN

【JAVA新生】nio attach引发的问题

ruicbAndroid / 1337人阅读

摘要:理由是如果到了上,而这个对应的操作迟迟不能就绪被出来。但我认为这其实是一个超时处理问题。问题是,原生的是没有超时支持的。如果是回调性质的,一般的做法是正常就绪给一个,超时给另外一个。只要时间合理,作者之前所说的会引发的问题并不会出现。

grizzly框架的作者曾经提到NIO框架不应该使用selection key的attach功能(链接)。理由是如果attach到了selection key上,而这个selection key对应的操作迟迟不能就绪(被select出来)。那么这些selection key所attach的附件都是被强引用的,从而无法被gc。如果有大量这样的selection key累积,程序就好像发生了内存泄漏了一样。
但我认为这其实是一个超时处理问题。框架应该支持设置超时,并且可以在超时之后调用框架用户预先设置的处理逻辑,并且释放掉对应的资源。问题是,原生的NIO 1是没有超时支持的。它提供的是selector,可以注册,可以select,可以cancel。但是超时需要自己做记录,程序自己判断超时了,也就是select了老半天了仍然没有就绪,那么就需要去调用cancel把selection key注销掉。如果使用netty这样的封装库,它是把selector的api转成回调的形式,同时也添加了超时的支持。NIO 2除了windows的proactor(OICP)部分之外,对于selector基本上就是一个官方版的netty,也是回调的形式,也支持了超时。
基于协程来封装selector的话,支持超时处理自然也不在话下(代码在此)。如果是回调性质的api,一般的做法是正常就绪给一个callback,超时给另外一个callback。框架根据实际情况决定调用哪个callback。如果是协程的api,最自然的方式自然是抛异常了。

private SocketChannel tryAccept(ServerSocketChannel serverSocketChannel) throws IOException, Pausable {
    while(true) {
        try {
            return scheduler.accept(serverSocketChannel);
        } catch (TimeoutException e) {
            System.out.println("time out, try again");
            continue;
        }
    }
}

scheduler.accept会有两个路径的返回。一个路径是正常的return一个socket channel,这表明accept阻塞等待成功,拿到了一个socket channel。另外一个返回是抛出了TimeoutException异常,这表明等待超时了。框架要做的就是要在超时的时候抛出这个异常,同时要确保相关的资源这个时候已经释放掉了,不会引起内存泄漏。
首先,需要在做阻塞调用之前说明超时时间的长度。

scheduler.timeout = 5000;
SocketChannel socketChannel = tryAccept(serverSocketChannel);

这里设置的是5秒之后超时。根据这个超时时间可以计算一个dead line:

booking.acceptBlocked(getCurrentTimeMillis() + timeout);

然后拿一个小本子记着这个dead line:

public void acceptBlocked(long deadline) throws Pausable, TimeoutException {
    if (null != acceptTask) {
        throw new RuntimeException("multiple accept blocked on same channel");
    }
    acceptDeadline = deadline;
    updateDeadline();
    acceptTask = Task.getCurrentTask();
    Task.pause(this);
    if (acceptDeadline == -1) {
        acceptUnblocked();
        throw new TimeoutException();
    }
}

这个dead line会用来计算整个selector booking四个操作的earliest dead line:

public void updateDeadline() {
    earliestDeadline = Long.MAX_VALUE;
    if (readDeadline > 0 && readDeadline < earliestDeadline) {
        earliestDeadline = readDeadline;
    }
    if (writeDeadline > 0 && writeDeadline < earliestDeadline) {
        earliestDeadline = writeDeadline;
    }
    if (acceptDeadline > 0 && acceptDeadline < earliestDeadline) {
        earliestDeadline = acceptDeadline;
    }
    if (connectDeadline > 0 && connectDeadline < earliestDeadline) {
        earliestDeadline = connectDeadline;
    }
    bookings.remove(this); // when timed out, the booking might be removed already
    if (earliestDeadline != Long.MAX_VALUE) {
        // add back in case read timed out, but write is still blocking
        if (!bookings.offer(this)) {
            throw new RuntimeException("update booking failed");
        }
    }
}

也就是说每个selector booking通过这样的设置都会有一个自己的时间戳(earliestDeadline)。用这个时间戳可以对booking进行一个时间上的排序:

@Override
public int compareTo(SelectorBooking that) {
    if (that.earliestDeadline > this.earliestDeadline) {
        return -1;
    } else if (that.earliestDeadline < this.earliestDeadline) {
        return 1;
    }
    return 0;
}

因为可以排序,所以也就可以用一个PriorityQueue来维护一个链表以记录哪个booking是最近会到期的booking。因为PriorityQueue的排序是发生在插入时的,所以在这个booking的时间戳发生变更的时候,需要从链表中删除然后二次插入已达到更新排序的目的。有了这个排序的链表之后,就可以用来做两个事情:决定selector的select等待时间,以及哪些booking的哪些task是超时了的:

protected int doSelect() throws IOException {
    SelectorBooking booking = selectorBookings.peek();
    if (null == booking) {
        return selector.select();
    } else {
        long delta = booking.getEarliestDeadline() - getCurrentTimeMillis();
        if (delta > 0) {
            return selector.select(delta);
        } else {
            return selector.selectNow();
        }
    }
}

boolean loopOnce() {
    try {
        executeReadyTasks();
        doSelect();
        Iterator iterator = selector.selectedKeys().iterator();
        ioUnblocked(iterator);
        while (hasDeadSelectorBooking()) {
            SelectorBooking booking = selectorBookings.poll();
            booking.cancelDeadTasks(getCurrentTimeMillis());
        }
        return true;
    } catch (Exception e) {
        LOGGER.error("loop died", e);
        return false;
    }
}

最后就是一件事情了,如果协程所阻塞的io操作确实超时了,如何在超时的调用处抛出异常,以达到走不通业务逻辑路径的目的:

public void cancelDeadTasks(long currentTimeMillis) {
    // ...
    if (null != acceptTask && currentTimeMillis > acceptDeadline) {
        selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_ACCEPT);
        acceptDeadline = -1;
        updateDeadline();
        acceptTask.resume();
        if (-1 == acceptDeadline) {
            throw new RuntimeException("accept deadline unhandled");
        }
    }
    // ...
    if (0 == selectionKey.interestOps()) {
        selectionKey.cancel();
    }
}
public void acceptBlocked(long deadline) throws Pausable, TimeoutException {
    if (null != acceptTask) {
        throw new RuntimeException("multiple accept blocked on same channel");
    }
    acceptDeadline = deadline;
    updateDeadline();
    acceptTask = Task.getCurrentTask();
    Task.pause(this);
    if (acceptDeadline == -1) {
        acceptUnblocked();
        throw new TimeoutException();
    }
}

这里是两方面的配合。一方面是在io循环的地方设置一个-1为标志位。然后去唤醒协程。协程唤醒了之后立即去检查-1这个标志位有没有设置,如果设置了,则认为自己被唤醒是因为超时,而不是io操作就绪了。于是TimeoutException被抛出了。特别注意这行:

    if (0 == selectionKey.interestOps()) {
        selectionKey.cancel();
    }

通过在超时之后取消了interestOps,然后在所有interestOps都没有之后自动cancel对应的selection key。这个时候对应的附件也会被垃圾回收给干掉了。只要time out时间合理,grizzly作者之前所说的attach会引发的问题并不会出现。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64152.html

相关文章

  • JAVA新生】拿协程开始写个异步io应用

    摘要:接下来,就看怎么用协程来实现异步了。直接拿的原始写代码会死人的。引入协程就是为了把上下连续的业务逻辑放在一个协程里,把与业务关系不大的的处理部分放到框架的里。第三部分是放弃掉执行权。这样一个只能接收打印一行的异步应用就写好了。 前面已经准备好了greenlet对应的Java版本了,一个删减后的kilim(http://segmentfault.com/blog/taowen/11900...

    singerye 评论0 收藏0
  • JAVA新生】echo server第n种写法

    摘要:基本上所有的网络应用都会示范一个的写法。除了这些操作的主体是而不是,操作的是,而不是。以为例其过程是这样的这段代码就是创建一个,并注册一个,并把附着到上。关键之一显然是利用了协程的和,把回调转换成顺序的逻辑执行。 基本上所有的网络应用都会示范一个tcp的echo写法。前面我们已经看到了如何使用协程和异步io来做tcp服务器的第一步,accept。下面是一个完整的echo server的...

    Luosunce 评论0 收藏0
  • Java NIO之Selector(选择器)

    摘要:抽象类有一个方法用于使通道处于阻塞模式或非阻塞模式。注意抽象类的方法是由抽象类实现的,都是直接继承了抽象类。大家有兴趣可以看看的源码,各种抽象类和抽象类上层的抽象类。 历史回顾: Java NIO 概览 Java NIO 之 Buffer(缓冲区) Java NIO 之 Channel(通道) 其他高赞文章: 面试中关于Redis的问题看这篇就够了 一文轻松搞懂redis集群原理及搭建...

    xiaokai 评论0 收藏0
  • Netty序章之BIO NIO AIO演变

    摘要:后改良为用线程池的方式代替新增线程,被称为伪异步。最大的问题是阻塞,同步。每次请求都由程序执行并返回,这是同步的缺陷。这些都会被注册在多路复用器上。多路复用器提供选择已经就绪状态任务的能力。并没有采用的多路复用器,而是使用异步通道的概念。 Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠的网络服务器和客户端程序。Netty简化了网络程序的开发,是很多框架和公司...

    VincentFF 评论0 收藏0
  • Netty序章之BIO NIO AIO演变

    摘要:后改良为用线程池的方式代替新增线程,被称为伪异步。最大的问题是阻塞,同步。每次请求都由程序执行并返回,这是同步的缺陷。这些都会被注册在多路复用器上。多路复用器提供选择已经就绪状态任务的能力。并没有采用的多路复用器,而是使用异步通道的概念。 Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠的网络服务器和客户端程序。Netty简化了网络程序的开发,是很多框架和公司...

    CntChen 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<