Netty的多协议开发和粘包拆包
Bootstrap和ServerBootstrapBootstrap是引导的意思,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中的Bootstrap是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类常见API//应用于客户端,设置一个EventLoopGrouppublic B group(EventLoop
Http协议概述
什么是HTTP协议
HTTP是一个属于应用层的面向对象的协议,由于其简捷、快速的方式,适用于分布式超媒体信息系统
HTTP协议的主要特点可概括如下:
- 支持Client/Server模式。
- 简单快速:客户向服务器请求服务时,只需传送请求方法和路径。请求方法常用的有GET、HEAD、POST。每种方法规定了客户与服务器联系的类型不同。由于HTTP协议简单,使得HTTP服务器的程序规模小,因而通信速度很快。
- 灵活:HTTP允许传输任意类型的数据对象。正在传输的类型由Content-Type加以标记。
- 无连接:无连接的含义是限制每次连接只处理一个请求。服务器处理完客户的请求,并收到客户的应答后,即断开连接。采用这种方式可以节省传输时间。
- 无状态:HTTP协议是无状态协议。无状态是指协议对于事务处理没有记忆能力。缺少状态意味着如果后续处理需要前面的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息时它的应答就较快。
Http协议交互过程
协议交互本质是指协议两端(客户端、服务端)如何传输数据?如何交换数据?
传输数据:
传输数据一般基于TCP/IP 实现,体现到开发语言上就是我们所熟悉的Socket 编程。
交换数据:
交换数据本质是指,两端(客户端、服务端)能各自识别对方所发送的数据。那么这就需要制定一套报文编码格式,双方以该格式编码数据发送给对方。Http 对应的Request 与Response报文格式如下图:
报文约定好以后两端都需要对其进行解码和编码操作,其过程如下图:
Http协议内容组成
请求方法
- GET: 请求指定的页面信息,并返回实体主体。
- HEAD: 类似于get请求,只不过返回的响应中没有具体的内容,用于获取报头
- POST:向指定资源提交数据进行处理请求(例如提交表单或者上传文件)。数据被包含在请求体中,POST请求可能会导致新的资源的建立和/或已有资源的修改。
- PUT: 从客户端向服务器传送的数据取代指定的文档的内容。
- DELETE: 请求服务器删除指定的页面。
- CONNECT:HTTP/1.1协议中预留给能够将连接改为管道方式的代理服务器。
- OPTIONS: 允许客户端查看服务器的性能。
- TRACE: 回显服务器收到的请求,主要用于测试或诊断。
部分请求头:
- Host: 接受请求的服务器地址,可以是IP:端口号,也可以是域名
- User-Agent:发送请求的应用程序名称
- Connection: 指定与连接相关的属性,如Connection:Keep-Alive
- Accept-Charset: 通知服务端可以发送的编码格式
- Accept-Encoding: 通知服务端可以发送的数据压缩格式
- Accept-Language: 通知服务端可以发送的语言
部分响应头:
- Server: 服务器应用程序软件的名称和版本
- Content-Type: 响应正文的类型(是图片还是二进制字符串)
- Content-Length:实体报头域用于指明实体正文的长度,以字节方式存储的十进制数字来表示响应正文长度
- Content-Charset: 响应正文使用的编码
- Content-Encoding: 响应正文使用的数据压缩格式
- Content-Language: 响应正文使用的语言
部分响应状态:
- 200 响应成功
- 302 跳转,跳转地址通过响应头中的Location属性指定(JSP中Forward和Redirect之间的区别)
- 400 客户端请求有语法错误,不能被服务器识别
- 403 服务器接收到请求,但是拒绝提供服务(认证失败)
- 404 请求资源不存在
- 500 服务器内部错误
Netty的Http协议开发
由于Netty天生是异步事件驱动的架构,因此基于NIO TCP协议栈开发的HTTP协议栈也是异步非阻塞的,Netty的HTTP协议栈无论在性能上还是可靠性上相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性更好。
Http服务端开发
public class HttpServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();//处理连接请求
NioEventLoopGroup workerGroup = new NioEventLoopGroup();//处理I/O 读写事件和业务逻辑
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//1. 给pipeline设置httpServerCodec处理器,httpServerCodec是netty提供的处理http的编解码器
socketChannel.pipeline().addLast("myHttpServerCodec", new HttpServerCodec());
//2. SimpleChannelInboundHandler是ChannelInboundHandlerAdapter的子类;HttpObject表示客户端和服务器端互相通讯的数据被封装成HttpObject类型
//而不是之前的Object类型
socketChannel.pipeline().addLast("httpServerHandler", new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
//不同浏览器对应不同的pipeline和handler
if(msg instanceof HttpRequest){
HttpRequest request = (HttpRequest)msg;
//获取URI过滤特定资源
URI uri = new URI(request.uri());
if("/favicon.ico".equals(uri.toString())){
System.out.println("请求图标不做响应");
return;
}
System.out.println("msg类型=" + msg.getClass());
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
//回复消息给浏览器
ByteBuf content = Unpooled.copiedBuffer("hello,我是服务器", CharsetUtil.UTF_8);
//构造一个Http响应,即HttpResponse
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
//将构建好的response返回
ctx.writeAndFlush(response);
}
}
});
}
});//给我们workGroup的eventLoop的对应的管道设置处理器
ChannelFuture channelFuture = bootstrap.bind(7788).sync();//绑定监听端口并调用同步阻塞方法等待绑定操作完成
channelFuture.channel().closeFuture().sync();//等待服务器链路关闭之后main函数才退出
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
上面的HttpServerCodec是Netty提供的编解码器,相当于HttpRequestDecoder, HttpResponseEncoder,也可以在服务端使用HttpRequestDecoder来解码请求并用HttpResponseEncoder来编码响应,如下:
public class HttpSimpleServer {
//open 启动服务
public void openServer() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(8);
bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());//添加Http请求消息解码器,解码Request
//将多个消息转换为单一的FullHttpRequest或者FullHttpResponse,因为Http解码器在每个Http消息中会生成多个消息对象
ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());//添加Http消息编码器,编码response
ch.pipeline().addLast("http-server", new HttpServerHandler());
}
});
bootstrap.group(boss, work);
try {
ChannelFuture future = bootstrap.bind(8833).sync();
System.out.println("服务启动:8833");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
private static class HttpServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
String html = "<!DOCTYPE html>\n" +
"<html lang=\"en\">\n" +
"<head>\n" +
" <meta charset=\"UTF-8\">\n" +
" <title>hello word</title>\n" +
"</head>\n" +
"<body>\n" +
"hello word\n" +
"</body>\n" +
"</html>";
response.content().writeBytes(html.getBytes("UTF-8"));
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
public static void main(String[] args) {
HttpSimpleServer simpleServer = new HttpSimpleServer();
simpleServer.openServer();
}
}
WebSocket协议开发
Http协议的弊端
- Http协议为半双工协议,半双工指的是数据可以在客户端和服务端两个方向上传输,但是不能同时传输,同一时刻只有一个方向上的数据传送
- Http消息冗长而繁琐,Http消息包含消息头、消息体、换行符等,文本方式传输。
为了解决Http协议效率抵消的问题,HTML5开始提供了一种浏览器和服务器之间进行全双工通信的网络技术-WebSocket,WebSocket API中浏览器和服务器只需要做一个握手的动作,然后服务器和浏览器之间就形成一条快速通道,两者就可以互相传递数据了,WebSocket基于TCP双向全双工进行消息传递,同一时刻既可以发送消息可以接受消息,相比HTTP的半双工协议,性能得到很大的提升。
WebSocket的特点
- 单一的TCP连接,采用全双工模式通信
- 对代理、防火墙和路由器透明
- 无头部信息、Cookie和身份验证
- 无安全开销
- 通过“ping/pong”帧保持链路激活
- 服务器可主动传递消息给客户端,不在需要客户端轮训
WebSocket 协议报文格式:
任何应用协议都有其特有的报文格式,比如Http协议通过 空格 换行组成其报文。如http 协议不同在于WebSocket属于二进制协议,通过规范进二进位来组成其报文。具体组成如下图:
报文说明:
FIN:标识是否为此消息的最后一个数据包,占 1 bit
RSV1, RSV2, RSV3:用于扩展协议,一般为0,各占1bit
Opcode:数据包类型(frame type),占4bits
0x0:标识一个中间数据包
0x1:标识一个text类型数据包
0x2:标识一个binary类型数据包
0x3-7:保留
0x8:标识一个断开连接类型数据包
0x9:标识一个ping类型数据包
0xA:表示一个pong类型数据包
0xB-F:保留
MASK:占1bits,用于标识PayloadData是否经过掩码处理。如果是1,Masking-key域的数据即是掩码密钥,用于解码PayloadData。客户端发出的数据帧需要进行掩码处理,所以此位是1。
Payload length:
Payload data的长度,占7bits,7+16bits,7+64bits:
如果其值在0-125,则是payload的真实长度。
如果值是126,则后面2个字节形成的16bits无符号整型数的值是payload的真实长度。注意,网络字节序,需要转换。
如果值是127,则后面8个字节形成的64bits无符号整型数的值是payload的真实长度。注意,网络字节序,需要转换。
Payload data
应用层数据
WebSocket连接的建立
建立webSocket连接时,需要通过客户端或浏览器发出握手请求,请求消息示例如下图:
为了建立一个WebSocket连接,客户端浏览器首先向服务器发起一个Http请求,这个Http请求和通常的Http请求不同,包含一些附加头信息,其中附加头信息“Upgrade :Websocket”表明这是一个申请协议升级的Http请求,服务器解析附加头信息并生成应答报文返回给客户端,客户端和服务端的WebSocket连接就建立起来了,双方可以通过这个连接自由传递消息,直到某一方主动关闭该连接,服务端返回给客户端的应答报文如下:
其中请求消息中的“Sec-WebSocket-key”是随机的,服务器端会用这些数据来构造出一个SHA-1的信息摘要,并进行BASE-64编码,作为“Sec-WebSocket-Accept”头的值返回给客户端。
Websocket协议开发
主要流程如下:
服务端:
pipeline添加 聚合器 HttpObjectAggregator
pipeline添加分包器ChunkedWriteHandler
pipeline添加WebSocket协议处理器 WebSocketServerProtocolHandler
pipeline编写WebSocketServerHandler
客户端:
编写客户端js脚本
服务端代码如下:
public class WebSocketServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//第一次握手请求时由Http协议承载,使用Http的编码器和解码器
pipeline.addLast(new HttpServerCodec());
//是以块方式写,支持异步发送大的码流(如大文件的传输),但不占用过多的内存
pipeline.addLast(new ChunkedWriteHandler());
/**
* Http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合
* 这就是为什么当浏览器发送大量数据时就会发送多次Http请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/**
* 对于websocket数据是以帧(frame)的形式传递
* 可以看到WebSocketFrame下面有6个子类
* 浏览器请求时:ws://localhost:7000/hello 表示请求的uri,(与页面中websocket中url一样)
* WebSocketServerProtocolHandler的核心功能是将Http协议升级成ws协议(是通过状态码101),保持长连接
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
pipeline.addLast(new WebSocketHandler());
}
});
ChannelFuture future = bootstrap.bind(7000).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
static class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器收到消息" + msg.text());
//响应
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + msg.text()));
}
/**
* 当web客户端连接后,触发方法
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//id表示唯一的值,LongText是唯一的,ShortText不是唯一的
System.out.println("handlerAdd被调用" + ctx.channel().id().asLongText());
System.out.println("handlerAdd被调用" + ctx.channel().id().asShortText());
}
/**
* 当web客户端连接断开后,触发方法
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved被调用" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生" + cause.getMessage());
ctx.close();
}
}
}
通过javaScript 中的API可以直接操作WebSocket 对象,其示例如下,客户端的hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<form onsubmit="return false">
<textarea name="message" style="height:300px; width: 300px"></textarea>
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height:300px; width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
<script>
var socket;
if(window.WebSocket){
socket = new WebSocket("ws://localhost:7000/hello");
//这个方法相当于channel中read方法收到服务器端会送的消息
socket.onmessage = function (msg) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + msg.data;
}
//相当于连接开启
socket.onopen = function (msg) {
var rt = document.getElementById("responseText");
rt.value = "连接开启了"
}
//相当于连接断开
socket.onclose = function (msg) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" +"连接断开了"
}
}else{
alert("不支持websocket")
}
function send(message) {
if(!window.socket){//先判断websocket是否创建好
return
}
if(socket.readyState == WebSocket.OPEN){
//通过socket发送消息
socket.send(message);
}else{
alert("连接没有开启")
}
}
</script>
</html>
打开hello.html
在从标准的HTTP或者HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此,使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。这个升级动作发生的确切时刻特定于应用程序;他可能会发生在启动时,也可能会发生在请求了某个特定的URL之后。
我们的应用程序将采用下面的约定:如果被请求的URL以/hello结尾,那么我们将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP/S。在连接已经升级完成之后,所有数据都将会使用WebSocket进行传输。
在页面中发送消息,客户端和服务端基于长连接,可以发送消息,效率高:
websocket的常用API如下:
var ws = new WebSocket(“ws://localhost:8080”);
ws.onopen = function()// 建⽴成功之后触发的事件
{
console.log(“打开连接”);
ws.send("message"); // 发送消息
};
ws.onmessage = function(evt) { // 接收服务器消息
console.log(evt.data);
};
ws.onclose = function(evt) {
console.log(“WebSocketClosed!”); // 关闭连接
};
ws.onerror = function(evt) {
console.log(“WebSocketError!”); // 连接异常
};
首先:申请创建一个WebSocket对象,并传入WebSocket地址信息,这时client会通过Http先发起握手请求。消息格式如下:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket //告诉服务端需要将通信协议升级到websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== //浏览器base64加密的密钥,server端收到后需要提取
Sec-WebSocket-Key 信息,然后加密。
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat //表⽰客⼾端请求提供的可供选择的⼦协议
Sec-WebSocket-Version: 13 //版本标识
其次:服务端响应、并建立连接
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: SIEylb7zRYJAEgiqJXaOW3V+ZWQ=
然后:握手成功促发客户端 onOpen 事件
连接状态查看:
通过ws.readyState 可查看当前连接状态可选值如下:
- CONNECTING (0):表示还没建立连接;
- OPEN (1): 已经建立连接,可以进行通讯;
- CLOSING (2):通过关闭握手,正在关闭连接;
- CLOSED (3):连接已经关闭或无法打开;
WEBSOCKET帧
WebSocket以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。
IETF发布的WebSocket RFC,定义了6种帧,Netty为它们都提供了一个POJO实现。WebSocketFrame类型的6种子类型:
BinaryWebSocketFrame——包含了二进制数据
TextWebSocketFrame——包含了文本数据
ContinuationWebSocketFrame——包含属于上一个BinaryWebSocketFrame或TextWebSocketFrame的文本数据或者二进制数据
CloseWebSocketFrame——表示一个关闭链路的请求,包含一个关闭的状态码和关闭的原因
PingWebSocketFrame——请求传输一个PongWebSocketFrame
PongWebSocketFrame——作为一个对于PingWebSocketFrame的响应被发送
TextWebSocketFrame是我们需要处理的帧类型。为了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler来处理其他类型的帧。
webSocket+Netty实现弹幕系统
编码和解码
编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码,Codec编解码器由两部分:Decoder(解码器)和Encoder(编码器)组成,Encoder负责把业务数据转换成字节码数据,Decoder负责把字节码数据转换成业务数据。
Netty中提供了一些Codec编解码器,如StringEncoder和StringDecoder对字符串进行编解码,ObjectEncoder和ObjectDecoder对java对象进行编解码。ObjectEncoder和ObjectDecoder对Pojo对象序列化操作的时候底层仍是使用Java序列化技术,而Java序列化技术本身存在以下问题:
- 无法跨语言
- 序列化后码流太大
- 序列化性能比较低
Google的Protobuf
ProtoBuf全称是Google Protocol Buffers,由谷歌开源而来,是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,适合做数据存储或RPC数据交换格式(目前很多公司的数据交换由Http+Json向Tcp+Protobuf转换,),它将数据结构以.proto文件进行描述,通过代码生成工具(protobuf.exe编译器)可以生成对应数据结构的POJO对象和Protobuf相关的方法和属性,特点如下:
结构化的数据存储格式(xml或者Json)
- 高效的编解码性能
- 语言无关、平台无关、扩展性好
- 官方支持Java、C++、Python三种语言
Protobuf的入门
Protobuf是一个灵活的、高效、结构化的数据序列化框架,支持将数据结构化一次而可以到处使用,甚至跨语言使用,通过代码生成工具可以自动生产不通语言版本的源代码,甚至可以使用不同的版本的数据结构进程间进行传递,实现数据结构的前后兼容
准备工作:引入protobuf的依赖和下载protoc-3.13.0-win32.zip(protoc.exe工具主要是将.protp文件生成相应的代码)
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.13.0</version>
</dependency>
编写.proto文件,demo1如下:
syntax = "proto3"; //使用proto3协议
option java_outer_classname = "StudentPOJO"; //生成的外部类名,同时也是文件名
message Student { //会在StudentPOJO外部类中生成一个内部类Student,Student才是真正发送的POJO对象
int32 id = 1; //Student类中有一个属性名字为id,类型为int32(protobuf类型), 1表示属性的序号,而不是值
string name = 2;
}
demo2如下:
syntax = "proto3";
option optimize_for = SPEED;//加快解析
option java_package = "com.geeron.authdemo.nio.codec2";//指定生成在那个包下
option java_outer_classname = "DataPOJO"; //生成的外部类名,同时也是文件名
//protobuf可以使用message 管理其他message
message MultiMessage {
enum MessageType{
StudentType = 0; //在proto3中要求enum的编号从0开始
WorkerType = 1;
}
//哪一个枚举的类型
MessageType type = 1;
//oneof 表示每次枚举的类型最多只能出现其中之一,节省空间
oneof dataBody{
Student student = 2;
Worker worker = 3;
}
}
message Student { //会在StudentPOJO外部类中生成一个内部类Student
int32 id = 1; //Student类中有一个属性名字为id,类型为int32(protobuf类型), 1表示属性的序号,而不是值
string name = 2;
}
message Worker { //会在StudentPOJO外部类中生成一个内部类Student
string name = 1;
int32 age = 2;
}
用protoc.exe命令生产java代码,如下,生成的java类名是由proto文件中option java_outer_classname指定的名称。
protoc.exe --java_out=. xxx.proto
代码中引入生成的protobuf java类,在Netty中引入protobuf的编解码器并发送protobuf相关
服务器端:
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();//处理连接请求
NioEventLoopGroup workerGroup = new NioEventLoopGroup();//处理I/O 读写事件和业务逻辑
try {
//ServerBootstrap对象是Netty用于启动NIO服务端的辅助启动类,设置启动参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//设置服务器的通道类型:NioServerSocketChannel
.option(ChannelOption.SO_BACKLOG, 1024)//设置NioServerSocketChannel的TCP参数,线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {//绑定I/O事件处理类
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
System.out.println("客户的channel的hashCode" +socketChannel.hashCode());
ChannelPipeline pipeline = socketChannel.pipeline();
//在pipeline加入ProtobufDecoder,并指定对那种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(DataPOJO.MultiMessage.getDefaultInstance()));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});//给我们workGroup的eventLoop的对应的管道设置处理器
ChannelFuture channelFuture = bootstrap.bind(9911).sync();//绑定监听端口并调用同步阻塞方法等待绑定操作完成
channelFuture.channel().closeFuture().sync();//等待服务器链路关闭之后main函数才退出
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
//自定义一个Handler继续Netty规定的某个Handler适配器
static class NettyServerHandler extends SimpleChannelInboundHandler<DataPOJO.MultiMessage> {
@Override
public void channelRead0(ChannelHandlerContext ctx, DataPOJO.MultiMessage msg) throws Exception {
DataPOJO.MultiMessage.DataBodyCase type = msg.getDataBodyCase();
if(type == DataPOJO.MultiMessage.DataBodyCase.WORKER){
System.out.println("客户端发送消息age::"+msg.getWorker().getAge() +"名字" + msg.getWorker().getName());
}else if(type == DataPOJO.MultiMessage.DataBodyCase.STUDENT){
System.out.println("客户端发送消息id:"+msg.getStudent().getId() +"名字" + msg.getStudent().getName());
}
}
}
}
客户端:
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup loopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();//客户端使用BootStrap
bootstrap.group(loopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//在pipeline加入protobufEncoder
pipeline.addLast("encoder", new ProtobufEncoder());
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9911);//连接服务端
channelFuture.channel().closeFuture().sync();
}finally {
loopGroup.shutdownGracefully();
}
}
static class NettyClientHandler extends ChannelInboundHandlerAdapter{
/**
* 当通道就绪就会触发该方法,发送消息给服务器端(写入通道)
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DataPOJO.MultiMessage message = null;
if(new Random(10).nextInt() %2 == 0){
message = DataPOJO.MultiMessage.newBuilder().setType(DataPOJO.MultiMessage.MessageType.StudentType).setStudent(DataPOJO.Student.newBuilder().setId(35).setName("dachun").build()).build();
}else{
message = DataPOJO.MultiMessage.newBuilder().setType(DataPOJO.MultiMessage.MessageType.WorkerType).setWorker(DataPOJO.Worker.newBuilder().setAge(35).setName("saoxiu").build()).build();
}
System.out.println("Server ctx: " +ctx);
ctx.writeAndFlush(message);
}
}
Netty的ChannelHandler的调用机制
客户端发送消息(如:Long型),经过(出栈)编码器编码,发送到服务端,服务端收到后,解码消息(入栈),消费后响应一个消息(Long型)给服务端,经过编码器编码(出栈)后响应给服务端,服务端收到后通过解码器解码(入栈)响应消息。
服务端的开发
public class MyServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加入栈的Handler进行解码,对客户端的消息进行解码
pipeline.addLast(new MyByteToLongDecoder());
//添加出栈的编码器handler,对客户端做出响应的数据进行编码
pipeline.addLast(new Myclient.MyLongToByteEncoder());
//添加业务逻辑处理
pipeline.addLast(new MyServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(7009).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
//自定义byte-Long解码器
static class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
* decoder方法 会根据接受的数据被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多的可读字节为止
* 如果list 不为空,就会将list的内容传递给下一个ChannelInboundHandler处理,该处理器的方法也会被调用多次
* @param ctx 上下文
* @param in 入站的Bytebuf
* @param out 将解码后的数据传给下一个Handler处理(也就是MyServerHandler)
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("decoder被调用");
//Long 是8个字节,所以必须满足8个字节才能读取1个Long
if(in.readableBytes() >= 8){
out.add(in.readLong());//添加到list中被下个handler获取
}
}
}
static class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端:" +ctx.channel().remoteAddress() + "读取到long:" + msg);
//给客户端做出响应
ctx.writeAndFlush(985L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
服务端的开发
public class Myclient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加出栈的Handler,对发送服务端的数据进行编码
pipeline.addLast(new MyLongToByteEncoder());
//添加入栈的handler,对服务端响应的数据进行解码
pipeline.addLast(new MyServer.MyByteToLongDecoder());
//加入一个自定义的handler,处理业务
pipeline.addLast(new MyClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 7009).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
static class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("encoder方法被调用,msg" + msg);
out.writeLong(msg);//写入到ByteBuf中
}
}
static class MyClientHandler extends SimpleChannelInboundHandler<Long>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器IP=" + ctx.channel().remoteAddress());
System.out.println("收到服务器的消息:" + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client 发送数据");
ctx.writeAndFlush(211L);
//该处理器的前一个Handler是MessageToByteEncoder的 实现,在write方法中有如下判断:类型与解码器的泛型是否匹配
//因此我们在编写encoder时要主要传入的数据类型和处理的数据类型一致
/**
* if (acceptOutboundMessage(msg)) { 判断当前msg是不是应该处理的类型
* @SuppressWarnings("unchecked")
* I cast = (I) msg;
* buf = allocateBuffer(ctx, cast, preferDirect);
* try {
* encode(ctx, cast, buf);
* } finally {
* ReferenceCountUtil.release(cast);
* }
*
* if (buf.isReadable()) {
* ctx.write(buf, promise);
* } else {
* buf.release();
* ctx.write(Unpooled.EMPTY_BUFFER, promise);
* }
* buf = null;
* } else {
* ctx.write(msg, promise);
* }
*/
// 如果按照这种形式发送, 服务端在接受消息的时候可能会解码多次
//ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcd", CharsetUtil.UTF_8));
}
}
}
结论:不论是解码器Handler还是编码器Handler,即接受的消息的类型必须与待处理的消息类型一致,否则该Handler不会被执行
在解码器在数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果会和期望的可能不一致。
TCP的粘包和拆包
TCP编程中无论是服务端还是客户端,读取和发送消息时都要考虑TCP底层的粘包和拆包机制,TCP是面向连接(数据是没有界限的)、面向流的、提供高可靠性服务,服务器和客户端都要有对应的socket,因此发送端为了将多个发送给接收端的包更有效的发给对方,使用了优化算法(Nagle算法),将多次间隔较小且数据量小的数据合并成一个大的数据块,然后进行封包,这样虽然提高了发送的效率,但是接收端就难以分辨完整的数据包,因此面向流的通信是无消息保护边界的。所以相对于业务来说,一个完整的包可能会被TCP拆分多个包进行发送 ,也有可能把许多小的包封装成一个大的数据包发送,这就是TCP的粘包和拆包的问题
粘包、拆包问题图解
假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,由于服务端一次性读取的字节数是不确定的,因此服务端收到的数据可以分为三种,如下所示:
第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象,此种情况不考虑。
第二种情况,接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以接收端不知道如何处理。
第三种情况,接收端收到了两个数据包,但是这两个数据包要么是不完整的,第一次读取到了完整Packet2包和一部分的Packet1包,第二次读取到Packet2包的剩余内容,(或者第一次读取一部分的Packet2包,第二次读取剩下的Packet2包和完整的Packet1包),这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。
粘包和拆包的演示
服务端核心代码:接受服务端发送的消息并做出响应
static class TcpServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
//服务端接受数据
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);//将msg消息读取到buffer字节数组中
String message = new String(buffer, Charset.forName("UTF-8"));
System.out.println("服务器接收到的数据" + message);
//做出响应
ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), Charset.forName("UTF-8"));
ctx.writeAndFlush(byteBuf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端核心代码:发送10个消息并接受服务端的响应消息
static class TcpClientHandler extends SimpleChannelInboundHandler<ByteBuf>{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送10个数据
for (int i=0; i< 10; i++){
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello,Server" + i, Charset.forName("UTF-8"));
ctx.writeAndFlush(byteBuf);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);//将msg消息读取到buffer字节数组中
String message = new String(buffer, Charset.forName("UTF-8"));
System.out.println("客户端接收到的数据" + message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
执行可以看出:服务端接受数据时候发生粘包和拆包
服务器接收到的数据Hello,Server0
服务器接收到的数据Hello,Server1
服务器接收到的数据Hello,Server2Hello,Server3Hello,Server4
服务器接收到的数据Hello,Server5Hello,Server6
服务器接收到的数据Hello,Server7Hello,Server8
服务器接收到的数据Hello,Server9
TCP粘包拆包发生的原因有很多,主要包括如下:
-
要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。
-
待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。
-
要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。
-
接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。
粘包和拆包的解决策略
使用自定义协议+编解码器来解决
关键是要解决服务端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或者少读数据的问题,从而避免了TCP粘包和拆包的问题
自定义协议MessageProtocol
@Data
public class MessageProtocol {
private int len; //消息长度
private byte[] content;//消息内容
}
在服务端和客户端指定编码器
服务端代码:添加编解码器
public class ProtocolTcpServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加解码器
pipeline.addLast(new MessageDecoder());
//添加编码器
pipeline.addLast(new ProtocolTcpClient.MessageEncoder());
//添加业务逻辑处理
pipeline.addLast(new TcpServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(7010).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
static class MessageDecoder extends ReplayingDecoder<MessageProtocol>{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("decoder被调用");
//需要将二进制字节码转成MessageProtocol对象
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
//封装成MessageProtocol对象,放入out,传递下一个handler业务处理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(bytes);
out.add(messageProtocol);
}
}
static class TcpServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("服务器接收到信息:长度" + len + "内容:" +
new String(content, Charset.forName("utf-8")));
System.out.println("服务器接收到的消息包数量" + (++ this.count));
//回复消息
String resp = "海底捞";
int respLen =resp.getBytes(Charset.forName("utf-8")).length;
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setContent(resp.getBytes(Charset.forName("utf-8")));
messageProtocol.setLen(respLen);
ctx.writeAndFlush(messageProtocol);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
客户端代码:
public class ProtocolTcpClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加编码器,对对象进行编码
pipeline.addLast(new MessageEncoder());
//添加解码器,对服务端的响应进行解码
pipeline.addLast(new ProtocolTcpServer.MessageDecoder());
pipeline.addLast(new TcpClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 7010).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
static class MessageEncoder extends MessageToByteEncoder<MessageProtocol>{
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("messageEncoder encoder被调用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
static class TcpClientHandler extends SimpleChannelInboundHandler<MessageProtocol>{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送10个数据
for (int i=0; i< 5; i++){
String msg = "吃火锅";
byte[] content = msg.getBytes(Charset.forName("utf-8"));
int length = msg.getBytes(Charset.forName("utf-8")).length;
//创建协议包
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setContent(content);
messageProtocol.setLen(length);
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
byte[] content = msg.getContent();
int len = msg.getLen();
System.out.println("客户端接受的消息 长度:" +len + "内容:"
+new String(content, Charset.forName("utf-8")
));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常消息" + cause.getMessage());
ctx.close();
}
}
}
由于TCP无法知道上层业务数据,所以TCP底层无法保证数据包不会被拆分和重组,所以我们只能利用上层的应用协议栈设计来解决,归纳如下:
-
消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格
-
在包尾增加回车换行符进行分割,例如FTP协议
-
将消息分为消息头和消息体,消息头包含消息的总长度(或者消息体长度)
以上3种方式,客户端接受到包的时候就可以根据这些约束区分出来不同的包。
Netty中解决TCP粘包拆包问题
为了解决TCP中粘包、拆包导致的半包读写问题,Netty默认提供了多种编解码器用于处理半包,直接使用这些类库,TCP粘包拆包问题就变得非常容易
LineBasedFremeDecoder解决TCP粘包问题
LineBasedFremeDecoder改造服务端代码
public class NettyServer {
public void bind(int port){
//NioEventLoopGroup是一个线程组,包含一组NIO线程
EventLoopGroup bossGroup = new NioEventLoopGroup();//用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//用于SocketChannel的网络读写
try{
//ServerBootstrap对象是Netty用于启动NIO服务端的辅助启动类
ServerBootstrap bs = new ServerBootstrap();
bs.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//设置创建的channel
.option(ChannelOption.SO_BACKLOG, 1024)//设置NioServerSocketChannel的TCP参数
.childHandler(new ChildChannelHandler());//绑定I/O事件处理类
ChannelFuture sync = bs.bind(port).sync();//绑定监听端口并调用同步阻塞方法等待绑定操作完成
sync.channel().closeFuture().sync();//等待服务器链路关闭之后main函数才退出
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeServerHandler());
}
}
class TimeServerHandler extends ChannelHandlerAdapter{
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String)msg;
System.out.println("The time server received order :" + body +";the counter is:" + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
//响应的消息也添加回车换行符
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
System.out.println("done" +currentTime);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//Netty把write方法并不直接将消息写入到SocketChannel中,调用write方法只是把待发送的消息放到缓冲数组中,
// 调用flush方法才将消息全部写道SocketChanel
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//释放相关句柄等资源
ctx.close();
}
}
public static void main(String[] args) {
new NettyServer().bind(9988);
}
}
LineBasedFremeDecoder改造客户端代码
public class NetttClient {
public void connect(String host, int port){
EventLoopGroup group = new NioEventLoopGroup();
try{
//创建客户端辅助启动类Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,Boolean.TRUE)
.handler(new ChannelInitializer<SocketChannel>() {
//创建NioSocketChannel成功之后,进行初始化
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeServerHandler());
}
});
ChannelFuture sync = bootstrap.connect(host, port).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放NIO线程组资源
group.shutdownGracefully();
}
}
class TimeServerHandler extends ChannelHandlerAdapter {
//private final ByteBuf firstMessage;
private byte[] req;
private int counter;
public TimeServerHandler() {
//给消息添加回车换行符
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
//当客户端和服务端TCP链路建立成功之后,Netty的NI线程会调用channelActive方法,发送查询指定给服务端
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//将请求消息发送给服务端
ByteBuf message;
for (int i =0; i <100; i++){
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("now is :" + body +";the couter is :" + ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("Unexpected exception frm downstream:" + cause.getMessage());
ctx.close();
}
}
public static void main(String[] args){
new NetttClient().connect("127.0.0.1",9988);
}
}
在改造的代码中,新增了2个解码器LineBasedFrameDecoder和StringDecoder,发送的带有回车换行符的消息在被接收后msg就是删除了回车换行符的消息,不需要再对消息进行编码解码。LineBasedFrameDecoder的工作原理就是一次遍历ByteBUF中可读字节,判断看是否有“\n”或者“\r\n”,如有,就以此位置为结束位置,这样可以读到一行一行的息,LineBasedFrameDecoder是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度,如果连续读取到最大长度仍然没有发现换行符就会抛出异常, 同时忽略之前读取的异常码流。StringDecoder的功能就是将接收到的对象转成字符串,然后继续调用Handler,LineBasedFrameDecoder+StringDecoder组合就是按行切换的文本解码器。
当然,基于LineBasedFrameDecoder+StringDecoder组合是针对回车换行符,如果消息没有回车换行符的消息就需要使用其他的半包解码器,Netty提供了支持多种TCP粘包/拆包的解码器,用来满足不同需求
分隔符和定长解码器
分隔符解码器:DelimiterBasedFrameDecoder的应用开发
DelimiterBasedFrameDecoder的服务端开发
public class EchoServer {
public void bind(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
//1024 表示单条消息的最大长度,当达到该长度后仍然没有查找到分隔符则抛出TooLongFrameException异常
//第二个参数就是分隔符缓冲对象
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture sync = bootstrap.bind(port).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
class EchoServerHandler extends ChannelHandlerAdapter {
private int counter;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();//发生异常,关闭链路
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String)msg;
System.out.println("This is " + ++counter + "times received cline :{" + body +"}");
body += "$_";
ByteBuf byteBuf = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(byteBuf);
}
}
public static void main(String[] args) {
new EchoServer().bind(9999);
}
}
DelimiterBasedFrameDecoder的客户端开发
public class EchoClient {
public void connect(String host, int port){
//配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimeter = Unpooled.copiedBuffer("$_".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimeter));
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
//发起异步连接操作
ChannelFuture sync = bootstrap.connect(host, port).sync();
//等待客户端链路关闭
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放NIO线程组
group.shutdownGracefully();
}
}
class EchoClientHandler extends ChannelHandlerAdapter {
private int counter;
static final String ECHO_REQ = "HI,this is netty.$_";
public EchoClientHandler() {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i =0; i < 100; i ++){
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("This is "+ ++counter + "times receive server :{" + msg +"}");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
public static void main(String[] args) {
new EchoClient().connect("127.0.0.1", 9999);
}
}
分别启动服务端和客户端
This is 1times received cline :{HI,this is netty.}
This is 2times received cline :{HI,this is netty.}
This is 3times received cline :{HI,this is netty.}
This is 4times received cline :{HI,this is netty.}
This is 5times received cline :{HI,this is netty.}
This is 6times received cline :{HI,this is netty.}...
DelimiterBasedFrameDecoder是一个支持自定义分隔符来实现解码,创建的DelimiterBasedFrameDecoder加入到ChannelPipeline中,构造方法中的第一个参数1024 表示单条消息的最大长度,当达到最大长度仍然没有找到分隔符的话就会抛出TooLongFrameExeception异常。防止由于缺失分隔符导致的内存溢出,第二个参数是分隔符缓冲对象,DelimiterBasedFrameDecoder对消息进行解码,后续的ChannelHandler收到的msg就是一个完整的消息包,StringDecoder的作用就是将Bytebuf解码成字符串对象。
FixedLengthFrameDecoder的应用开发
FixedLengthFrameDecoder解码器是固定长度的解码器,能够按照指定的长度对消息进行自动解码,使用的方法和上面的几种解码器是一致的,主要不通在于如下
public void bind(int port){
//NioEventLoopGroup是一个线程组,包含一组NIO线程
EventLoopGroup bossGroup = new NioEventLoopGroup();//用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//用于SocketChannel的网络读写
try{
//ServerBootstrap对象是Netty用于启动NIO服务端的辅助启动类
ServerBootstrap bs = new ServerBootstrap();
bs.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//设置创建的channel
.option(ChannelOption.SO_BACKLOG, 1024)//设置NioServerSocketChannel的TCP参数
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(15));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeServerHandler());
}
});//绑定I/O事件处理类
ChannelFuture sync = bs.bind(port).sync();//绑定监听端口并调用同步阻塞方法等待绑定操作完成
sync.channel().closeFuture().sync();//等待服务器链路关闭之后main函数才退出
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
通过自定义长度的解码器,无论一次接受多少数据包,都会按照构造设置的固定长度进行解码,如果是半包消息,会缓存宝宝消息并等待下一个包到达后进行拼包,知道读取一个完整的包
更多推荐
所有评论(0)