1、服务端SDK支持IP黑名单拦截

2、优化代码注释和升级依赖组件版本
This commit is contained in:
xia jun 2025-03-29 13:14:55 +08:00
parent e8c87358ac
commit 965abd1467
22 changed files with 198 additions and 119 deletions

View File

@ -57,7 +57,7 @@ CIM采用业内主流开源技术构建易于扩展和使用并完美支
<dependency>
<groupId>com.farsunset</groupId>
<artifactId>cim-server-sdk-netty</artifactId>
<version>4.2.8</version>
<version>4.2.10</version>
</dependency>
```

View File

@ -17,11 +17,10 @@
<properties>
<java.version>1.8</java.version>
<cim.server.sdk.version>4.2.8</cim.server.sdk.version>
<netty.version>4.1.79.Final</netty.version>
<protobuf.version>3.21.5</protobuf.version>
<mysql.jdbc.version>8.0.30</mysql.jdbc.version>
<common.pool.version>2.11.1</common.pool.version>
<cim.server.sdk.version>4.2.10</cim.server.sdk.version>
<protobuf.version>3.25.0</protobuf.version>
<mysql.jdbc.version>9.2.0</mysql.jdbc.version>
<common.pool.version>2.12.1</common.pool.version>
<swagger.version>3.0.0</swagger.version>
</properties>
<dependencies>
@ -70,6 +69,7 @@
<artifactId>cim-server-sdk-netty</artifactId>
<version>${cim.server.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
@ -77,52 +77,11 @@
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql.jdbc.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<!-- linux下有效. 其他linux平台自行修改对应的classifier -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<version>${netty.version}</version>
</dependency>
<!--- ##################使用netty本SDK时的配置 end ##################-->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>

View File

@ -1,3 +1,6 @@
# 服务端文档地址
# https://www.yuque.com/yuanfangxiyang/ma4ytb/vvy3iz#pW3DQ
server.port=8080
spring.jackson.default-property-inclusion=non_empty

View File

@ -6,7 +6,7 @@
<groupId>com.farsunset</groupId>
<artifactId>cim-server-sdk-netty</artifactId>
<version>4.2.8</version>
<version>4.2.10</version>
<packaging>jar</packaging>
<name>${project.groupId}:${project.artifactId}</name>
@ -29,8 +29,8 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<protobuf.java.version>3.19.6</protobuf.java.version>
<netty.version>4.1.86.Final</netty.version>
<slf4j.version>1.7.36</slf4j.version>
<netty.version>4.1.119.Final</netty.version>
<slf4j.version>2.0.17</slf4j.version>
</properties>
<dependencies>
@ -77,6 +77,7 @@
<classifier>linux-x86_64</classifier>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
@ -92,7 +93,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4.1</version>
<version>2.18.3</version>
</dependency>
</dependencies>

View File

@ -22,6 +22,7 @@
package com.farsunset.cim.acceptor;
import com.farsunset.cim.acceptor.config.SocketConfig;
import com.farsunset.cim.auth.BlacklistHandler;
import com.farsunset.cim.coder.protobuf.AppMessageDecoder;
import com.farsunset.cim.coder.protobuf.AppMessageEncoder;
import io.netty.bootstrap.ServerBootstrap;
@ -61,6 +62,7 @@ public class AppSocketAcceptor extends NioSocketAcceptor {
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch){
ch.pipeline().addLast(blacklistHandler);
ch.pipeline().addLast(new AppMessageDecoder());
ch.pipeline().addLast(new AppMessageEncoder());
ch.pipeline().addLast(socketConfig.getLoggingHandler() == null ? defaultLoggingHandler : socketConfig.getLoggingHandler() );

View File

@ -22,8 +22,10 @@
package com.farsunset.cim.acceptor;
import com.farsunset.cim.acceptor.config.SocketConfig;
import com.farsunset.cim.auth.BlacklistHandler;
import com.farsunset.cim.constant.CIMConstant;
import com.farsunset.cim.constant.ChannelAttr;
import com.farsunset.cim.handler.IllegalRequestHandler;
import com.farsunset.cim.handler.LoggingHandler;
import com.farsunset.cim.model.Ping;
import com.farsunset.cim.model.SentBody;
@ -48,6 +50,8 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>{
protected final SocketConfig socketConfig;
protected final BlacklistHandler blacklistHandler;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
@ -55,6 +59,8 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>{
this.socketConfig = socketConfig;
this.blacklistHandler = new BlacklistHandler(socketConfig.getBlacklistPredicate());
ThreadFactory bossThreadFactory = r -> {
Thread thread = new Thread(r);
thread.setName("nio-boss-" + thread.getId());

View File

@ -23,6 +23,7 @@ package com.farsunset.cim.acceptor;
import com.farsunset.cim.acceptor.config.WebsocketConfig;
import com.farsunset.cim.auth.AuthHandler;
import com.farsunset.cim.auth.BlacklistHandler;
import com.farsunset.cim.coder.json.TextMessageDecoder;
import com.farsunset.cim.coder.json.TextMessageEncoder;
import com.farsunset.cim.coder.protobuf.WebMessageDecoder;
@ -62,7 +63,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor {
"* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n" +
"* *\n" +
"* *\n" +
"* Websocket Server started on port {} for [protobuf] mode. *\n" +
"* Websocket Server started on port {} for [PROTOBUF] mode. *\n" +
"* *\n" +
"* *\n" +
"* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n";
@ -94,6 +95,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor {
@Override
public void initChannel(SocketChannel ch){
ch.pipeline().addLast(blacklistHandler);
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpObjectAggregator(4 * 1024));

View File

@ -25,6 +25,7 @@ import com.farsunset.cim.handler.CIMRequestHandler;
import com.farsunset.cim.handler.LoggingHandler;
import java.time.Duration;
import java.util.function.Predicate;
/**
@ -39,11 +40,6 @@ public class SocketConfig {
*/
private Integer port;
/**
* socket消息处理器
*/
private CIMRequestHandler outerRequestHandler;
/**
是否启用TVL协议socket
*/
@ -61,17 +57,25 @@ public class SocketConfig {
*/
private Duration readIdle = Duration.ofSeconds(60);
/**
长链接最大允许心跳响应超时次数
达到该次数则 服务端断开链接
*/
private int maxPongTimeout = 1;
/**
* socket消息处理器
*/
private CIMRequestHandler outerRequestHandler;
/**
* IP黑名单处理
*/
private Predicate<String> blacklistPredicate;
/**
* 自定义日志打印处理器可不设置
*/
private LoggingHandler loggingHandler;
@ -83,26 +87,11 @@ public class SocketConfig {
this.port = port;
}
public CIMRequestHandler getOuterRequestHandler() {
return outerRequestHandler;
}
public void setOuterRequestHandler(CIMRequestHandler outerRequestHandler) {
this.outerRequestHandler = outerRequestHandler;
}
public boolean isEnable() {
return enable;
}
public void setLoggingHandler(LoggingHandler loggingHandler) {
this.loggingHandler = loggingHandler;
}
public LoggingHandler getLoggingHandler() {
return loggingHandler;
}
public void setEnable(boolean enable) {
this.enable = enable;
}
@ -130,4 +119,28 @@ public class SocketConfig {
public void setMaxPongTimeout(int maxPongTimeout) {
this.maxPongTimeout = maxPongTimeout;
}
public CIMRequestHandler getOuterRequestHandler() {
return outerRequestHandler;
}
public void setOuterRequestHandler(CIMRequestHandler outerRequestHandler) {
this.outerRequestHandler = outerRequestHandler;
}
public void setLoggingHandler(LoggingHandler loggingHandler) {
this.loggingHandler = loggingHandler;
}
public LoggingHandler getLoggingHandler() {
return loggingHandler;
}
public Predicate<String> getBlacklistPredicate() {
return blacklistPredicate;
}
public void setBlacklistPredicate(Predicate<String> blacklistPredicate) {
this.blacklistPredicate = blacklistPredicate;
}
}

View File

@ -0,0 +1,30 @@
package com.farsunset.cim.auth;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.function.Predicate;
/**
* IP黑名单处理
*/
@ChannelHandler.Sharable
public class BlacklistHandler extends ChannelInboundHandlerAdapter {
private final Predicate<String> predicate;
public BlacklistHandler(Predicate<String> blacklistPredicate) {
this.predicate = blacklistPredicate;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (predicate == null || predicate.test(ctx.channel().remoteAddress().toString())) {
super.channelRegistered(ctx);
return;
}
ctx.close().addListener(ChannelFutureListener.CLOSE);
}
}

View File

@ -56,9 +56,8 @@ public class AppMessageDecoder extends ByteToMessageDecoder {
* 收到意外数据
*/
if (DataType.PONG.getValue() != type && type != DataType.SENT.getValue()){
/*
消费掉无效数据
*/
// 消费掉无效数据
int length = buffer.readableBytes();
if (length > 0){
buffer.readBytes(new byte[length]);
@ -67,9 +66,7 @@ public class AppMessageDecoder extends ByteToMessageDecoder {
return;
}
/*
* 消息体不足3位发生断包情况
*/
// 消息体不足3位发生断包情况
if (buffer.readableBytes() < CIMConstant.DATA_HEADER_LENGTH) {
buffer.resetReaderIndex();
return;
@ -110,7 +107,11 @@ public class AppMessageDecoder extends ByteToMessageDecoder {
return body;
}
return Pong.getInstance();
if (DataType.PONG.getValue() == type) {
return Pong.getInstance();
}
throw new ReadInvalidTypeException(type);
}
/**

View File

@ -31,8 +31,14 @@ public interface CIMConstant {
*/
byte DATA_HEADER_LENGTH = 3;
/**
* 内置的 SentBody key 链接关闭是通知应用层处理
*/
String CLIENT_CONNECT_CLOSED = "client_closed";
/**
* 内置的握手失败ReplyBody key 输入给客户端
*/
String CLIENT_HANDSHAKE = "client_handshake";
}

View File

@ -24,11 +24,40 @@ package com.farsunset.cim.constant;
import io.netty.util.AttributeKey;
public interface ChannelAttr {
/**
* 心跳请求次数
*/
AttributeKey<Integer> PING_COUNT = AttributeKey.valueOf("ping_count");
/**
* UID标识
*/
AttributeKey<String> UID = AttributeKey.valueOf("uid");
/**
* 客户端类型 webapp
*/
AttributeKey<String> CHANNEL = AttributeKey.valueOf("channel");
AttributeKey<String> ID = AttributeKey.valueOf("id");
/**
* 客户端设备ID
*/
AttributeKey<String> DEVICE_ID = AttributeKey.valueOf("device_id");
AttributeKey<String> TAG = AttributeKey.valueOf("tag");
/**
* 客户端语言偏好(en_US)
*/
AttributeKey<String> LANGUAGE = AttributeKey.valueOf("language");
/**
* Nio链接的ID
*/
AttributeKey<String> ID = AttributeKey.valueOf("id");
/**
* Nio链接的标签
*/
AttributeKey<String> TAG = AttributeKey.valueOf("tag");
}

View File

@ -21,6 +21,9 @@
*/
package com.farsunset.cim.exception;
/**
* 通常是端口受到非法访问发送了意外的数据
*/
public class ReadInvalidTypeException extends RuntimeException{
public ReadInvalidTypeException(byte type) {

View File

@ -48,13 +48,18 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
}
};
/**
* 返回Nio链接的标识
* @param channel Nio链接
* @return UID标识
*/
protected String getKey(Channel channel){
return channel.attr(ChannelAttr.UID).get();
}
/**
* 将channel从内存容器中删除
* @param channel
* @param channel Nio链接
*/
public void remove(Channel channel){
@ -77,7 +82,7 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 将channel加入内存容器中删除
* @param channel
* @param channel Nio链接
*/
public void add(Channel channel){
@ -101,8 +106,8 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 查找到key关联的channel并写入消息体
* @param key
* @param message
* @param key 链接标识(一般是用户ID)
* @param message 消息体
*/
public void write(String key, Message message){
this.write(key, message, channel -> true);
@ -110,8 +115,8 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 查找到key关联的channel并写入消息体
* @param key
* @param message
* @param key 链接标识(一般是用户ID)
* @param message 消息体
* @param matcher channel筛选条件
*/
public void write(String key, Message message, Predicate<Channel> matcher){
@ -120,9 +125,9 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 查找到key关联的channel并写入消息体
* @param key
* @param message
* @param excludedSet 排除的UID集合
* @param key 链接标识(一般是用户ID)
* @param message 消息体
* @param excludedSet 排除的标识集合
*/
public void write(String key, Message message, Collection<String> excludedSet){
Predicate<Channel> predicate = new ExcludedUidPredicate(excludedSet);
@ -131,7 +136,7 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 查找到消息接收者关联的channel并写入消息体
* @param message
* @param message 消息体
*/
public void write(Message message){
this.write(message.getReceiver(),message);
@ -139,8 +144,8 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 通过key查找channel集合
* @param key
* @return
* @param key 链接标识(一般是用户ID)
* @return 匹配的链接集合
*/
public Collection<Channel> find(String key){
return this.getOrDefault(key,Collections.emptyList());
@ -148,9 +153,9 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 通过key查找channel集合
* @param key
* @param key 链接标识(一般是用户ID)
* @param matcher 过滤条件
* @return
* @return 匹配的链接集合
*/
public Collection<Channel> find(String key,Predicate<Channel> matcher){
return this.find(key)
@ -161,9 +166,9 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 通过key查找channel集合
* @param key
* @param key 链接标识(一般是用户ID)
* @param channel 连接终端类型过滤条件
* @return
* @return 匹配的链接集合
*/
public Collection<Channel> find(String key,String... channel){
List<String> channels = Arrays.asList(channel);
@ -172,9 +177,9 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 通过key查找channel集合
* @param key
* @param key 链接标识(一般是用户ID)
* @param channelSet 连接终端类型过滤条件
* @return
* @return 匹配的链接集合
*/
public Collection<Channel> find(String key,Collection<String> channelSet){
Predicate<Channel> predicate = new ChannelPredicate(channelSet);
@ -183,8 +188,8 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
/**
* 检查该channel是否存在内存管理当中
* @param channel
* @return
* @param channel NIO链接
* @return 是否存储在了内存中
*/
public boolean isManaged(Channel channel){

View File

@ -28,10 +28,15 @@ import io.netty.channel.Channel;
/**
* 基于内存管理的channel组
* 根据tag管理
* 根据tag管理多个链接共用一个tag
*/
public class TagSessionGroup extends SessionGroup {
/**
* 返回Nio链接的标识
* @param channel Nio链接
* @return 标签
*/
@Override
protected String getKey(Channel channel){
return channel.attr(ChannelAttr.TAG).get();

View File

@ -21,12 +21,12 @@
*/
package com.farsunset.cim.handler;
/**
* 请求处理接口,所有的请求实现必须实现此接口
*/
import com.farsunset.cim.model.SentBody;
import io.netty.channel.Channel;
/**
* 请求处理接口,所有的请求实现必须实现此接口
*/
public interface CIMRequestHandler {
/**

View File

@ -48,7 +48,6 @@ public class IllegalRequestHandler extends SimpleChannelInboundHandler<FullHttpR
/*
走到这里意味着来自非法请求错误的路径导致需要关闭链接
*/
String path = request.uri();
LOGGER.warn("收到无效的请求地址,path:{} header:{}",path,request.headers());

View File

@ -43,10 +43,12 @@ public class Message implements Serializable, Transportable,Cloneable {
* 消息类型用户自定义消息类别
*/
private String action;
/**
* 消息标题
*/
private String title;
/**
* 消息类容于action 组合为任何类型消息content 根据 format 可表示为 text,json ,xml数据格式
*/
@ -56,6 +58,7 @@ public class Message implements Serializable, Transportable,Cloneable {
* 消息发送者账号
*/
private String sender;
/**
* 消息发送者接收者
*/
@ -71,6 +74,9 @@ public class Message implements Serializable, Transportable,Cloneable {
*/
private String extra;
/**
* 消息产生时间戳(13位)
*/
private long timestamp;
public Message() {
@ -195,7 +201,6 @@ public class Message implements Serializable, Transportable,Cloneable {
/*
* 下面字段可能为空
*/
if (receiver != null){
builder.setReceiver(receiver);
}

View File

@ -32,7 +32,7 @@ import java.io.Serializable;
public class Ping implements Serializable, Transportable {
private static final long serialVersionUID = 1L;
private static final String TAG = "PING";
private static final String DATA = "PING";
private static final Ping INSTANCE = new Ping();
@ -44,7 +44,7 @@ public class Ping implements Serializable, Transportable {
@Override
public String toString() {
return TAG;
return DATA;
}
@Override

View File

@ -29,7 +29,7 @@ import java.io.Serializable;
public class Pong implements Serializable {
private static final long serialVersionUID = 1L;
private static final String TAG = "PONG";
private static final String DATA = "PONG";
private static final Pong INSTANCE = new Pong();
private Pong() {
@ -41,7 +41,7 @@ public class Pong implements Serializable {
@Override
public String toString() {
return TAG;
return DATA;
}
}

View File

@ -30,8 +30,7 @@ import java.util.HashMap;
import java.util.Map;
/**
* 请求应答对象
*
* 长连接请求应答对象
*/
public class ReplyBody implements Serializable, Transportable {
@ -57,6 +56,9 @@ public class ReplyBody implements Serializable, Transportable {
*/
private final Map<String, String> data = new HashMap<>();
/**
* 服务端产生时间戳(13位)
*/
private long timestamp;
public ReplyBody() {

View File

@ -27,17 +27,25 @@ import java.util.HashMap;
import java.util.Map;
/**
* java |android 客户端请求结构
*
* 客户端通过长连接请求结构
*/
public class SentBody implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 请求key
*/
private String key;
/**
* 请求数据
*/
private final Map<String, String> data = new HashMap<>();
/**
* 客户端产生的时间戳(13位)
*/
private long timestamp;
public Map<String, String> getData() {