新增服务端sdk功能接口,和代码注释

This commit is contained in:
远方夕阳 2022-09-01 13:51:38 +08:00
parent c93ac73e80
commit ea9efc8594
9 changed files with 283 additions and 16 deletions

View File

@ -67,7 +67,7 @@
<dependency>
<groupId>com.farsunset</groupId>
<artifactId>cim-server-sdk-netty</artifactId>
<version>4.2.0</version>
<version>4.2.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@ -6,7 +6,7 @@
<groupId>com.farsunset</groupId>
<artifactId>cim-server-sdk-netty</artifactId>
<version>4.2.0</version>
<version>4.2.5</version>
<packaging>jar</packaging>
<name>${project.groupId}:${project.artifactId}</name>

View File

@ -1,7 +1,32 @@
/*
* Copyright 2013-2022 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.acceptor.config;
import com.farsunset.cim.handler.CIMRequestHandler;
/**
* TLV协议socket端口配置
*/
public class AppSocketConfig {
private static final int DEFAULT_PORT = 23456;
@ -11,12 +36,12 @@ public class AppSocketConfig {
*/
private Integer port;
/*
/**
* socket消息处理器
*/
private CIMRequestHandler outerRequestHandler;
/*
/**
是否启用TVL协议socket
*/
private boolean enable;

View File

@ -1,3 +1,24 @@
/*
* Copyright 2013-2022 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.acceptor.config;
import com.farsunset.cim.constant.WebsocketProtocol;
@ -17,24 +38,32 @@ public class WebsocketConfig {
private static final WebsocketProtocol DEFAULT_PROTOCOL = WebsocketProtocol.PROTOBUF;
/**
* websocket端口
*/
private Integer port;
/**
* websocket端点地址
*/
private String path;
/*
/**
* 消息体协议JSON 或者 Protobuf
*/
private WebsocketProtocol protocol;
/*
/**
* socket消息处理器
*/
private CIMRequestHandler outerRequestHandler;
/*
/**
* websocket鉴权实现
*/
private Predicate<HandshakeEvent> handshakePredicate;
/*
/**
* 是否启用websocket
*/
private boolean enable;

View File

@ -1,3 +1,24 @@
/*
* Copyright 2013-2022 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.coder.json;
import com.farsunset.cim.constant.ChannelAttr;

View File

@ -1,3 +1,24 @@
/*
* Copyright 2013-2022 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.constant;
import io.netty.util.AttributeKey;

View File

@ -1,3 +1,24 @@
/*
* Copyright 2013-2022 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.group;
@ -19,8 +40,6 @@ import java.util.stream.Collectors;
*/
public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>> {
private static final Collection<Channel> EMPTY_LIST = new LinkedList<>();
private final transient ChannelFutureListener remover = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future){
@ -33,6 +52,10 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
return channel.attr(ChannelAttr.UID).get();
}
/**
* 将channel从内存容器中删除
* @param channel
*/
public void remove(Channel channel){
String uid = getKey(channel);
@ -41,7 +64,7 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
return;
}
Collection<Channel> collections = getOrDefault(uid,EMPTY_LIST);
Collection<Channel> collections = getOrDefault(uid,Collections.emptyList());
collections.remove(channel);
@ -51,6 +74,11 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
}
/**
* 将channel加入内存容器中删除
* @param channel
*/
public void add(Channel channel){
String uid = getKey(channel);
@ -71,29 +99,130 @@ public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>
}
}
/**
* 查找到key关联的channel并写入消息体
* @param key
* @param message
*/
public void write(String key, Message message){
find(key).forEach(channel -> channel.writeAndFlush(message));
this.write(key, message, channel -> true);
}
/**
* 查找到key关联的channel并写入消息体
* @param key
* @param message
* @param matcher channel筛选条件
*/
public void write(String key, Message message, Predicate<Channel> matcher){
find(key).stream().filter(matcher).forEach(channel -> channel.writeAndFlush(message));
}
/**
* 查找到key关联的channel并写入消息体
* @param key
* @param message
* @param excludedSet 排除的UID集合
*/
public void write(String key, Message message, Collection<String> excludedSet){
find(key).stream().filter(channel -> excludedSet == null || !excludedSet.contains(channel.attr(ChannelAttr.UID).get())).forEach(channel -> channel.writeAndFlush(message));
Predicate<Channel> predicate = new ExcludedUidPredicate(excludedSet);
this.write(key,message,predicate);
}
/**
* 查找到消息接收者关联的channel并写入消息体
* @param message
*/
public void write(Message message){
this.write(message.getReceiver(),message);
}
/**
* 通过key查找channel集合
* @param key
* @return
*/
public Collection<Channel> find(String key){
return this.getOrDefault(key,EMPTY_LIST);
return this.getOrDefault(key,Collections.emptyList());
}
/**
* 通过key查找channel集合
* @param key
* @param matcher 过滤条件
* @return
*/
public Collection<Channel> find(String key,Predicate<Channel> matcher){
return this.find(key)
.stream()
.filter(matcher)
.collect(Collectors.toList());
}
/**
* 通过key查找channel集合
* @param key
* @param channel 连接终端类型过滤条件
* @return
*/
public Collection<Channel> find(String key,String... channel){
List<String> channels = Arrays.asList(channel);
return find(key).stream().filter(item -> channels.contains(item.attr(ChannelAttr.CHANNEL).get())).collect(Collectors.toList());
return this.find(key,channels);
}
/**
* 通过key查找channel集合
* @param key
* @param channelSet 连接终端类型过滤条件
* @return
*/
public Collection<Channel> find(String key,Collection<String> channelSet){
Predicate<Channel> predicate = new ChannelPredicate(channelSet);
return find(key,predicate);
}
/**
* 检查该channel是否存在内存管理当中
* @param channel
* @return
*/
public boolean isManaged(Channel channel){
String uid = getKey(channel);
if (uid == null || !channel.isActive()){
return false;
}
return getOrDefault(uid,Collections.emptyList()).contains(channel);
}
private static class ExcludedUidPredicate implements Predicate<Channel>{
private final Collection<String> excludedSet;
private ExcludedUidPredicate(Collection<String> excludedSet) {
this.excludedSet = excludedSet == null ? Collections.emptySet() : excludedSet;
}
@Override
public boolean test(Channel channel) {
return !excludedSet.contains(channel.attr(ChannelAttr.UID).get());
}
}
private static class ChannelPredicate implements Predicate<Channel>{
private final Collection<String> channelSet;
private ChannelPredicate(Collection<String> channelSet) {
this.channelSet = channelSet == null ? Collections.emptySet() : channelSet;
}
@Override
public boolean test(Channel ioChannel) {
return channelSet.contains(ioChannel.attr(ChannelAttr.CHANNEL).get());
}
}
}

View File

@ -1,3 +1,24 @@
/*
* Copyright 2013-2022 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.group;

View File

@ -1,3 +1,24 @@
/*
* Copyright 2013-2022 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.handler;
import io.netty.channel.ChannelFutureListener;