本文共 10732 字,大约阅读时间需要 35 分钟。
通过netty实现服务端与客户端的长连接通讯,及心跳检测。
基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleStateEvent事件实现:客户端在写空闲时定时向服务端发送Ping消息,检测SocketChannel是否中断。
环境JDK1.8 和netty5
以下是具体的代码实现和介绍:
1公共的Share部分(主要包含消息协议类型的定义)
设计消息类型:
/**
 * Kinds of message understood by both the client and the server.
 *
 * <p>Every {@code BaseMsg} carries one of these values so the receiving handler can decide
 * how to process it.
 */
public enum MsgType {
    /** Heartbeat message. */
    PING,
    /** Request message. */
    ASK,
    /** Response message. */
    REPLY,
    /** Login message. */
    LOGIN
}
Message基类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | //必须实现序列,serialVersionUID一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!! public abstract class BaseMsg implements Serializable{ private static final long serialVersionUID=1L; private MsgTypetype; //必须唯一,否者会出现channel调用混乱 private StringclientId; //初始化客户端id public BaseMsg(){ this .clientId=Constants.getClientId(); } public StringgetClientId(){ return clientId; } public void setClientId(StringclientId){ this .clientId=clientId; } public MsgTypegetType(){ return type; } public void setType(MsgTypetype){ this .type=type; } } |
常量设置:
/**
 * Process-wide holder of the client id.
 *
 * <p>The value stored here is copied into every {@code BaseMsg} when the message is constructed,
 * so it has to be set before the first message is created.
 */
public class Constants {

    /** Client id shared by the whole process; {@code null} until {@link #setClientId} is called. */
    private static String clientId;

