博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty长连接实例
阅读量:3718 次
发布时间:2019-05-22

本文共 10732 字,大约阅读时间需要 35 分钟。

通过netty实现服务端与客户端的长连接通讯,及心跳检测。

基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleStateEvent事件实现:客户端在写空闲时定时向服务端发送Ping消息,以检测SocketChannel是否中断。

环境JDK1.8 和netty5

以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

设计消息类型:

1
2
3
/**
 * Kinds of messages exchanged between the client and the server.
 */
public enum MsgType {
    /** Heartbeat message. */
    PING,
    /** Request message. */
    ASK,
    /** Response message. */
    REPLY,
    /** Login message. */
    LOGIN
}

Message基类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//必须实现序列,serialVersionUID一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public
abstract
class
BaseMsg
implements
Serializable{
private
static
final
long
serialVersionUID=1L;
private
MsgTypetype;
//必须唯一,否者会出现channel调用混乱
private
StringclientId;
//初始化客户端id
public
BaseMsg(){
this
.clientId=Constants.getClientId();
}
public
StringgetClientId(){
return
clientId;
}
public
void
setClientId(StringclientId){
this
.clientId=clientId;
}
public
MsgTypegetType(){
return
type;
}
public
void
setType(MsgTypetype){
this
.type=type;
}
}

常量设置:

1
2
3
4
5
6
7
8
9
/**
 * Process-wide holder for the id of the local client.
 *
 * <p>Only static accessors are offered, so the class cannot be instantiated.
 */
public final class Constants {
    private static String clientId;

    private Constants() {
        // static holder only
    }

    /** @return the client id previously stored with {@link #setClientId(String)}, or {@code null} if none was set */
    public static String getClientId() {
        return clientId;
    }

    /** @param clientId the client id to store for later retrieval */
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }
}

登录类型消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public
class
LoginMsg
extends
BaseMsg{
private
StringuserName;
private
Stringpassword;
public
LoginMsg(){
super
();
setType(MsgType.LOGIN);
}
public
StringgetUserName(){
return
userName;
}
public
void
setUserName(StringuserName){
this
.userName=userName;
}
public
StringgetPassword(){
return
password;
}
public
void
setPassword(Stringpassword){
this
.password=password;
}
}

心跳检测Ping类型消息:

1
2
3
4
5
6
/**
 * Heartbeat message; carries no payload beyond the fields of {@code BaseMsg}.
 */
public class PingMsg extends BaseMsg {
    // Declared explicitly so that both peers agree on the serialized form
    // regardless of how the class was compiled.
    private static final long serialVersionUID = 1L;

    /** Creates a message of type {@code MsgType.PING}. */
    public PingMsg() {
        super();
        setType(MsgType.PING);
    }
}

请求类型消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public
class
AskMsg
extends
BaseMsg{
public
AskMsg(){
super
();
setType(MsgType.ASK);
}
private
AskParamsparams;
public
AskParamsgetParams(){
return
params;
}
public
void
setParams(AskParamsparams){
this
.params=params;
}
}
/**
 * Parameters of a request message.
 *
 * <p>Must implement {@link Serializable} because it is sent as part of a
 * serialized message.
 */
public class AskParams implements Serializable {
    private static final long serialVersionUID = 1L;

    private String auth;

    /** @return the auth value, or {@code null} if none was set */
    public String getAuth() {
        return auth;
    }

    /** @param auth the auth value */
    public void setAuth(String auth) {
        this.auth = auth;
    }
}

响应类型消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public
class
ReplyMsg
extends
BaseMsg{
public
ReplyMsg(){
super
();
setType(MsgType.REPLY);
}
private
ReplyBodybody;
public
ReplyBodygetBody(){
return
body;
}
public
void
setBody(ReplyBodybody){
this
.body=body;
}
}
/**
 * Body object of a response message; concrete bodies extend this class.
 */
public class ReplyBody implements Serializable {

    private static final long serialVersionUID = 1L;

}
public
class
ReplyClientBody
extends
ReplyBody{
private
StringclientInfo;
public
ReplyClientBody(StringclientInfo){
this
.clientInfo=clientInfo;
}
public
StringgetClientInfo(){
return
clientInfo;
}
public
void
setClientInfo(StringclientInfo){
this
.clientInfo=clientInfo;
}
}
public
class
ReplyServerBody
extends
ReplyBody{
private
StringserverInfo;
public
ReplyServerBody(StringserverInfo){
this
.serverInfo=serverInfo;
}
public
StringgetServerInfo(){
return
serverInfo;
}
public
void
setServerInfo(StringserverInfo){
this
.serverInfo=serverInfo;
}
}

