首页
登录 | 注册

Disruptor 详解 一

这篇博客将主要通过几个示例,简单讲述 Disruptor 的使用方法;

一、disruptor 简介

Disruptor 是英国外汇交易公司 LMAX 开发的一个无锁高性能的线程间消息传递的框架。目前包括 Apache Storm、Camel、Log4j2 等知名项目都是用了 Disruptor;

因为 Disruptor 中的一个很重要的结构 RingBuffer 和 JDK 中的 ArrayBlockingQueue 很相似,其内部都是一个环形数组,所以经常将他们放在一起比较,以下是官网公布测试结果

Disruptor 详解 一

从图中可以明显看到他们之间性能的巨大差异;

此外在使用 Disruptor 的项目中也能看到其性能的差异,例如 Log4j

Disruptor 详解 一

其中 Loggers all async 采用的是 Disruptor,Async Appender 采用的是 ArrayBlockingQueue, Sync 是同步模式;从图中可以看到,线程越多竞争越激烈的时候 Disruptor 的性能优势越明显,其原因很很容易想到,因为 ArrayBlockingQueue 的进出由同一把锁控制,所以竞争对其性能有巨大的影响;

此外我的笔记本配置为 “i7-8550U 8G”,使用的版本为:

<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.2</version>
</dependency>

二、ArrayBlockingQueue 性能对比

以下通过一个单线程的 demo,演示Disruptor 的基本用法,并个 ArrayBlockingQueue 做简单对比;

public class Contrast {
  public static final int count = 50000000;
  public static final int size = 1024;
  private static CountDownLatch latch = new CountDownLatch(1);

  public void testDisruptor() throws InterruptedException {
    long start = System.currentTimeMillis();
    final Disruptor<Event> disruptor = new Disruptor<>(
        () -> new Event(),             // 绑定事件工厂,主要用于初始化 RingBuffer
        size,                          // RingBuffer 大小
        DaemonThreadFactory.INSTANCE,  // 指定生产者线程工厂,也可以直接传入线程池
        ProducerType.SINGLE,           // 指定生产者为单线程,也支持多线程模式
          new YieldingWaitStrategy()   // 等待策略
//        new BlockingWaitStrategy()
    );

    Handler handler = new Handler();
    disruptor.handleEventsWith(handler);  // 绑定事件处理程序
    disruptor.start();

    RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();  // 开始之后 RingBuffer 的所有位置就已经初始化完成
    for (int i = 0; i < count; i++) {
      long seq = ringBuffer.next();       // 获取下一个放置位置
      Event event = ringBuffer.get(seq);  // 等到指定位置的槽
      event.seId(i);                      // 更新事件,注意这里是更新,不是放入新的,所以不会有 GC 产生
      ringBuffer.publish(seq);            // 发布事件
    }

    latch.await();
    System.out.println("time: " + (System.currentTimeMillis() - start));
  }

