金点网络-全网资源,一网打尽
  • 网站首页
    • 金点部落
    • 小游戏
    • OpenAPI
    • 设计资产导航
    • 升级会员
  • 技能学习
    • 体育运动
    • 办公教程
    • 口才演讲
    • 小吃技术
    • 建站教程
    • 摄影教程
    • 棋牌教程
    • 网赚教程
      • 爆粉引流
      • 自媒体
      • 贴吧引流
  • 网站源码
    • 商城/淘客/交易
    • 小说/漫画/阅读
    • 影视/音乐/视频
    • 微信/微商/微擎
    • 理财/金融/货币
    • 模板/主题/插件
  • 游戏源码
    • 精品网单
    • 端游源码
    • 手游源码
    • 页游源码
  • 素材资料
    • 电子文档
    • 综合资料
    • 考研资料
    • 设计素材
    • 音频讲座
      • 人文艺术
      • 名师讲座
      • 说书小说
  • 软件工具
    • Windows软件
    • MacOS软件
    • Android软件
  • 寄售资源
    • 游戏源码
    • 网站源码
    • 软件源码
  • 公益服
登录/注册
  • 专享大神特权
立即开通开通会员抄底价

项目实践·Netty 聊天系统

作者 : jamin 本文共9345个字,预计阅读时间需要24分钟 发布时间: 2020-10-18 共1038人阅读

Netty 聊天系统

引入依赖

该聊天项目是一个标准的多模块 spring boot 项目,只需要引入四个基本的依赖包。

netty 提供易于使用的 API 客户端/服务器框架,disruptor 高性能无锁队列进行消息生产和消费,fastjson 进行消息序列和反序列化,bcprov 提供加解密。

<!-- netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>${netty.version}</version>
</dependency>
<!-- disruptor -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>
<!-- alibaba fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.55</version>
</dependency>
 <!-- 加解密服务 -->
<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcprov-jdk16</artifactId>
    <version>1.46</version>
</dependency>

启动服务器

NettyWebSocketServer

NettyWebSocketServer 服务器构建一对主从线程组,并且绑定端口。

@Slf4j
@Component
public class NettyWebSocketServer {

    /**
     * 端口号
     */
    @Value("${netty.websocket.port}")
    private int port;

    /**
     * 启动服务器
     */
    public void run() {
        // 主线程组,用于接收客户端连接,不做任何处理
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // 从线程组,专门处理主线程组的任务
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        final ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)         // 设置主从线程组
                .channel(NioServerSocketChannel.class) // 设置 nio 的双向通道
                .childHandler(new WebSocketChannelInitializer()); // 子处理器
        // 监听端口
        bind(serverBootstrap, port);
    }

    /**
     * 监听端口
     */
    private void bind(final ServerBootstrap serverBootstrap, final int port) {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                log.info("{}: 端口[{}]绑定成功!", new Date(), port);
            } else {
                log.error("端口[{}]绑定失败!", port);
            }
        });
    }
}

WebSocketChannelInitializer

WebSocketChannelInitializer 初始化器注册 channelhandler,里面的初始化方法会被执行。

主要需要注册下面几个 channelhandler:

pipeline.addLast(ConnectionCountHandler.INSTANCE);    // 链接检查
pipeline.addLast(IMIdleStateHandler.INSTANCE);        // 心跳检查
pipeline.addLast(PacketCodecHandler.INSTANCE);        // 编解码
pipeline.addLast(HeartBeatRequestHandler.INSTANCE);   // 心跳包
pipeline.addLast(LoginRequestHandler.INSTANCE);       // 登录
pipeline.addLast(AuthHandler.INSTANCE);               // 认证
pipeline.addLast(IMHandler.INSTANCE);                 // 处理业务

具体实现:

public class WebSocketChannelInitializer extends ChannelInitializer<NioSocketChannel> {

    @Override
    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
        // 通过 SocketChannel 去获得对应的管道,通过管道添加 handler
        ChannelPipeline pipeline = nioSocketChannel.pipeline();

