项目实践·Netty 聊天系统

作者 : jamin 本文共9345个字,预计阅读时间需要24分钟 发布时间: 2020-10-18 共1068人阅读

Netty 聊天系统

引入依赖

该聊天项目是一个标准的多模块 spring boot 项目,只需要引入四个基本的依赖包。

netty 提供易于使用的 API 客户端/服务器框架,disruptor 高性能无锁队列进行消息生产和消费,fastjson 进行消息序列和反序列化,bcprov 提供加解密。

<!-- netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>${netty.version}</version>
</dependency>
<!-- disruptor -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>
<!-- alibaba fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.55</version>
</dependency>
 <!-- 加解密服务 -->
<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcprov-jdk16</artifactId>
    <version>1.46</version>
</dependency>

启动服务器

NettyWebSocketServer

NettyWebSocketServer 服务器构建一对主从线程组,并且绑定端口。

@Slf4j
@Component
public class NettyWebSocketServer {

    /**
     * 端口号
     */
    @Value("${netty.websocket.port}")
    private int port;

    /**
     * 启动服务器
     */
    public void run() {
        // 主线程组,用于接收客户端连接,不做任何处理
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // 从线程组,专门处理主线程组的任务
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        final ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)         // 设置主从线程组
                .channel(NioServerSocketChannel.class) // 设置 nio 的双向通道
                .childHandler(new WebSocketChannelInitializer()); // 子处理器
        // 监听端口
        bind(serverBootstrap, port);
    }

    /**
     * 监听端口
     */
    private void bind(final ServerBootstrap serverBootstrap, final int port) {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                log.info("{}: 端口[{}]绑定成功!", new Date(), port);
            } else {
                log.error("端口[{}]绑定失败!", port);
            }
        });
    }
}

WebSocketChannelInitializer

WebSocketChannelInitializer 初始化器注册 channelhandler,里面的初始化方法会被执行。

主要需要注册下面几个 channelhandler:

pipeline.addLast(ConnectionCountHandler.INSTANCE);    // 链接检查
pipeline.addLast(IMIdleStateHandler.INSTANCE);        // 心跳检查
pipeline.addLast(PacketCodecHandler.INSTANCE);        // 编解码
pipeline.addLast(HeartBeatRequestHandler.INSTANCE);   // 心跳包
pipeline.addLast(LoginRequestHandler.INSTANCE);       // 登录
pipeline.addLast(AuthHandler.INSTANCE);               // 认证
pipeline.addLast(IMHandler.INSTANCE);                 // 处理业务

具体实现:

public class WebSocketChannelInitializer extends ChannelInitializer<NioSocketChannel> {

    @Override
    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
        // 通过 SocketChannel 去获得对应的管道,通过管道添加 handler
        ChannelPipeline pipeline = nioSocketChannel.pipeline();

        /**
         * ==========================================================================
         *                             以下用于支持 http 协议
         * ==========================================================================
         */
        // HttpServerCodec 是由 netty 提供的助手类,可以理解为拦截器,当请求到服务端做解码,响应到客户端做编码
        // websocket 基于 http 协议,所以要有 http 编解码器
        pipeline.addLast(new HttpServerCodec());
        // 对写大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        // 对 httpMessage 进行聚合,聚合成 FullHttpRequest 或 FullHttpResponse,几乎在 netty 中的编程,都会使用到此 handler
        pipeline.addLast(new HttpObjectAggregator(1024 * 64));

        /**
         * ============================================================================
         *                            websocket 服务器处理协议l
         * 处理握手动作:handshaking(close, ping, pong) ping + pong = 心跳
         * 对于 websokcet 来讲,都是以 frames 进行传输的,不同的数据类型对应不同的 frames 也不同
         * ============================================================================
         */
        pipeline.addLast(new WebSocketServerProtocolHandler("/chat"));
        pipeline.addLast(ConnectionCountHandler.INSTANCE);    // 链接检查
        pipeline.addLast(IMIdleStateHandler.INSTANCE);        // 心跳检查
        pipeline.addLast(PacketCodecHandler.INSTANCE);        // 编解码
        pipeline.addLast(HeartBeatRequestHandler.INSTANCE);   // 心跳包
        pipeline.addLast(LoginRequestHandler.INSTANCE);       // 登录
        pipeline.addLast(AuthHandler.INSTANCE);               // 认证
        pipeline.addLast(IMHandler.INSTANCE);                 // 处理业务
    }

}

NettyBootstrap

