服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.example.netty.io;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* netty 服务端
*
* @author lanx
* @date 2022/3/20
*/
public class Server {

public static void main(String[] args) throws InterruptedException {
//用户接收客户端连接的线程工作组
EventLoopGroup bossGroup = new NioEventLoopGroup();
//用于接收客户端连接读写操作的线程组
EventLoopGroup workerGroup = new NioEventLoopGroup();
//辅助类 帮我我们创建netty服务
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//绑定两个工作组
.channel(NioServerSocketChannel.class)//设置NIO模式
//option 针对于服务端配置; childOption 针对于客户端连接通道配置
.option(ChannelOption.SO_BACKLOG, 1024)//设置tcp缓冲区
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024)//设置发送数据的缓存大小
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024)//设置读取数据的缓存大小
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持长连接
//初始化绑定服务通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//为通道进行初始化:数据传输过来的时候会进行拦截和执行 (可以有多个拦截器)
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();

//释放连接
cf.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.example.netty.io;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
* 服务端 监听器
*
* @author lanx
* @date 2022/3/20
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {


/**
* 当我们的通道被激活的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------服务端通道激活---------");
}

/**
* 当我们通道里有数据进行读取的时候触发的监听
*
* @param ctx netty服务上下文
* @param msg 实际传输的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// NIO 通信 (传输的数据是什么? ---------> Buffer 对象)
ByteBuf buf = (ByteBuf) msg;
//定义byte数组
byte[] req = new byte[buf.readableBytes()];
// 从缓冲区获取数据到 req
buf.readBytes(req);
//读取到的数据转换为字符串
String body = new String(req, "utf-8");
System.out.println("服务端读取到数据:" + body);

//响应给客户端的数据
ctx.writeAndFlush(Unpooled.copiedBuffer("netty server response data".getBytes()));
// 添加 addListener 可以触发关闭通道监听事件(客户端短连接场景使用)
// .addListener(ChannelFutureListener.CLOSE);
}catch (Exception e){
e.printStackTrace();
}finally {
//释放数据
ReferenceCountUtil.release(msg);
}

}

/**
* 当我们读取完成数据的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------服务端数据读取完毕---------");
}

/**
* 当我们读取数据异常的时候触发的监听
*
* @param ctx
* @param cause
* @throws Exception
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("--------服务端数据读取异常---------");
ctx.close();
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.example.netty.io;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
* netty 客户端
* @author lanx
* @date 2022/3/20
*/
public class Client {

public static void main(String[] args) throws InterruptedException {

//线程工作组
EventLoopGroup workerGroup = new NioEventLoopGroup();
//辅助类 帮我我们创建netty服务
Bootstrap b = new Bootstrap();
b.group( workerGroup)//绑定两个工作组
.channel(NioSocketChannel.class)//设置NIO模式

//初始化绑定服务通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//为通道进行初始化:数据传输过来的时候会进行拦截和执行 (可以有多个拦截器)
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1",8765).syncUninterruptibly();
cf.channel().writeAndFlush(Unpooled.copiedBuffer("netty client request data".getBytes()));
//释放连接
cf.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.example.netty.io;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
* 客户端 监听器
*
* @author lanx
* @date 2022/3/20
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {

/**
* 当我们的通道被激活的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------客户端通道激活---------");
}

/**
* 当我们通道里有数据进行读取的时候触发的监听
*
* @param ctx netty服务上下文
* @param msg 实际传输的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// NIO 通信 (传输的数据是什么? ---------> Buffer 对象)
ByteBuf buf = (ByteBuf) msg;
//定义byte数组
byte[] req = new byte[buf.readableBytes()];
// 从缓冲区获取数据到 req
buf.readBytes(req);
//读取到的数据转换为字符串
String body = new String(req, "utf-8");
System.out.println("客户端读取到数据:" + body);
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放数据 (如果你读取数据后又写出去数据就不需要调用此方法,因为底层代码帮忙实现额释放数据)
ReferenceCountUtil.release(msg);
}
}

/**
* 当我们读取完成数据的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------客户端数据读取完毕---------");
}

/**
* 当我们读取数据异常的时候触发的监听
*
* @param ctx
* @param cause
* @throws Exception
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("--------客户端数据读取异常---------");
ctx.close();
}
}