        /**
         * ==========================================================================
         *                             以下用于支持 http 协议
         * ==========================================================================
         */
        // HttpServerCodec 是由 netty 提供的助手类,可以理解为拦截器,当请求到服务端做解码,响应到客户端做编码
        // websocket 基于 http 协议,所以要有 http 编解码器
        pipeline.addLast(new HttpServerCodec());
        // 对写大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        // 对 httpMessage 进行聚合,聚合成 FullHttpRequest 或 FullHttpResponse,几乎在 netty 中的编程,都会使用到此 handler
        pipeline.addLast(new HttpObjectAggregator(1024 * 64));

        /**
         * ============================================================================
         *                            websocket 服务器处理协议l
         * 处理握手动作:handshaking(close, ping, pong) ping + pong = 心跳
         * 对于 websokcet 来讲,都是以 frames 进行传输的,不同的数据类型对应不同的 frames 也不同
         * ============================================================================
         */
        pipeline.addLast(new WebSocketServerProtocolHandler("/chat"));
        pipeline.addLast(ConnectionCountHandler.INSTANCE);    // 链接检查
        pipeline.addLast(IMIdleStateHandler.INSTANCE);        // 心跳检查
        pipeline.addLast(PacketCodecHandler.INSTANCE);        // 编解码
        pipeline.addLast(HeartBeatRequestHandler.INSTANCE);   // 心跳包
        pipeline.addLast(LoginRequestHandler.INSTANCE);       // 登录
        pipeline.addLast(AuthHandler.INSTANCE);               // 认证
        pipeline.addLast(IMHandler.INSTANCE);                 // 处理业务
    }

}

NettyBootstrap

NettyBootstrap 同时启动 netty 服务器和 disruptor 消息队列。

@Component
public class NettyBootstrap implements ApplicationListener<ContextRefreshedEvent> {

    private NettyWebSocketServer mNettyWebSocketServer;

