金点网络-全网资源,一网打尽
  • 网站首页
    • 金点部落
    • 小游戏
    • OpenAPI
    • 设计资产导航
    • 升级会员
  • 技能学习
    • 体育运动
    • 办公教程
    • 口才演讲
    • 小吃技术
    • 建站教程
    • 摄影教程
    • 棋牌教程
    • 网赚教程
      • 爆粉引流
      • 自媒体
      • 贴吧引流
  • 网站源码
    • 商城/淘客/交易
    • 小说/漫画/阅读
    • 影视/音乐/视频
    • 微信/微商/微擎
    • 理财/金融/货币
    • 模板/主题/插件
  • 游戏源码
    • 精品网单
    • 端游源码
    • 手游源码
    • 页游源码
  • 素材资料
    • 电子文档
    • 综合资料
    • 考研资料
    • 设计素材
    • 音频讲座
      • 人文艺术
      • 名师讲座
      • 说书小说
  • 软件工具
    • Windows软件
    • MacOS软件
    • Android软件
  • 寄售资源
    • 游戏源码
    • 网站源码
    • 软件源码
  • 公益服
登录/注册
  • 专享大神特权
立即开通开通会员抄底价
  • 最近更新:2020年10月18日

Java 实战系列·高性能无锁队列 Disruptor

1786。
2020-10-18 jamin 已收录 已售172次 关注1778次
免费 优惠信息:免费大神特权 该资源永久大神免费 去升级
登录后下载 演示地址 QQ咨询
  • 安全无毒
  • 持续更新
  • 支持二开
  • 云盘备份

