1.修改文档中的遗漏

2.修复android SDK不当使用,可能会创建多个连接的问题
3.统一android SDK日志管理,添加日志开关接口
This commit is contained in:
远方夕阳 2018-08-29 13:47:40 +08:00
parent 252c09e56a
commit 7a78be0f3b
43 changed files with 729 additions and 357 deletions

View File

@ -5,6 +5,6 @@
<classpathentry kind="lib" path="libs/log4j-1.2.17.jar"/> <classpathentry kind="lib" path="libs/log4j-1.2.17.jar"/>
<classpathentry kind="lib" path="libs/mina-core-2.0.16.jar"/> <classpathentry kind="lib" path="libs/mina-core-2.0.16.jar"/>
<classpathentry kind="lib" path="libs/protobuf-java-3.2.0.jar"/> <classpathentry kind="lib" path="libs/protobuf-java-3.2.0.jar"/>
<classpathentry kind="lib" path="D:/devtools/android-sdk-windows/platforms/android-22/android.jar"/> <classpathentry kind="lib" path="D:/dev/Android-SDK-Windows/platforms/android-25/android.jar"/>
<classpathentry kind="output" path="bin"/> <classpathentry kind="output" path="bin"/>
</classpath> </classpath>

View File

@ -26,8 +26,10 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IdleStatus;
@ -41,11 +43,11 @@ import android.content.Context;
import android.content.Intent; import android.content.Intent;
import android.net.ConnectivityManager; import android.net.ConnectivityManager;
import android.net.NetworkInfo; import android.net.NetworkInfo;
import android.util.Log;
import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.constant.CIMConstant;
import com.farsunset.cim.sdk.android.exception.SessionClosedException; import com.farsunset.cim.sdk.android.exception.SessionClosedException;
import com.farsunset.cim.sdk.android.exception.NetworkDisabledException; import com.farsunset.cim.sdk.android.exception.NetworkDisabledException;
import com.farsunset.cim.sdk.android.filter.CIMLoggingFilter;
import com.farsunset.cim.sdk.android.filter.ClientMessageCodecFactory; import com.farsunset.cim.sdk.android.filter.ClientMessageCodecFactory;
import com.farsunset.cim.sdk.android.model.HeartbeatRequest; import com.farsunset.cim.sdk.android.model.HeartbeatRequest;
import com.farsunset.cim.sdk.android.model.HeartbeatResponse; import com.farsunset.cim.sdk.android.model.HeartbeatResponse;
@ -60,7 +62,6 @@ import com.farsunset.cim.sdk.android.model.SentBody;
*/ */
class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFactory { class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFactory {
private final String TAG = CIMConnectorManager.class.getSimpleName();
private final int READ_BUFFER_SIZE = 2048;// bit private final int READ_BUFFER_SIZE = 2048;// bit
private final int CONNECT_TIMEOUT = 10 * 1000;// private final int CONNECT_TIMEOUT = 10 * 1000;//
private final int WRITE_TIMEOUT = 10 * 1000;// private final int WRITE_TIMEOUT = 10 * 1000;//
@ -70,8 +71,9 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
private final String KEY_LAST_HEART_TIME = "KEY_LAST_HEART_TIME"; private final String KEY_LAST_HEART_TIME = "KEY_LAST_HEART_TIME";
private NioSocketConnector connector; private NioSocketConnector connector;
private ConnectFuture connectFuture;
private ExecutorService executor = Executors.newFixedThreadPool(1); private ExecutorService executor = Executors.newFixedThreadPool(1);
private Semaphore semaphore = new Semaphore(1,true);
private Context context; private Context context;
private static CIMConnectorManager manager; private static CIMConnectorManager manager;
@ -92,7 +94,7 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ClientMessageCodecFactory())); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ClientMessageCodecFactory()));
connector.getFilterChain().addLast("heartbeat", keepAliveaHandler); connector.getFilterChain().addLast("heartbeat", keepAliveaHandler);
connector.getFilterChain().addLast("logger",CIMLoggingFilter.getLogger());
connector.setHandler(this); connector.setHandler(this);
} }
@ -105,34 +107,17 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
} }
private synchronized void syncConnection(final String host, final int port) { private void handleConnectFailure(Throwable error,InetSocketAddress remoteAddress) {
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
if (isConnected()) { CIMLoggingFilter.getLogger().connectFailure(remoteAddress, interval);
return;
}
try {
Log.i(TAG, "****************CIM正在连接服务器 " + host + ":" + port + "......"); Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_CONNECTION_STATE, false); intent.putExtra(Exception.class.getName(), error.getClass().getSimpleName());
InetSocketAddress remoteSocketAddress = new InetSocketAddress(host, port); intent.putExtra("interval", interval);
connectFuture = connector.connect(remoteSocketAddress); context.sendBroadcast(intent);
connectFuture.awaitUninterruptibly();
connectFuture.getSession();
} catch (Exception e) {
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra(Exception.class.getName(), e.getClass().getSimpleName());
intent.putExtra("interval", interval);
context.sendBroadcast(intent);
Log.e(TAG, "****************CIM连接服务器失败 " + host + ":" + port + "......将在" + interval / 1000 + "秒后重新尝试连接");
}
} }
@ -148,15 +133,35 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
return; return;
} }
if (isConnected() || !semaphore.tryAcquire()) {
return;
}
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override
public void run() { public void run() {
syncConnection(host, port);
CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_CONNECTION_STATE, false);
final InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
CIMLoggingFilter.getLogger().startConnect(remoteAddress);
connector.connect(remoteAddress).addListener(new IoFutureListener<ConnectFuture>() {
@Override
public void operationComplete(ConnectFuture future) {
semaphore.release();
future.removeListener(this);
if(future.getException() != null) {
handleConnectFailure(future.getException(),remoteAddress);
}
}
});
} }
}); });
} }
public synchronized void send(SentBody body) { public void send(SentBody body) {
boolean isSuccessed = false; boolean isSuccessed = false;
@ -222,8 +227,6 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
@Override @Override
public void sessionCreated(IoSession session) throws Exception { public void sessionCreated(IoSession session) throws Exception {
Log.i(TAG, "****************CIM连接服务器成功:" + session.getLocalAddress() + " NID:" + session.getId());
setLastHeartbeatTime(session); setLastHeartbeatTime(session);
Intent intent = new Intent(); Intent intent = new Intent();
@ -235,8 +238,6 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
@Override @Override
public void sessionClosed(IoSession session) { public void sessionClosed(IoSession session) {
Log.e(TAG, "****************CIM与服务器断开连接:" + session.getLocalAddress() + " NID:" + session.getId());
Intent intent = new Intent(); Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED);
context.sendBroadcast(intent); context.sendBroadcast(intent);
@ -245,9 +246,6 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
@Override @Override
public void sessionIdle(IoSession session, IdleStatus status) { public void sessionIdle(IoSession session, IdleStatus status) {
Log.d(TAG, "****************CIM " + status.toString().toUpperCase() + ":" + session.getLocalAddress() + " NID:"
+ session.getId() + " isConnected:" + session.isConnected());
/** /**
* 用于解决wifi情况下偶而路由器与服务器断开连接时客户端并没及时收到关闭事件 导致这样的情况下当前连接无效也不会重连的问题 * 用于解决wifi情况下偶而路由器与服务器断开连接时客户端并没及时收到关闭事件 导致这样的情况下当前连接无效也不会重连的问题
* *
@ -255,17 +253,6 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
long lastHeartbeatTime = getLastHeartbeatTime(session); long lastHeartbeatTime = getLastHeartbeatTime(session);
if (System.currentTimeMillis() - lastHeartbeatTime >= HEARBEAT_TIME_OUT) { if (System.currentTimeMillis() - lastHeartbeatTime >= HEARBEAT_TIME_OUT) {
session.closeNow(); session.closeNow();
Log.e(TAG, "****************CIM心跳超时 ,即将重新连接......" + " NID:" + session.getId());
}
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
Log.e(TAG, "****************CIM连接出现未知异常:" + session.getLocalAddress() + " NID:" + session.getId());
if (cause != null && cause.getMessage() != null) {
Log.e(TAG, cause.getMessage());
} }
} }

View File

@ -45,6 +45,8 @@ public class CIMPushManager {
static String ACTION_CLOSE_CIM_CONNECTION = "ACTION_CLOSE_CIM_CONNECTION"; static String ACTION_CLOSE_CIM_CONNECTION = "ACTION_CLOSE_CIM_CONNECTION";
static String ACTION_SET_LOGGER_EANABLE = "ACTION_SET_LOGGER_EANABLE";
static String ACTION_DESTORY = "ACTION_DESTORY"; static String ACTION_DESTORY = "ACTION_DESTORY";
static String KEY_SEND_BODY = "KEY_SEND_BODY"; static String KEY_SEND_BODY = "KEY_SEND_BODY";
@ -85,6 +87,13 @@ public class CIMPushManager {
} }
public static void setLoggerEnable(Context context,boolean enable) {
Intent serviceIntent = new Intent(context, CIMPushService.class);
serviceIntent.putExtra(CIMPushService.KEY_LOGGER_ENABLE, enable);
serviceIntent.setAction(ACTION_SET_LOGGER_EANABLE);
context.startService(serviceIntent);
}
protected static void connect(Context context, long delayedTime) { protected static void connect(Context context, long delayedTime) {
boolean isManualStop = CIMCacheManager.getBoolean(context, CIMCacheManager.KEY_MANUAL_STOP); boolean isManualStop = CIMCacheManager.getBoolean(context, CIMCacheManager.KEY_MANUAL_STOP);

View File

@ -26,7 +26,8 @@ import android.content.Intent;
import android.os.Handler; import android.os.Handler;
import android.os.IBinder; import android.os.IBinder;
import android.os.Message; import android.os.Message;
import android.util.Log;
import com.farsunset.cim.sdk.android.filter.CIMLoggingFilter;
import com.farsunset.cim.sdk.android.model.SentBody; import com.farsunset.cim.sdk.android.model.SentBody;
/** /**
@ -36,8 +37,9 @@ import com.farsunset.cim.sdk.android.model.SentBody;
* *
*/ */
public class CIMPushService extends Service { public class CIMPushService extends Service {
private final String TAG = CIMPushService.class.getSimpleName();
public final static String KEY_DELAYED_TIME = "KEY_DELAYED_TIME"; public final static String KEY_DELAYED_TIME = "KEY_DELAYED_TIME";
public final static String KEY_LOGGER_ENABLE = "KEY_LOGGER_ENABLE";
private CIMConnectorManager manager; private CIMConnectorManager manager;
@Override @Override
@ -97,17 +99,24 @@ public class CIMPushService extends Service {
if (CIMPushManager.ACTION_ACTIVATE_PUSH_SERVICE.equals(action)) { if (CIMPushManager.ACTION_ACTIVATE_PUSH_SERVICE.equals(action)) {
if (!manager.isConnected()) { if (!manager.isConnected()) {
boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(), boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(),CIMCacheManager.KEY_MANUAL_STOP);
CIMCacheManager.KEY_MANUAL_STOP); boolean isDestroyed = CIMCacheManager.getBoolean(getApplicationContext(),CIMCacheManager.KEY_CIM_DESTROYED);
Log.w(TAG, "manager.isConnected() == false, isManualStop == " + isManualStop);
CIMLoggingFilter.getLogger().connectState(false, isManualStop, isDestroyed);
CIMPushManager.connect(this, 0); CIMPushManager.connect(this, 0);
} else { } else {
Log.i(TAG, "manager.isConnected() == true"); CIMLoggingFilter.getLogger().connectState(true);
} }
} }
if (CIMPushManager.ACTION_SET_LOGGER_EANABLE.equals(action)) {
boolean enable = intent.getBooleanExtra(KEY_LOGGER_ENABLE, true);
CIMLoggingFilter.getLogger().debugMode(enable);
}
return START_STICKY; return START_STICKY;
} }

View File

@ -0,0 +1,154 @@
/**
* Copyright 2013-2023 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.sdk.android.filter;
import java.net.InetSocketAddress;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.WriteRequest;
import android.util.Log;
/**
* 日志打印添加session 的id和ip address
*/
public class CIMLoggingFilter extends IoFilterAdapter {
private final static String TAG = "CIM";
private boolean debug = true;
public static CIMLoggingFilter getLogger() {
return LoggerHolder.logger;
}
private CIMLoggingFilter() {
}
private static class LoggerHolder{
private static CIMLoggingFilter logger = new CIMLoggingFilter();
}
public void debugMode(boolean mode) {
debug = mode;
}
@Override
public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
if(debug) {
Log.d(TAG,String.format("EXCEPTION" + getSessionInfo(session) + "\n%s", cause.getClass().getName()));
}
session.closeOnFlush();
}
@Override
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
if(debug) {
Log.i(TAG,String.format("RECEIVED" + getSessionInfo(session) + "\n%s", message));
}
nextFilter.messageReceived(session, message);
}
@Override
public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
if(debug) {
Log.i(TAG,String.format("SENT" + getSessionInfo(session) + "\n%s", writeRequest.getOriginalRequest().getMessage()));
}
nextFilter.messageSent(session, writeRequest);
}
@Override
public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
nextFilter.sessionCreated(session);
}
@Override
public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
if(debug) {
Log.i(TAG,"OPENED" + getSessionInfo(session));
}
nextFilter.sessionOpened(session);
}
@Override
public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
if(debug) {
Log.i(TAG,"IDLE " + status.toString().toUpperCase() + getSessionInfo(session));
}
nextFilter.sessionIdle(session, status);
}
@Override
public void sessionClosed(NextFilter nextFilter, IoSession session) {
if(debug) {
Log.i(TAG,"CLOSED" + getSessionInfo(session));
}
nextFilter.sessionClosed(session);
}
public void connectFailure(InetSocketAddress remoteAddress,long interval) {
if(debug) {
Log.i(TAG,"CONNECT FAILURE TRY RECONNECT AFTER " + interval +"ms");
}
}
public void startConnect(InetSocketAddress remoteAddress) {
if(debug) {
Log.i(TAG,"START CONNECT REMOTE HOST: " + remoteAddress.toString());
}
}
public void connectState(boolean isConnected) {
if(debug) {
Log.i(TAG,"CONNECTED:" + isConnected);
}
}
public void connectState(boolean isConnected,boolean isManualStop,boolean isDestroyed) {
if(debug) {
Log.i(TAG,"CONNECTED:" + isConnected + " STOPED:"+isManualStop+ " DESTROYED:"+isDestroyed);
}
}
private String getSessionInfo(IoSession session) {
StringBuilder builder = new StringBuilder();
if (session == null) {
return "";
}
builder.append(" [");
builder.append("id:").append(session.getId());
if (session.getLocalAddress() != null) {
builder.append(" L:").append(session.getLocalAddress().toString());
}
if (session.getRemoteAddress() != null) {
builder.append(" R:").append(session.getRemoteAddress().toString());
}
builder.append("]");
return builder.toString();
}
}

