From ea9efc859441d0436ed5a365b592e4a80455cc98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=BF=9C=E6=96=B9=E5=A4=95=E9=98=B3?= Date: Thu, 1 Sep 2022 13:51:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=9C=8D=E5=8A=A1=E7=AB=AFsd?= =?UTF-8?q?k=E5=8A=9F=E8=83=BD=E6=8E=A5=E5=8F=A3=EF=BC=8C=E5=92=8C?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cim-boot-server/pom.xml | 2 +- cim-server-sdk/pom.xml | 2 +- .../cim/acceptor/config/AppSocketConfig.java | 29 +++- .../cim/acceptor/config/WebsocketConfig.java | 37 ++++- .../cim/coder/json/TextMessageDecoder.java | 21 +++ .../farsunset/cim/constant/ChannelAttr.java | 21 +++ .../com/farsunset/cim/group/SessionGroup.java | 145 +++++++++++++++++- .../farsunset/cim/group/TagSessionGroup.java | 21 +++ .../cim/handler/IllegalRequestHandler.java | 21 +++ 9 files changed, 283 insertions(+), 16 deletions(-) diff --git a/cim-boot-server/pom.xml b/cim-boot-server/pom.xml index 8be0608..e58c850 100644 --- a/cim-boot-server/pom.xml +++ b/cim-boot-server/pom.xml @@ -67,7 +67,7 @@ com.farsunset cim-server-sdk-netty - 4.2.0 + 4.2.5 org.apache.commons diff --git a/cim-server-sdk/pom.xml b/cim-server-sdk/pom.xml index c34d35d..b218241 100644 --- a/cim-server-sdk/pom.xml +++ b/cim-server-sdk/pom.xml @@ -6,7 +6,7 @@ com.farsunset cim-server-sdk-netty - 4.2.0 + 4.2.5 jar ${project.groupId}:${project.artifactId} diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java index 404c3cd..a90b29f 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/AppSocketConfig.java @@ -1,7 +1,32 @@ +/* + * Copyright 2013-2022 Xia Jun(3979434@qq.com). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + *************************************************************************************** + */ package com.farsunset.cim.acceptor.config; import com.farsunset.cim.handler.CIMRequestHandler; + +/** + * TLV协议socket端口配置 + */ public class AppSocketConfig { private static final int DEFAULT_PORT = 23456; @@ -11,12 +36,12 @@ public class AppSocketConfig { */ private Integer port; - /* + /** * socket消息处理器 */ private CIMRequestHandler outerRequestHandler; - /* + /** 是否启用TVL协议socket */ private boolean enable; diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java index fcba304..0d5d9f7 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/acceptor/config/WebsocketConfig.java @@ -1,3 +1,24 @@ +/* + * Copyright 2013-2022 Xia Jun(3979434@qq.com). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + *************************************************************************************** + */ package com.farsunset.cim.acceptor.config; import com.farsunset.cim.constant.WebsocketProtocol; @@ -17,24 +38,32 @@ public class WebsocketConfig { private static final WebsocketProtocol DEFAULT_PROTOCOL = WebsocketProtocol.PROTOBUF; + /** + * websocket端口 + */ private Integer port; + + /** + * websocket端点地址 + */ private String path; - /* + /** * 消息体协议,JSON 或者 Protobuf */ private WebsocketProtocol protocol; - /* + + /** * socket消息处理器 */ private CIMRequestHandler outerRequestHandler; - /* + /** * websocket鉴权实现 */ private Predicate handshakePredicate; - /* + /** * 是否启用websocket */ private boolean enable; diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/coder/json/TextMessageDecoder.java b/cim-server-sdk/src/main/java/com/farsunset/cim/coder/json/TextMessageDecoder.java index 476fe47..ff5ceeb 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/coder/json/TextMessageDecoder.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/coder/json/TextMessageDecoder.java @@ -1,3 +1,24 @@ +/* + * Copyright 2013-2022 Xia Jun(3979434@qq.com). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + *************************************************************************************** + */ package com.farsunset.cim.coder.json; import com.farsunset.cim.constant.ChannelAttr; diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/constant/ChannelAttr.java b/cim-server-sdk/src/main/java/com/farsunset/cim/constant/ChannelAttr.java index 80e8caa..16318e5 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/constant/ChannelAttr.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/constant/ChannelAttr.java @@ -1,3 +1,24 @@ +/* + * Copyright 2013-2022 Xia Jun(3979434@qq.com). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + *************************************************************************************** + */ package com.farsunset.cim.constant; import io.netty.util.AttributeKey; diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/group/SessionGroup.java b/cim-server-sdk/src/main/java/com/farsunset/cim/group/SessionGroup.java index 1d31bfa..a36af13 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/group/SessionGroup.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/group/SessionGroup.java @@ -1,3 +1,24 @@ +/* + * Copyright 2013-2022 Xia Jun(3979434@qq.com). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + *************************************************************************************** + */ package com.farsunset.cim.group; @@ -19,8 +40,6 @@ import java.util.stream.Collectors; */ public class SessionGroup extends ConcurrentHashMap> { - private static final Collection EMPTY_LIST = new LinkedList<>(); - private final transient ChannelFutureListener remover = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future){ @@ -33,6 +52,10 @@ public class SessionGroup extends ConcurrentHashMap> return channel.attr(ChannelAttr.UID).get(); } + /** + * 将channel从内存容器中删除 + * @param channel + */ public void remove(Channel channel){ String uid = getKey(channel); @@ -41,7 +64,7 @@ public class SessionGroup extends ConcurrentHashMap> return; } - Collection collections = getOrDefault(uid,EMPTY_LIST); + Collection collections = getOrDefault(uid,Collections.emptyList()); collections.remove(channel); @@ -51,6 +74,11 @@ public class SessionGroup extends ConcurrentHashMap> } + + /** + * 将channel加入内存容器中删除 + * @param channel + */ public void add(Channel channel){ String uid = getKey(channel); @@ -71,29 +99,130 @@ public class SessionGroup extends ConcurrentHashMap> } } - + /** + * 查找到key关联的channel并写入消息体 + * @param key + * @param message + */ public void write(String key, Message message){ - find(key).forEach(channel -> channel.writeAndFlush(message)); + this.write(key, message, channel -> true); } + /** + * 查找到key关联的channel并写入消息体 + * @param key + * @param message + * @param matcher channel筛选条件 + */ public void write(String key, Message message, Predicate matcher){ find(key).stream().filter(matcher).forEach(channel -> channel.writeAndFlush(message)); } + /** + * 查找到key关联的channel并写入消息体 + * @param key + * @param message + * @param excludedSet 排除的UID集合 + */ public void write(String key, Message message, Collection excludedSet){ - find(key).stream().filter(channel -> excludedSet == null || !excludedSet.contains(channel.attr(ChannelAttr.UID).get())).forEach(channel -> channel.writeAndFlush(message)); + Predicate predicate = new ExcludedUidPredicate(excludedSet); + this.write(key,message,predicate); } + /** + * 查找到消息接收者关联的channel并写入消息体 + * @param message + */ public void write(Message message){ this.write(message.getReceiver(),message); } + /** + * 通过key查找channel集合 + * @param key + * @return + */ public Collection find(String key){ - return this.getOrDefault(key,EMPTY_LIST); + return this.getOrDefault(key,Collections.emptyList()); } + /** + * 通过key查找channel集合 + * @param key + * @param matcher 过滤条件 + * @return + */ + public Collection find(String key,Predicate matcher){ + return this.find(key) + .stream() + .filter(matcher) + .collect(Collectors.toList()); + } + + /** + * 通过key查找channel集合 + * @param key + * @param channel 连接终端类型过滤条件 + * @return + */ public Collection find(String key,String... channel){ List channels = Arrays.asList(channel); - return find(key).stream().filter(item -> channels.contains(item.attr(ChannelAttr.CHANNEL).get())).collect(Collectors.toList()); + return this.find(key,channels); + } + + /** + * 通过key查找channel集合 + * @param key + * @param channelSet 连接终端类型过滤条件 + * @return + */ + public Collection find(String key,Collection channelSet){ + Predicate predicate = new ChannelPredicate(channelSet); + return find(key,predicate); + } + + /** + * 检查该channel是否存在内存管理当中 + * @param channel + * @return + */ + public boolean isManaged(Channel channel){ + + String uid = getKey(channel); + + if (uid == null || !channel.isActive()){ + return false; + } + + return getOrDefault(uid,Collections.emptyList()).contains(channel); + } + + private static class ExcludedUidPredicate implements Predicate{ + + private final Collection excludedSet; + + private ExcludedUidPredicate(Collection excludedSet) { + this.excludedSet = excludedSet == null ? Collections.emptySet() : excludedSet; + } + + @Override + public boolean test(Channel channel) { + return !excludedSet.contains(channel.attr(ChannelAttr.UID).get()); + } + } + + private static class ChannelPredicate implements Predicate{ + + private final Collection channelSet; + + private ChannelPredicate(Collection channelSet) { + this.channelSet = channelSet == null ? Collections.emptySet() : channelSet; + } + + + @Override + public boolean test(Channel ioChannel) { + return channelSet.contains(ioChannel.attr(ChannelAttr.CHANNEL).get()); + } } } \ No newline at end of file diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/group/TagSessionGroup.java b/cim-server-sdk/src/main/java/com/farsunset/cim/group/TagSessionGroup.java index acf4297..f0c24ae 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/group/TagSessionGroup.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/group/TagSessionGroup.java @@ -1,3 +1,24 @@ +/* + * Copyright 2013-2022 Xia Jun(3979434@qq.com). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + *************************************************************************************** + */ package com.farsunset.cim.group; diff --git a/cim-server-sdk/src/main/java/com/farsunset/cim/handler/IllegalRequestHandler.java b/cim-server-sdk/src/main/java/com/farsunset/cim/handler/IllegalRequestHandler.java index f848871..88ee329 100644 --- a/cim-server-sdk/src/main/java/com/farsunset/cim/handler/IllegalRequestHandler.java +++ b/cim-server-sdk/src/main/java/com/farsunset/cim/handler/IllegalRequestHandler.java @@ -1,3 +1,24 @@ +/* + * Copyright 2013-2022 Xia Jun(3979434@qq.com). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + *************************************************************************************** + */ package com.farsunset.cim.handler; import io.netty.channel.ChannelFutureListener;