#Java 使用 Disruptor 并发框架

#一、什么是 Disruptor

Disruptor 是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的 JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。能够在无锁的情况下实现网络的 Queue 并发操作。Disruptor 使用观察者模式, 主动将消息发送给消费者, 而不是等消费者从队列中取; 在无锁的情况下, 实现 queue(环形, RingBuffer) 的并发操作, 性能远高于 BlockingQueue。

#二、Disruptor 的优化策略

  1. 环形数组结构
    为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
  2. 元素位置定位
    数组长度 2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心 index 溢出的问题。index 是 long 类型,即使 100 万 QPS 的处理速度,也需要 30 万年才能用完。
  3. 无锁设计
    每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

参考 : https://xie.infoq.cn/article/fe04e42cd473982d7d901b2c8

  • cas代替锁

  • 独占缓存行 缓存行填充

  • 环形队列 2的n次方 ,就是将取模转变为取与运算。 m % 2^n = m & ( 2^n - 1 )

  • 预分配内存, 重用

#二、在 java 中使用

  1. pom 引入依赖
  	    <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.2.1</version>
        </dependency>
  1. 为了方便操作 bean,引入 lombok 框架
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
  1. 定义消息 bean
@Data
public class Msg {
    private int code;
    private String msg;
}
  1. 定义消费者
public class ConsumerOne implements EventHandler<Msg> {
    public void onEvent(Msg msg, long l, boolean b) throws Exception {
        System.out.println("ConsumerOne - 消费者1 :="+msg);
    }
}
public class ConsumerTwo implements EventHandler<Msg> {
    public void onEvent(Msg msg, long l, boolean b) throws Exception {
        System.out.println("ConsumerTwo - 消费者2 :="+msg);
    }
}
  1. 定义工厂
public class MsgEventFactory implements EventFactory<Msg> {
	public Msg newInstance() {
		return new Msg();
	}
}
  1. 提供者
public class MsgProducer {

    private RingBuffer<Msg> ringBuffer;
    private ExecutorService executor;
    private  Disruptor<Msg> disruptor;
    public MsgProducer() {
        init();
    }

    private void init() {
        executor = Executors.newCachedThreadPool();

        EventFactory<Msg> factory = new MsgEventFactory();
        // 2的N次方。
        int ringbuffer = 1024 * 1024;

        //创建disruptor对象
        disruptor = new Disruptor<Msg>(factory, ringbuffer, executor, ProducerType.MULTI, new YieldingWaitStrategy());

        //注册消费者
        disruptor.handleEventsWith(new ConsumerOne());
        disruptor.handleEventsWith(new ConsumerTwo());
        //启动
        disruptor.start();

        RingBuffer<Msg> ringBuffer = disruptor.getRingBuffer();

        this.ringBuffer = ringBuffer;
    }

    public boolean Public(int code,String message){
        long sequence =ringBuffer.next();
        try {
            Msg msg = ringBuffer.get(sequence);
            msg.setMsg(message);
            msg.setCode(code);
            ringBuffer.publish(sequence);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    public void Close(){
        executor.shutdown();
        disruptor.shutdown();
    }
}
  1. 测试
public class Test {
    public static void main(String[] args) {
        MsgProducer producer = new MsgProducer();
        producer.Public(200,"哈哈");
        producer.Public(200,"哈哈22");
        producer.Public(200,"哈哈22哈哈");
        producer.Close();
    }
}

原文地址 blog.csdn.net