View File

@ -25,7 +25,6 @@ import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession; import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import android.util.Log;
import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.constant.CIMConstant;
import com.farsunset.cim.sdk.android.model.HeartbeatRequest; import com.farsunset.cim.sdk.android.model.HeartbeatRequest;
@ -40,7 +39,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
*/ */
public class ClientMessageDecoder extends CumulativeProtocolDecoder { public class ClientMessageDecoder extends CumulativeProtocolDecoder {
final static String TAG = ClientMessageDecoder.class.getSimpleName();
@Override @Override
public boolean doDecode(IoSession iosession, IoBuffer iobuffer, ProtocolDecoderOutput out) throws Exception { public boolean doDecode(IoSession iosession, IoBuffer iobuffer, ProtocolDecoderOutput out) throws Exception {
@ -82,7 +80,6 @@ public class ClientMessageDecoder extends CumulativeProtocolDecoder {
if (CIMConstant.ProtobufType.S_H_RQ == type) { if (CIMConstant.ProtobufType.S_H_RQ == type) {
HeartbeatRequest request = HeartbeatRequest.getInstance(); HeartbeatRequest request = HeartbeatRequest.getInstance();
Log.i(TAG, request.toString());
return request; return request;
} }
@ -94,9 +91,6 @@ public class ClientMessageDecoder extends CumulativeProtocolDecoder {
body.putAll(bodyProto.getDataMap()); body.putAll(bodyProto.getDataMap());
body.setCode(bodyProto.getCode()); body.setCode(bodyProto.getCode());
body.setMessage(bodyProto.getMessage()); body.setMessage(bodyProto.getMessage());
Log.i(TAG, body.toString());
return body; return body;
} }
@ -112,8 +106,6 @@ public class ClientMessageDecoder extends CumulativeProtocolDecoder {
message.setExtra(bodyProto.getExtra()); message.setExtra(bodyProto.getExtra());
message.setTimestamp(bodyProto.getTimestamp()); message.setTimestamp(bodyProto.getTimestamp());
message.setFormat(bodyProto.getFormat()); message.setFormat(bodyProto.getFormat());
Log.i(TAG, message.toString());
return message; return message;
} }

View File

@ -25,8 +25,6 @@ import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession; import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter; import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import android.util.Log;
import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.constant.CIMConstant;
import com.farsunset.cim.sdk.android.model.Protobufable; import com.farsunset.cim.sdk.android.model.Protobufable;
@ -35,8 +33,6 @@ import com.farsunset.cim.sdk.android.model.Protobufable;
*/ */
public class ClientMessageEncoder extends ProtocolEncoderAdapter { public class ClientMessageEncoder extends ProtocolEncoderAdapter {
final static String TAG = ClientMessageEncoder.class.getSimpleName();
@Override @Override
public void encode(IoSession iosession, Object object, ProtocolEncoderOutput out) throws Exception { public void encode(IoSession iosession, Object object, ProtocolEncoderOutput out) throws Exception {
@ -52,8 +48,6 @@ public class ClientMessageEncoder extends ProtocolEncoderAdapter {
buff.flip(); buff.flip();
out.write(buff); out.write(buff);
// 打印出收到的消息
Log.i(TAG, data.toString());
} }
} }

View File

@ -1,8 +1,2 @@
auto.sync=false
build.scans.enabled=true
connection.gradle.distribution=GRADLE_DISTRIBUTION(VERSION(4.5))
connection.project.dir= connection.project.dir=
eclipse.preferences.version=1 eclipse.preferences.version=1
gradle.user.home=
offline.mode=false
override.workspace.settings=true

View File

@ -74,6 +74,7 @@ public class BindHandler implements CIMRequestHandler {
newSession.setSystemVersion(message.get("osVersion")); newSession.setSystemVersion(message.get("osVersion"));
newSession.setBindTime(System.currentTimeMillis()); newSession.setBindTime(System.currentTimeMillis());
newSession.setPackageName(message.get("packageName")); newSession.setPackageName(message.get("packageName"));
newSession.setHeartbeat(System.currentTimeMillis());
/** /**
* 由于客户端断线服务端可能会无法获知的情况客户端重连时需要关闭旧的连接 * 由于客户端断线服务端可能会无法获知的情况客户端重连时需要关闭旧的连接
@ -86,13 +87,18 @@ public class BindHandler implements CIMRequestHandler {
sendForceOfflineMessage(oldSession, account, newSession.getDeviceModel()); sendForceOfflineMessage(oldSession, account, newSession.getDeviceModel());
} }
// 第一次设置心跳时间为登录时间
newSession.setBindTime(System.currentTimeMillis());
/**
* 有可能是同一个设备重复连接则关闭旧的链接这种情况一般是客户端断网联网又重新链接上来之前的旧链接没有来得及通过心跳机制关闭在这里手动关闭
* 条件1连接来自是同一个设备
* 条件2.2个连接都是同一台服务器
*/
if (oldSession != null && !fromOtherDevice(newSession,oldSession) && Objects.equals(oldSession.getHost(),host)) {
oldSession.removeAttribute(CIMConstant.SESSION_KEY);
oldSession.closeOnFlush();
}
// 第一次设置心跳时间为登录时间
newSession.setBindTime(System.currentTimeMillis());
newSession.setHeartbeat(System.currentTimeMillis());
sessionManager.add(newSession); sessionManager.add(newSession);

View File

@ -24,7 +24,7 @@ package com.farsunset.ichat.example.app;
public interface Constant { public interface Constant {
//服务端IP地址 //服务端IP地址
public static final String CIM_SERVER_HOST = "192.168.1.110"; public static final String CIM_SERVER_HOST = "192.168.1.106";
//注意这里的端口不是tomcat的端口CIM端口在服务端spring-cim.xml中配置的没改动就使用默认的23456 //注意这里的端口不是tomcat的端口CIM端口在服务端spring-cim.xml中配置的没改动就使用默认的23456

View File

@ -29,6 +29,7 @@ import android.view.View;
import android.view.animation.AlphaAnimation; import android.view.animation.AlphaAnimation;
import com.farsunset.cim.sdk.android.CIMPushManager; import com.farsunset.cim.sdk.android.CIMPushManager;
import com.farsunset.ichat.example.BuildConfig;
import com.farsunset.ichat.example.R; import com.farsunset.ichat.example.R;
import com.farsunset.ichat.example.app.CIMMonitorActivity; import com.farsunset.ichat.example.app.CIMMonitorActivity;
import com.farsunset.ichat.example.app.Constant; import com.farsunset.ichat.example.app.Constant;
@ -42,6 +43,7 @@ public class SplanshActivity extends CIMMonitorActivity{
super.onCreate(savedInstanceState); super.onCreate(savedInstanceState);
CIMPushManager.setLoggerEnable(this,BuildConfig.DEBUG);
//连接服务端 //连接服务端
CIMPushManager.connect(SplanshActivity.this,Constant.CIM_SERVER_HOST, Constant.CIM_SERVER_PORT); CIMPushManager.connect(SplanshActivity.this,Constant.CIM_SERVER_HOST, Constant.CIM_SERVER_PORT);

View File

@ -26,9 +26,11 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IdleStatus;
@ -38,6 +40,7 @@ import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory; import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler; import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
import org.apache.mina.transport.socket.nio.NioSocketConnector; import org.apache.mina.transport.socket.nio.NioSocketConnector;
import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.constant.CIMConstant;
import com.farsunset.cim.sdk.client.exception.SessionDisconnectedException; import com.farsunset.cim.sdk.client.exception.SessionDisconnectedException;
import com.farsunset.cim.sdk.client.filter.ClientMessageCodecFactory; import com.farsunset.cim.sdk.client.filter.ClientMessageCodecFactory;
@ -65,8 +68,8 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
private final String KEY_LAST_HEART_TIME = "KEY_LAST_HEART_TIME"; private final String KEY_LAST_HEART_TIME = "KEY_LAST_HEART_TIME";
private NioSocketConnector connector; private NioSocketConnector connector;
private ConnectFuture connectFuture;
private ExecutorService executor = Executors.newFixedThreadPool(1); private ExecutorService executor = Executors.newFixedThreadPool(1);
private Semaphore semaphore = new Semaphore(1,true);
private static CIMConnectorManager manager; private static CIMConnectorManager manager;
@ -97,49 +100,50 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa
} }
private synchronized void syncConnection(final String host, final int port) {
if (isConnected()) {
return;
}
try {
logger.info("****************CIM正在连接服务器 " + host + ":" + port + "......");
CIMCacheManager.getInstance().putBoolean(CIMCacheManager.KEY_CIM_CONNECTION_STATE, false);
InetSocketAddress remoteSocketAddress = new InetSocketAddress(host, port);
connectFuture = connector.connect(remoteSocketAddress);
connectFuture.awaitUninterruptibly();
connectFuture.getSession();
} catch (Exception e) {
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra(Exception.class.getName(), e);
intent.putExtra("interval", interval);
sendBroadcast(intent);
logger.error(
"****************CIM连接服务器失败 " + host + ":" + port + "......将在" + interval / 1000 + "秒后重新尝试连接");
}
}
public void connect(final String host, final int port) { public void connect(final String host, final int port) {
if (isConnected() || !semaphore.tryAcquire()) {
return;
}
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override
public void run() { public void run() {
syncConnection(host, port);
logger.info("****************CIM正在连接服务器 " + host + ":" + port + "......");
InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
CIMCacheManager.getInstance().putBoolean(CIMCacheManager.KEY_CIM_CONNECTION_STATE, false);
connector.connect(remoteAddress).addListener(new IoFutureListener<ConnectFuture>() {
@Override
public void operationComplete(ConnectFuture future) {
semaphore.release();
future.removeListener(this);
if(future.getException() != null) {
handleConnectFailure(future.getException(),remoteAddress);
}
}
});
} }
}); });
} }
public synchronized void send(SentBody body) { private void handleConnectFailure(Throwable error,InetSocketAddress remoteAddress) {
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra(Exception.class.getName(), error);
intent.putExtra("interval", interval);
sendBroadcast(intent);
logger.error("****************CIM连接服务器失败 " + remoteAddress.getHostString() + ":" + remoteAddress.getPort() + "......将在" + interval / 1000 + "秒后重新尝试连接");
}
public void send(SentBody body) {
boolean isSuccessed = false; boolean isSuccessed = false;

View File

@ -1,15 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<classpath> <classpath>
<classpathentry kind="src" path="src" /> <classpathentry kind="src" path="src"/>
<classpathentry kind="lib" <classpathentry kind="lib" path="libs/mina-core-2.0.16.jar"/>
path="libs/mina-core-2.0.16.jar" /> <classpathentry kind="lib" path="libs/protobuf-java-3.2.0.jar"/>
<classpathentry kind="lib" <classpathentry kind="lib" path="libs/slf4j-api-1.7.25.jar"/>
path="libs/protobuf-java-3.2.0.jar" /> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<classpathentry kind="con" <attributes>
path="org.eclipse.jst.j2ee.internal.module.container" /> <attribute name="owner.project.facets" value="java"/>
<classpathentry kind="con" </attributes>
path="org.eclipse.jdt.launching.JRE_CONTAINER" /> </classpathentry>
<classpathentry kind="lib" <classpathentry kind="output" path="bin"/>
path="libs/slf4j-api-1.7.25.jar" />
<classpathentry kind="output" path="bin" />
</classpath> </classpath>

View File

@ -22,8 +22,6 @@
</buildCommand> </buildCommand>
</buildSpec> </buildSpec>
<natures> <natures>
<nature>org.eclipse.jem.workbench.JavaEMFNature</nature>
<nature>org.eclipse.wst.common.modulecore.ModuleCoreNature</nature>
<nature>org.eclipse.jdt.core.javanature</nature> <nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.wst.common.project.facet.core.nature</nature> <nature>org.eclipse.wst.common.project.facet.core.nature</nature>
</natures> </natures>

View File

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project-modules id="moduleCoreId"
project-version="1.5.0">
<wb-module deploy-name="cim-server-sdk">
<wb-resource deploy-path="/" source-path="/src" />
</wb-module>
</project-modules>

View File

@ -1,8 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<faceted-project> <faceted-project>
<runtime name="Apache Tomcat v7.0" /> <fixed facet="java"/>
<fixed facet="java" /> <installed facet="java" version="1.6"/>
<fixed facet="jst.utility" />
<installed facet="java" version="1.6" />
<installed facet="jst.utility" version="1.0" />
</faceted-project> </faceted-project>

View File

@ -28,6 +28,8 @@ import org.apache.mina.core.write.WriteRequest;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.farsunset.cim.sdk.server.constant.CIMConstant;
/** /**
* 日志打印添加session 的id和ip address * 日志打印添加session 的id和ip address
*/ */
@ -36,21 +38,20 @@ public class CIMLoggingFilter extends IoFilterAdapter {
private final Logger logger = LoggerFactory.getLogger(CIMLoggingFilter.class); private final Logger logger = LoggerFactory.getLogger(CIMLoggingFilter.class);
@Override @Override
public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
logger.info("EXCEPTION" + getSessionInfo(session) + "\n{}", cause.getClass().getName());
logger.info("EXCEPTION" + getSessionInfo(session) + ":", cause); session.closeOnFlush();
nextFilter.exceptionCaught(session, cause);
} }
@Override @Override
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { public void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
logger.info("RECEIVED" + getSessionInfo(session) + ": {}", message); logger.info("RECEIVED" + getSessionInfo(session) + "\n{}", message);
nextFilter.messageReceived(session, message); nextFilter.messageReceived(session, message);
} }
@Override @Override
public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
logger.info("SENT" + getSessionInfo(session) + ": {}", writeRequest.getOriginalRequest().getMessage()); logger.info("SENT" + getSessionInfo(session) + "\n{}", writeRequest.getOriginalRequest().getMessage());
nextFilter.messageSent(session, writeRequest); nextFilter.messageSent(session, writeRequest);
} }
@ -67,13 +68,13 @@ public class CIMLoggingFilter extends IoFilterAdapter {
} }
@Override @Override
public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
logger.info("IDLE" + getSessionInfo(session)); logger.info("IDLE" + getSessionInfo(session));
nextFilter.sessionIdle(session, status); nextFilter.sessionIdle(session, status);
} }
@Override @Override
public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { public void sessionClosed(NextFilter nextFilter, IoSession session) {
logger.info("CLOSED" + getSessionInfo(session)); logger.info("CLOSED" + getSessionInfo(session));
nextFilter.sessionClosed(session); nextFilter.sessionClosed(session);
} }
@ -83,10 +84,20 @@ public class CIMLoggingFilter extends IoFilterAdapter {
if (session == null) { if (session == null) {
return ""; return "";
} }
builder.append("["); builder.append(" [");
builder.append("id:").append(session.getId()); builder.append("id:").append(session.getId());
if (session.getLocalAddress() != null) {
builder.append(" L:").append(session.getLocalAddress().toString());
}
if (session.getRemoteAddress() != null) { if (session.getRemoteAddress() != null) {
builder.append(" address:").append(session.getRemoteAddress().toString()); builder.append(" R:").append(session.getRemoteAddress().toString());
}
if (session.containsAttribute(CIMConstant.SESSION_KEY)) {
builder.append(" account:").append(session.getAttribute(CIMConstant.SESSION_KEY));
} }
builder.append("]"); builder.append("]");
return builder.toString(); return builder.toString();

View File

@ -21,17 +21,18 @@
*/ */
package com.farsunset.cim.sdk.android; package com.farsunset.cim.sdk.android;
import java.net.InetSocketAddress;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import android.content.Context; import android.content.Context;
import android.content.Intent; import android.content.Intent;
import android.net.ConnectivityManager; import android.net.ConnectivityManager;
import android.net.NetworkInfo; import android.net.NetworkInfo;
import android.util.Log;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
@ -45,7 +46,11 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.constant.CIMConstant;
import com.farsunset.cim.sdk.android.filter.CIMLoggingHandler;
import com.farsunset.cim.sdk.android.filter.ClientMessageDecoder; import com.farsunset.cim.sdk.android.filter.ClientMessageDecoder;
import com.farsunset.cim.sdk.android.filter.ClientMessageEncoder; import com.farsunset.cim.sdk.android.filter.ClientMessageEncoder;
import com.farsunset.cim.sdk.android.exception.NetworkDisabledException; import com.farsunset.cim.sdk.android.exception.NetworkDisabledException;
@ -56,15 +61,9 @@ import com.farsunset.cim.sdk.android.model.Message;
import com.farsunset.cim.sdk.android.model.ReplyBody; import com.farsunset.cim.sdk.android.model.ReplyBody;
import com.farsunset.cim.sdk.android.model.SentBody; import com.farsunset.cim.sdk.android.model.SentBody;
/**
* 连接服务端管理cim核心处理类管理连接以及消息处理
*
* @author 3979434@qq.com
*/
@Sharable @Sharable
class CIMConnectorManager extends SimpleChannelInboundHandler<Object> { class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
private final String TAG = CIMConnectorManager.class.getSimpleName();
private final int CONNECT_TIMEOUT = 10 * 1000;// private final int CONNECT_TIMEOUT = 10 * 1000;//
private final int WRITE_TIMEOUT = 10 * 1000;// private final int WRITE_TIMEOUT = 10 * 1000;//
@ -76,6 +75,8 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
private EventLoopGroup loopGroup; private EventLoopGroup loopGroup;
private Channel channel;; private Channel channel;;
private ExecutorService executor = Executors.newFixedThreadPool(1); private ExecutorService executor = Executors.newFixedThreadPool(1);
private Semaphore semaphore = new Semaphore(1,true);
private Context context; private Context context;
private static CIMConnectorManager manager; private static CIMConnectorManager manager;
@ -84,19 +85,19 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
context = ctx; context = ctx;
bootstrap = new Bootstrap(); bootstrap = new Bootstrap();
loopGroup = new NioEventLoopGroup(); loopGroup = new NioEventLoopGroup(1);
bootstrap.group(loopGroup); bootstrap.group(loopGroup);
bootstrap.channel(NioSocketChannel.class); bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT);
bootstrap.handler(new ChannelInitializer<SocketChannel>() { bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientMessageDecoder()); ch.pipeline().addLast(new ClientMessageDecoder());
ch.pipeline().addLast(new ClientMessageEncoder()); ch.pipeline().addLast(new ClientMessageEncoder());
ch.pipeline().addLast(new IdleStateHandler(READ_IDLE_TIME, 0, 0)); ch.pipeline().addLast(new IdleStateHandler(READ_IDLE_TIME, 0, 0));
ch.pipeline().addLast(CIMLoggingHandler.getLogger());
ch.pipeline().addLast(CIMConnectorManager.this); ch.pipeline().addLast(CIMConnectorManager.this);
} }
}); });
@ -111,33 +112,23 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
} }
private synchronized void syncConnection(String host, int port) { private void handleConnectFailure(Throwable error,InetSocketAddress remoteAddress) {
if (isConnected()) { long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
return;
}
try { CIMLoggingHandler.getLogger().connectFailure(remoteAddress, interval);
Log.i(TAG, "****************CIM正在连接服务器 " + host + ":" + port + "......"); Intent intent = new Intent();
ChannelFuture channelFuture = bootstrap.connect(host, port).syncUninterruptibly(); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
channel = channelFuture.channel(); intent.putExtra(Exception.class.getName(), error.getClass().getSimpleName());
} catch (Exception e) { intent.putExtra("interval", interval);
context.sendBroadcast(intent);
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra(Exception.class.getName(), e.getClass().getSimpleName());
intent.putExtra("interval", interval);
context.sendBroadcast(intent);
Log.e(TAG, "****************CIM连接服务器失败 " + host + ":" + port + "......将在" + interval / 1000 + "秒后重新尝试连接");
}
} }
public void connect(final String host, final int port) { public void connect(final String host, final int port) {
if (!isNetworkConnected(context)) { if (!isNetworkConnected(context)) {
@ -150,21 +141,43 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
return; return;
} }
if (isConnected() || !semaphore.tryAcquire()) {
return;
}
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override
public void run() { public void run() {
syncConnection(host, port);
final InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
bootstrap.connect(remoteAddress).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) {
semaphore.release();
future.removeListener(this);
if(!future.isSuccess() && future.cause() != null) {
handleConnectFailure(future.cause(),remoteAddress);
}
if(future.isSuccess()) {
channel = future.channel();
}
}
});
} }
}); });
} }
public synchronized void send(SentBody body) { public void send(SentBody body) {
boolean isSuccessed = false; boolean isSuccessed = false;
String exceptionName = SessionClosedException.class.getSimpleName(); String exceptionName = SessionClosedException.class.getSimpleName();
if (channel != null && channel.isActive()) { if (isConnected()) {
ChannelFuture future = channel.writeAndFlush(body); ChannelFuture future = channel.writeAndFlush(body);
isSuccessed = future.awaitUninterruptibly(WRITE_TIMEOUT); isSuccessed = future.awaitUninterruptibly(WRITE_TIMEOUT);
if (!isSuccessed && future.cause() != null) { if (!isSuccessed && future.cause() != null) {
@ -201,10 +214,7 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
} }
public boolean isConnected() { public boolean isConnected() {
if (channel == null) { return channel != null && channel.isActive();
return false;
}
return channel.isActive();
} }
public void closeSession() { public void closeSession() {
@ -216,8 +226,6 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
Log.i(TAG, "****************CIM连接服务器成功:" + ctx.channel().localAddress() + " NID:"
+ ctx.channel().id().asShortText());
setLastHeartbeatTime(ctx.channel()); setLastHeartbeatTime(ctx.channel());
@ -230,9 +238,6 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) {
Log.e(TAG, "****************CIM与服务器断开连接:" + ctx.channel().localAddress() + " NID:"
+ ctx.channel().id().asShortText());
Intent intent = new Intent(); Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED);
context.sendBroadcast(intent); context.sendBroadcast(intent);
@ -247,34 +252,14 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
* *
*/ */
if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state().equals(IdleState.READER_IDLE)) { if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state().equals(IdleState.READER_IDLE)) {
Log.d(TAG, "****************CIM " + IdleState.READER_IDLE + ":" + ctx.channel().localAddress() + " NID:"
+ ctx.channel().id().asShortText());
Long lastTime = getLastHeartbeatTime(ctx.channel()); Long lastTime = getLastHeartbeatTime(ctx.channel());
if (lastTime != null && System.currentTimeMillis() - lastTime > HEARBEAT_TIME_OUT) { if (lastTime != null && System.currentTimeMillis() - lastTime > HEARBEAT_TIME_OUT) {
channel.close(); channel.close();
Log.e(TAG, "****************CIM心跳超时 ,即将重新连接......" + " NID:" + ctx.channel().id().asShortText());
} }
} }
} }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Log.e(TAG, "****************CIM连接出现未知异常:" + ctx.channel().localAddress() + " NID:"
+ ctx.channel().id().asShortText());
if (cause != null && cause.getMessage() != null) {
Log.e(TAG, cause.getMessage());
}
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_UNCAUGHT_EXCEPTION);
intent.putExtra(Exception.class.getName(), cause.getClass().getSimpleName());
context.sendBroadcast(intent);
}
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Message) { if (msg instanceof Message) {

View File

@ -45,6 +45,8 @@ public class CIMPushManager {
static String ACTION_CLOSE_CIM_CONNECTION = "ACTION_CLOSE_CIM_CONNECTION"; static String ACTION_CLOSE_CIM_CONNECTION = "ACTION_CLOSE_CIM_CONNECTION";
static String ACTION_SET_LOGGER_EANABLE = "ACTION_SET_LOGGER_EANABLE";
static String ACTION_DESTORY = "ACTION_DESTORY"; static String ACTION_DESTORY = "ACTION_DESTORY";
static String KEY_SEND_BODY = "KEY_SEND_BODY"; static String KEY_SEND_BODY = "KEY_SEND_BODY";
@ -85,6 +87,14 @@ public class CIMPushManager {
} }
public static void setLoggerEnable(Context context,boolean enable) {
Intent serviceIntent = new Intent(context, CIMPushService.class);
serviceIntent.putExtra(CIMPushService.KEY_LOGGER_ENABLE, enable);
serviceIntent.setAction(ACTION_SET_LOGGER_EANABLE);
context.startService(serviceIntent);
}
protected static void connect(Context context, long delayedTime) { protected static void connect(Context context, long delayedTime) {
boolean isManualStop = CIMCacheManager.getBoolean(context, CIMCacheManager.KEY_MANUAL_STOP); boolean isManualStop = CIMCacheManager.getBoolean(context, CIMCacheManager.KEY_MANUAL_STOP);

View File

@ -26,7 +26,8 @@ import android.content.Intent;
import android.os.Handler; import android.os.Handler;
import android.os.IBinder; import android.os.IBinder;
import android.os.Message; import android.os.Message;
import android.util.Log;
import com.farsunset.cim.sdk.android.filter.CIMLoggingHandler;
import com.farsunset.cim.sdk.android.model.SentBody; import com.farsunset.cim.sdk.android.model.SentBody;
/** /**
@ -36,8 +37,8 @@ import com.farsunset.cim.sdk.android.model.SentBody;
* *
*/ */
public class CIMPushService extends Service { public class CIMPushService extends Service {
private final String TAG = CIMPushService.class.getSimpleName();
public final static String KEY_DELAYED_TIME = "KEY_DELAYED_TIME"; public final static String KEY_DELAYED_TIME = "KEY_DELAYED_TIME";
public final static String KEY_LOGGER_ENABLE = "KEY_LOGGER_ENABLE";
private CIMConnectorManager manager; private CIMConnectorManager manager;
@Override @Override
@ -97,17 +98,25 @@ public class CIMPushService extends Service {
if (CIMPushManager.ACTION_ACTIVATE_PUSH_SERVICE.equals(action)) { if (CIMPushManager.ACTION_ACTIVATE_PUSH_SERVICE.equals(action)) {
if (!manager.isConnected()) { if (!manager.isConnected()) {
boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(), boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(),CIMCacheManager.KEY_MANUAL_STOP);
CIMCacheManager.KEY_MANUAL_STOP); boolean isDestroyed = CIMCacheManager.getBoolean(getApplicationContext(),CIMCacheManager.KEY_CIM_DESTROYED);
Log.w(TAG, "manager.isConnected() == false, isManualStop == " + isManualStop);
CIMLoggingHandler.getLogger().connectState(false, isManualStop, isDestroyed);
CIMPushManager.connect(this, 0); CIMPushManager.connect(this, 0);
} else { } else {
Log.i(TAG, "manager.isConnected() == true");
CIMLoggingHandler.getLogger().connectState(true);
} }
} }
if (CIMPushManager.ACTION_SET_LOGGER_EANABLE.equals(action)) {
boolean enable = intent.getBooleanExtra(KEY_LOGGER_ENABLE, true);
CIMLoggingHandler.getLogger().debugMode(enable);
}
return START_STICKY; return START_STICKY;
} }

View File

@ -100,9 +100,6 @@ public interface CIMConstant {
// 重试连接 // 重试连接
String ACTION_CONNECTION_RECOVERY = "com.farsunset.cim.CONNECTION_RECOVERY"; String ACTION_CONNECTION_RECOVERY = "com.farsunset.cim.CONNECTION_RECOVERY";
// 未知异常
String ACTION_UNCAUGHT_EXCEPTION = "com.farsunset.cim.UNCAUGHT_EXCEPTION";
} }
} }

