From 97fe0a2721ca480229b55cfb4f336840521724e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=BF=9C=E6=96=B9=E5=A4=95=E9=98=B3?= Date: Mon, 6 Feb 2023 18:22:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E7=AB=AFsdk=E5=8D=87?= =?UTF-8?q?=E7=BA=A7=201=E3=80=81=E6=9C=8D=E5=8A=A1=E7=AB=AF=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20=E9=85=8D=E7=BD=AE=E9=95=BF=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E5=BF=83=E8=B7=B3=E9=97=B4=E9=9A=94=E5=92=8C=E5=85=81=E8=AE=B8?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=AC=A1=E6=95=B0=E7=9B=B8=E5=85=B3=E5=8F=82?= =?UTF-8?q?=E6=95=B0=202=E3=80=81com.farsunset.cim.acceptor.config.AppSock?= =?UTF-8?q?etConfig=E4=BF=AE=E6=94=B9=E4=B8=BAcom.farsunset.cim.acceptor.c?= =?UTF-8?q?onfig.SocketConfig?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cim-boot-server/pom.xml | 2 +- .../com/farsunset/cim/config/CIMConfig.java | 13 ++++- .../properties/CIMAppSocketProperties.java | 54 +++++++++++++++++++ .../properties/CIMWebsocketProperties.java | 54 +++++++++++++++++++ .../src/main/resources/application.properties | 7 +++ cim-server-sdk/pom.xml | 8 +-- .../cim/acceptor/AppSocketAcceptor.java | 17 +++--- .../cim/acceptor/NioSocketAcceptor.java | 43 +++++++-------- .../cim/acceptor/WebsocketAcceptor.java | 5 +- ...AppSocketConfig.java => SocketConfig.java} | 51 ++++++++++++++++-- .../cim/acceptor/config/WebsocketConfig.java | 39 ++------------ 11 files changed, 212 insertions(+), 81 deletions(-) rename cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/{AppSocketConfig.java => SocketConfig.java} (61%) diff --git a/cim-boot-server/pom.xml b/cim-boot-server/pom.xml index e58c850..3979b7d 100644 --- a/cim-boot-server/pom.xml +++ b/cim-boot-server/pom.xml @@ -67,7 +67,7 @@ com.farsunset cim-server-sdk-netty - 4.2.5 + 4.2.6 org.apache.commons diff --git a/cim-boot-server/src/main/java/com/farsunset/cim/config/CIMConfig.java b/cim-boot-server/src/main/java/com/farsunset/cim/config/CIMConfig.java index 1901820..8ca4bac 100644 --- a/cim-boot-server/src/main/java/com/farsunset/cim/config/CIMConfig.java +++ b/cim-boot-server/src/main/java/com/farsunset/cim/config/CIMConfig.java @@ -23,7 +23,7 @@ package com.farsunset.cim.config; import com.farsunset.cim.acceptor.AppSocketAcceptor; import com.farsunset.cim.acceptor.WebsocketAcceptor; -import com.farsunset.cim.acceptor.config.AppSocketConfig; +import com.farsunset.cim.acceptor.config.SocketConfig; import com.farsunset.cim.acceptor.config.WebsocketConfig; import com.farsunset.cim.component.handler.annotation.CIMHandler; import com.farsunset.cim.component.predicate.HandshakePredicate; @@ -84,6 +84,11 @@ public class CIMConfig implements CIMRequestHandler, ApplicationListener客户端 心跳事件 + */ + private Duration writeIdle = Duration.ofSeconds(45); + + /** + 长链接读空闲时间触发时间(s) + 心跳响应定时,每当readIdle - writeIdle秒无数据接收,触发心跳超时计数 + */ + private Duration readIdle = Duration.ofSeconds(60); + + + /** + 长链接最大允许心跳响应超时次数 + 达到该次数则 服务端断开链接 + */ + private int maxPongTimeout = 1; + public void setPort(Integer port) { this.port = port; } @@ -47,4 +68,37 @@ public class CIMAppSocketProperties { public void setEnable(boolean enable) { this.enable = enable; } + + public Duration getWriteIdle() { + return writeIdle; + } + + public void setWriteIdle(Duration writeIdle) { + if (writeIdle == null || writeIdle.getSeconds() <= 0){ + return; + } + this.writeIdle = writeIdle; + } + + public Duration getReadIdle() { + return readIdle; + } + + public void setReadIdle(Duration readIdle) { + if (readIdle == null || readIdle.getSeconds() <= 0){ + return; + } + this.readIdle = readIdle; + } + + public int getMaxPongTimeout() { + return maxPongTimeout; + } + + public void setMaxPongTimeout(int maxPongTimeout) { + if (maxPongTimeout <= 0){ + return; + } + this.maxPongTimeout = maxPongTimeout; + } } diff --git a/cim-boot-server/src/main/java/com/farsunset/cim/config/properties/CIMWebsocketProperties.java b/cim-boot-server/src/main/java/com/farsunset/cim/config/properties/CIMWebsocketProperties.java index 4f4d2ae..d668137 100644 --- a/cim-boot-server/src/main/java/com/farsunset/cim/config/properties/CIMWebsocketProperties.java +++ b/cim-boot-server/src/main/java/com/farsunset/cim/config/properties/CIMWebsocketProperties.java @@ -24,6 +24,8 @@ package com.farsunset.cim.config.properties; import com.farsunset.cim.constant.WebsocketProtocol; import org.springframework.boot.context.properties.ConfigurationProperties; +import java.time.Duration; + @ConfigurationProperties(prefix = "cim.websocket") public class CIMWebsocketProperties { @@ -34,6 +36,25 @@ public class CIMWebsocketProperties { private String path; private WebsocketProtocol protocol; + /** + 长链接写空闲时间触发时间(s) + 心跳发送定时,每当x秒无数据下发写入,触发 服务端-->客户端 心跳事件 + */ + private Duration writeIdle = Duration.ofSeconds(45); + + /** + 长链接读空闲时间触发时间(s) + 心跳响应定时,每当readIdle - writeIdle秒无数据接收,触发心跳超时计数 + */ + private Duration readIdle = Duration.ofSeconds(60); + + + /** + 长链接最大允许心跳响应超时次数 + 达到该次数则 服务端断开链接 + */ + private int maxPongTimeout = 1; + public Integer getPort() { return port; } @@ -65,4 +86,37 @@ public class CIMWebsocketProperties { public void setEnable(boolean enable) { this.enable = enable; } + + public Duration getWriteIdle() { + return writeIdle; + } + + public void setWriteIdle(Duration writeIdle) { + if (writeIdle == null || writeIdle.getSeconds() <= 0){ + return; + } + this.writeIdle = writeIdle; + } + + public Duration getReadIdle() { + return readIdle; + } + + public void setReadIdle(Duration readIdle) { + if (readIdle == null || readIdle.getSeconds() <= 0){ + return; + } + this.readIdle = readIdle; + } + + public int getMaxPongTimeout() { + return maxPongTimeout; + } + + public void setMaxPongTimeout(int maxPongTimeout) { + if (maxPongTimeout <= 0){ + return; + } + this.maxPongTimeout = maxPongTimeout; + } } diff --git a/cim-boot-server/src/main/resources/application.properties b/cim-boot-server/src/main/resources/application.properties index 9fcfbbb..4b9b080 100644 --- a/cim-boot-server/src/main/resources/application.properties +++ b/cim-boot-server/src/main/resources/application.properties @@ -69,6 +69,9 @@ spring.messages.basename=i18n/messages #commented to disable this port. cim.app.port=23456 cim.app.enable=true +cim.app.write-idle=45s +cim.app.read-idle=60s +cim.app.max-pong-timeout=3 cim.websocket.enable=true cim.websocket.port=34567 @@ -76,6 +79,10 @@ cim.websocket.path=/ ## json or protobuf cim.websocket.protocol=protobuf +cim.websocket.write-idle=45s +cim.websocket.read-idle=60s +cim.websocket.max-pong-timeout=3 + #please setting your p12 info and appId. cim.apns.p12.file=/apns/app.p12 cim.apns.p12.password=123 diff --git a/cim-server-sdk/pom.xml b/cim-server-sdk/pom.xml index b218241..c02b92b 100644 --- a/cim-server-sdk/pom.xml +++ b/cim-server-sdk/pom.xml @@ -6,7 +6,7 @@ com.farsunset cim-server-sdk-netty - 4.2.5 + 4.2.6 jar ${project.groupId}:${project.artifactId} @@ -28,8 +28,8 @@ UTF-8 UTF-8 1.8 - 3.19.4 - 4.1.73.Final + 3.19.6 + 4.1.86.Final 1.7.36 @@ -92,7 +92,7 @@ com.fasterxml.jackson.core jackson-databind - 2.13.2 + 2.13.4.1 diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java index 04b0d20..6fce197 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/AppSocketAcceptor.java @@ -21,7 +21,7 @@ */ package com.farsunset.cim.acceptor; -import com.farsunset.cim.acceptor.config.AppSocketConfig; +import com.farsunset.cim.acceptor.config.SocketConfig; import com.farsunset.cim.coder.protobuf.AppMessageDecoder; import com.farsunset.cim.coder.protobuf.AppMessageEncoder; import io.netty.bootstrap.ServerBootstrap; @@ -41,20 +41,19 @@ import java.util.concurrent.TimeUnit; @ChannelHandler.Sharable public class AppSocketAcceptor extends NioSocketAcceptor { - private final AppSocketConfig config; - public AppSocketAcceptor(AppSocketConfig config){ - super(config.getOuterRequestHandler()); - this.config = config; + public AppSocketAcceptor(SocketConfig config){ + super(config); } /** * bind基于tlv协议的socket端口 */ + @Override public void bind(){ - if (!config.isEnable()){ + if (!socketConfig.isEnable()){ return; } @@ -65,12 +64,12 @@ public class AppSocketAcceptor extends NioSocketAcceptor { ch.pipeline().addLast(new AppMessageDecoder()); ch.pipeline().addLast(new AppMessageEncoder()); ch.pipeline().addLast(loggingHandler); - ch.pipeline().addLast(new IdleStateHandler(readIdle.getSeconds(), writeIdle.getSeconds(), 0, TimeUnit.SECONDS)); + ch.pipeline().addLast(new IdleStateHandler(socketConfig.getReadIdle().getSeconds(), socketConfig.getWriteIdle().getSeconds(), 0, TimeUnit.SECONDS)); ch.pipeline().addLast(AppSocketAcceptor.this); } }); - ChannelFuture channelFuture = bootstrap.bind(config.getPort()).syncUninterruptibly(); + ChannelFuture channelFuture = bootstrap.bind(socketConfig.getPort()).syncUninterruptibly(); channelFuture.channel().newSucceededFuture().addListener(future -> { String logBanner = "\n\n" + "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n" + @@ -80,7 +79,7 @@ public class AppSocketAcceptor extends NioSocketAcceptor { "* *\n" + "* *\n" + "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n"; - logger.info(logBanner, config.getPort()); + logger.info(logBanner, socketConfig.getPort()); }); channelFuture.channel().closeFuture().addListener(future -> this.destroy()); } diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java index eb8e88b..920bda8 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/NioSocketAcceptor.java @@ -21,9 +21,9 @@ */ package com.farsunset.cim.acceptor; +import com.farsunset.cim.acceptor.config.SocketConfig; import com.farsunset.cim.constant.CIMConstant; import com.farsunset.cim.constant.ChannelAttr; -import com.farsunset.cim.handler.CIMRequestHandler; import com.farsunset.cim.handler.LoggingHandler; import com.farsunset.cim.model.Ping; import com.farsunset.cim.model.SentBody; @@ -38,36 +38,22 @@ import io.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; import java.util.concurrent.ThreadFactory; abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{ - private static final int PONG_TIME_OUT_COUNT = 3; - protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final ChannelHandler loggingHandler = new LoggingHandler(); + protected final SocketConfig socketConfig; + private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; - private final CIMRequestHandler outerRequestHandler; + protected NioSocketAcceptor(SocketConfig socketConfig){ - /** - * 读空闲时间(秒) - */ - public final Duration writeIdle = Duration.ofSeconds(45); - - /** - * 写接空闲时间(秒) - */ - public final Duration readIdle = Duration.ofSeconds(60); - - - protected NioSocketAcceptor(CIMRequestHandler outerRequestHandler){ - - this.outerRequestHandler = outerRequestHandler; + this.socketConfig = socketConfig; ThreadFactory bossThreadFactory = r -> { Thread thread = new Thread(r); @@ -91,6 +77,15 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{ } + /** + * 执行启动SOCKET服务 + */ + public abstract void bind(); + + + /** + * 执行注销SOCKET服务 + */ public void destroy() { if(bossGroup != null && !bossGroup.isShuttingDown() && !bossGroup.isShutdown() ) { try {bossGroup.shutdownGracefully();}catch(Exception ignore) {} @@ -115,8 +110,8 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{ /* * 由业务层去处理其他的sentBody */ - if (outerRequestHandler != null){ - outerRequestHandler.process(ctx.channel(), body); + if (socketConfig.getOuterRequestHandler() != null){ + socketConfig.getOuterRequestHandler().process(ctx.channel(), body); } } @@ -132,13 +127,13 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{ return; } - if (outerRequestHandler == null){ + if (socketConfig.getOuterRequestHandler() == null){ return; } SentBody body = new SentBody(); body.setKey(CIMConstant.CLIENT_CONNECT_CLOSED); - outerRequestHandler.process(ctx.channel(), body); + socketConfig.getOuterRequestHandler().process(ctx.channel(), body); } @Override @@ -177,7 +172,7 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler{ * 如果心跳请求发出(readIdle-writeIdle)秒内没收到响应,则关闭连接 */ Integer pingCount = ctx.channel().attr(ChannelAttr.PING_COUNT).get(); - if (idleEvent.state() == IdleState.READER_IDLE && pingCount != null && pingCount >= PONG_TIME_OUT_COUNT) { + if (idleEvent.state() == IdleState.READER_IDLE && pingCount != null && pingCount >= socketConfig.getMaxPongTimeout()) { ctx.close(); logger.info("{} pong timeout.",ctx.channel()); } diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java index 6fd9002..f63ecc4 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/WebsocketAcceptor.java @@ -74,7 +74,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor { private final ChannelHandler illegalRequestHandler = new IllegalRequestHandler(); public WebsocketAcceptor(WebsocketConfig config){ - super(config.getOuterRequestHandler()); + super(config); this.config = config; this.handshakeHandler = new HandshakeHandler(config.getHandshakePredicate()); } @@ -82,6 +82,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor { /** * bind基于websocket协议的socket端口 */ + @Override public void bind(){ if (!config.isEnable()){ @@ -105,7 +106,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor { ch.pipeline().addLast(new WebMessageDecoder()); ch.pipeline().addLast(new WebMessageEncoder()); } - ch.pipeline().addLast(new IdleStateHandler(readIdle.getSeconds(), writeIdle.getSeconds(), 0, TimeUnit.SECONDS)); + ch.pipeline().addLast(new IdleStateHandler(config.getReadIdle().getSeconds(), config.getWriteIdle().getSeconds(), 0, TimeUnit.SECONDS)); ch.pipeline().addLast(loggingHandler); ch.pipeline().addLast(WebsocketAcceptor.this); ch.pipeline().addLast(illegalRequestHandler); diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java similarity index 61% rename from cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java rename to cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java index a90b29f..020a6d4 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/SocketConfig.java @@ -23,15 +23,17 @@ package com.farsunset.cim.acceptor.config; import com.farsunset.cim.handler.CIMRequestHandler; +import java.time.Duration; + /** * TLV协议socket端口配置 */ -public class AppSocketConfig { +public class SocketConfig { private static final int DEFAULT_PORT = 23456; - /* + /** TVL协议socket端口 */ private Integer port; @@ -46,8 +48,27 @@ public class AppSocketConfig { */ private boolean enable; + /** + 长链接写空闲时间触发时间(s) + 心跳发送定时,每当x秒无数据下发写入,触发 服务端-->客户端 心跳事件 + */ + private Duration writeIdle = Duration.ofSeconds(45); + + /** + 长链接读空闲时间触发时间(s) + 心跳响应定时,每当readIdle - writeIdle秒无数据接收,触发心跳超时计数 + */ + private Duration readIdle = Duration.ofSeconds(60); + + + /** + 长链接最大允许心跳响应超时次数 + 达到该次数则 服务端断开链接 + */ + private int maxPongTimeout = 1; + public Integer getPort() { - return port == null ? DEFAULT_PORT : port; + return port == null || port <= 0 ? DEFAULT_PORT : port; } public void setPort(Integer port) { @@ -69,4 +90,28 @@ public class AppSocketConfig { public void setEnable(boolean enable) { this.enable = enable; } + + public Duration getWriteIdle() { + return writeIdle; + } + + public void setWriteIdle(Duration writeIdle) { + this.writeIdle = writeIdle; + } + + public Duration getReadIdle() { + return readIdle; + } + + public void setReadIdle(Duration readIdle) { + this.readIdle = readIdle; + } + + public int getMaxPongTimeout() { + return maxPongTimeout; + } + + public void setMaxPongTimeout(int maxPongTimeout) { + this.maxPongTimeout = maxPongTimeout; + } } diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java index 0d5d9f7..0012cd6 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java @@ -22,7 +22,6 @@ package com.farsunset.cim.acceptor.config; import com.farsunset.cim.constant.WebsocketProtocol; -import com.farsunset.cim.handler.CIMRequestHandler; import com.farsunset.cim.handshake.HandshakeEvent; import java.util.function.Predicate; @@ -30,7 +29,7 @@ import java.util.function.Predicate; /** * 基于websocket的服务配置 */ -public class WebsocketConfig { +public class WebsocketConfig extends SocketConfig{ private static final int DEFAULT_PORT = 34567; @@ -38,11 +37,6 @@ public class WebsocketConfig { private static final WebsocketProtocol DEFAULT_PROTOCOL = WebsocketProtocol.PROTOBUF; - /** - * websocket端口 - */ - private Integer port; - /** * websocket端点地址 */ @@ -53,27 +47,15 @@ public class WebsocketConfig { */ private WebsocketProtocol protocol; - /** - * socket消息处理器 - */ - private CIMRequestHandler outerRequestHandler; - /** * websocket鉴权实现 */ private Predicate handshakePredicate; - /** - * 是否启用websocket - */ - private boolean enable; + @Override public Integer getPort() { - return port == null ? DEFAULT_PORT : port; - } - - public void setPort(Integer port) { - this.port = port; + return super.getPort() == null || super.getPort() <= 0 ? DEFAULT_PORT : super.getPort(); } public String getPath() { @@ -84,11 +66,6 @@ public class WebsocketConfig { return protocol == null ? DEFAULT_PROTOCOL : protocol; } - - public CIMRequestHandler getOuterRequestHandler() { - return outerRequestHandler; - } - public Predicate getHandshakePredicate() { return handshakePredicate; } @@ -101,19 +78,9 @@ public class WebsocketConfig { this.protocol = protocol; } - public void setOuterRequestHandler(CIMRequestHandler outerRequestHandler) { - this.outerRequestHandler = outerRequestHandler; - } public void setHandshakePredicate(Predicate handshakePredicate) { this.handshakePredicate = handshakePredicate; } - public boolean isEnable() { - return enable; - } - - public void setEnable(boolean enable) { - this.enable = enable; - } }