diff --git a/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java b/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java index d2711bf..8301349 100644 --- a/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java +++ b/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMConnectorManager.java @@ -24,17 +24,15 @@ package com.farsunset.cim.sdk.android; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; import com.farsunset.cim.sdk.android.coder.CIMLogger; import com.farsunset.cim.sdk.android.coder.ClientMessageDecoder; @@ -68,29 +66,38 @@ class CIMConnectorManager{ private final int READ_IDLE_TIME = 120 * 1000; + private final int CONNECT_TIME_OUT = 10 * 1000; + + private final int CONNECT_ALIVE_TIME_OUT = 150 * 1000; private final AtomicLong LAST_READ_TIME = new AtomicLong(0); - - private final AtomicBoolean CONNECTING_FLAG = new AtomicBoolean(false) ; private final CIMLogger LOGGER = CIMLogger.getLogger(); private static final HandlerThread IDLE_HANDLER_THREAD = new HandlerThread("READ-IDLE", Process.THREAD_PRIORITY_BACKGROUND); - private LinkedBlockingQueue sendFailBodyQueue = new LinkedBlockingQueue(); - private final ReentrantLock IOLOCK = new ReentrantLock(); - - private Selector selector; + private Semaphore semaphore = new Semaphore(1, true); + private SocketChannel socketChannel ; private Context context; private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); - private ExecutorService workerExecutor = Executors.newFixedThreadPool(1); - private ExecutorService bossExecutor = Executors.newFixedThreadPool(1); - + private ExecutorService workerExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r,"worker-"); + } + }); + + private ExecutorService bossExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r,"boss-"); + } + }); private ClientMessageEncoder messageEncoder = new ClientMessageEncoder(); private ClientMessageDecoder messageDecoder = new ClientMessageDecoder(); @@ -107,19 +114,11 @@ class CIMConnectorManager{ try { if(socketChannel == null || !socketChannel.isOpen()) { socketChannel = SocketChannel.open(); - socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true); socketChannel.socket().setKeepAlive(true); socketChannel.socket().setReceiveBufferSize(READ_BUFFER_SIZE); socketChannel.socket().setSendBufferSize(WRITE_BUFFER_SIZE); } - - if(selector == null || !selector.isOpen()) { - selector = Selector.open(); - } - - selector.wakeup(); - socketChannel.register(selector, SelectionKey.OP_CONNECT); }catch(Exception ignore) {} @@ -148,22 +147,22 @@ class CIMConnectorManager{ return; } - - if (CONNECTING_FLAG.get() || isConnected()) { + + if (isConnected()) { return; } - CONNECTING_FLAG.set(true); - - if(!socketChannel.isOpen() ||!selector.isOpen()) { + if(!socketChannel.isOpen()) { makeNioConnector(); } - workerExecutor.execute(new Runnable() { + bossExecutor.execute(new Runnable() { @Override public void run() { - IOLOCK.lock(); + if (isConnected()) { + return; + } LOGGER.startConnect(host, port); @@ -171,38 +170,40 @@ class CIMConnectorManager{ try { - socketChannel.connect(new InetSocketAddress(host, port)); + semaphore.acquire(); - while (socketChannel.isOpen()) { - - selector.select(); - - if(!selector.isOpen()) { - break; - } - - for(SelectionKey key : selector.selectedKeys()){ - - if((key.interestOps() & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT && socketChannel.finishConnect()) { - IOLOCK.unlock(); - handelConnectedEvent(); - continue; - } - - if((key.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { - handelSocketReadEvent(); - } - } - } + socketChannel.socket().connect(new InetSocketAddress(host, port),CONNECT_TIME_OUT); + semaphore.release(); + + handelConnectedEvent(); + + + int result = -1; + + while((result = socketChannel.read(readBuffer)) > 0) { + + if(readBuffer.position() == readBuffer.capacity()) { + extendByteBuffer(); + } + + handelSocketReadEvent(result); + + } + + handelSocketReadEvent(result); + }catch(ConnectException ignore){ - IOLOCK.unlock(); + semaphore.release(); handleConnectAbortedEvent(); - }catch(IllegalArgumentException ignore){ - IOLOCK.unlock(); + }catch(SocketTimeoutException ignore){ + semaphore.release(); handleConnectAbortedEvent(); }catch(IOException ignore) { + semaphore.release(); handelDisconnectedEvent(); + }catch(InterruptedException ignore) { + semaphore.release(); } } }); @@ -212,12 +213,16 @@ class CIMConnectorManager{ public void destroy() { closeSession(); - closeSelector(); } public void closeSession() { + + if(!isConnected()) { + return; + } + try { socketChannel.close(); } catch (IOException ignore) { @@ -233,28 +238,31 @@ class CIMConnectorManager{ public void send(final Protobufable body) { - bossExecutor.execute(new Runnable() { + if(!isConnected()) { + return; + } + + workerExecutor.execute(new Runnable() { @Override public void run() { int result = 0; try { - IOLOCK.lock(); + semaphore.acquire(); ByteBuffer buffer = messageEncoder.encode(body); while(buffer.hasRemaining()){ result += socketChannel.write(buffer); } - } catch (IOException e) { + } catch (Exception e) { result = -1; }finally { - IOLOCK.unlock(); + semaphore.release(); if(result <= 0) { - sendFailBodyQueue.offer(body); closeSession(); }else { messageSent(body); @@ -264,18 +272,8 @@ class CIMConnectorManager{ }); } - - private void sendFaildQueueBody() { - Protobufable body = sendFailBodyQueue.poll(); - if(body == null) { - return; - } - - send(body); - - sendFaildQueueBody(); - - } + + private void sessionCreated() { LOGGER.sessionCreated(socketChannel); @@ -286,8 +284,6 @@ class CIMConnectorManager{ intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_SUCCESSED); context.sendBroadcast(intent); - sendFaildQueueBody(); - } private void sessionClosed() { @@ -304,8 +300,6 @@ class CIMConnectorManager{ readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); } - closeSelector(); - Intent intent = new Intent(); intent.setPackage(context.getPackageName()); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED); @@ -369,13 +363,10 @@ class CIMConnectorManager{ }; private void handelDisconnectedEvent() { - CONNECTING_FLAG.set(false); closeSession(); } private void handleConnectAbortedEvent() { - - CONNECTING_FLAG.set(false); long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); @@ -389,33 +380,22 @@ class CIMConnectorManager{ } - private void handelConnectedEvent() throws IOException { + private void handelConnectedEvent() { - CONNECTING_FLAG.set(false); - socketChannel.register(selector, SelectionKey.OP_READ); sessionCreated(); idleHandler.sendEmptyMessageDelayed(0, READ_IDLE_TIME); } - private void handelSocketReadEvent() throws IOException { - - int result = 0; - - while((result = socketChannel.read(readBuffer)) > 0) { - if(readBuffer.position() == readBuffer.capacity()) { - extendByteBuffer(); - } - } - - if(result == -1 && !readBuffer.hasRemaining()) { + private void handelSocketReadEvent(int result) throws IOException { + + if(result == -1) { closeSession(); return; } markLastReadTime(); - readBuffer.position(0); Object message = messageDecoder.doDecode(readBuffer); @@ -424,8 +404,6 @@ class CIMConnectorManager{ return; } - - LOGGER.messageReceived(socketChannel,message); if(isHeartbeatRequest(message)) { @@ -436,11 +414,7 @@ class CIMConnectorManager{ } this.messageReceived(message); - - if(result == -1) { - closeSession(); - return; - } + } @@ -473,13 +447,6 @@ class CIMConnectorManager{ private boolean isHeartbeatRequest(Object data) { return data instanceof HeartbeatRequest; } - - - - private void closeSelector() { - try { - selector.close(); - } catch (IOException ignore) {} - } + } diff --git a/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java b/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java index a618b18..65ff4e2 100644 --- a/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java +++ b/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushManager.java @@ -49,8 +49,6 @@ public class CIMPushManager { static String ACTION_SET_LOGGER_EANABLE = "ACTION_SET_LOGGER_EANABLE"; - static String ACTION_DESTORY = "ACTION_DESTORY"; - static String KEY_SEND_BODY = "KEY_SEND_BODY"; static String KEY_CIM_CONNECTION_STATUS = "KEY_CIM_CONNECTION_STATUS"; @@ -221,9 +219,7 @@ public class CIMPushManager { CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_DESTROYED, true); CIMCacheManager.putString(context, CIMCacheManager.KEY_ACCOUNT, null); - Intent serviceIntent = new Intent(context, CIMPushService.class); - serviceIntent.setAction(ACTION_DESTORY); - startServiceCompat(context,serviceIntent); + context.stopService(new Intent(context, CIMPushService.class)); } diff --git a/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java b/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java index 21295b3..6267279 100644 --- a/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java +++ b/cim-client-sdk/cim-android-sdk/src/com/farsunset/cim/sdk/android/CIMPushService.java @@ -36,8 +36,6 @@ import android.os.Handler; import android.os.IBinder; import android.os.Message; -import java.util.concurrent.Semaphore; - import com.farsunset.cim.sdk.android.coder.CIMLogger; import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.model.SentBody; @@ -48,14 +46,13 @@ import com.farsunset.cim.sdk.android.model.SentBody; * @author 3979434 * */ -public class CIMPushService extends Service { +public class CIMPushService extends Service { public final static String KEY_DELAYED_TIME = "KEY_DELAYED_TIME"; public final static String KEY_LOGGER_ENABLE = "KEY_LOGGER_ENABLE"; private CIMConnectorManager manager; private KeepAliveBroadcastReceiver keepAliveReceiver; - private Semaphore semaphore = new Semaphore(1, true); - + private ConnectivityManager connectivityManager; @Override public void onCreate() { manager = CIMConnectorManager.getManager(this.getApplicationContext()); @@ -68,35 +65,37 @@ public class CIMPushService extends Service { if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) { - ConnectivityManager connectivityManager = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE); - connectivityManager.registerDefaultNetworkCallback(new ConnectivityManager.NetworkCallback() { - @Override - public void onAvailable(Network network) { - Intent intent = new Intent(); - intent.setPackage(getPackageName()); - intent.setAction(CIMConstant.IntentAction.ACTION_NETWORK_CHANGED); - sendBroadcast(intent); - } - @Override - public void onUnavailable() { - Intent intent = new Intent(); - intent.setPackage(getPackageName()); - intent.setAction(CIMConstant.IntentAction.ACTION_NETWORK_CHANGED); - sendBroadcast(intent); - } - - }); + connectivityManager = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE); + + connectivityManager.registerDefaultNetworkCallback(networkCallback); } } + + ConnectivityManager.NetworkCallback networkCallback = new ConnectivityManager.NetworkCallback() { + @Override + public void onAvailable(Network network) { + Intent intent = new Intent(); + intent.setPackage(getPackageName()); + intent.setAction(CIMConstant.IntentAction.ACTION_NETWORK_CHANGED); + sendBroadcast(intent); + } + @Override + public void onUnavailable() { + Intent intent = new Intent(); + intent.setPackage(getPackageName()); + intent.setAction(CIMConstant.IntentAction.ACTION_NETWORK_CHANGED); + sendBroadcast(intent); + } + + }; Handler connectionHandler = new Handler() { @Override public void handleMessage(android.os.Message message) { String host = message.getData().getString(CIMCacheManager.KEY_CIM_SERVIER_HOST); int port = message.getData().getInt(CIMCacheManager.KEY_CIM_SERVIER_PORT, 0); - manager.connect(host, port); - semaphore.release(); + handleConnection(host, port); } }; @@ -118,7 +117,7 @@ public class CIMPushService extends Service { String action = intent.getAction(); if (CIMPushManager.ACTION_CREATE_CIM_CONNECTION.equals(action)) { - handleConnection(intent); + handleDelayConnection(intent); } if (CIMPushManager.ACTION_SEND_REQUEST_BODY.equals(action)) { @@ -129,11 +128,6 @@ public class CIMPushService extends Service { manager.closeSession(); } - if (CIMPushManager.ACTION_DESTORY.equals(action)) { - manager.destroy(); - this.stopSelf(); - } - if (CIMPushManager.ACTION_ACTIVATE_PUSH_SERVICE.equals(action)) { handleKeepAlive(); } @@ -150,41 +144,47 @@ public class CIMPushService extends Service { return super.onStartCommand(intent, flags, startId); } - private void handleConnection(Intent intent) { + private void handleDelayConnection(Intent intent) { long delayMillis = intent.getLongExtra(KEY_DELAYED_TIME, 0); if (delayMillis <= 0) { String host = intent.getStringExtra(CIMCacheManager.KEY_CIM_SERVIER_HOST); int port = intent.getIntExtra(CIMCacheManager.KEY_CIM_SERVIER_PORT, 0); - manager.connect(host, port); - return; - } - - if (!semaphore.tryAcquire()) { + handleConnection(host, port); return; } + Message msg = connectionHandler.obtainMessage(); msg.what = 0; msg.setData(intent.getExtras()); + connectionHandler.removeMessages(0); connectionHandler.sendMessageDelayed(msg, delayMillis); } + private void handleConnection(String host,int port) { + + boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(), CIMCacheManager.KEY_MANUAL_STOP); + boolean isDestroyed = CIMCacheManager.getBoolean(getApplicationContext(), CIMCacheManager.KEY_CIM_DESTROYED); + if(isManualStop || isDestroyed) { + return; + } + manager.connect(host, port); + } + private void handleKeepAlive() { + if (manager.isConnected()) { CIMLogger.getLogger().connectState(true); return; } - - boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(), CIMCacheManager.KEY_MANUAL_STOP); - boolean isDestroyed = CIMCacheManager.getBoolean(getApplicationContext(), CIMCacheManager.KEY_CIM_DESTROYED); - - CIMLogger.getLogger().connectState(false, isManualStop, isDestroyed); - - CIMPushManager.connect(this, 0); - + + String host = CIMCacheManager.getString(this, CIMCacheManager.KEY_CIM_SERVIER_HOST); + int port = CIMCacheManager.getInt(this, CIMCacheManager.KEY_CIM_SERVIER_PORT); + + handleConnection(host,port); } @Override @@ -195,9 +195,15 @@ public class CIMPushService extends Service { @Override public void onDestroy() { super.onDestroy(); + manager.destroy(); + connectionHandler.removeMessages(0); if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { unregisterReceiver(keepAliveReceiver); } + + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) { + connectivityManager.unregisterNetworkCallback(networkCallback); + } } public class KeepAliveBroadcastReceiver extends BroadcastReceiver { diff --git a/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java b/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java index 1d141ed..0fb30ce 100644 --- a/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java +++ b/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMConnectorManager.java @@ -24,26 +24,24 @@ package com.farsunset.cim.sdk.client; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; -import java.nio.channels.ClosedSelectorException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import com.farsunset.cim.sdk.client.coder.CIMLogger; import com.farsunset.cim.sdk.client.coder.ClientMessageDecoder; import com.farsunset.cim.sdk.client.coder.ClientMessageEncoder; import com.farsunset.cim.sdk.client.constant.CIMConstant; -import com.farsunset.cim.sdk.client.exception.SessionClosedException; import com.farsunset.cim.sdk.client.model.HeartbeatRequest; import com.farsunset.cim.sdk.client.model.HeartbeatResponse; import com.farsunset.cim.sdk.client.model.Intent; import com.farsunset.cim.sdk.client.model.Message; +import com.farsunset.cim.sdk.client.model.Protobufable; import com.farsunset.cim.sdk.client.model.ReplyBody; import com.farsunset.cim.sdk.client.model.SentBody; @@ -59,20 +57,34 @@ class CIMConnectorManager { private final int READ_BUFFER_SIZE = 2048; private final int WRITE_BUFFER_SIZE = 1024; + private final int CONNECT_TIME_OUT = 10 * 1000; - private final AtomicBoolean CONNECTING_FLAG = new AtomicBoolean(false) ; - private final CIMLogger LOGGER = CIMLogger.getLogger(); - - private Selector selector; + private SocketChannel socketChannel ; - private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); - - private ExecutorService workerExecutor = Executors.newFixedThreadPool(1); - private ExecutorService bossExecutor = Executors.newFixedThreadPool(1); - private ExecutorService eventExecutor = Executors.newFixedThreadPool(1); + + private ExecutorService workerExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r,"worker-"); + } + }); + private ExecutorService bossExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r,"boss-"); + } + }); + private ExecutorService eventExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r,"event-"); + } + }); + + private Semaphore semaphore = new Semaphore(1, true); private ClientMessageEncoder messageEncoder = new ClientMessageEncoder(); @@ -83,21 +95,14 @@ class CIMConnectorManager { } private void makeNioConnector() { try { + if(socketChannel == null || !socketChannel.isOpen()) { socketChannel = SocketChannel.open(); - socketChannel.configureBlocking(false); - socketChannel.setOption(StandardSocketOptions.SO_RCVBUF,READ_BUFFER_SIZE); - socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, WRITE_BUFFER_SIZE); - socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - socketChannel.setOption(StandardSocketOptions.TCP_NODELAY,true); + socketChannel.socket().setTcpNoDelay(true); + socketChannel.socket().setKeepAlive(true); + socketChannel.socket().setReceiveBufferSize(READ_BUFFER_SIZE); + socketChannel.socket().setSendBufferSize(WRITE_BUFFER_SIZE); } - - if(selector == null || !selector.isOpen()) { - selector = Selector.open(); - } - - selector.wakeup(); - socketChannel.register(selector, SelectionKey.OP_CONNECT); }catch(Exception ignore) {} @@ -116,20 +121,21 @@ class CIMConnectorManager { public void connect(final String host, final int port) { - boolean isConnected = isConnected(); - if (CONNECTING_FLAG.get() || isConnected) { + if (isConnected()) { return; } - CONNECTING_FLAG.set(true); - - if(!socketChannel.isOpen() ||!selector.isOpen()) { + if(!socketChannel.isOpen()) { makeNioConnector(); } - workerExecutor.execute(new Runnable() { + bossExecutor.execute(new Runnable() { @Override public void run() { + + if (isConnected()) { + return; + } LOGGER.startConnect(host, port); @@ -137,36 +143,41 @@ class CIMConnectorManager { try { - socketChannel.connect(new InetSocketAddress(host, port)); + semaphore.acquire(); + + socketChannel.socket().connect(new InetSocketAddress(host, port),CONNECT_TIME_OUT); - while (socketChannel.isOpen()) { - - selector.select(); - - if(!selector.isOpen()) { - break; - } - - for(SelectionKey key : selector.selectedKeys()){ - - if((key.interestOps() & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT && socketChannel.finishConnect()) { - handelConnectedEvent(); - continue; - } - - if((key.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { - handelSocketReadEvent(); - } - } - } + semaphore.release(); + handelConnectedEvent(); + + + int result = -1; + + while((result = socketChannel.read(readBuffer)) > 0) { + + if(readBuffer.position() == readBuffer.capacity()) { + extendByteBuffer(); + } + + handelSocketReadEvent(result); + + } + + handelSocketReadEvent(result); + }catch(ConnectException ignore){ + semaphore.release(); handleConnectAbortedEvent(); - }catch(IllegalArgumentException ignore){ + }catch(SocketTimeoutException ignore){ + semaphore.release(); handleConnectAbortedEvent(); }catch(IOException ignore) { + semaphore.release(); handelDisconnectedEvent(); - }catch(ClosedSelectorException ignore) {} + }catch (InterruptedException ignore) { + semaphore.release(); + } } }); } @@ -174,13 +185,10 @@ class CIMConnectorManager { private void handelDisconnectedEvent() { - CONNECTING_FLAG.set(false); closeSession(); } private void handleConnectAbortedEvent() { - - CONNECTING_FLAG.set(false); long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); @@ -195,26 +203,16 @@ class CIMConnectorManager { private void handelConnectedEvent() throws IOException { - CONNECTING_FLAG.set(false); - socketChannel.register(selector, SelectionKey.OP_READ); sessionCreated(); - } - private void handelSocketReadEvent() throws IOException { + private void handelSocketReadEvent(int result) throws IOException { - int result = 0; - - while((result = socketChannel.read(readBuffer)) > 0) { - if(readBuffer.position() == readBuffer.capacity()) { - extendByteBuffer(); - } + if(result == -1) { + closeSession(); + return; } - - if(result == -1) { - closeSession(); - return; - } + readBuffer.position(0); @@ -246,63 +244,44 @@ class CIMConnectorManager { readBuffer.clear(); readBuffer = newBuffer; } - - public void send(final SentBody body) { + + public void send(final Protobufable body) { - bossExecutor.execute(new Runnable() { + if(!isConnected()) { + return; + } + + workerExecutor.execute(new Runnable() { @Override public void run() { - boolean isSuccessed = false; - String exceptionName = SessionClosedException.class.getSimpleName(); - if (isConnected()) { + int result = 0; + try { - try { - ByteBuffer buffer = messageEncoder.encode(body); - int result = 0; - while(buffer.hasRemaining()){ - result += socketChannel.write(buffer); - } - isSuccessed = result > 0; - } catch (IOException e) { - exceptionName = e.getClass().getSimpleName(); - closeSession(); + semaphore.acquire(); + + ByteBuffer buffer = messageEncoder.encode(body); + while(buffer.hasRemaining()){ + result += socketChannel.write(buffer); } - } - - if (!isSuccessed) { - Intent intent = new Intent(); - intent.setAction(CIMConstant.IntentAction.ACTION_SENT_FAILED); - intent.putExtra(Exception.class.getName(), exceptionName); - intent.putExtra(SentBody.class.getName(), body); - sendBroadcast(intent); - }else { - messageSent(body); - } + } catch (Exception e) { + result = -1; + }finally { + + semaphore.release(); + + if(result <= 0) { + closeSession(); + }else { + messageSent(body); + } + } } }); - } - - - public void send(final HeartbeatResponse body) { - - bossExecutor.execute(new Runnable() { - @Override - public void run() { - try { - socketChannel.write(messageEncoder.encode(body)); - messageSent(body); - } catch (IOException ignore) { - closeSession(); - } - } - }); - - } - + public void sessionCreated() { @@ -311,7 +290,7 @@ class CIMConnectorManager { Intent intent = new Intent(); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_SUCCESSED); sendBroadcast(intent); - + } public void sessionClosed() { @@ -324,8 +303,6 @@ class CIMConnectorManager { readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); } - closeSelector(); - Intent intent = new Intent(); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED); sendBroadcast(intent); @@ -373,10 +350,7 @@ class CIMConnectorManager { } public void destroy() { - closeSession(); - closeSelector(); - } public boolean isConnected() { @@ -384,6 +358,11 @@ class CIMConnectorManager { } public void closeSession() { + + if(!isConnected()) { + return; + } + try { socketChannel.close(); } catch (IOException ignore) { @@ -391,13 +370,7 @@ class CIMConnectorManager { this.sessionClosed(); } } - - public void closeSelector() { - try { - selector.close(); - } catch (IOException ignore) {} - } - + private void sendBroadcast(final Intent intent) { eventExecutor.execute(new Runnable() { diff --git a/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMEventBroadcastReceiver.java b/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMEventBroadcastReceiver.java index 9975310..ee0665b 100644 --- a/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMEventBroadcastReceiver.java +++ b/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/CIMEventBroadcastReceiver.java @@ -26,7 +26,6 @@ import java.util.Timer; import java.util.TimerTask; import com.farsunset.cim.sdk.client.constant.CIMConstant; -import com.farsunset.cim.sdk.client.exception.SessionClosedException; import com.farsunset.cim.sdk.client.model.Intent; import com.farsunset.cim.sdk.client.model.Message; import com.farsunset.cim.sdk.client.model.ReplyBody; @@ -90,14 +89,6 @@ public class CIMEventBroadcastReceiver { listener.onReplyReceived((ReplyBody) intent.getExtra(ReplyBody.class.getName())); } - /* - * 获取sendbody发送失败事件 - */ - if (intent.getAction().equals(CIMConstant.IntentAction.ACTION_SENT_FAILED)) { - onSentFailed((Exception) intent.getExtra(Exception.class.getName()), - (SentBody) intent.getExtra(SentBody.class.getName())); - } - /* * 获取sendbody发送成功事件 */ @@ -158,19 +149,7 @@ public class CIMEventBroadcastReceiver { private boolean isForceOfflineMessage(String action) { return CIMConstant.MessageAction.ACTION_999.equals(action); } - - private void onSentFailed(Exception e, SentBody body) { - - e.printStackTrace(); - // 与服务端端开链接,重新连接 - if (e instanceof SessionClosedException) { - CIMPushManager.connect(); - } else { - // 发送失败 重新发送 - CIMPushManager.sendRequest(body); - } - - } + private void onSentSucceed(SentBody body) { } diff --git a/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/coder/CIMLogger.java b/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/coder/CIMLogger.java index 609c732..66baa18 100644 --- a/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/coder/CIMLogger.java +++ b/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/coder/CIMLogger.java @@ -21,7 +21,6 @@ */ package com.farsunset.cim.sdk.client.coder; -import java.io.IOException; import java.nio.channels.SocketChannel; import org.slf4j.Logger; @@ -83,6 +82,7 @@ public class CIMLogger { public void connectState(boolean isConnected,boolean isManualStop,boolean isDestroyed) { LOGGER.debug("CONNECTED:" + isConnected + " STOPED:"+isManualStop+ " DESTROYED:"+isDestroyed); } + private String getSessionInfo(SocketChannel session) { StringBuilder builder = new StringBuilder(); if (session == null) { @@ -92,24 +92,23 @@ public class CIMLogger { builder.append("id:").append(session.hashCode()); try { - if (session.getLocalAddress() != null) { - builder.append(" L:").append(session.getLocalAddress().toString()); + if (session.socket().getLocalAddress() != null) { + builder.append(" L:").append(session.socket().getLocalAddress()+":"+session.socket().getLocalPort()); } - } catch (IOException e) { - e.printStackTrace(); + } catch (Exception ignore) { } try { - if (session.getRemoteAddress() != null) { - builder.append(" R:").append(session.getRemoteAddress().toString()); + if (session.socket().getRemoteSocketAddress() != null) { + builder.append(" R:").append(session.socket().getRemoteSocketAddress().toString()); } - } catch (IOException e) { - e.printStackTrace(); + } catch (Exception ignore) { } builder.append("]"); return builder.toString(); } + } diff --git a/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/constant/CIMConstant.java b/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/constant/CIMConstant.java index 657ef97..a25b2df 100644 --- a/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/constant/CIMConstant.java +++ b/cim-client-sdk/cim-java-sdk/src/com/farsunset/cim/sdk/client/constant/CIMConstant.java @@ -87,9 +87,6 @@ public interface CIMConstant { // 消息广播action String ACTION_MESSAGE_RECEIVED = "com.farsunset.cim.MESSAGE_RECEIVED"; - // 发送sendbody失败广播 - String ACTION_SENT_FAILED = "com.farsunset.cim.SENT_FAILED"; - // 发送sendbody成功广播 String ACTION_SENT_SUCCESSED = "com.farsunset.cim.SENT_SUCCESSED"; diff --git a/cim-use-examples/cim-client-android/app/libs/cim-android-sdk-3.7.0.jar b/cim-use-examples/cim-client-android/app/libs/cim-android-sdk-3.7.0.jar index 6020175..d3328ad 100644 Binary files a/cim-use-examples/cim-client-android/app/libs/cim-android-sdk-3.7.0.jar and b/cim-use-examples/cim-client-android/app/libs/cim-android-sdk-3.7.0.jar differ diff --git a/cim-use-examples/cim-client-java/lib/cim-java-sdk-3.7.0.jar b/cim-use-examples/cim-client-java/lib/cim-java-sdk-3.7.0.jar index 2be9b1d..e9d88de 100644 Binary files a/cim-use-examples/cim-client-java/lib/cim-java-sdk-3.7.0.jar and b/cim-use-examples/cim-client-java/lib/cim-java-sdk-3.7.0.jar differ