View File

@ -0,0 +1,200 @@
/**
* Copyright 2013-2023 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.sdk.android.filter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import android.util.Log;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.IdleStateEvent;
@Sharable
public class CIMLoggingHandler extends ChannelDuplexHandler{
private final static String TAG = "CIM";
private boolean debug = true;
public static CIMLoggingHandler getLogger() {
return LoggerHolder.logger;
}
private CIMLoggingHandler() {
}
private static class LoggerHolder{
private static CIMLoggingHandler logger = new CIMLoggingHandler();
}
public void debugMode(boolean mode) {
debug = mode;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if(debug) {
Log.i(TAG,"OPENED" + getSessionInfo(ctx.channel()));
}
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if(debug) {
Log.i(TAG,"CLOSED" + getSessionInfo(ctx.channel()));
}
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if(debug) {
Log.d(TAG,String.format("EXCEPTION" + getSessionInfo(ctx.channel()) + "\n%s", cause.getClass().getName() + ":" + cause.getMessage()));
}
ctx.channel().close();
}
public void connectFailure(InetSocketAddress remoteAddress,long interval) {
if(debug) {
Log.d(TAG,"CONNECT FAILURE TRY RECONNECT AFTER " + interval +"ms");
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(debug && evt instanceof IdleStateEvent) {
Log.d(TAG,String.format("IDLE %s" + getSessionInfo(ctx.channel()) , ((IdleStateEvent)evt).state().toString()));
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if(debug) {
Log.d(TAG,"START CONNECT REMOTE HOST: " + remoteAddress.toString());
}
ctx.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(debug) {
Log.i(TAG,String.format("RECEIVED" + getSessionInfo(ctx.channel()) + "\n%s", msg.toString()));
}
ctx.fireChannelRead(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if(debug) {
Log.i(TAG,String.format("SENT" + getSessionInfo(ctx.channel()) + "\n%s", msg.toString()));
}
ctx.write(msg, promise);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private String getSessionInfo(Channel session) {
StringBuilder builder = new StringBuilder();
if (session == null) {
return "";
}
builder.append(" [");
builder.append("id:").append(session.id().asShortText());
if (session.localAddress() != null) {
builder.append(" L:").append(session.localAddress().toString());
}
if (session.remoteAddress() != null) {
builder.append(" R:").append(session.remoteAddress().toString());
}
builder.append("]");
return builder.toString();
}
public void connectState(boolean isConnected) {
if(debug) {
Log.i(TAG,"CONNECTED:" + isConnected);
}
}
public void connectState(boolean isConnected,boolean isManualStop,boolean isDestroyed) {
if(debug) {
Log.i(TAG,"CONNECTED:" + isConnected + " STOPED:"+isManualStop+ " DESTROYED:"+isDestroyed);
}
}
}

View File

@ -31,7 +31,6 @@ import com.farsunset.cim.sdk.android.model.proto.MessageProto;
import com.farsunset.cim.sdk.android.model.proto.ReplyBodyProto; import com.farsunset.cim.sdk.android.model.proto.ReplyBodyProto;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import android.util.Log;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
@ -40,7 +39,6 @@ import io.netty.handler.codec.ByteToMessageDecoder;
* 客户端消息解码 * 客户端消息解码
*/ */
public class ClientMessageDecoder extends ByteToMessageDecoder { public class ClientMessageDecoder extends ByteToMessageDecoder {
final static String TAG = ClientMessageDecoder.class.getSimpleName();
@Override @Override
protected void decode(ChannelHandlerContext arg0, ByteBuf buffer, List<Object> queue) throws Exception { protected void decode(ChannelHandlerContext arg0, ByteBuf buffer, List<Object> queue) throws Exception {
@ -84,7 +82,6 @@ public class ClientMessageDecoder extends ByteToMessageDecoder {
if (CIMConstant.ProtobufType.S_H_RQ == type) { if (CIMConstant.ProtobufType.S_H_RQ == type) {
HeartbeatRequest request = HeartbeatRequest.getInstance(); HeartbeatRequest request = HeartbeatRequest.getInstance();
Log.i(TAG, request.toString());
return request; return request;
} }
@ -96,9 +93,6 @@ public class ClientMessageDecoder extends ByteToMessageDecoder {
body.putAll(bodyProto.getDataMap()); body.putAll(bodyProto.getDataMap());
body.setCode(bodyProto.getCode()); body.setCode(bodyProto.getCode());
body.setMessage(bodyProto.getMessage()); body.setMessage(bodyProto.getMessage());
Log.i(TAG, body.toString());
return body; return body;
} }
@ -114,8 +108,6 @@ public class ClientMessageDecoder extends ByteToMessageDecoder {
message.setExtra(bodyProto.getExtra()); message.setExtra(bodyProto.getExtra());
message.setTimestamp(bodyProto.getTimestamp()); message.setTimestamp(bodyProto.getTimestamp());
message.setFormat(bodyProto.getFormat()); message.setFormat(bodyProto.getFormat());
Log.i(TAG, message.toString());
return message; return message;
} }

View File

@ -28,28 +28,21 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import android.util.Log;
/** /**
* 客户端消息发送前进行编码,可在此加密消息 * 客户端消息发送前进行编码,可在此加密消息
* *
*/ */
public class ClientMessageEncoder extends MessageToByteEncoder<Object> { public class ClientMessageEncoder extends MessageToByteEncoder<Protobufable> {
@Override @Override
protected void encode(ChannelHandlerContext ctx, Object message, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, Protobufable message, ByteBuf out) throws Exception {
if (message instanceof Protobufable) { Protobufable data = (Protobufable) message;
byte[] byteArray = data.getByteArray();
Protobufable data = (Protobufable) message; out.writeBytes(createHeader(data.getType(), byteArray.length));
byte[] byteArray = data.getByteArray(); out.writeBytes(byteArray);
out.writeBytes(createHeader(data.getType(), byteArray.length));
out.writeBytes(byteArray);
}
Log.i(ClientMessageEncoder.class.getSimpleName(), message.toString());
} }
/** /**

View File

@ -86,9 +86,17 @@ public class BindHandler implements CIMRequestHandler {
sendForceOfflineMessage(oldSession, account, newSession.getDeviceModel()); sendForceOfflineMessage(oldSession, account, newSession.getDeviceModel());
} }
// 第一次设置心跳时间为登录时间
newSession.setBindTime(System.currentTimeMillis());
/**
* 有可能是同一个设备重复连接则关闭旧的链接这种情况一般是客户端断网联网又重新链接上来之前的旧链接没有来得及通过心跳机制关闭在这里手动关闭
* 条件1连接来自是同一个设备
* 条件2.2个连接都是同一台服务器
*/
if (oldSession != null && !fromOtherDevice(newSession,oldSession) && Objects.equals(oldSession.getHost(),host)) {
oldSession.removeAttribute(CIMConstant.SESSION_KEY);
oldSession.closeNow();
}
// 第一次设置心跳时间为登录时间 // 第一次设置心跳时间为登录时间
newSession.setBindTime(System.currentTimeMillis()); newSession.setBindTime(System.currentTimeMillis());

View File

@ -25,7 +25,7 @@
</value> </value>
</option> </option>
</component> </component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK"> <component name="ProjectRootManager" version="2" languageLevel="JDK_1_7" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/build/classes" /> <output url="file://$PROJECT_DIR$/build/classes" />
</component> </component>
<component name="ProjectType"> <component name="ProjectType">

View File

@ -1,53 +1,53 @@
/** /**
* Copyright 2013-2023 Xia Jun(3979434@qq.com). * Copyright 2013-2023 Xia Jun(3979434@qq.com).
* * <p>
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
* * <p>
*************************************************************************************** * **************************************************************************************
* * * *
* Website : http://www.farsunset.com * * Website : http://www.farsunset.com *
* * * *
*************************************************************************************** * **************************************************************************************
*/ */
package com.farsunset.ichat.example.app; package com.farsunset.ichat.example.app;
public interface Constant { public interface Constant {
//服务端IP地址 //服务端IP地址
public static final String CIM_SERVER_HOST = "192.168.1.103"; public static final String CIM_SERVER_HOST = "192.168.1.106";
//注意这里的端口不是tomcat的端口CIM端口在服务端spring-cim.xml中配置的没改动就使用默认的23456 //注意这里的端口不是tomcat的端口CIM端口在服务端spring-cim.xml中配置的没改动就使用默认的23456
public static final int CIM_SERVER_PORT = 23456; public static final int CIM_SERVER_PORT = 23456;
public static interface MessageType{ public static interface MessageType {
//用户之间的普通消息 //用户之间的普通消息
public static final String TYPE_0 = "0"; public static final String TYPE_0 = "0";
//下线类型 //下线类型
String TYPE_999 = "999"; String TYPE_999 = "999";
} }
public static interface MessageStatus{ public static interface MessageStatus {
//消息未读 //消息未读
public static final String STATUS_0 = "0"; public static final String STATUS_0 = "0";
//消息已经读取 //消息已经读取
public static final String STATUS_1 = "1"; public static final String STATUS_1 = "1";
} }
} }

View File

@ -29,6 +29,7 @@ import android.view.View;
import android.view.animation.AlphaAnimation; import android.view.animation.AlphaAnimation;
import com.farsunset.cim.sdk.android.CIMPushManager; import com.farsunset.cim.sdk.android.CIMPushManager;
import com.farsunset.ichat.example.BuildConfig;
import com.farsunset.ichat.example.R; import com.farsunset.ichat.example.R;
import com.farsunset.ichat.example.app.CIMMonitorActivity; import com.farsunset.ichat.example.app.CIMMonitorActivity;
import com.farsunset.ichat.example.app.Constant; import com.farsunset.ichat.example.app.Constant;
@ -42,6 +43,8 @@ public class SplanshActivity extends CIMMonitorActivity{
super.onCreate(savedInstanceState); super.onCreate(savedInstanceState);
CIMPushManager.setLoggerEnable(this, BuildConfig.DEBUG);
//连接服务端 //连接服务端
CIMPushManager.connect(SplanshActivity.this,Constant.CIM_SERVER_HOST, Constant.CIM_SERVER_PORT); CIMPushManager.connect(SplanshActivity.this,Constant.CIM_SERVER_HOST, Constant.CIM_SERVER_PORT);

View File

@ -2,7 +2,6 @@
<classpath> <classpath>
<classpathentry kind="src" path="src"/> <classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="libs/log4j.jar"/>
<classpathentry kind="lib" path="libs/protobuf-java-3.2.0.jar"/> <classpathentry kind="lib" path="libs/protobuf-java-3.2.0.jar"/>
<classpathentry kind="lib" path="libs/netty-buffer-4.1.9.Final.jar"/> <classpathentry kind="lib" path="libs/netty-buffer-4.1.9.Final.jar"/>
<classpathentry kind="lib" path="libs/netty-codec-4.1.9.Final.jar"/> <classpathentry kind="lib" path="libs/netty-codec-4.1.9.Final.jar"/>
@ -10,5 +9,6 @@
<classpathentry kind="lib" path="libs/netty-handler-4.1.9.Final.jar"/> <classpathentry kind="lib" path="libs/netty-handler-4.1.9.Final.jar"/>
<classpathentry kind="lib" path="libs/netty-transport-4.1.9.Final.jar"/> <classpathentry kind="lib" path="libs/netty-transport-4.1.9.Final.jar"/>
<classpathentry kind="lib" path="libs/netty-resolver-4.1.9.Final.jar"/> <classpathentry kind="lib" path="libs/netty-resolver-4.1.9.Final.jar"/>
<classpathentry kind="lib" path="libs/slf4j-api-1.7.25.jar"/>
<classpathentry kind="output" path="bin"/> <classpathentry kind="output" path="bin"/>
</classpath> </classpath>

View File

@ -21,11 +21,15 @@
*/ */
package com.farsunset.cim.sdk.client; package com.farsunset.cim.sdk.client;
import java.net.InetSocketAddress;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.log4j.Logger;
import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.constant.CIMConstant;
import com.farsunset.cim.sdk.client.exception.SessionDisconnectedException; import com.farsunset.cim.sdk.client.exception.SessionDisconnectedException;
import com.farsunset.cim.sdk.client.filter.ClientMessageDecoder; import com.farsunset.cim.sdk.client.filter.ClientMessageDecoder;
@ -40,13 +44,13 @@ import com.farsunset.cim.sdk.client.model.SentBody;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
@ -54,6 +58,7 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
/** /**
* 连接服务端管理cim核心处理类管理连接以及消息处理 * 连接服务端管理cim核心处理类管理连接以及消息处理
@ -62,7 +67,7 @@ import io.netty.util.AttributeKey;
*/ */
@Sharable @Sharable
class CIMConnectorManager extends SimpleChannelInboundHandler<Object> { class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
protected final Logger logger = Logger.getLogger(CIMConnectorManager.class.getSimpleName()); protected final Logger logger = LoggerFactory.getLogger(CIMConnectorManager.class.getSimpleName());
private final int CONNECT_TIMEOUT = 10 * 1000;// private final int CONNECT_TIMEOUT = 10 * 1000;//
private final int WRITE_TIMEOUT = 10 * 1000;// private final int WRITE_TIMEOUT = 10 * 1000;//
@ -74,6 +79,7 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
private EventLoopGroup loopGroup; private EventLoopGroup loopGroup;
private Channel channel;; private Channel channel;;
private ExecutorService executor = Executors.newCachedThreadPool(); private ExecutorService executor = Executors.newCachedThreadPool();
private Semaphore semaphore = new Semaphore(1,true);
private static CIMConnectorManager manager; private static CIMConnectorManager manager;
private CIMConnectorManager() { private CIMConnectorManager() {
@ -107,46 +113,52 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
} }
private synchronized void syncConnection(String host, int port) { public void connect(final String host, final int port) {
if (isConnected()) { if (isConnected() || !semaphore.tryAcquire()) {
return; return;
} }
try {
logger.info("****************CIM正在连接服务器 " + host + ":" + port + "......");
ChannelFuture channelFuture = bootstrap.connect(host, port).syncUninterruptibly();
channel = channelFuture.channel();
} catch (Exception e) {
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra(Exception.class.getName(), e);
intent.putExtra("interval", interval);
sendBroadcast(intent);
logger.error(
"****************CIM连接服务器失败 " + host + ":" + port + "......将在" + interval / 1000 + "秒后重新尝试连接");
}
}
public void connect(final String host, final int port) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override
public void run() { public void run() {
syncConnection(host, port);
logger.info("****************CIM正在连接服务器 " + host + ":" + port + "......");
final InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
bootstrap.connect(remoteAddress).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
future.removeListener(this);
semaphore.acquire();
if(!future.isSuccess() && future.cause() != null) {
handleConnectFailure(future.cause(),remoteAddress);
}
if(future.isSuccess()) {
channel = future.channel();
}
}
});
} }
}); });
} }
public synchronized void send(SentBody body) { private void handleConnectFailure(Throwable error,InetSocketAddress remoteAddress) {
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra(Exception.class.getName(), error);
intent.putExtra("interval", interval);
sendBroadcast(intent);
logger.warn("****************CIM连接服务器失败 " + remoteAddress.getHostName() + ":" + remoteAddress.getPort() + "......将在" + interval / 1000 + "秒后重新尝试连接");
}
public void send(SentBody body) {
boolean isSuccessed = false; boolean isSuccessed = false;
@ -184,10 +196,7 @@ class CIMConnectorManager extends SimpleChannelInboundHandler<Object> {
} }
public boolean isConnected() { public boolean isConnected() {
if (channel == null) { return channel != null && channel.isActive();
return false;
}
return channel.isActive();
} }
public void closeSession() { public void closeSession() {

View File

@ -25,7 +25,8 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import org.apache.log4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.farsunset.cim.sdk.client.model.Message; import com.farsunset.cim.sdk.client.model.Message;
import com.farsunset.cim.sdk.client.model.ReplyBody; import com.farsunset.cim.sdk.client.model.ReplyBody;
@ -37,7 +38,7 @@ public class CIMListenerManager {
private static ArrayList<CIMEventListener> cimListeners = new ArrayList<CIMEventListener>(); private static ArrayList<CIMEventListener> cimListeners = new ArrayList<CIMEventListener>();
private static CIMMessageReceiveComparator comparator = new CIMMessageReceiveComparator(); private static CIMMessageReceiveComparator comparator = new CIMMessageReceiveComparator();
protected static final Logger logger = Logger.getLogger(CIMListenerManager.class); protected static final Logger logger = LoggerFactory.getLogger(CIMListenerManager.class);
public static void registerMessageListener(CIMEventListener listener) { public static void registerMessageListener(CIMEventListener listener) {

View File

@ -25,7 +25,8 @@ import java.net.InetAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.util.Properties; import java.util.Properties;
import org.apache.log4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.constant.CIMConstant;
import com.farsunset.cim.sdk.client.model.Intent; import com.farsunset.cim.sdk.client.model.Intent;
@ -35,7 +36,7 @@ import com.farsunset.cim.sdk.client.model.SentBody;
* CIM 功能接口 * CIM 功能接口
*/ */
public class CIMPushManager { public class CIMPushManager {
protected static final Logger logger = Logger.getLogger(CIMPushManager.class); protected static final Logger logger = LoggerFactory.getLogger(CIMPushManager.class);
static String ACTION_ACTIVATE_PUSH_SERVICE = "ACTION_ACTIVATE_PUSH_SERVICE"; static String ACTION_ACTIVATE_PUSH_SERVICE = "ACTION_ACTIVATE_PUSH_SERVICE";
static String ACTION_CREATE_CIM_CONNECTION = "ACTION_CREATE_CIM_CONNECTION"; static String ACTION_CREATE_CIM_CONNECTION = "ACTION_CREATE_CIM_CONNECTION";

View File

@ -23,7 +23,8 @@ package com.farsunset.cim.sdk.client.filter;
import java.util.List; import java.util.List;
import org.apache.log4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.constant.CIMConstant;
import com.farsunset.cim.sdk.client.model.HeartbeatRequest; import com.farsunset.cim.sdk.client.model.HeartbeatRequest;
@ -41,7 +42,7 @@ import io.netty.handler.codec.ByteToMessageDecoder;
* 客户端消息解码 * 客户端消息解码
*/ */
public class ClientMessageDecoder extends ByteToMessageDecoder { public class ClientMessageDecoder extends ByteToMessageDecoder {
protected final Logger logger = Logger.getLogger(ClientMessageDecoder.class.getSimpleName()); protected final Logger logger = LoggerFactory.getLogger(ClientMessageDecoder.class.getSimpleName());
@Override @Override
protected void decode(ChannelHandlerContext arg0, ByteBuf buffer, List<Object> queue) throws Exception { protected void decode(ChannelHandlerContext arg0, ByteBuf buffer, List<Object> queue) throws Exception {

View File

@ -21,7 +21,8 @@
*/ */
package com.farsunset.cim.sdk.client.filter; package com.farsunset.cim.sdk.client.filter;
import org.apache.log4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.constant.CIMConstant;
import com.farsunset.cim.sdk.client.model.Protobufable; import com.farsunset.cim.sdk.client.model.Protobufable;
@ -35,7 +36,7 @@ import io.netty.handler.codec.MessageToByteEncoder;
* *
*/ */
public class ClientMessageEncoder extends MessageToByteEncoder<Object> { public class ClientMessageEncoder extends MessageToByteEncoder<Object> {
protected final Logger logger = Logger.getLogger(ClientMessageEncoder.class.getSimpleName()); protected final Logger logger = LoggerFactory.getLogger(ClientMessageEncoder.class.getSimpleName());
@Override @Override
protected void encode(ChannelHandlerContext ctx, Object message, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, Object message, ByteBuf out) throws Exception {

View File

@ -23,6 +23,7 @@ package com.farsunset.cim.sdk.server.handler;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import com.farsunset.cim.sdk.server.constant.CIMConstant; import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.filter.ServerMessageDecoder; import com.farsunset.cim.sdk.server.filter.ServerMessageDecoder;
@ -32,6 +33,7 @@ import com.farsunset.cim.sdk.server.model.SentBody;
import com.farsunset.cim.sdk.server.session.CIMSession; import com.farsunset.cim.sdk.server.session.CIMSession;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
@ -61,7 +63,7 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
private HashMap<String, CIMRequestHandler> innerHandlerMap = new HashMap<String, CIMRequestHandler>(); private HashMap<String, CIMRequestHandler> innerHandlerMap = new HashMap<String, CIMRequestHandler>();
private CIMRequestHandler outerRequestHandler; private CIMRequestHandler outerRequestHandler;
private ConcurrentHashMap<String,Channel> channelGroup = new ConcurrentHashMap<String,Channel>();
private int port; private int port;
// 连接空闲时间 // 连接空闲时间
@ -108,6 +110,7 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
this.outerRequestHandler = outerRequestHandler; this.outerRequestHandler = outerRequestHandler;
} }
@Override
protected void channelRead0(ChannelHandlerContext ctx, SentBody body) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, SentBody body) throws Exception {
CIMSession session = new CIMSession(ctx.channel()); CIMSession session = new CIMSession(ctx.channel());
@ -127,9 +130,9 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
outerRequestHandler.process(session, body); outerRequestHandler.process(session, body);
} }
/** @Override
*/ public void channelInactive(ChannelHandlerContext ctx) {
public void channelInactive(ChannelHandlerContext ctx) throws Exception { channelGroup.remove(ctx.channel().id().asShortText());
CIMSession session = new CIMSession(ctx.channel()); CIMSession session = new CIMSession(ctx.channel());
SentBody body = new SentBody(); SentBody body = new SentBody();
@ -138,8 +141,12 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
} }
/** @Override
*/ public void channelActive(ChannelHandlerContext ctx){
channelGroup.put(ctx.channel().id().asShortText(),ctx.channel());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state().equals(IdleState.WRITER_IDLE)) { if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state().equals(IdleState.WRITER_IDLE)) {
ctx.channel().attr(AttributeKey.valueOf(CIMConstant.HEARTBEAT_KEY)).set(System.currentTimeMillis()); ctx.channel().attr(AttributeKey.valueOf(CIMConstant.HEARTBEAT_KEY)).set(System.currentTimeMillis());
@ -162,4 +169,11 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
this.port = port; this.port = port;
} }
public Channel getManagedChannel(String id) {
if (id == null) {
return null;
}
return channelGroup.get(id);
}
} }

Binary file not shown.