Netty 案例:群聊系统

PPG007 ... 2021-12-30 About 2 min

# Netty 案例:群聊系统

# 服务端

/**
 * Group-chat server: binds a Netty NIO server socket on the configured port and installs a
 * String decoder, a String encoder and a {@code MyHandler} on every accepted channel.
 */
public class Server {

    /** TCP port handed to {@code bind}. */
    private final int port;

    /**
     * @param port TCP port the server binds to
     */
    public Server(int port) {
        this.port = port;
    }

    /**
     * Binds the server and blocks the calling thread until the server channel is closed
     * or the thread is interrupted. Both event-loop groups are shut down before returning.
     */
    public void run() {
        // Give the boss EventLoopGroup a single EventLoop only.
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Use the String decoder/encoder that ship with Netty.
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        // Add our own handler; a fresh instance is created per channel.
                        pipeline.addLast(new MyHandler());
                    }
                });
        try {
            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    System.out.println("服务端启动成功");
                }
            });
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }

    /** Starts a server on port 8848. */
    public static void main(String[] args) {
        Server server = new Server(8848);
        server.run();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

# 服务端处理器

/**
 * Server-side chat handler: tracks every connected channel in a shared group and relays
 * each received String message to all members of that group.
 */
public class MyHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Channel group holding all connected channels; static, so it is shared by every
     * handler instance.
     */
    private static final ChannelGroup CHANNELS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Timestamp format used in broadcast messages. */
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Called when this handler is added to a channel's pipeline (the first callback for a
     * new connection). Announces the newcomer to the existing members, then registers it.
     *
     * @param ctx handler context
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println(ctx.channel().remoteAddress()+"上线");
        // Broadcast before adding, so the new channel does not receive its own notice.
        CHANNELS.writeAndFlush("[客户端] "+ctx.channel().remoteAddress()+"上线\n");
        CHANNELS.add(ctx.channel());
    }

    /**
     * Called when the channel becomes active; only logs to the server console.
     *
     * @param ctx handler context
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(ctx.channel().remoteAddress()+"加入聊天");
    }

    /**
     * Called when this handler is removed from the pipeline (on disconnect); broadcasts a
     * timestamped "disconnected" notice to the group.
     *
     * @param ctx handler context
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        CHANNELS.writeAndFlush("[客户端] "+ctx.channel().remoteAddress()+" "+dateFormat.format(new Date())+" 断开连接\n");
    }

    /**
     * Called when the channel becomes inactive; broadcasts a timestamped "offline" notice
     * to the group.
     *
     * @param ctx handler context
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        CHANNELS.writeAndFlush("[客户端] "+ctx.channel().remoteAddress()+" "+dateFormat.format(new Date())+" 下线\n");
    }

    /**
     * Called when an exception reaches this handler; reports it and closes the channel.
     *
     * @param ctx handler context
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Report the failure instead of dropping it silently, then close the connection.
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Relays a message from one client to every channel in the group. The sender gets an
     * echo prefixed with "[我]"; everyone else gets the text prefixed with the sender's
     * remote address.
     *
     * @param ctx handler context of the sending channel
     * @param msg decoded String message sent by the client
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        Channel sender = ctx.channel();
        // Format once per message so every recipient sees the same timestamp.
        String timestamp = dateFormat.format(new Date());
        String forOthers = "[客户端] "+sender.remoteAddress()+" "+timestamp+":\n"+msg+"\n";
        String forSender = "[我] "+timestamp+":\n"+msg+"\n";
        CHANNELS.forEach(member -> member.writeAndFlush(member != sender ? forOthers : forSender));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

# 客户端

/**
 * Group-chat client: connects to the server, sends each line read from standard input
 * and lets {@code ClientHandler} process whatever the server sends back.
 */
public class Client {

    /** Server host name or address. */
    private final String host;

    /** Server TCP port. */
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and forwards standard-input lines until input ends or the
     * connection is no longer active, then closes the channel and shuts the event loop
     * group down. Blocks the calling thread for the whole session.
     */
    public void run() {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventExecutors)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Add the String decoder/encoder that ship with Netty,
                        // followed by the custom handler.
                        pipeline.addLast(new StringDecoder())
                                .addLast(new StringEncoder())
                                .addLast(new ClientHandler());
                    }
                });
        try {
            ChannelFuture channelFuture = bootstrap.connect(this.host, this.port).sync();
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    System.out.println("客户端启动成功");
                }
            });
            // Keep reading input. The liveness check runs once per line, so a dropped
            // connection is noticed after the next line is entered. System.in is
            // deliberately not closed here.
            Scanner scanner = new Scanner(System.in);
            while (channelFuture.channel().isActive() && scanner.hasNextLine()) {
                String s = scanner.nextLine();
                channelFuture.channel().writeAndFlush(s);
            }
            // Input ended or the connection dropped: close our side so the wait below
            // completes instead of blocking until the server closes the connection.
            channelFuture.channel().close();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

    /** Starts a client against localhost:8848. */
    public static void main(String[] args) {
        Client localhost = new Client("localhost", 8848);
        localhost.run();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

# 客户端处理器

/**
 * Client-side inbound handler that writes every decoded String message it receives to
 * standard output.
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received text exactly as delivered, without adding a line terminator.
     *
     * @param ctx handler context (unused)
     * @param msg decoded String message
     */
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final String msg) {
        System.out.print(msg);
    }
}
1
2
3
4
5
6
Last update: December 30, 2021 10:25
Contributors: PPG007