GitHub (opens new window)

线程模型

PPG007 ... 2021-12-30 About 4 min

# 线程模型

# 传统阻塞 I/O 服务模型

image-20210813163713373

每个连接都需要一个线程完成业务处理、输入输出,如果线程没有数据可读,会阻塞在 read,浪费资源,并发量很大时,创建大量线程,占用很大系统资源。

# Reactor模型

# 单Reactor单线程

image-20210813164539857

服务端用一个线程多路复用实现了所有的处理任务,例如 NIO 群聊系统,性能不强,适合客户端数量少且业务处理很快的场景。

# 单 Reactor 多线程

image-20210813165428693

充分利用多核 CPU,多线程共享比较复杂,reactor 处理所有事件监听和响应还是单线程,高并发会有性能瓶颈。

# 主从 Reactor 多线程

主从Reactor

# Netty 工作原理

netty工作原理图

# Netty 示例程序

本示例在收到客户端发来的消息后将消息和客户端信息输出到控制台并发送一条消息返回给客户端,客户端在控制台输出这条消息和服务端的信息。

引入依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.55.Final</version>
</dependency>
1
2
3
4
5

服务端:

public class Server {
    public static void main(String[] args) {
//        创建BossGroup和WorkerGroup
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
//            创建配置对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
//                    指定服务端channel类型
                    .channel(NioServerSocketChannel.class)
//                    设置线程队列连接个数
                    .option(ChannelOption.SO_BACKLOG,128)
//                    设置保持活动连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
//                    设置处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ServerHandler());

                        }
                    });
            System.out.println("server is ready");
//            绑定端口并且同步
            ChannelFuture channelFuture = serverBootstrap.bind(8848).sync();
//            对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
//            关闭group
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

服务端处理器:

处理器继承一个 Adapter 并重写需要的方法即可。

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("扎不多得嘞",StandardCharsets.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.channel().close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("server ctx==>"+ctx);
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("client send==>"+byteBuf.toString(StandardCharsets.UTF_8));
        System.out.println("client address==>"+ctx.channel().remoteAddress());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

客户端:

public class Client {
    public static void main(String[] args) {
//        客户端同样是事件驱动
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventExecutors)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });
        System.out.println("client is ready");
        try {
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8848).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            eventExecutors.shutdownGracefully();
        }

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

客户端处理器:

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("client received==>"+byteBuf.toString(StandardCharsets.UTF_8));
        System.out.println("server address==>"+ctx.channel().remoteAddress());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("client "+ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("蚌埠住了", StandardCharsets.UTF_8));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14