2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public
class
NettyChannelMap{
private
static
Map<String,SocketChannel>map=
new
ConcurrentHashMap<String,SocketChannel>();
public
static
void
add(StringclientId,SocketChannelsocketChannel){
map.put(clientId,socketChannel);
}
public
static
Channelget(StringclientId){
return
map.get(clientId);
}
public
static
void
remove(SocketChannelsocketChannel){
for
(Map.Entryentry:map.entrySet()){
if
(entry.getValue()==socketChannel){
map.remove(entry.getKey());
}
}
}
}

Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public
class
NettyServerHandler
extends
SimpleChannelInboundHandler<BaseMsg>{
@Override
public
void
channelInactive(ChannelHandlerContextctx)
throws
Exception{
//channel失效,从Map中移除
NettyChannelMap.remove((SocketChannel)ctx.channel());
}
@Override
protected
void
messageReceived(ChannelHandlerContextchannelHandlerContext,BaseMsgbaseMsg)
throws
Exception{
if
(MsgType.LOGIN.equals(baseMsg.getType())){
LoginMsgloginMsg=(LoginMsg)baseMsg;
if
(
"robin"
.equals(loginMsg.getUserName())&&
"yao"
.equals(loginMsg.getPassword())){
//登录成功,把channel存到服务端的map中
NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
System.out.println(
"client"
+loginMsg.getClientId()+
"登录成功"
);
}
}
else
{
if
(NettyChannelMap.get(baseMsg.getClientId())==
null
){
//说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
LoginMsgloginMsg=
new
LoginMsg();
channelHandlerContext.channel().writeAndFlush(loginMsg);
}
}
switch
(baseMsg.getType()){
case
PING:{
PingMsgpingMsg=(PingMsg)baseMsg;
PingMsgreplyPing=
new
PingMsg();
NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
}
break
;
case
ASK:{
//收到客户端的请求
AskMsgaskMsg=(AskMsg)baseMsg;
if
(
"authToken"
.equals(askMsg.getParams().getAuth())){
ReplyServerBodyreplyBody=
new
ReplyServerBody(
"serverinfo$$$$!!!"
);
ReplyMsgreplyMsg=
new
ReplyMsg();
replyMsg.setBody(replyBody);
NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
}
}
break
;
case
REPLY:{
//收到客户端回复
ReplyMsgreplyMsg=(ReplyMsg)baseMsg;
ReplyClientBodyclientBody=(ReplyClientBody)replyMsg.getBody();
System.out.println(
"receiveclientmsg:"
+clientBody.getClientInfo());
}
break
;
default
:
break
;
}
ReferenceCountUtil.release(baseMsg);
}
}

ServerBootstrap:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public
class
NettyServerBootstrap{
private
int
port;
private
SocketChannelsocketChannel;
public
NettyServerBootstrap(
int
port)
throws
InterruptedException{
this
.port=port;
bind();
}
private
void
bind()
throws
InterruptedException{
EventLoopGroupboss=
new
NioEventLoopGroup();
EventLoopGroupworker=
new
NioEventLoopGroup();
ServerBootstrapbootstrap=
new
ServerBootstrap();
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.
class
);
bootstrap.option(ChannelOption.SO_BACKLOG,
128
);
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
bootstrap.option(ChannelOption.TCP_NODELAY,
true
);
//保持长连接状态
bootstrap.childOption(ChannelOption.SO_KEEPALIVE,
true
);
bootstrap.childHandler(
new
ChannelInitializer<SocketChannel>(){
@Override
protected
void
initChannel(SocketChannelsocketChannel)
throws
Exception{
ChannelPipelinep=socketChannel.pipeline();
p.addLast(
new
ObjectEncoder());
p.addLast(
new
ObjectDecoder(ClassResolvers.cacheDisabled(
null
)));
p.addLast(
new
NettyServerHandler());
}
});
ChannelFuturef=bootstrap.bind(port).sync();
if
(f.isSuccess()){
System.out.println(
"serverstart---------------"
);
}
}
public
static
void
main(String[]args)
throws
InterruptedException{
NettyServerBootstrapbootstrap=
new
NettyServerBootstrap(
9999
);
while
(
true
){
SocketChannelchannel=(SocketChannel)NettyChannelMap.get(
"001"
);
if
(channel!=
null
){
AskMsgaskMsg=
new
AskMsg();
channel.writeAndFlush(askMsg);
}
TimeUnit.SECONDS.sleep(
5
);
}
}
}

3 Client端:包含发起登录,发送心跳,及对应消息处理

handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public
class
NettyClientHandler
extends
SimpleChannelInboundHandler<BaseMsg>{
//利用写空闲发送心跳检测消息
@Override
public
void
userEventTriggered(ChannelHandlerContextctx,Objectevt)
throws
Exception{
if
(evt
instanceof
IdleStateEvent){
IdleStateEvente=(IdleStateEvent)evt;
switch
(e.state()){
case
WRITER_IDLE:
PingMsgpingMsg=
new
PingMsg();
ctx.writeAndFlush(pingMsg);
System.out.println(
"sendpingtoserver----------"
);
break
;
default
:
break
;
}
}
}
@Override
protected
void
messageReceived(ChannelHandlerContextchannelHandlerContext,BaseMsgbaseMsg)
throws
Exception{
MsgTypemsgType=baseMsg.getType();
switch
(msgType){
case
LOGIN:{
//向服务器发起登录
LoginMsgloginMsg=
new
LoginMsg();
loginMsg.setPassword(
"yao"
);
loginMsg.setUserName(
"robin"
);
channelHandlerContext.writeAndFlush(loginMsg);
}
break
;
case
PING:{
System.out.println(
"receivepingfromserver----------"
);
}
break
;
case
ASK:{
ReplyClientBodyreplyClientBody=
new
ReplyClientBody(
"clientinfo****!!!"
);
ReplyMsgreplyMsg=
new
ReplyMsg();
replyMsg.setBody(replyClientBody);
channelHandlerContext.writeAndFlush(replyMsg);
}
break
;
case
REPLY:{
ReplyMsgreplyMsg=(ReplyMsg)baseMsg;
ReplyServerBodyreplyServerBody=(ReplyServerBody)replyMsg.getBody();
System.out.println(
"receiveclientmsg:"
+replyServerBody.getServerInfo());
}
default
:
break
;
}
ReferenceCountUtil.release(msgType);
}
}

bootstrap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public
class
NettyClientBootstrap{
private
int
port;
private
Stringhost;
private
SocketChannelsocketChannel;
private
static
final
EventExecutorGroupgroup=
new
DefaultEventExecutorGroup(
20
);
public
NettyClientBootstrap(
int
port,Stringhost)
throws
InterruptedException{
this
.port=port;
this
.host=host;
start();
}
private
void
start()
throws
InterruptedException{
EventLoopGroupeventLoopGroup=
new
NioEventLoopGroup();
Bootstrapbootstrap=
new
Bootstrap();
bootstrap.channel(NioSocketChannel.
class
);
bootstrap.option(ChannelOption.SO_KEEPALIVE,
true
);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(host,port);
bootstrap.handler(
new
ChannelInitializer<SocketChannel>(){
@Override
protected
void
initChannel(SocketChannelsocketChannel)
throws
Exception{
socketChannel.pipeline().addLast(
new
IdleStateHandler(
20
,
10
,
0
));
socketChannel.pipeline().addLast(
new
ObjectEncoder());
socketChannel.pipeline().addLast(
new
ObjectDecoder(ClassResolvers.cacheDisabled(
null
)));
socketChannel.pipeline().addLast(
new
NettyClientHandler());
}
});
ChannelFuturefuture=bootstrap.connect(host,port).sync();
if
(future.isSuccess()){
socketChannel=(SocketChannel)future.channel();
System.out.println(
"connectserver成功---------"
);
}
}
public
static
void
main(String[]args)
throws
InterruptedException{
Constants.setClientId(
"001"
);
NettyClientBootstrapbootstrap=
new
NettyClientBootstrap(
9999
,
"localhost"
);
LoginMsgloginMsg=
new
LoginMsg();
loginMsg.setPassword(
"yao"
);
loginMsg.setUserName(
"robin"
);
bootstrap.socketChannel.writeAndFlush(loginMsg);
while
(
true
){
TimeUnit.SECONDS.sleep(
3
);
AskMsgaskMsg=
new
AskMsg();
AskParamsaskParams=
new
AskParams();
askParams.setAuth(
"authToken"
);
askMsg.setParams(askParams);
bootstrap.socketChannel.writeAndFlush(askMsg);
}
}
}

具体的例子和相应pom.xml 见

转发请注明来源:

转载地址:http://heynn.baihongyu.com/

你可能感兴趣的文章
Diamond types are not supported at language level '6'报错
查看>>
使用栈实现表达式的计算前缀、中缀、后缀表达式(逆波兰表达式)
查看>>
数据结构Java版之递归
查看>>
Scala 语言概述
查看>>
Scala的变量声明和数据类型详解
查看>>
数据结构——使用递归回溯实现迷宫问题(Java代码实现)
查看>>
数据结构——使用递归回溯实现八皇后问题(Java代码实现)
查看>>
Scala学习笔记(一)基本语法
查看>>
算法的时间复杂度与空间复杂度详解 (Java)
查看>>
数据结构之排序算法——冒泡排序(Java实现)
查看>>
数据结构之排序算法——堆排序(Java实现)
查看>>
Spring MVC的数据绑定(复杂数据绑定——数组与集合)
查看>>
数据结构——赫夫曼树(Java代码实现)
查看>>
赫夫曼编码(压缩与解压 Java代码实现)
查看>>
数据结构——二叉排序树(Java代码实现)
查看>>
数据结构——平衡二叉树(Java代码实现)
查看>>
Spark SQL 结构化数据文件处理 详解
查看>>
Spark 的DataFrame常用操作之DSL的风格语法
查看>>
Spark RDD弹性分布数据集详解
查看>>
数据结构——多叉树、B树
查看>>