群聊逻辑
1)NETTY客户端的启动流程:创建一个引导类,指定线程模型、IO模型、连接读写处理逻辑,连接上指定的主机和端口,客户端就启动起来了.
2)NETTY服务端的启动流程:创建一个引导类,然后给它指定线程模型、IO模型、连接读写处理逻辑,绑定端口之后,服务端就启动起来了。
3)服务端自动绑定绑定递增端口(监听器)
4)客户端失败重连(重连次数、重连间隔)(监听器、定时任务)
5)双向通信(建立互信发消息)
6)数据传输载体ByteBuf(writeXX()改变指针与readXX()不改变指针。当前连接相关的ByteBuf分配器。引用计数与底层内存回收。)
7)协议编解码与协议处理(自定义协议、序列化方式、版本号、魔数、指令、数据长度、数据)
8)指令处理(业务)
9)Pipeline责任链与handler(inbound()、outbound()-channelAdapter。事件传播。Inbound和outbound分别是处理数据读与数据写的逻辑,可与 tcp 协议栈联系起来。inBoundHandler的执行顺序与我们实际的添加顺序相同,而outBoundHandler则相反。添加多handler,解决多if问题。)
10)拒绝非本协议连接(结合魔数、长度域。不符合可关闭连接。异常同理。)
11)粘包拆包。(4个拆包器。选定长。A->TCP缓冲区->二进制->操作系统->二进制->TCP缓冲区->B。期间传输的二进制字节流与接收顺序可能是不对等的。长度域偏移量、长度域的长度。)(拆包器的作用就是根据我们的自定义协议,把数据拼装成一个个符合我们自定义数据包大小的 ByteBuf,然后送到我们的自定义协议解码器去解码。如果不支持自定义协议,可尽早关闭,节省资源。decode方法是在数据满足完整Frame格式才会被调用。)
12)ChannelHandler生命周期(边界、回调)
13)热插拔(移除校验逻辑、身份校验逻辑)
14)控制台指令器(输入调用)
15)channel绑定与session
16)ChannelGroup(绑定多个Channel,批量处理。)
17)共享handler(单例)
18)合并编解码器(无状态+单例。客户端并不合并。)
19)缩短事件传输路径
19.1)压缩handler-合并平行handler(多个压缩成一个-无状态+单例)
19.2)更改事件传播源-事件传播路径(ctx.writeAndFlush()不需要经过其他handler处理与ctx.channel().writeAndFlush()需要经过后面的handler处理。注意客户端和服务端的channel对象。)
20)减少阻塞主线程的操作(线程池:threadPool.submit(new Runnable(){}))
21)准确统计处理时长(
(线程池:threadPool.submit(new Runnable(){ ctx.channel().writeAndFlush().addListener(
future -> { if(future.done()){}) })))
22)心跳与空闲检测(假死:资源浪费、用户体验)
功能列表
服务端启动流程
https://juejin.cn/book/6844733738119593991/section/6844733738270621709
客户端启动流程
https://juejin.cn/book/6844733738119593991/section/6844733738274783240
客户端与服务端双向通信
https://juejin.cn/book/6844733738119593991/section/6844733738274783246
数据传输载体 ByteBuf 介绍
https://juejin.cn/book/6844733738119593991/section/6844733738274799624
客户端与服务端通信协议编解码
https://juejin.cn/book/6844733738119593991/section/6844733738278977550
Netty 实现客户端登录
https://juejin.cn/book/6844733738119593991/section/6844733738279141383
实现客户端与服务端收发消息
https://juejin.cn/book/6844733738119593991/section/6844733738278977549
pipeline 与 channelHandler
https://juejin.cn/book/6844733738119593991/section/6844733738280796167
构建客户端与服务端 pipeline
https://juejin.cn/book/6844733738119593991/section/6844733738283171848
拆包粘包理论与解决方案
https://juejin.cn/book/6844733738119593991/section/6844733738283171853
channelHandler 的生命周期
https://juejin.cn/book/6844733738119593991/section/6844733738283188238
channelHandler 的热插拔实现客户端身份校验
https://juejin.cn/book/6844733738119593991/section/6844733738287366157
客户端互聊原理与实现
https://juejin.cn/book/6844733738119593991/section/6844733738287366152
群聊的发起与通知
https://juejin.cn/book/6844733738119593991/section/6844733738287366151
群聊的成员管理(加入与退出,获取成员列表)
https://juejin.cn/book/6844733738119593991/section/6844733738291560461
群聊消息的收发及 Netty 性能优化
https://juejin.cn/book/6844733738119593991/section/6844733738291576840
心跳与空闲检测
https://juejin.cn/book/6844733738119593991/section/6844733738291576846
启动demo
Netty-Server
代码示例
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addXX(handler);
}
});
serverBootstrap.bind(8000);
}
逻辑结构
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
bossGroup表示监听端口,accept新连接的线程组。bossGroup处理完连接会扔给workerGroup。
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
workerGroup表示处理每一条连接的数据读写的线程组。
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap引导类,引导我们进行服务端的启动工作。
serverBootstrap.group(bossGroup,workerGroup)
给引导类配置两个线程组,这个引导类的线程模型也就定型了。bossGroup和workerGroup可以看做是传统IO模型的两大线程组。
serverBootstrap.channel(XX);
指定服务端的IO模型。
serverBootstrap.channel(NioServerSocketChannel.class);
这里指定服务端的IO模型为NIO,也可以执行其他的IO模型,比如OIO、BIO,但不会这么设置,因为NETTY的优势就在于NIO。NioServerSocketChannel是NETTY对NIO类型的连接的抽象,NioServerSocketChannel对应BIO编程模型中的ServerSocket。
serverBootstrap.childHandler()
用于指定处理新连接数据的读写处理逻辑。而handler()用于指定在服务端启动过程中的一些逻辑,通常用不到此方法。
serverBootstrap.childHandler(new ChannelInitializer
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
}
});
ChannelInitializer定义后续每条连接的数据读写以及业务处理逻辑。
ChannelInitializer中的泛型参数NioSocketChannel,是NETTY对NIO类型的连接的抽象,对应BIO编程模型中的Socket,NioSocketChannel在这里也可以换成SocketChannel。
启动流程
1)NETTY客户端的启动流程:创建一个引导类,指定线程模型、IO模型、连接读写处理逻辑,连接上指定的主机和端口,客户端就启动起来了.
2)NETTY服务端的启动流程:创建一个引导类,然后给它指定线程模型、IO模型、连接读写处理逻辑,绑定端口之后,服务端就启动起来了。
Netty-Client
代码示例
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addXX(handler);
}
});
Channel channel = bootstrap.connect("127.0.0.1",1024).channel();
}
}
逻辑结构
NioEventLoopGroup workGroup = new NioEventLoopGroup();
指定线程模型,驱动着连接的数据读写。对于单纯的客户端代码来说,可以理解为只有workGroup ,因为只需要连接,而没有监听端口的逻辑。
Bootstrap bootstrap = new Bootstrap();
引导类:负责启动客户端以及连接服务端
bootstrap.group(workGroup)
指定线程模型
bootstrap.channel(NioSocketChannel.class)
指定IO模型为NIO
bootstrap.handler(new ChannelInitializer(Channel) {//这里尖括号识别成特殊字符了就给临时去掉了
@Override
protected void initChannel(Channel ch) throws Exception {
}
});
IO处理器,定义连接的业务处理逻辑
Channel channel = bootstrap.connect("127.0.0.1",1024).channel();
建立连接,connect()方法是异步的,返回Future。
启动流程
1)NETTY客户端的启动流程:创建一个引导类,指定线程模型、IO模型、连接读写处理逻辑,连接上指定的主机和端口,客户端就启动起来了.
2)NETTY服务端的启动流程:创建一个引导类,然后给它指定线程模型、IO模型、连接读写处理逻辑,绑定端口之后,服务端就启动起来了。
群聊demo
源码地址:https://github.com/lightningMan/flash-netty
切换分支查看每一章节步骤
代码结构
核心代码
Server
NettyServer
package com.netty.learning.demo20.example1.server;
import com.netty.learning.demo20.example1.codec.PacketCodecHandler;
import com.netty.learning.demo20.example1.codec.Spliter;
import com.netty.learning.demo20.example1.handler.IMIdleStateHandler;
import com.netty.learning.demo20.example1.server.handler.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Date;
public class NettyServer {
private static final int PORT = 8000;
public static void main(String[] args) {
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boosGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(ChannelOption.TCP_NODELAY,true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new IMIdleStateHandler());
ch.pipeline().addLast(new LifeCycleTestHandler());
ch.pipeline().addLast(new Spliter());
ch.pipeline().addLast(PacketCodecHandler.INSTANCE);
ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
ch.pipeline().addLast(HeartBeatRequestHandler.INSTANCE);
ch.pipeline().addLast(AuthHandler.INSTANCE);
ch.pipeline().addLast(IMHandler.INSTANCE);
}
});
bind(serverBootstrap,PORT);
}
private static void bind(final ServerBootstrap serverBootstrap,final int port){
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()){
System.out.println(new Date() + " : port is [ " + port + " ] bind success~!!!");
}else{
System.err.println(new Date() + " : port is [ " + port + " ] bind fail ~~~~!");
}
});
}
}
IMHandler
package com.netty.learning.demo20.example1.server.handler;
import com.netty.learning.demo20.example1.protocol.Packet;
import com.netty.learning.demo20.example1.protocol.command.Command;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.HashMap;
import java.util.Map;
@ChannelHandler.Sharable
public class IMHandler extends SimpleChannelInboundHandler<Packet> {
public static final IMHandler INSTANCE = new IMHandler();
private Map<Byte,SimpleChannelInboundHandler<? extends Packet>> handlerMap;
public IMHandler() {
this.handlerMap = new HashMap<>();
this.handlerMap.put(Command.MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
this.handlerMap.put(Command.CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);
this.handlerMap.put(Command.JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE);
this.handlerMap.put(Command.QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE);
this.handlerMap.put(Command.LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE);
this.handlerMap.put(Command.GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE);
this.handlerMap.put(Command.LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);
}
/**
* 每次回调到 IMHandler 的 channelRead0() 方法的时候,我们通过指令找到具体的 handler,
* 然后调用指令 handler 的 channelRead,他内部会做指令类型转换,最终调用到每个指令 handler 的 channelRead0() 方法。
* @param ctx
* @param packet
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {
this.handlerMap.get(packet.getCommand()).channelRead(ctx,packet);
}
}
Client
NettyClient
package com.netty.learning.demo20.example1.client;
import com.netty.learning.demo20.example1.client.console.ConsoleCommandManager;
import com.netty.learning.demo20.example1.client.console.LoginConsoleCommand;
import com.netty.learning.demo20.example1.client.handler.*;
import com.netty.learning.demo20.example1.codec.PacketDecoder;
import com.netty.learning.demo20.example1.codec.PacketEncoder;
import com.netty.learning.demo20.example1.codec.Spliter;
import com.netty.learning.demo20.example1.handler.IMIdleStateHandler;
import com.netty.learning.demo20.example1.util.SessionUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
public class NettyClient {
private static final int MAX_RETRY = 5;
private static final String HOST = "127.0.0.1";
private static final int PORT = 8000;
public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
.option(ChannelOption.SO_KEEPALIVE,true)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IMIdleStateHandler());
ch.pipeline().addLast(new Spliter());
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginResponseHandler());
ch.pipeline().addLast(new MessageResponseHandler());
ch.pipeline().addLast(new CreateGroupResponseHandler());
ch.pipeline().addLast(new JoinGroupResponseHandler());
ch.pipeline().addLast(new QuitGroupResponseHandler());
ch.pipeline().addLast(new ListGroupMembersResponseHandler());
ch.pipeline().addLast(new GroupMessageResponseHandler());
ch.pipeline().addLast(new LogoutResponseHandler());
ch.pipeline().addLast(new PacketEncoder());
ch.pipeline().addLast(new HeartBeanTimeHandler());
}
});
connect(bootstrap,HOST,PORT,MAX_RETRY);
}
private static void connect(Bootstrap bootstrap,String host,int port,int retry){
bootstrap.connect(host,port).addListener(future -> {
if(future.isSuccess()){
System.out.println(new Date() + " :2-1-connect is success~~!!");
Channel channel = ((ChannelFuture)future).channel();
startConsoleThread(channel);
} else if(retry == 0){
System.out.println(new Date() + " :1-1-retry number is end!!@@");
} else{
//第几次重连
int order = (MAX_RETRY - retry) + 1;
//本次重连的间隔
int delay = 1 << order;
System.err.println(new Date() + " :1-2-connect is fail. the retry number is : " + order + " !!!");
bootstrap.config().group().schedule(() -> connect(bootstrap,host,port,retry - 1),delay,TimeUnit.SECONDS);
}
});
}
private static void startConsoleThread(Channel channel){
ConsoleCommandManager consoleCommandManager = new ConsoleCommandManager();
LoginConsoleCommand loginConsoleCommand = new LoginConsoleCommand();
Scanner scanner = new Scanner(System.in);
new Thread(() -> {
while(!Thread.interrupted()){
if(!SessionUtil.hasLogin(channel)){
loginConsoleCommand.exec(scanner,channel);
}else{
consoleCommandManager.exec(scanner,channel);
}
}
}).start();
}
}
other
ConsoleCommandManager
import com.netty.learning.demo20.example1.util.SessionUtil;
import io.netty.channel.Channel;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
public class ConsoleCommandManager implements ConsoleCommand {
private Map<String, ConsoleCommand> consoleCommandMap;
public ConsoleCommandManager() {
this.consoleCommandMap = new HashMap<>();
//测试先输入sendToUser,再输入登录的userId(而不是userName)和message
this.consoleCommandMap.put("sendToUser",new SendToUserConsoleCommand());
//测试先输入logout
this.consoleCommandMap.put("logout",new LogoutConsoleCommand());
//测试先输入createGroup
this.consoleCommandMap.put("createGroup",new CreateGroupConsoleCommand());
//测试先输入joinGroup
this.consoleCommandMap.put("joinGroup",new JoinGroupConsoleCommand());
//测试先输入quitGroup
this.consoleCommandMap.put("quitGroup",new QuitGroupConsoleCommand());
//测试先输入listGroupMembers
this.consoleCommandMap.put("listGroupMembers",new ListGroupMembersConsoleCommand());
//测试先输入sendToGroup
this.consoleCommandMap.put("sendToGroup",new SendToGroupConsoleCommand());
}
@Override
public void exec(Scanner scanner, Channel channel) {
//获取第一个指令
String command = scanner.next();
if(!SessionUtil.hasLogin(channel)){
return;
}
ConsoleCommand consoleCommand = consoleCommandMap.get(command);
if(consoleCommand != null){
consoleCommand.exec(scanner,channel);
}else{
System.err.println("第三部分~~~~~~~~~Channel组:client->" + new Date() + " 命令不存在!");
}
}
}
Spliter
package com.netty.learning.demo20.example1.codec;
import com.netty.learning.demo20.example1.protocol.PacketCodeC;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* @Author:
* @Description: 通过魔数和基于长度域拆包器一同测试拒绝非本协议连接.netty提供的拆包器已经满足需求,不要重复造轮子.
* TODO 拆包器的作用就是根据我们的自定义协议,把数据拼装成一个个符合我们自定义数据包大小的 ByteBuf,然后送到我们的自定义协议解码器去解码。
* @Date: Created in 21-1-5 上午12:02
* @Modified:
*/
public class Spliter extends LengthFieldBasedFrameDecoder {
private static final int LENGTH_FIELD_OFFSET = 7;
private static final int LENGTH_FIELD_LENGTH = 4;
public Spliter() {
/**
* LENGTH_FIELD_OFFSET 长度域偏移量
* LENGTH_FIELD_LENGTH 长度域的长度
* TODO 这4个字节应该不是表示数据的长度是4个字节,而是4个字节会表示一个数,
* 比如表示2000,那么代表接下来的2000字节都是这个包的
* TODO 也就是数据包的大小。LENGTH_FIELD_OFFSET偏移量是长度。 这样设计的初衷是什么?
*/
super(Integer.MAX_VALUE,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH);
}
/**
* TODO 基于 Netty 自带的拆包器,我们可以在拆包之前判断当前连上来的客户端是否是支持自定义协议的客户端,如果不支持,可尽早关闭,节省资源。
* TODO decode方法是在数据满足完整Frame格式才会被调用
* @param ctx
* @param in
* @return
* @throws Exception
*/
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
System.out.println("======PacketCodeC.MAGIC_NUMBER======" + PacketCodeC.MAGIC_NUMBER);
int readerIndex = in.readerIndex();
int readerIndexGet = in.getInt(readerIndex);
System.out.println("======in.readerIndex()=======" + readerIndex);
System.out.println("======in.getInt(in.readerIndex())=======" + readerIndexGet);
if(readerIndexGet != PacketCodeC.MAGIC_NUMBER){
System.out.println("=======ctx.channel().close()===============");
//TODO 有个问题,这里关闭了,login的逻辑还正常.取消NettyClient中的boolean标识,Message依然能正常接收和发送消息.没截取这个?
//这里为啥会输出三次?看README.MD
ctx.channel().close();
}else{
System.out.println("=======ctx.channel() no close()===============");
}
return super.decode(ctx,in);
}
}
PacketCodecHandler
package com.netty.learning.demo20.example1.codec;
import com.netty.learning.demo20.example1.protocol.Packet;
import com.netty.learning.demo20.example1.protocol.PacketCodeC;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.util.List;
@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> {
public static final PacketCodecHandler INSTANCE = new PacketCodecHandler();
private PacketCodecHandler() {
}
@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) throws Exception {
ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
PacketCodeC.INSTANCE.encode(byteBuf,packet);
out.add(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
out.add(PacketCodeC.INSTANCE.decode(byteBuf));
}
}
PacketCodeC
Command
public interface Command {
Byte LOGIN_REQUEST = 1;
Byte LOGIN_RESPONSE = 2;
Byte MESSAGE_REQUEST = 3;
Byte MESSAGE_RESPONSE = 4;
Byte LOGOUT_REQUEST = 5;
Byte LOGOUT_RESPONSE = 6;
Byte CREATE_GROUP_REQUEST = 7;
Byte CREATE_GROUP_RESPONSE = 8;
Byte LIST_GROUP_MEMBERS_REQUEST = 9;
Byte LIST_GROUP_MEMBERS_RESPONSE = 10;
Byte JOIN_GROUP_REQUEST = 11;
Byte JOIN_GROUP_RESPONSE = 12;
Byte QUIT_GROUP_REQUEST = 13;
Byte QUIT_GROUP_RESPONSE = 14;
Byte GROUP_MESSAGE_REQUEST = 15;
Byte GROUP_MESSAGE_RESPONSE = 16;
Byte HEARTBEAT_REQUEST = 17;
Byte HEARTBEAT_RESPONSE = 18;
}
IMIdleStateHandler
package com.netty.learning.demo20.example1.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* @Author:
* @Description: 服务端空闲检测
* 对于服务端来说,客户端的连接如果出现假死,那么服务端将无法收到客户端的数据,
* 也就是说,如果能一直收到客户端发来的数据,那么可以说明这条连接还是活的,因此,服务端对于连接假死的应对策略就是空闲检测。
* 何为空闲检测?空闲检测指的是每隔一段时间,检测这段时间内是否有数据读写,
* 简化一下,我们的服务端只需要检测一段时间内,是否收到过客户端发来的数据即可,Netty 自带的 IdleStateHandler 就可以实现这个功能。
* @Date: Created in 21-1-24 上午1:20
* @Modified:
*/
public class IMIdleStateHandler extends IdleStateHandler {
private static final int READER_IDLE_TIME = 15;
/**
* 构造函数,有四个参数,
*
* 其中第一个表示读空闲时间,指的是在这段时间内如果没有数据读到,就表示连接假死;
*
* 第二个是写空闲时间,指的是 在这段时间如果没有写数据,就表示连接假死;
*
* TODO (一段时间?)
* 第三个参数是读写空闲时间,表示在这段时间内如果没有产生数据读或者写,就表示连接假死。
* 写空闲和读写空闲为0,表示我们不关心者两类条件;
* 最后一个参数表示时间单位。
*
* 在我们的例子中,表示的是:如果 15 秒内没有读到数据,就表示连接假死。
*
*/
public IMIdleStateHandler() {
super(READER_IDLE_TIME,0, 0, TimeUnit.SECONDS);
}
/**
* 连接假死之后会回调 channelIdle() 方法
* @param ctx
* @param evt
* @throws Exception
*/
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
System.out.println(new Date() + " 检测到 [" + READER_IDLE_TIME + "]秒内未读取到数据,关闭链接.");
ctx.channel().close();
}
}
SessionUtil
package com.netty.learning.demo20.example1.util;
import com.netty.learning.demo20.example1.attribute.Attributes;
import com.netty.learning.demo20.example1.session.Session;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SessionUtil {
/**
* channel存储在Map中,当channel数量比较多的时候,应当怎么处理?
* TODO channel只是一个引用,一般单机几万十几万,这个量其实还好
*/
private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();
private static final Map<String, ChannelGroup> groupIdChannelGroupMap = new ConcurrentHashMap<>();
public static void bindSession(Session session, Channel channel){
userIdChannelMap.put(session.getUserId(),channel);
channel.attr(Attributes.SESSION).set(session);
}
/**
* logout实现并没有关闭channel,还是说不用关闭channel
* TODO 可以理解为离线模式,可以不用关闭
* @param channel
*/
public static void unBindSession(Channel channel){
if(hasLogin(channel)){
Session session = getSession(channel);
userIdChannelMap.remove(session.getUserId());
channel.attr(Attributes.SESSION).set(null);
System.out.println(new Date() + " : SessionUtil-> 退出登录!");
}
}
public static boolean hasLogin(Channel channel){
return channel.hasAttr(Attributes.SESSION);
}
public static Session getSession(Channel channel){
return channel.attr(Attributes.SESSION).get();
}
public static Channel getChannel(String userId){
return userIdChannelMap.get(userId);
}
public static void bindChannelGroup(String groupId,ChannelGroup channelGroup){
groupIdChannelGroupMap.put(groupId,channelGroup);
}
public static ChannelGroup getChannelGroup(String groupId){
return groupIdChannelGroupMap.get(groupId);
}
}
概念汇总与问答摘抄
1)NETTY客户端的启动流程:创建一个引导类,指定线程模型、IO模型、连接读写处理逻辑,连接上指定的主机和端口,客户端就启动起来了.
2)NETTY服务端的启动流程:创建一个引导类,然后给它指定线程模型、IO模型、连接读写处理逻辑,绑定端口之后,服务端就启动起来了。
3)客户端与服务端交互的二进制数据载体为ByteBuf,ByteBuf通过连接的内存管理器创建,字节数据填充到ByteBuf之后才能写到对端。NETTY里面的数据是以ByteBuf为单位的,所有需要写出的数据都必须塞到一个ByteBuf。
4)Inbound和outbound分别是处理数据读与数据写的逻辑,可与 tcp 协议栈联系起来。inBoundHandler的执行顺序与我们实际的添加顺序相同,而outBoundHandler则相反。
5)事件传播。ctx.writeAndFlush()不需要经过其他handler处理与ctx.channel().writeAndFlush()需要经过后面的handler处理。
6)NIO并不是串行的,NIO只是使用用少量的线程去监听很多过个IO事件,监听到事件后,可以让其他线程去执行IO操作,也就是有专门负责监听事件的线程,那么并发量大的时候,这个线程不需要阻塞,可以一直轮询IO事件,来一个事件,后续的read等IO操作可以交给其他线程,而BIO中IO事件监听及IO操作均是由一个线程去完成,并且这个过程会阻塞,并发量大的时候只能新开启线程去处理。
7)java nio根据操作系统的不同,selector实现也是不同的,并不只是epoll。正常项目都是部署在linux系统上的,linux默认实现是epoll。看Unix的I/O模型。
8)boss线程池的线程越多,同一时间能够连接上的客户端就越多。work线程池执行的任务是每个线程会监听多个channel的事件,对应channel触发监听的事件时候由对应的work线程处理,work线程池的线程越多,同一时间,能够处理的事件就越多(连接、读写等事件)。
9)NIO是从缓存中读取字符的, 读取同量数据, 也是比传统IO单字节读取要快的。
10)pub/sub订阅发布机制
11)从handler,childHandler,attr,childAttr可以看出:带child的是针对连接的,不带的是针对服务器的。
所以可以这么理解:服务器like a mother,每条连接都是她的child。
不带child的对应bossGroup,带child的对应workerGroup。
本来你在确定group的时候,后面的参数就是parentGroup和childGroup。
是可以用一个group的,group方法可以只接受一个eventLoopGroup参数。
12)端口范围
0~1023:分配给系统的端口号。
1024~49151:登记端口号,主要是让第三方应用使用。
49152~65535:短暂端口号,是留给客户进程选择暂时使用,一个进程使用完就可以供其他进程使用。
13)不太建议使用jdk9,jdk10,这两个是过渡版本
14)如果只监听一个端口就设置成一个NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
没必要,设置一个多此一举,nio线程本身就是懒启动,用多少启动多少,没必要专门这么搞。
15)服务端启动,需要设置 childxxx 相关的属性,是因为服务端启动不仅仅监听本地端口,还一直与客户端的连接进行交互,可以理解为有两种类型的 channel。
16)断线重连。需要核实是服务器重新启动还是断开时触发客户端。对应生命周期章节。
17)延迟递归为啥没用 Thread.sleep() 而用 .schedule() 呢?
一是,有更好的封装,为了代码优雅:Thread.sleep()需要处理Interrupted异常。
二是,交给专门的调度线程调度,主线程不会卡在这里可以做其他的事情(例如启动多个NettyClient不必每个都等着)。
18)channelRead() 方法读取数据后,需要调用ByteBuf的release()方法吗
ctx.channel().writeAndFlush 到最后会释放掉创建出来的buf
19)ChannelInboundHandlerAdapter 不是表示流入的吗?对客户端而言发消息不是流出吗?
可以这么理解,在消息流入的处理器里面,处理完消息,然后写出去,当然,消息流出去的时候会经过outBound处理器.
数据缓存为基准点,这样就好理解了,不管是InBound或者是OutBound.
个人理解:相对来说,无论是客户端还是服务端,对于自己客户端或服务端来说都是入口,但对于服务端或客户端都是出口.
20)在哪种场景下需要我们调用retain()去增加引用计数呢
问得好,比如,你抽象出来的一个方法,这个功能就是把bytebuf转换成一个对象,然后release,
如果你想调用这个方法之后还想继续读数据,那么就需要在调用这个方法前 retain一下
个人理解:因为retain()+1,release()-1.不在调用这个方法前retain的话,引用计数变为0(当创建完一个 ByteBuf,它的引用为1),则直接回收ByteBuf底层的内存了,所以数据就没了。
Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,需要回收底层内存。
默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一,
release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。
alloc.directBuffer()指定直接内存,alloc.buffer()指定堆内存。
跟踪一下ByteBufAllocator执行buffer的过程就能发现他是通过directByDefault变量来选择使用directBuffer还是heapBuffer来分配内存,前者就是堆外,后者就是堆内。
21)扩容每次扩多少?从64B开始,指数扩容,直到能装下为止。
22)ByteBuf只是方便操作数据本质还是byte。
在一个函数体里面,只要增加了引用计数(包括 ByteBuf 的创建和手动调用 retain() 方法),就必须调用 release() 方法
23)ByteBuf:
ByteBuf结构、
容量:capacity()、maxCapacity()、readableBytes() 与 isReadable()、writableBytes()、 isWritable() 与 maxWritableBytes()
读写指针API:readerIndex() 与 readerIndex(int)、writeIndex() 与 writeIndex(int)、markReaderIndex() 与 resetReaderIndex()、markWriterIndex() 与 resetWriterIndex()
读写API:writeBytes(byte[] src) 与 buffer.readBytes(byte[] dst)、writeByte(byte b) 与 buffer.readByte()、release() 与 retain()、slice()、duplicate()、copy()、retainedSlice() 与 retainedDuplicate()
24)序列化是将对象序列化为网络可传输的二进制数据,把内容变成计算机可传输的资源。序列化成二进制字节流才是最底层的、计算机可识别出的资源。
编码是为了程序能理解二进制流数据的含义,以便恢复出对象,让程序认识这份资源。可自定义的协议规则,不同的程序指定不同的规则。所以不同的程序能识别不同的编码数据.
个人认为:编码是依据自定义传输协议而指定程序能理解的二进制数据流格式.而序列化的对象是这个协议中的一部分.
25)枚举很难操作的,就比如如何把枚举数据序列化,如何通过字节找到枚举值都是要考虑的。常量接口并不推荐这么做。
26)数据长度:int最大值 1<<31 - 1,大约21亿,可表示数据长度有21亿字节,约2GB,表示不了4GB 。
27)自定义协议属于私有协议,http属于共有协议
28)使用protobuf 生成的对象的二进制要小很多,使用protobuf 可以减小数据包的大小。
一个数据包无法装下一个对象的这种情况怎么处理呢(就是一个ByteBuf 被很多个物理层数据包传输的情况)?
所以设计协议的时候,长度字段的长度需要考量,需要支持最大的数据包大小,这里是4个字节,最大值为 2147483647,已经完全足够了,
然后一个物理层数据包如果塞不下,会被拆成多个数据包,另外一端接受的时候把这些数据包粘合起来。
29)ChannelHandlerContext是否支持序列化和反序列化.
redis存储可以只存用户长连机器的的ip,发送给用户消息的时候,先去redis里找到用户的长连ip,
再与这台长连机器通过http或者rpc进行通信,即可把消息发送到客户端,不用考虑ChannelHandlerContext是否持久化的问题.
一般长链接,网关是需要做IP HASH 的。
30)客户端NioEventLoopGroup不用释放,程序关闭之后,所有线程都自动关闭了。
31)责任链模式和策略模式,基本思想是表驱动,代替 大量if else。
https://www.developer.com/java/data/seven-ways-to-refactor-java-switch-statements.html
32)OutBoundHandler的设计其实是借鉴了tcp的协议栈,
具体实现可以参考源码解析博文https://www.jianshu.com/p/087b7e9a27a2中“pipeline中的outBound事件传播”
看了博客,意思是InBound是正向遍历链表,OutBound是反向遍历链表,跟洋葱模型差不多。
33)如果指令成百上千,用户自定义映射逻辑比较合适,但是大多数情况下,指令不会太多,可以粗暴一些.
34)decode解码容器的遍历循环问题。
35)在我们的 IM 这个 完整的 pipeline 中,如果我们不添加拆包器,客户端连续向服务端发送数据,
会有什么现象发生?为什么会发生这种现象?
发生的现象: 服务器端应用层无法正确解析出每一个应用层对象;
原因: tcp层是流式数据传输,没有明确的包与包之间的分界线,如果没有拆包器,则应用层从tcp层获取到的数据是不完整的(分包)或是多个数据包粘在一起(粘包)
36)比如错包长度设的是20b,数据实际为19b,它缺一个bit,下一个包的第一个bit会不会被认作上个包剩下的。
这样就会一直错下去,除非是按照分隔符来拆包,可以从下一个分隔符开始重新抽取一个完整的包,长度这种是没办法的~
37)websocket和http的拆包,netty已经帮你做好了,不需要自己拆包了。
38)在客户端主动断开时,是以怎样的形式通知服务器的呢?
TCP断开链接是发的FIN报文,其数据长度为0。
39)可以添加websocket支持吗
可以的,只需要添加一些 handler 即可,具体可以netty源码的demo里是有的哦
https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/http/websocketx
40)如果是多个线程同写一个 channel,也是ok的,因为这个写操作最终都会封装成一个异步的task,
扔到该 channel 自身绑定的线程里去执行,原理可以参考:
(Netty的 Channel,而是操作系统的 Socket?)
https://www.jianshu.com/p/feaeaab2ce56
41)当发送方给不在线的用户发送消息时,消息进MQ,等他再上线的时候再从MQ推送过去,可以再做一些持久化等操作.
42)音频,视频等文件又是怎么处理的呢
自定义协议,按文件类型解析就好
音视频文件上传文件服务器,返回url。只发送url,对端收到url再下载。
43)如果集群部署,是不是还需要一个网关来寻址?找到客户端连接到那台服务器上了?完全正确。
44)如果把用户和channel的映射持久化到mysql中,channel怎么存?https://juejin.cn/post/6844903689111470094
45)现在将连接的channel存在map里,如果在线的用户量多的话,就存不下了,要存在哪里。
要做分布式和负载均衡的,netty的channel不能持久化,也就不能像web的session一样放到redis中做登录认证。
倒是可以考虑采用网关转发+消息队列的形式实现分布式和负载。
46)更改事件传播源
https://www.jianshu.com/p/6efa9c5fa702
https://www.jianshu.com/p/087b7e9a27a2
编码器做初outbound处理,解码器当做inbound处理即可
47)减少阻塞主线程的操作
https://www.jianshu.com/p/0d0eece6d467
https://www.jianshu.com/p/467a9b41833e
https://www.jianshu.com/p/58fad8e42379
48)在 handler 的处理中,如果有耗时的操作,我们需要把这些操作都丢到我们自定义的的业务线程池中处理,
因为 NIO 线程是会有很多 channel 共享的,我们不能阻塞他。
为什么NIO线程中调用异步操作的方法时不要通过回调监听器来准确的耗时?
Netty 会判断当前调用的是哪个线程,如果是 NIO 线程本身,那么直接是同步调用的
如果是Nio线程本身的话writerAndFlush不是异步的
49)这里说的无状态有状态其实有点抽象,其实就是我们平时写方法偶尔会加一下内部的变量(显示定义的业务变量)
用来记录处理的结果或者想保存的值一类的,这些变量的值都是某个channel特有的,不希望被其他channel影响到,
目前的例子中都没有用到这种变量,有的数据直接放到channle的attr中了,
所以说以这个教程的例子来说,是无变量的(自己显示定义的业务变量),即无状态的,
所以可以使用单例的handler,某些业务变量的值已经存储到channel里了,channel都是各自单独持有的.
50)消息分片发送没说明,有没有什么最优解
51)连接假死的现象与问题
52)在弱网环境下,netty心跳等是如何处理的
53)Netty 其实可以看做是对 BIO 和 NIO 的封装,
并提供良好的 IO 读写相关的 API,另外还提供了非常多的开箱即用的 handler,工具类等等。
54)Netty 提供了两大启动辅助类,ServerBootstrap 和 Bootstrap, 他们的启动参数类似,都是分为
配置 IO 类型,配置线程模型。
配置 TCP 参数,attr 属性。
55)安全方面的,比如加解密,还有数据压缩方面的内容?
56)带标签推送,群发消息,分布式场景
57)request1请求对应response1 你们会在请求里面加个唯一标识标识每个响应和请求的对等性吗
对的,rpc 就是这么干的。
推荐学习链接
官网与 github.官网一直是学习一门技术最好的资料.这里是官方给出的 4.x 版本的 Netty 的一个学习指引,大家可以温习一遍。
https://netty.io/wiki/user-guide-for-4.x.html
4.1x版本的新的特性,大家也可以感受一下 Netty 的版本变化。
https://netty.io/wiki/new-and-noteworthy-in-4.1.html
大家也可以了解一下目前有哪些开源项目使用了 Netty:https://netty.io/wiki/related-projects.html
关于 Netty 详细的 demo,也可以在官网找到,比如,你想使用 Netty 来实现一个 Http 服务器,或者实现一个 Websocket 服务器,或者实现 redis 协议等等,都可以在 官方提供的 demo 中找到。
https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example
Netty 的作者之一《Netty 实战》
https://github.com/normanmaurer
进阶学习 Netty 的方向与资料
https://juejin.cn/book/6844733738119593991/section/6844733738295918599
netty的作者norman目前在开发类似netty的框架,可以参考一下:
https://github.com/apple/swift-nio
求教如果是集群部署又配合微服务架构,需要提供推送给客户端消息的接口,我要怎么设计好?
https://user-gold-cdn.xitu.io/2018/10/14/1666ffc91d30efaf?w=640&h=3362&f=png&s=1341798
减少阻塞主线程的操作
https://www.jianshu.com/p/0d0eece6d467
https://www.jianshu.com/p/467a9b41833e
https://www.jianshu.com/p/58fad8e42379
更改事件传播源
https://www.jianshu.com/p/6efa9c5fa702
https://www.jianshu.com/p/087b7e9a27a2
关于 Netty 服务端启动的流程可以参考
https://juejin.im/post/6844903695403089927
关于服务端是如何处理一条新的连接的,可以参考
Netty 源码分析之新连接接入全解析
https://juejin.im/post/6844903696413769735
关于 Netty 里面 NIO 到底干了啥事,为啥可以做到一个 NIO 线程就可以处理上万连接,异步机制又是如何实现的,可以参考以下三篇文章
Netty 源码分析之揭开 reactor 线程的面纱(一)
https://juejin.im/post/6844903696992567310
Netty 源码分析之揭开 reactor 线程的面纱(二)
https://juejin.im/post/6844903697789485063
Netty 源码分析之揭开 reactor 线程的面纱(三)
https://juejin.im/post/6844903698456379405
关于事件传播机制 Netty 又是如何实现的,为什么 inBound 和 outBound 的传播顺序与添加顺序的对应规则不同,
Netty 又是如何来区分 inBound 和 outBound 的,Netty 的 pipeline 默认又有哪两个 handler,他们的作用分别是什么,一切尽在以下两篇文章
Netty 源码分析之 pipeline (一)
https://juejin.im/post/6844903699614007310
Netty 源码分析之 pipeline (二)
https://juejin.im/post/6844903701451112456
Netty 中拆包器的原理是什么?可以参考 Netty 源码分析之拆包器的奥秘
https://juejin.im/post/6844903702541631502
基于长度域的拆包器又是如何来实现的,可以参考 Netty 源码分析之 LengthFieldBasedFrameDecoder
https://juejin.im/post/6844903714789015565
最后,我们在本小册接触最频繁的 writeAndFlush() 方法,它又是如何实现异步化的,可以参考 Netty 源码分析之 writeAndFlush 全解析
https://www.jianshu.com/p/feaeaab2ce56
关于源码解析的文章差不多就这么多,另外,如果你对我之前遇到的线上与 Netty 相关的问题排查,以及一些调优相关的经验和实践感兴趣的话,也可以读一读下面几篇文章
Netty 堆外内存泄露排查盛宴
https://juejin.im/post/6844903669335343111
海量连接服务端 jvm 参数调优杂记
https://www.jianshu.com/p/051d566e110d
一次 Netty "引发的" 诡异 old gc 问题排查过程
https://www.jianshu.com/p/702ef10102e4
源码解析视频
如果你觉得文章看不下去,更偏向视频的话,那么我这里也有之前录的一个源码解析视频,手把手带你撸源码:Java读源码之 Netty 深入剖析