优化android和java sdk代码

This commit is contained in:
远方夕阳 2019-06-11 11:37:20 +08:00
parent 42ade1ada8
commit 9e9cc962d5
9 changed files with 243 additions and 326 deletions

View File

@ -24,17 +24,15 @@ package com.farsunset.cim.sdk.android;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import com.farsunset.cim.sdk.android.coder.CIMLogger; import com.farsunset.cim.sdk.android.coder.CIMLogger;
import com.farsunset.cim.sdk.android.coder.ClientMessageDecoder; import com.farsunset.cim.sdk.android.coder.ClientMessageDecoder;
@ -68,29 +66,38 @@ class CIMConnectorManager{
private final int READ_IDLE_TIME = 120 * 1000; private final int READ_IDLE_TIME = 120 * 1000;
private final int CONNECT_TIME_OUT = 10 * 1000;
private final int CONNECT_ALIVE_TIME_OUT = 150 * 1000; private final int CONNECT_ALIVE_TIME_OUT = 150 * 1000;
private final AtomicLong LAST_READ_TIME = new AtomicLong(0); private final AtomicLong LAST_READ_TIME = new AtomicLong(0);
private final AtomicBoolean CONNECTING_FLAG = new AtomicBoolean(false) ;
private final CIMLogger LOGGER = CIMLogger.getLogger(); private final CIMLogger LOGGER = CIMLogger.getLogger();
private static final HandlerThread IDLE_HANDLER_THREAD = new HandlerThread("READ-IDLE", Process.THREAD_PRIORITY_BACKGROUND); private static final HandlerThread IDLE_HANDLER_THREAD = new HandlerThread("READ-IDLE", Process.THREAD_PRIORITY_BACKGROUND);
private LinkedBlockingQueue<Protobufable> sendFailBodyQueue = new LinkedBlockingQueue<Protobufable>(); private Semaphore semaphore = new Semaphore(1, true);
private final ReentrantLock IOLOCK = new ReentrantLock();
private Selector selector;
private SocketChannel socketChannel ; private SocketChannel socketChannel ;
private Context context; private Context context;
private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
private ExecutorService workerExecutor = Executors.newFixedThreadPool(1); private ExecutorService workerExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() {
private ExecutorService bossExecutor = Executors.newFixedThreadPool(1); @Override
public Thread newThread(Runnable r) {
return new Thread(r,"worker-");
}
});
private ExecutorService bossExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"boss-");
}
});
private ClientMessageEncoder messageEncoder = new ClientMessageEncoder(); private ClientMessageEncoder messageEncoder = new ClientMessageEncoder();
private ClientMessageDecoder messageDecoder = new ClientMessageDecoder(); private ClientMessageDecoder messageDecoder = new ClientMessageDecoder();
@ -107,19 +114,11 @@ class CIMConnectorManager{
try { try {
if(socketChannel == null || !socketChannel.isOpen()) { if(socketChannel == null || !socketChannel.isOpen()) {
socketChannel = SocketChannel.open(); socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.socket().setTcpNoDelay(true); socketChannel.socket().setTcpNoDelay(true);
socketChannel.socket().setKeepAlive(true); socketChannel.socket().setKeepAlive(true);
socketChannel.socket().setReceiveBufferSize(READ_BUFFER_SIZE); socketChannel.socket().setReceiveBufferSize(READ_BUFFER_SIZE);
socketChannel.socket().setSendBufferSize(WRITE_BUFFER_SIZE); socketChannel.socket().setSendBufferSize(WRITE_BUFFER_SIZE);
} }
if(selector == null || !selector.isOpen()) {
selector = Selector.open();
}
selector.wakeup();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}catch(Exception ignore) {} }catch(Exception ignore) {}
@ -148,22 +147,22 @@ class CIMConnectorManager{
return; return;
} }
if (CONNECTING_FLAG.get() || isConnected()) { if (isConnected()) {
return; return;
} }
CONNECTING_FLAG.set(true); if(!socketChannel.isOpen()) {
if(!socketChannel.isOpen() ||!selector.isOpen()) {
makeNioConnector(); makeNioConnector();
} }
workerExecutor.execute(new Runnable() { bossExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
IOLOCK.lock(); if (isConnected()) {
return;
}
LOGGER.startConnect(host, port); LOGGER.startConnect(host, port);
@ -171,38 +170,40 @@ class CIMConnectorManager{
try { try {
socketChannel.connect(new InetSocketAddress(host, port)); semaphore.acquire();
while (socketChannel.isOpen()) { socketChannel.socket().connect(new InetSocketAddress(host, port),CONNECT_TIME_OUT);
selector.select();
if(!selector.isOpen()) {
break;
}
for(SelectionKey key : selector.selectedKeys()){
if((key.interestOps() & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT && socketChannel.finishConnect()) {
IOLOCK.unlock();
handelConnectedEvent();
continue;
}
if((key.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
handelSocketReadEvent();
}
}
}
semaphore.release();
handelConnectedEvent();
int result = -1;
while((result = socketChannel.read(readBuffer)) > 0) {
if(readBuffer.position() == readBuffer.capacity()) {
extendByteBuffer();
}
handelSocketReadEvent(result);
}
handelSocketReadEvent(result);
}catch(ConnectException ignore){ }catch(ConnectException ignore){
IOLOCK.unlock(); semaphore.release();
handleConnectAbortedEvent(); handleConnectAbortedEvent();
}catch(IllegalArgumentException ignore){ }catch(SocketTimeoutException ignore){
IOLOCK.unlock(); semaphore.release();
handleConnectAbortedEvent(); handleConnectAbortedEvent();
}catch(IOException ignore) { }catch(IOException ignore) {
semaphore.release();
handelDisconnectedEvent(); handelDisconnectedEvent();
}catch(InterruptedException ignore) {
semaphore.release();
} }
} }
}); });
@ -212,12 +213,16 @@ class CIMConnectorManager{
public void destroy() { public void destroy() {
closeSession(); closeSession();
closeSelector();
} }
public void closeSession() { public void closeSession() {
if(!isConnected()) {
return;
}
try { try {
socketChannel.close(); socketChannel.close();
} catch (IOException ignore) { } catch (IOException ignore) {
@ -233,28 +238,31 @@ class CIMConnectorManager{
public void send(final Protobufable body) { public void send(final Protobufable body) {
bossExecutor.execute(new Runnable() { if(!isConnected()) {
return;
}
workerExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
int result = 0; int result = 0;
try { try {
IOLOCK.lock(); semaphore.acquire();
ByteBuffer buffer = messageEncoder.encode(body); ByteBuffer buffer = messageEncoder.encode(body);
while(buffer.hasRemaining()){ while(buffer.hasRemaining()){
result += socketChannel.write(buffer); result += socketChannel.write(buffer);
} }
} catch (IOException e) { } catch (Exception e) {
result = -1; result = -1;
}finally { }finally {
IOLOCK.unlock(); semaphore.release();
if(result <= 0) { if(result <= 0) {
sendFailBodyQueue.offer(body);
closeSession(); closeSession();
}else { }else {
messageSent(body); messageSent(body);
@ -264,18 +272,8 @@ class CIMConnectorManager{
}); });
} }
private void sendFaildQueueBody() {
Protobufable body = sendFailBodyQueue.poll();
if(body == null) {
return;
}
send(body);
sendFaildQueueBody();
}
private void sessionCreated() { private void sessionCreated() {
LOGGER.sessionCreated(socketChannel); LOGGER.sessionCreated(socketChannel);
@ -286,8 +284,6 @@ class CIMConnectorManager{
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_SUCCESSED); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_SUCCESSED);
context.sendBroadcast(intent); context.sendBroadcast(intent);
sendFaildQueueBody();
} }
private void sessionClosed() { private void sessionClosed() {
@ -304,8 +300,6 @@ class CIMConnectorManager{
readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
} }
closeSelector();
Intent intent = new Intent(); Intent intent = new Intent();
intent.setPackage(context.getPackageName()); intent.setPackage(context.getPackageName());
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED);
@ -369,13 +363,10 @@ class CIMConnectorManager{
}; };
private void handelDisconnectedEvent() { private void handelDisconnectedEvent() {
CONNECTING_FLAG.set(false);
closeSession(); closeSession();
} }
private void handleConnectAbortedEvent() { private void handleConnectAbortedEvent() {
CONNECTING_FLAG.set(false);
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
@ -389,33 +380,22 @@ class CIMConnectorManager{
} }
private void handelConnectedEvent() throws IOException { private void handelConnectedEvent() {
CONNECTING_FLAG.set(false);
socketChannel.register(selector, SelectionKey.OP_READ);
sessionCreated(); sessionCreated();
idleHandler.sendEmptyMessageDelayed(0, READ_IDLE_TIME); idleHandler.sendEmptyMessageDelayed(0, READ_IDLE_TIME);
} }
private void handelSocketReadEvent() throws IOException { private void handelSocketReadEvent(int result) throws IOException {
int result = 0; if(result == -1) {
while((result = socketChannel.read(readBuffer)) > 0) {
if(readBuffer.position() == readBuffer.capacity()) {
extendByteBuffer();
}
}
if(result == -1 && !readBuffer.hasRemaining()) {
closeSession(); closeSession();
return; return;
} }
markLastReadTime(); markLastReadTime();
readBuffer.position(0); readBuffer.position(0);
Object message = messageDecoder.doDecode(readBuffer); Object message = messageDecoder.doDecode(readBuffer);
@ -424,8 +404,6 @@ class CIMConnectorManager{
return; return;
} }
LOGGER.messageReceived(socketChannel,message); LOGGER.messageReceived(socketChannel,message);
if(isHeartbeatRequest(message)) { if(isHeartbeatRequest(message)) {
@ -436,11 +414,7 @@ class CIMConnectorManager{
} }
this.messageReceived(message); this.messageReceived(message);
if(result == -1) {
closeSession();
return;
}
} }
@ -473,13 +447,6 @@ class CIMConnectorManager{
private boolean isHeartbeatRequest(Object data) { private boolean isHeartbeatRequest(Object data) {
return data instanceof HeartbeatRequest; return data instanceof HeartbeatRequest;
} }
private void closeSelector() {
try {
selector.close();
} catch (IOException ignore) {}
}
} }

View File

@ -49,8 +49,6 @@ public class CIMPushManager {
static String ACTION_SET_LOGGER_EANABLE = "ACTION_SET_LOGGER_EANABLE"; static String ACTION_SET_LOGGER_EANABLE = "ACTION_SET_LOGGER_EANABLE";
static String ACTION_DESTORY = "ACTION_DESTORY";
static String KEY_SEND_BODY = "KEY_SEND_BODY"; static String KEY_SEND_BODY = "KEY_SEND_BODY";
static String KEY_CIM_CONNECTION_STATUS = "KEY_CIM_CONNECTION_STATUS"; static String KEY_CIM_CONNECTION_STATUS = "KEY_CIM_CONNECTION_STATUS";
@ -221,9 +219,7 @@ public class CIMPushManager {
CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_DESTROYED, true); CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_DESTROYED, true);
CIMCacheManager.putString(context, CIMCacheManager.KEY_ACCOUNT, null); CIMCacheManager.putString(context, CIMCacheManager.KEY_ACCOUNT, null);
Intent serviceIntent = new Intent(context, CIMPushService.class); context.stopService(new Intent(context, CIMPushService.class));
serviceIntent.setAction(ACTION_DESTORY);
startServiceCompat(context,serviceIntent);
} }

View File

@ -36,8 +36,6 @@ import android.os.Handler;
import android.os.IBinder; import android.os.IBinder;
import android.os.Message; import android.os.Message;
import java.util.concurrent.Semaphore;
import com.farsunset.cim.sdk.android.coder.CIMLogger; import com.farsunset.cim.sdk.android.coder.CIMLogger;
import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.constant.CIMConstant;
import com.farsunset.cim.sdk.android.model.SentBody; import com.farsunset.cim.sdk.android.model.SentBody;
@ -48,14 +46,13 @@ import com.farsunset.cim.sdk.android.model.SentBody;
* @author 3979434 * @author 3979434
* *
*/ */
public class CIMPushService extends Service { public class CIMPushService extends Service {
public final static String KEY_DELAYED_TIME = "KEY_DELAYED_TIME"; public final static String KEY_DELAYED_TIME = "KEY_DELAYED_TIME";
public final static String KEY_LOGGER_ENABLE = "KEY_LOGGER_ENABLE"; public final static String KEY_LOGGER_ENABLE = "KEY_LOGGER_ENABLE";
private CIMConnectorManager manager; private CIMConnectorManager manager;
private KeepAliveBroadcastReceiver keepAliveReceiver; private KeepAliveBroadcastReceiver keepAliveReceiver;
private Semaphore semaphore = new Semaphore(1, true); private ConnectivityManager connectivityManager;
@Override @Override
public void onCreate() { public void onCreate() {
manager = CIMConnectorManager.getManager(this.getApplicationContext()); manager = CIMConnectorManager.getManager(this.getApplicationContext());
@ -68,35 +65,37 @@ public class CIMPushService extends Service {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) { if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
ConnectivityManager connectivityManager = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE); connectivityManager = (ConnectivityManager) getSystemService(Context.CONNECTIVITY_SERVICE);
connectivityManager.registerDefaultNetworkCallback(new ConnectivityManager.NetworkCallback() {
@Override connectivityManager.registerDefaultNetworkCallback(networkCallback);
public void onAvailable(Network network) {
Intent intent = new Intent();
intent.setPackage(getPackageName());
intent.setAction(CIMConstant.IntentAction.ACTION_NETWORK_CHANGED);
sendBroadcast(intent);
}
@Override
public void onUnavailable() {
Intent intent = new Intent();
intent.setPackage(getPackageName());
intent.setAction(CIMConstant.IntentAction.ACTION_NETWORK_CHANGED);
sendBroadcast(intent);
}
});
} }
} }
ConnectivityManager.NetworkCallback networkCallback = new ConnectivityManager.NetworkCallback() {
@Override
public void onAvailable(Network network) {
Intent intent = new Intent();
intent.setPackage(getPackageName());
intent.setAction(CIMConstant.IntentAction.ACTION_NETWORK_CHANGED);
sendBroadcast(intent);
}
@Override
public void onUnavailable() {
Intent intent = new Intent();
intent.setPackage(getPackageName());
intent.setAction(CIMConstant.IntentAction.ACTION_NETWORK_CHANGED);
sendBroadcast(intent);
}
};
Handler connectionHandler = new Handler() { Handler connectionHandler = new Handler() {
@Override @Override
public void handleMessage(android.os.Message message) { public void handleMessage(android.os.Message message) {
String host = message.getData().getString(CIMCacheManager.KEY_CIM_SERVIER_HOST); String host = message.getData().getString(CIMCacheManager.KEY_CIM_SERVIER_HOST);
int port = message.getData().getInt(CIMCacheManager.KEY_CIM_SERVIER_PORT, 0); int port = message.getData().getInt(CIMCacheManager.KEY_CIM_SERVIER_PORT, 0);
manager.connect(host, port); handleConnection(host, port);
semaphore.release();
} }
}; };
@ -118,7 +117,7 @@ public class CIMPushService extends Service {
String action = intent.getAction(); String action = intent.getAction();
if (CIMPushManager.ACTION_CREATE_CIM_CONNECTION.equals(action)) { if (CIMPushManager.ACTION_CREATE_CIM_CONNECTION.equals(action)) {
handleConnection(intent); handleDelayConnection(intent);
} }
if (CIMPushManager.ACTION_SEND_REQUEST_BODY.equals(action)) { if (CIMPushManager.ACTION_SEND_REQUEST_BODY.equals(action)) {
@ -129,11 +128,6 @@ public class CIMPushService extends Service {
manager.closeSession(); manager.closeSession();
} }
if (CIMPushManager.ACTION_DESTORY.equals(action)) {
manager.destroy();
this.stopSelf();
}
if (CIMPushManager.ACTION_ACTIVATE_PUSH_SERVICE.equals(action)) { if (CIMPushManager.ACTION_ACTIVATE_PUSH_SERVICE.equals(action)) {
handleKeepAlive(); handleKeepAlive();
} }
@ -150,41 +144,47 @@ public class CIMPushService extends Service {
return super.onStartCommand(intent, flags, startId); return super.onStartCommand(intent, flags, startId);
} }
private void handleConnection(Intent intent) { private void handleDelayConnection(Intent intent) {
long delayMillis = intent.getLongExtra(KEY_DELAYED_TIME, 0); long delayMillis = intent.getLongExtra(KEY_DELAYED_TIME, 0);
if (delayMillis <= 0) { if (delayMillis <= 0) {
String host = intent.getStringExtra(CIMCacheManager.KEY_CIM_SERVIER_HOST); String host = intent.getStringExtra(CIMCacheManager.KEY_CIM_SERVIER_HOST);
int port = intent.getIntExtra(CIMCacheManager.KEY_CIM_SERVIER_PORT, 0); int port = intent.getIntExtra(CIMCacheManager.KEY_CIM_SERVIER_PORT, 0);
manager.connect(host, port); handleConnection(host, port);
return;
}
if (!semaphore.tryAcquire()) {
return; return;
} }
Message msg = connectionHandler.obtainMessage(); Message msg = connectionHandler.obtainMessage();
msg.what = 0; msg.what = 0;
msg.setData(intent.getExtras()); msg.setData(intent.getExtras());
connectionHandler.removeMessages(0);
connectionHandler.sendMessageDelayed(msg, delayMillis); connectionHandler.sendMessageDelayed(msg, delayMillis);
} }
private void handleConnection(String host,int port) {
boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(), CIMCacheManager.KEY_MANUAL_STOP);
boolean isDestroyed = CIMCacheManager.getBoolean(getApplicationContext(), CIMCacheManager.KEY_CIM_DESTROYED);
if(isManualStop || isDestroyed) {
return;
}
manager.connect(host, port);
}
private void handleKeepAlive() { private void handleKeepAlive() {
if (manager.isConnected()) { if (manager.isConnected()) {
CIMLogger.getLogger().connectState(true); CIMLogger.getLogger().connectState(true);
return; return;
} }
boolean isManualStop = CIMCacheManager.getBoolean(getApplicationContext(), CIMCacheManager.KEY_MANUAL_STOP); String host = CIMCacheManager.getString(this, CIMCacheManager.KEY_CIM_SERVIER_HOST);
boolean isDestroyed = CIMCacheManager.getBoolean(getApplicationContext(), CIMCacheManager.KEY_CIM_DESTROYED); int port = CIMCacheManager.getInt(this, CIMCacheManager.KEY_CIM_SERVIER_PORT);
CIMLogger.getLogger().connectState(false, isManualStop, isDestroyed); handleConnection(host,port);
CIMPushManager.connect(this, 0);
} }
@Override @Override
@ -195,9 +195,15 @@ public class CIMPushService extends Service {
@Override @Override
public void onDestroy() { public void onDestroy() {
super.onDestroy(); super.onDestroy();
manager.destroy();
connectionHandler.removeMessages(0);
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
unregisterReceiver(keepAliveReceiver); unregisterReceiver(keepAliveReceiver);
} }
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
connectivityManager.unregisterNetworkCallback(networkCallback);
}
} }
public class KeepAliveBroadcastReceiver extends BroadcastReceiver { public class KeepAliveBroadcastReceiver extends BroadcastReceiver {

View File

@ -24,26 +24,24 @@ package com.farsunset.cim.sdk.client;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.StandardSocketOptions; import java.net.SocketTimeoutException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import com.farsunset.cim.sdk.client.coder.CIMLogger; import com.farsunset.cim.sdk.client.coder.CIMLogger;
import com.farsunset.cim.sdk.client.coder.ClientMessageDecoder; import com.farsunset.cim.sdk.client.coder.ClientMessageDecoder;
import com.farsunset.cim.sdk.client.coder.ClientMessageEncoder; import com.farsunset.cim.sdk.client.coder.ClientMessageEncoder;
import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.constant.CIMConstant;
import com.farsunset.cim.sdk.client.exception.SessionClosedException;
import com.farsunset.cim.sdk.client.model.HeartbeatRequest; import com.farsunset.cim.sdk.client.model.HeartbeatRequest;
import com.farsunset.cim.sdk.client.model.HeartbeatResponse; import com.farsunset.cim.sdk.client.model.HeartbeatResponse;
import com.farsunset.cim.sdk.client.model.Intent; import com.farsunset.cim.sdk.client.model.Intent;
import com.farsunset.cim.sdk.client.model.Message; import com.farsunset.cim.sdk.client.model.Message;
import com.farsunset.cim.sdk.client.model.Protobufable;
import com.farsunset.cim.sdk.client.model.ReplyBody; import com.farsunset.cim.sdk.client.model.ReplyBody;
import com.farsunset.cim.sdk.client.model.SentBody; import com.farsunset.cim.sdk.client.model.SentBody;
@ -59,20 +57,34 @@ class CIMConnectorManager {
private final int READ_BUFFER_SIZE = 2048; private final int READ_BUFFER_SIZE = 2048;
private final int WRITE_BUFFER_SIZE = 1024; private final int WRITE_BUFFER_SIZE = 1024;
private final int CONNECT_TIME_OUT = 10 * 1000;
private final AtomicBoolean CONNECTING_FLAG = new AtomicBoolean(false) ;
private final CIMLogger LOGGER = CIMLogger.getLogger(); private final CIMLogger LOGGER = CIMLogger.getLogger();
private Selector selector;
private SocketChannel socketChannel ; private SocketChannel socketChannel ;
private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
private ExecutorService workerExecutor = Executors.newFixedThreadPool(1); private ExecutorService workerExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() {
private ExecutorService bossExecutor = Executors.newFixedThreadPool(1); @Override
private ExecutorService eventExecutor = Executors.newFixedThreadPool(1); public Thread newThread(Runnable r) {
return new Thread(r,"worker-");
}
});
private ExecutorService bossExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"boss-");
}
});
private ExecutorService eventExecutor = Executors.newFixedThreadPool(1,new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"event-");
}
});
private Semaphore semaphore = new Semaphore(1, true);
private ClientMessageEncoder messageEncoder = new ClientMessageEncoder(); private ClientMessageEncoder messageEncoder = new ClientMessageEncoder();
@ -83,21 +95,14 @@ class CIMConnectorManager {
} }
private void makeNioConnector() { private void makeNioConnector() {
try { try {
if(socketChannel == null || !socketChannel.isOpen()) { if(socketChannel == null || !socketChannel.isOpen()) {
socketChannel = SocketChannel.open(); socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true);
socketChannel.setOption(StandardSocketOptions.SO_RCVBUF,READ_BUFFER_SIZE); socketChannel.socket().setKeepAlive(true);
socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, WRITE_BUFFER_SIZE); socketChannel.socket().setReceiveBufferSize(READ_BUFFER_SIZE);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); socketChannel.socket().setSendBufferSize(WRITE_BUFFER_SIZE);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY,true);
} }
if(selector == null || !selector.isOpen()) {
selector = Selector.open();
}
selector.wakeup();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}catch(Exception ignore) {} }catch(Exception ignore) {}
@ -116,20 +121,21 @@ class CIMConnectorManager {
public void connect(final String host, final int port) { public void connect(final String host, final int port) {
boolean isConnected = isConnected(); if (isConnected()) {
if (CONNECTING_FLAG.get() || isConnected) {
return; return;
} }
CONNECTING_FLAG.set(true); if(!socketChannel.isOpen()) {
if(!socketChannel.isOpen() ||!selector.isOpen()) {
makeNioConnector(); makeNioConnector();
} }
workerExecutor.execute(new Runnable() { bossExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
if (isConnected()) {
return;
}
LOGGER.startConnect(host, port); LOGGER.startConnect(host, port);
@ -137,36 +143,41 @@ class CIMConnectorManager {
try { try {
socketChannel.connect(new InetSocketAddress(host, port)); semaphore.acquire();
socketChannel.socket().connect(new InetSocketAddress(host, port),CONNECT_TIME_OUT);
while (socketChannel.isOpen()) { semaphore.release();
selector.select();
if(!selector.isOpen()) {
break;
}
for(SelectionKey key : selector.selectedKeys()){
if((key.interestOps() & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT && socketChannel.finishConnect()) {
handelConnectedEvent();
continue;
}
if((key.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
handelSocketReadEvent();
}
}
}
handelConnectedEvent();
int result = -1;
while((result = socketChannel.read(readBuffer)) > 0) {
if(readBuffer.position() == readBuffer.capacity()) {
extendByteBuffer();
}
handelSocketReadEvent(result);
}
handelSocketReadEvent(result);
}catch(ConnectException ignore){ }catch(ConnectException ignore){
semaphore.release();
handleConnectAbortedEvent(); handleConnectAbortedEvent();
}catch(IllegalArgumentException ignore){ }catch(SocketTimeoutException ignore){
semaphore.release();
handleConnectAbortedEvent(); handleConnectAbortedEvent();
}catch(IOException ignore) { }catch(IOException ignore) {
semaphore.release();
handelDisconnectedEvent(); handelDisconnectedEvent();
}catch(ClosedSelectorException ignore) {} }catch (InterruptedException ignore) {
semaphore.release();
}
} }
}); });
} }
@ -174,13 +185,10 @@ class CIMConnectorManager {
private void handelDisconnectedEvent() { private void handelDisconnectedEvent() {
CONNECTING_FLAG.set(false);
closeSession(); closeSession();
} }
private void handleConnectAbortedEvent() { private void handleConnectAbortedEvent() {
CONNECTING_FLAG.set(false);
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000)); long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
@ -195,26 +203,16 @@ class CIMConnectorManager {
private void handelConnectedEvent() throws IOException { private void handelConnectedEvent() throws IOException {
CONNECTING_FLAG.set(false);
socketChannel.register(selector, SelectionKey.OP_READ);
sessionCreated(); sessionCreated();
} }
private void handelSocketReadEvent() throws IOException { private void handelSocketReadEvent(int result) throws IOException {
int result = 0; if(result == -1) {
closeSession();
while((result = socketChannel.read(readBuffer)) > 0) { return;
if(readBuffer.position() == readBuffer.capacity()) {
extendByteBuffer();
}
} }
if(result == -1) {
closeSession();
return;
}
readBuffer.position(0); readBuffer.position(0);
@ -246,63 +244,44 @@ class CIMConnectorManager {
readBuffer.clear(); readBuffer.clear();
readBuffer = newBuffer; readBuffer = newBuffer;
} }
public void send(final SentBody body) {
public void send(final Protobufable body) {
bossExecutor.execute(new Runnable() { if(!isConnected()) {
return;
}
workerExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
boolean isSuccessed = false; int result = 0;
String exceptionName = SessionClosedException.class.getSimpleName(); try {
if (isConnected()) {
try { semaphore.acquire();
ByteBuffer buffer = messageEncoder.encode(body);
int result = 0; ByteBuffer buffer = messageEncoder.encode(body);
while(buffer.hasRemaining()){ while(buffer.hasRemaining()){
result += socketChannel.write(buffer); result += socketChannel.write(buffer);
}
isSuccessed = result > 0;
} catch (IOException e) {
exceptionName = e.getClass().getSimpleName();
closeSession();
} }
} } catch (Exception e) {
result = -1;
if (!isSuccessed) { }finally {
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_SENT_FAILED); semaphore.release();
intent.putExtra(Exception.class.getName(), exceptionName);
intent.putExtra(SentBody.class.getName(), body); if(result <= 0) {
sendBroadcast(intent); closeSession();
}else { }else {
messageSent(body); messageSent(body);
} }
}
} }
}); });
} }
public void send(final HeartbeatResponse body) {
bossExecutor.execute(new Runnable() {
@Override
public void run() {
try {
socketChannel.write(messageEncoder.encode(body));
messageSent(body);
} catch (IOException ignore) {
closeSession();
}
}
});
}
public void sessionCreated() { public void sessionCreated() {
@ -311,7 +290,7 @@ class CIMConnectorManager {
Intent intent = new Intent(); Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_SUCCESSED); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_SUCCESSED);
sendBroadcast(intent); sendBroadcast(intent);
} }
public void sessionClosed() { public void sessionClosed() {
@ -324,8 +303,6 @@ class CIMConnectorManager {
readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
} }
closeSelector();
Intent intent = new Intent(); Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_CLOSED);
sendBroadcast(intent); sendBroadcast(intent);
@ -373,10 +350,7 @@ class CIMConnectorManager {
} }
public void destroy() { public void destroy() {
closeSession(); closeSession();
closeSelector();
} }
public boolean isConnected() { public boolean isConnected() {
@ -384,6 +358,11 @@ class CIMConnectorManager {
} }
public void closeSession() { public void closeSession() {
if(!isConnected()) {
return;
}
try { try {
socketChannel.close(); socketChannel.close();
} catch (IOException ignore) { } catch (IOException ignore) {
@ -391,13 +370,7 @@ class CIMConnectorManager {
this.sessionClosed(); this.sessionClosed();
} }
} }
public void closeSelector() {
try {
selector.close();
} catch (IOException ignore) {}
}
private void sendBroadcast(final Intent intent) { private void sendBroadcast(final Intent intent) {
eventExecutor.execute(new Runnable() { eventExecutor.execute(new Runnable() {

View File

@ -26,7 +26,6 @@ import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import com.farsunset.cim.sdk.client.constant.CIMConstant; import com.farsunset.cim.sdk.client.constant.CIMConstant;
import com.farsunset.cim.sdk.client.exception.SessionClosedException;
import com.farsunset.cim.sdk.client.model.Intent; import com.farsunset.cim.sdk.client.model.Intent;
import com.farsunset.cim.sdk.client.model.Message; import com.farsunset.cim.sdk.client.model.Message;
import com.farsunset.cim.sdk.client.model.ReplyBody; import com.farsunset.cim.sdk.client.model.ReplyBody;
@ -90,14 +89,6 @@ public class CIMEventBroadcastReceiver {
listener.onReplyReceived((ReplyBody) intent.getExtra(ReplyBody.class.getName())); listener.onReplyReceived((ReplyBody) intent.getExtra(ReplyBody.class.getName()));
} }
/*
* 获取sendbody发送失败事件
*/
if (intent.getAction().equals(CIMConstant.IntentAction.ACTION_SENT_FAILED)) {
onSentFailed((Exception) intent.getExtra(Exception.class.getName()),
(SentBody) intent.getExtra(SentBody.class.getName()));
}
/* /*
* 获取sendbody发送成功事件 * 获取sendbody发送成功事件
*/ */
@ -158,19 +149,7 @@ public class CIMEventBroadcastReceiver {
private boolean isForceOfflineMessage(String action) { private boolean isForceOfflineMessage(String action) {
return CIMConstant.MessageAction.ACTION_999.equals(action); return CIMConstant.MessageAction.ACTION_999.equals(action);
} }
private void onSentFailed(Exception e, SentBody body) {
e.printStackTrace();
// 与服务端端开链接重新连接
if (e instanceof SessionClosedException) {
CIMPushManager.connect();
} else {
// 发送失败 重新发送
CIMPushManager.sendRequest(body);
}
}
private void onSentSucceed(SentBody body) { private void onSentSucceed(SentBody body) {
} }

View File

@ -21,7 +21,6 @@
*/ */
package com.farsunset.cim.sdk.client.coder; package com.farsunset.cim.sdk.client.coder;
import java.io.IOException;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -83,6 +82,7 @@ public class CIMLogger {
public void connectState(boolean isConnected,boolean isManualStop,boolean isDestroyed) { public void connectState(boolean isConnected,boolean isManualStop,boolean isDestroyed) {
LOGGER.debug("CONNECTED:" + isConnected + " STOPED:"+isManualStop+ " DESTROYED:"+isDestroyed); LOGGER.debug("CONNECTED:" + isConnected + " STOPED:"+isManualStop+ " DESTROYED:"+isDestroyed);
} }
private String getSessionInfo(SocketChannel session) { private String getSessionInfo(SocketChannel session) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
if (session == null) { if (session == null) {
@ -92,24 +92,23 @@ public class CIMLogger {
builder.append("id:").append(session.hashCode()); builder.append("id:").append(session.hashCode());
try { try {
if (session.getLocalAddress() != null) { if (session.socket().getLocalAddress() != null) {
builder.append(" L:").append(session.getLocalAddress().toString()); builder.append(" L:").append(session.socket().getLocalAddress()+":"+session.socket().getLocalPort());
} }
} catch (IOException e) { } catch (Exception ignore) {
e.printStackTrace();
} }
try { try {
if (session.getRemoteAddress() != null) { if (session.socket().getRemoteSocketAddress() != null) {
builder.append(" R:").append(session.getRemoteAddress().toString()); builder.append(" R:").append(session.socket().getRemoteSocketAddress().toString());
} }
} catch (IOException e) { } catch (Exception ignore) {
e.printStackTrace();
} }
builder.append("]"); builder.append("]");
return builder.toString(); return builder.toString();
} }
} }

View File

@ -87,9 +87,6 @@ public interface CIMConstant {
// 消息广播action // 消息广播action
String ACTION_MESSAGE_RECEIVED = "com.farsunset.cim.MESSAGE_RECEIVED"; String ACTION_MESSAGE_RECEIVED = "com.farsunset.cim.MESSAGE_RECEIVED";
// 发送sendbody失败广播
String ACTION_SENT_FAILED = "com.farsunset.cim.SENT_FAILED";
// 发送sendbody成功广播 // 发送sendbody成功广播
String ACTION_SENT_SUCCESSED = "com.farsunset.cim.SENT_SUCCESSED"; String ACTION_SENT_SUCCESSED = "com.farsunset.cim.SENT_SUCCESSED";