    @Autowired
    public NettyBootstrap(NettyWebSocketServer nettyWebSocketServer) {
        mNettyWebSocketServer = nettyWebSocketServer;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
            try {
                // 启动 disruptor
                MessageConsumer[] consumers = new MessageConsumer[16];
                for (int i = 0; i < consumers.length; i++) {
                    MessageConsumer messageConsumer = new MessageConsumerImpl();
                    consumers[i] = messageConsumer;
                }
                RingBufferWorkerPoolFactory factory = SpringUtil.getBean(RingBufferWorkerPoolFactory.class);
                factory.initAndStart(consumers);

                // 启动 netty server
                mNettyWebSocketServer.run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

消息队列

等待策略配置

@Configuration
public class DisruptorWaitStrategyConfig {
    @Bean
    @ConditionalOnMissingBean(WaitStrategy.class)
    public WaitStrategy getWaitStrategy() {
        // 如果 CPU 比较叼的话,可以用 YieldingWaitStrategy
        return new BlockingWaitStrategy();
    }
}

构造工厂

@Component
public class RingBufferWorkerPoolFactory {

    @Value("${disruptor.buffer.size}")
    private int mBufferSize;

    @Autowired
    private WaitStrategy mWaitStrategy;

    private Map<Integer, MessageProducer> producers = new ConcurrentHashMap<>();

    private RingBuffer<TranslatorDataWrapper> ringBuffer;

    public void initAndStart(MessageConsumer[] messageConsumers) {
        // 1. 构建 ringBuffer 对象
        this.ringBuffer = RingBuffer.create(ProducerType.MULTI,
                TranslatorDataWrapper::new,
                mBufferSize,
                mWaitStrategy);
        // 2. 通过 ringBuffer 创建一个屏障
        SequenceBarrier sequenceBarrier = this.ringBuffer.newBarrier();
        // 3. 创建多个消费者数组
        WorkerPool<TranslatorDataWrapper> workerPool = new WorkerPool<>(
                this.ringBuffer,
                sequenceBarrier,
                new EventExceptionHandler(),
                messageConsumers);
        // 4. 设置多个消费者的 sequence 序号 用于单独统计消费进度,并且设置到 ringBuffer 中
        this.ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        // 5. 启动我们的工作池
        workerPool.start(Executors.newFixedThreadPool(16));
    }

    public MessageProducer getMessageProducer(Integer commandId) {
        MessageProducer messageProducer = producers.get(commandId);
        if (messageProducer == null) {
            messageProducer = new MessageProducerImpl(commandId, this.ringBuffer);
            producers.put(commandId, messageProducer);
        }
        return messageProducer;
    }

    /**
     * 异常静态类
     *
     * @author Alienware
     */
    @Slf4j
    static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWrapper> {
        @Override
        public void handleEventException(Throwable ex, long sequence, TranslatorDataWrapper event) {
            log.error("handleEventException -> ex:{} sequence:{} event:{}", ex.getMessage(), sequence, event.getClass().toString());
            ex.printStackTrace();
        }

        @Override
        public void handleOnStartException(Throwable ex) {
            log.error("handleOnStartException -> ex:{}", ex.getMessage());
            ex.printStackTrace();
        }

        @Override
        public void handleOnShutdownException(Throwable ex) {
            log.error("handleOnShutdownException -> ex:{}", ex.getMessage());
            ex.printStackTrace();
        }
    }

}

消息包体

@Data
public class TranslatorDataWrapper {
    private Packet packet;
    private ChannelHandlerContext ctx;
}

消息生产者

@Slf4j
public class MessageProducer {
    /**
     * 发布事件
     *
     * @param packet 应用包
     * @param ctx    上下文
     */
    public void publish(Packet packet, ChannelHandlerContext ctx) {
        log.info("生成消息 -> {}", packet.getCommand());
    }
}

实现类:

@Data
@EqualsAndHashCode(callSuper = true)
@AllArgsConstructor
public class MessageProducerImpl extends MessageProducer {
    private Integer commandId;

    private RingBuffer<TranslatorDataWrapper> ringBuffer;

    /**
     * 发布事件
     *
     * @param packet 应用包
     * @param ctx    上下文
     */
    @Override
    public void publish(Packet packet, ChannelHandlerContext ctx) {
        super.publish(packet, ctx);
        // 取盘
        long sequence = ringBuffer.next();
        try {
            TranslatorDataWrapper wrapper = ringBuffer.get(sequence);
            wrapper.setPacket(packet);
            wrapper.setCtx(ctx);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

消息消费者

@Slf4j
public class MessageConsumer implements WorkHandler<TranslatorDataWrapper> {
    @Override
    public void onEvent(TranslatorDataWrapper wrapper) throws Exception {
        log.info("消费消息 -> {}", wrapper.getPacket().getCommand());
    }
}

实现类:

@Slf4j
public class MessageConsumerImpl extends MessageConsumer {
    @Override
    public void onEvent(TranslatorDataWrapper wrapper) throws Exception {
        super.onEvent(wrapper);
        Packet packet = wrapper.getPacket();
        ChannelHandlerContext ctx = wrapper.getCtx();
        Channel channel = ctx.channel();
        Integer command = packet.getCommand();
        log.info("开始消息处理 -> {}", command);
        switch (command) {
            case Command.LOGIN_REQUEST:
                // 登陆处理
                try {
                    login(ctx, (LoginRequestPacket) packet);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                break;
        }
    }

    /**
     * 登录处理
     *
     * @param ctx
     * @param packet
     * @throws Exception
     */
    private void login(ChannelHandlerContext ctx, LoginRequestPacket packet) throws Exception {
        log.info("登录 -> 绑定 session");
        // 绑定会话
        Session session = new Session(packet.getId(), packet.getUsername(), packet.getNickname());
        SessionUtil.bindSession(session, ctx.channel());
    }
}

消息加解密

ApiApplication 启动时导入加解密依赖包:

// 导入支持AES/CBC/PKCS7Padding的Provider
Security.addProvider(new BouncyCastleProvider());

加解密工具类:

public class CryptoAesUtil {

    private static final Base64.Decoder decoder = Base64.getDecoder();

    private static final Base64.Encoder encoder = Base64.getEncoder();

    public static String encrypt(String data, String key, String iv) throws Exception {
        String baseData = encoder.encodeToString(data.getBytes());
        byte[] result = handleMsg(baseData, key, iv, Cipher.ENCRYPT_MODE);
        return encoder.encodeToString(result);
    }

    public static String decrypt(String data, String key, String iv) throws Exception {
        byte[] result = handleMsg(data, key, iv, Cipher.DECRYPT_MODE);
        return new String(result);
    }

    private static byte[] handleMsg(String data, String key, String iv, int mode) throws Exception {
        log.info("data: {}, key: {}, iv: {}, mode: {}", data, key, iv, mode);
        String baseKey = encoder.encodeToString(key.getBytes());
        String baseIv = encoder.encodeToString(iv.getBytes());
        // 从 Base64 格式还原到原始格式
        byte[] dataByte = decoder.decode(data);
        byte[] keyByte = decoder.decode(baseKey);
        byte[] ivByte = decoder.decode(baseIv);
        // 指定算法,模式,填充方法 创建一个 Cipher 实例
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS7Padding", "BC");
        // 生成 Key 对象
        Key sKeySpec = new SecretKeySpec(keyByte, "AES");
        // 把向量初始化到算法参数
        AlgorithmParameters params = AlgorithmParameters.getInstance("AES");
        params.init(new IvParameterSpec(ivByte));
        // 指定模式、密钥、参数,初始化 Cipher 对象
        cipher.init(mode, sKeySpec, params);
        // 执行加解密
        return cipher.doFinal(dataByte);
    }

}
project
本站所提供的部分资源来自于网络,版权争议与本站无关,版权归原创者所有!仅限用于学习和研究目的,不得将上述内容资源用于商业或者非法用途,否则,一切后果请用户自负。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源。如果上述内容资对您的版权或者利益造成损害,请提供相应的资质证明,我们将于3个工作日内予以删除。本站不保证所提供下载的资源的准确性、安全性和完整性,源码仅供下载学习之用!如用于商业或者非法用途,与本站无关,一切后果请用户自负!本站也不承担用户因使用这些下载资源对自己和他人造成任何形式的损失或伤害。如有侵权、不妥之处,请联系站长以便删除!
金点网络-全网资源,一网打尽 » 项目实践·Netty 聊天系统

常见问题FAQ

免费下载或者VIP会员专享资源能否直接商用?
本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。
是否提供免费更新服务?
持续更新,永久免费
是否经过安全检测?
安全无毒,放心食用
jamin

jamin 大神

下一篇
网络协议-组播介绍

相关推荐

项目实践·Docker FastDFS

项目实践·Docker FastDFS

项目实践·Snippets

项目实践·Snippets

项目实践·Docker RocketMQ

项目实践·Docker RocketMQ

项目实践·微信点餐系统

项目实践·微信点餐系统

标签云
Android Atom ExecutorService ForkJoin GM GM后台 GM授权后台 H5 Java Javascript Linux手工服务端 pipbestcom Python ReentrantLock synchronized ThreadLocal volatile Win一键即玩服务端 一键端 传奇 写作 创业 单机 后台 商业端 外网 安卓 安卓苹果双端 工具 手工端 手游 搭建教程 教程 数据分析 文案 游戏源码 端游 经典 网单 职场 自媒体 视频教程 详细搭建教程 运营后台 页游

近期文章

  • 回合手游【逍遥西游之繁华西游】最新整理单机一键既玩镜像服务端_Linux手工端_GM后台_教程
  • 最新整理精品回合制手游【天书奇谈3D混沌完整版】VM一键单机版_linux手工外网端_隐盟视频教程_授权GM后台_双端
  • 典藏修真页游【诸仙列传OL】最新整理Win系服务端_GM工具_详细外网搭建教程
  • MT3换皮MH【浮生若梦尊享挂机修复版】最新整理单机一键即玩镜像端_Linux手工服务端_安卓苹果双端_GM后台_详细搭建教程
  • 大话回合手游【最新引擎之缥缈西游渡劫版】最新整理Linux手工服务端_安卓苹果双端_管理后台_CDK后台_详细搭建教程_视频教程

分类

  • | wordpress插件 |
  • | wordpress模板 |
  • | 其它模板 |
  • | 帝国模板 |
  • | 织梦插件 |
  • | 织梦模板 |
  • A5源码
  • Android软件
  • APP引流
  • E语言
  • H5
  • LUA
  • QQ营销
  • SEO推广
  • Windows软件
  • 体育运动
  • 信息数据
  • 创业专题
  • 办公教程
  • 口才演讲
  • 名师讲座
  • 商城/淘客/交易
  • 小吃技术
  • 小说/漫画/阅读
  • 建站教程
  • 引流脚本
  • 影视/音乐/视频
  • 影视资源
  • 微信/微商/微擎
  • 微信小程序
  • 微信营销
  • 微擎模块
  • 手游源码
  • 技能学习
  • 抖音课程
  • 摄影教程
  • 棋牌教程
  • 模板/主题/插件
  • 游戏源码
  • 爆粉引流
  • 理财/金融/货币
  • 生活老师
  • 电商客
  • 电子文档
  • 电脑教程
  • 社群营销
  • 站长工具
  • 精品网单
  • 系统工具
  • 素材资料
  • 综合资料
  • 编程经验
  • 网站源码
  • 网络安全
  • 网赚教程
  • 网赚源码
  • 考研资料
  • 脚本/AI/智能
  • 自媒体
  • 英语学习
  • 营销软件
  • 设计素材
  • 说书小说
  • 贴吧引流
  • 软件工具
  • 软文营销
  • 逆向软件
  • 音频讲座
  • 页游源码

提供最优质的资源集合

立即加入 友好社区
金点网络-全网资源,一网打尽

新一代全网资源综合门户网(www.pipbest.com-金点网络)专注服务于互联网,提供各类最新最全的免费源码下载(PHP、ASP、JSP、.NET),更提供免费工具,免费源码下载,软件下载,素材下载,赚钱教程下载,交流论坛等网站运营相关的一切内容,为网友搜罗最有价值的网站源码下载与技术教程等服务!

服务目录
  • 金点OpenAPI
  • 金点云
  • 金点支付
友情链接
  • 数媒派
  • 国家电网
快速搜索

本站由Nice强力驱动

声明: 本站部分内容属于原创转载请注明出处 如有侵权行为请严格参照本站【版权声明】与我们联系,我们将在48小时内容进行处理!

本站部分内容属于原创转载请注明出处 如有侵权行为请严格参照本站【版权声明】与我们联系,我们将在48小时内容进行处理!
© 2016-2023 PipBest.Com - 金点网络 & 金点部落. All rights reserved 京ICP备2022005359号-1
  • 关注有礼
  • 签到
  • 客服
    官方QQ群 常见问题 FAQ

    在线客服

    点我联系

    直接说出您的需求!
    切记!带上资源连接与问题!

    工作时间: 9:30-21:30

  • 暗黑
    模式
  • 全屏
  • 投稿
    赚钱
  • 首页

  • 签到

  • 切换

  • 客服

金点网络-全网资源,一网打尽
  • 登录
  • 注册
or
or
忘记密码?
金点网络-全网资源,一网打尽
  • 网站首页 ►
    • 金点部落
    • 小游戏
    • OpenAPI
    • 设计资产导航
    • 升级会员
  • 技能学习 ►
    • 体育运动
    • 办公教程
    • 口才演讲
    • 小吃技术
    • 建站教程
    • 摄影教程
    • 棋牌教程
    • 网赚教程 ►
      • 爆粉引流
      • 自媒体
      • 贴吧引流
  • 网站源码 ►
    • 商城/淘客/交易
    • 小说/漫画/阅读
    • 影视/音乐/视频
    • 微信/微商/微擎
    • 理财/金融/货币
    • 模板/主题/插件
  • 游戏源码 ►
    • 精品网单
    • 端游源码
    • 手游源码
    • 页游源码
  • 素材资料 ►
    • 电子文档
    • 综合资料
    • 考研资料
    • 设计素材
    • 音频讲座 ►
      • 人文艺术
      • 名师讲座
      • 说书小说
  • 软件工具 ►
    • Windows软件
    • MacOS软件
    • Android软件
  • 寄售资源
    ►
    • 游戏源码
    • 网站源码
    • 软件源码
  • 公益服
×
** 刚刚下载了 爆款吸金文案训练

    全网资源·一网打尽

  • 金点出品,必属精品!
  • 发布原创内容,获取高额提成!
  • 我们与你共创美好数字生态!
  • 无特殊说明密码默认:pipbest.com