  private void testQueue() throws InterruptedException {
    long start = System.currentTimeMillis();
    final BlockingQueue<Event> queue = new ArrayBlockingQueue<>(size);
    new Thread(() -> {
      for (int i = 0; i < count; i++) {
        try {
          queue.put(new Event(i));
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).start();

    new Thread(() -> {
      for (int i = 0; i < count; i++) {
        try {
          Event event = queue.take();
          if (i == count - 1) {
            System.out.println("last: " + event.getLogId());
            latch.countDown();
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).start();

    latch.await();
    System.out.println("time: " + (System.currentTimeMillis() - start));
  }

  class Event {
    private long id;
    Event() {}
    Event(long id) { this.id = id; }
    public long getLogId() { return id; }
    public void seId(int id) { this.id = id; }
  }

  class Handler implements EventHandler<Event> {
    private int i = 0;

    @Override
    public void onEvent(Event event, long seq, boolean bool) {
      if (++i == count) {
        System.out.println("last: " + event.getLogId());
        latch.countDown();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Contrast contrast = new Contrast();
    contrast.testDisruptor();
//    contrast.testQueue();
  }
}

Disruptor-YieldingWaitStrategy: 919
Disruptor-BlockingWaitStrategy: 3142
ArrayBlockingQueue : 4307

其中 BlockingWaitStrategy 等待策略和 ArrayBlockingQueue 大致相识

三、多消费者

上面的例子在使用多个消费这时,会出现重复消费的情况,如果想要一条消息只消费一次,可以参照下面的代码:

public class MoreConsumer {
  public static final int count = 5000;
  public static final int size = 16;

  public void testDisruptor() {
    long start = System.currentTimeMillis();
    final Disruptor<Event> disruptor = new Disruptor<>(
        () -> new Event(),
        size, DaemonThreadFactory.INSTANCE,
        ProducerType.SINGLE,
        new BlockingWaitStrategy()
    );

    disruptor.handleEventsWithWorkerPool(new Handler("h1"), new Handler("h2"), new Handler("h3"));
    disruptor.start();

    RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();
    for (int i = 0; i < count; i++) {
      long seq = ringBuffer.next();
      Event event = ringBuffer.get(seq);
      event.id = i;
      ringBuffer.publish(seq);
    }

    System.out.println("time: " + (System.currentTimeMillis() - start));
  }

  class Event { public long id; }

  class Handler implements WorkHandler<Event> {
    private String name;

    Handler(String name) { this.name = name; }
      
    @Override
    public void onEvent(Event event) { System.out.println(name + ": " + event.id); }
  }

  public static void main(String[] args) {
    MoreConsumer moreConsumer = new MoreConsumer();
    moreConsumer.testDisruptor();
  }
}

如上面的代码所示使用 WorkHandler 即可,同时还需要注意选择等待策略,策略不同也可能导致重复消费的问题,同时官网也只出需要在代码里面保证重复消费问题;

四、复杂业务逻辑

很多也业务逻辑会出现以下的类似情况,第三个消费者,需要等待前面的任务完成后才能继续执行的情况;通常我们会使用锁、同步工具以及一些其他的方式,但都显得比较麻烦,而且效率比较低,这里如果我们使用 Disruptor 就能很方便的解决;

Disruptor 详解 一

disruptor.handleEventsWith(c1Handler, c2Handler);
disruptor.after(c1Handler, c2Handler).handleEventsWith(c3Handler);

如此仅需两行代码,就能将上面的关系表述清楚,对于更复杂的情况同样;

对于更多的使用技巧就需要你根据实际情况分析了,下一篇博客将主要分析 Disruptor 为什么会那么快;


相关文章

  • 详解linux进程间通信-消息队列
    前言:前面讨论了信号.管道的进程间通信方式,接下来将讨论消息队列. 一.系统V IPC 三种系统V IPC:消息队列.信号量以及共享内存(共享存储器)之间有很多相似之处. 每个内核中的 I P C结构(消息队列.信号量或共享存储段)都用一个 ...
  • 详解linux进程间通信-管道 popen函数 dup2函数
    前言:进程之间交换信息的唯一方法是经由f o r k或e x e c传送打开文件,或通过文件系统.本章将说明进程之间相互通信的其他技术—I P C(InterProcess Communication).今天将介绍半双工的管道. 一.匿名管 ...
  • python接口自动化(二十三)--unittest断言——上(详解)
    简介 在测试用例中,执行完测试用例后,最后一步是判断测试结果是 pass 还是 fail,自动化测试脚本里面一般把这种生成测试结果的方法称为断言(assert).用 unittest 组件测试用例的时候,断言的方法还是很多的,下面介绍几种常 ...
  • python接口自动化(二十一)--unittest简介(详解)
    简介 前边的随笔主要介绍的requests模块的有关知识个内容,接下来看一下python的单元测试框架unittest.熟悉 或者了解java 的小伙伴应该都清楚常见的单元测试框架 Junit 和 TestNG,这个招聘的需求上也是经常见到 ...
  • 问题引入 在ASP.NET Core - 依赖注入这篇文章里面,我们知道了如何利用ASP.NET Core原生的容器来实现依赖注入的,那我们为什么要替换掉默认的 IoC容器呢?从ASP.NET Core - 依赖注入这篇文章来看的话,默认的 ...
  • SpringBoot集成Lombok,应用+源码解析,让代码优雅起来
    一.Lombok简介 (1)Lombok官网(https://projectlombok.org/)对lombok的介绍 (2)GitHub项目地址:https://github.com/rzwitserloot/lombok 虽然是生硬的 ...

2019 cecdns.com webmaster#cecdns.com
12 q. 0.081 s.
京ICP备10005923号