Netty 案例:群聊系统
PPG007 ... 2021-12-30 About 2 min
# Netty 案例:群聊系统
# 服务端
/**
 * Group-chat server built on Netty.
 *
 * <p>Binds to the configured port, installs string codecs plus {@code MyHandler}
 * on every accepted connection, and blocks until the server channel is closed.
 */
public class Server {

    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public Server(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the server channel closes.
     *
     * <p>Both event-loop groups are shut down when this method returns, whether it
     * returns normally, is interrupted, or fails while configuring or binding.
     */
    public void run() {
        // Give the boss group a single EventLoop.
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            // Configuration lives inside the try so that a failure here still
            // reaches the finally block and releases both groups.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Use the string decoder and encoder provided by Netty.
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // Add our own handler.
                            pipeline.addLast(new MyHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    System.out.println("服务端启动成功");
                }
            });
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }

    /**
     * Launches the server on port 8848.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Server server = new Server(8848);
        server.run();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 服务端处理器
/**
 * Server-side handler for the group chat.
 *
 * <p>Keeps every connected channel in a shared {@code ChannelGroup}, announces
 * join/leave events to the group, and relays each received message to all
 * channels in the group.
 */
public class MyHandler extends SimpleChannelInboundHandler<String> {

    /** Channel group that manages all connected channels. */
    private static final ChannelGroup CHANNELS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Formatter for the timestamps embedded in broadcast messages. */
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Announces the new client and registers its channel.
     *
     * <p>The announcement is written before the channel is added to the group,
     * so the newcomer does not receive its own "online" notice.
     *
     * @param ctx handler context
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println(ctx.channel().remoteAddress() + "上线");
        CHANNELS.writeAndFlush("[客户端] " + ctx.channel().remoteAddress() + "上线\n");
        CHANNELS.add(ctx.channel());
    }

    /**
     * Logs on the server console that the channel became active.
     *
     * @param ctx handler context
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(ctx.channel().remoteAddress() + "加入聊天");
    }

    /**
     * Broadcasts a "disconnected" notice to the group.
     *
     * @param ctx handler context
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        CHANNELS.writeAndFlush("[客户端] " + ctx.channel().remoteAddress() + " " + dateFormat.format(new Date()) + " 断开连接\n");
    }

    /**
     * Broadcasts an "offline" notice to the group when the channel becomes inactive.
     *
     * @param ctx handler context
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        CHANNELS.writeAndFlush("[客户端] " + ctx.channel().remoteAddress() + " " + dateFormat.format(new Date()) + " 下线\n");
    }

    /**
     * Reports the failure and closes the offending channel.
     *
     * @param ctx   handler context
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Report the cause instead of dropping it silently, so that connection
        // failures can be diagnosed; same reporting style as the rest of the file.
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Relays a message received from one client to every channel in the group.
     *
     * <p>The sender gets the message echoed back with a "[我]" prefix; every other
     * channel gets it prefixed with the sender's remote address.
     *
     * @param ctx handler context
     * @param msg the decoded string message sent by the client
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        Channel sender = ctx.channel();
        // Format the timestamp once per message so every recipient sees the same
        // time and the formatting work is not repeated for each channel.
        String timestamp = dateFormat.format(new Date());
        String toOthers = "[客户端] " + sender.remoteAddress() + " " + timestamp + ":\n" + msg + "\n";
        String toSelf = "[我] " + timestamp + ":\n" + msg + "\n";
        for (Channel recipient : CHANNELS) {
            if (recipient != sender) {
                recipient.writeAndFlush(toOthers);
            } else {
                recipient.writeAndFlush(toSelf);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 客户端
/**
 * Console client for the group chat.
 *
 * <p>Connects to the server, sends every line read from standard input, and
 * relies on {@code ClientHandler} to print whatever the server sends back.
 */
public class Client {

    /** Host name or address of the chat server. */
    private final String host;

    /** TCP port of the chat server. */
    private final int port;

    /**
     * @param host host name or address of the chat server
     * @param port TCP port of the chat server
     */
    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and forwards standard input line by line.
     *
     * <p>When standard input reaches end-of-file the channel is closed, so the
     * method returns instead of waiting indefinitely for the server to hang up.
     * The event-loop group is shut down on every exit path.
     */
    public void run() {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            // Configuration lives inside the try so that a failure here still
            // reaches the finally block and releases the event-loop group.
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Add the string decoder and encoder provided by Netty,
                            // followed by the custom handler.
                            pipeline.addLast(new StringDecoder())
                                    .addLast(new StringEncoder())
                                    .addLast(new ClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect(this.host, this.port).sync();
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    System.out.println("客户端启动成功");
                }
            });

            // Keep reading input until end-of-file.
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String s = scanner.nextLine();
                channelFuture.channel().writeAndFlush(s);
            }

            // Input is exhausted: close the connection ourselves. Without this the
            // wait below only ends if the server happens to close the channel.
            channelFuture.channel().close();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }

    /**
     * Launches a client against {@code localhost:8848}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Client localhost = new Client("localhost", 8848);
        localhost.run();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 客户端处理器
/**
 * Client-side inbound handler: writes every decoded message received from the
 * server to standard output.
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the incoming text as-is, without appending a line break.
     *
     * @param ctx handler context (unused)
     * @param msg decoded string message received on the channel
     */
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final String msg) {
        System.out.print(msg);
    }
}
1
2
3
4
5
6
2
3
4
5
6