摘要:分段策略尝试自旋此,然后调用次,如果经过这两百次的操作还未获取到任务,就会尝试阶段性挂起自身线程。
零 前期准备 0 FBI WARNING
文章异常啰嗦且绕弯。
1 版本Disruptor 版本 : Disruptor 3.4.2
IDE : idea 2018.3
JDK 版本 : OpenJDK 11.0.1
2 Disruptor 简介高性能线程间消息队列框架 Disruptor,是金融与游戏领域的常用开发组件之一,也是 java 日志框架和流处理框架底层的常用依赖。
3 DemoDisruptor 的 github 主页有非常详细的 quick start demo,本文依照此 demo 做追踪的模板(做了很小的改动)。
另外,对于官方提供的 jdk8 lambda 简化版 demo 暂不做讨论。
import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; import java.nio.ByteBuffer; public class LongEventMain { //main 方法,启动入口 public static void main(String[] args) throws Exception { //在该框架中,所有的 task 的包装类被称为 Event,EventFactory 则是 Event 的生产者 LongEventFactory factory = new LongEventFactory(); //RingBuffer 的大小,数字为字节数 //RingBuffer 是框架启动器内部的缓存区,用来存储 event 内的 task 数据 int bufferSize = 1024; //创建一个 Disruptor 启动器,其中 DaemonThreadFactory 是一个线程工厂的实现类 Disruptor一 DaemonThreadFactorydisruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE); //该框架本质上是 生产-消费 设计模式的应用。所有的消费者被冠名为 handler //handleEventsWith(...) 方法会在启动器中注册 handler //此处的参数是不定数量的,可以有多个消费者,每个消费者都可以获取 Event disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2")); //启动器开始执行,并获取其内部的缓存区 RingBuffer ringBuffer = disruptor.start(); //创建一个生产者,负责往缓存区内写入数据 LongEventProducer producer = new LongEventProducer(ringBuffer); //官方 demo 中使用了 ByteBuffer 来方便操作,其实非必须 ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { //将变量 l 作为一个 long 类型的数存入 ByteBuffer 中 bb.putLong(0, l); //将 ByteBuffer 传入生产者的相关方法中,该方法会负责将 ByteBuffer 中的数据写入 RingBuffer producer.onData(bb); //线程休眠 Thread.sleep(1000); } } } //Event 类,本质上是数据的封装,是生产者和消费者之间进行数据传递的介质 class LongEvent { private long value; public void set(long value) { this.value = value; } public long get() { return value; } } //Event 的生产工厂类,必须实现 Disruptor 自带的 EventFactory 接口 class LongEventFactory implements EventFactory { @Override public LongEvent newInstance() { return new LongEvent(); } } //消费者,必须实现 Disruptor 自带的 EventHandler 接口 class LongEventHandler implements EventHandler { private String handlerName; public LongEventHandler(String handlerName){ this.handlerName = handlerName; } //此方法为最终的消费 Event 的方法 @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("Event " + handlerName + " : " + event.get()); } } //生产者,主要负责往 RingBuffer 中写入数据 //生产者类在框架中并非必须,但是一般情况下都会做一定程度的封装 class LongEventProducer { private final RingBuffer ringBuffer; //生产者的构造器负责获取并存储启动器中的 RingBuffer public LongEventProducer(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer bb) { //sequence 是 RingBuffer 中的一个数据块,类似于一个数据地址 long sequence = ringBuffer.next(); try { //用数据地址去获取到一个 Event 事件类实例 LongEvent event = ringBuffer.get(sequence); //在实例中存入 ByteBuffer 中的数据 event.set(bb.getLong(0)); } finally { //发布该数据块,此时消费者们都可以看到该数据块了,可以进行消费 ringBuffer.publish(sequence); } } }
在开始正式追踪代码之前有必要先来理解 DaemonThreadFactory。这是 Disruptor 自身携带的线程工厂类:
public enum DaemonThreadFactory implements ThreadFactory{ //线程工厂使用枚举实现单例模式 INSTANCE; @Override public Thread newThread(final Runnable r){ Thread t = new Thread(r); //此处创建的线程是守护线程 t.setDaemon(true); return t; } }二 Disruptor
本 part 主要追踪 demo 中 Disruptor 相关的代码:
Disruptor1 disruptor 的创建disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE); disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2")); RingBuffer ringBuffer = disruptor.start();
来看下方代码:
Disruptordisruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
追踪 Disruptor 的构造器:
//step 1 //Disruptor.class public Disruptor(final EventFactoryeventFactory, final int ringBufferSize, final ThreadFactory threadFactory){ //RingBuffer.createMultiProducer(...) 方法会返回一个 RingBuffer //BasicExecutor 是线程和线程工厂的封装类 this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory)); } //step 2 //Disruptor.class private Disruptor(final RingBuffer ringBuffer, final Executor executor){ //存入 RingBuffer 和 Executor this.ringBuffer = ringBuffer; this.executor = executor; }
但是实际上 Disruptor 提供了很多的构造器,其中还有一个较高配置权限的:
//Disruptor.class public Disruptor(final EventFactoryeventFactory,final int ringBufferSize, final ThreadFactory threadFactory,final ProducerType producerType, final WaitStrategy waitStrategy){ //解释传入的参数: //eventFactory 是 Event 类的创建工厂 //ringBufferSize 是 RingBuffer 的字节数大小 //threadFactory 是线程工厂 //ProducerType 是生产者的类型,分为单生产者类型(single)和多生产者类型(multi),默认为 multi //waitStrategy 是框架中的一个接口,表示等待策略,默认为 BlockingWaitStrategy(阻塞等待),WaitStrategy 的可讲内容较多,在后头开一个多带带 part this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),new BasicExecutor(threadFactory)); }
先来看 ProducerType:
public enum ProducerType{ SINGLE, MULTI }
仅仅只是个标记,不多赘述。
1.1 BasicExecutorBasicExecutor 是 Executor 的实现类,其内部维护着一个线程工厂和一个线程队列,核心方法为 execute(...):
//BasicExecutor.class public void execute(Runnable command){ //使用线程工厂创建一个线程,此处的 factory 即为 DaemonThreadFactory final Thread thread = factory.newThread(command); //有效性验证 if (null == thread){ throw new RuntimeException("Failed to create thread to run: " + command); } //开启线程 thread.start(); //threads 是一个 ConcurrentLinkedQueue1.2 RingBuffer 的创建类型的变量,用来存储线程 threads.add(thread); }
再来追踪一下 RingBuffer 的创建:
//RingBuffer.class public staticRingBuffer create(ProducerType producerType,EventFactory factory, int bufferSize,WaitStrategy waitStrategy){ //此处根据 ProducerType 进行分发操作 switch (producerType){ case SINGLE: //创建单消费者的 producer return createSingleProducer(factory, bufferSize, waitStrategy); case MULTI: //创建多消费者的 producer return createMultiProducer(factory, bufferSize, waitStrategy); default: //抛出错误 throw new IllegalStateException(producerType.toString()); } }
本质上这两种模式的 RingBuffer 的创建差距并不大,此处追踪 createMultiProducer(...) 方法:
//step 1 //RingBuffer.class public static2 消费者的注册RingBuffer createMultiProducer(EventFactory factory,int bufferSize,WaitStrategy waitStrategy){ //MultiProducerSequencer 是 RingBuffer 中用来在生产者和消费者之间传递数据的组件 //sequencer 是 RingBuffer 中的核心组件,是区别 SINGLE 和 MULTI 的关键,后文会继续理解 MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy); //自身构造器 return new RingBuffer (factory, sequencer); } //step 2 //RingBuffer.class RingBuffer(EventFactory eventFactory,Sequencer sequencer){ //调用父类 RingBufferFields 的构造方法 super(eventFactory, sequencer); } //step 3 //RingBufferFields.class RingBufferFields(EventFactory eventFactory,Sequencer sequencer){ //此处为 MultiProducerSequencer this.sequencer = sequencer; //获取使用者自定义的 bufferSize 并记录下来 this.bufferSize = sequencer.getBufferSize(); //bufferSize 的有效性验证 if (bufferSize < 1){ throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1){ throw new IllegalArgumentException("bufferSize must be a power of 2"); } //根据 bufferSize 确定序列号最大值,因为从 0 开始所以要减一 this.indexMask = bufferSize - 1; //entries 是一个 Object 数组,用于存放 Event //BUFFER_PAD 是对整个缓冲区的填充 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; //fill(...) 方法会重新设置 entries fill(eventFactory); } //step 4 //RingBuffer.class private void fill(EventFactory eventFactory){ for (int i = 0; i < bufferSize; i++){ //遍历数组进行 Event 的填充 entries[BUFFER_PAD + i] = eventFactory.newInstance(); } }
来看下方代码:
disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2"));
追踪 handleEventsWith(...) 方法:
//step 1 //Disruptor.class public final EventHandlerGroup2.1 newBarrierhandleEventsWith(final EventHandler super T>... handlers){ //Sequence 可以看做是 long 型的封装类 //此处的第一个参数是前置关卡,在处理 handler 之前会进行处理的事件 //handlers 即为消费者 return createEventProcessors(new Sequence[0], handlers); } //step 2 //Disruptor.class EventHandlerGroup createEventProcessors(final Sequence[] barrierSequences,final EventHandler super T>[] eventHandlers){ //Disruptor 中有一个 AtomicBoolean 类型的变量 started //checkNotStarted() 会检查该变量的值是否为 true,如果是的话就证明已经启动了,则抛出异常 checkNotStarted(); //processorSequences 是每个消费者对应的执行器的序列号 final Sequence[] processorSequences = new Sequence[eventHandlers.length]; //此处返回的 barrier 可以看做是上文 MultiProducerSequencer 的封装增强 final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0,eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){ final EventHandler super T> eventHandler = eventHandlers[i]; //batchEventProcessor 是存储了消费者和生产者的执行器,实现了 Runnable 接口,内部会不断循环去接收并处理事件 final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); //exceptionHandler 是用于处理错误的消费者组件 if (exceptionHandler != null){ batchEventProcessor.setExceptionHandler(exceptionHandler); } //consumerRepository 可以看做是消费者的集合封装 //consumerRepository 会将传入的三个参数包装成 EventProcessorInfo 并储存在集合和 map 里 consumerRepository.add(batchEventProcessor, eventHandler, barrier); //记录下消费者对应的执行器的序列号 processorSequences[i] = batchEventProcessor.getSequence(); } //处理一些前置事件,在本例中没有前置事件存在 updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup<>(this, consumerRepository, processorSequences); }
来追踪一下 ringBuffer.newBarrier(...) 方法:
//step 1 //RingBuffer.class public SequenceBarrier newBarrier(Sequence... sequencesToTrack){ //在本例中,此处的 sequencesToTrack 是 Sequence[0] //此处的 sequencer 即为 MultiProducerSequencer return sequencer.newBarrier(sequencesToTrack); } //step 2 //AbstractSequencer.class public SequenceBarrier newBarrier(Sequence... sequencesToTrack){ //此方法被定义在 MultiProducerSequencer 的父类 AbstractSequencer 中 //cursor 是在 AbstractSequencer 中实例化的一个 Sequence 类型对象,是 MultiProducerSequencer 的序列号 return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack); } //step 3 //ProcessingSequenceBarrier.class ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy, final Sequence cursorSequence,final Sequence[] dependentSequences){ //即为 Disruptor 启动器中的 MultiProducerSequencer this.sequencer = sequencer; //即为 Disruptor 启动器中的阻塞策略 this.waitStrategy = waitStrategy; //上述方法的 cursor this.cursorSequence = cursorSequence; if (0 == dependentSequences.length){ //此处的 dependentSequences 是长度是 0,所以此处 dependentSequence = cursorSequence; }else{ dependentSequence = new FixedSequenceGroup(dependentSequences); } }
需要注意的是,此处的 sequencer 已经被抽象成了 SingleProducerSequencer 和 MultiProducerSequencer 的共同实现接口 Sequencer。
所以对于 SingleProducerSequencer 来说,这个流程也是没有区别的。
2.2 updateGatingSequencesForNextInChain回到上述代码:
//此处的 barrierSequences 是一个 Sequence[0] 数组,processorSequences 是所有消费者的序列号集合 updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
追踪该方法的实现:
//step 1 //Disruptor.class private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences){ //processorSequences.length 大于 0 意味着消费者数量大于 0 if (processorSequences.length > 0){ ringBuffer.addGatingSequences(processorSequences); //barrierSequences 是前置事件的集合 //由于此处的 barrierSequences 是长度为 0 的 Sequence 数组,即没有前置事件,所以此处不会进入循环,忽略 for (final Sequence barrierSequence : barrierSequences){ ringBuffer.removeGatingSequence(barrierSequence); } //unMarkEventProcessorsAsEndOfChain(...) 方法也是处理 barrierSequences 的,忽略 consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } }addGatingSequences
追踪 ringBuffer.addGatingSequences(...) 方法:
//step 1 //RingBuffer.class public void addGatingSequences(Sequence... gatingSequences){ //sequencer 为 MultiProducerSequencer sequencer.addGatingSequences(gatingSequences); } //step 2 //AbstractSequencer.class public final void addGatingSequences(Sequence... gatingSequences){ //此处的 SEQUENCE_UPDATER 是一个 AtomicReferenceFieldUpdaterImpl 类型的变量,用于 CAS 操作 gatingSequences SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences); } //step 3 //SequenceGroups.class static3 Disruptor 的启动void addSequences(final T holder,final AtomicReferenceFieldUpdater updater, final Cursored cursor,final Sequence... sequencesToAdd){ long cursorSequence; Sequence[] updatedSequences; Sequence[] currentSequences; do{ //此处的 holder 即为 MultiProducerSequencer,此处获取其内部的 gatingSequences 变量 currentSequences = updater.get(holder); //此处为 copyOf(...) 方法为 java.util.Arrays.copyOf(...) 方法,用于将 currentSequences 复制一份 updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length); //此处的 cursor 即为 MultiProducerSequencer,getCursor() 方法会获取其的序列号 cursorSequence = cursor.getCursor(); int index = currentSequences.length; //此处的 sequencesToAdd 是之前消费者的序列号集合,更新 sequencesToAdd 中的每个序列号封装 //将 MultiProducerSequencer 的序列号注册进去,并填充到新集合的后面一半中 for (Sequence sequence : sequencesToAdd){ sequence.set(cursorSequence); updatedSequences[index++] = sequence; } }while (!updater.compareAndSet(holder, currentSequences, updatedSequences)); //此处的 while 会死循环 CAS 操作直到更新成功 //在此获取 MultiProducerSequencer 的序列号,更新到 sequencesToAdd 的每个序列号封装类中 cursorSequence = cursor.getCursor(); for (Sequence sequence : sequencesToAdd){ sequence.set(cursorSequence); } }
来看下方代码:
disruptor.start();
追踪 start(...) 方法:
//Disruptor.class public RingBufferstart(){ //确认该 Disruptor 没有启动 checkOnlyStartedOnce(); //此处的 consumerInfo 是 EventProcessorInfo 类型的变量 for (final ConsumerInfo consumerInfo : consumerRepository){ consumerInfo.start(executor); } return ringBuffer; }
先来看 checkOnlyStartedOnce() 方法:
//Disruptor.class private void checkOnlyStartedOnce(){ //如果在调用该 CAS 方法之前已经为 true 了,会抛出错误 //其实就是确保在调用该方法之前还处于未开启的状态 if (!started.compareAndSet(false, true)){ throw new IllegalStateException("Disruptor.start() must only be called once."); } }
再来追踪 EventProcessorInfo 的 start(...) 方法:
//EventProcessorInfo.class public void start(final Executor executor){ //此处的 executor 即为 BasicExecutor executor.execute(eventprocessor); }
所以本质上 Disruptor 的启动就是开启 BasicExecutor,借此启动线程。
3.1 BatchEventProcessor上述代码中启动线程的时候会传入 eventprocessor 对象作为 task 去启动消费者。eventprocessor 对象本质上是上文中提到过的 BatchEventProcessor。
BatchEventProcessor 能够被传入 execute(...) 方法,证明其实现了 Runnable 接口:
//step 1 //BatchEventProcessor.class @Override public void run(){ //running 是一个定义在 BatchEventProcessor 中的 AtomicInteger 类型的变量 //CAS 操作,先判断 running 的值是否等于 IDLE,如果是的话就修改成 RUNNING,并返回 true //IDLE = 1,RUNNING = 2,皆为 int 类型的常量 if (running.compareAndSet(IDLE, RUNNING)){ //此处修改 sequenceBarrier 中 alert 变量的状态值,清除掉中断状态 sequenceBarrier.clearAlert(); //如果传入的消费者实现了 LifecycleAware 接口,就会在 notifyStart() 方法中去执行相关方法 //LifecycleAware 中定义了 onStart() 和 onShutdown() 方法,会分别在消费者真正执行之前和关闭之前执行一次 //执行 LifecycleAware 的 onStart() 方法 notifyStart(); try{ //如果 running 是 RUNNING 状态,就会进入死循环 if (running.get() == RUNNING){ //核心方法 processEvents(); } }finally{ //执行 LifecycleAware 的 onShutdown() 方法 notifyShutdown(); //切换 running 的状态值 running.set(IDLE); } }else{ if (running.get() == RUNNING){ throw new IllegalStateException("Thread is already running"); }else{ earlyExit(); } } } //step 2 //BatchEventProcessor.class private void processEvents(){ T event = null; //此处的 sequence 记录着当前消费者已经处理过的事件的编号,初始化的时候为 -1,所以 nextSequence 初始为 0,每次加一 //nextSequence 是当前消费者下一项准备处理的事件的编号 long nextSequence = sequence.get() + 1L; //死循环 while (true){ try{ //当没有事件发生的时候,消费者所在的线程会在此等待,具体的实现依照使用者设置的等待策略的不同而不同 //本例中使用的是 BlockingWaitStrategy,所以会在此阻塞直到出现了事件 //返回的 availableSequence 是最新的事件的编号,在任务量较小的情况下和 nextSequence 数值相同,在任务量较大的情况下小于 nextSequence //等待策略留在后头展开 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null){ batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } //nextSequence 大于 availableSequence 的情况理论上不会出现 while (nextSequence <= availableSequence){ //dataProvider 就是之前初始化的 RingBuffer,RingBuffer 在此处会去获取当前编号的 Event event = dataProvider.get(nextSequence); //onEvent(...) 是 EventHandler 接口定义的方法,是消费者消费 Event 的最重要方法,方法体由使用者进行定义 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); //编号自增 nextSequence++; } //在消费完当前的所有事件之后,记录下事件编号 sequence.set(availableSequence); }catch (final TimeoutException e){ //如果消费者实现了 TimeoutHandler 接口,就可以在这里处理超时问题 notifyTimeout(sequence.get()); }catch (final AlertException ex){ //running 的状态值非 RUNNING,就会退出死循环 if (running.get() != RUNNING){ break; } }catch (final Throwable ex){ //如果当前的消费者实现了 ExceptionHandler 接口的话,可以在此处进行错误处理 exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } }3.2 WaitStrategy
回到上述代码的以下这句:
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
追踪一下 waitFor(...) 方法:
//ProcessingSequenceBarrier.class public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException{ //如果变量 alert 为 true 的话会抛出错误 checkAlert(); //调用等待策略的相关方法 //返回最新的事件的编号 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); //如果当前可用的最新事件编号小于传入的 sequence,就直接返回可用编号即可 if (availableSequence < sequence){ return availableSequence; } //getHighestPublishedSequence(...) 方法会判断最大的可用的事件编号 return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
等待策略的所有实现类都实现了 WaitStrategy 接口:
public interface WaitStrategy{ //休眠方法 long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException, TimeoutException; //唤醒方法 void signalAllWhenBlocking(); }
Disruptor 自带的策略中,常用的有以下几种:
阻塞策略 BlockingWaitStrategy:默认策略,没有获取到任务的情况下线程会进入等待状态。cpu 消耗少,但是延迟高。 阻塞限时策略 TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常。 自旋策略 BusySpinWaitStrategy:线程一直自旋等待。cpu 占用高,延迟低. Yield 策略 YieldingWaitStrategy:尝试自旋 100 次,然后调用 Thread.yield() 让出 cpu。cpu 占用高,延迟低。 分段策略 SleepingWaitStrategy:尝试自旋 100 此,然后调用 Thread.yield() 100 次,如果经过这两百次的操作还未获取到任务,就会尝试阶段性挂起自身线程。此种方式是对 cpu 占用和延迟的一种平衡,性能不太稳定。
还有几种譬如 PhasedBackoffWaitStrategy 和 LiteBlockingWaitStrategy 等,不多介绍。
详细看一下 BlockingWaitStrategy 的实现:
public final class BlockingWaitStrategy implements WaitStrategy{ //重入锁 private final Lock lock = new ReentrantLock(); //Condition 用来控制线程的休眠和唤醒 private final Condition processorNotifyCondition = lock.newCondition(); @Override public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException{ long availableSequence; if (cursorSequence.get() < sequence){ //上锁 lock.lock(); try{ while (cursorSequence.get() < sequence){ //检查线程是否中断了,如果已经中断了就会抛出异常 barrier.checkAlert(); //休眠线程 processorNotifyCondition.await(); } }finally{ //解锁 lock.unlock(); } } //生产者进度小于消费者的消费进度,此循环进行等待 //正常情况下都会在上方阻塞,不会进入该循环 while ((availableSequence = dependentSequence.get()) < sequence){ barrier.checkAlert(); ThreadHints.onSpinWait(); } return availableSequence; } @Override public void signalAllWhenBlocking(){ lock.lock(); try{ //用 Condition 唤醒全部的线程 processorNotifyCondition.signalAll(); }finally{ lock.unlock(); } } //toString() 方法,忽略 @Override public String toString(){ return "BlockingWaitStrategy{" + "processorNotifyCondition=" + processorNotifyCondition + "}"; } }3.3 DataProvider
回到上述代码的以下这句:
event = dataProvider.get(nextSequence);
dataProvider 是一个 DataProvider 类型的变量。DataProvider 本质上是一个 Disruptor 内的接口:
public interface DataProvider{ T get(long sequence); }
其存在唯一实现类 RingBuffer。所以 get(...) 方法也在 RingBuffer 中:
//step 1 //RingBuffer.class @Override public E get(long sequence){ //elementAt(...) 方法定义在 RingBuffer 的抽象父类 RingBufferFields 中 return elementAt(sequence); } //step 2 //RingBufferFields.class protected final E elementAt(long sequence){ //调用 UNSAFE 的相关方法,通过地址去直接获取 //entries 在上文代码中申请了一系列地址连续的内存 //REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT) 是一个很巧妙的算法,结果永远只会在申请下来的内存中循环 return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); }
由此可知,Disruptor 中的所有的事件都非存储在虚拟机中,而是储存在虚拟机外,由 Unsafe 类直接调用。
Unsafe 具有 "调用内存对象很快,但是申请内存块很慢" 的特性,所以也就可以解释为什么在初始化的时候要一次性将储存 Event 的数组进行逐个初始化了(代码在上述 1.2 小节的 step 4 中)。
有一个注意点,entries 上的元素实际上是在 jvm 管辖范围内的,并不一定需要使用 Unsafe 去调用,这里只是为了更高的性能。
三 Event 的产生在开头的 demo 中,可以看到 LongEventProducer 中有一个核心方法:
//LongEventProducer.class public void onData(ByteBuffer bb) { //sequence 是 RingBuffer 中的一个数据块,类似于一个数据地址 long sequence = ringBuffer.next(); try { //用数据地址去获取到一个 Event 事件类实例 LongEvent event = ringBuffer.get(sequence); //在实例中存入 ByteBuffer 中的数据 event.set(bb.getLong(0)); } finally { //发布该数据块,此时消费者们都可以看到该数据块了,可以进行消费 ringBuffer.publish(sequence); } }
这个方法内通过调用 ringBuffer.next() 方法获取数组内对象的地址,然后通过 ringBuffer.get(...) 方法获取对象。
在 finally 代码块中调用 ringBuffer.publish(...) 方法去发布该信息。
1 next回到上述代码的以下这句:
long sequence = ringBuffer.next();
追踪 next() 方法:
//step 1 //RingBuffer.class @Override public long next(){ //调用 RingBuffer 内的 MultiProducerSequencer 的相关方法 return sequencer.next(); } //step 2 //MultiProducerSequencer.class @Override public long next(){ //调用自身的相关方法 return next(1); } //step 3 //MultiProducerSequencer.class @Override public long next(int n){ //参数有效性验证,此处 n = 1 if (n < 1){ throw new IllegalArgumentException("n must be > 0"); } long current; long next; //死循环 do{ //current 是当前最新的事件编号 current = cursor.get(); //此处为 current + 1,用作下一个事件的编号 next = current + n; //wrapPoint 是事件编号和数组大小的差 long wrapPoint = next - bufferSize; //gatingSequenceCache 的设计很巧妙,它是一个 Sequence 类型的变量,可以看做是一个 long 整数 //gatingSequenceCache 的存在意义是每隔一段时间去检查一次消费者的处理进度 //gatingSequenceCache 在每次检查进度的时候都会更新成 "当前处理最慢的消费者已经处理完成的事件编号" //处理逻辑在下方 if 判断中 long cachedGatingSequence = gatingSequenceCache.get(); //cachedGatingSequence > current 的情况就不会发生,因为 cachedGatingSequence 是消费者处理进度,current 是目前的事件总编号,所以最多相等 //在消费者算力充足的情况下,cachedGatingSequence 会和 current 相等 //wrapPoint > cachedGatingSequence 的情况,在极端情况下可能是因为生产者的速度太快了,已经远超过最慢的那个消费者,超过了 "一圈"(即 bufferSize 的大小) //此处可以这么理解,由于 RingBuffer 内数组的大小是有限的,如果事件生产的多了,就会覆盖掉最开始的几个事件 //但是如果消费者的进度没有跟上,来不及消费就被覆盖了,就造成了 bug,此处即为抑制策略 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){ //getMinimumSequence(...) 方法会获取当前处理事件最慢的那个消费者的处理位置 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); //wrapPoint - gatingSequence = next - bufferSize - gatingSequence >0 //即 next > bufferSize + gatingSequence,落后了 "一圈" if (wrapPoint > gatingSequence){ //线程挂起 1 纳秒,然后跳过本次循环进行下一次循环 //此处会陷入死循环,阻塞掉生产者,去等待消费者的进度 LockSupport.parkNanos(1); continue; } //跳出上述循环之后在这里更新 gatingSequenceCache 的值 gatingSequenceCache.set(gatingSequence); }else if (cursor.compareAndSet(current, next)){ //如果消费者的进度正常,那么会在此用 CAS 操作更新 cursor 的值,并且跳出 while 循环 break; } }while (true); //返回 return next; }
在线程池(比如笔者比较了解的 ThreadPoolExecutor)的实现中,对于 task 过多,溢出等待队列的情况,一般会有一种策略去应对。在 ThreadPoolExecutor 中,默认的策略为抛出错误,直接终止程序。
在 Disruptor 中,其实 RingBuffer 就类似一个等待队列,溢出策略则是暂停 task 的产生,等待线程池去执行。
【此处仅为类比,不能简单的把 Disruptor 想成是一个线程池】
2 publishringBuffer.publish(...) 是事件发布的核心方法:
//step 1 //RingBuffer.class @Override public void publish(long sequence){ sequencer.publish(sequence); } //step 2 //MultiProducerSequencer.class @Override public void publish(final long sequence){ //此处更新数据 setAvailable(sequence); //此处调用等待策略的 signalAllWhenBlocking() 方法唤醒所有等待的线程 //具体实现依照 waitStrategy 的不同而不同 waitStrategy.signalAllWhenBlocking(); } //step 3 //MultiProducerSequencer.class private void setAvailable(final long sequence){ //calculateAvailabilityFlag(sequence) 可以简单理解为是计算出的圈数,即 (sequence / bufferSize) //calculateIndex(sequence) 会计算出新的 sequence 对应组中的哪一个位置,即 (sequence % bufferSize) setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); } //step 4 //MultiProducerSequencer.class private void setAvailableBufferValue(int index, int flag){ //SCALE 是本机 Object[] 引用的大小,一般为 4 long bufferAddress = (index * SCALE) + BASE; //使用 Unsafe 更新元素 //availableBuffer 是一个 int 数组,大小为 bufferSize,即和 entries 相同 //Unsafe.putOrderedInt(...) 会将 availableBuffer 的指定位置(bufferAddress)的元素修改成 flag UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); }四 一点唠叨
· Disruptor 的封装很薄(比起 Netty、Spring 之类的重量级框架),调用链路都相对较短
· Disruptor 的环装缓存区(RingBuffer)的很多概念还有待理解
· 对于笔者这样的数学苦手来说看底层算法代码略头疼
· 仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73243.html
摘要:纯分享直接上干货操作系统并发支持进程管理内存管理文件系统系统进程间通信网络通信阻塞队列数组有界队列链表无界队列优先级有限无界队列延时无界队列同步队列队列内存模型线程通信机制内存共享消息传递内存模型顺序一致性指令重排序原则内存语义线程 纯分享 , 直接上干货! 操作系统并发支持 进程管理内存管...
摘要:结合之前的线程快照,我发现这个消费线程也是处于状态,和后面的业务线程池一模一样。本地模拟本地也是创建了一个单线程的线程池,分别执行了两个任务。发现当任务中抛出一个没有捕获的异常时,线程池中的线程就会处于状态,同时所有的堆栈都和生产相符。 showImg(https://segmentfault.com/img/remote/1460000018482477); 背景 事情(事故)是这样...
摘要:我们知道是一个队列,生产者往队列里发布一项事件或称之为消息也可以时,消费者能获得通知如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。实战本文先不具体去阐述的工作具体原理,只是简单地将与其整合。 什么是Disruptor 从功能上来看,Disruptor 是实现了队列的功能,而且是一个有界队列。那么它的应用场景自然就是生产者-消费者模型的应用场合了。可以拿 JDK 的 Block...
摘要:发现这是的一个堆栈,前段时间正好解决过一个由于队列引起的一次强如也发生内存溢出没想到又来一出。因此初步判断为大量线程执行函数之后互相竞争导致使用率增高,而通过对堆栈发现是和使用有关。 showImg(https://segmentfault.com/img/remote/1460000017395756?w=1816&h=1080); 前言 到了年底果然都不太平,最近又收到了运维报警:...
摘要:结合的日志发现就算是发生了老年代也已经回收不了,内存已经到顶。定位由于生产上的内存文件非常大,达到了几十。也是由于我们的内存设置太大有关。同时后台也开始打印内存溢出了,这样便复现出问题。结果发现类型的对象占用了将近的内存。 showImg(https://segmentfault.com/img/remote/1460000016186784?w=2048&h=1365); 前言 Ou...
阅读 4157·2023-04-26 02:40
阅读 2658·2023-04-26 02:31
阅读 2749·2021-11-15 18:08
阅读 571·2021-11-12 10:36
阅读 1425·2021-09-30 09:57
阅读 5193·2021-09-22 15:31
阅读 2628·2019-08-30 14:17
阅读 1271·2019-08-30 12:58