diff --git a/README.md b/README.md index 831a9b2..fc4d1e5 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ CIM采用业内主流开源技术构建,易于扩展和使用,并完美支 com.farsunset cim-server-sdk-netty - 4.2.8 + 4.2.10 ``` diff --git a/cim-boot-server/pom.xml b/cim-boot-server/pom.xml index 86000ee..7b6a201 100644 --- a/cim-boot-server/pom.xml +++ b/cim-boot-server/pom.xml @@ -17,11 +17,10 @@ 1.8 - 4.2.8 - 4.1.79.Final - 3.21.5 - 8.0.30 - 2.11.1 + 4.2.10 + 3.25.0 + 9.2.0 + 2.12.1 3.0.0 @@ -70,6 +69,7 @@ cim-server-sdk-netty ${cim.server.sdk.version} + org.apache.commons commons-pool2 @@ -77,52 +77,11 @@ - mysql - mysql-connector-java + com.mysql + mysql-connector-j ${mysql.jdbc.version} - - io.netty - netty-handler - ${netty.version} - - - io.netty - netty-buffer - ${netty.version} - - - io.netty - netty-codec - ${netty.version} - - - io.netty - netty-codec-http - ${netty.version} - - - io.netty - netty-common - ${netty.version} - - - io.netty - netty-transport - ${netty.version} - - - - io.netty - netty-transport-native-epoll - linux-x86_64 - ${netty.version} - - - - - com.google.protobuf protobuf-java diff --git a/cim-boot-server/src/main/resources/application.properties b/cim-boot-server/src/main/resources/application.properties index 4b9b080..d271a65 100644 --- a/cim-boot-server/src/main/resources/application.properties +++ b/cim-boot-server/src/main/resources/application.properties @@ -1,3 +1,6 @@ +# 服务端文档地址 +# https://www.yuque.com/yuanfangxiyang/ma4ytb/vvy3iz#pW3DQ + server.port=8080 spring.jackson.default-property-inclusion=non_empty diff --git a/cim-server-sdk/pom.xml b/cim-server-sdk/pom.xml index 39924a3..c755c2c 100644 --- a/cim-server-sdk/pom.xml +++ b/cim-server-sdk/pom.xml @@ -6,7 +6,7 @@ com.farsunset cim-server-sdk-netty - 4.2.8 + 4.2.10 jar ${project.groupId}:${project.artifactId} @@ -29,8 +29,8 @@ UTF-8 1.8 3.19.6 - 4.1.86.Final - 1.7.36 + 4.1.119.Final + 2.0.17 @@ -77,6 +77,7 @@ linux-x86_64 ${netty.version} + com.google.protobuf protobuf-java @@ -92,7 +93,7 @@ com.fasterxml.jackson.core jackson-databind - 2.13.4.1 + 2.18.3 diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java index f9b90d0..ec3d64b 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java @@ -22,6 +22,7 @@ package com.farsunset.cim.acceptor; import com.farsunset.cim.acceptor.config.SocketConfig; +import com.farsunset.cim.auth.BlacklistHandler; import com.farsunset.cim.coder.protobuf.AppMessageDecoder; import com.farsunset.cim.coder.protobuf.AppMessageEncoder; import io.netty.bootstrap.ServerBootstrap; @@ -61,6 +62,7 @@ public class AppSocketAcceptor extends NioSocketAcceptor { bootstrap.childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch){ + ch.pipeline().addLast(blacklistHandler); ch.pipeline().addLast(new AppMessageDecoder()); ch.pipeline().addLast(new AppMessageEncoder()); ch.pipeline().addLast(socketConfig.getLoggingHandler() == null ? defaultLoggingHandler : socketConfig.getLoggingHandler() ); diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java index 9c20ffb..b4a49f2 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java @@ -22,8 +22,10 @@ package com.farsunset.cim.acceptor; import com.farsunset.cim.acceptor.config.SocketConfig; +import com.farsunset.cim.auth.BlacklistHandler; import com.farsunset.cim.constant.CIMConstant; import com.farsunset.cim.constant.ChannelAttr; +import com.farsunset.cim.handler.IllegalRequestHandler; import com.farsunset.cim.handler.LoggingHandler; import com.farsunset.cim.model.Ping; import com.farsunset.cim.model.SentBody; @@ -48,6 +50,8 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{ protected final SocketConfig socketConfig; + protected final BlacklistHandler blacklistHandler; + private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; @@ -55,6 +59,8 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{ this.socketConfig = socketConfig; + this.blacklistHandler = new BlacklistHandler(socketConfig.getBlacklistPredicate()); + ThreadFactory bossThreadFactory = r -> { Thread thread = new Thread(r); thread.setName("nio-boss-" + thread.getId()); diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java index d30fd84..48721d3 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java @@ -23,6 +23,7 @@ package com.farsunset.cim.acceptor; import com.farsunset.cim.acceptor.config.WebsocketConfig; import com.farsunset.cim.auth.AuthHandler; +import com.farsunset.cim.auth.BlacklistHandler; import com.farsunset.cim.coder.json.TextMessageDecoder; import com.farsunset.cim.coder.json.TextMessageEncoder; import com.farsunset.cim.coder.protobuf.WebMessageDecoder; @@ -62,7 +63,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor { "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n" + "* *\n" + "* *\n" + - "* Websocket Server started on port {} for [protobuf] mode. *\n" + + "* Websocket Server started on port {} for [PROTOBUF] mode. *\n" + "* *\n" + "* *\n" + "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n"; @@ -94,6 +95,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor { @Override public void initChannel(SocketChannel ch){ + ch.pipeline().addLast(blacklistHandler); ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new HttpObjectAggregator(4 * 1024)); diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java index 6cf1dda..0630ff9 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java @@ -25,6 +25,7 @@ import com.farsunset.cim.handler.CIMRequestHandler; import com.farsunset.cim.handler.LoggingHandler; import java.time.Duration; +import java.util.function.Predicate; /** @@ -39,11 +40,6 @@ public class SocketConfig { */ private Integer port; - /** - * socket消息处理器 - */ - private CIMRequestHandler outerRequestHandler; - /** 是否启用TVL协议socket */ @@ -61,17 +57,25 @@ public class SocketConfig { */ private Duration readIdle = Duration.ofSeconds(60); - /** 长链接最大允许心跳响应超时次数 达到该次数则 服务端断开链接 */ private int maxPongTimeout = 1; + /** + * socket消息处理器 + */ + private CIMRequestHandler outerRequestHandler; + + /** + * IP黑名单处理 + */ + private Predicate blacklistPredicate; + /** * 自定义日志打印处理器,可不设置 */ - private LoggingHandler loggingHandler; @@ -83,26 +87,11 @@ public class SocketConfig { this.port = port; } - public CIMRequestHandler getOuterRequestHandler() { - return outerRequestHandler; - } - - public void setOuterRequestHandler(CIMRequestHandler outerRequestHandler) { - this.outerRequestHandler = outerRequestHandler; - } public boolean isEnable() { return enable; } - public void setLoggingHandler(LoggingHandler loggingHandler) { - this.loggingHandler = loggingHandler; - } - - public LoggingHandler getLoggingHandler() { - return loggingHandler; - } - public void setEnable(boolean enable) { this.enable = enable; } @@ -130,4 +119,28 @@ public class SocketConfig { public void setMaxPongTimeout(int maxPongTimeout) { this.maxPongTimeout = maxPongTimeout; } + + public CIMRequestHandler getOuterRequestHandler() { + return outerRequestHandler; + } + + public void setOuterRequestHandler(CIMRequestHandler outerRequestHandler) { + this.outerRequestHandler = outerRequestHandler; + } + + public void setLoggingHandler(LoggingHandler loggingHandler) { + this.loggingHandler = loggingHandler; + } + + public LoggingHandler getLoggingHandler() { + return loggingHandler; + } + + public Predicate getBlacklistPredicate() { + return blacklistPredicate; + } + + public void setBlacklistPredicate(Predicate blacklistPredicate) { + this.blacklistPredicate = blacklistPredicate; + } } diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/auth/BlacklistHandler.java b/cim-server-sdk/src/main/java/com/farsunset/cim/auth/BlacklistHandler.java new file mode 100644 index 0000000..386a4e9 --- /dev/null +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/auth/BlacklistHandler.java @@ -0,0 +1,30 @@ +package com.farsunset.cim.auth; + +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.util.function.Predicate; + +/** + * IP黑名单处理 + */ +@ChannelHandler.Sharable +public class BlacklistHandler extends ChannelInboundHandlerAdapter { + + private final Predicate predicate; + + public BlacklistHandler(Predicate blacklistPredicate) { + this.predicate = blacklistPredicate; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + if (predicate == null || predicate.test(ctx.channel().remoteAddress().toString())) { + super.channelRegistered(ctx); + return; + } + ctx.close().addListener(ChannelFutureListener.CLOSE); + } +} diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/coder/protobuf/AppMessageDecoder.java b/cim-server-sdk/src/main/java/com/farsunset/cim/coder/protobuf/AppMessageDecoder.java index 59c7860..d1ebcf3 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/coder/protobuf/AppMessageDecoder.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/coder/protobuf/AppMessageDecoder.java @@ -56,9 +56,8 @@ public class AppMessageDecoder extends ByteToMessageDecoder { * 收到意外数据 */ if (DataType.PONG.getValue() != type && type != DataType.SENT.getValue()){ - /* - 消费掉无效数据 - */ + + // 消费掉无效数据 int length = buffer.readableBytes(); if (length > 0){ buffer.readBytes(new byte[length]); @@ -67,9 +66,7 @@ public class AppMessageDecoder extends ByteToMessageDecoder { return; } - /* - * 消息体不足3位,发生断包情况 - */ + // 消息体不足3位,发生断包情况 if (buffer.readableBytes() < CIMConstant.DATA_HEADER_LENGTH) { buffer.resetReaderIndex(); return; @@ -110,7 +107,11 @@ public class AppMessageDecoder extends ByteToMessageDecoder { return body; } - return Pong.getInstance(); + if (DataType.PONG.getValue() == type) { + return Pong.getInstance(); + } + + throw new ReadInvalidTypeException(type); } /** diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/constant/CIMConstant.java b/cim-server-sdk/src/main/java/com/farsunset/cim/constant/CIMConstant.java index dcd51c2..6d40ec5 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/constant/CIMConstant.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/constant/CIMConstant.java @@ -31,8 +31,14 @@ public interface CIMConstant { */ byte DATA_HEADER_LENGTH = 3; + /** + * 内置的 SentBody key 链接关闭是通知应用层处理 + */ String CLIENT_CONNECT_CLOSED = "client_closed"; + /** + * 内置的握手失败ReplyBody key 输入给客户端 + */ String CLIENT_HANDSHAKE = "client_handshake"; } diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/constant/ChannelAttr.java b/cim-server-sdk/src/main/java/com/farsunset/cim/constant/ChannelAttr.java index 16318e5..608af33 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/constant/ChannelAttr.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/constant/ChannelAttr.java @@ -24,11 +24,40 @@ package com.farsunset.cim.constant; import io.netty.util.AttributeKey; public interface ChannelAttr { + + /** + * 心跳请求次数 + */ AttributeKey PING_COUNT = AttributeKey.valueOf("ping_count"); + + /** + * UID标识 + */ AttributeKey UID = AttributeKey.valueOf("uid"); + + /** + * 客户端类型 web、app + */ AttributeKey CHANNEL = AttributeKey.valueOf("channel"); - AttributeKey ID = AttributeKey.valueOf("id"); + + /** + * 客户端设备ID + */ AttributeKey DEVICE_ID = AttributeKey.valueOf("device_id"); - AttributeKey TAG = AttributeKey.valueOf("tag"); + + /** + * 客户端语言偏好(en_US) + */ AttributeKey LANGUAGE = AttributeKey.valueOf("language"); + + /** + * Nio链接的ID + */ + AttributeKey ID = AttributeKey.valueOf("id"); + + /** + * Nio链接的标签 + */ + AttributeKey TAG = AttributeKey.valueOf("tag"); + } diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/exception/ReadInvalidTypeException.java b/cim-server-sdk/src/main/java/com/farsunset/cim/exception/ReadInvalidTypeException.java index 2eb6778..72d5961 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/exception/ReadInvalidTypeException.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/exception/ReadInvalidTypeException.java @@ -21,6 +21,9 @@ */ package com.farsunset.cim.exception; +/** + * 通常是端口受到非法访问,发送了意外的数据 + */ public class ReadInvalidTypeException extends RuntimeException{ public ReadInvalidTypeException(byte type) { diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/group/SessionGroup.java b/cim-server-sdk/src/main/java/com/farsunset/cim/group/SessionGroup.java index a36af13..666df24 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/group/SessionGroup.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/group/SessionGroup.java @@ -48,13 +48,18 @@ public class SessionGroup extends ConcurrentHashMap> } }; + /** + * 返回Nio链接的标识 + * @param channel Nio链接 + * @return UID标识 + */ protected String getKey(Channel channel){ return channel.attr(ChannelAttr.UID).get(); } /** * 将channel从内存容器中删除 - * @param channel + * @param channel Nio链接 */ public void remove(Channel channel){ @@ -77,7 +82,7 @@ public class SessionGroup extends ConcurrentHashMap> /** * 将channel加入内存容器中删除 - * @param channel + * @param channel Nio链接 */ public void add(Channel channel){ @@ -101,8 +106,8 @@ public class SessionGroup extends ConcurrentHashMap> /** * 查找到key关联的channel并写入消息体 - * @param key - * @param message + * @param key 链接标识(一般是用户ID) + * @param message 消息体 */ public void write(String key, Message message){ this.write(key, message, channel -> true); @@ -110,8 +115,8 @@ public class SessionGroup extends ConcurrentHashMap> /** * 查找到key关联的channel并写入消息体 - * @param key - * @param message + * @param key 链接标识(一般是用户ID) + * @param message 消息体 * @param matcher channel筛选条件 */ public void write(String key, Message message, Predicate matcher){ @@ -120,9 +125,9 @@ public class SessionGroup extends ConcurrentHashMap> /** * 查找到key关联的channel并写入消息体 - * @param key - * @param message - * @param excludedSet 排除的UID集合 + * @param key 链接标识(一般是用户ID) + * @param message 消息体 + * @param excludedSet 排除的标识集合 */ public void write(String key, Message message, Collection excludedSet){ Predicate predicate = new ExcludedUidPredicate(excludedSet); @@ -131,7 +136,7 @@ public class SessionGroup extends ConcurrentHashMap> /** * 查找到消息接收者关联的channel并写入消息体 - * @param message + * @param message 消息体 */ public void write(Message message){ this.write(message.getReceiver(),message); @@ -139,8 +144,8 @@ public class SessionGroup extends ConcurrentHashMap> /** * 通过key查找channel集合 - * @param key - * @return + * @param key 链接标识(一般是用户ID) + * @return 匹配的链接集合 */ public Collection find(String key){ return this.getOrDefault(key,Collections.emptyList()); @@ -148,9 +153,9 @@ public class SessionGroup extends ConcurrentHashMap> /** * 通过key查找channel集合 - * @param key + * @param key 链接标识(一般是用户ID) * @param matcher 过滤条件 - * @return + * @return 匹配的链接集合 */ public Collection find(String key,Predicate matcher){ return this.find(key) @@ -161,9 +166,9 @@ public class SessionGroup extends ConcurrentHashMap> /** * 通过key查找channel集合 - * @param key + * @param key 链接标识(一般是用户ID) * @param channel 连接终端类型过滤条件 - * @return + * @return 匹配的链接集合 */ public Collection find(String key,String... channel){ List channels = Arrays.asList(channel); @@ -172,9 +177,9 @@ public class SessionGroup extends ConcurrentHashMap> /** * 通过key查找channel集合 - * @param key + * @param key 链接标识(一般是用户ID) * @param channelSet 连接终端类型过滤条件 - * @return + * @return 匹配的链接集合 */ public Collection find(String key,Collection channelSet){ Predicate predicate = new ChannelPredicate(channelSet); @@ -183,8 +188,8 @@ public class SessionGroup extends ConcurrentHashMap> /** * 检查该channel是否存在内存管理当中 - * @param channel - * @return + * @param channel NIO链接 + * @return 是否存储在了内存中 */ public boolean isManaged(Channel channel){ diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/group/TagSessionGroup.java b/cim-server-sdk/src/main/java/com/farsunset/cim/group/TagSessionGroup.java index f0c24ae..9f3d5d2 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/group/TagSessionGroup.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/group/TagSessionGroup.java @@ -28,10 +28,15 @@ import io.netty.channel.Channel; /** * 基于内存管理的channel组 - * 根据tag管理 + * 根据tag管理,多个链接共用一个tag */ public class TagSessionGroup extends SessionGroup { + /** + * 返回Nio链接的标识 + * @param channel Nio链接 + * @return 标签 + */ @Override protected String getKey(Channel channel){ return channel.attr(ChannelAttr.TAG).get(); diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/handler/CIMRequestHandler.java b/cim-server-sdk/src/main/java/com/farsunset/cim/handler/CIMRequestHandler.java index 10f6b07..b2723ff 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/handler/CIMRequestHandler.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/handler/CIMRequestHandler.java @@ -21,12 +21,12 @@ */ package com.farsunset.cim.handler; -/** - * 请求处理接口,所有的请求实现必须实现此接口 - */ import com.farsunset.cim.model.SentBody; import io.netty.channel.Channel; +/** + * 请求处理接口,所有的请求实现必须实现此接口 + */ public interface CIMRequestHandler { /** diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/handler/IllegalRequestHandler.java b/cim-server-sdk/src/main/java/com/farsunset/cim/handler/IllegalRequestHandler.java index 88ee329..f793c6d 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/handler/IllegalRequestHandler.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/handler/IllegalRequestHandler.java @@ -48,7 +48,6 @@ public class IllegalRequestHandler extends SimpleChannelInboundHandler data = new HashMap<>(); + /** + * 服务端产生时间戳(13位) + */ private long timestamp; public ReplyBody() { diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/model/SentBody.java b/cim-server-sdk/src/main/java/com/farsunset/cim/model/SentBody.java index aefbf5d..580d9bc 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/model/SentBody.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/model/SentBody.java @@ -27,17 +27,25 @@ import java.util.HashMap; import java.util.Map; /** - * java |android 客户端请求结构 - * + * 客户端通过长连接请求结构 */ public class SentBody implements Serializable { private static final long serialVersionUID = 1L; + /** + * 请求key + */ private String key; + /** + * 请求数据 + */ private final Map data = new HashMap<>(); + /** + * 客户端产生的时间戳(13位) + */ private long timestamp; public Map getData() {