NettyBootstrap 同时启动 netty 服务器和 disruptor 消息队列。

@Component
public class NettyBootstrap implements ApplicationListener<ContextRefreshedEvent> {

    private NettyWebSocketServer mNettyWebSocketServer;

    @Autowired
    public NettyBootstrap(NettyWebSocketServer nettyWebSocketServer) {
        mNettyWebSocketServer = nettyWebSocketServer;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
            try {
                // 启动 disruptor
                MessageConsumer[] consumers = new MessageConsumer[16];
                for (int i = 0; i < consumers.length; i++) {
                    MessageConsumer messageConsumer = new MessageConsumerImpl();
                    consumers[i] = messageConsumer;
                }
                RingBufferWorkerPoolFactory factory = SpringUtil.getBean(RingBufferWorkerPoolFactory.class);
                factory.initAndStart(consumers);

                // 启动 netty server
                mNettyWebSocketServer.run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

消息队列

等待策略配置

@Configuration
public class DisruptorWaitStrategyConfig {
    @Bean
    @ConditionalOnMissingBean(WaitStrategy.class)
    public WaitStrategy getWaitStrategy() {
        // 如果 CPU 比较叼的话,可以用 YieldingWaitStrategy
        return new BlockingWaitStrategy();
    }
}

构造工厂

@Component
public class RingBufferWorkerPoolFactory {

    @Value("${disruptor.buffer.size}")
    private int mBufferSize;

    @Autowired
    private WaitStrategy mWaitStrategy;

    private Map<Integer, MessageProducer> producers = new ConcurrentHashMap<>();

    private RingBuffer<TranslatorDataWrapper> ringBuffer;

    public void initAndStart(MessageConsumer[] messageConsumers) {
        // 1. 构建 ringBuffer 对象
        this.ringBuffer = RingBuffer.create(ProducerType.MULTI,
                TranslatorDataWrapper::new,
                mBufferSize,
                mWaitStrategy);
        // 2. 通过 ringBuffer 创建一个屏障
        SequenceBarrier sequenceBarrier = this.ringBuffer.newBarrier();
        // 3. 创建多个消费者数组
        WorkerPool<TranslatorDataWrapper> workerPool = new WorkerPool<>(
                this.ringBuffer,
                sequenceBarrier,
                new EventExceptionHandler(),
                messageConsumers);
        // 4. 设置多个消费者的 sequence 序号 用于单独统计消费进度,并且设置到 ringBuffer 中
        this.ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        // 5. 启动我们的工作池
        workerPool.start(Executors.newFixedThreadPool(16));
    }

    public MessageProducer getMessageProducer(Integer commandId) {
        MessageProducer messageProducer = producers.get(commandId);
        if (messageProducer == null) {
            messageProducer = new MessageProducerImpl(commandId, this.ringBuffer);
            producers.put(commandId, messageProducer);
        }
        return messageProducer;
    }

    /**
     * 异常静态类
     *
     * @author Alienware
     */
    @Slf4j
    static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWrapper> {
        @Override
        public void handleEventException(Throwable ex, long sequence, TranslatorDataWrapper event) {
            log.error("handleEventException -> ex:{} sequence:{} event:{}", ex.getMessage(), sequence, event.getClass().toString());
            ex.printStackTrace();
        }

        @Override
        public void handleOnStartException(Throwable ex) {
            log.error("handleOnStartException -> ex:{}", ex.getMessage());
            ex.printStackTrace();
        }

        @Override
        public void handleOnShutdownException(Throwable ex) {
            log.error("handleOnShutdownException -> ex:{}", ex.getMessage());
            ex.printStackTrace();
        }
    }

}

消息包体

@Data
public class TranslatorDataWrapper {
    private Packet packet;
    private ChannelHandlerContext ctx;
}

消息生产者

@Slf4j
public class MessageProducer {
    /**
     * 发布事件
     *
     * @param packet 应用包
     * @param ctx    上下文
     */
    public void publish(Packet packet, ChannelHandlerContext ctx) {
        log.info("生成消息 -> {}", packet.getCommand());
    }
}

实现类:

@Data
@EqualsAndHashCode(callSuper = true)
@AllArgsConstructor
public class MessageProducerImpl extends MessageProducer {
    private Integer commandId;

    private RingBuffer<TranslatorDataWrapper> ringBuffer;

    /**
     * 发布事件
     *
     * @param packet 应用包
     * @param ctx    上下文
     */
    @Override
    public void publish(Packet packet, ChannelHandlerContext ctx) {
        super.publish(packet, ctx);
        // 取盘
        long sequence = ringBuffer.next();
        try {
            TranslatorDataWrapper wrapper = ringBuffer.get(sequence);
            wrapper.setPacket(packet);
            wrapper.setCtx(ctx);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

消息消费者

@Slf4j
public class MessageConsumer implements WorkHandler<TranslatorDataWrapper> {
    @Override
    public void onEvent(TranslatorDataWrapper wrapper) throws Exception {
        log.info("消费消息 -> {}", wrapper.getPacket().getCommand());
    }
}

实现类:

@Slf4j
public class MessageConsumerImpl extends MessageConsumer {
    @Override
    public void onEvent(TranslatorDataWrapper wrapper) throws Exception {
        super.onEvent(wrapper);
        Packet packet = wrapper.getPacket();
        ChannelHandlerContext ctx = wrapper.getCtx();
        Channel channel = ctx.channel();
        Integer command = packet.getCommand();
        log.info("开始消息处理 -> {}", command);
        switch (command) {
            case Command.LOGIN_REQUEST:
                // 登陆处理
                try {
                    login(ctx, (LoginRequestPacket) packet);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                break;
        }
    }

    /**
     * 登录处理
     *
     * @param ctx
     * @param packet
     * @throws Exception
     */
    private void login(ChannelHandlerContext ctx, LoginRequestPacket packet) throws Exception {
        log.info("登录 -> 绑定 session");
        // 绑定会话
        Session session = new Session(packet.getId(), packet.getUsername(), packet.getNickname());
        SessionUtil.bindSession(session, ctx.channel());
    }
}

消息加解密

ApiApplication 启动时导入加解密依赖包:

// 导入支持AES/CBC/PKCS7Padding的Provider
Security.addProvider(new BouncyCastleProvider());

加解密工具类:

public class CryptoAesUtil {

    private static final Base64.Decoder decoder = Base64.getDecoder();

    private static final Base64.Encoder encoder = Base64.getEncoder();

    public static String encrypt(String data, String key, String iv) throws Exception {
        String baseData = encoder.encodeToString(data.getBytes());
        byte[] result = handleMsg(baseData, key, iv, Cipher.ENCRYPT_MODE);
        return encoder.encodeToString(result);
    }

    public static String decrypt(String data, String key, String iv) throws Exception {
        byte[] result = handleMsg(data, key, iv, Cipher.DECRYPT_MODE);
        return new String(result);
    }

    private static byte[] handleMsg(String data, String key, String iv, int mode) throws Exception {
        log.info("data: {}, key: {}, iv: {}, mode: {}", data, key, iv, mode);
        String baseKey = encoder.encodeToString(key.getBytes());
        String baseIv = encoder.encodeToString(iv.getBytes());
        // 从 Base64 格式还原到原始格式
        byte[] dataByte = decoder.decode(data);
        byte[] keyByte = decoder.decode(baseKey);
        byte[] ivByte = decoder.decode(baseIv);
        // 指定算法,模式,填充方法 创建一个 Cipher 实例
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS7Padding", "BC");
        // 生成 Key 对象
        Key sKeySpec = new SecretKeySpec(keyByte, "AES");
        // 把向量初始化到算法参数
        AlgorithmParameters params = AlgorithmParameters.getInstance("AES");
        params.init(new IvParameterSpec(ivByte));
        // 指定模式、密钥、参数,初始化 Cipher 对象
        cipher.init(mode, sKeySpec, params);
        // 执行加解密
        return cipher.doFinal(dataByte);
    }

}
本站所提供的部分资源来自于网络,版权争议与本站无关,版权归原创者所有!仅限用于学习和研究目的,不得将上述内容资源用于商业或者非法用途,否则,一切后果请用户自负。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源。如果上述内容资对您的版权或者利益造成损害,请提供相应的资质证明,我们将于3个工作日内予以删除。本站不保证所提供下载的资源的准确性、安全性和完整性,源码仅供下载学习之用!如用于商业或者非法用途,与本站无关,一切后果请用户自负!本站也不承担用户因使用这些下载资源对自己和他人造成任何形式的损失或伤害。如有侵权、不妥之处,请联系站长以便删除!
金点网络 » 项目实践·Netty 聊天系统

常见问题FAQ

免费下载或者VIP会员专享资源能否直接商用?
本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。
是否提供免费更新服务?
持续更新,永久免费
是否经过安全检测?
安全无毒,放心食用

提供最优质的资源集合

立即加入 友好社区
×