摘要:年月日上午阿里云消息服,队列消息发送以及消费的并发测试解析配置文件二者等价线程数并发数程序入口准备工作发送消息线程池一个计数信号量。但是,不使用实际的许可对象,只对可用许可的号码进行计数,并采取相应的行动。
package com.study.mq.aliyunmns; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URL; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import org.apache.commons.lang3.SystemUtils; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import com.aliyun.mns.client.CloudAccount; import com.aliyun.mns.client.CloudQueue; import com.aliyun.mns.client.MNSClient; import com.aliyun.mns.common.http.ClientConfiguration; import com.aliyun.mns.model.Message; /** * * @author wangkai * @2016年11月22日 上午11:27:14 * @desc:阿里云消息服(MNS),队列消息发送以及消费的并发测试 * https://www.aliyun.com/product/mns?spm=5176.8142029 * .388261.80.fNnCkg */ public class MnsQueueAppV2 { private static Logger LOG = Logger.getLogger(MnsQueueAppV2.class.getName()); private static MNSClient client = null; // private static AtomicLong totalCount = new AtomicLong(0); private static String endpoint = null; private static String accessId = null; private static String accessKey = null; private static String queueName = "articlepricinglog"; private static int threadNum = 100; private static int clientNum = 10000; // private static int totalSeconds = 180; private static String log4jConfPath = "./log4j.properties"; static { PropertyConfigurator.configureAndWatch(log4jConfPath); } /** * 解析配置文件 * * @return */ @SuppressWarnings("unused") protected static boolean parseConf() { // URL resource = // MnsQueueAppV2.class.getClassLoader().getResource("name.properties"); String confFilePath = SystemUtils.getUserDir() + SystemUtils.FILE_SEPARATOR + "src/main/resources/mns.properties"; URL resource = MnsQueueAppV2.class.getResource("/mns.properties"); URL resource2 = MnsQueueAppV2.class.getClassLoader().getResource( "mns.properties");// 二者等价 BufferedInputStream bis = null; try { bis = new BufferedInputStream(new FileInputStream(confFilePath)); if (bis == null) { LOG.info("ConfFile not opened: " + confFilePath); return false; } } catch (FileNotFoundException e) { LOG.error("ConfFile not found: " + confFilePath, e); return false; } // load file Properties properties = new Properties(); try { properties.load(bis); } catch (IOException e) { LOG.error("Load ConfFile Failed: " + e.getMessage()); return false; } finally { try { bis.close(); } catch (Exception e) { // do nothing } } // init the member parameters endpoint = properties.getProperty("Endpoint"); LOG.info("Endpoint: " + endpoint); accessId = properties.getProperty("AccessId"); LOG.info("AccessId: " + accessId); accessKey = properties.getProperty("AccessKey"); queueName = properties.getProperty("QueueName", queueName); LOG.info("QueueName: " + queueName); threadNum = Integer.parseInt(properties.getProperty("ThreadNum", String.valueOf(threadNum))); LOG.info("ThreadNum: 线程数" + threadNum); clientNum = Integer.parseInt(properties.getProperty("ClientNum", String.valueOf(clientNum))); LOG.info("ClientNum: 并发数" + clientNum); // totalSeconds = // Integer.parseInt(properties.getProperty("TotalSeconds", // String.valueOf(totalSeconds))); // LOG.info("TotalSeconds: " + totalSeconds); return true; } /** * 程序入口 * * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // 准备工作 if (!parseConf()) { return; } ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setMaxConnections(threadNum); clientConfiguration.setMaxConnectionsPerRoute(threadNum); CloudAccount cloudAccount = new CloudAccount(accessId, accessKey, endpoint, clientConfiguration); client = cloudAccount.getMNSClient(); LOG.info("发送消息"); // 线程池 ExecutorService exec = Executors.newFixedThreadPool(500); /** * Semaphore 一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 * acquire(),然后再获取该许可。每个 release() * 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数 * ,并采取相应的行动。拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。 */ final Semaphore semp = new Semaphore(threadNum);// ["seməfɔː] final Semaphore semaphore = new Semaphore(10, true); // 拿到信号量的线程可以进入代码,否则就等待 // Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源 // 辅助理解 :很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯, // 比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯, // 可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路, // 那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。 long startTime = System.currentTimeMillis(); // 开启时间 /** * 原理: * 更进一步,信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程(车辆)都会将该整数减一(通过它当然是为了使用资源), * 当该整数值为零时,所有试图通过它的线程都将处于等待状态。在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。 * 当一个线程调用Wait * (等待)操作时,它要么通过然后将信号量减一,要么一直等下去,直到信号量大于一或超时。Release(释放)实际上是在信号量上执行加操作 * ,对应于车辆离开停车场,该操作之所以叫做“释放”是因为加操作实际上是释放了由信号量守护的资源。 */ // 开始 for (int index = 0; index < clientNum; index++) { // final int NO = index; Runnable task = new Runnable() { public void run() { try { semp.acquire();// 获取许可 try { // 获取queue CloudQueue queue = client.getQueueRef(queueName); // 组装消息 Message message = new Message(); message.setMessageBody("Test"); // 发送消息 queue.putMessage(message); } catch (Exception e) { e.printStackTrace(); } semp.release();// 归还许可 } catch (Exception e) { e.printStackTrace(); } } }; exec.submit(task); } long endTime = System.currentTimeMillis(); // 开启时间 exec.shutdown(); LOG.info(clientNum + " 的并发发送消息总耗时:>>>" + (endTime - startTime) + " ms"); LOG.info(clientNum + " 的并发发送消息 QPS为:>>>" + (clientNum * 1000) / (endTime - startTime) + " q/s"); LOG.info("接收消息"); Thread.sleep(3000); ExecutorService exec2 = Executors.newFixedThreadPool(500); final Semaphore semp2 = new Semaphore(threadNum); long startTime2 = System.currentTimeMillis(); // 开启时间 for (int index = 0; index < clientNum; index++) { // final int NO = index; Runnable task = new Runnable() { public void run() { try { semp2.acquire(); try { // 获取queue CloudQueue queue = client.getQueueRef(queueName); // 获取消息 Message message = queue.popMessage(); // 删掉消息 if (message != null) queue.deleteMessage(message.getReceiptHandle()); } catch (Exception e) { e.printStackTrace(); } semp2.release(); } catch (Exception e) { e.printStackTrace(); } } }; exec2.submit(task); } long endTime2 = System.currentTimeMillis(); // 开启时间 exec2.shutdown(); // 忽略线程切换的耗时 精确的做法? LOG.info(clientNum + " 的并发接收消息总耗时:>>>" + (endTime2 - startTime2)
+ " ms"); LOG.info(clientNum + " 的并发接收消息 QPS为:>>>" + (clientNum * 1000) / (endTime2 - startTime2) + " q/s"); }
}
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/66396.html
摘要:多线程工具箱之前言这一篇谈一下信号量。信息信息信息信息信息信息信息信息信息信息信息小结适用于多线程请求数量资源的场景,但无法解决单多个线程对同一资源访问的竞争性访问。在后面我们在我们的多线程工具箱里面陆续会提到。 Java多线程工具箱之Semaphore 前言 这一篇谈一下Semaphore:信号量。 将Semaphore类比为为信号灯,被继承Runable的线程类比为列车:理解信号量...
摘要:在每个线程获取之前,必须先从信号量获取许可。注意,因为同时可能发生取消,所以返回并不保证有其他线程等待获取许可。该值仅是估计的数字,因为在此方法遍历内部数据结构的同时,线程的数目可能动态地变化。 本人邮箱: 欢迎转载,转载请注明网址 http://blog.csdn.net/tianshi_kcogithub: https://github.com/kco1989/kco代码已经全部托...
摘要:将屏障重置为其初始状态。注意,在由于其他原因造成损坏之后,实行重置可能会变得很复杂此时需要使用其他方式重新同步线程,并选择其中一个线程来执行重置。 安全共享对象策略 1.线程限制 : 一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改2.共享只读 : 一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它3.线程安全对象 : 一个线程安全...
摘要:前言之前学多线程的时候没有学习线程的同步工具类辅助类。而其它线程完成自己的操作后,调用使计数器减。信号量控制一组线程同时执行。 前言 之前学多线程的时候没有学习线程的同步工具类(辅助类)。ps:当时觉得暂时用不上,认为是挺高深的知识点就没去管了.. 在前几天,朋友发了一篇比较好的Semaphore文章过来,然后在浏览博客的时候又发现面试还会考,那还是挺重要的知识点。于是花了点时间去了解...
摘要:所以得出结论需要分配较多的线程进行读数据,较少的线程进行写数据。注意多线程编程对实际环境和需求有很大的依赖,需要根据实际的需求情况对各个参数做调整。 背景 最近对于 Java 多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的。否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果。 不过笔者仍旧认为自己对于多线程掌握不够熟练,不敢轻易应用到生产代码中...
阅读 1883·2021-11-22 09:34
阅读 3010·2021-09-28 09:35
阅读 13374·2021-09-09 11:34
阅读 3594·2019-08-29 16:25
阅读 2820·2019-08-29 15:23
阅读 2035·2019-08-28 17:55
阅读 2424·2019-08-26 17:04
阅读 3044·2019-08-26 12:21