mirror of
https://gitee.com/farsunset/cim.git
synced 2025-07-10 10:39:58 +08:00
优化解码socket数据
This commit is contained in:
parent
baa536f442
commit
7175585d37
@ -40,7 +40,7 @@ public class AuthHandler extends ChannelInboundHandlerAdapter {
|
||||
/*
|
||||
* 鉴权不通过,发送响应体并关闭链接
|
||||
*/
|
||||
if (!authPredicate.test(AuthPredicateInfo.of(request, ctx))) {
|
||||
if (authPredicate != null && ! authPredicate.test(AuthPredicateInfo.of(request, ctx))) {
|
||||
ctx.channel().writeAndFlush(failedBody).addListener(ChannelFutureListener.CLOSE);
|
||||
return;
|
||||
}
|
||||
|
@ -31,6 +31,8 @@ import com.farsunset.cim.model.proto.SentBodyProto;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -39,21 +41,40 @@ import java.util.List;
|
||||
*/
|
||||
public class AppMessageDecoder extends ByteToMessageDecoder {
|
||||
|
||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext context, ByteBuf buffer, List<Object> queue) throws Exception {
|
||||
|
||||
context.channel().attr(ChannelAttr.PING_COUNT).set(null);
|
||||
|
||||
buffer.markReaderIndex();
|
||||
|
||||
byte type = buffer.readByte();
|
||||
|
||||
/*
|
||||
* 收到意外数据
|
||||
*/
|
||||
if (DataType.PONG.getValue() != type && type != DataType.SENT.getValue()){
|
||||
/*
|
||||
消费掉无效数据
|
||||
*/
|
||||
int length = buffer.readableBytes();
|
||||
if (length > 0){
|
||||
buffer.readBytes(new byte[length]);
|
||||
}
|
||||
logger.warn("收到意外的数据,type={},length ={}", type,length);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* 消息体不足3位,发生断包情况
|
||||
*/
|
||||
if (buffer.readableBytes() < CIMConstant.DATA_HEADER_LENGTH) {
|
||||
buffer.resetReaderIndex();
|
||||
return;
|
||||
}
|
||||
|
||||
buffer.markReaderIndex();
|
||||
|
||||
byte type = buffer.readByte();
|
||||
|
||||
byte lv = buffer.readByte();
|
||||
byte hv = buffer.readByte();
|
||||
@ -72,18 +93,14 @@ public class AppMessageDecoder extends ByteToMessageDecoder {
|
||||
buffer.readBytes(content);
|
||||
|
||||
Object message = mappingMessageObject(content, type);
|
||||
|
||||
|
||||
queue.add(message);
|
||||
|
||||
}
|
||||
|
||||
private Object mappingMessageObject(byte[] data, byte type) throws com.google.protobuf.InvalidProtocolBufferException {
|
||||
|
||||
if (DataType.PONG.getValue() == type) {
|
||||
return Pong.getInstance();
|
||||
}
|
||||
|
||||
if (DataType.SENT.getValue() == type) {
|
||||
|
||||
SentBodyProto.Model bodyProto = SentBodyProto.Model.parseFrom(data);
|
||||
SentBody body = new SentBody();
|
||||
body.setData(bodyProto.getDataMap());
|
||||
@ -93,7 +110,7 @@ public class AppMessageDecoder extends ByteToMessageDecoder {
|
||||
return body;
|
||||
}
|
||||
|
||||
throw new ReadInvalidTypeException(type);
|
||||
return Pong.getInstance();
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user