    /** @param clientId the id to stamp on messages created from now on */
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }

    /** @return the id set by the last call to {@link #setClientId}, or {@code null} if never set */
    public static String getClientId() {
        return Constants.clientId;
    }
}
登录类型消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public class LoginMsg extends BaseMsg{ private StringuserName; private Stringpassword; public LoginMsg(){ super (); setType(MsgType.LOGIN); } public StringgetUserName(){ return userName; } public void setUserName(StringuserName){ this .userName=userName; } public StringgetPassword(){ return password; } public void setPassword(Stringpassword){ this .password=password; } } |
心跳检测Ping类型消息:
1 2 3 4 5 6 | public class PingMsg extends BaseMsg{ public PingMsg(){ super (); setType(MsgType.PING); } } |
请求类型消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | public class AskMsg extends BaseMsg{ public AskMsg(){ super (); setType(MsgType.ASK); } private AskParamsparams; public AskParamsgetParams(){ return params; } public void setParams(AskParamsparams){ this .params=params; } } //请求类型参数 //必须实现序列化接口 public class AskParams implements Serializable{ private static final long serialVersionUID=1L; private Stringauth; public StringgetAuth(){ return auth; } public void setAuth(Stringauth){ this .auth=auth; } } |
响应类型消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | public class ReplyMsg extends BaseMsg{ public ReplyMsg(){ super (); setType(MsgType.REPLY); } private ReplyBodybody; public ReplyBodygetBody(){ return body; } public void setBody(ReplyBodybody){ this .body=body; } } //相应类型body对像 public class ReplyBody implements Serializable{ private static final long serialVersionUID=1L; } public class ReplyClientBody extends ReplyBody{ private StringclientInfo; public ReplyClientBody(StringclientInfo){ this .clientInfo=clientInfo; } public StringgetClientInfo(){ return clientInfo; } public void setClientInfo(StringclientInfo){ this .clientInfo=clientInfo; } } public class ReplyServerBody extends ReplyBody{ private StringserverInfo; public ReplyServerBody(StringserverInfo){ this .serverInfo=serverInfo; } public StringgetServerInfo(){ return serverInfo; } public void setServerInfo(StringserverInfo){ this .serverInfo=serverInfo; } } |
2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.
Map:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public class NettyChannelMap{ private static Map<String,SocketChannel>map= new ConcurrentHashMap<String,SocketChannel>(); public static void add(StringclientId,SocketChannelsocketChannel){ map.put(clientId,socketChannel); } public static Channelget(StringclientId){ return map.get(clientId); } public static void remove(SocketChannelsocketChannel){ for (Map.Entryentry:map.entrySet()){ if (entry.getValue()==socketChannel){ map.remove(entry.getKey()); } } } } |
Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg>{ @Override public void channelInactive(ChannelHandlerContextctx) throws Exception{ //channel失效,从Map中移除 NettyChannelMap.remove((SocketChannel)ctx.channel()); } @Override protected void messageReceived(ChannelHandlerContextchannelHandlerContext,BaseMsgbaseMsg) throws Exception{ if (MsgType.LOGIN.equals(baseMsg.getType())){ LoginMsgloginMsg=(LoginMsg)baseMsg; if ( "robin" .equals(loginMsg.getUserName())&& "yao" .equals(loginMsg.getPassword())){ //登录成功,把channel存到服务端的map中 NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel()); System.out.println( "client" +loginMsg.getClientId()+ "登录成功" ); } } else { if (NettyChannelMap.get(baseMsg.getClientId())== null ){ //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录 LoginMsgloginMsg= new LoginMsg(); channelHandlerContext.channel().writeAndFlush(loginMsg); } } switch (baseMsg.getType()){ case PING:{ PingMsgpingMsg=(PingMsg)baseMsg; PingMsgreplyPing= new PingMsg(); NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing); } break ; case ASK:{ //收到客户端的请求 AskMsgaskMsg=(AskMsg)baseMsg; if ( "authToken" .equals(askMsg.getParams().getAuth())){ ReplyServerBodyreplyBody= new ReplyServerBody( "serverinfo$$$$!!!" ); ReplyMsgreplyMsg= new ReplyMsg(); replyMsg.setBody(replyBody); NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg); } } break ; case REPLY:{ //收到客户端回复 ReplyMsgreplyMsg=(ReplyMsg)baseMsg; ReplyClientBodyclientBody=(ReplyClientBody)replyMsg.getBody(); System.out.println( "receiveclientmsg:" +clientBody.getClientInfo()); } break ; default : break ; } ReferenceCountUtil.release(baseMsg); } } |
ServerBootstrap:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | public class NettyServerBootstrap{ private int port; private SocketChannelsocketChannel; public NettyServerBootstrap( int port) throws InterruptedException{ this .port=port; bind(); } private void bind() throws InterruptedException{ EventLoopGroupboss= new NioEventLoopGroup(); EventLoopGroupworker= new NioEventLoopGroup(); ServerBootstrapbootstrap= new ServerBootstrap(); bootstrap.group(boss,worker); bootstrap.channel(NioServerSocketChannel. class ); bootstrap.option(ChannelOption.SO_BACKLOG, 128 ); //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去 bootstrap.option(ChannelOption.TCP_NODELAY, true ); //保持长连接状态 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true ); bootstrap.childHandler( new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannelsocketChannel) throws Exception{ ChannelPipelinep=socketChannel.pipeline(); p.addLast( new ObjectEncoder()); p.addLast( new ObjectDecoder(ClassResolvers.cacheDisabled( null ))); p.addLast( new NettyServerHandler()); } }); ChannelFuturef=bootstrap.bind(port).sync(); if (f.isSuccess()){ System.out.println( "serverstart---------------" ); } } public static void main(String[]args) throws InterruptedException{ NettyServerBootstrapbootstrap= new NettyServerBootstrap( 9999 ); while ( true ){ SocketChannelchannel=(SocketChannel)NettyChannelMap.get( "001" ); if (channel!= null ){ AskMsgaskMsg= new AskMsg(); channel.writeAndFlush(askMsg); } TimeUnit.SECONDS.sleep( 5 ); } } } |
3 Client端:包含发起登录,发送心跳,及对应消息处理
handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg>{ //利用写空闲发送心跳检测消息 @Override public void userEventTriggered(ChannelHandlerContextctx,Objectevt) throws Exception{ if (evt instanceof IdleStateEvent){ IdleStateEvente=(IdleStateEvent)evt; switch (e.state()){ case WRITER_IDLE: PingMsgpingMsg= new PingMsg(); ctx.writeAndFlush(pingMsg); System.out.println( "sendpingtoserver----------" ); break ; default : break ; } } } @Override protected void messageReceived(ChannelHandlerContextchannelHandlerContext,BaseMsgbaseMsg) throws Exception{ MsgTypemsgType=baseMsg.getType(); switch (msgType){ case LOGIN:{ //向服务器发起登录 LoginMsgloginMsg= new LoginMsg(); loginMsg.setPassword( "yao" ); loginMsg.setUserName( "robin" ); channelHandlerContext.writeAndFlush(loginMsg); } break ; case PING:{ System.out.println( "receivepingfromserver----------" ); } break ; case ASK:{ ReplyClientBodyreplyClientBody= new ReplyClientBody( "clientinfo****!!!" ); ReplyMsgreplyMsg= new ReplyMsg(); replyMsg.setBody(replyClientBody); channelHandlerContext.writeAndFlush(replyMsg); } break ; case REPLY:{ ReplyMsgreplyMsg=(ReplyMsg)baseMsg; ReplyServerBodyreplyServerBody=(ReplyServerBody)replyMsg.getBody(); System.out.println( "receiveclientmsg:" +replyServerBody.getServerInfo()); } default : break ; } ReferenceCountUtil.release(msgType); } } |
bootstrap
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | public class NettyClientBootstrap{ private int port; private Stringhost; private SocketChannelsocketChannel; private static final EventExecutorGroupgroup= new DefaultEventExecutorGroup( 20 ); public NettyClientBootstrap( int port,Stringhost) throws InterruptedException{ this .port=port; this .host=host; start(); } private void start() throws InterruptedException{ EventLoopGroupeventLoopGroup= new NioEventLoopGroup(); Bootstrapbootstrap= new Bootstrap(); bootstrap.channel(NioSocketChannel. class ); bootstrap.option(ChannelOption.SO_KEEPALIVE, true ); bootstrap.group(eventLoopGroup); bootstrap.remoteAddress(host,port); bootstrap.handler( new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannelsocketChannel) throws Exception{ socketChannel.pipeline().addLast( new IdleStateHandler( 20 , 10 , 0 )); socketChannel.pipeline().addLast( new ObjectEncoder()); socketChannel.pipeline().addLast( new ObjectDecoder(ClassResolvers.cacheDisabled( null ))); socketChannel.pipeline().addLast( new NettyClientHandler()); } }); ChannelFuturefuture=bootstrap.connect(host,port).sync(); if (future.isSuccess()){ socketChannel=(SocketChannel)future.channel(); System.out.println( "connectserver成功---------" ); } } public static void main(String[]args) throws InterruptedException{ Constants.setClientId( "001" ); NettyClientBootstrapbootstrap= new NettyClientBootstrap( 9999 , "localhost" ); LoginMsgloginMsg= new LoginMsg(); loginMsg.setPassword( "yao" ); loginMsg.setUserName( "robin" ); bootstrap.socketChannel.writeAndFlush(loginMsg); while ( true ){ TimeUnit.SECONDS.sleep( 3 ); AskMsgaskMsg= new AskMsg(); AskParamsaskParams= new AskParams(); askParams.setAuth( "authToken" ); askMsg.setParams(askParams); bootstrap.socketChannel.writeAndFlush(askMsg); } } } |
具体的例子和相应pom.xml 见
转发请注明来源:
转载地址:http://heynn.baihongyu.com/