diff --git a/cim_for_mina/cim-android-sdk/.classpath b/cim_for_mina/cim-android-sdk/.classpath index b7cde6d..664a9cb 100644 --- a/cim_for_mina/cim-android-sdk/.classpath +++ b/cim_for_mina/cim-android-sdk/.classpath @@ -5,6 +5,6 @@ - + diff --git a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java index f466b66..67df9da 100644 --- a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java +++ b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java @@ -26,8 +26,10 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import org.apache.mina.core.future.ConnectFuture; +import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; @@ -41,11 +43,11 @@ import android.content.Context; import android.content.Intent; import android.net.ConnectivityManager; import android.net.NetworkInfo; -import android.util.Log; import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.exception.SessionClosedException; import com.farsunset.cim.sdk.android.exception.NetworkDisabledException; +import com.farsunset.cim.sdk.android.filter.CIMLoggingFilter; import com.farsunset.cim.sdk.android.filter.ClientMessageCodecFactory; import com.farsunset.cim.sdk.android.model.HeartbeatRequest; import com.farsunset.cim.sdk.android.model.HeartbeatResponse; @@ -60,7 +62,6 @@ import com.farsunset.cim.sdk.android.model.SentBody; */ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFactory { - private final String TAG = CIMConnectorManager.class.getSimpleName(); private final int READ_BUFFER_SIZE = 2048;// bit private final int CONNECT_TIMEOUT = 10 * 1000;// 秒 private final int WRITE_TIMEOUT = 10 * 1000;// 秒 @@ -70,8 +71,9 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa private final String KEY_LAST_HEART_TIME = "KEY_LAST_HEART_TIME"; private NioSocketConnector connector; - private ConnectFuture connectFuture; private ExecutorService executor = Executors.newFixedThreadPool(1); + private Semaphore semaphore = new Semaphore(1,true); + private Context context; private static CIMConnectorManager manager; @@ -92,7 +94,7 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ClientMessageCodecFactory())); connector.getFilterChain().addLast("heartbeat", keepAliveaHandler); - + connector.getFilterChain().addLast("logger",CIMLoggingFilter.getLogger()); connector.setHandler(this); } @@ -105,37 +107,20 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa } - private synchronized void syncConnection(final String host, final int port) { + private void handleConnectFailure(Throwable error,InetSocketAddress remoteAddress) { + long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); - if (isConnected()) { - return; - } + CIMLoggingFilter.getLogger().connectFailure(remoteAddress, interval); - try { - - Log.i(TAG, "****************CIM正在连接服务器 " + host + ":" + port + "......"); - - CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_CONNECTION_STATE, false); - InetSocketAddress remoteSocketAddress = new InetSocketAddress(host, port); - connectFuture = connector.connect(remoteSocketAddress); - connectFuture.awaitUninterruptibly(); - connectFuture.getSession(); - } catch (Exception e) { - - long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); - - Intent intent = new Intent(); - intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED); - intent.putExtra(Exception.class.getName(), e.getClass().getSimpleName()); - intent.putExtra("interval", interval); - context.sendBroadcast(intent); - - Log.e(TAG, "****************CIM连接服务器失败 " + host + ":" + port + "......将在" + interval / 1000 + "秒后重新尝试连接"); - - } + + Intent intent = new Intent(); + intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED); + intent.putExtra(Exception.class.getName(), error.getClass().getSimpleName()); + intent.putExtra("interval", interval); + context.sendBroadcast(intent); } - + public void connect(final String host, final int port) { if (!isNetworkConnected(context)) { @@ -147,16 +132,36 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa return; } - + + if (isConnected() || !semaphore.tryAcquire()) { + return; + } + executor.execute(new Runnable() { + @Override public void run() { - syncConnection(host, port); + + CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_CONNECTION_STATE, false); + final InetSocketAddress remoteAddress = new InetSocketAddress(host, port); + + CIMLoggingFilter.getLogger().startConnect(remoteAddress); + + connector.connect(remoteAddress).addListener(new IoFutureListener() { + @Override + public void operationComplete(ConnectFuture future) { + semaphore.release(); + future.removeListener(this); + if(future.getException() != null) { + handleConnectFailure(future.getException(),remoteAddress); + } + } + }); } }); } - public synchronized void send(SentBody body) { + public void send(SentBody body) { boolean isSuccessed = false; @@ -222,8 +227,6 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa @Override public void sessionCreated(IoSession session) throws Exception { - Log.i(TAG, "****************CIM连接服务器成功:" + session.getLocalAddress() + " NID:" + session.getId()); - setLastHeartbeatTime(session); Intent intent = new Intent(); @@ -235,8 +238,6 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa @Override public void sessionClosed(IoSession session) { - Log.e(TAG, "****************CIM与服务器断开连接:" + session.getLocalAddress() + " NID:" + session.getId()); - Intent intent = new Intent(); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED); context.sendBroadcast(intent); @@ -245,9 +246,6 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa @Override public void sessionIdle(IoSession session, IdleStatus status) { - Log.d(TAG, "****************CIM " + status.toString().toUpperCase() + ":" + session.getLocalAddress() + " NID:" - + session.getId() + " isConnected:" + session.isConnected()); - /** * 用于解决,wifi情况下。偶而路由器与服务器断开连接时,客户端并没及时收到关闭事件 导致这样的情况下当前连接无效也不会重连的问题 * @@ -255,17 +253,6 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa long lastHeartbeatTime = getLastHeartbeatTime(session); if (System.currentTimeMillis() - lastHeartbeatTime >= HEARBEAT_TIME_OUT) { session.closeNow(); - Log.e(TAG, "****************CIM心跳超时 ,即将重新连接......" + " NID:" + session.getId()); - } - } - - @Override - public void exceptionCaught(IoSession session, Throwable cause) { - - Log.e(TAG, "****************CIM连接出现未知异常:" + session.getLocalAddress() + " NID:" + session.getId()); - - if (cause != null && cause.getMessage() != null) { - Log.e(TAG, cause.getMessage()); } } diff --git a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java index c161d8b..7be847d 100644 --- a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java +++ b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java @@ -45,6 +45,8 @@ public class CIMPushManager { static String ACTION_CLOSE_CIM_CONNECTION = "ACTION_CLOSE_CIM_CONNECTION"; + static String ACTION_SET_LOGGER_EANABLE = "ACTION_SET_LOGGER_EANABLE"; + static String ACTION_DESTORY = "ACTION_DESTORY"; static String KEY_SEND_BODY = "KEY_SEND_BODY"; @@ -85,6 +87,13 @@ public class CIMPushManager { } + public static void setLoggerEnable(Context context,boolean enable) { + Intent serviceIntent = new Intent(context, CIMPushService.class); + serviceIntent.putExtra(CIMPushService.KEY_LOGGER_ENABLE, enable); + serviceIntent.setAction(ACTION_SET_LOGGER_EANABLE); + context.startService(serviceIntent); + } + protected static void connect(Context context, long delayedTime) { boolean isManualStop = CIMCacheManager.getBoolean(context, CIMCacheManager.KEY_MANUAL_STOP); diff --git a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java index 10d4528..5c1f893 100644 --- a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java +++ b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java @@ -26,7 +26,8 @@ import android.content.Intent; import android.os.Handler; import android.os.IBinder; import android.os.Message; -import android.util.Log; + +import com.farsunset.cim.sdk.android.filter.CIMLoggingFilter; import com.farsunset.cim.sdk.android.model.SentBody; /** @@ -36,8 +37,9 @@ import com.farsunset.cim.sdk.android.model.SentBody; * */ public class CIMPushService extends Service { - private final String TAG = CIMPushService.class.getSimpleName(); public final static String KEY_DELAYED_TIME = "KEY_DELAYED_TIME"; + public final static String KEY_LOGGER_ENABLE = "KEY_LOGGER_ENABLE"; + private CIMConnectorManager manager; @Override @@ -97,17 +99,24 @@ public class CIMPushService extends Service { if (CIMPushManager.ACTION_ACTIVATE_PUSH_SERVICE.equals(action)) { if (!manager.isConnected()) { - boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(), - CIMCacheManager.KEY_MANUAL_STOP); - Log.w(TAG, "manager.isConnected() == false, isManualStop == " + isManualStop); + boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(),CIMCacheManager.KEY_MANUAL_STOP); + boolean isDestroyed = CIMCacheManager.getBoolean(getApplicationContext(),CIMCacheManager.KEY_CIM_DESTROYED); + + CIMLoggingFilter.getLogger().connectState(false, isManualStop, isDestroyed); + CIMPushManager.connect(this, 0); } else { - Log.i(TAG, "manager.isConnected() == true"); + CIMLoggingFilter.getLogger().connectState(true); } } + if (CIMPushManager.ACTION_SET_LOGGER_EANABLE.equals(action)) { + boolean enable = intent.getBooleanExtra(KEY_LOGGER_ENABLE, true); + CIMLoggingFilter.getLogger().debugMode(enable); + } + return START_STICKY; } diff --git a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/CIMLoggingFilter.java b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/CIMLoggingFilter.java new file mode 100644 index 0000000..df48b62 --- /dev/null +++ b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/CIMLoggingFilter.java @@ -0,0 +1,154 @@ +/** + * Copyright 2013-2023 Xia Jun(3979434@qq.com). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + *************************************************************************************** + */ +package com.farsunset.cim.sdk.android.filter; + +import java.net.InetSocketAddress; + +import org.apache.mina.core.filterchain.IoFilterAdapter; +import org.apache.mina.core.session.IdleStatus; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; + +import android.util.Log; + + +/** + * 日志打印,添加session 的id和ip address + */ +public class CIMLoggingFilter extends IoFilterAdapter { + private final static String TAG = "CIM"; + private boolean debug = true; + + public static CIMLoggingFilter getLogger() { + return LoggerHolder.logger; + } + + private CIMLoggingFilter() { + + } + + private static class LoggerHolder{ + private static CIMLoggingFilter logger = new CIMLoggingFilter(); + } + + public void debugMode(boolean mode) { + debug = mode; + } + + @Override + public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) { + if(debug) { + Log.d(TAG,String.format("EXCEPTION" + getSessionInfo(session) + "\n%s", cause.getClass().getName())); + } + session.closeOnFlush(); + } + + @Override + public void messageReceived(NextFilter nextFilter, IoSession session, Object message) { + if(debug) { + Log.i(TAG,String.format("RECEIVED" + getSessionInfo(session) + "\n%s", message)); + } + nextFilter.messageReceived(session, message); + } + + @Override + public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) { + if(debug) { + Log.i(TAG,String.format("SENT" + getSessionInfo(session) + "\n%s", writeRequest.getOriginalRequest().getMessage())); + } + nextFilter.messageSent(session, writeRequest); + } + + @Override + public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { + nextFilter.sessionCreated(session); + } + + @Override + public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { + if(debug) { + Log.i(TAG,"OPENED" + getSessionInfo(session)); + } + nextFilter.sessionOpened(session); + } + + @Override + public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) { + if(debug) { + Log.i(TAG,"IDLE " + status.toString().toUpperCase() + getSessionInfo(session)); + } + nextFilter.sessionIdle(session, status); + } + + @Override + public void sessionClosed(NextFilter nextFilter, IoSession session) { + if(debug) { + Log.i(TAG,"CLOSED" + getSessionInfo(session)); + } + nextFilter.sessionClosed(session); + } + + public void connectFailure(InetSocketAddress remoteAddress,long interval) { + if(debug) { + Log.i(TAG,"CONNECT FAILURE TRY RECONNECT AFTER " + interval +"ms"); + } + } + + public void startConnect(InetSocketAddress remoteAddress) { + if(debug) { + Log.i(TAG,"START CONNECT REMOTE HOST: " + remoteAddress.toString()); + } + } + + public void connectState(boolean isConnected) { + if(debug) { + Log.i(TAG,"CONNECTED:" + isConnected); + } + } + + public void connectState(boolean isConnected,boolean isManualStop,boolean isDestroyed) { + if(debug) { + Log.i(TAG,"CONNECTED:" + isConnected + " STOPED:"+isManualStop+ " DESTROYED:"+isDestroyed); + } + } + private String getSessionInfo(IoSession session) { + StringBuilder builder = new StringBuilder(); + if (session == null) { + return ""; + } + builder.append(" ["); + builder.append("id:").append(session.getId()); + + if (session.getLocalAddress() != null) { + builder.append(" L:").append(session.getLocalAddress().toString()); + } + + + if (session.getRemoteAddress() != null) { + builder.append(" R:").append(session.getRemoteAddress().toString()); + } + builder.append("]"); + return builder.toString(); + } + + +} diff --git a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageDecoder.java b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageDecoder.java index d3b96fd..9915a4a 100644 --- a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageDecoder.java +++ b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageDecoder.java @@ -25,7 +25,6 @@ import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; -import android.util.Log; import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.model.HeartbeatRequest; @@ -40,7 +39,6 @@ import com.google.protobuf.InvalidProtocolBufferException; */ public class ClientMessageDecoder extends CumulativeProtocolDecoder { - final static String TAG = ClientMessageDecoder.class.getSimpleName(); @Override public boolean doDecode(IoSession iosession, IoBuffer iobuffer, ProtocolDecoderOutput out) throws Exception { @@ -82,7 +80,6 @@ public class ClientMessageDecoder extends CumulativeProtocolDecoder { if (CIMConstant.ProtobufType.S_H_RQ == type) { HeartbeatRequest request = HeartbeatRequest.getInstance(); - Log.i(TAG, request.toString()); return request; } @@ -94,9 +91,6 @@ public class ClientMessageDecoder extends CumulativeProtocolDecoder { body.putAll(bodyProto.getDataMap()); body.setCode(bodyProto.getCode()); body.setMessage(bodyProto.getMessage()); - - Log.i(TAG, body.toString()); - return body; } @@ -112,8 +106,6 @@ public class ClientMessageDecoder extends CumulativeProtocolDecoder { message.setExtra(bodyProto.getExtra()); message.setTimestamp(bodyProto.getTimestamp()); message.setFormat(bodyProto.getFormat()); - - Log.i(TAG, message.toString()); return message; } diff --git a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageEncoder.java b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageEncoder.java index 88d5f74..9a119ae 100644 --- a/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageEncoder.java +++ b/cim_for_mina/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageEncoder.java @@ -25,8 +25,6 @@ import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderAdapter; import org.apache.mina.filter.codec.ProtocolEncoderOutput; -import android.util.Log; - import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.model.Protobufable; @@ -35,8 +33,6 @@ import com.farsunset.cim.sdk.android.model.Protobufable; */ public class ClientMessageEncoder extends ProtocolEncoderAdapter { - final static String TAG = ClientMessageEncoder.class.getSimpleName(); - @Override public void encode(IoSession iosession, Object object, ProtocolEncoderOutput out) throws Exception { @@ -52,8 +48,6 @@ public class ClientMessageEncoder extends ProtocolEncoderAdapter { buff.flip(); out.write(buff); - // 打印出收到的消息 - Log.i(TAG, data.toString()); } } diff --git a/cim_for_mina/cim-boot-server/.settings/org.eclipse.buildship.core.prefs b/cim_for_mina/cim-boot-server/.settings/org.eclipse.buildship.core.prefs index 51a1303..e889521 100644 --- a/cim_for_mina/cim-boot-server/.settings/org.eclipse.buildship.core.prefs +++ b/cim_for_mina/cim-boot-server/.settings/org.eclipse.buildship.core.prefs @@ -1,8 +1,2 @@ -auto.sync=false -build.scans.enabled=true -connection.gradle.distribution=GRADLE_DISTRIBUTION(VERSION(4.5)) connection.project.dir= eclipse.preferences.version=1 -gradle.user.home= -offline.mode=false -override.workspace.settings=true diff --git a/cim_for_mina/cim-boot-server/libs/cim-server-sdk-3.5.jar b/cim_for_mina/cim-boot-server/libs/cim-server-sdk-3.5.jar index 5fc22ff..ac9ec95 100644 Binary files a/cim_for_mina/cim-boot-server/libs/cim-server-sdk-3.5.jar and b/cim_for_mina/cim-boot-server/libs/cim-server-sdk-3.5.jar differ diff --git a/cim_for_mina/cim-boot-server/src/main/java/com/farsunset/cim/handler/BindHandler.java b/cim_for_mina/cim-boot-server/src/main/java/com/farsunset/cim/handler/BindHandler.java index 2a75c95..86dcf98 100644 --- a/cim_for_mina/cim-boot-server/src/main/java/com/farsunset/cim/handler/BindHandler.java +++ b/cim_for_mina/cim-boot-server/src/main/java/com/farsunset/cim/handler/BindHandler.java @@ -74,7 +74,8 @@ public class BindHandler implements CIMRequestHandler { newSession.setSystemVersion(message.get("osVersion")); newSession.setBindTime(System.currentTimeMillis()); newSession.setPackageName(message.get("packageName")); - + newSession.setHeartbeat(System.currentTimeMillis()); + /** * 由于客户端断线服务端可能会无法获知的情况,客户端重连时,需要关闭旧的连接 */ @@ -86,13 +87,18 @@ public class BindHandler implements CIMRequestHandler { sendForceOfflineMessage(oldSession, account, newSession.getDeviceModel()); } - // 第一次设置心跳时间为登录时间 - newSession.setBindTime(System.currentTimeMillis()); - - - // 第一次设置心跳时间为登录时间 - newSession.setBindTime(System.currentTimeMillis()); - newSession.setHeartbeat(System.currentTimeMillis()); + + /** + * 有可能是同一个设备重复连接,则关闭旧的链接,这种情况一般是客户端断网,联网又重新链接上来,之前的旧链接没有来得及通过心跳机制关闭,在这里手动关闭 + * 条件1,连接来自是同一个设备 + * 条件2.2个连接都是同一台服务器 + */ + + if (oldSession != null && !fromOtherDevice(newSession,oldSession) && Objects.equals(oldSession.getHost(),host)) { + oldSession.removeAttribute(CIMConstant.SESSION_KEY); + oldSession.closeOnFlush(); + } + sessionManager.add(newSession); diff --git a/cim_for_mina/cim-client-android/.idea/caches/build_file_checksums.ser b/cim_for_mina/cim-client-android/.idea/caches/build_file_checksums.ser index 659d272..c46967b 100644 Binary files a/cim_for_mina/cim-client-android/.idea/caches/build_file_checksums.ser and b/cim_for_mina/cim-client-android/.idea/caches/build_file_checksums.ser differ diff --git a/cim_for_mina/cim-client-android/app/libs/cim-android-sdk-3.4.jar b/cim_for_mina/cim-client-android/app/libs/cim-android-sdk-3.4.jar deleted file mode 100644 index 1c8aa7d..0000000 Binary files a/cim_for_mina/cim-client-android/app/libs/cim-android-sdk-3.4.jar and /dev/null differ diff --git a/cim_for_mina/cim-client-android/app/libs/cim-android-sdk-3.5.jar b/cim_for_mina/cim-client-android/app/libs/cim-android-sdk-3.5.jar new file mode 100644 index 0000000..f32b4f9 Binary files /dev/null and b/cim_for_mina/cim-client-android/app/libs/cim-android-sdk-3.5.jar differ diff --git a/cim_for_mina/cim-client-android/app/src/main/java/com/farsunset/ichat/example/app/Constant.java b/cim_for_mina/cim-client-android/app/src/main/java/com/farsunset/ichat/example/app/Constant.java index b4ad7ef..8cf51cf 100644 --- a/cim_for_mina/cim-client-android/app/src/main/java/com/farsunset/ichat/example/app/Constant.java +++ b/cim_for_mina/cim-client-android/app/src/main/java/com/farsunset/ichat/example/app/Constant.java @@ -24,7 +24,7 @@ package com.farsunset.ichat.example.app; public interface Constant { //服务端IP地址 - public static final String CIM_SERVER_HOST = "192.168.1.110"; + public static final String CIM_SERVER_HOST = "192.168.1.106"; //注意,这里的端口不是tomcat的端口,CIM端口在服务端spring-cim.xml中配置的,没改动就使用默认的23456 diff --git a/cim_for_mina/cim-client-android/app/src/main/java/com/farsunset/ichat/example/ui/SplanshActivity.java b/cim_for_mina/cim-client-android/app/src/main/java/com/farsunset/ichat/example/ui/SplanshActivity.java index cb396ea..9bcee42 100644 --- a/cim_for_mina/cim-client-android/app/src/main/java/com/farsunset/ichat/example/ui/SplanshActivity.java +++ b/cim_for_mina/cim-client-android/app/src/main/java/com/farsunset/ichat/example/ui/SplanshActivity.java @@ -29,6 +29,7 @@ import android.view.View; import android.view.animation.AlphaAnimation; import com.farsunset.cim.sdk.android.CIMPushManager; +import com.farsunset.ichat.example.BuildConfig; import com.farsunset.ichat.example.R; import com.farsunset.ichat.example.app.CIMMonitorActivity; import com.farsunset.ichat.example.app.Constant; @@ -41,7 +42,8 @@ public class SplanshActivity extends CIMMonitorActivity{ super.onCreate(savedInstanceState); - + + CIMPushManager.setLoggerEnable(this,BuildConfig.DEBUG); //连接服务端 CIMPushManager.connect(SplanshActivity.this,Constant.CIM_SERVER_HOST, Constant.CIM_SERVER_PORT); diff --git a/cim_for_mina/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java b/cim_for_mina/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java index ba1ab41..6221125 100644 --- a/cim_for_mina/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java +++ b/cim_for_mina/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java @@ -26,9 +26,11 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import org.apache.log4j.Logger; import org.apache.mina.core.future.ConnectFuture; +import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; @@ -38,6 +40,7 @@ import org.apache.mina.filter.keepalive.KeepAliveFilter; import org.apache.mina.filter.keepalive.KeepAliveMessageFactory; import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler; import org.apache.mina.transport.socket.nio.NioSocketConnector; + import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.exception.SessionDisconnectedException; import com.farsunset.cim.sdk.client.filter.ClientMessageCodecFactory; @@ -65,8 +68,8 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa private final String KEY_LAST_HEART_TIME = "KEY_LAST_HEART_TIME"; private NioSocketConnector connector; - private ConnectFuture connectFuture; private ExecutorService executor = Executors.newFixedThreadPool(1); + private Semaphore semaphore = new Semaphore(1,true); private static CIMConnectorManager manager; @@ -97,49 +100,50 @@ class CIMConnectorManager extends IoHandlerAdapter implements KeepAliveMessageFa } - private synchronized void syncConnection(final String host, final int port) { - - if (isConnected()) { - return; - } - - try { - - logger.info("****************CIM正在连接服务器 " + host + ":" + port + "......"); - - CIMCacheManager.getInstance().putBoolean(CIMCacheManager.KEY_CIM_CONNECTION_STATE, false); - InetSocketAddress remoteSocketAddress = new InetSocketAddress(host, port); - connectFuture = connector.connect(remoteSocketAddress); - connectFuture.awaitUninterruptibly(); - connectFuture.getSession(); - } catch (Exception e) { - - long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); - - Intent intent = new Intent(); - intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED); - intent.putExtra(Exception.class.getName(), e); - intent.putExtra("interval", interval); - sendBroadcast(intent); - - logger.error( - "****************CIM连接服务器失败 " + host + ":" + port + "......将在" + interval / 1000 + "秒后重新尝试连接"); - - } - - } - + + public void connect(final String host, final int port) { + if (isConnected() || !semaphore.tryAcquire()) { + return; + } + executor.execute(new Runnable() { + @Override public void run() { - syncConnection(host, port); + + logger.info("****************CIM正在连接服务器 " + host + ":" + port + "......"); + InetSocketAddress remoteAddress = new InetSocketAddress(host, port); + CIMCacheManager.getInstance().putBoolean(CIMCacheManager.KEY_CIM_CONNECTION_STATE, false); + connector.connect(remoteAddress).addListener(new IoFutureListener() { + @Override + public void operationComplete(ConnectFuture future) { + semaphore.release(); + future.removeListener(this); + if(future.getException() != null) { + handleConnectFailure(future.getException(),remoteAddress); + } + } + }); } }); } - public synchronized void send(SentBody body) { + private void handleConnectFailure(Throwable error,InetSocketAddress remoteAddress) { + long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); + + Intent intent = new Intent(); + intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED); + intent.putExtra(Exception.class.getName(), error); + intent.putExtra("interval", interval); + sendBroadcast(intent); + + logger.error("****************CIM连接服务器失败 " + remoteAddress.getHostString() + ":" + remoteAddress.getPort() + "......将在" + interval / 1000 + "秒后重新尝试连接"); + + } + + public void send(SentBody body) { boolean isSuccessed = false; diff --git a/cim_for_mina/cim-server-sdk/.classpath b/cim_for_mina/cim-server-sdk/.classpath index 2fcf73d..3430be1 100644 --- a/cim_for_mina/cim-server-sdk/.classpath +++ b/cim_for_mina/cim-server-sdk/.classpath @@ -1,15 +1,13 @@ - - - - - - - + + + + + + + + + + diff --git a/cim_for_mina/cim-server-sdk/.project b/cim_for_mina/cim-server-sdk/.project index 169d462..70c4662 100644 --- a/cim_for_mina/cim-server-sdk/.project +++ b/cim_for_mina/cim-server-sdk/.project @@ -22,8 +22,6 @@ - org.eclipse.jem.workbench.JavaEMFNature - org.eclipse.wst.common.modulecore.ModuleCoreNature org.eclipse.jdt.core.javanature org.eclipse.wst.common.project.facet.core.nature diff --git a/cim_for_mina/cim-server-sdk/.settings/org.eclipse.wst.common.component b/cim_for_mina/cim-server-sdk/.settings/org.eclipse.wst.common.component deleted file mode 100644 index 88c7540..0000000 --- a/cim_for_mina/cim-server-sdk/.settings/org.eclipse.wst.common.component +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - diff --git a/cim_for_mina/cim-server-sdk/.settings/org.eclipse.wst.common.project.facet.core.xml b/cim_for_mina/cim-server-sdk/.settings/org.eclipse.wst.common.project.facet.core.xml index 2d18782..10c580d 100644 --- a/cim_for_mina/cim-server-sdk/.settings/org.eclipse.wst.common.project.facet.core.xml +++ b/cim_for_mina/cim-server-sdk/.settings/org.eclipse.wst.common.project.facet.core.xml @@ -1,8 +1,5 @@ - - - - - + + diff --git a/cim_for_mina/cim-server-sdk/src/com/farsunset/cim/sdk/server/filter/CIMLoggingFilter.java b/cim_for_mina/cim-server-sdk/src/com/farsunset/cim/sdk/server/filter/CIMLoggingFilter.java index a075400..f5e8c44 100644 --- a/cim_for_mina/cim-server-sdk/src/com/farsunset/cim/sdk/server/filter/CIMLoggingFilter.java +++ b/cim_for_mina/cim-server-sdk/src/com/farsunset/cim/sdk/server/filter/CIMLoggingFilter.java @@ -28,6 +28,8 @@ import org.apache.mina.core.write.WriteRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.farsunset.cim.sdk.server.constant.CIMConstant; + /** * 日志打印,添加session 的id和ip address */ @@ -36,21 +38,20 @@ public class CIMLoggingFilter extends IoFilterAdapter { private final Logger logger = LoggerFactory.getLogger(CIMLoggingFilter.class); @Override - public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { - - logger.info("EXCEPTION" + getSessionInfo(session) + ":", cause); - nextFilter.exceptionCaught(session, cause); + public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) { + logger.info("EXCEPTION" + getSessionInfo(session) + "\n{}", cause.getClass().getName()); + session.closeOnFlush(); } @Override - public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { - logger.info("RECEIVED" + getSessionInfo(session) + ": {}", message); + public void messageReceived(NextFilter nextFilter, IoSession session, Object message) { + logger.info("RECEIVED" + getSessionInfo(session) + "\n{}", message); nextFilter.messageReceived(session, message); } @Override - public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { - logger.info("SENT" + getSessionInfo(session) + ": {}", writeRequest.getOriginalRequest().getMessage()); + public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) { + logger.info("SENT" + getSessionInfo(session) + "\n{}", writeRequest.getOriginalRequest().getMessage()); nextFilter.messageSent(session, writeRequest); } @@ -67,13 +68,13 @@ public class CIMLoggingFilter extends IoFilterAdapter { } @Override - public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { + public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) { logger.info("IDLE" + getSessionInfo(session)); nextFilter.sessionIdle(session, status); } @Override - public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { + public void sessionClosed(NextFilter nextFilter, IoSession session) { logger.info("CLOSED" + getSessionInfo(session)); nextFilter.sessionClosed(session); } @@ -83,10 +84,20 @@ public class CIMLoggingFilter extends IoFilterAdapter { if (session == null) { return ""; } - builder.append("["); + builder.append(" ["); builder.append("id:").append(session.getId()); + + if (session.getLocalAddress() != null) { + builder.append(" L:").append(session.getLocalAddress().toString()); + } + + if (session.getRemoteAddress() != null) { - builder.append(" address:").append(session.getRemoteAddress().toString()); + builder.append(" R:").append(session.getRemoteAddress().toString()); + } + + if (session.containsAttribute(CIMConstant.SESSION_KEY)) { + builder.append(" account:").append(session.getAttribute(CIMConstant.SESSION_KEY)); } builder.append("]"); return builder.toString(); diff --git a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java index 5430335..e3743c0 100644 --- a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java +++ b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java @@ -21,17 +21,18 @@ */ package com.farsunset.cim.sdk.android; +import java.net.InetSocketAddress; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; + import android.content.Context; import android.content.Intent; import android.net.ConnectivityManager; import android.net.NetworkInfo; -import android.util.Log; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; @@ -45,7 +46,11 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.AttributeKey; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.channel.ChannelHandler.Sharable; + import com.farsunset.cim.sdk.android.constant.CIMConstant; +import com.farsunset.cim.sdk.android.filter.CIMLoggingHandler; import com.farsunset.cim.sdk.android.filter.ClientMessageDecoder; import com.farsunset.cim.sdk.android.filter.ClientMessageEncoder; import com.farsunset.cim.sdk.android.exception.NetworkDisabledException; @@ -56,15 +61,9 @@ import com.farsunset.cim.sdk.android.model.Message; import com.farsunset.cim.sdk.android.model.ReplyBody; import com.farsunset.cim.sdk.android.model.SentBody; -/** - * 连接服务端管理,cim核心处理类,管理连接,以及消息处理 - * - * @author 3979434@qq.com - */ @Sharable class CIMConnectorManager extends SimpleChannelInboundHandler { - private final String TAG = CIMConnectorManager.class.getSimpleName(); private final int CONNECT_TIMEOUT = 10 * 1000;// 秒 private final int WRITE_TIMEOUT = 10 * 1000;// 秒 @@ -76,6 +75,8 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { private EventLoopGroup loopGroup; private Channel channel;; private ExecutorService executor = Executors.newFixedThreadPool(1); + private Semaphore semaphore = new Semaphore(1,true); + private Context context; private static CIMConnectorManager manager; @@ -84,19 +85,19 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { context = ctx; bootstrap = new Bootstrap(); - loopGroup = new NioEventLoopGroup(); + loopGroup = new NioEventLoopGroup(1); bootstrap.group(loopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT); - bootstrap.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientMessageDecoder()); ch.pipeline().addLast(new ClientMessageEncoder()); ch.pipeline().addLast(new IdleStateHandler(READ_IDLE_TIME, 0, 0)); + ch.pipeline().addLast(CIMLoggingHandler.getLogger()); ch.pipeline().addLast(CIMConnectorManager.this); } }); @@ -111,32 +112,22 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { } - private synchronized void syncConnection(String host, int port) { - - if (isConnected()) { - return; - } - - try { - - Log.i(TAG, "****************CIM正在连接服务器 " + host + ":" + port + "......"); - ChannelFuture channelFuture = bootstrap.connect(host, port).syncUninterruptibly(); - channel = channelFuture.channel(); - } catch (Exception e) { - - long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); - - Intent intent = new Intent(); - intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED); - intent.putExtra(Exception.class.getName(), e.getClass().getSimpleName()); - intent.putExtra("interval", interval); - context.sendBroadcast(intent); - - Log.e(TAG, "****************CIM连接服务器失败 " + host + ":" + port + "......将在" + interval / 1000 + "秒后重新尝试连接"); - - } + private void handleConnectFailure(Throwable error,InetSocketAddress remoteAddress) { + + long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); + + CIMLoggingHandler.getLogger().connectFailure(remoteAddress, interval); + + Intent intent = new Intent(); + intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED); + intent.putExtra(Exception.class.getName(), error.getClass().getSimpleName()); + intent.putExtra("interval", interval); + context.sendBroadcast(intent); + + } + public void connect(final String host, final int port) { @@ -150,21 +141,43 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { return; } + if (isConnected() || !semaphore.tryAcquire()) { + return; + } + executor.execute(new Runnable() { + @Override public void run() { - syncConnection(host, port); + + final InetSocketAddress remoteAddress = new InetSocketAddress(host, port); + bootstrap.connect(remoteAddress).addListener(new GenericFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) { + semaphore.release(); + future.removeListener(this); + if(!future.isSuccess() && future.cause() != null) { + handleConnectFailure(future.cause(),remoteAddress); + } + + if(future.isSuccess()) { + channel = future.channel(); + } + + } + }); } }); } - public synchronized void send(SentBody body) { + public void send(SentBody body) { boolean isSuccessed = false; String exceptionName = SessionClosedException.class.getSimpleName(); - if (channel != null && channel.isActive()) { + if (isConnected()) { ChannelFuture future = channel.writeAndFlush(body); isSuccessed = future.awaitUninterruptibly(WRITE_TIMEOUT); if (!isSuccessed && future.cause() != null) { @@ -201,10 +214,7 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { } public boolean isConnected() { - if (channel == null) { - return false; - } - return channel.isActive(); + return channel != null && channel.isActive(); } public void closeSession() { @@ -216,8 +226,6 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - Log.i(TAG, "****************CIM连接服务器成功:" + ctx.channel().localAddress() + " NID:" - + ctx.channel().id().asShortText()); setLastHeartbeatTime(ctx.channel()); @@ -230,9 +238,6 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { @Override public void channelInactive(ChannelHandlerContext ctx) { - Log.e(TAG, "****************CIM与服务器断开连接:" + ctx.channel().localAddress() + " NID:" - + ctx.channel().id().asShortText()); - Intent intent = new Intent(); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED); context.sendBroadcast(intent); @@ -247,34 +252,14 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { * */ if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state().equals(IdleState.READER_IDLE)) { - Log.d(TAG, "****************CIM " + IdleState.READER_IDLE + ":" + ctx.channel().localAddress() + " NID:" - + ctx.channel().id().asShortText()); - Long lastTime = getLastHeartbeatTime(ctx.channel()); if (lastTime != null && System.currentTimeMillis() - lastTime > HEARBEAT_TIME_OUT) { channel.close(); - Log.e(TAG, "****************CIM心跳超时 ,即将重新连接......" + " NID:" + ctx.channel().id().asShortText()); } } } - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - - Log.e(TAG, "****************CIM连接出现未知异常:" + ctx.channel().localAddress() + " NID:" - + ctx.channel().id().asShortText()); - - if (cause != null && cause.getMessage() != null) { - Log.e(TAG, cause.getMessage()); - } - - Intent intent = new Intent(); - intent.setAction(CIMConstant.IntentAction.ACTION_UNCAUGHT_EXCEPTION); - intent.putExtra(Exception.class.getName(), cause.getClass().getSimpleName()); - context.sendBroadcast(intent); - } - @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Message) { diff --git a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java index c161d8b..52a6481 100644 --- a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java +++ b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java @@ -45,6 +45,8 @@ public class CIMPushManager { static String ACTION_CLOSE_CIM_CONNECTION = "ACTION_CLOSE_CIM_CONNECTION"; + static String ACTION_SET_LOGGER_EANABLE = "ACTION_SET_LOGGER_EANABLE"; + static String ACTION_DESTORY = "ACTION_DESTORY"; static String KEY_SEND_BODY = "KEY_SEND_BODY"; @@ -85,6 +87,14 @@ public class CIMPushManager { } + public static void setLoggerEnable(Context context,boolean enable) { + Intent serviceIntent = new Intent(context, CIMPushService.class); + serviceIntent.putExtra(CIMPushService.KEY_LOGGER_ENABLE, enable); + serviceIntent.setAction(ACTION_SET_LOGGER_EANABLE); + context.startService(serviceIntent); + } + + protected static void connect(Context context, long delayedTime) { boolean isManualStop = CIMCacheManager.getBoolean(context, CIMCacheManager.KEY_MANUAL_STOP); diff --git a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java index 10d4528..ded4ff9 100644 --- a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java +++ b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java @@ -26,7 +26,8 @@ import android.content.Intent; import android.os.Handler; import android.os.IBinder; import android.os.Message; -import android.util.Log; + +import com.farsunset.cim.sdk.android.filter.CIMLoggingHandler; import com.farsunset.cim.sdk.android.model.SentBody; /** @@ -36,8 +37,8 @@ import com.farsunset.cim.sdk.android.model.SentBody; * */ public class CIMPushService extends Service { - private final String TAG = CIMPushService.class.getSimpleName(); public final static String KEY_DELAYED_TIME = "KEY_DELAYED_TIME"; + public final static String KEY_LOGGER_ENABLE = "KEY_LOGGER_ENABLE"; private CIMConnectorManager manager; @Override @@ -97,16 +98,24 @@ public class CIMPushService extends Service { if (CIMPushManager.ACTION_ACTIVATE_PUSH_SERVICE.equals(action)) { if (!manager.isConnected()) { - boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(), - CIMCacheManager.KEY_MANUAL_STOP); - Log.w(TAG, "manager.isConnected() == false, isManualStop == " + isManualStop); + boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(),CIMCacheManager.KEY_MANUAL_STOP); + boolean isDestroyed = CIMCacheManager.getBoolean(getApplicationContext(),CIMCacheManager.KEY_CIM_DESTROYED); + + CIMLoggingHandler.getLogger().connectState(false, isManualStop, isDestroyed); + CIMPushManager.connect(this, 0); } else { - Log.i(TAG, "manager.isConnected() == true"); + + CIMLoggingHandler.getLogger().connectState(true); } } + + if (CIMPushManager.ACTION_SET_LOGGER_EANABLE.equals(action)) { + boolean enable = intent.getBooleanExtra(KEY_LOGGER_ENABLE, true); + CIMLoggingHandler.getLogger().debugMode(enable); + } return START_STICKY; } diff --git a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/constant/CIMConstant.java b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/constant/CIMConstant.java index 2efa7cd..d4e8519 100644 --- a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/constant/CIMConstant.java +++ b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/constant/CIMConstant.java @@ -100,9 +100,6 @@ public interface CIMConstant { // 重试连接 String ACTION_CONNECTION_RECOVERY = "com.farsunset.cim.CONNECTION_RECOVERY"; - - // 未知异常 - String ACTION_UNCAUGHT_EXCEPTION = "com.farsunset.cim.UNCAUGHT_EXCEPTION"; } } diff --git a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/CIMLoggingHandler.java b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/CIMLoggingHandler.java new file mode 100644 index 0000000..1ed5937 --- /dev/null +++ b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/CIMLoggingHandler.java @@ -0,0 +1,200 @@ +/** + * Copyright 2013-2023 Xia Jun(3979434@qq.com). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + *************************************************************************************** + */ +package com.farsunset.cim.sdk.android.filter; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + + +import android.util.Log; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.timeout.IdleStateEvent; +@Sharable +public class CIMLoggingHandler extends ChannelDuplexHandler{ + + private final static String TAG = "CIM"; + private boolean debug = true; + + public static CIMLoggingHandler getLogger() { + return LoggerHolder.logger; + } + + private CIMLoggingHandler() { + + } + + private static class LoggerHolder{ + private static CIMLoggingHandler logger = new CIMLoggingHandler(); + } + + public void debugMode(boolean mode) { + debug = mode; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + ctx.fireChannelRegistered(); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + ctx.fireChannelUnregistered(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + if(debug) { + Log.i(TAG,"OPENED" + getSessionInfo(ctx.channel())); + } + ctx.fireChannelActive(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if(debug) { + Log.i(TAG,"CLOSED" + getSessionInfo(ctx.channel())); + } + ctx.fireChannelInactive(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + + if(debug) { + Log.d(TAG,String.format("EXCEPTION" + getSessionInfo(ctx.channel()) + "\n%s", cause.getClass().getName() + ":" + cause.getMessage())); + } + + ctx.channel().close(); + } + + public void connectFailure(InetSocketAddress remoteAddress,long interval) { + if(debug) { + Log.d(TAG,"CONNECT FAILURE TRY RECONNECT AFTER " + interval +"ms"); + } + } + + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if(debug && evt instanceof IdleStateEvent) { + Log.d(TAG,String.format("IDLE %s" + getSessionInfo(ctx.channel()) , ((IdleStateEvent)evt).state().toString())); + } + ctx.fireUserEventTriggered(evt); + } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { + ctx.bind(localAddress, promise); + } + + @Override + public void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { + if(debug) { + Log.d(TAG,"START CONNECT REMOTE HOST: " + remoteAddress.toString()); + } + ctx.connect(remoteAddress, localAddress, promise); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + + ctx.disconnect(promise); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.close(promise); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.deregister(promise); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelReadComplete(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if(debug) { + Log.i(TAG,String.format("RECEIVED" + getSessionInfo(ctx.channel()) + "\n%s", msg.toString())); + } + ctx.fireChannelRead(msg); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if(debug) { + Log.i(TAG,String.format("SENT" + getSessionInfo(ctx.channel()) + "\n%s", msg.toString())); + } + ctx.write(msg, promise); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelWritabilityChanged(); + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + private String getSessionInfo(Channel session) { + StringBuilder builder = new StringBuilder(); + if (session == null) { + return ""; + } + builder.append(" ["); + builder.append("id:").append(session.id().asShortText()); + + if (session.localAddress() != null) { + builder.append(" L:").append(session.localAddress().toString()); + } + + + if (session.remoteAddress() != null) { + builder.append(" R:").append(session.remoteAddress().toString()); + } + builder.append("]"); + return builder.toString(); + } + + + public void connectState(boolean isConnected) { + if(debug) { + Log.i(TAG,"CONNECTED:" + isConnected); + } + } + + public void connectState(boolean isConnected,boolean isManualStop,boolean isDestroyed) { + if(debug) { + Log.i(TAG,"CONNECTED:" + isConnected + " STOPED:"+isManualStop+ " DESTROYED:"+isDestroyed); + } + } +} diff --git a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageDecoder.java b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageDecoder.java index c64ad96..4b16445 100644 --- a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageDecoder.java +++ b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageDecoder.java @@ -31,7 +31,6 @@ import com.farsunset.cim.sdk.android.model.proto.MessageProto; import com.farsunset.cim.sdk.android.model.proto.ReplyBodyProto; import com.google.protobuf.InvalidProtocolBufferException; -import android.util.Log; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; @@ -40,7 +39,6 @@ import io.netty.handler.codec.ByteToMessageDecoder; * 客户端消息解码 */ public class ClientMessageDecoder extends ByteToMessageDecoder { - final static String TAG = ClientMessageDecoder.class.getSimpleName(); @Override protected void decode(ChannelHandlerContext arg0, ByteBuf buffer, List queue) throws Exception { @@ -84,7 +82,6 @@ public class ClientMessageDecoder extends ByteToMessageDecoder { if (CIMConstant.ProtobufType.S_H_RQ == type) { HeartbeatRequest request = HeartbeatRequest.getInstance(); - Log.i(TAG, request.toString()); return request; } @@ -96,9 +93,6 @@ public class ClientMessageDecoder extends ByteToMessageDecoder { body.putAll(bodyProto.getDataMap()); body.setCode(bodyProto.getCode()); body.setMessage(bodyProto.getMessage()); - - Log.i(TAG, body.toString()); - return body; } @@ -114,8 +108,6 @@ public class ClientMessageDecoder extends ByteToMessageDecoder { message.setExtra(bodyProto.getExtra()); message.setTimestamp(bodyProto.getTimestamp()); message.setFormat(bodyProto.getFormat()); - - Log.i(TAG, message.toString()); return message; } diff --git a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageEncoder.java b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageEncoder.java index 659e8e3..351fe57 100644 --- a/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageEncoder.java +++ b/cim_for_netty/cim-android-sdk/src/com/farsunset/cim/sdk/android/filter/ClientMessageEncoder.java @@ -28,28 +28,21 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import android.util.Log; - /** * 客户端消息发送前进行编码,可在此加密消息 * */ -public class ClientMessageEncoder extends MessageToByteEncoder { +public class ClientMessageEncoder extends MessageToByteEncoder { @Override - protected void encode(ChannelHandlerContext ctx, Object message, ByteBuf out) throws Exception { + protected void encode(ChannelHandlerContext ctx, Protobufable message, ByteBuf out) throws Exception { - if (message instanceof Protobufable) { + Protobufable data = (Protobufable) message; + byte[] byteArray = data.getByteArray(); - Protobufable data = (Protobufable) message; - byte[] byteArray = data.getByteArray(); + out.writeBytes(createHeader(data.getType(), byteArray.length)); + out.writeBytes(byteArray); - out.writeBytes(createHeader(data.getType(), byteArray.length)); - out.writeBytes(byteArray); - - } - - Log.i(ClientMessageEncoder.class.getSimpleName(), message.toString()); } /** diff --git a/cim_for_netty/cim-boot-server/libs/cim-server-sdk-3.5.jar b/cim_for_netty/cim-boot-server/libs/cim-server-sdk-3.5.jar index fb02403..22eca24 100644 Binary files a/cim_for_netty/cim-boot-server/libs/cim-server-sdk-3.5.jar and b/cim_for_netty/cim-boot-server/libs/cim-server-sdk-3.5.jar differ diff --git a/cim_for_netty/cim-boot-server/src/main/java/com/farsunset/cim/handler/BindHandler.java b/cim_for_netty/cim-boot-server/src/main/java/com/farsunset/cim/handler/BindHandler.java index 065db5c..6a06740 100644 --- a/cim_for_netty/cim-boot-server/src/main/java/com/farsunset/cim/handler/BindHandler.java +++ b/cim_for_netty/cim-boot-server/src/main/java/com/farsunset/cim/handler/BindHandler.java @@ -86,9 +86,17 @@ public class BindHandler implements CIMRequestHandler { sendForceOfflineMessage(oldSession, account, newSession.getDeviceModel()); } - // 第一次设置心跳时间为登录时间 - newSession.setBindTime(System.currentTimeMillis()); + /** + * 有可能是同一个设备重复连接,则关闭旧的链接,这种情况一般是客户端断网,联网又重新链接上来,之前的旧链接没有来得及通过心跳机制关闭,在这里手动关闭 + * 条件1,连接来自是同一个设备 + * 条件2.2个连接都是同一台服务器 + */ + + if (oldSession != null && !fromOtherDevice(newSession,oldSession) && Objects.equals(oldSession.getHost(),host)) { + oldSession.removeAttribute(CIMConstant.SESSION_KEY); + oldSession.closeNow(); + } // 第一次设置心跳时间为登录时间 newSession.setBindTime(System.currentTimeMillis()); diff --git a/cim_for_netty/cim-client-android/.idea/misc.xml b/cim_for_netty/cim-client-android/.idea/misc.xml index c0f68ed..99202cc 100644 --- a/cim_for_netty/cim-client-android/.idea/misc.xml +++ b/cim_for_netty/cim-client-android/.idea/misc.xml @@ -25,7 +25,7 @@ - + diff --git a/cim_for_netty/cim-client-android/app/libs/cim-android-sdk-3.4.jar b/cim_for_netty/cim-client-android/app/libs/cim-android-sdk-3.4.jar deleted file mode 100644 index 4b3497a..0000000 Binary files a/cim_for_netty/cim-client-android/app/libs/cim-android-sdk-3.4.jar and /dev/null differ diff --git a/cim_for_netty/cim-client-android/app/libs/cim-android-sdk-3.5.jar b/cim_for_netty/cim-client-android/app/libs/cim-android-sdk-3.5.jar new file mode 100644 index 0000000..01bf8b6 Binary files /dev/null and b/cim_for_netty/cim-client-android/app/libs/cim-android-sdk-3.5.jar differ diff --git a/cim_for_netty/cim-client-android/app/src/main/java/com/farsunset/ichat/example/app/Constant.java b/cim_for_netty/cim-client-android/app/src/main/java/com/farsunset/ichat/example/app/Constant.java index ea1fc18..e260e32 100644 --- a/cim_for_netty/cim-client-android/app/src/main/java/com/farsunset/ichat/example/app/Constant.java +++ b/cim_for_netty/cim-client-android/app/src/main/java/com/farsunset/ichat/example/app/Constant.java @@ -1,53 +1,53 @@ /** * Copyright 2013-2023 Xia Jun(3979434@qq.com). - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * - *************************************************************************************** - * * - * Website : http://www.farsunset.com * - * * - *************************************************************************************** + *

+ * ************************************************************************************** + * * + * Website : http://www.farsunset.com * + * * + * ************************************************************************************** */ package com.farsunset.ichat.example.app; - -public interface Constant { - - //服务端IP地址 - public static final String CIM_SERVER_HOST = "192.168.1.103"; - - //注意,这里的端口不是tomcat的端口,CIM端口在服务端spring-cim.xml中配置的,没改动就使用默认的23456 - public static final int CIM_SERVER_PORT = 23456; +public interface Constant { + + //服务端IP地址 + public static final String CIM_SERVER_HOST = "192.168.1.106"; + + + //注意,这里的端口不是tomcat的端口,CIM端口在服务端spring-cim.xml中配置的,没改动就使用默认的23456 + public static final int CIM_SERVER_PORT = 23456; + + public static interface MessageType { + + + //用户之间的普通消息 + public static final String TYPE_0 = "0"; + + + //下线类型 + String TYPE_999 = "999"; + } + + + public static interface MessageStatus { + + //消息未读 + public static final String STATUS_0 = "0"; + //消息已经读取 + public static final String STATUS_1 = "1"; + } - public static interface MessageType{ - - - //用户之间的普通消息 - public static final String TYPE_0 = "0"; - - - //下线类型 - String TYPE_999 = "999"; - } - - - public static interface MessageStatus{ - - //消息未读 - public static final String STATUS_0 = "0"; - //消息已经读取 - public static final String STATUS_1 = "1"; - } - } diff --git a/cim_for_netty/cim-client-android/app/src/main/java/com/farsunset/ichat/example/ui/SplanshActivity.java b/cim_for_netty/cim-client-android/app/src/main/java/com/farsunset/ichat/example/ui/SplanshActivity.java index 847d0e7..5ba8bb5 100644 --- a/cim_for_netty/cim-client-android/app/src/main/java/com/farsunset/ichat/example/ui/SplanshActivity.java +++ b/cim_for_netty/cim-client-android/app/src/main/java/com/farsunset/ichat/example/ui/SplanshActivity.java @@ -29,6 +29,7 @@ import android.view.View; import android.view.animation.AlphaAnimation; import com.farsunset.cim.sdk.android.CIMPushManager; +import com.farsunset.ichat.example.BuildConfig; import com.farsunset.ichat.example.R; import com.farsunset.ichat.example.app.CIMMonitorActivity; import com.farsunset.ichat.example.app.Constant; @@ -38,10 +39,12 @@ public class SplanshActivity extends CIMMonitorActivity{ boolean initComplete = false; public void onCreate(Bundle savedInstanceState) { - - + + super.onCreate(savedInstanceState); - + + CIMPushManager.setLoggerEnable(this, BuildConfig.DEBUG); + //连接服务端 CIMPushManager.connect(SplanshActivity.this,Constant.CIM_SERVER_HOST, Constant.CIM_SERVER_PORT); diff --git a/cim_for_netty/cim-java-sdk/.classpath b/cim_for_netty/cim-java-sdk/.classpath index 8a0478f..231cd3e 100644 --- a/cim_for_netty/cim-java-sdk/.classpath +++ b/cim_for_netty/cim-java-sdk/.classpath @@ -2,7 +2,6 @@ - @@ -10,5 +9,6 @@ + diff --git a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java index 27b750b..bfc0234 100644 --- a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java +++ b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java @@ -21,11 +21,15 @@ */ package com.farsunset.cim.sdk.client; +import java.net.InetSocketAddress; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.log4j.Logger; import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.exception.SessionDisconnectedException; import com.farsunset.cim.sdk.client.filter.ClientMessageDecoder; @@ -40,13 +44,13 @@ import com.farsunset.cim.sdk.client.model.SentBody; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; @@ -54,6 +58,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.AttributeKey; +import io.netty.util.concurrent.GenericFutureListener; /** * 连接服务端管理,cim核心处理类,管理连接,以及消息处理 @@ -62,7 +67,7 @@ import io.netty.util.AttributeKey; */ @Sharable class CIMConnectorManager extends SimpleChannelInboundHandler { - protected final Logger logger = Logger.getLogger(CIMConnectorManager.class.getSimpleName()); + protected final Logger logger = LoggerFactory.getLogger(CIMConnectorManager.class.getSimpleName()); private final int CONNECT_TIMEOUT = 10 * 1000;// 秒 private final int WRITE_TIMEOUT = 10 * 1000;// 秒 @@ -74,6 +79,7 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { private EventLoopGroup loopGroup; private Channel channel;; private ExecutorService executor = Executors.newCachedThreadPool(); + private Semaphore semaphore = new Semaphore(1,true); private static CIMConnectorManager manager; private CIMConnectorManager() { @@ -106,47 +112,53 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { return manager; } - - private synchronized void syncConnection(String host, int port) { - - if (isConnected()) { - return; - } - - try { - - logger.info("****************CIM正在连接服务器 " + host + ":" + port + "......"); - - ChannelFuture channelFuture = bootstrap.connect(host, port).syncUninterruptibly(); - channel = channelFuture.channel(); - } catch (Exception e) { - - long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); - - Intent intent = new Intent(); - intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED); - intent.putExtra(Exception.class.getName(), e); - intent.putExtra("interval", interval); - sendBroadcast(intent); - - logger.error( - "****************CIM连接服务器失败 " + host + ":" + port + "......将在" + interval / 1000 + "秒后重新尝试连接"); - - } - - } - + public void connect(final String host, final int port) { + if (isConnected() || !semaphore.tryAcquire()) { + return; + } + executor.execute(new Runnable() { + @Override public void run() { - syncConnection(host, port); + + logger.info("****************CIM正在连接服务器 " + host + ":" + port + "......"); + final InetSocketAddress remoteAddress = new InetSocketAddress(host, port); + bootstrap.connect(remoteAddress).addListener(new GenericFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + future.removeListener(this); + semaphore.acquire(); + if(!future.isSuccess() && future.cause() != null) { + handleConnectFailure(future.cause(),remoteAddress); + } + + if(future.isSuccess()) { + channel = future.channel(); + } + } + }); } }); } + + private void handleConnectFailure(Throwable error,InetSocketAddress remoteAddress) { + long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); - public synchronized void send(SentBody body) { + Intent intent = new Intent(); + intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED); + intent.putExtra(Exception.class.getName(), error); + intent.putExtra("interval", interval); + sendBroadcast(intent); + + logger.warn("****************CIM连接服务器失败 " + remoteAddress.getHostName() + ":" + remoteAddress.getPort() + "......将在" + interval / 1000 + "秒后重新尝试连接"); + + } + + public void send(SentBody body) { boolean isSuccessed = false; @@ -184,10 +196,7 @@ class CIMConnectorManager extends SimpleChannelInboundHandler { } public boolean isConnected() { - if (channel == null) { - return false; - } - return channel.isActive(); + return channel != null && channel.isActive(); } public void closeSession() { diff --git a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMListenerManager.java b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMListenerManager.java index 1484188..13a4dc1 100644 --- a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMListenerManager.java +++ b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMListenerManager.java @@ -25,7 +25,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.farsunset.cim.sdk.client.model.Message; import com.farsunset.cim.sdk.client.model.ReplyBody; @@ -37,7 +38,7 @@ public class CIMListenerManager { private static ArrayList cimListeners = new ArrayList(); private static CIMMessageReceiveComparator comparator = new CIMMessageReceiveComparator(); - protected static final Logger logger = Logger.getLogger(CIMListenerManager.class); + protected static final Logger logger = LoggerFactory.getLogger(CIMListenerManager.class); public static void registerMessageListener(CIMEventListener listener) { diff --git a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMPushManager.java b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMPushManager.java index 5a83b9a..dd60217 100644 --- a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMPushManager.java +++ b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMPushManager.java @@ -25,7 +25,8 @@ import java.net.InetAddress; import java.net.NetworkInterface; import java.util.Properties; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.model.Intent; @@ -35,7 +36,7 @@ import com.farsunset.cim.sdk.client.model.SentBody; * CIM 功能接口 */ public class CIMPushManager { - protected static final Logger logger = Logger.getLogger(CIMPushManager.class); + protected static final Logger logger = LoggerFactory.getLogger(CIMPushManager.class); static String ACTION_ACTIVATE_PUSH_SERVICE = "ACTION_ACTIVATE_PUSH_SERVICE"; static String ACTION_CREATE_CIM_CONNECTION = "ACTION_CREATE_CIM_CONNECTION"; diff --git a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/filter/ClientMessageDecoder.java b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/filter/ClientMessageDecoder.java index 896fc03..70942d2 100644 --- a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/filter/ClientMessageDecoder.java +++ b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/filter/ClientMessageDecoder.java @@ -23,7 +23,8 @@ package com.farsunset.cim.sdk.client.filter; import java.util.List; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.model.HeartbeatRequest; @@ -41,7 +42,7 @@ import io.netty.handler.codec.ByteToMessageDecoder; * 客户端消息解码 */ public class ClientMessageDecoder extends ByteToMessageDecoder { - protected final Logger logger = Logger.getLogger(ClientMessageDecoder.class.getSimpleName()); + protected final Logger logger = LoggerFactory.getLogger(ClientMessageDecoder.class.getSimpleName()); @Override protected void decode(ChannelHandlerContext arg0, ByteBuf buffer, List queue) throws Exception { diff --git a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/filter/ClientMessageEncoder.java b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/filter/ClientMessageEncoder.java index 62e0fbf..ec3dc59 100644 --- a/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/filter/ClientMessageEncoder.java +++ b/cim_for_netty/cim-java-sdk/src/com/farsunset/cim/sdk/client/filter/ClientMessageEncoder.java @@ -21,7 +21,8 @@ */ package com.farsunset.cim.sdk.client.filter; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.model.Protobufable; @@ -35,7 +36,7 @@ import io.netty.handler.codec.MessageToByteEncoder; * */ public class ClientMessageEncoder extends MessageToByteEncoder { - protected final Logger logger = Logger.getLogger(ClientMessageEncoder.class.getSimpleName()); + protected final Logger logger = LoggerFactory.getLogger(ClientMessageEncoder.class.getSimpleName()); @Override protected void encode(ChannelHandlerContext ctx, Object message, ByteBuf out) throws Exception { diff --git a/cim_for_netty/cim-server-sdk/src/com/farsunset/cim/sdk/server/handler/CIMNioSocketAcceptor.java b/cim_for_netty/cim-server-sdk/src/com/farsunset/cim/sdk/server/handler/CIMNioSocketAcceptor.java index eb2fa9a..aab710b 100644 --- a/cim_for_netty/cim-server-sdk/src/com/farsunset/cim/sdk/server/handler/CIMNioSocketAcceptor.java +++ b/cim_for_netty/cim-server-sdk/src/com/farsunset/cim/sdk/server/handler/CIMNioSocketAcceptor.java @@ -23,6 +23,7 @@ package com.farsunset.cim.sdk.server.handler; import java.io.IOException; import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; import com.farsunset.cim.sdk.server.constant.CIMConstant; import com.farsunset.cim.sdk.server.filter.ServerMessageDecoder; @@ -32,6 +33,7 @@ import com.farsunset.cim.sdk.server.model.SentBody; import com.farsunset.cim.sdk.server.session.CIMSession; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; @@ -61,7 +63,7 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler private HashMap innerHandlerMap = new HashMap(); private CIMRequestHandler outerRequestHandler; - + private ConcurrentHashMap channelGroup = new ConcurrentHashMap(); private int port; // 连接空闲时间 @@ -108,6 +110,7 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler this.outerRequestHandler = outerRequestHandler; } + @Override protected void channelRead0(ChannelHandlerContext ctx, SentBody body) throws Exception { CIMSession session = new CIMSession(ctx.channel()); @@ -127,10 +130,10 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler outerRequestHandler.process(session, body); } - /** - */ - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - + @Override + public void channelInactive(ChannelHandlerContext ctx) { + channelGroup.remove(ctx.channel().id().asShortText()); + CIMSession session = new CIMSession(ctx.channel()); SentBody body = new SentBody(); body.setKey(CIMSESSION_CLOSED_HANDLER_KEY); @@ -138,8 +141,12 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler } - /** - */ + @Override + public void channelActive(ChannelHandlerContext ctx){ + channelGroup.put(ctx.channel().id().asShortText(),ctx.channel()); + } + + @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state().equals(IdleState.WRITER_IDLE)) { ctx.channel().attr(AttributeKey.valueOf(CIMConstant.HEARTBEAT_KEY)).set(System.currentTimeMillis()); @@ -162,4 +169,11 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler this.port = port; } + + public Channel getManagedChannel(String id) { + if (id == null) { + return null; + } + return channelGroup.get(id); + } } diff --git a/doc/CIM客户端文档.doc b/doc/CIM客户端文档.doc index f371195..178918c 100644 Binary files a/doc/CIM客户端文档.doc and b/doc/CIM客户端文档.doc differ