Update CIMConnectorManager.java

This commit is contained in:
远方夕阳 2014-09-04 14:29:22 +08:00
parent b21a344064
commit 95b5f9d74c

View File

@ -1,309 +1,319 @@
package com.farsunset.cim.client.android; package com.farsunset.cim.client.android;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.WriteFuture; import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession; import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector; import org.apache.mina.transport.socket.nio.NioSocketConnector;
import android.content.Context; import android.content.Context;
import android.content.Intent; import android.content.Intent;
import android.net.ConnectivityManager; import android.net.ConnectivityManager;
import android.net.NetworkInfo; import android.net.NetworkInfo;
import com.farsunset.cim.nio.constant.CIMConstant; import com.farsunset.cim.nio.constant.CIMConstant;
import com.farsunset.cim.nio.filter.ClientMessageCodecFactory; import com.farsunset.cim.nio.filter.ClientMessageCodecFactory;
import com.farsunset.cim.nio.mutual.Message; import com.farsunset.cim.nio.mutual.Message;
import com.farsunset.cim.nio.mutual.ReplyBody; import com.farsunset.cim.nio.mutual.ReplyBody;
import com.farsunset.cim.nio.mutual.SentBody; import com.farsunset.cim.nio.mutual.SentBody;
/** /**
* 连接服务端管理cim核心处理类管理连接以及消息处理 * 连接服务端管理cim核心处理类管理连接以及消息处理
* *
* @author 3979434@qq.com * @author 3979434@qq.com
*/ */
class CIMConnectorManager { class CIMConnectorManager {
private NioSocketConnector connector; private NioSocketConnector connector;
private ConnectFuture connectFuture; private ConnectFuture connectFuture;
private IoSession session; private IoSession session;
Context context; Context context;
static CIMConnectorManager manager; static CIMConnectorManager manager;
// 消息广播action // 消息广播action
public static final String ACTION_MESSAGE_RECEIVED = "com.farsunset.cim.MESSAGE_RECEIVED"; public static final String ACTION_MESSAGE_RECEIVED = "com.farsunset.cim.MESSAGE_RECEIVED";
// 发送sendbody失败广播 // 发送sendbody失败广播
public static final String ACTION_SENT_FAILED = "com.farsunset.cim.SENT_FAILED"; public static final String ACTION_SENT_FAILED = "com.farsunset.cim.SENT_FAILED";
// 发送sendbody成功广播 // 发送sendbody成功广播
public static final String ACTION_SENT_SUCCESS = "com.farsunset.cim.SENT_SUCCESS"; public static final String ACTION_SENT_SUCCESS = "com.farsunset.cim.SENT_SUCCESS";
// 链接意外关闭广播 // 链接意外关闭广播
public static final String ACTION_CONNECTION_CLOSED = "com.farsunset.cim.CONNECTION_CLOSED"; public static final String ACTION_CONNECTION_CLOSED = "com.farsunset.cim.CONNECTION_CLOSED";
// 链接失败广播 // 链接失败广播
public static final String ACTION_CONNECTION_FAILED = "com.farsunset.cim.CONNECTION_FAILED"; public static final String ACTION_CONNECTION_FAILED = "com.farsunset.cim.CONNECTION_FAILED";
// 链接成功广播 // 链接成功广播
public static final String ACTION_CONNECTION_SUCCESS = "com.farsunset.cim.CONNECTION_SUCCESS"; public static final String ACTION_CONNECTION_SUCCESS = "com.farsunset.cim.CONNECTION_SUCCESS";
// 发送sendbody成功后获得replaybody回应广播 // 发送sendbody成功后获得replaybody回应广播
public static final String ACTION_REPLY_RECEIVED = "com.farsunset.cim.REPLY_RECEIVED"; public static final String ACTION_REPLY_RECEIVED = "com.farsunset.cim.REPLY_RECEIVED";
// 网络变化广播 // 网络变化广播
public static final String ACTION_NETWORK_CHANGED = "android.net.conn.CONNECTIVITY_CHANGE"; public static final String ACTION_NETWORK_CHANGED = "android.net.conn.CONNECTIVITY_CHANGE";
// 未知异常 // 未知异常
public static final String ACTION_UNCAUGHT_EXCEPTION = "com.farsunset.cim.UNCAUGHT_EXCEPTION"; public static final String ACTION_UNCAUGHT_EXCEPTION = "com.farsunset.cim.UNCAUGHT_EXCEPTION";
// CIM连接状态 // CIM连接状态
public static final String ACTION_CONNECTION_STATUS = "com.farsunset.cim.CONNECTION_STATUS"; public static final String ACTION_CONNECTION_STATUS = "com.farsunset.cim.CONNECTION_STATUS";
private ExecutorService executor; private ExecutorService executor;
private CIMConnectorManager(Context ctx) { private CIMConnectorManager(Context ctx) {
context = ctx; context = ctx;
executor = Executors.newFixedThreadPool(3); executor = Executors.newFixedThreadPool(3);
connector = new NioSocketConnector(); connector = new NioSocketConnector();
connector.setConnectTimeoutMillis(10 * 1000); connector.setConnectTimeoutMillis(10 * 1000);
connector.getSessionConfig().setBothIdleTime(180); connector.getSessionConfig().setBothIdleTime(180);
connector.getSessionConfig().setKeepAlive(true); connector.getSessionConfig().setKeepAlive(true);
connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("logger", new LoggingFilter());
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ClientMessageCodecFactory())); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ClientMessageCodecFactory()));
connector.setHandler(iohandler); connector.setHandler(iohandler);
} }
public synchronized static CIMConnectorManager getManager(Context context) { public synchronized static CIMConnectorManager getManager(Context context) {
if (manager == null) { if (manager == null) {
manager = new CIMConnectorManager(context); manager = new CIMConnectorManager(context);
} }
return manager; return manager;
} }
private synchronized void syncConnection(final String cimServerHost,final int cimServerPort) { private synchronized void syncConnection(final String cimServerHost,final int cimServerPort) {
try { try {
if(isConnected()){ if(isConnected()){
return ; return ;
} }
InetSocketAddress remoteSocketAddress = new InetSocketAddress(cimServerHost, cimServerPort); InetSocketAddress remoteSocketAddress = new InetSocketAddress(cimServerHost, cimServerPort);
connectFuture = connector.connect(remoteSocketAddress); connectFuture = connector.connect(remoteSocketAddress);
connectFuture.awaitUninterruptibly(); connectFuture.awaitUninterruptibly();
session = connectFuture.getSession(); session = connectFuture.getSession();
} catch (Exception e) { } catch (Exception e) {
Intent intent = new Intent(); Intent intent = new Intent();
intent.setAction(ACTION_CONNECTION_FAILED); intent.setAction(ACTION_CONNECTION_FAILED);
intent.putExtra("exception", e); intent.putExtra("exception", e);
context.sendBroadcast(intent); context.sendBroadcast(intent);
System.out.println("******************CIM连接服务器失败 "+cimServerHost+":"+cimServerPort); System.out.println("******************CIM连接服务器失败 "+cimServerHost+":"+cimServerPort);
} }
} }
public void connect(final String cimServerHost, final int cimServerPort) { public void connect(final String cimServerHost, final int cimServerPort) {
if (!netWorkAvailable(context)) { if (!netWorkAvailable(context)) {
Intent intent = new Intent(); Intent intent = new Intent();
intent.setAction(ACTION_CONNECTION_FAILED); intent.setAction(ACTION_CONNECTION_FAILED);
intent.putExtra("exception", new NetWorkDisableException()); intent.putExtra("exception", new NetWorkDisableException());
context.sendBroadcast(intent); context.sendBroadcast(intent);
return; return;
} }
executor.execute(new Runnable() { Future<?> future = executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
syncConnection(cimServerHost, cimServerPort); syncConnection(cimServerHost, cimServerPort);
} }
}); });
} try {
if(future.get()!=null)
public void send(final SentBody body) { {
connect(cimServerHost,cimServerPort);
}
executor.execute(new Runnable() { } catch (Exception e) {
@Override
public void run() { connect(cimServerHost,cimServerPort);
e.printStackTrace();
android.os.Message msg = new android.os.Message(); }
msg.getData().putSerializable("body", body); }
if(session!=null && session.isConnected()) public void send(final SentBody body) {
{
WriteFuture wf = session.write(body);
// 消息发送超时 10秒 executor.execute(new Runnable() {
wf.awaitUninterruptibly(5, TimeUnit.SECONDS); @Override
public void run() {
if (!wf.isWritten()) {
android.os.Message msg = new android.os.Message();
msg.getData().putSerializable("body", body);
Intent intent = new Intent();
intent.setAction(ACTION_SENT_FAILED); if(session!=null && session.isConnected())
intent.putExtra("exception", new WriteToClosedSessionException()); {
intent.putExtra("sentBody", body); WriteFuture wf = session.write(body);
context.sendBroadcast(intent); // 消息发送超时 10秒
} wf.awaitUninterruptibly(5, TimeUnit.SECONDS);
}else
{ if (!wf.isWritten()) {
Intent intent = new Intent();
intent.setAction(ACTION_SENT_FAILED); Intent intent = new Intent();
intent.putExtra("exception", new CIMSessionDisableException()); intent.setAction(ACTION_SENT_FAILED);
intent.putExtra("sentBody", body); intent.putExtra("exception", new WriteToClosedSessionException());
context.sendBroadcast(intent); intent.putExtra("sentBody", body);
} context.sendBroadcast(intent);
} }
}); }else
} {
public void destroy() { Intent intent = new Intent();
if (manager.session != null) { intent.setAction(ACTION_SENT_FAILED);
manager.session.close(false); intent.putExtra("exception", new CIMSessionDisableException());
manager.session.removeAttribute("account"); intent.putExtra("sentBody", body);
} context.sendBroadcast(intent);
}
if (manager.connector != null && !manager.connector.isDisposed()) { }
manager.connector.dispose(); });
} }
manager = null;
} public void destroy() {
if (manager.session != null) {
public boolean isConnected() { manager.session.close(false);
if (session == null || connector == null) { manager.session.removeAttribute("account");
return false; }
}
return session.isConnected() ; if (manager.connector != null && !manager.connector.isDisposed()) {
} manager.connector.dispose();
}
public void deliverIsConnected() { manager = null;
Intent intent = new Intent(); }
intent.setAction(ACTION_CONNECTION_FAILED);
intent.putExtra(CIMPushManager.KEY_CIM_CONNECTION_STATUS, isConnected()); public boolean isConnected() {
context.sendBroadcast(intent); if (session == null || connector == null) {
} return false;
}
return session.isConnected() ;
}
public void closeSession()
{ public void deliverIsConnected() {
if(session!=null) Intent intent = new Intent();
{ intent.setAction(ACTION_CONNECTION_FAILED);
session.close(false); intent.putExtra(CIMPushManager.KEY_CIM_CONNECTION_STATUS, isConnected());
} context.sendBroadcast(intent);
} }
IoHandlerAdapter iohandler = new IoHandlerAdapter() {
public void closeSession()
@Override {
public void sessionCreated(IoSession session) throws Exception { if(session!=null)
{
System.out.println("******************CIM连接服务器成功:"+session.getLocalAddress()); session.close(false);
}
Intent intent = new Intent(); }
intent.setAction(ACTION_CONNECTION_SUCCESS);
context.sendBroadcast(intent);
IoHandlerAdapter iohandler = new IoHandlerAdapter() {
}
@Override
@Override public void sessionCreated(IoSession session) throws Exception {
public void sessionOpened(IoSession session) throws Exception {
session.getConfig().setBothIdleTime(180); System.out.println("******************CIM连接服务器成功:"+session.getLocalAddress());
}
Intent intent = new Intent();
@Override intent.setAction(ACTION_CONNECTION_SUCCESS);
public void sessionClosed(IoSession session) throws Exception { context.sendBroadcast(intent);
System.out.println("******************CIM与服务器断开连接:"+session.getLocalAddress()); }
if(CIMConnectorManager.this.session.getId()==session.getId())
{ @Override
public void sessionOpened(IoSession session) throws Exception {
Intent intent = new Intent(); session.getConfig().setBothIdleTime(180);
intent.setAction(ACTION_CONNECTION_CLOSED); }
context.sendBroadcast(intent);
@Override
} public void sessionClosed(IoSession session) throws Exception {
}
System.out.println("******************CIM与服务器断开连接:"+session.getLocalAddress());
@Override if(CIMConnectorManager.this.session.getId()==session.getId())
public void sessionIdle(IoSession session, IdleStatus status) {
throws Exception {
Intent intent = new Intent();
System.out.println("******************CIM与服务器连接空闲:"+session.getLocalAddress()); intent.setAction(ACTION_CONNECTION_CLOSED);
SentBody sent = new SentBody(); context.sendBroadcast(intent);
sent.setKey(CIMConstant.RequestKey.CLIENT_HEARTBEAT);
send(sent); }
} }
@Override @Override
public void exceptionCaught(IoSession session, Throwable cause) public void sessionIdle(IoSession session, IdleStatus status)
throws Exception { throws Exception {
Intent intent = new Intent(); System.out.println("******************CIM与服务器连接空闲:"+session.getLocalAddress());
intent.setAction(ACTION_UNCAUGHT_EXCEPTION); SentBody sent = new SentBody();
intent.putExtra("exception", cause); sent.setKey(CIMConstant.RequestKey.CLIENT_HEARTBEAT);
context.sendBroadcast(intent); send(sent);
} }
@Override @Override
public void messageReceived(IoSession session, Object obj) public void exceptionCaught(IoSession session, Throwable cause)
throws Exception { throws Exception {
if (obj instanceof Message) { Intent intent = new Intent();
intent.setAction(ACTION_UNCAUGHT_EXCEPTION);
Intent intent = new Intent(); intent.putExtra("exception", cause);
intent.setAction(ACTION_MESSAGE_RECEIVED); context.sendBroadcast(intent);
intent.putExtra("message", (Message) obj); }
context.sendBroadcast(intent);
@Override
} public void messageReceived(IoSession session, Object obj)
if (obj instanceof ReplyBody) { throws Exception {
if (obj instanceof Message) {
Intent intent = new Intent();
intent.setAction(ACTION_REPLY_RECEIVED); Intent intent = new Intent();
intent.putExtra("replyBody", (ReplyBody) obj); intent.setAction(ACTION_MESSAGE_RECEIVED);
context.sendBroadcast(intent); intent.putExtra("message", (Message) obj);
} context.sendBroadcast(intent);
}
}
@Override if (obj instanceof ReplyBody) {
public void messageSent(IoSession session, Object message)
throws Exception {
Intent intent = new Intent();
Intent intent = new Intent(); intent.setAction(ACTION_REPLY_RECEIVED);
intent.setAction(ACTION_SENT_SUCCESS); intent.putExtra("replyBody", (ReplyBody) obj);
intent.putExtra("sentBody", (SentBody) message); context.sendBroadcast(intent);
context.sendBroadcast(intent); }
}
} @Override
}; public void messageSent(IoSession session, Object message)
throws Exception {
public static boolean netWorkAvailable(Context context) {
try { Intent intent = new Intent();
ConnectivityManager nw = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE); intent.setAction(ACTION_SENT_SUCCESS);
NetworkInfo networkInfo = nw.getActiveNetworkInfo(); intent.putExtra("sentBody", (SentBody) message);
return networkInfo != null; context.sendBroadcast(intent);
} catch (Exception e) {}
return false; }
} };
public static boolean netWorkAvailable(Context context) {
try {
ConnectivityManager nw = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo networkInfo = nw.getActiveNetworkInfo();
return networkInfo != null;
} catch (Exception e) {}
return false;
}
} }