重构服务端sdk结构

This commit is contained in:
远方夕阳 2019-04-19 14:59:48 +08:00
parent 351c6bc0ae
commit e9cc607776
51 changed files with 4658 additions and 951 deletions

View File

@ -16,7 +16,7 @@ import com.farsunset.cim.handler.SessionClosedHandler;
import com.farsunset.cim.sdk.server.handler.CIMNioSocketAcceptor;
import com.farsunset.cim.sdk.server.handler.CIMRequestHandler;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
@Configuration
public class CIMConfig implements CIMRequestHandler {

View File

@ -27,17 +27,18 @@ import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import com.farsunset.cim.service.impl.CIMSessionServiceImpl;
import com.farsunset.cim.service.CIMSessionService;
@Controller
@RequestMapping("/console/session")
public class SessionController {
@Resource
private CIMSessionServiceImpl sessionManager;
private CIMSessionService memorySessionService;
@RequestMapping(value = "/list.action")
public String list(Model model) {
model.addAttribute("sessionList", sessionManager.queryAll());
model.addAttribute("sessionList", memorySessionService.list());
return "console/session/manage";
}
}

View File

@ -30,14 +30,14 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.farsunset.cim.push.CIMMessagePusher;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.handler.CIMRequestHandler;
import com.farsunset.cim.sdk.server.model.Message;
import com.farsunset.cim.sdk.server.model.ReplyBody;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.service.impl.CIMSessionServiceImpl;
import com.farsunset.cim.util.StringUtil;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.CIMSessionService;
/**
@ -51,57 +51,57 @@ public class BindHandler implements CIMRequestHandler {
protected final Logger logger = LoggerFactory.getLogger(BindHandler.class);
@Resource
private CIMSessionServiceImpl sessionManager;
private CIMSessionService memorySessionService;
@Value("${server.host}")
private String host;
@Resource
private CIMMessagePusher defaultMessagePusher;
public void process(CIMSession newSession, SentBody message) {
public void process(CIMSession newSession, SentBody body) {
ReplyBody reply = new ReplyBody();
reply.setKey(message.getKey());
reply.setKey(body.getKey());
reply.setCode(CIMConstant.ReturnCode.CODE_200);
reply.setTimestamp(System.currentTimeMillis());
try {
String account = message.get("account");
newSession.setGid(StringUtil.getUUID());
String account = body.get("account");
newSession.setAccount(account);
newSession.setDeviceId(message.get("deviceId"));
newSession.setDeviceId(body.get("deviceId"));
newSession.setHost(host);
newSession.setChannel(message.get("channel"));
newSession.setDeviceModel(message.get("device"));
newSession.setClientVersion(message.get("version"));
newSession.setSystemVersion(message.get("osVersion"));
newSession.setChannel(body.get("channel"));
newSession.setDeviceModel(body.get("device"));
newSession.setClientVersion(body.get("version"));
newSession.setSystemVersion(body.get("osVersion"));
newSession.setBindTime(System.currentTimeMillis());
newSession.setPackageName(message.get("packageName"));
newSession.setHeartbeat(System.currentTimeMillis());
/**
/*
* 由于客户端断线服务端可能会无法获知的情况客户端重连时需要关闭旧的连接
*/
CIMSession oldSession = sessionManager.get(account);
CIMSession oldSession = memorySessionService.get(account);
// 如果是账号已经在另一台终端登录则让另一个终端下线
/*
* 如果是账号已经在另一台终端登录则让另一个终端下线
*/
if (oldSession != null && fromOtherDevice(newSession,oldSession) && oldSession.isConnected()) {
sendForceOfflineMessage(oldSession, account, newSession.getDeviceModel());
}
/**
/*
* 有可能是同一个设备重复连接则关闭旧的链接这种情况一般是客户端断网联网又重新链接上来之前的旧链接没有来得及通过心跳机制关闭在这里手动关闭
* 条件1连接来自是同一个设备
* 条件2.2个连接都是同一台服务器
*/
if (oldSession != null && !fromOtherDevice(newSession,oldSession) && Objects.equals(oldSession.getHost(),host)) {
oldSession.removeAttribute(CIMConstant.SESSION_KEY);
oldSession.closeOnFlush();
closeQuietly(oldSession);
}
sessionManager.add(newSession);
memorySessionService.save(newSession);
} catch (Exception e) {
reply.setCode(CIMConstant.ReturnCode.CODE_500);
@ -124,15 +124,17 @@ public class BindHandler implements CIMRequestHandler {
msg.setSender("system");
msg.setContent(deviceModel);
msg.setId(System.currentTimeMillis());
closeQuietly(oldSession,msg);
defaultMessagePusher.push(msg);
closeQuietly(oldSession);
}
// 不同设备同一账号登录时关闭旧的连接
private void closeQuietly(CIMSession oldSession,Message msg) {
private void closeQuietly(CIMSession oldSession) {
if (oldSession.isConnected() && Objects.equals(host, oldSession.getHost())) {
oldSession.write(msg);
oldSession.removeAttribute(CIMConstant.SESSION_KEY);
oldSession.setAttribute(CIMConstant.KEY_QUIETLY_CLOSE,true);
oldSession.closeOnFlush();
}
}

View File

@ -22,6 +22,8 @@
package com.farsunset.cim.handler;
import java.util.Objects;
import javax.annotation.Resource;
import org.slf4j.Logger;
@ -31,8 +33,8 @@ import org.springframework.stereotype.Component;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.handler.CIMRequestHandler;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.service.impl.CIMSessionServiceImpl;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.CIMSessionService;
/**
@ -46,17 +48,28 @@ public class SessionClosedHandler implements CIMRequestHandler {
protected final Logger logger = LoggerFactory.getLogger(SessionClosedHandler.class);
@Resource
private CIMSessionServiceImpl sessionManager;
private CIMSessionService memorySessionService;
public void process(CIMSession ios, SentBody message) {
Object quietly = ios.getAttribute(CIMConstant.KEY_QUIETLY_CLOSE);
if (Objects.equals(quietly, true)) {
return;
}
Object account = ios.getAttribute(CIMConstant.SESSION_KEY);
Object account = ios.getAttribute(CIMConstant.KEY_ACCOUNT);
if (account == null) {
return;
}
ios.removeAttribute(CIMConstant.SESSION_KEY);
sessionManager.remove(account.toString());
CIMSession oldSession = memorySessionService.get(account.toString());
if (oldSession == null || oldSession.isApnsOpend()) {
return;
}
oldSession.setState(CIMSession.STATE_DISABLED);
oldSession.setNid(null);
memorySessionService.save(oldSession);
}
}

View File

@ -29,10 +29,9 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.farsunset.cim.sdk.server.model.Message;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.session.DefaultSessionManager;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.ApnsService;
import com.farsunset.cim.service.impl.MessageDispatcherImpl;
import com.farsunset.cim.service.CIMSessionService;
/**
* 消息发送实现类
@ -45,10 +44,8 @@ public class DefaultMessagePusher implements CIMMessagePusher {
private String host;
@Resource
private DefaultSessionManager sessionManager;
private CIMSessionService memorySessionService;
@Resource
private MessageDispatcherImpl messageDispatcher;
@Resource
@ -61,31 +58,39 @@ public class DefaultMessagePusher implements CIMMessagePusher {
* @param msg
*/
public void push(Message message) {
CIMSession session = sessionManager.get(message.getReceiver());
CIMSession session = memorySessionService.get(message.getReceiver());
/**
* 服务器集群时可以在此 判断当前session是否连接于本台服务器如果是继续往下走如果不是将此消息发往当前session连接的服务器并
*/
if (session.isConnected() && !Objects.equals(host, session.getHost())) {
messageDispatcher.forward(message, session.getHost());
if(session == null) {
return;
}
/**
* 如果是在当前服务器则直接推送
/*
* IOS设备如果开启了apns则使用apns推送
*/
if (session.isConnected() && Objects.equals(host, session.getHost())) {
session.write(message);
if (session.isIOSChannel() && session.isApnsOpend()) {
apnsService.push(message, session.getDeviceId());
return;
}
/**
* ios设备流程特别处理如果长链接断开了并且ApnsAble为打开状态的话优走apns
/*
* 服务器集群时判断当前session是否连接于本台服务器
* 如果连接到了其他服务器则转发请求到目标服务器
*/
if (Objects.equals(session.getChannel(), CIMSession.CHANNEL_IOS) && Objects.equals(session.getApnsAble(), CIMSession.APNS_ON)) {
apnsService.push(message, session.getDeviceId());
if (session.isConnected() && !Objects.equals(host, session.getHost())) {
/**
* @TODO
* 在此调用目标服务器接口来发送
*/
return;
}
/*
* 如果是Android浏览器或者windows客户端则直接发送
*/
if (session.isConnected() && Objects.equals(host, session.getHost())) {
session.write(message);
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -21,8 +21,23 @@
*/
package com.farsunset.cim.service;
import com.farsunset.cim.sdk.server.model.Message;
import java.util.List;
import com.farsunset.cim.sdk.server.model.CIMSession;
/**
* 集群 session管理实现示例 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理 服务器集群时
* 须要将CIMSession 信息存入数据库或者redis中 第三方存储空间中便于所有服务器都可以访问
*/
public interface CIMSessionService {
void save(CIMSession session);
CIMSession get(String account);
List<CIMSession> list();
void remove(String account);
public interface MessageDispatcher {
void forward(final Message msg, final String ip);
}

View File

@ -19,47 +19,54 @@
* *
***************************************************************************************
*/
package com.farsunset.cim.session;
package com.farsunset.cim.service.impl;
import java.util.List;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.session.SessionManager;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import com.farsunset.cim.sdk.server.handler.CIMNioSocketAcceptor;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.CIMSessionService;
/**
* 集群 session管理实现示例 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理 服务器集群时
* 须要将CIMSession 信息存入数据库或者nosql 第三方存储空间中便于所有服务器都可以访问
* 集群情况下数据库或者redis存储实现
* 自行实现存储管理
*
*/
public class ClusterSessionManager implements SessionManager {
@Service("clusterSessionService")
public class ClusterSessionServiceImpl implements CIMSessionService {
public CIMSession get(String account) {
// 这里查询数据库
/*
* CIMSession session = database.getSession(account);
* session.setIoSession(ContextHolder.getBean(CIMNioSocketAcceptor.class).
* getManagedSessions().get(session.getNid())); return session;
*/
return null;
@Resource
private CIMNioSocketAcceptor nioSocketAcceptor;
@Override
public void save(CIMSession session) {
}
@Override
public List<CIMSession> queryAll() {
public CIMSession get(String account) {
/*
* CIMSession session = database.getSession(account);
* session.setIoSession(nioSocketAcceptor.getManagedSessions().get(session.getNid()));
* return session;
*/
return null;
}
@Override
public void remove(String account) {
}
@Override
public void update(CIMSession session) {
public List<CIMSession> list() {
return null;
}
@Override
public void add(CIMSession arg0) {
}
}

View File

@ -21,11 +21,45 @@
*/
package com.farsunset.cim.service.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Service;
import com.farsunset.cim.sdk.server.session.DefaultSessionManager;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.CIMSessionService;
@Service
public class CIMSessionServiceImpl extends DefaultSessionManager {
/**
* 单机内存存储实现
*
*/
@Service("memorySessionService")
public class MemorySessionServiceImpl implements CIMSessionService {
private ConcurrentHashMap<String, CIMSession> sessionMap = new ConcurrentHashMap<String, CIMSession>();
@Override
public void save(CIMSession session) {
sessionMap.put(session.getAccount(), session);
}
@Override
public CIMSession get(String account) {
return sessionMap.get(account);
}
@Override
public void remove(String account) {
sessionMap.remove(account);
}
@Override
public List<CIMSession> list() {
List<CIMSession> onlineList = new ArrayList<>();
onlineList.addAll(sessionMap.values());
return onlineList;
}
}

View File

@ -1,83 +0,0 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.service.impl;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.farsunset.cim.sdk.server.model.Message;
import com.farsunset.cim.service.MessageDispatcher;
import okhttp3.FormBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@Component
public class MessageDispatcherImpl implements MessageDispatcher{
private static final Logger logger = Logger.getLogger(MessageDispatcherImpl.class.getName());
@Value("${sys.message.dispatch.url}")
private String dispatchUrl;
@Override
public void forward(final Message msg, final String ip) {
String apiUrl = String.format(dispatchUrl, ip);
try {
String response = httpPost(apiUrl, msg);
logger.info("消息转发目标服务器{" + ip + "},结果:" + response);
} catch (Exception e) {
e.printStackTrace();
logger.severe("消息转发目标服务器" + apiUrl + "message:" + e.getMessage());
}
}
@SuppressWarnings("deprecation")
private String httpPost(String url, Message msg) throws Exception {
OkHttpClient httpclient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.SECONDS).build();
FormBody.Builder build = new FormBody.Builder();
build.add("id", String.valueOf(msg.getId()));
build.add("action", msg.getAction());
build.add("title", msg.getTitle());
build.add("content", msg.getContent());
build.add("sender", msg.getSender());
build.add("receiver", msg.getReceiver());
build.add("format", msg.getFormat());
build.add("extra", msg.getExtra());
build.add("timestamp", String.valueOf(msg.getTimestamp()));
Request request = new Request.Builder().url(url).post(build.build()).build();
Response response = httpclient.newCall(request).execute();
String data = response.body().string();
IOUtils.closeQuietly(response);
return data;
}
}

View File

@ -3,11 +3,11 @@
<classpathentry kind="src" path="src"/>
<classpathentry kind="lib" path="libs/mina-core-2.0.16.jar"/>
<classpathentry kind="lib" path="libs/slf4j-api-1.7.25.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<classpathentry kind="lib" path="libs/protobuf-java-3.7.0.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="owner.project.facets" value="java"/>
</attributes>
</classpathentry>
<classpathentry kind="lib" path="libs/protobuf-java-3.7.0.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>

View File

@ -1,7 +1,7 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.source=1.6
org.eclipse.jdt.core.compiler.source=1.8

View File

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<faceted-project>
<fixed facet="java"/>
<installed facet="java" version="1.6"/>
<installed facet="java" version="1.8"/>
</faceted-project>

View File

@ -40,9 +40,14 @@ public interface CIMConstant {
}
String SESSION_KEY = "account";
String HEARTBEAT_KEY = "heartbeat";
String KEY_ACCOUNT = "account";
String KEY_QUIETLY_CLOSE = "quietlyClose";
String CLIENT_WEBSOCKET_HANDSHAKE = "client_websocket_handshake";
String CLIENT_CONNECT_CLOSED = "client_closed";
public static interface ProtobufType {
byte S_H_RQ = 1;
byte C_H_RS = 0;

View File

@ -96,8 +96,8 @@ public class CIMLoggingFilter extends IoFilterAdapter {
builder.append(" R:").append(session.getRemoteAddress().toString());
}
if (session.containsAttribute(CIMConstant.SESSION_KEY)) {
builder.append(" account:").append(session.getAttribute(CIMConstant.SESSION_KEY));
if (session.containsAttribute(CIMConstant.KEY_ACCOUNT)) {
builder.append(" account:").append(session.getAttribute(CIMConstant.KEY_ACCOUNT));
}
builder.append("]");
return builder.toString();

View File

@ -31,7 +31,7 @@ import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.model.HandshakerResponse;
import com.farsunset.cim.sdk.server.model.feature.EncodeFormatable;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
/**
* 服务端发送消息前编码

View File

@ -33,7 +33,7 @@ import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.model.HeartbeatResponse;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.model.proto.SentBodyProto;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
/**
* 原生app发送的消息解码器

View File

@ -30,11 +30,10 @@ import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.demux.MessageDecoderAdapter;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.handler.CIMNioSocketAcceptor;
import com.farsunset.cim.sdk.server.model.HeartbeatResponse;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.model.proto.SentBodyProto;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.google.protobuf.InvalidProtocolBufferException;
/**
@ -203,7 +202,7 @@ public class WebMessageDecoder extends MessageDecoderAdapter {
iosession.setAttribute(CIMSession.PROTOCOL,CIMSession.WEBSOCKET);
SentBody body = new SentBody();
body.setKey(CIMNioSocketAcceptor.WEBSOCKET_HANDLER_KEY);
body.setKey(CIMConstant.CLIENT_WEBSOCKET_HANDSHAKE);
body.setTimestamp(System.currentTimeMillis());
body.put("key", getSecWebSocketKey(message));
out.write(body);

View File

@ -1,5 +1,5 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
* Copyright 2013-2023 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.mina.core.service.IoAcceptor;
@ -38,24 +39,16 @@ import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.filter.CIMLoggingFilter;
import com.farsunset.cim.sdk.server.filter.ServerMessageCodecFactory;
import com.farsunset.cim.sdk.server.model.HeartbeatRequest;
import com.farsunset.cim.sdk.server.model.HeartbeatResponse;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
public class CIMNioSocketAcceptor extends IoHandlerAdapter implements KeepAliveMessageFactory {
/**
* websocket特有的握手处理handler
*/
public final static String WEBSOCKET_HANDLER_KEY = "client_websocket_handshake";
/**
* 连接关闭处理handler
*/
public final static String CIMSESSION_CLOSED_HANDLER_KEY = "client_closed";
private HashMap<String, CIMRequestHandler> innerHandlerMap = new HashMap<String, CIMRequestHandler>();
private CIMRequestHandler outerRequestHandler;
private IoAcceptor acceptor;
@ -69,7 +62,7 @@ public class CIMNioSocketAcceptor extends IoHandlerAdapter implements KeepAliveM
/**
* 预制websocket握手请求的处理
*/
innerHandlerMap.put(WEBSOCKET_HANDLER_KEY, new WebsocketHandler());
innerHandlerMap.put(CIMConstant.CLIENT_WEBSOCKET_HANDSHAKE, new WebsocketHandler());
acceptor = new NioSocketAcceptor();
acceptor.getSessionConfig().setReadBufferSize(READ_BUFFER_SIZE);
@ -81,11 +74,16 @@ public class CIMNioSocketAcceptor extends IoHandlerAdapter implements KeepAliveM
keepAliveFilter.setRequestTimeout(TIME_OUT);
keepAliveFilter.setForwardEvent(true);
acceptor.getFilterChain().addLast("executor", new ExecutorFilter(Executors.newCachedThreadPool()));
ExecutorService executor = Executors.newCachedThreadPool(runnable -> {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setName("mina-thread-" + thread.getId());
return thread;
});
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ServerMessageCodecFactory()));
acceptor.getFilterChain().addLast("logger", new CIMLoggingFilter());
acceptor.getFilterChain().addLast("heartbeat", keepAliveFilter);
acceptor.getFilterChain().addLast("executor", new ExecutorFilter(executor));
acceptor.setHandler(this);
acceptor.bind(new InetSocketAddress(port));
@ -93,6 +91,7 @@ public class CIMNioSocketAcceptor extends IoHandlerAdapter implements KeepAliveM
public void unbind() {
acceptor.unbind();
acceptor.dispose();
}
/**
@ -126,13 +125,12 @@ public class CIMNioSocketAcceptor extends IoHandlerAdapter implements KeepAliveM
@Override
public void sessionClosed(IoSession ios) {
CIMSession session = new CIMSession(ios);
SentBody body = new SentBody();
body.setKey(CIMSESSION_CLOSED_HANDLER_KEY);
body.setKey(CIMConstant.CLIENT_CONNECT_CLOSED);
outerRequestHandler.process(session, body);
}
@Override
public Object getRequest(IoSession session) {

View File

@ -26,7 +26,7 @@ package com.farsunset.cim.sdk.server.handler;
* @author 3979434@qq.com
*/
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
public interface CIMRequestHandler {

View File

@ -25,7 +25,7 @@ import java.security.MessageDigest;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.model.HandshakerResponse;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
/**
* 处理websocket握手请求返回响应的报文给浏览器

View File

@ -1,5 +1,5 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
* Copyright 2013-2023 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,35 +19,33 @@
* *
***************************************************************************************
*/
package com.farsunset.cim.sdk.server.session;
package com.farsunset.cim.sdk.server.model;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Objects;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.model.proto.SessionProto;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* IoSession包装类,集群时 将此对象存入表中
*/
public class CIMSession implements Serializable {
/**
*
*/
private transient static final long serialVersionUID = 1L;
public transient static String PROTOCOL = "protocol";
public transient static String WEBSOCKET = "websocket";
public transient static String NATIVEAPP = "nativeapp";
public transient static final int STATUS_ENABLED = 0;
public transient static final int STATUS_DISABLED = 1;
public transient static String HOST = "HOST";
public transient static final int STATE_ENABLED = 0;
public transient static final int STATE_DISABLED = 1;
public transient static final int APNS_ON = 1;
public transient static final int APNS_OFF = 0;
@ -59,22 +57,19 @@ public class CIMSession implements Serializable {
private transient IoSession session;
private String gid;// session全局ID
private String account;// session绑定的账号,主键一个账号同一时间之内在一个设备在线
private Long nid;// session在本台服务器上的ID
private String deviceId;// 客户端ID (设备号码+应用包名),ios为devicetoken
private String host;// session绑定的服务器IP
private String account;// session绑定的账号
private String channel;// 终端设备类型
private String deviceModel;// 终端设备型号
private String clientVersion;// 终端应用版本
private String systemVersion;// 终端系统版本
private String packageName;// 终端应用包名
private Long bindTime;// 登录时间
private Long heartbeat;// 心跳时间
private Double longitude;// 经度
private Double latitude;// 维度
private String location;// 位置
private int apnsAble;// apns推送状态
private int apns;// apns推送状态
private int state;// 状态
public CIMSession(IoSession session) {
@ -93,7 +88,7 @@ public class CIMSession implements Serializable {
public void setAccount(String account) {
this.account = account;
setAttribute(CIMConstant.SESSION_KEY, account);
setAttribute(CIMConstant.KEY_ACCOUNT, account);
}
public Double getLongitude() {
@ -120,17 +115,6 @@ public class CIMSession implements Serializable {
this.location = location;
}
public String getGid() {
return gid;
}
public void setGid(String gid) {
this.gid = gid;
setAttribute("gid", gid);
}
public Long getNid() {
return nid;
}
@ -161,7 +145,6 @@ public class CIMSession implements Serializable {
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getHost() {
@ -192,25 +175,16 @@ public class CIMSession implements Serializable {
this.systemVersion = systemVersion;
}
public Long getHeartbeat() {
return heartbeat;
}
public void setHeartbeat(Long heartbeat) {
this.heartbeat = heartbeat;
setAttribute(CIMConstant.HEARTBEAT_KEY, heartbeat);
}
public void setHost(String host) {
this.host = host;
}
public int getApnsAble() {
return apnsAble;
public int getApns() {
return apns;
}
public void setApnsAble(int apnsAble) {
this.apnsAble = apnsAble;
public void setApns(int apns) {
this.apns = apns;
}
public int getState() {
@ -260,27 +234,7 @@ public class CIMSession implements Serializable {
}
public boolean isConnected() {
if (session != null) {
return session.isConnected();
}
if (!isLocalhost()) {
return state == STATUS_ENABLED;
}
return false;
}
public boolean isLocalhost() {
try {
String ip = InetAddress.getLocalHost().getHostAddress();
return ip.equals(host);
} catch (UnknownHostException e) {
e.printStackTrace();
}
return false;
return (session != null && session.isConnected()) || state == STATE_ENABLED;
}
public void closeNow() {
@ -293,70 +247,111 @@ public class CIMSession implements Serializable {
session.closeOnFlush();
}
public void setPackageName(String packageName) {
this.packageName = packageName;
public boolean isIOSChannel() {
return Objects.equals(channel, CHANNEL_IOS);
}
public String getPackageName() {
return packageName;
public boolean isAndroidChannel() {
return Objects.equals(channel, CHANNEL_ANDROID);
}
public boolean isWindowsChannel() {
return Objects.equals(channel, CHANNEL_WINDOWS);
}
public boolean isApnsOpend() {
return Objects.equals(apns, APNS_ON);
}
@Override
public int hashCode() {
return (deviceId + nid + host).hashCode();
return getClass().hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof CIMSession) {
return hashCode() == o.hashCode();
CIMSession target = (CIMSession) o;
return Objects.equals(target.deviceId, deviceId) && Objects.equals(target.nid, nid)
&& Objects.equals(target.host, host);
}
return false;
}
public boolean fromOtherDevice(Object o) {
if (o instanceof CIMSession) {
CIMSession t = (CIMSession) o;
if (t.deviceId != null && deviceId != null) {
return !t.deviceId.equals(deviceId);
}
}
return false;
}
public boolean fromCurrentDevice(Object o) {
return !fromOtherDevice(o);
}
public void setIoSession(IoSession session) {
this.session = session;
}
public IoSession getIoSession() {
public IoSession getSession() {
return session;
}
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("{");
buffer.append("\"").append("gid").append("\":").append("\"").append(gid).append("\"").append(",");
buffer.append("\"").append("nid").append("\":").append(nid).append(",");
buffer.append("\"").append("deviceId").append("\":").append("\"").append(deviceId).append("\"").append(",");
buffer.append("\"").append("host").append("\":").append("\"").append(host).append("\"").append(",");
buffer.append("\"").append("account").append("\":").append("\"").append(account).append("\"").append(",");
buffer.append("\"").append("channel").append("\":").append("\"").append(channel).append("\"").append(",");
buffer.append("\"").append("deviceModel").append("\":").append("\"").append(deviceModel).append("\"").append(",");
buffer.append("\"").append("status").append("\":").append(state).append(",");
buffer.append("\"").append("apnsAble").append("\":").append(apnsAble).append(",");
buffer.append("\"").append("bindTime").append("\":").append(bindTime).append(",");
buffer.append("\"").append("heartbeat").append("\":").append(heartbeat);
buffer.append("}");
return buffer.toString();
public void setSession(IoSession session) {
this.session = session;
}
public byte[] getProtobufBody() {
SessionProto.Model.Builder builder = SessionProto.Model.newBuilder();
if (account != null) {
builder.setAccount(account);
}
if (nid != null) {
builder.setNid(nid);
}
if (deviceId != null) {
builder.setDeviceId(deviceId);
}
if (host != null) {
builder.setHost(host);
}
if (channel != null) {
builder.setChannel(channel);
}
if (deviceModel != null) {
builder.setDeviceModel(deviceModel);
}
if (clientVersion != null) {
builder.setClientVersion(clientVersion);
}
if (systemVersion != null) {
builder.setSystemVersion(systemVersion);
}
if (bindTime != null) {
builder.setBindTime(bindTime);
}
if (longitude != null) {
builder.setLongitude(longitude);
}
if (latitude != null) {
builder.setLatitude(latitude);
}
if (location != null) {
builder.setLocation(location);
}
builder.setState(state);
builder.setApns(apns);
return builder.build().toByteArray();
}
public static CIMSession decode(byte[] protobufBody) throws InvalidProtocolBufferException {
if(protobufBody == null) {
return null;
}
SessionProto.Model proto = SessionProto.Model.parseFrom(protobufBody);
CIMSession session = new CIMSession();
session.setApns(proto.getApns());
session.setBindTime(proto.getBindTime());
session.setChannel(proto.getChannel());
session.setClientVersion(proto.getClientVersion());
session.setDeviceId(proto.getDeviceId());
session.setDeviceModel(proto.getDeviceModel());
session.setHost(proto.getHost());
session.setLatitude(proto.getLatitude());
session.setLongitude(proto.getLongitude());
session.setLocation(proto.getLocation());
session.setNid(proto.getNid());
session.setSystemVersion(proto.getSystemVersion());
session.setState(proto.getState());
session.setAccount(proto.getAccount());
return session;
}
}

View File

@ -0,0 +1,19 @@
syntax = "proto3";
package com.farsunset.cim.sdk.server.model.proto;
option java_outer_classname="SessionProto";
message Model {
string account = 1;
int64 nid = 2;
string deviceId = 3;
string host = 4;
string channel = 5;
string deviceModel = 6;
string clientVersion = 7;
string systemVersion = 8;
int64 bindTime = 9;
double longitude = 10;
double latitude = 11;
string location = 12;
int32 apns = 13;
int32 state = 14;
}

View File

@ -1,84 +0,0 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.sdk.server.session;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
/**
* 自带默认 session管理实现 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理 服务器集群时
* 须要将CIMSession 信息存入数据库或者nosql 第三方存储空间中便于所有服务器都可以访问
*/
public class DefaultSessionManager implements SessionManager {
private static HashMap<String, CIMSession> sessions = new HashMap<String, CIMSession>();
private static final AtomicInteger connectionsCounter = new AtomicInteger(0);
/**
*
*/
public void add(CIMSession session) {
if (session != null) {
session.setAttribute(CIMConstant.SESSION_KEY, session.getAccount());
sessions.put(session.getAccount(), session);
connectionsCounter.incrementAndGet();
}
}
public CIMSession get(String account) {
return sessions.get(account);
}
public List<CIMSession> queryAll() {
List<CIMSession> list = new ArrayList<CIMSession>();
list.addAll(sessions.values());
return list;
}
public void remove(CIMSession session) {
sessions.remove(session.getAttribute(CIMConstant.SESSION_KEY));
}
public void remove(String account) {
sessions.remove(account);
}
public boolean containsCIMSession(String account) {
return sessions.containsKey(account);
}
@Override
public void update(CIMSession session) {
sessions.put(session.getAccount(), session);
}
}

View File

@ -1,64 +0,0 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.sdk.server.session;
import java.util.List;
/**
* 客户端的 session管理接口 可自行实现此接口管理session
*/
public interface SessionManager {
/**
* 添加新的session
*/
public void add(CIMSession session);
/**
* 更新session
*/
public void update(CIMSession session);
/**
*
* @param account
* 客户端session的 key 一般可用 用户账号来对应session
* @return
*/
CIMSession get(String account);
/**
* 获取所有session
*
* @return
*/
public List<CIMSession> queryAll();
/**
* 删除session
*
* @param session
*/
public void remove(String account);
}

View File

@ -16,7 +16,7 @@ import com.farsunset.cim.handler.SessionClosedHandler;
import com.farsunset.cim.sdk.server.handler.CIMNioSocketAcceptor;
import com.farsunset.cim.sdk.server.handler.CIMRequestHandler;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
@Configuration
public class CIMConfig implements CIMRequestHandler {
@ -28,7 +28,7 @@ public class CIMConfig implements CIMRequestHandler {
@Resource
private SessionClosedHandler closedHandler;
@Resource
private ApplicationContext applicationContext;
@ -55,7 +55,6 @@ public class CIMConfig implements CIMRequestHandler {
return nioSocketAcceptor;
}
@Override
public void process(CIMSession session, SentBody body) {
@ -75,6 +74,5 @@ public class CIMConfig implements CIMRequestHandler {
return applicationContext.getBean(handlerClass);
}
}

View File

@ -27,17 +27,18 @@ import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import com.farsunset.cim.service.impl.CIMSessionServiceImpl;
import com.farsunset.cim.service.CIMSessionService;
@Controller
@RequestMapping("/console/session")
public class SessionController {
@Resource
private CIMSessionServiceImpl sessionManager;
private CIMSessionService memorySessionService;
@RequestMapping(value = "/list.action")
public String list(Model model) {
model.addAttribute("sessionList", sessionManager.queryAll());
model.addAttribute("sessionList", memorySessionService.list());
return "console/session/manage";
}
}

View File

@ -32,7 +32,6 @@ import com.farsunset.cim.push.DefaultMessagePusher;
import com.farsunset.cim.push.SystemMessagePusher;
import com.farsunset.cim.sdk.server.model.Message;
import com.farsunset.cim.util.Constants;
import com.farsunset.cim.util.StringUtil;
@RestController

View File

@ -30,14 +30,14 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.farsunset.cim.push.CIMMessagePusher;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.handler.CIMRequestHandler;
import com.farsunset.cim.sdk.server.model.Message;
import com.farsunset.cim.sdk.server.model.ReplyBody;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.service.impl.CIMSessionServiceImpl;
import com.farsunset.cim.util.StringUtil;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.CIMSessionService;
/**
@ -51,59 +51,57 @@ public class BindHandler implements CIMRequestHandler {
protected final Logger logger = LoggerFactory.getLogger(BindHandler.class);
@Resource
private CIMSessionServiceImpl sessionManager;
private CIMSessionService memorySessionService;
@Value("${server.host}")
private String host;
@Resource
private CIMMessagePusher defaultMessagePusher;
public void process(CIMSession newSession, SentBody message) {
public void process(CIMSession newSession, SentBody body) {
ReplyBody reply = new ReplyBody();
reply.setKey(message.getKey());
reply.setKey(body.getKey());
reply.setCode(CIMConstant.ReturnCode.CODE_200);
reply.setTimestamp(System.currentTimeMillis());
try {
String account = message.get("account");
newSession.setGid(StringUtil.getUUID());
String account = body.get("account");
newSession.setAccount(account);
newSession.setDeviceId(message.get("deviceId"));
newSession.setDeviceId(body.get("deviceId"));
newSession.setHost(host);
newSession.setChannel(message.get("channel"));
newSession.setDeviceModel(message.get("device"));
newSession.setClientVersion(message.get("version"));
newSession.setSystemVersion(message.get("osVersion"));
newSession.setChannel(body.get("channel"));
newSession.setDeviceModel(body.get("device"));
newSession.setClientVersion(body.get("version"));
newSession.setSystemVersion(body.get("osVersion"));
newSession.setBindTime(System.currentTimeMillis());
newSession.setPackageName(message.get("packageName"));
/**
/*
* 由于客户端断线服务端可能会无法获知的情况客户端重连时需要关闭旧的连接
*/
CIMSession oldSession = sessionManager.get(account);
CIMSession oldSession = memorySessionService.get(account);
// 如果是账号已经在另一台终端登录则让另一个终端下线
/*
* 如果是账号已经在另一台终端登录则让另一个终端下线
*/
if (oldSession != null && fromOtherDevice(newSession,oldSession) && oldSession.isConnected()) {
sendForceOfflineMessage(oldSession, account, newSession.getDeviceModel());
}
/**
/*
* 有可能是同一个设备重复连接则关闭旧的链接这种情况一般是客户端断网联网又重新链接上来之前的旧链接没有来得及通过心跳机制关闭在这里手动关闭
* 条件1连接来自是同一个设备
* 条件2.2个连接都是同一台服务器
*/
if (oldSession != null && !fromOtherDevice(newSession,oldSession) && Objects.equals(oldSession.getHost(),host)) {
oldSession.removeAttribute(CIMConstant.SESSION_KEY);
oldSession.closeNow();
closeQuietly(oldSession);
}
// 第一次设置心跳时间为登录时间
newSession.setBindTime(System.currentTimeMillis());
newSession.setHeartbeat(System.currentTimeMillis());
sessionManager.add(newSession);
memorySessionService.save(newSession);
} catch (Exception e) {
reply.setCode(CIMConstant.ReturnCode.CODE_500);
@ -126,16 +124,18 @@ public class BindHandler implements CIMRequestHandler {
msg.setSender("system");
msg.setContent(deviceModel);
msg.setId(System.currentTimeMillis());
closeQuietly(oldSession,msg);
defaultMessagePusher.push(msg);
closeQuietly(oldSession);
}
// 不同设备同一账号登录时关闭旧的连接
private void closeQuietly(CIMSession oldSession,Message msg) {
private void closeQuietly(CIMSession oldSession) {
if (oldSession.isConnected() && Objects.equals(host, oldSession.getHost())) {
oldSession.write(msg);
oldSession.removeAttribute(CIMConstant.SESSION_KEY);
oldSession.closeNow();
oldSession.setAttribute(CIMConstant.KEY_QUIETLY_CLOSE,true);
oldSession.closeOnFlush();
}
}

View File

@ -22,6 +22,8 @@
package com.farsunset.cim.handler;
import java.util.Objects;
import javax.annotation.Resource;
import org.slf4j.Logger;
@ -31,8 +33,8 @@ import org.springframework.stereotype.Component;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.handler.CIMRequestHandler;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.service.impl.CIMSessionServiceImpl;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.CIMSessionService;
/**
@ -46,17 +48,28 @@ public class SessionClosedHandler implements CIMRequestHandler {
protected final Logger logger = LoggerFactory.getLogger(SessionClosedHandler.class);
@Resource
private CIMSessionServiceImpl sessionManager;
private CIMSessionService memorySessionService;
public void process(CIMSession ios, SentBody message) {
Object quietly = ios.getAttribute(CIMConstant.KEY_QUIETLY_CLOSE);
if (Objects.equals(quietly, true)) {
return;
}
Object account = ios.getAttribute(CIMConstant.SESSION_KEY);
Object account = ios.getAttribute(CIMConstant.KEY_ACCOUNT);
if (account == null) {
return;
}
ios.removeAttribute(CIMConstant.SESSION_KEY);
sessionManager.remove(account.toString());
CIMSession oldSession = memorySessionService.get(account.toString());
if (oldSession == null || oldSession.isApnsOpend()) {
return;
}
oldSession.setState(CIMSession.STATE_DISABLED);
oldSession.setNid(null);
memorySessionService.save(oldSession);
}
}

View File

@ -29,10 +29,9 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.farsunset.cim.sdk.server.model.Message;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.session.DefaultSessionManager;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.ApnsService;
import com.farsunset.cim.service.impl.MessageDispatcherImpl;
import com.farsunset.cim.service.CIMSessionService;
/**
* 消息发送实现类
@ -45,10 +44,8 @@ public class DefaultMessagePusher implements CIMMessagePusher {
private String host;
@Resource
private DefaultSessionManager sessionManager;
private CIMSessionService memorySessionService;
@Resource
private MessageDispatcherImpl messageDispatcher;
@Resource
@ -61,31 +58,39 @@ public class DefaultMessagePusher implements CIMMessagePusher {
* @param msg
*/
public void push(Message message) {
CIMSession session = sessionManager.get(message.getReceiver());
CIMSession session = memorySessionService.get(message.getReceiver());
/**
* 服务器集群时可以在此 判断当前session是否连接于本台服务器如果是继续往下走如果不是将此消息发往当前session连接的服务器并
*/
if (session.isConnected() && !Objects.equals(host, session.getHost())) {
messageDispatcher.forward(message, session.getHost());
if(session == null) {
return;
}
/**
* 如果是在当前服务器则直接推送
/*
* IOS设备如果开启了apns则使用apns推送
*/
if (session.isConnected() && Objects.equals(host, session.getHost())) {
session.write(message);
if (session.isIOSChannel() && session.isApnsOpend()) {
apnsService.push(message, session.getDeviceId());
return;
}
/**
* ios设备流程特别处理如果长链接断开了并且ApnsAble为打开状态的话优走apns
/*
* 服务器集群时判断当前session是否连接于本台服务器
* 如果连接到了其他服务器则转发请求到目标服务器
*/
if (Objects.equals(session.getChannel(), CIMSession.CHANNEL_IOS) && Objects.equals(session.getApnsAble(), CIMSession.APNS_ON)) {
apnsService.push(message, session.getDeviceId());
if (session.isConnected() && !Objects.equals(host, session.getHost())) {
/**
* @TODO
* 在此调用目标服务器接口来发送
*/
return;
}
/*
* 如果是Android浏览器或者windows客户端则直接发送
*/
if (session.isConnected() && Objects.equals(host, session.getHost())) {
session.write(message);
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -21,8 +21,23 @@
*/
package com.farsunset.cim.service;
import com.farsunset.cim.sdk.server.model.Message;
import java.util.List;
import com.farsunset.cim.sdk.server.model.CIMSession;
/**
* 集群 session管理实现示例 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理 服务器集群时
* 须要将CIMSession 信息存入数据库或者redis中 第三方存储空间中便于所有服务器都可以访问
*/
public interface CIMSessionService {
void save(CIMSession session);
CIMSession get(String account);
List<CIMSession> list();
void remove(String account);
public interface MessageDispatcher {
void forward(final Message msg, final String ip);
}

View File

@ -19,47 +19,54 @@
* *
***************************************************************************************
*/
package com.farsunset.cim.session;
package com.farsunset.cim.service.impl;
import java.util.List;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.session.SessionManager;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import com.farsunset.cim.sdk.server.handler.CIMNioSocketAcceptor;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.CIMSessionService;
/**
* 集群 session管理实现示例 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理 服务器集群时
* 须要将CIMSession 信息存入数据库或者nosql 第三方存储空间中便于所有服务器都可以访问
* 集群情况下数据库或者redis存储实现
* 自行实现存储管理
*
*/
public class ClusterSessionManager implements SessionManager {
@Service("clusterSessionService")
public class ClusterSessionServiceImpl implements CIMSessionService {
public CIMSession get(String account) {
// 这里查询数据库
/*
* CIMSession session = database.getSession(account);
* session.setIoSession(ContextHolder.getBean(CIMNioSocketAcceptor.class).
* getManagedSessions().get(session.getNid())); return session;
*/
return null;
@Resource
private CIMNioSocketAcceptor nioSocketAcceptor;
@Override
public void save(CIMSession session) {
}
@Override
public List<CIMSession> queryAll() {
public CIMSession get(String account) {
/*
* CIMSession session = database.getSession(account);
* session.setIoSession(nioSocketAcceptor.getManagedChannel().get(session.getNid()));
* return session;
*/
return null;
}
@Override
public void remove(String account) {
}
@Override
public void update(CIMSession session) {
public List<CIMSession> list() {
return null;
}
@Override
public void add(CIMSession arg0) {
}
}

View File

@ -0,0 +1,65 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.service.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Service;
import com.farsunset.cim.sdk.server.model.CIMSession;
import com.farsunset.cim.service.CIMSessionService;
/**
* 单机内存存储实现
*
*/
@Service("memorySessionService")
public class MemorySessionServiceImpl implements CIMSessionService {
private ConcurrentHashMap<String, CIMSession> sessionMap = new ConcurrentHashMap<String, CIMSession>();
@Override
public void save(CIMSession session) {
sessionMap.put(session.getAccount(), session);
}
@Override
public CIMSession get(String account) {
return sessionMap.get(account);
}
@Override
public void remove(String account) {
sessionMap.remove(account);
}
@Override
public List<CIMSession> list() {
List<CIMSession> onlineList = new ArrayList<>();
onlineList.addAll(sessionMap.values());
return onlineList;
}
}

View File

@ -1,83 +0,0 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.service.impl;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.farsunset.cim.sdk.server.model.Message;
import com.farsunset.cim.service.MessageDispatcher;
import okhttp3.FormBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@Component
public class MessageDispatcherImpl implements MessageDispatcher{
private static final Logger logger = Logger.getLogger(MessageDispatcherImpl.class.getName());
@Value("${sys.message.dispatch.url}")
private String dispatchUrl;
@Override
public void forward(final Message msg, final String ip) {
String apiUrl = String.format(dispatchUrl, ip);
try {
String response = httpPost(apiUrl, msg);
logger.info("消息转发目标服务器{" + ip + "},结果:" + response);
} catch (Exception e) {
e.printStackTrace();
logger.severe("消息转发目标服务器" + apiUrl + "message:" + e.getMessage());
}
}
@SuppressWarnings("deprecation")
private String httpPost(String url, Message msg) throws Exception {
OkHttpClient httpclient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.SECONDS).build();
FormBody.Builder build = new FormBody.Builder();
build.add("id", String.valueOf(msg.getId()));
build.add("action", msg.getAction());
build.add("title", msg.getTitle());
build.add("content", msg.getContent());
build.add("sender", msg.getSender());
build.add("receiver", msg.getReceiver());
build.add("format", msg.getFormat());
build.add("extra", msg.getExtra());
build.add("timestamp", String.valueOf(msg.getTimestamp()));
Request request = new Request.Builder().url(url).post(build.build()).build();
Response response = httpclient.newCall(request).execute();
String data = response.body().string();
IOUtils.closeQuietly(response);
return data;
}
}

View File

@ -25,6 +25,8 @@ package com.farsunset.cim.sdk.server.constant;
* 常量
*/
public interface CIMConstant {
// 消息头长度为3个字节第一个字节为消息类型第二第三字节 转换int后为消息长度
int DATA_HEADER_LENGTH = 3;
public static interface ReturnCode {
@ -38,30 +40,30 @@ public interface CIMConstant {
}
String SESSION_KEY = "account";
String KEY_ACCOUNT = "account";
String KEY_QUIETLY_CLOSE = "quietlyClose";
String HEARTBEAT_KEY = "heartbeat";
String CLIENT_WEBSOCKET_HANDSHAKE = "client_websocket_handshake";
String CLIENT_HEARTBEAT = "client_heartbeat";
// 消息头长度为3个字节第一个字节为消息类型第二第三字节 转换int后为消息长度
int DATA_HEADER_LENGTH = 3;
// WEBSOCKET消息头长度为2个字节
int WS_DATA_HEADER_LENGTH = 2;
String CLIENT_CONNECT_CLOSED = "client_closed";
public static interface ProtobufType {
byte C_H_RS = 0;
byte S_H_RQ = 1;
byte C_H_RS = 0;
byte MESSAGE = 2;
byte SENTBODY = 3;
byte REPLYBODY = 4;
}
public static interface MessageAction {
// 被其他设备登录挤下线消息
String ACTION_999 = "999";
// 被系统禁用消息
String ACTION_444 = "444";
}
}

View File

@ -26,11 +26,11 @@ import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.filter.decoder.AppMessageDecoder;
import com.farsunset.cim.sdk.server.filter.decoder.WebMessageDecoder;
import com.farsunset.cim.sdk.server.handler.CIMNioSocketAcceptor;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -95,7 +95,7 @@ public class ServerMessageDecoder extends ByteToMessageDecoder {
arg0.channel().attr(AttributeKey.valueOf(CIMSession.PROTOCOL)).set(CIMSession.WEBSOCKET);
SentBody body = new SentBody();
body.setKey(CIMNioSocketAcceptor.WEBSOCKET_HANDLER_KEY);
body.setKey(CIMConstant.CLIENT_WEBSOCKET_HANDSHAKE);
body.setTimestamp(System.currentTimeMillis());
body.put("key", secKey);
queue.add(body);

View File

@ -26,7 +26,7 @@ import java.util.Objects;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.model.HandshakerResponse;
import com.farsunset.cim.sdk.server.model.feature.EncodeFormatable;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

View File

@ -26,7 +26,7 @@ import java.util.List;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.model.proto.SentBodyProto;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

View File

@ -21,11 +21,9 @@
*/
package com.farsunset.cim.sdk.server.filter.decoder;
import java.nio.charset.Charset;
import java.util.List;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.model.HeartbeatResponse;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.model.proto.SentBodyProto;
import com.google.protobuf.InvalidProtocolBufferException;
@ -124,8 +122,10 @@ public class WebMessageDecoder extends ByteToMessageDecoder {
* 只处理心跳响应以及sentbody消息
*/
if (type == CIMConstant.ProtobufType.C_H_RS) {
HeartbeatResponse response = HeartbeatResponse.getInstance();
queue.add(response);
SentBody body = new SentBody();
body.setKey(CIMConstant.CLIENT_HEARTBEAT);
body.setTimestamp(System.currentTimeMillis());
queue.add(body);
}
if (type == CIMConstant.ProtobufType.SENTBODY) {

View File

@ -30,7 +30,7 @@ import com.farsunset.cim.sdk.server.filter.ServerMessageDecoder;
import com.farsunset.cim.sdk.server.filter.ServerMessageEncoder;
import com.farsunset.cim.sdk.server.model.HeartbeatRequest;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
@ -54,14 +54,6 @@ import io.netty.util.AttributeKey;
@Sharable
public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody> {
/**
* websocket特有的握手处理handler
*/
public final static String WEBSOCKET_HANDLER_KEY = "client_websocket_handshake";
/**
* 连接关闭处理handler
*/
public final static String CIMSESSION_CLOSED_HANDLER_KEY = "client_closed";
private HashMap<String, CIMRequestHandler> innerHandlerMap = new HashMap<String, CIMRequestHandler>();
private CIMRequestHandler outerRequestHandler;
@ -81,8 +73,9 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
/**
* 预制websocket握手请求的处理
*/
innerHandlerMap.put(WEBSOCKET_HANDLER_KEY, new WebsocketHandler());
innerHandlerMap.put(CIMConstant.CLIENT_WEBSOCKET_HANDSHAKE, new WebsocketHandler());
innerHandlerMap.put(CIMConstant.CLIENT_HEARTBEAT, new HeartbeatHandler());
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
@ -123,7 +116,7 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, SentBody body) throws Exception {
protected void channelRead0(ChannelHandlerContext ctx, SentBody body) {
CIMSession session = new CIMSession(ctx.channel());
@ -148,7 +141,7 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
CIMSession session = new CIMSession(ctx.channel());
SentBody body = new SentBody();
body.setKey(CIMSESSION_CLOSED_HANDLER_KEY);
body.setKey(CIMConstant.CLIENT_CONNECT_CLOSED);
outerRequestHandler.process(session, body);
}
@ -188,4 +181,5 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
}
return channelGroup.get(id);
}
}

View File

@ -26,7 +26,7 @@ package com.farsunset.cim.sdk.server.handler;
* @author 3979434@qq.com
*/
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
public interface CIMRequestHandler {

View File

@ -19,13 +19,15 @@
* *
***************************************************************************************
*/
package com.farsunset.cim.service.impl;
package com.farsunset.cim.sdk.server.handler;
import org.springframework.stereotype.Service;
import com.farsunset.cim.sdk.server.session.DefaultSessionManager;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.model.CIMSession;
@Service
public class CIMSessionServiceImpl extends DefaultSessionManager {
/**
* 心跳handler主要是让netty重置cheannel的空闲时间
*/
public class HeartbeatHandler implements CIMRequestHandler {
public void process(CIMSession session, SentBody body) {}
}

View File

@ -26,12 +26,10 @@ import java.util.Base64;
import com.farsunset.cim.sdk.server.model.HandshakerResponse;
import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession;
import com.farsunset.cim.sdk.server.model.CIMSession;
/**
* 处理websocket握手请求返回响应的报文给浏览器
*
* @author Iraid
*
*/
public class WebsocketHandler implements CIMRequestHandler {

View File

@ -1,5 +1,5 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
* Copyright 2013-2023 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,35 +19,34 @@
* *
***************************************************************************************
*/
package com.farsunset.cim.sdk.server.session;
package com.farsunset.cim.sdk.server.model;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Objects;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.model.proto.SessionProto;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
/**
* Channel包装类,集群时 将此对象存入表中
* IoSession包装类,集群时 将此对象存入表中
*/
public class CIMSession implements Serializable {
/**
*
*/
private transient static final long serialVersionUID = 1L;
public transient static String PROTOCOL = "protocol";
public transient static String WEBSOCKET = "websocket";
public transient static String NATIVEAPP = "nativeapp";
public transient static final int STATUS_ENABLED = 0;
public transient static final int STATUS_DISABLED = 1;
public transient static String HOST = "HOST";
public transient static final int STATE_ENABLED = 0;
public transient static final int STATE_DISABLED = 1;
public transient static final int APNS_ON = 1;
public transient static final int APNS_OFF = 0;
@ -56,31 +55,31 @@ public class CIMSession implements Serializable {
public transient static String CHANNEL_WINDOWS = "windows";
public transient static String CHANNEL_WP = "wp";
public transient static String CHANNEL_BROWSER = "browser";
private transient Channel session;
private String gid;// session全局ID
private String account;// session绑定的账号,主键一个账号同一时间之内在一个设备在线
private String nid;// session在本台服务器上的ID
private String deviceId;// 客户端ID (设备号码+应用包名),ios为devicetoken
private String host;// session绑定的服务器IP
private String account;// session绑定的账号
private String channel;// 终端设备类型
private String deviceModel;// 终端设备型号
private String clientVersion;// 终端应用版本
private String systemVersion;// 终端系统版本
private String packageName;// 终端应用包名
private Long bindTime;// 登录时间
private Long heartbeat;// 心跳时间
private Double longitude;// 经度
private Double latitude;// 维度
private String location;// 位置
private int apnsAble;// apns推送状态
private int status;// 状态
private int apns;// apns推送状态
private int state;// 状态
public CIMSession(Channel session) {
this.session = session;
this.nid = session.id().asShortText();
}
public CIMSession() {
}
@ -91,7 +90,8 @@ public class CIMSession implements Serializable {
public void setAccount(String account) {
this.account = account;
setAttribute(CIMConstant.SESSION_KEY, account);
setAttribute(CIMConstant.KEY_ACCOUNT, account);
}
public Double getLongitude() {
@ -118,14 +118,6 @@ public class CIMSession implements Serializable {
this.location = location;
}
public String getGid() {
return gid;
}
public void setGid(String gid) {
this.gid = gid;
}
public String getNid() {
return nid;
}
@ -186,37 +178,24 @@ public class CIMSession implements Serializable {
this.systemVersion = systemVersion;
}
public Long getHeartbeat() {
return heartbeat;
}
public void setHeartbeat(Long heartbeat) {
this.heartbeat = heartbeat;
setAttribute(CIMConstant.HEARTBEAT_KEY, heartbeat);
}
public void setHost(String host) {
this.host = host;
}
public void setChannel(Channel session) {
this.session = session;
public int getApns() {
return apns;
}
public int getApnsAble() {
return apnsAble;
public void setApns(int apns) {
this.apns = apns;
}
public void setApnsAble(int apnsAble) {
this.apnsAble = apnsAble;
public int getState() {
return state;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
public void setState(int state) {
this.state = state;
}
public void setAttribute(String key, Object value) {
@ -248,36 +227,16 @@ public class CIMSession implements Serializable {
return null;
}
public boolean write(Object msg) {
if (session != null && session.isActive()) {
return session.writeAndFlush(msg).awaitUninterruptibly(5000);
}
return false;
}
public boolean isConnected() {
if (session != null) {
return session.isActive();
}
if (!isLocalhost()) {
return status == STATUS_ENABLED;
}
return false;
}
public boolean isLocalhost() {
try {
String ip = InetAddress.getLocalHost().getHostAddress();
return ip.equals(host);
} catch (UnknownHostException e) {
e.printStackTrace();
}
return false;
return (session != null && session.isActive()) || state == STATE_ENABLED;
}
public void closeNow() {
@ -285,63 +244,108 @@ public class CIMSession implements Serializable {
session.close();
}
public void setPackageName(String packageName) {
this.packageName = packageName;
public void closeOnFlush() {
if (session != null)
session.close();
}
public String getPackageName() {
return packageName;
public boolean isIOSChannel() {
return Objects.equals(channel, CHANNEL_IOS);
}
public boolean isAndroidChannel() {
return Objects.equals(channel, CHANNEL_ANDROID);
}
public boolean isWindowsChannel() {
return Objects.equals(channel, CHANNEL_WINDOWS);
}
public boolean isApnsOpend() {
return Objects.equals(apns, APNS_ON);
}
@Override
public int hashCode() {
return (deviceId + nid + host).hashCode();
return getClass().hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof CIMSession) {
return hashCode() == o.hashCode();
CIMSession target = (CIMSession) o;
return Objects.equals(target.deviceId, deviceId) && Objects.equals(target.nid, nid)
&& Objects.equals(target.host, host);
}
return false;
}
public boolean fromOtherDevice(Object o) {
if (o instanceof CIMSession) {
CIMSession t = (CIMSession) o;
if (t.deviceId != null && deviceId != null) {
return !t.deviceId.equals(deviceId);
}
public byte[] getProtobufBody() {
SessionProto.Model.Builder builder = SessionProto.Model.newBuilder();
if (account != null) {
builder.setAccount(account);
}
return false;
if (nid != null) {
builder.setNid(nid);
}
if (deviceId != null) {
builder.setDeviceId(deviceId);
}
if (host != null) {
builder.setHost(host);
}
if (channel != null) {
builder.setChannel(channel);
}
if (deviceModel != null) {
builder.setDeviceModel(deviceModel);
}
if (clientVersion != null) {
builder.setClientVersion(clientVersion);
}
if (systemVersion != null) {
builder.setSystemVersion(systemVersion);
}
if (bindTime != null) {
builder.setBindTime(bindTime);
}
if (longitude != null) {
builder.setLongitude(longitude);
}
if (latitude != null) {
builder.setLatitude(latitude);
}
if (location != null) {
builder.setLocation(location);
}
builder.setState(state);
builder.setApns(apns);
return builder.build().toByteArray();
}
public boolean fromCurrentDevice(Object o) {
return !fromOtherDevice(o);
}
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("{");
buffer.append("\"").append("gid").append("\":").append("\"").append(gid).append("\"").append(",");
buffer.append("\"").append("nid").append("\":").append(nid).append(",");
buffer.append("\"").append("deviceId").append("\":").append("\"").append(deviceId).append("\"").append(",");
buffer.append("\"").append("host").append("\":").append("\"").append(host).append("\"").append(",");
buffer.append("\"").append("account").append("\":").append("\"").append(account).append("\"").append(",");
buffer.append("\"").append("channel").append("\":").append("\"").append(channel).append("\"").append(",");
buffer.append("\"").append("deviceModel").append("\":").append("\"").append(deviceModel).append("\"")
.append(",");
buffer.append("\"").append("status").append("\":").append(status).append(",");
buffer.append("\"").append("apnsAble").append("\":").append(apnsAble).append(",");
buffer.append("\"").append("bindTime").append("\":").append(bindTime).append(",");
buffer.append("\"").append("heartbeat").append("\":").append(heartbeat);
buffer.append("}");
return buffer.toString();
public static CIMSession decode(byte[] protobufBody) throws InvalidProtocolBufferException {
if(protobufBody == null) {
return null;
}
SessionProto.Model proto = SessionProto.Model.parseFrom(protobufBody);
CIMSession session = new CIMSession();
session.setApns(proto.getApns());
session.setBindTime(proto.getBindTime());
session.setChannel(proto.getChannel());
session.setClientVersion(proto.getClientVersion());
session.setDeviceId(proto.getDeviceId());
session.setDeviceModel(proto.getDeviceModel());
session.setHost(proto.getHost());
session.setLatitude(proto.getLatitude());
session.setLongitude(proto.getLongitude());
session.setLocation(proto.getLocation());
session.setNid(proto.getNid());
session.setSystemVersion(proto.getSystemVersion());
session.setState(proto.getState());
session.setAccount(proto.getAccount());
return session;
}
}

View File

@ -0,0 +1,19 @@
syntax = "proto3";
package com.farsunset.cim.sdk.server.model.proto;
option java_outer_classname="SessionProto";
message Model {
string account = 1;
string nid = 2;
string deviceId = 3;
string host = 4;
string channel = 5;
string deviceModel = 6;
string clientVersion = 7;
string systemVersion = 8;
int64 bindTime = 9;
double longitude = 10;
double latitude = 11;
string location = 12;
int32 apns = 13;
int32 state = 14;
}

View File

@ -1,84 +0,0 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.sdk.server.session;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
/**
* 自带默认 session管理实现 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理 服务器集群时
* 须要将CIMSession 信息存入数据库或者nosql 第三方存储空间中便于所有服务器都可以访问
*/
public class DefaultSessionManager implements SessionManager {
private static HashMap<String, CIMSession> sessions = new HashMap<String, CIMSession>();
private static final AtomicInteger connectionsCounter = new AtomicInteger(0);
/**
*
*/
public void add(CIMSession session) {
if (session != null) {
session.setAttribute(CIMConstant.SESSION_KEY, session.getAccount());
sessions.put(session.getAccount(), session);
connectionsCounter.incrementAndGet();
}
}
public CIMSession get(String account) {
return sessions.get(account);
}
public List<CIMSession> queryAll() {
List<CIMSession> list = new ArrayList<CIMSession>();
list.addAll(sessions.values());
return list;
}
public void remove(CIMSession session) {
sessions.remove(session.getAttribute(CIMConstant.SESSION_KEY));
}
public void remove(String account) {
sessions.remove(account);
}
public boolean containsCIMSession(String account) {
return sessions.containsKey(account);
}
@Override
public void update(CIMSession session) {
sessions.put(session.getAccount(), session);
}
}

View File

@ -1,64 +0,0 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.sdk.server.session;
import java.util.List;
/**
* 客户端的 session管理接口 可自行实现此接口管理session
*/
public interface SessionManager {
/**
* 添加新的session
*/
public void add(CIMSession session);
/**
* 更新session
*/
public void update(CIMSession session);
/**
*
* @param account
* 客户端session的 key 一般可用 用户账号来对应session
* @return
*/
CIMSession get(String account);
/**
* 获取所有session
*
* @return
*/
public List<CIMSession> queryAll();
/**
* 删除session
*
* @param session
*/
public void remove(String account);
}