特别声明:原创产品提供以上服务,破解产品仅供参考学习,不提供售后服务(均已杀毒检测),如有需求,建议购买正版!如果源码侵犯了您的利益请留言告知!如何获得 金豆

  • 正文概述
  • 售后服务
  • 高性能无锁队列 Disruptor

    Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题,因其出色的性能表现获得 2011 Duke’s 程序框架创新奖。

    A High Performance Inter-Thread Messaging Library
    项目地址:LMAX Disruptor

    介绍

    从数据结构上来看,Disruptor 是一个支持生产者/消费者模式的环形队列。能够在无锁的条件下进行并行消费,也可以根据消费者之间的依赖关系进行先后消费次序。

    Disruptor 高效原理:

    1. Disruptor 使用了一个 RingBuffer 替代队列,用生产者消费者指针替代锁。
    2. 生产者消费者指针使用 CPU 支持的整数自增,无需加锁并且速度很快。Java 的实现在 Unsafe package 中。

    消费者的等待策略

    名称 措施 适用场景
    BlockingWaitStrategy 加锁 CPU 资源紧缺,吞吐量和延迟并不重要的场景
    BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的 CPU 的场景下使用
    PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU 资源紧缺,吞吐量和延迟并不重要的场景
    SleepingWaitStrategy 自旋 + yield + sleep 性能和 CPU 资源之间有很好的折中。延迟不均匀
    TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU 资源紧缺,吞吐量和延迟并不重要的场景
    YieldingWaitStrategy 自旋 + yield + 自旋 性能和 CPU 资源之间有很好的折中。延迟比较均匀
    名称 适用场景
    BlockingWaitStrategy CPU 资源紧缺,吞吐量和延迟并不重要的场景
    BusySpinWaitStrategy 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的 CPU 的场景下使用
    PhasedBackoffWaitStrategy CPU 资源紧缺,吞吐量和延迟并不重要的场景
    SleepingWaitStrategy 性能和 CPU 资源之间有很好的折中。延迟不均匀
    TimeoutBlockingWaitStrategy CPU 资源紧缺,吞吐量和延迟并不重要的场景
    YieldingWaitStrategy 性能和 CPU 资源之间有很好的折中。延迟比较均匀

    食用方式

    引入依赖

    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.4.2</version>
    </dependency>
    

    命令字和数据包

    /**
     * @author Nicestar
     * @description 无锁队列命令字
     * @since 2020-06-13
     */
    public interface IDisruptorCommand {
    
        /**
         * 测试消息 hello
         */
        int CHECK_MSG_HELLO = 1;
    
        /**
         * 测试消息 hi
         */
        int CHECK_MSG_HI = 2;
    
    }
    
    /**
     * @author Nicestar
     * @description 传输的数据
     * @since 2020-06-13
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class TranslatorDataWrapper {
    
        private int command;
    
        private Object target;
    
    }
    

    轮询策略

    /**
     * @author Nicestar
     * @description 轮询策略
     * @since 2020-06-13
     */
    @Configuration
    public class DisruptorWaitStrategyConfiguration {
    
        @Bean
        @ConditionalOnMissingBean(WaitStrategy.class)
        public WaitStrategy getWaitStrategy() {
            // 如果 CPU 比较叼的话,可以用 YieldingWaitStrategy
            return new BlockingWaitStrategy();
        }
    
    }
    

    生成者和消费者

    /**
     * @author Nicestar
     * @description 消息生产者
     * @since 2020-06-13
     */
    @Data
    @Slf4j
    @AllArgsConstructor
    public class MessageProducer {
    
        private RingBuffer<TranslatorDataWrapper> ringBuffer;
    
        /**
         * 发布事件
         *
         * @param command 命令字
         * @param object 数据
         */
        public void publish(int command, Object object) {
            long sequence = ringBuffer.next();
            try {
                TranslatorDataWrapper wrapper = ringBuffer.get(sequence);
                wrapper.setCommand(command);
                wrapper.setTarget(object);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    
    }
    
    /**
     * 消息消费者
     *
     * @author nk
     */
    @Slf4j
    public class MessageConsumer implements WorkHandler<TranslatorDataWrapper> {
    
        @Override
        public void onEvent(TranslatorDataWrapper wrapper) {
            int command = wrapper.getCommand();
            switch (command) {
                case IDisruptorCommand.CHECK_MSG_HELLO:
                    log.info("消费消息 =============== hello");
                    break;
                case IDisruptorCommand.CHECK_MSG_HI:
                    log.info("消费消息 =============== hi");
                    break;
                default:
                    break;
            }
        }
    
    }
    

    构造工厂

    disruptor.buffer.size 这里设置为 1024 * 1024 即 1048576。

    disruptor:
      buffer:
        size: 1048576
    
    /**
     * @author Nicestar
     * @description 环型无锁队列
     * @since 2020-06-13
     */
    @Slf4j
    @Component
    @RequiredArgsConstructor(onConstructor = @__(@Autowired))
    public class RingBufferWorkerPoolFactory {
    
        @Value("${disruptor.buffer.size}")
        private int mBufferSize;
    
        private final WaitStrategy mWaitStrategy;
    
        private Map<Integer, MessageProducer> producers = new ConcurrentHashMap<>();
    
        private RingBuffer<TranslatorDataWrapper> ringBuffer;
    
        public void initAndStart(MessageConsumer[] messageConsumers) {
            // 1.构建 ringBuffer 对象
            this.ringBuffer = RingBuffer.create(ProducerType.MULTI,
                    TranslatorDataWrapper::new,
                    mBufferSize,
                    mWaitStrategy);
            // 2.通过 ringBuffer 创建一个屏障
            SequenceBarrier sequenceBarrier = this.ringBuffer.newBarrier();
            // 3.创建多个消费者数组
            WorkerPool<TranslatorDataWrapper> workerPool = new WorkerPool<>(
                    this.ringBuffer,
                    sequenceBarrier,
                    new EventExceptionHandler(),
                    messageConsumers);
            // 4.设置多个消费者的 sequence 序号,用于单独统计消费进度,并且设置到 ringBuffer 中
            this.ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
            // 5.启动工作池
            int processorsCount = Runtime.getRuntime().availableProcessors();
            log.info("进程数 -> {}", processorsCount);
            workerPool.start(Executors.newFixedThreadPool(processorsCount));
        }
    
        public MessageProducer getMessageProducer(int command) {
            MessageProducer messageProducer = producers.get(command);
            if (messageProducer == null) {
                messageProducer = new MessageProducer(this.ringBuffer);
                producers.put(command, messageProducer);
            }
            return messageProducer;
        }
    
        /**
         * 异常静态类
         */
        @Slf4j
        static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWrapper> {
    
            @Override
            public void handleEventException(Throwable ex, long sequence, TranslatorDataWrapper event) {
                log.error("handleEventException -> ex:{}  sequence:{} event:{}", ex.getMessage(), sequence, event.getClass().toString());
                ex.printStackTrace();
            }
    
            @Override
            public void handleOnStartException(Throwable ex) {
                log.error("handleOnStartException -> ex:{}", ex.getMessage());
                ex.printStackTrace();
            }
    
            @Override
            public void handleOnShutdownException(Throwable ex) {
                log.error("handleOnShutdownException -> ex:{} ", ex.getMessage());
                ex.printStackTrace();
            }
        }
    
    }
    
    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
    {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }
    
    • eventFactory:在环形缓冲区中创建事件的 factory;
    • ringBufferSize:环形缓冲区的大小,必须是 2 的幂;
    • threadFactory:用于为处理器创建线程;
    • producerType:生成器类型以支持使用正确的 sequencer 和 publisher 创建 RingBuffer;枚举类型,SINGLE、MULTI 两个项。对应于 SingleProducerSequencer 和 MultiProducerSequencer 两种 Sequencer;
    • waitStrategy:等待策略;

    启动

    public static void main(String[] args) {
        SpringApplication.run(YukoApplication.class, args);
    
        // 启动 disruptor
        MessageConsumer[] consumers = new MessageConsumer[8];
        for (int i = 0; i < consumers.length; i++) {
            MessageConsumer messageConsumer = new MessageConsumer();
            consumers[i] = messageConsumer;
        }
        RingBufferWorkerPoolFactory factory = SpringUtil.getBean(RingBufferWorkerPoolFactory.class);
        factory.initAndStart(consumers);
    }
    

    测试消息生产消费

    private RingBufferWorkerPoolFactory getWorkerPoolFactory() {
        return SpringUtil.getBean(RingBufferWorkerPoolFactory.class);
    }
    
    @Scheduled(fixedDelay = 1000, initialDelay = 3000)
    private void msg() {
        IntStream.range(1, 9).forEach(i -> {
            int command = i % 2 == 0 ? IDisruptorCommand.CHECK_MSG_HELLO : IDisruptorCommand.CHECK_MSG_HI;
            TranslatorDataWrapper wrapper = new TranslatorDataWrapper(command, "WORLD");
            MessageProducer messageProducer = getWorkerPoolFactory().getMessageProducer(command);
            messageProducer.publish(command, wrapper);
        });
    }
    
    2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-1] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hello
    2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-7] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hi
    2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-6] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hello
    2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-3] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hi
    2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-2] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hello
    2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-4] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hi
    2020-06-13 09:45:09.404  INFO 21580 --- [pool-1-thread-5] c.c.y.d.consumer.MessageConsumer  : 消费消息 =============== hello
    

    一些方案

    规避数据覆盖

    使用 Disruptor,首先需要构建一个 RingBuffer,并指定一个大小,注意如果 RingBuffer 里面数据超过了这个大小则会覆盖旧数据。这可能是一个风险,但 Disruptor 提供了检查 RingBuffer 是否写满的机制用于规避这个问题。

    // if capacity less than 10%, don't use ringbuffer anymore
    if(ringBuffer.remainingCapacity() < RING_SIZE * 0.1) {
        log.warn("disruptor:ringbuffer avaliable capacity is less than 10 %");
        return;
    }
    // Publishers claim events in sequence
    long sequence = ringBuffer.next();
    try {
        TranslatorDataWrapper wrapper = ringBuffer.get(sequence);
        wrapper.setCommand(command);
        wrapper.setTarget(object);
    } finally {
        ringBuffer.publish(sequence);
    }
    

    Bless Bless!

    参考文章:
    高性能队列 Disruptor 的使用
    蚂蚁金服分布式链路跟踪组件 SOFATracer 中 Disruptor 实践

    Atom ExecutorService ForkJoin Java ReentrantLock synchronized ThreadLocal volatile 后台
    本站所提供的部分资源来自于网络,版权争议与本站无关,版权归原创者所有!仅限用于学习和研究目的,不得将上述内容资源用于商业或者非法用途,否则,一切后果请用户自负。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源。如果上述内容资对您的版权或者利益造成损害,请提供相应的资质证明,我们将于3个工作日内予以删除。本站不保证所提供下载的资源的准确性、安全性和完整性,源码仅供下载学习之用!如用于商业或者非法用途,与本站无关,一切后果请用户自负!本站也不承担用户因使用这些下载资源对自己和他人造成任何形式的损失或伤害。如有侵权、不妥之处,请联系站长以便删除!
    金点网络-全网资源,一网打尽 » Java 实战系列·高性能无锁队列 Disruptor

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。
    是否提供免费更新服务?
    持续更新,永久免费
    是否经过安全检测?
    安全无毒,放心食用
    jamin

    jamin 大神

    下一篇
    网络协议-组播介绍

    相关推荐

    传奇【免虚拟机一键端】复古传奇莽荒纪+补丁原版画质+视频教程

    传奇【免虚拟机一键端】复古传奇莽荒纪+补丁原版画质+视频教程

    梦幻西游【免虚拟机一键端】孙小空西游5.0优化版

    梦幻西游【免虚拟机一键端】孙小空西游5.0优化版

    全民突击H5 页游【win手工端】【免虚拟机一键端】

    全民突击H5 页游【win手工端】【免虚拟机一键端】

    Java 并发编程·ReadWriteLock

    Java 并发编程·ReadWriteLock

    梦幻修仙2 一键即玩端 电脑架设 网页游戏 回合制 单机 非联网

    梦幻修仙2 一键即玩端 电脑架设 网页游戏 回合制 单机 非联网

    标签云
    Android Atom ExecutorService ForkJoin GM GM后台 GM授权后台 H5 Java Javascript Linux手工服务端 pipbestcom Python ReentrantLock synchronized ThreadLocal volatile Win一键即玩服务端 一键端 传奇 写作 创业 单机 后台 商业端 外网 安卓 安卓苹果双端 工具 手工端 手游 搭建教程 教程 数据分析 文案 游戏源码 端游 经典 网单 职场 自媒体 视频教程 详细搭建教程 运营后台 页游

    近期文章

    • 回合手游【逍遥西游之繁华西游】最新整理单机一键既玩镜像服务端_Linux手工端_GM后台_教程
    • 最新整理精品回合制手游【天书奇谈3D混沌完整版】VM一键单机版_linux手工外网端_隐盟视频教程_授权GM后台_双端
    • 典藏修真页游【诸仙列传OL】最新整理Win系服务端_GM工具_详细外网搭建教程
    • MT3换皮MH【浮生若梦尊享挂机修复版】最新整理单机一键即玩镜像端_Linux手工服务端_安卓苹果双端_GM后台_详细搭建教程
    • 大话回合手游【最新引擎之缥缈西游渡劫版】最新整理Linux手工服务端_安卓苹果双端_管理后台_CDK后台_详细搭建教程_视频教程

    分类

    • | wordpress插件 |
    • | wordpress模板 |
    • | 其它模板 |
    • | 帝国模板 |
    • | 织梦插件 |
    • | 织梦模板 |
    • A5源码
    • Android软件
    • APP引流
    • E语言
    • H5
    • LUA
    • QQ营销
    • SEO推广
    • Windows软件
    • 体育运动
    • 信息数据
    • 创业专题
    • 办公教程
    • 口才演讲
    • 名师讲座
    • 商城/淘客/交易
    • 小吃技术
    • 小说/漫画/阅读
    • 建站教程
    • 引流脚本
    • 影视/音乐/视频
    • 影视资源
    • 微信/微商/微擎
    • 微信小程序
    • 微信营销
    • 微擎模块
    • 手游源码
    • 技能学习
    • 抖音课程
    • 摄影教程
    • 棋牌教程
    • 模板/主题/插件
    • 游戏源码
    • 爆粉引流
    • 理财/金融/货币
    • 生活老师
    • 电商客
    • 电子文档
    • 电脑教程
    • 社群营销
    • 站长工具
    • 精品网单
    • 系统工具
    • 素材资料
    • 综合资料
    • 编程经验
    • 网站源码
    • 网络安全
    • 网赚教程
    • 网赚源码
    • 考研资料
    • 脚本/AI/智能
    • 自媒体
    • 英语学习
    • 营销软件
    • 设计素材
    • 说书小说
    • 贴吧引流
    • 软件工具
    • 软文营销
    • 逆向软件
    • 音频讲座
    • 页游源码

    提供最优质的资源集合

    立即加入 友好社区
    金点网络-全网资源,一网打尽

    新一代全网资源综合门户网(www.pipbest.com-金点网络)专注服务于互联网,提供各类最新最全的免费源码下载(PHP、ASP、JSP、.NET),更提供免费工具,免费源码下载,软件下载,素材下载,赚钱教程下载,交流论坛等网站运营相关的一切内容,为网友搜罗最有价值的网站源码下载与技术教程等服务!

    服务目录
    • 金点OpenAPI
    • 金点云
    • 金点支付
    友情链接
    • 数媒派
    • 国家电网
    快速搜索

    本站由Nice强力驱动

    声明: 本站部分内容属于原创转载请注明出处 如有侵权行为请严格参照本站【版权声明】与我们联系,我们将在48小时内容进行处理!

    本站部分内容属于原创转载请注明出处 如有侵权行为请严格参照本站【版权声明】与我们联系,我们将在48小时内容进行处理!
    © 2016-2023 PipBest.Com - 金点网络 & 金点部落. All rights reserved 京ICP备2022005359号-1
    • 关注有礼
    • 签到
    • 客服
      官方QQ群 常见问题 FAQ

      在线客服

      点我联系

      直接说出您的需求!
      切记!带上资源连接与问题!

      工作时间: 9:30-21:30

    • 暗黑
      模式
    • 全屏
    • 投稿
      赚钱
    • 首页

    • 签到

    • 切换

    • 客服

    金点网络-全网资源,一网打尽
    • 登录
    • 注册
    or
    or
    忘记密码?
    金点网络-全网资源,一网打尽
    • 网站首页 ►
      • 金点部落
      • 小游戏
      • OpenAPI
      • 设计资产导航
      • 升级会员
    • 技能学习 ►
      • 体育运动
      • 办公教程
      • 口才演讲
      • 小吃技术
      • 建站教程
      • 摄影教程
      • 棋牌教程
      • 网赚教程 ►
        • 爆粉引流
        • 自媒体
        • 贴吧引流
    • 网站源码 ►
      • 商城/淘客/交易
      • 小说/漫画/阅读
      • 影视/音乐/视频
      • 微信/微商/微擎
      • 理财/金融/货币
      • 模板/主题/插件
    • 游戏源码 ►
      • 精品网单
      • 端游源码
      • 手游源码
      • 页游源码
    • 素材资料 ►
      • 电子文档
      • 综合资料
      • 考研资料
      • 设计素材
      • 音频讲座 ►
        • 人文艺术
        • 名师讲座
        • 说书小说
    • 软件工具 ►
      • Windows软件
      • MacOS软件
      • Android软件
    • 寄售资源
      ►
      • 游戏源码
      • 网站源码
      • 软件源码
    • 公益服
    ×
    u3** 刚刚下载了 爆款吸金文案训练

      全网资源·一网打尽

    • 金点出品,必属精品!
    • 发布原创内容,获取高额提成!
    • 我们与你共创美好数字生态!
    • 无特殊说明密码默认:pipbest.com