From 7175585d37bb2e360a63f17a889f54c044313a6f Mon Sep 17 00:00:00 2001 From: xia jun <3979434@qq.com> Date: Tue, 11 Jun 2024 12:53:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=A7=A3=E7=A0=81socket?= =?UTF-8?q?=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/farsunset/cim/auth/AuthHandler.java | 2 +- .../cim/coder/protobuf/AppMessageDecoder.java | 37 ++++++++++++++----- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/auth/AuthHandler.java b/cim-server-sdk/src/main/java/com/farsunset/cim/auth/AuthHandler.java index e477503..0538ae7 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/auth/AuthHandler.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/auth/AuthHandler.java @@ -40,7 +40,7 @@ public class AuthHandler extends ChannelInboundHandlerAdapter { /* * 鉴权不通过,发送响应体并关闭链接 */ - if (!authPredicate.test(AuthPredicateInfo.of(request, ctx))) { + if (authPredicate != null && ! authPredicate.test(AuthPredicateInfo.of(request, ctx))) { ctx.channel().writeAndFlush(failedBody).addListener(ChannelFutureListener.CLOSE); return; } diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/coder/protobuf/AppMessageDecoder.java b/cim-server-sdk/src/main/java/com/farsunset/cim/coder/protobuf/AppMessageDecoder.java index 8121a5a..59c7860 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/coder/protobuf/AppMessageDecoder.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/coder/protobuf/AppMessageDecoder.java @@ -31,6 +31,8 @@ import com.farsunset.cim.model.proto.SentBodyProto; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; @@ -39,21 +41,40 @@ import java.util.List; */ public class AppMessageDecoder extends ByteToMessageDecoder { + protected final Logger logger = LoggerFactory.getLogger(getClass()); + @Override protected void decode(ChannelHandlerContext context, ByteBuf buffer, List queue) throws Exception { context.channel().attr(ChannelAttr.PING_COUNT).set(null); + buffer.markReaderIndex(); + + byte type = buffer.readByte(); + + /* + * 收到意外数据 + */ + if (DataType.PONG.getValue() != type && type != DataType.SENT.getValue()){ + /* + 消费掉无效数据 + */ + int length = buffer.readableBytes(); + if (length > 0){ + buffer.readBytes(new byte[length]); + } + logger.warn("收到意外的数据,type={},length ={}", type,length); + return; + } + /* * 消息体不足3位,发生断包情况 */ if (buffer.readableBytes() < CIMConstant.DATA_HEADER_LENGTH) { + buffer.resetReaderIndex(); return; } - buffer.markReaderIndex(); - - byte type = buffer.readByte(); byte lv = buffer.readByte(); byte hv = buffer.readByte(); @@ -72,18 +93,14 @@ public class AppMessageDecoder extends ByteToMessageDecoder { buffer.readBytes(content); Object message = mappingMessageObject(content, type); - + queue.add(message); + } private Object mappingMessageObject(byte[] data, byte type) throws com.google.protobuf.InvalidProtocolBufferException { - if (DataType.PONG.getValue() == type) { - return Pong.getInstance(); - } - if (DataType.SENT.getValue() == type) { - SentBodyProto.Model bodyProto = SentBodyProto.Model.parseFrom(data); SentBody body = new SentBody(); body.setData(bodyProto.getDataMap()); @@ -93,7 +110,7 @@ public class AppMessageDecoder extends ByteToMessageDecoder { return body; } - throw new ReadInvalidTypeException(type); + return Pong.getInstance(); } /**