WebSocket介绍

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC 7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

下面直接进入正题——SpringBoot 集成 WebSocket

1、搭建环境
你还在手动搭建项目吗?都9102年了还在手动,我们早都全自动了。不会的请戳这里:https://start.spring.io/

alt text

点击生成,浏览器会下载一个压缩包。这就是你要的项目了。是不是很简单、很easy、很nice。

2、准备工作

项目采用IDEA + Maven + JDK1.8 + Netty4.1.43
https://mvnrepository.com/artifact/io.netty/netty-all/4.1.43.Final

maven地址如下:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<!-- Netty dependency used by this tutorial, pinned to version 4.1.43.Final -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>

3、开始集成Netty

websocket启动类
我们通过实现ApplicationRunner,在项目启动时加载并启动我们的netty服务。当然实现CommandLineRunner也可以,它们两个的区别在于run方法的输入参数(参数的封装与获取方式不同),大家自行斟酌选择。
还有一种方式就是把@PostConstruct注解加在启动的方法上,但是我在使用springboot scheduling(定时器)时发现定时器不会初始化、不会触发。我也是百思不得其姐,SO放弃了…(原因很可能是:如果启动方法会阻塞当前线程,例如同步等待服务端通道关闭,那么放在@PostConstruct里就会卡住Spring容器的初始化,后续的定时任务自然无法启动。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.laowj.websocket;

import com.laowj.websocket.handler.WebSocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * Starts the Netty based WebSocket server once the Spring Boot application has started.
 *
 * <p>The server is bound to the port configured by the {@code websocket.port} property.
 * {@link #run(ApplicationArguments)} returns as soon as the port is bound, so it does not
 * block the thread that is finishing application startup (other runners can still execute).
 */
@Component
public class WebSocketService implements ApplicationRunner {

    /** TCP port the WebSocket server listens on, injected from {@code websocket.port}. */
    @Value("${websocket.port}")
    private int socketPort;

    /**
     * Binds the WebSocket server and returns once it is listening.
     *
     * @param args application arguments (not used)
     * @throws Exception if the port cannot be bound or the calling thread is interrupted
     *                   while waiting for the bind to complete
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // HTTP codec + aggregator are needed for the initial upgrade request.
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                            // A fresh handler per connection: it keeps per-connection state.
                            pipeline.addLast("handler", new WebSocketServerHandler());
                            pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
                        }
                    });

            Channel ch = b.bind(socketPort).sync().channel();

            // Do NOT wait on closeFuture() here: that would block this runner forever.
            // Instead, release the event loops when the server channel is eventually closed.
            ch.closeFuture().addListener(future -> {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            });
            started = true;

            System.out.println("Web socket server started at port " + socketPort
                    + '.');
            System.out
                    .println("Open your browser and navigate to http://localhost:"
                            + socketPort + '/');
        } finally {
            // Startup failed (e.g. port already in use): do not leak the event loop threads.
            if (!started) {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
}

重点来了,这块需要拿笔记,敲黑板划重点,考试要考的。
websocket消息处理类WebSocketServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package com.laowj.websocket.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpUtil.setContentLength;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 * Inbound handler that upgrades an HTTP request to a WebSocket connection and answers
 * text frames with an acknowledgement.
 *
 * <p>Every connection must supply a {@code token} query parameter on the upgrade request.
 * Channels that share a token are collected in one {@link ChannelGroup} stored in
 * {@link #channelGroupMap}, so a token acts as a "channel" that can be addressed as a whole.
 *
 * <p>The handler keeps per-connection state (the handshaker), so one instance must be
 * created per channel.
 */
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger logger = Logger
            .getLogger(WebSocketServerHandler.class.getName());

    /** Handshaker of this connection; set once the upgrade request has been accepted. */
    private WebSocketServerHandshaker handshaker;

    /** URI path advertised as the WebSocket location in the handshake. */
    private static final String WEBSOCKET_PATH = "/ws_protocol";

    /** token -> group of channels connected with that token. */
    public static Map<String, ChannelGroup> channelGroupMap = new ConcurrentHashMap<>();

    /** Name of the query parameter that carries the client's token. */
    private static final String HTTP_REQUEST_STRING = "token";

    /** Channel attribute under which the token of a connection is stored. */
    private static final AttributeKey<String> TOKEN_CHANNEL_KEYS = AttributeKey.valueOf("TOKEN.CHANNEL.KEYS");

    /**
     * Dispatches an inbound message: full HTTP requests go to the handshake logic,
     * WebSocket frames to the frame logic. Other message types are ignored.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // Plain HTTP: this is expected to be the upgrade request.
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * Validates the upgrade request, registers the channel under its token and performs
     * the WebSocket handshake. Invalid requests are answered with an HTTP error and the
     * connection is closed.
     */
    private void handleHttpRequest(ChannelHandlerContext ctx,
                                   FullHttpRequest req) throws Exception {

        // Reject undecodable requests and requests that are not a WebSocket upgrade.
        // The Upgrade header value is compared case-insensitively.
        if (!req.decoderResult().isSuccess()
                || !"websocket".equalsIgnoreCase(req.headers().get("Upgrade"))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1,
                    BAD_REQUEST));
            return;
        }

        // Allow only GET methods.
        if (!GET.equals(req.method())) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
            return;
        }

        if ("/favicon.ico".equals(req.uri()) || "/".equals(req.uri())) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }

        // The token is mandatory; a missing or empty value is rejected.
        List<String> tokenValues = new QueryStringDecoder(req.uri()).parameters().get(HTTP_REQUEST_STRING);
        String token = (tokenValues == null || tokenValues.isEmpty()) ? null : tokenValues.get(0);
        if (token == null || token.isEmpty()) {
            logger.warning(HTTP_REQUEST_STRING + "参数不可缺省");
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }

        // Build the handshaker first so that unsupported clients are never registered.
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(req), null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory
                    .sendUnsupportedVersionResponse(ctx.channel());
            return;
        }

        // Create the group for this token if needed and add the channel in one atomic map
        // operation, so concurrent joins/leaves for the same token cannot lose a channel.
        channelGroupMap.compute(token, (key, group) -> {
            ChannelGroup target = (group != null)
                    ? group
                    : new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            target.add(ctx.channel());
            return target;
        });
        // Remember the token on the channel so it can be unregistered on close.
        ctx.channel().attr(TOKEN_CHANNEL_KEYS).setIfAbsent(token);

        handshaker.handshake(ctx.channel(), req);
    }

    /**
     * Handles a single WebSocket frame: close and ping are answered as the protocol
     * requires, pong is ignored, text is acknowledged.
     *
     * @throws UnsupportedOperationException for any other frame type (e.g. binary)
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx,
                                      WebSocketFrame frame) throws Exception {

        // Close request: complete the closing handshake.
        // retain() because SimpleChannelInboundHandler releases the frame after this call.
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(),
                    (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Ping: answer with a pong carrying the same payload.
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(
                    new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Pong (possibly unsolicited): needs no reply and must not kill the connection.
        if (frame instanceof PongWebSocketFrame) {
            return;
        }
        // This example supports text messages only, not binary ones.
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format(
                    "%s frame types not supported", frame.getClass().getName()));
        }

        // Acknowledge the text message; it is flushed in channelReadComplete.
        String request = ((TextWebSocketFrame) frame).text();
        if (logger.isLoggable(Level.FINE)) {
            logger.fine(String.format("%s received %s", ctx.channel(), request));
        }
        ctx.channel().write(new TextWebSocketFrame(" 收到客户端请求:" + request));
    }

    /**
     * Writes an HTTP response. For non-200 responses the status line is used as the body.
     * The connection is closed afterwards unless the request is keep-alive and the
     * status is 200.
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx,
                                         FullHttpRequest req, FullHttpResponse res) {
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
                    CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            setContentLength(res, res.content().readableBytes());
        }

        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Flushes everything written while handling the current batch of reads. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Logs the failure, unregisters the channel from its token group and closes it. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        logger.log(Level.WARNING, "WebSocket channel failed, closing " + ctx.channel(), cause);
        close(ctx);
    }

    /**
     * Removes the channel from the group of its token and closes the channel.
     *
     * <p>The map entry is dropped only when the group is empty after removing this
     * channel. This method may run more than once for the same channel (from
     * {@code exceptionCaught} and again from {@code handlerRemoved}), so it must never
     * discard a group that still holds other channels.
     */
    private void close(ChannelHandlerContext ctx) {
        String token = ctx.channel().attr(TOKEN_CHANNEL_KEYS).get();
        if (token != null) {
            // Atomic per token: remove this channel, drop the entry if nobody is left.
            channelGroupMap.computeIfPresent(token, (key, group) -> {
                group.remove(ctx.channel());
                return group.isEmpty() ? null : group;
            });
        }
        ctx.close();
    }

    /** Cleans up the token registration when the handler leaves the pipeline. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        close(ctx);
    }

    /** Builds the {@code ws://} location from the request's Host header and the fixed path. */
    private static String getWebSocketLocation(FullHttpRequest req) {
        String location = req.headers().get(HOST) + WEBSOCKET_PATH;
        return "ws://" + location;
    }
}

