摘要:我们将使用单个线程管理任务放入队列的操作以及从队列中取出的操作。同时这个线程会持续的管理队列。另一个线程将用来创建,它将一直运行知道服务器终止。此线程永远不会过期,有助于实现持续监控。这些请求将会自动的被获取,并在线程中继续处理。
在Java中,BlockingQueue接口位于java.util.concurrent包下。阻塞队列主要用来线程安全的实现生产者-消费者模型。他们可以使用于多个生产者和多个消费者的场景中。
我们可以在各种论坛和文章中找到BlockingQueue的范例。在这篇文章中,我们将介绍如何持续管理队列中的请求,以及如何在请求进入队列后立刻处理。
我们将使用单个线程管理任务放入队列的操作以及从队列中取出的操作。同时这个线程会持续的管理队列。另一个线程将用来创建BlockingQueue,它将一直运行知道服务器终止。
阻塞队列的大小可以在对象初始化的时候设置。它的大小应该基于系统堆的大小。
现在,让我们回顾创建阻塞队列的步骤以及如何持续的管理和处理请求。
Step 1: EventData新建一个EventData的POJO类,它会存储生产者产生的事件数据并输入到队列中 - 同时它会被消费者从队列中取出e并处理。
package com.dzone.blockingqueue.example; class EventData { private String eventID; private String eventName; private String eventDate; private String eventType; private String eventLocation; public String getEventID() { return eventID; } public void setEventID(String eventID) { this.eventID = eventID; } public String getEventName() { return eventName; } public void setEventName(String eventName) { this.eventName = eventName; } public String getEventDate() { return eventDate; } public void setEventDate(String eventDate) { this.eventDate = eventDate; } public String getEventType() { return eventType; } public void setEventType(String eventType) { this.eventType = eventType; } public String getEventLocation() { return eventLocation; } public void setEventLocation(String eventLocation) { this.eventLocation = eventLocation; } }Step 2: QueueService
创建一个QueueService单例类,用来将请求放入队列中,以及从队列中提取请求并处理。
package com.dzone.blockingqueue.example; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class QueueService { private static QueueService instance = null; private static BlockingQueue < EventData > eventQueue = null; private QueueService() {} public static QueueService getInstance() { if (instance == null) { instance = new QueueService(); } return instance; } private void initialize() { if (eventQueue == null) { eventQueue = new LinkedBlockingQueue(); EventProcessor eventProcessor = new EventProcessor(); eventProcessor.start(); } } public void putEventInQueue(EventData eventData) { try { initialize(); eventQueue.put(eventData); } catch (InterruptedException ex) { ex.printStackTrace(); } } class EventProcessor extends Thread { @Override public void run() { for (;;) { EventData eventData = null; try { eventData = eventQueue.take(); System.out.println("Process Event Data : Type : " + eventData.getEventType() + " / Name : " + eventData.getEventName()); } catch (InterruptedException ex) { ex.printStackTrace(); } } } } }
我们新建了一个静态的BlockingQueue变量。它在初始化时会比初始化为ArrayBlockingQueue或是LinkedBlockingQueue,这取决于需求。在此之后,这个对象会被用来放入或是提取请求。
我们还新建了一个继承了Thread的EventProcessor私有类。它在BlockingQueue初始化的时候启动。在EventProcessor中使用了一个for循环来管理队列。BlockingQueue的优点在于它会在没有元素的时候进入等待模式。当队列为空时,for循环不会继续遍历。当请求进入队列后,BlockingQueue会继续运行并处理请求。
单个EventProcessor线程将处理特定队列中的所有请求。此线程永远不会过期,有助于实现持续监控。
我们还在QueueService中创建了一个公有的putEventInQueue方法,它会帮助我们将请求放入由getInstance方法获取的队列中。在这个方法里,请求被放入BlockingQueue。这些请求将会自动的被BlockingQueue获取,并在EventProcessor线程中继续处理。
Step 3: EventService现在让我们向队列中加载数据。我们已经实现了一个EventService类。它会将几个请求写入BlockingQueue中。在QueueService中,我们会看到请求是如何被取出并处理的。
package com.dzone.blockingqueue.example; public class EventService { public static void main(String arg[]) { try { EventData event = null; for (int i = 0; i < 100; i++) { event = new EventData(); event.setEventType("EventType " + i); event.setEventName("EventName " + i); QueueService.getInstance().putEventInQueue(event); Thread.sleep(100); } } catch (InterruptedException e) { e.printStackTrace(); } } }Step 4: EventProcessor Output
输出结果如下:
Process Event Data : Type : EventType 0 / Name : EventName 0 Process Event Data : Type : EventType 1 / Name : EventName 1 Process Event Data : Type : EventType 2 / Name : EventName 2 Process Event Data : Type : EventType 3 / Name : EventName 3 Process Event Data : Type : EventType 4 / Name : EventName 4
想要了解更多开发技术,面试教程以及互联网公司内推,欢迎关注我的微信公众号!将会不定期的发放福利哦~
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/68732.html
摘要:如果我们的容器使用,文件如下在这个例子中,我们可以重复创建和销毁,同一个持久存储会被提供给新的,无论容器位于哪个节点上。 前言 临时性存储是容器的一个很大的买点。根据一个镜像启动容器,随意变更,然后停止变更重启一个容器。你看,一个全新的文件系统又诞生了。 在docker的语境下: # docker run -it centos [root@d42876f95c6a /]# echo H...
摘要:如果我们的容器使用,文件如下在这个例子中,我们可以重复创建和销毁,同一个持久存储会被提供给新的,无论容器位于哪个节点上。 前言 临时性存储是容器的一个很大的买点。根据一个镜像启动容器,随意变更,然后停止变更重启一个容器。你看,一个全新的文件系统又诞生了。 在docker的语境下: # docker run -it centos [root@d42876f95c6a /]# echo H...
摘要:读取出数据时,将此版本号一同读出,之后更新时,对此版本号加一。此时,将提交数据的版本数据与数据库表对应记录的当前版本信息进行比对,如果提交的数据版本号大于数据库表当前版本号,则予以更新,否则认为是过期数据。 前言 很多人都在讨论数据的指数型增长,以及我们将会有比想象的还要大的数据量。但是,很少有人从数据库的角度谈论这个问题。随着数据量的暴涨,数据库也需要随之升级。这也是为什么既要了解如...
摘要:根本上来说,这意味着不仅要将整个应用程序分解,而且要将任何一个服务器中的各个部分分解为多个模块化容器,这些容器易于参数化和重复使用。在中,这种模块化容器服务的实施者是。一个是指一组共享文件系统,内核命名空间和地址的一组容器。 过去几年容器逐渐成为了打包和部署代码的流行的方式。容器镜像解决很多现有的打包和部署工具所带来的问题,初次以外,还为我们提供了构建分布式应用的全新的思路。就如SOA...
阅读 1084·2021-11-16 11:45
阅读 3086·2021-10-13 09:40
阅读 697·2019-08-26 13:45
阅读 1157·2019-08-26 13:32
阅读 2146·2019-08-26 13:23
阅读 881·2019-08-26 12:16
阅读 2807·2019-08-26 11:37
阅读 1734·2019-08-26 10:32