diff --git a/cim-boot-server/pom.xml b/cim-boot-server/pom.xml
index e58c850..3979b7d 100644
--- a/cim-boot-server/pom.xml
+++ b/cim-boot-server/pom.xml
@@ -67,7 +67,7 @@
com.farsunset
cim-server-sdk-netty
- 4.2.5
+ 4.2.6
org.apache.commons
diff --git a/cim-boot-server/src/main/java/com/farsunset/cim/config/CIMConfig.java b/cim-boot-server/src/main/java/com/farsunset/cim/config/CIMConfig.java
index 1901820..8ca4bac 100644
--- a/cim-boot-server/src/main/java/com/farsunset/cim/config/CIMConfig.java
+++ b/cim-boot-server/src/main/java/com/farsunset/cim/config/CIMConfig.java
@@ -23,7 +23,7 @@ package com.farsunset.cim.config;
import com.farsunset.cim.acceptor.AppSocketAcceptor;
import com.farsunset.cim.acceptor.WebsocketAcceptor;
-import com.farsunset.cim.acceptor.config.AppSocketConfig;
+import com.farsunset.cim.acceptor.config.SocketConfig;
import com.farsunset.cim.acceptor.config.WebsocketConfig;
import com.farsunset.cim.component.handler.annotation.CIMHandler;
import com.farsunset.cim.component.predicate.HandshakePredicate;
@@ -84,6 +84,11 @@ public class CIMConfig implements CIMRequestHandler, ApplicationListener客户端 心跳事件
+ */
+ private Duration writeIdle = Duration.ofSeconds(45);
+
+ /**
+ 长链接读空闲时间触发时间(s)
+ 心跳响应定时,每当readIdle - writeIdle秒无数据接收,触发心跳超时计数
+ */
+ private Duration readIdle = Duration.ofSeconds(60);
+
+
+ /**
+ 长链接最大允许心跳响应超时次数
+ 达到该次数则 服务端断开链接
+ */
+ private int maxPongTimeout = 1;
+
public void setPort(Integer port) {
this.port = port;
}
@@ -47,4 +68,37 @@ public class CIMAppSocketProperties {
public void setEnable(boolean enable) {
this.enable = enable;
}
+
+ public Duration getWriteIdle() {
+ return writeIdle;
+ }
+
+ public void setWriteIdle(Duration writeIdle) {
+ if (writeIdle == null || writeIdle.getSeconds() <= 0){
+ return;
+ }
+ this.writeIdle = writeIdle;
+ }
+
+ public Duration getReadIdle() {
+ return readIdle;
+ }
+
+ public void setReadIdle(Duration readIdle) {
+ if (readIdle == null || readIdle.getSeconds() <= 0){
+ return;
+ }
+ this.readIdle = readIdle;
+ }
+
+ public int getMaxPongTimeout() {
+ return maxPongTimeout;
+ }
+
+ public void setMaxPongTimeout(int maxPongTimeout) {
+ if (maxPongTimeout <= 0){
+ return;
+ }
+ this.maxPongTimeout = maxPongTimeout;
+ }
}
diff --git a/cim-boot-server/src/main/java/com/farsunset/cim/config/properties/CIMWebsocketProperties.java b/cim-boot-server/src/main/java/com/farsunset/cim/config/properties/CIMWebsocketProperties.java
index 4f4d2ae..d668137 100644
--- a/cim-boot-server/src/main/java/com/farsunset/cim/config/properties/CIMWebsocketProperties.java
+++ b/cim-boot-server/src/main/java/com/farsunset/cim/config/properties/CIMWebsocketProperties.java
@@ -24,6 +24,8 @@ package com.farsunset.cim.config.properties;
import com.farsunset.cim.constant.WebsocketProtocol;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import java.time.Duration;
+
@ConfigurationProperties(prefix = "cim.websocket")
public class CIMWebsocketProperties {
@@ -34,6 +36,25 @@ public class CIMWebsocketProperties {
private String path;
private WebsocketProtocol protocol;
+ /**
+ 长链接写空闲时间触发时间(s)
+ 心跳发送定时,每当x秒无数据下发写入,触发 服务端-->客户端 心跳事件
+ */
+ private Duration writeIdle = Duration.ofSeconds(45);
+
+ /**
+ 长链接读空闲时间触发时间(s)
+ 心跳响应定时,每当readIdle - writeIdle秒无数据接收,触发心跳超时计数
+ */
+ private Duration readIdle = Duration.ofSeconds(60);
+
+
+ /**
+ 长链接最大允许心跳响应超时次数
+ 达到该次数则 服务端断开链接
+ */
+ private int maxPongTimeout = 1;
+
public Integer getPort() {
return port;
}
@@ -65,4 +86,37 @@ public class CIMWebsocketProperties {
public void setEnable(boolean enable) {
this.enable = enable;
}
+
+ public Duration getWriteIdle() {
+ return writeIdle;
+ }
+
+ public void setWriteIdle(Duration writeIdle) {
+ if (writeIdle == null || writeIdle.getSeconds() <= 0){
+ return;
+ }
+ this.writeIdle = writeIdle;
+ }
+
+ public Duration getReadIdle() {
+ return readIdle;
+ }
+
+ public void setReadIdle(Duration readIdle) {
+ if (readIdle == null || readIdle.getSeconds() <= 0){
+ return;
+ }
+ this.readIdle = readIdle;
+ }
+
+ public int getMaxPongTimeout() {
+ return maxPongTimeout;
+ }
+
+ public void setMaxPongTimeout(int maxPongTimeout) {
+ if (maxPongTimeout <= 0){
+ return;
+ }
+ this.maxPongTimeout = maxPongTimeout;
+ }
}
diff --git a/cim-boot-server/src/main/resources/application.properties b/cim-boot-server/src/main/resources/application.properties
index 9fcfbbb..4b9b080 100644
--- a/cim-boot-server/src/main/resources/application.properties
+++ b/cim-boot-server/src/main/resources/application.properties
@@ -69,6 +69,9 @@ spring.messages.basename=i18n/messages
#commented to disable this port.
cim.app.port=23456
cim.app.enable=true
+cim.app.write-idle=45s
+cim.app.read-idle=60s
+cim.app.max-pong-timeout=3
cim.websocket.enable=true
cim.websocket.port=34567
@@ -76,6 +79,10 @@ cim.websocket.path=/
## json or protobuf
cim.websocket.protocol=protobuf
+cim.websocket.write-idle=45s
+cim.websocket.read-idle=60s
+cim.websocket.max-pong-timeout=3
+
#please setting your p12 info and appId.
cim.apns.p12.file=/apns/app.p12
cim.apns.p12.password=123
diff --git a/cim-server-sdk/pom.xml b/cim-server-sdk/pom.xml
index b218241..c02b92b 100644
--- a/cim-server-sdk/pom.xml
+++ b/cim-server-sdk/pom.xml
@@ -6,7 +6,7 @@
com.farsunset
cim-server-sdk-netty
- 4.2.5
+ 4.2.6
jar
${project.groupId}:${project.artifactId}
@@ -28,8 +28,8 @@
UTF-8
UTF-8
1.8
- 3.19.4
- 4.1.73.Final
+ 3.19.6
+ 4.1.86.Final
1.7.36
@@ -92,7 +92,7 @@
com.fasterxml.jackson.core
jackson-databind
- 2.13.2
+ 2.13.4.1
diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java
index 04b0d20..6fce197 100644
--- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java
+++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java
@@ -21,7 +21,7 @@
*/
package com.farsunset.cim.acceptor;
-import com.farsunset.cim.acceptor.config.AppSocketConfig;
+import com.farsunset.cim.acceptor.config.SocketConfig;
import com.farsunset.cim.coder.protobuf.AppMessageDecoder;
import com.farsunset.cim.coder.protobuf.AppMessageEncoder;
import io.netty.bootstrap.ServerBootstrap;
@@ -41,20 +41,19 @@ import java.util.concurrent.TimeUnit;
@ChannelHandler.Sharable
public class AppSocketAcceptor extends NioSocketAcceptor {
- private final AppSocketConfig config;
- public AppSocketAcceptor(AppSocketConfig config){
- super(config.getOuterRequestHandler());
- this.config = config;
+ public AppSocketAcceptor(SocketConfig config){
+ super(config);
}
/**
* bind基于tlv协议的socket端口
*/
+ @Override
public void bind(){
- if (!config.isEnable()){
+ if (!socketConfig.isEnable()){
return;
}
@@ -65,12 +64,12 @@ public class AppSocketAcceptor extends NioSocketAcceptor {
ch.pipeline().addLast(new AppMessageDecoder());
ch.pipeline().addLast(new AppMessageEncoder());
ch.pipeline().addLast(loggingHandler);
- ch.pipeline().addLast(new IdleStateHandler(readIdle.getSeconds(), writeIdle.getSeconds(), 0, TimeUnit.SECONDS));
+ ch.pipeline().addLast(new IdleStateHandler(socketConfig.getReadIdle().getSeconds(), socketConfig.getWriteIdle().getSeconds(), 0, TimeUnit.SECONDS));
ch.pipeline().addLast(AppSocketAcceptor.this);
}
});
- ChannelFuture channelFuture = bootstrap.bind(config.getPort()).syncUninterruptibly();
+ ChannelFuture channelFuture = bootstrap.bind(socketConfig.getPort()).syncUninterruptibly();
channelFuture.channel().newSucceededFuture().addListener(future -> {
String logBanner = "\n\n" +
"* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n" +
@@ -80,7 +79,7 @@ public class AppSocketAcceptor extends NioSocketAcceptor {
"* *\n" +
"* *\n" +
"* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n";
- logger.info(logBanner, config.getPort());
+ logger.info(logBanner, socketConfig.getPort());
});
channelFuture.channel().closeFuture().addListener(future -> this.destroy());
}
diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java
index eb8e88b..920bda8 100644
--- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java
+++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java
@@ -21,9 +21,9 @@
*/
package com.farsunset.cim.acceptor;
+import com.farsunset.cim.acceptor.config.SocketConfig;
import com.farsunset.cim.constant.CIMConstant;
import com.farsunset.cim.constant.ChannelAttr;
-import com.farsunset.cim.handler.CIMRequestHandler;
import com.farsunset.cim.handler.LoggingHandler;
import com.farsunset.cim.model.Ping;
import com.farsunset.cim.model.SentBody;
@@ -38,36 +38,22 @@ import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Duration;
import java.util.concurrent.ThreadFactory;
abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{
- private static final int PONG_TIME_OUT_COUNT = 3;
-
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected final ChannelHandler loggingHandler = new LoggingHandler();
+ protected final SocketConfig socketConfig;
+
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
- private final CIMRequestHandler outerRequestHandler;
+ protected NioSocketAcceptor(SocketConfig socketConfig){
- /**
- * 读空闲时间(秒)
- */
- public final Duration writeIdle = Duration.ofSeconds(45);
-
- /**
- * 写接空闲时间(秒)
- */
- public final Duration readIdle = Duration.ofSeconds(60);
-
-
- protected NioSocketAcceptor(CIMRequestHandler outerRequestHandler){
-
- this.outerRequestHandler = outerRequestHandler;
+ this.socketConfig = socketConfig;
ThreadFactory bossThreadFactory = r -> {
Thread thread = new Thread(r);
@@ -91,6 +77,15 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{
}
+ /**
+ * 执行启动SOCKET服务
+ */
+ public abstract void bind();
+
+
+ /**
+ * 执行注销SOCKET服务
+ */
public void destroy() {
if(bossGroup != null && !bossGroup.isShuttingDown() && !bossGroup.isShutdown() ) {
try {bossGroup.shutdownGracefully();}catch(Exception ignore) {}
@@ -115,8 +110,8 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{
/*
* 由业务层去处理其他的sentBody
*/
- if (outerRequestHandler != null){
- outerRequestHandler.process(ctx.channel(), body);
+ if (socketConfig.getOuterRequestHandler() != null){
+ socketConfig.getOuterRequestHandler().process(ctx.channel(), body);
}
}
@@ -132,13 +127,13 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{
return;
}
- if (outerRequestHandler == null){
+ if (socketConfig.getOuterRequestHandler() == null){
return;
}
SentBody body = new SentBody();
body.setKey(CIMConstant.CLIENT_CONNECT_CLOSED);
- outerRequestHandler.process(ctx.channel(), body);
+ socketConfig.getOuterRequestHandler().process(ctx.channel(), body);
}
@Override
@@ -177,7 +172,7 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{
* 如果心跳请求发出(readIdle-writeIdle)秒内没收到响应,则关闭连接
*/
Integer pingCount = ctx.channel().attr(ChannelAttr.PING_COUNT).get();
- if (idleEvent.state() == IdleState.READER_IDLE && pingCount != null && pingCount >= PONG_TIME_OUT_COUNT) {
+ if (idleEvent.state() == IdleState.READER_IDLE && pingCount != null && pingCount >= socketConfig.getMaxPongTimeout()) {
ctx.close();
logger.info("{} pong timeout.",ctx.channel());
}
diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java
index 6fd9002..f63ecc4 100644
--- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java
+++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java
@@ -74,7 +74,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor {
private final ChannelHandler illegalRequestHandler = new IllegalRequestHandler();
public WebsocketAcceptor(WebsocketConfig config){
- super(config.getOuterRequestHandler());
+ super(config);
this.config = config;
this.handshakeHandler = new HandshakeHandler(config.getHandshakePredicate());
}
@@ -82,6 +82,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor {
/**
* bind基于websocket协议的socket端口
*/
+ @Override
public void bind(){
if (!config.isEnable()){
@@ -105,7 +106,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor {
ch.pipeline().addLast(new WebMessageDecoder());
ch.pipeline().addLast(new WebMessageEncoder());
}
- ch.pipeline().addLast(new IdleStateHandler(readIdle.getSeconds(), writeIdle.getSeconds(), 0, TimeUnit.SECONDS));
+ ch.pipeline().addLast(new IdleStateHandler(config.getReadIdle().getSeconds(), config.getWriteIdle().getSeconds(), 0, TimeUnit.SECONDS));
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(WebsocketAcceptor.this);
ch.pipeline().addLast(illegalRequestHandler);
diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java
similarity index 61%
rename from cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java
rename to cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java
index a90b29f..020a6d4 100644
--- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java
+++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java
@@ -23,15 +23,17 @@ package com.farsunset.cim.acceptor.config;
import com.farsunset.cim.handler.CIMRequestHandler;
+import java.time.Duration;
+
/**
* TLV协议socket端口配置
*/
-public class AppSocketConfig {
+public class SocketConfig {
private static final int DEFAULT_PORT = 23456;
- /*
+ /**
TVL协议socket端口
*/
private Integer port;
@@ -46,8 +48,27 @@ public class AppSocketConfig {
*/
private boolean enable;
+ /**
+ 长链接写空闲时间触发时间(s)
+ 心跳发送定时,每当x秒无数据下发写入,触发 服务端-->客户端 心跳事件
+ */
+ private Duration writeIdle = Duration.ofSeconds(45);
+
+ /**
+ 长链接读空闲时间触发时间(s)
+ 心跳响应定时,每当readIdle - writeIdle秒无数据接收,触发心跳超时计数
+ */
+ private Duration readIdle = Duration.ofSeconds(60);
+
+
+ /**
+ 长链接最大允许心跳响应超时次数
+ 达到该次数则 服务端断开链接
+ */
+ private int maxPongTimeout = 1;
+
public Integer getPort() {
- return port == null ? DEFAULT_PORT : port;
+ return port == null || port <= 0 ? DEFAULT_PORT : port;
}
public void setPort(Integer port) {
@@ -69,4 +90,28 @@ public class AppSocketConfig {
public void setEnable(boolean enable) {
this.enable = enable;
}
+
+ public Duration getWriteIdle() {
+ return writeIdle;
+ }
+
+ public void setWriteIdle(Duration writeIdle) {
+ this.writeIdle = writeIdle;
+ }
+
+ public Duration getReadIdle() {
+ return readIdle;
+ }
+
+ public void setReadIdle(Duration readIdle) {
+ this.readIdle = readIdle;
+ }
+
+ public int getMaxPongTimeout() {
+ return maxPongTimeout;
+ }
+
+ public void setMaxPongTimeout(int maxPongTimeout) {
+ this.maxPongTimeout = maxPongTimeout;
+ }
}
diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java
index 0d5d9f7..0012cd6 100644
--- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java
+++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java
@@ -22,7 +22,6 @@
package com.farsunset.cim.acceptor.config;
import com.farsunset.cim.constant.WebsocketProtocol;
-import com.farsunset.cim.handler.CIMRequestHandler;
import com.farsunset.cim.handshake.HandshakeEvent;
import java.util.function.Predicate;
@@ -30,7 +29,7 @@ import java.util.function.Predicate;
/**
* 基于websocket的服务配置
*/
-public class WebsocketConfig {
+public class WebsocketConfig extends SocketConfig{
private static final int DEFAULT_PORT = 34567;
@@ -38,11 +37,6 @@ public class WebsocketConfig {
private static final WebsocketProtocol DEFAULT_PROTOCOL = WebsocketProtocol.PROTOBUF;
- /**
- * websocket端口
- */
- private Integer port;
-
/**
* websocket端点地址
*/
@@ -53,27 +47,15 @@ public class WebsocketConfig {
*/
private WebsocketProtocol protocol;
- /**
- * socket消息处理器
- */
- private CIMRequestHandler outerRequestHandler;
-
/**
* websocket鉴权实现
*/
private Predicate handshakePredicate;
- /**
- * 是否启用websocket
- */
- private boolean enable;
+ @Override
public Integer getPort() {
- return port == null ? DEFAULT_PORT : port;
- }
-
- public void setPort(Integer port) {
- this.port = port;
+ return super.getPort() == null || super.getPort() <= 0 ? DEFAULT_PORT : super.getPort();
}
public String getPath() {
@@ -84,11 +66,6 @@ public class WebsocketConfig {
return protocol == null ? DEFAULT_PROTOCOL : protocol;
}
-
- public CIMRequestHandler getOuterRequestHandler() {
- return outerRequestHandler;
- }
-
public Predicate getHandshakePredicate() {
return handshakePredicate;
}
@@ -101,19 +78,9 @@ public class WebsocketConfig {
this.protocol = protocol;
}
- public void setOuterRequestHandler(CIMRequestHandler outerRequestHandler) {
- this.outerRequestHandler = outerRequestHandler;
- }
public void setHandshakePredicate(Predicate handshakePredicate) {
this.handshakePredicate = handshakePredicate;
}
- public boolean isEnable() {
- return enable;
- }
-
- public void setEnable(boolean enable) {
- this.enable = enable;
- }
}