测试与实战

1、采用html进行测试
源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body>

<!-- Manual test page: sends the text of the first box over the WebSocket and
     appends every message received from the server to the second box. -->
<form onsubmit="return false;">
    <h1> Netty WebSocket 协议 </h1>
    <h3>客户端请求消息</h3>
    <textarea id="requestText" style="width:500px;height:200px;"></textarea>
    <input type="button" value="发送WebSocket请求消息" onclick="send(document.getElementById('requestText').value)"/>
    <h3>服务端返回的应答消息</h3>
    <textarea id="responseText" style="width:600px;height:200px;"></textarea>
</form>

<script type="text/javascript">
    // Builds a random identifier laid out like a version-4 UUID; used as the token.
    function guid() {
        return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
            var r = Math.random()*16|0, v = c == 'x' ? r : (r&0x3|0x8);
            return v.toString(16);
        });
    }

    // The connection; stays null when the browser has no WebSocket support.
    var socket = null;

    // Declared at top level so the button's inline onclick handler can always resolve it,
    // even when the browser does not support WebSocket.
    function send(message) {
        if (!socket || socket.readyState !== WebSocket.OPEN) {
            // Not connected (yet), or the connection has already been closed.
            alert("WebSocket连接尚未建立或已关闭");
            return;
        }
        socket.send(message);
    }

    window.WebSocket = window.WebSocket || window.MozWebSocket;
    if (!window.WebSocket) {
        alert("你的浏览器不支持websocket协议");
    } else {
        // The server requires a token query parameter on the upgrade request.
        socket = new WebSocket("ws://localhost:5052/ws_protocol?token=" + guid());
        socket.onmessage = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + "\r\n" + event.data;
        };
        socket.onopen = function (event) {
            alert("websocket连接建立成功...");
        };
        socket.onclose = function (event) {
            alert("连接关闭");
        };
    }
</script>
</body>
</html>