mirror of
https://gitee.com/farsunset/cim.git
synced 2025-08-02 20:45:46 +08:00
服务端sdk升级
1、服务端新增 配置长连接心跳间隔和允许超时次数相关参数 2、com.farsunset.cim.acceptor.config.AppSocketConfig修改为com.farsunset.cim.acceptor.config.SocketConfig
This commit is contained in:
parent
adf4f827e6
commit
97fe0a2721
@ -67,7 +67,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.farsunset</groupId>
|
<groupId>com.farsunset</groupId>
|
||||||
<artifactId>cim-server-sdk-netty</artifactId>
|
<artifactId>cim-server-sdk-netty</artifactId>
|
||||||
<version>4.2.5</version>
|
<version>4.2.6</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.commons</groupId>
|
<groupId>org.apache.commons</groupId>
|
||||||
|
@ -23,7 +23,7 @@ package com.farsunset.cim.config;
|
|||||||
|
|
||||||
import com.farsunset.cim.acceptor.AppSocketAcceptor;
|
import com.farsunset.cim.acceptor.AppSocketAcceptor;
|
||||||
import com.farsunset.cim.acceptor.WebsocketAcceptor;
|
import com.farsunset.cim.acceptor.WebsocketAcceptor;
|
||||||
import com.farsunset.cim.acceptor.config.AppSocketConfig;
|
import com.farsunset.cim.acceptor.config.SocketConfig;
|
||||||
import com.farsunset.cim.acceptor.config.WebsocketConfig;
|
import com.farsunset.cim.acceptor.config.WebsocketConfig;
|
||||||
import com.farsunset.cim.component.handler.annotation.CIMHandler;
|
import com.farsunset.cim.component.handler.annotation.CIMHandler;
|
||||||
import com.farsunset.cim.component.predicate.HandshakePredicate;
|
import com.farsunset.cim.component.predicate.HandshakePredicate;
|
||||||
@ -84,6 +84,11 @@ public class CIMConfig implements CIMRequestHandler, ApplicationListener<Applica
|
|||||||
config.setProtocol(properties.getProtocol());
|
config.setProtocol(properties.getProtocol());
|
||||||
config.setOuterRequestHandler(this);
|
config.setOuterRequestHandler(this);
|
||||||
config.setEnable(properties.isEnable());
|
config.setEnable(properties.isEnable());
|
||||||
|
|
||||||
|
config.setWriteIdle(properties.getWriteIdle());
|
||||||
|
config.setReadIdle(properties.getReadIdle());
|
||||||
|
config.setMaxPongTimeout(properties.getMaxPongTimeout());
|
||||||
|
|
||||||
return new WebsocketAcceptor(config);
|
return new WebsocketAcceptor(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,11 +96,15 @@ public class CIMConfig implements CIMRequestHandler, ApplicationListener<Applica
|
|||||||
@ConditionalOnProperty(name = {"cim.app.enable"},matchIfMissing = true)
|
@ConditionalOnProperty(name = {"cim.app.enable"},matchIfMissing = true)
|
||||||
public AppSocketAcceptor appSocketAcceptor(CIMAppSocketProperties properties) {
|
public AppSocketAcceptor appSocketAcceptor(CIMAppSocketProperties properties) {
|
||||||
|
|
||||||
AppSocketConfig config = new AppSocketConfig();
|
SocketConfig config = new SocketConfig();
|
||||||
config.setPort(properties.getPort());
|
config.setPort(properties.getPort());
|
||||||
config.setOuterRequestHandler(this);
|
config.setOuterRequestHandler(this);
|
||||||
config.setEnable(properties.isEnable());
|
config.setEnable(properties.isEnable());
|
||||||
|
|
||||||
|
config.setWriteIdle(properties.getWriteIdle());
|
||||||
|
config.setReadIdle(properties.getReadIdle());
|
||||||
|
config.setMaxPongTimeout(properties.getMaxPongTimeout());
|
||||||
|
|
||||||
return new AppSocketAcceptor(config);
|
return new AppSocketAcceptor(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,6 +23,8 @@ package com.farsunset.cim.config.properties;
|
|||||||
|
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
|
||||||
@ConfigurationProperties(prefix = "cim.app")
|
@ConfigurationProperties(prefix = "cim.app")
|
||||||
public class CIMAppSocketProperties {
|
public class CIMAppSocketProperties {
|
||||||
@ -31,6 +33,25 @@ public class CIMAppSocketProperties {
|
|||||||
|
|
||||||
private Integer port;
|
private Integer port;
|
||||||
|
|
||||||
|
/**
|
||||||
|
长链接写空闲时间触发时间(s)
|
||||||
|
心跳发送定时,每当x秒无数据下发写入,触发 服务端-->客户端 心跳事件
|
||||||
|
*/
|
||||||
|
private Duration writeIdle = Duration.ofSeconds(45);
|
||||||
|
|
||||||
|
/**
|
||||||
|
长链接读空闲时间触发时间(s)
|
||||||
|
心跳响应定时,每当readIdle - writeIdle秒无数据接收,触发心跳超时计数
|
||||||
|
*/
|
||||||
|
private Duration readIdle = Duration.ofSeconds(60);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
长链接最大允许心跳响应超时次数
|
||||||
|
达到该次数则 服务端断开链接
|
||||||
|
*/
|
||||||
|
private int maxPongTimeout = 1;
|
||||||
|
|
||||||
public void setPort(Integer port) {
|
public void setPort(Integer port) {
|
||||||
this.port = port;
|
this.port = port;
|
||||||
}
|
}
|
||||||
@ -47,4 +68,37 @@ public class CIMAppSocketProperties {
|
|||||||
public void setEnable(boolean enable) {
|
public void setEnable(boolean enable) {
|
||||||
this.enable = enable;
|
this.enable = enable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Duration getWriteIdle() {
|
||||||
|
return writeIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setWriteIdle(Duration writeIdle) {
|
||||||
|
if (writeIdle == null || writeIdle.getSeconds() <= 0){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.writeIdle = writeIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Duration getReadIdle() {
|
||||||
|
return readIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReadIdle(Duration readIdle) {
|
||||||
|
if (readIdle == null || readIdle.getSeconds() <= 0){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.readIdle = readIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxPongTimeout() {
|
||||||
|
return maxPongTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxPongTimeout(int maxPongTimeout) {
|
||||||
|
if (maxPongTimeout <= 0){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.maxPongTimeout = maxPongTimeout;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,8 @@ package com.farsunset.cim.config.properties;
|
|||||||
import com.farsunset.cim.constant.WebsocketProtocol;
|
import com.farsunset.cim.constant.WebsocketProtocol;
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
|
||||||
@ConfigurationProperties(prefix = "cim.websocket")
|
@ConfigurationProperties(prefix = "cim.websocket")
|
||||||
public class CIMWebsocketProperties {
|
public class CIMWebsocketProperties {
|
||||||
@ -34,6 +36,25 @@ public class CIMWebsocketProperties {
|
|||||||
private String path;
|
private String path;
|
||||||
private WebsocketProtocol protocol;
|
private WebsocketProtocol protocol;
|
||||||
|
|
||||||
|
/**
|
||||||
|
长链接写空闲时间触发时间(s)
|
||||||
|
心跳发送定时,每当x秒无数据下发写入,触发 服务端-->客户端 心跳事件
|
||||||
|
*/
|
||||||
|
private Duration writeIdle = Duration.ofSeconds(45);
|
||||||
|
|
||||||
|
/**
|
||||||
|
长链接读空闲时间触发时间(s)
|
||||||
|
心跳响应定时,每当readIdle - writeIdle秒无数据接收,触发心跳超时计数
|
||||||
|
*/
|
||||||
|
private Duration readIdle = Duration.ofSeconds(60);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
长链接最大允许心跳响应超时次数
|
||||||
|
达到该次数则 服务端断开链接
|
||||||
|
*/
|
||||||
|
private int maxPongTimeout = 1;
|
||||||
|
|
||||||
public Integer getPort() {
|
public Integer getPort() {
|
||||||
return port;
|
return port;
|
||||||
}
|
}
|
||||||
@ -65,4 +86,37 @@ public class CIMWebsocketProperties {
|
|||||||
public void setEnable(boolean enable) {
|
public void setEnable(boolean enable) {
|
||||||
this.enable = enable;
|
this.enable = enable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Duration getWriteIdle() {
|
||||||
|
return writeIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setWriteIdle(Duration writeIdle) {
|
||||||
|
if (writeIdle == null || writeIdle.getSeconds() <= 0){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.writeIdle = writeIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Duration getReadIdle() {
|
||||||
|
return readIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReadIdle(Duration readIdle) {
|
||||||
|
if (readIdle == null || readIdle.getSeconds() <= 0){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.readIdle = readIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxPongTimeout() {
|
||||||
|
return maxPongTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxPongTimeout(int maxPongTimeout) {
|
||||||
|
if (maxPongTimeout <= 0){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.maxPongTimeout = maxPongTimeout;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -69,6 +69,9 @@ spring.messages.basename=i18n/messages
|
|||||||
#commented to disable this port.
|
#commented to disable this port.
|
||||||
cim.app.port=23456
|
cim.app.port=23456
|
||||||
cim.app.enable=true
|
cim.app.enable=true
|
||||||
|
cim.app.write-idle=45s
|
||||||
|
cim.app.read-idle=60s
|
||||||
|
cim.app.max-pong-timeout=3
|
||||||
|
|
||||||
cim.websocket.enable=true
|
cim.websocket.enable=true
|
||||||
cim.websocket.port=34567
|
cim.websocket.port=34567
|
||||||
@ -76,6 +79,10 @@ cim.websocket.path=/
|
|||||||
## json or protobuf
|
## json or protobuf
|
||||||
cim.websocket.protocol=protobuf
|
cim.websocket.protocol=protobuf
|
||||||
|
|
||||||
|
cim.websocket.write-idle=45s
|
||||||
|
cim.websocket.read-idle=60s
|
||||||
|
cim.websocket.max-pong-timeout=3
|
||||||
|
|
||||||
#please setting your p12 info and appId.
|
#please setting your p12 info and appId.
|
||||||
cim.apns.p12.file=/apns/app.p12
|
cim.apns.p12.file=/apns/app.p12
|
||||||
cim.apns.p12.password=123
|
cim.apns.p12.password=123
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
<groupId>com.farsunset</groupId>
|
<groupId>com.farsunset</groupId>
|
||||||
<artifactId>cim-server-sdk-netty</artifactId>
|
<artifactId>cim-server-sdk-netty</artifactId>
|
||||||
<version>4.2.5</version>
|
<version>4.2.6</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<name>${project.groupId}:${project.artifactId}</name>
|
<name>${project.groupId}:${project.artifactId}</name>
|
||||||
@ -28,8 +28,8 @@
|
|||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||||
<java.version>1.8</java.version>
|
<java.version>1.8</java.version>
|
||||||
<protobuf.java.version>3.19.4</protobuf.java.version>
|
<protobuf.java.version>3.19.6</protobuf.java.version>
|
||||||
<netty.version>4.1.73.Final</netty.version>
|
<netty.version>4.1.86.Final</netty.version>
|
||||||
<slf4j.version>1.7.36</slf4j.version>
|
<slf4j.version>1.7.36</slf4j.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
@ -92,7 +92,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
<artifactId>jackson-databind</artifactId>
|
<artifactId>jackson-databind</artifactId>
|
||||||
<version>2.13.2</version>
|
<version>2.13.4.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
@ -21,7 +21,7 @@
|
|||||||
*/
|
*/
|
||||||
package com.farsunset.cim.acceptor;
|
package com.farsunset.cim.acceptor;
|
||||||
|
|
||||||
import com.farsunset.cim.acceptor.config.AppSocketConfig;
|
import com.farsunset.cim.acceptor.config.SocketConfig;
|
||||||
import com.farsunset.cim.coder.protobuf.AppMessageDecoder;
|
import com.farsunset.cim.coder.protobuf.AppMessageDecoder;
|
||||||
import com.farsunset.cim.coder.protobuf.AppMessageEncoder;
|
import com.farsunset.cim.coder.protobuf.AppMessageEncoder;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
@ -41,20 +41,19 @@ import java.util.concurrent.TimeUnit;
|
|||||||
@ChannelHandler.Sharable
|
@ChannelHandler.Sharable
|
||||||
public class AppSocketAcceptor extends NioSocketAcceptor {
|
public class AppSocketAcceptor extends NioSocketAcceptor {
|
||||||
|
|
||||||
private final AppSocketConfig config;
|
|
||||||
|
|
||||||
public AppSocketAcceptor(AppSocketConfig config){
|
public AppSocketAcceptor(SocketConfig config){
|
||||||
super(config.getOuterRequestHandler());
|
super(config);
|
||||||
this.config = config;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* bind基于tlv协议的socket端口
|
* bind基于tlv协议的socket端口
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void bind(){
|
public void bind(){
|
||||||
|
|
||||||
if (!config.isEnable()){
|
if (!socketConfig.isEnable()){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -65,12 +64,12 @@ public class AppSocketAcceptor extends NioSocketAcceptor {
|
|||||||
ch.pipeline().addLast(new AppMessageDecoder());
|
ch.pipeline().addLast(new AppMessageDecoder());
|
||||||
ch.pipeline().addLast(new AppMessageEncoder());
|
ch.pipeline().addLast(new AppMessageEncoder());
|
||||||
ch.pipeline().addLast(loggingHandler);
|
ch.pipeline().addLast(loggingHandler);
|
||||||
ch.pipeline().addLast(new IdleStateHandler(readIdle.getSeconds(), writeIdle.getSeconds(), 0, TimeUnit.SECONDS));
|
ch.pipeline().addLast(new IdleStateHandler(socketConfig.getReadIdle().getSeconds(), socketConfig.getWriteIdle().getSeconds(), 0, TimeUnit.SECONDS));
|
||||||
ch.pipeline().addLast(AppSocketAcceptor.this);
|
ch.pipeline().addLast(AppSocketAcceptor.this);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
ChannelFuture channelFuture = bootstrap.bind(config.getPort()).syncUninterruptibly();
|
ChannelFuture channelFuture = bootstrap.bind(socketConfig.getPort()).syncUninterruptibly();
|
||||||
channelFuture.channel().newSucceededFuture().addListener(future -> {
|
channelFuture.channel().newSucceededFuture().addListener(future -> {
|
||||||
String logBanner = "\n\n" +
|
String logBanner = "\n\n" +
|
||||||
"* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n" +
|
"* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n" +
|
||||||
@ -80,7 +79,7 @@ public class AppSocketAcceptor extends NioSocketAcceptor {
|
|||||||
"* *\n" +
|
"* *\n" +
|
||||||
"* *\n" +
|
"* *\n" +
|
||||||
"* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n";
|
"* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n";
|
||||||
logger.info(logBanner, config.getPort());
|
logger.info(logBanner, socketConfig.getPort());
|
||||||
});
|
});
|
||||||
channelFuture.channel().closeFuture().addListener(future -> this.destroy());
|
channelFuture.channel().closeFuture().addListener(future -> this.destroy());
|
||||||
}
|
}
|
||||||
|
@ -21,9 +21,9 @@
|
|||||||
*/
|
*/
|
||||||
package com.farsunset.cim.acceptor;
|
package com.farsunset.cim.acceptor;
|
||||||
|
|
||||||
|
import com.farsunset.cim.acceptor.config.SocketConfig;
|
||||||
import com.farsunset.cim.constant.CIMConstant;
|
import com.farsunset.cim.constant.CIMConstant;
|
||||||
import com.farsunset.cim.constant.ChannelAttr;
|
import com.farsunset.cim.constant.ChannelAttr;
|
||||||
import com.farsunset.cim.handler.CIMRequestHandler;
|
|
||||||
import com.farsunset.cim.handler.LoggingHandler;
|
import com.farsunset.cim.handler.LoggingHandler;
|
||||||
import com.farsunset.cim.model.Ping;
|
import com.farsunset.cim.model.Ping;
|
||||||
import com.farsunset.cim.model.SentBody;
|
import com.farsunset.cim.model.SentBody;
|
||||||
@ -38,36 +38,22 @@ import io.netty.handler.timeout.IdleStateEvent;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
|
||||||
abstract class NioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>{
|
abstract class NioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>{
|
||||||
|
|
||||||
private static final int PONG_TIME_OUT_COUNT = 3;
|
|
||||||
|
|
||||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||||
|
|
||||||
protected final ChannelHandler loggingHandler = new LoggingHandler();
|
protected final ChannelHandler loggingHandler = new LoggingHandler();
|
||||||
|
|
||||||
|
protected final SocketConfig socketConfig;
|
||||||
|
|
||||||
private final EventLoopGroup bossGroup;
|
private final EventLoopGroup bossGroup;
|
||||||
private final EventLoopGroup workerGroup;
|
private final EventLoopGroup workerGroup;
|
||||||
|
|
||||||
private final CIMRequestHandler outerRequestHandler;
|
protected NioSocketAcceptor(SocketConfig socketConfig){
|
||||||
|
|
||||||
/**
|
this.socketConfig = socketConfig;
|
||||||
* 读空闲时间(秒)
|
|
||||||
*/
|
|
||||||
public final Duration writeIdle = Duration.ofSeconds(45);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 写接空闲时间(秒)
|
|
||||||
*/
|
|
||||||
public final Duration readIdle = Duration.ofSeconds(60);
|
|
||||||
|
|
||||||
|
|
||||||
protected NioSocketAcceptor(CIMRequestHandler outerRequestHandler){
|
|
||||||
|
|
||||||
this.outerRequestHandler = outerRequestHandler;
|
|
||||||
|
|
||||||
ThreadFactory bossThreadFactory = r -> {
|
ThreadFactory bossThreadFactory = r -> {
|
||||||
Thread thread = new Thread(r);
|
Thread thread = new Thread(r);
|
||||||
@ -91,6 +77,15 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>{
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行启动SOCKET服务
|
||||||
|
*/
|
||||||
|
public abstract void bind();
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行注销SOCKET服务
|
||||||
|
*/
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
if(bossGroup != null && !bossGroup.isShuttingDown() && !bossGroup.isShutdown() ) {
|
if(bossGroup != null && !bossGroup.isShuttingDown() && !bossGroup.isShutdown() ) {
|
||||||
try {bossGroup.shutdownGracefully();}catch(Exception ignore) {}
|
try {bossGroup.shutdownGracefully();}catch(Exception ignore) {}
|
||||||
@ -115,8 +110,8 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>{
|
|||||||
/*
|
/*
|
||||||
* 由业务层去处理其他的sentBody
|
* 由业务层去处理其他的sentBody
|
||||||
*/
|
*/
|
||||||
if (outerRequestHandler != null){
|
if (socketConfig.getOuterRequestHandler() != null){
|
||||||
outerRequestHandler.process(ctx.channel(), body);
|
socketConfig.getOuterRequestHandler().process(ctx.channel(), body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,13 +127,13 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>{
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (outerRequestHandler == null){
|
if (socketConfig.getOuterRequestHandler() == null){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SentBody body = new SentBody();
|
SentBody body = new SentBody();
|
||||||
body.setKey(CIMConstant.CLIENT_CONNECT_CLOSED);
|
body.setKey(CIMConstant.CLIENT_CONNECT_CLOSED);
|
||||||
outerRequestHandler.process(ctx.channel(), body);
|
socketConfig.getOuterRequestHandler().process(ctx.channel(), body);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -177,7 +172,7 @@ abstract class NioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>{
|
|||||||
* 如果心跳请求发出(readIdle-writeIdle)秒内没收到响应,则关闭连接
|
* 如果心跳请求发出(readIdle-writeIdle)秒内没收到响应,则关闭连接
|
||||||
*/
|
*/
|
||||||
Integer pingCount = ctx.channel().attr(ChannelAttr.PING_COUNT).get();
|
Integer pingCount = ctx.channel().attr(ChannelAttr.PING_COUNT).get();
|
||||||
if (idleEvent.state() == IdleState.READER_IDLE && pingCount != null && pingCount >= PONG_TIME_OUT_COUNT) {
|
if (idleEvent.state() == IdleState.READER_IDLE && pingCount != null && pingCount >= socketConfig.getMaxPongTimeout()) {
|
||||||
ctx.close();
|
ctx.close();
|
||||||
logger.info("{} pong timeout.",ctx.channel());
|
logger.info("{} pong timeout.",ctx.channel());
|
||||||
}
|
}
|
||||||
|
@ -74,7 +74,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor {
|
|||||||
private final ChannelHandler illegalRequestHandler = new IllegalRequestHandler();
|
private final ChannelHandler illegalRequestHandler = new IllegalRequestHandler();
|
||||||
|
|
||||||
public WebsocketAcceptor(WebsocketConfig config){
|
public WebsocketAcceptor(WebsocketConfig config){
|
||||||
super(config.getOuterRequestHandler());
|
super(config);
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.handshakeHandler = new HandshakeHandler(config.getHandshakePredicate());
|
this.handshakeHandler = new HandshakeHandler(config.getHandshakePredicate());
|
||||||
}
|
}
|
||||||
@ -82,6 +82,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor {
|
|||||||
/**
|
/**
|
||||||
* bind基于websocket协议的socket端口
|
* bind基于websocket协议的socket端口
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void bind(){
|
public void bind(){
|
||||||
|
|
||||||
if (!config.isEnable()){
|
if (!config.isEnable()){
|
||||||
@ -105,7 +106,7 @@ public class WebsocketAcceptor extends NioSocketAcceptor {
|
|||||||
ch.pipeline().addLast(new WebMessageDecoder());
|
ch.pipeline().addLast(new WebMessageDecoder());
|
||||||
ch.pipeline().addLast(new WebMessageEncoder());
|
ch.pipeline().addLast(new WebMessageEncoder());
|
||||||
}
|
}
|
||||||
ch.pipeline().addLast(new IdleStateHandler(readIdle.getSeconds(), writeIdle.getSeconds(), 0, TimeUnit.SECONDS));
|
ch.pipeline().addLast(new IdleStateHandler(config.getReadIdle().getSeconds(), config.getWriteIdle().getSeconds(), 0, TimeUnit.SECONDS));
|
||||||
ch.pipeline().addLast(loggingHandler);
|
ch.pipeline().addLast(loggingHandler);
|
||||||
ch.pipeline().addLast(WebsocketAcceptor.this);
|
ch.pipeline().addLast(WebsocketAcceptor.this);
|
||||||
ch.pipeline().addLast(illegalRequestHandler);
|
ch.pipeline().addLast(illegalRequestHandler);
|
||||||
|
@ -23,15 +23,17 @@ package com.farsunset.cim.acceptor.config;
|
|||||||
|
|
||||||
import com.farsunset.cim.handler.CIMRequestHandler;
|
import com.farsunset.cim.handler.CIMRequestHandler;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TLV协议socket端口配置
|
* TLV协议socket端口配置
|
||||||
*/
|
*/
|
||||||
public class AppSocketConfig {
|
public class SocketConfig {
|
||||||
|
|
||||||
private static final int DEFAULT_PORT = 23456;
|
private static final int DEFAULT_PORT = 23456;
|
||||||
|
|
||||||
/*
|
/**
|
||||||
TVL协议socket端口
|
TVL协议socket端口
|
||||||
*/
|
*/
|
||||||
private Integer port;
|
private Integer port;
|
||||||
@ -46,8 +48,27 @@ public class AppSocketConfig {
|
|||||||
*/
|
*/
|
||||||
private boolean enable;
|
private boolean enable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
长链接写空闲时间触发时间(s)
|
||||||
|
心跳发送定时,每当x秒无数据下发写入,触发 服务端-->客户端 心跳事件
|
||||||
|
*/
|
||||||
|
private Duration writeIdle = Duration.ofSeconds(45);
|
||||||
|
|
||||||
|
/**
|
||||||
|
长链接读空闲时间触发时间(s)
|
||||||
|
心跳响应定时,每当readIdle - writeIdle秒无数据接收,触发心跳超时计数
|
||||||
|
*/
|
||||||
|
private Duration readIdle = Duration.ofSeconds(60);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
长链接最大允许心跳响应超时次数
|
||||||
|
达到该次数则 服务端断开链接
|
||||||
|
*/
|
||||||
|
private int maxPongTimeout = 1;
|
||||||
|
|
||||||
public Integer getPort() {
|
public Integer getPort() {
|
||||||
return port == null ? DEFAULT_PORT : port;
|
return port == null || port <= 0 ? DEFAULT_PORT : port;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPort(Integer port) {
|
public void setPort(Integer port) {
|
||||||
@ -69,4 +90,28 @@ public class AppSocketConfig {
|
|||||||
public void setEnable(boolean enable) {
|
public void setEnable(boolean enable) {
|
||||||
this.enable = enable;
|
this.enable = enable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Duration getWriteIdle() {
|
||||||
|
return writeIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setWriteIdle(Duration writeIdle) {
|
||||||
|
this.writeIdle = writeIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Duration getReadIdle() {
|
||||||
|
return readIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReadIdle(Duration readIdle) {
|
||||||
|
this.readIdle = readIdle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxPongTimeout() {
|
||||||
|
return maxPongTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxPongTimeout(int maxPongTimeout) {
|
||||||
|
this.maxPongTimeout = maxPongTimeout;
|
||||||
|
}
|
||||||
}
|
}
|
@ -22,7 +22,6 @@
|
|||||||
package com.farsunset.cim.acceptor.config;
|
package com.farsunset.cim.acceptor.config;
|
||||||
|
|
||||||
import com.farsunset.cim.constant.WebsocketProtocol;
|
import com.farsunset.cim.constant.WebsocketProtocol;
|
||||||
import com.farsunset.cim.handler.CIMRequestHandler;
|
|
||||||
import com.farsunset.cim.handshake.HandshakeEvent;
|
import com.farsunset.cim.handshake.HandshakeEvent;
|
||||||
|
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
@ -30,7 +29,7 @@ import java.util.function.Predicate;
|
|||||||
/**
|
/**
|
||||||
* 基于websocket的服务配置
|
* 基于websocket的服务配置
|
||||||
*/
|
*/
|
||||||
public class WebsocketConfig {
|
public class WebsocketConfig extends SocketConfig{
|
||||||
|
|
||||||
private static final int DEFAULT_PORT = 34567;
|
private static final int DEFAULT_PORT = 34567;
|
||||||
|
|
||||||
@ -38,11 +37,6 @@ public class WebsocketConfig {
|
|||||||
|
|
||||||
private static final WebsocketProtocol DEFAULT_PROTOCOL = WebsocketProtocol.PROTOBUF;
|
private static final WebsocketProtocol DEFAULT_PROTOCOL = WebsocketProtocol.PROTOBUF;
|
||||||
|
|
||||||
/**
|
|
||||||
* websocket端口
|
|
||||||
*/
|
|
||||||
private Integer port;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* websocket端点地址
|
* websocket端点地址
|
||||||
*/
|
*/
|
||||||
@ -53,27 +47,15 @@ public class WebsocketConfig {
|
|||||||
*/
|
*/
|
||||||
private WebsocketProtocol protocol;
|
private WebsocketProtocol protocol;
|
||||||
|
|
||||||
/**
|
|
||||||
* socket消息处理器
|
|
||||||
*/
|
|
||||||
private CIMRequestHandler outerRequestHandler;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* websocket鉴权实现
|
* websocket鉴权实现
|
||||||
*/
|
*/
|
||||||
private Predicate<HandshakeEvent> handshakePredicate;
|
private Predicate<HandshakeEvent> handshakePredicate;
|
||||||
|
|
||||||
/**
|
|
||||||
* 是否启用websocket
|
|
||||||
*/
|
|
||||||
private boolean enable;
|
|
||||||
|
|
||||||
|
@Override
|
||||||
public Integer getPort() {
|
public Integer getPort() {
|
||||||
return port == null ? DEFAULT_PORT : port;
|
return super.getPort() == null || super.getPort() <= 0 ? DEFAULT_PORT : super.getPort();
|
||||||
}
|
|
||||||
|
|
||||||
public void setPort(Integer port) {
|
|
||||||
this.port = port;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPath() {
|
public String getPath() {
|
||||||
@ -84,11 +66,6 @@ public class WebsocketConfig {
|
|||||||
return protocol == null ? DEFAULT_PROTOCOL : protocol;
|
return protocol == null ? DEFAULT_PROTOCOL : protocol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public CIMRequestHandler getOuterRequestHandler() {
|
|
||||||
return outerRequestHandler;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Predicate<HandshakeEvent> getHandshakePredicate() {
|
public Predicate<HandshakeEvent> getHandshakePredicate() {
|
||||||
return handshakePredicate;
|
return handshakePredicate;
|
||||||
}
|
}
|
||||||
@ -101,19 +78,9 @@ public class WebsocketConfig {
|
|||||||
this.protocol = protocol;
|
this.protocol = protocol;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setOuterRequestHandler(CIMRequestHandler outerRequestHandler) {
|
|
||||||
this.outerRequestHandler = outerRequestHandler;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setHandshakePredicate(Predicate<HandshakeEvent> handshakePredicate) {
|
public void setHandshakePredicate(Predicate<HandshakeEvent> handshakePredicate) {
|
||||||
this.handshakePredicate = handshakePredicate;
|
this.handshakePredicate = handshakePredicate;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isEnable() {
|
|
||||||
return enable;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setEnable(boolean enable) {
|
|
||||||
this.enable = enable;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user