1.重构一些代码

2.修改服务端demo启动时有可能会bean循环依赖的问题
This commit is contained in:
远方夕阳 2019-05-30 17:57:33 +08:00
parent e1a74096ce
commit 27a7c5d2c0
31 changed files with 475 additions and 440 deletions

View File

@ -66,7 +66,7 @@
<orderEntry type="module-library"> <orderEntry type="module-library">
<library name="Maven: com.farsunset:cim-server-sdk:3.7"> <library name="Maven: com.farsunset:cim-server-sdk:3.7">
<CLASSES> <CLASSES>
<root url="jar://$MODULE_DIR$/lib/cim-server-sdk-mina-3.7.0.jar!/" /> <root url="jar://$MODULE_DIR$/lib/cim-server-sdk-netty-3.7.0.jar!/" />
</CLASSES> </CLASSES>
<JAVADOC /> <JAVADOC />
<SOURCES /> <SOURCES />

View File

@ -15,20 +15,29 @@
</component> </component>
<component name="FileEditorManager"> <component name="FileEditorManager">
<leaf SIDE_TABS_SIZE_LIMIT_KEY="300"> <leaf SIDE_TABS_SIZE_LIMIT_KEY="300">
<file pinned="false" current-in-tab="true"> <file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/pom.xml"> <entry file="file://$PROJECT_DIR$/pom.xml">
<provider selected="true" editor-type-id="text-editor"> <provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="129"> <state relative-caret-position="836">
<caret line="44" column="89" lean-forward="true" selection-start-line="44" selection-start-column="89" selection-end-line="44" selection-end-column="89" /> <caret line="44" column="67" selection-start-line="44" selection-start-column="67" selection-end-line="44" selection-end-column="67" />
</state> </state>
</provider> </provider>
</entry> </entry>
</file> </file>
<file pinned="false" current-in-tab="false"> <file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/src/main/java/com/farsunset/cim/service/CIMSessionService.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="399">
<caret line="43" selection-start-line="43" selection-end-line="43" />
</state>
</provider>
</entry>
</file>
<file pinned="false" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/src/main/java/com/farsunset/cim/CIMConfig.java"> <entry file="file://$PROJECT_DIR$/src/main/java/com/farsunset/cim/CIMConfig.java">
<provider selected="true" editor-type-id="text-editor"> <provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="-302"> <state relative-caret-position="171">
<caret line="28" column="50" selection-start-line="28" selection-start-column="50" selection-end-line="28" selection-end-column="50" /> <caret line="57" column="78" selection-start-line="57" selection-start-column="78" selection-end-line="57" selection-end-column="78" />
<folding> <folding>
<element signature="imports" expanded="true" /> <element signature="imports" expanded="true" />
</folding> </folding>
@ -37,10 +46,10 @@
</entry> </entry>
</file> </file>
<file pinned="false" current-in-tab="false"> <file pinned="false" current-in-tab="false">
<entry file="jar://$PROJECT_DIR$/lib/cim-server-sdk-netty-3.7.0.jar!/com/farsunset/cim/sdk/server/handler/CIMNioSocketAcceptor.class"> <entry file="file://$PROJECT_DIR$/src/main/resources/application.properties">
<provider selected="true" editor-type-id="text-editor"> <provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="-195"> <state relative-caret-position="114">
<caret line="48" column="43" selection-start-line="48" selection-start-column="43" selection-end-line="48" selection-end-column="43" /> <caret line="6" column="31" selection-start-line="6" selection-start-column="31" selection-end-line="6" selection-end-column="31" />
</state> </state>
</provider> </provider>
</entry> </entry>
@ -51,6 +60,8 @@
<findStrings> <findStrings>
<find>CIMWebBridge</find> <find>CIMWebBridge</find>
<find>memorySessionService</find> <find>memorySessionService</find>
<find>cim-server-sdk-mina-3.7.0.jar</find>
<find>cim.server.port</find>
</findStrings> </findStrings>
<replaceStrings> <replaceStrings>
<replace>CIMPushManager</replace> <replace>CIMPushManager</replace>
@ -63,21 +74,24 @@
<option name="CHANGED_PATHS"> <option name="CHANGED_PATHS">
<list> <list>
<option value="$PROJECT_DIR$/src/main/resources/static/js/cim/cim.web.sdk.js" /> <option value="$PROJECT_DIR$/src/main/resources/static/js/cim/cim.web.sdk.js" />
<option value="$PROJECT_DIR$/src/main/java/com/farsunset/cim/CIMConfig.java" /> <option value="$PROJECT_DIR$/src/main/resources/application.properties" />
<option value="$PROJECT_DIR$/pom.xml" /> <option value="$PROJECT_DIR$/pom.xml" />
<option value="$PROJECT_DIR$/src/main/java/com/farsunset/cim/CIMConfig.java" />
</list> </list>
</option> </option>
</component> </component>
<component name="ProjectFrameBounds" extendedState="6"> <component name="ProjectFrameBounds" extendedState="1">
<option name="y" value="23" /> <option name="y" value="23" />
<option name="width" value="1920" /> <option name="width" value="1920" />
<option name="height" value="999" /> <option name="height" value="1057" />
</component> </component>
<component name="ProjectView"> <component name="ProjectView">
<navigator proportions="" version="1"> <navigator proportions="" version="1">
<foldersAlwaysOnTop value="true" /> <foldersAlwaysOnTop value="true" />
</navigator> </navigator>
<panes> <panes>
<pane id="PackagesPane" />
<pane id="Scope" />
<pane id="ProjectPane"> <pane id="ProjectPane">
<subPane> <subPane>
<expand> <expand>
@ -95,12 +109,40 @@
<item name="cim-boot-server" type="462c0819:PsiDirectoryNode" /> <item name="cim-boot-server" type="462c0819:PsiDirectoryNode" />
<item name="src" type="462c0819:PsiDirectoryNode" /> <item name="src" type="462c0819:PsiDirectoryNode" />
</path> </path>
<path>
<item name="cim-boot-server" type="b2602c69:ProjectViewProjectNode" />
<item name="cim-boot-server" type="462c0819:PsiDirectoryNode" />
<item name="src" type="462c0819:PsiDirectoryNode" />
<item name="main" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="cim-boot-server" type="b2602c69:ProjectViewProjectNode" />
<item name="cim-boot-server" type="462c0819:PsiDirectoryNode" />
<item name="src" type="462c0819:PsiDirectoryNode" />
<item name="main" type="462c0819:PsiDirectoryNode" />
<item name="java" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="cim-boot-server" type="b2602c69:ProjectViewProjectNode" />
<item name="cim-boot-server" type="462c0819:PsiDirectoryNode" />
<item name="src" type="462c0819:PsiDirectoryNode" />
<item name="main" type="462c0819:PsiDirectoryNode" />
<item name="java" type="462c0819:PsiDirectoryNode" />
<item name="cim" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="cim-boot-server" type="b2602c69:ProjectViewProjectNode" />
<item name="cim-boot-server" type="462c0819:PsiDirectoryNode" />
<item name="src" type="462c0819:PsiDirectoryNode" />
<item name="main" type="462c0819:PsiDirectoryNode" />
<item name="java" type="462c0819:PsiDirectoryNode" />
<item name="cim" type="462c0819:PsiDirectoryNode" />
<item name="service" type="462c0819:PsiDirectoryNode" />
</path>
</expand> </expand>
<select /> <select />
</subPane> </subPane>
</pane> </pane>
<pane id="PackagesPane" />
<pane id="Scope" />
</panes> </panes>
</component> </component>
<component name="PropertiesComponent"> <component name="PropertiesComponent">
@ -173,17 +215,19 @@
<workItem from="1557740865016" duration="686000" /> <workItem from="1557740865016" duration="686000" />
<workItem from="1558432010952" duration="352000" /> <workItem from="1558432010952" duration="352000" />
<workItem from="1558596242648" duration="1398000" /> <workItem from="1558596242648" duration="1398000" />
<workItem from="1559009816995" duration="1293000" />
<workItem from="1559110709907" duration="1801000" />
<workItem from="1559198165156" duration="1213000" />
</task> </task>
<servers /> <servers />
</component> </component>
<component name="TimeTrackingManager"> <component name="TimeTrackingManager">
<option name="totallyTimeSpent" value="3323000" /> <option name="totallyTimeSpent" value="7630000" />
</component> </component>
<component name="ToolWindowManager"> <component name="ToolWindowManager">
<frame x="0" y="23" width="1920" height="999" extended-state="6" /> <frame x="0" y="23" width="1920" height="1057" extended-state="1" />
<editor active="true" />
<layout> <layout>
<window_info content_ui="combo" id="Project" order="0" visible="true" weight="0.26149732" /> <window_info content_ui="combo" id="Project" order="0" visible="true" weight="0.271123" />
<window_info id="Structure" order="1" side_tool="true" weight="0.25" /> <window_info id="Structure" order="1" side_tool="true" weight="0.25" />
<window_info id="Designer" order="2" /> <window_info id="Designer" order="2" />
<window_info id="UI Designer" order="3" /> <window_info id="UI Designer" order="3" />
@ -191,8 +235,8 @@
<window_info id="Web" order="5" side_tool="true" /> <window_info id="Web" order="5" side_tool="true" />
<window_info anchor="bottom" id="Message" order="0" /> <window_info anchor="bottom" id="Message" order="0" />
<window_info anchor="bottom" id="Find" order="1" /> <window_info anchor="bottom" id="Find" order="1" />
<window_info anchor="bottom" id="Run" order="2" weight="0.5635546" /> <window_info active="true" anchor="bottom" id="Run" order="2" visible="true" weight="0.51953536" />
<window_info anchor="bottom" id="Debug" order="3" weight="0.4" /> <window_info anchor="bottom" id="Debug" order="3" weight="0.26821542" />
<window_info anchor="bottom" id="Cvs" order="4" weight="0.25" /> <window_info anchor="bottom" id="Cvs" order="4" weight="0.25" />
<window_info anchor="bottom" id="Inspection" order="5" weight="0.4" /> <window_info anchor="bottom" id="Inspection" order="5" weight="0.4" />
<window_info anchor="bottom" id="TODO" order="6" /> <window_info anchor="bottom" id="TODO" order="6" />
@ -219,6 +263,18 @@
<component name="TypeScriptGeneratedFilesManager"> <component name="TypeScriptGeneratedFilesManager">
<option name="version" value="1" /> <option name="version" value="1" />
</component> </component>
<component name="XDebuggerManager">
<breakpoint-manager>
<breakpoints>
<line-breakpoint enabled="true" type="java-line">
<url>jar://$PROJECT_DIR$/lib/cim-server-sdk-mina-3.7.0.jar!/com/farsunset/cim/sdk/server/handler/CIMNioSocketAcceptor.class</url>
<line>72</line>
<properties />
<option name="timeStamp" value="5" />
</line-breakpoint>
</breakpoints>
</breakpoint-manager>
</component>
<component name="editorHistoryManager"> <component name="editorHistoryManager">
<entry file="file://$PROJECT_DIR$/src/main/resources/static/js/cim/cim.web.sdk.js"> <entry file="file://$PROJECT_DIR$/src/main/resources/static/js/cim/cim.web.sdk.js">
<provider selected="true" editor-type-id="text-editor"> <provider selected="true" editor-type-id="text-editor">
@ -244,27 +300,48 @@
</state> </state>
</provider> </provider>
</entry> </entry>
<entry file="jar://$PROJECT_DIR$/lib/cim-server-sdk-netty-3.7.0.jar!/com/farsunset/cim/sdk/server/handler/CIMNioSocketAcceptor.class"> <entry file="jar://$PROJECT_DIR$/lib/cim-server-sdk-mina-3.7.0.jar!/com/farsunset/cim/sdk/server/handler/CIMNioSocketAcceptor.class">
<provider selected="true" editor-type-id="text-editor"> <provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="-195"> <state relative-caret-position="212">
<caret line="48" column="43" selection-start-line="48" selection-start-column="43" selection-end-line="48" selection-end-column="43" /> <caret line="70" lean-forward="true" selection-start-line="70" selection-end-line="70" />
</state> </state>
</provider> </provider>
</entry> </entry>
<entry file="file://$PROJECT_DIR$/src/main/java/com/farsunset/cim/CIMConfig.java"> <entry file="jar://$PROJECT_DIR$/lib/cim-server-sdk-netty-3.7.0.jar!/com/farsunset/cim/sdk/server/handler/CIMNioSocketAcceptor.class">
<provider selected="true" editor-type-id="text-editor"> <provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="-302"> <state relative-caret-position="-136">
<caret line="28" column="50" selection-start-line="28" selection-start-column="50" selection-end-line="28" selection-end-column="50" /> <caret line="57" column="58" selection-start-line="57" selection-start-column="58" selection-end-line="57" selection-end-column="58" />
<folding>
<element signature="imports" expanded="true" />
</folding>
</state> </state>
</provider> </provider>
</entry> </entry>
<entry file="file://$PROJECT_DIR$/pom.xml"> <entry file="file://$PROJECT_DIR$/pom.xml">
<provider selected="true" editor-type-id="text-editor"> <provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="129"> <state relative-caret-position="836">
<caret line="44" column="89" lean-forward="true" selection-start-line="44" selection-start-column="89" selection-end-line="44" selection-end-column="89" /> <caret line="44" column="67" selection-start-line="44" selection-start-column="67" selection-end-line="44" selection-end-column="67" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/com/farsunset/cim/service/CIMSessionService.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="399">
<caret line="43" selection-start-line="43" selection-end-line="43" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/resources/application.properties">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="114">
<caret line="6" column="31" selection-start-line="6" selection-start-column="31" selection-end-line="6" selection-end-column="31" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/com/farsunset/cim/CIMConfig.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="171">
<caret line="57" column="78" selection-start-line="57" selection-start-column="78" selection-end-line="57" selection-end-column="78" />
<folding>
<element signature="imports" expanded="true" />
</folding>
</state> </state>
</provider> </provider>
</entry> </entry>

View File

@ -1,6 +1,5 @@
package com.farsunset.cim; package com.farsunset.cim;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -8,7 +7,9 @@ import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -21,13 +22,11 @@ import com.farsunset.cim.service.CIMSessionService;
import com.farsunset.cim.sdk.server.model.CIMSession; import com.farsunset.cim.sdk.server.model.CIMSession;
@Configuration @Configuration
public class CIMConfig implements CIMRequestHandler { public class CIMConfig implements CIMRequestHandler, ApplicationListener<ApplicationStartedEvent> {
@Value("${cim.server.port}")
private int port;
@Resource @Resource
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
private HashMap<String,Class<? extends CIMRequestHandler>> appHandlerMap = new HashMap<>(); private HashMap<String,Class<? extends CIMRequestHandler>> appHandlerMap = new HashMap<>();
@PostConstruct @PostConstruct
@ -42,12 +41,11 @@ public class CIMConfig implements CIMRequestHandler {
appHandlerMap.put("client_closed", SessionClosedHandler.class); appHandlerMap.put("client_closed", SessionClosedHandler.class);
} }
@Bean @Bean(destroyMethod = "destroy")
public CIMNioSocketAcceptor getNioSocketAcceptor() throws IOException { public CIMNioSocketAcceptor getNioSocketAcceptor(@Value("${cim.server.port}") int port) {
CIMNioSocketAcceptor nioSocketAcceptor = new CIMNioSocketAcceptor(); CIMNioSocketAcceptor nioSocketAcceptor = new CIMNioSocketAcceptor();
nioSocketAcceptor.setPort(port); nioSocketAcceptor.setPort(port);
nioSocketAcceptor.setAppSentBodyHandler(this); nioSocketAcceptor.setAppSentBodyHandler(this);
nioSocketAcceptor.bind();
return nioSocketAcceptor; return nioSocketAcceptor;
} }
@ -56,7 +54,7 @@ public class CIMConfig implements CIMRequestHandler {
* @param memorySessionService 默认使用内存管理方案 * @param memorySessionService 默认使用内存管理方案
* @return * @return
*/ */
@Bean("cimSessionService") @Bean(value = "cimSessionService")
public CIMSessionService getCIMSessionService(@Qualifier("memorySessionService") CIMSessionService memorySessionService) { public CIMSessionService getCIMSessionService(@Qualifier("memorySessionService") CIMSessionService memorySessionService) {
return memorySessionService; return memorySessionService;
@ -80,6 +78,13 @@ public class CIMConfig implements CIMRequestHandler {
} }
return applicationContext.getBean(handlerClass); return applicationContext.getBean(handlerClass);
} }
/**
* springboot启动完成之后再启动cim服务的避免服务正在重启时客户端会立即开始连接导致意外异常发生.
*/
@Override
public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
applicationContext.getBean(CIMNioSocketAcceptor.class).bind();
}
} }

View File

@ -14,7 +14,7 @@ spring.messages.basename=i18n/messages
################################################################## ##################################################################
# CIM Config # # CIM Config #
################################################################## ##################################################################
cim.server.port =23456 cim.server.port=23456
apple.apns.debug=false apple.apns.debug=false
apple.apns.p12.password= your p12 password apple.apns.p12.password= your p12 password
apple.apns.p12.file= /apns/lvxin.p12 apple.apns.p12.file= /apns/lvxin.p12

View File

@ -2,7 +2,7 @@
<classpath> <classpath>
<classpathentry kind="src" path="src"/> <classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="C:/Program Files/Android/android-sdk-windows/platforms/android-26/android.jar"/>
<classpathentry kind="lib" path="libs/protobuf-java-3.7.0.jar"/> <classpathentry kind="lib" path="libs/protobuf-java-3.7.0.jar"/>
<classpathentry kind="lib" path="libs/android.jar"/>
<classpathentry kind="output" path="bin"/> <classpathentry kind="output" path="bin"/>
</classpath> </classpath>

Binary file not shown.

View File

@ -79,7 +79,7 @@ class CIMCacheManager {
try { try {
if (cursor != null) if (cursor != null)
cursor.close(); cursor.close();
} catch (Exception e) { } catch (Exception ignore) {
} }
} }

View File

@ -26,6 +26,7 @@ import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.StandardSocketOptions; import java.net.StandardSocketOptions;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
@ -33,6 +34,7 @@ import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import com.farsunset.cim.sdk.android.coder.CIMLogger; import com.farsunset.cim.sdk.android.coder.CIMLogger;
import com.farsunset.cim.sdk.android.coder.ClientMessageDecoder; import com.farsunset.cim.sdk.android.coder.ClientMessageDecoder;
@ -50,7 +52,8 @@ import android.content.Intent;
import android.net.ConnectivityManager; import android.net.ConnectivityManager;
import android.net.NetworkInfo; import android.net.NetworkInfo;
import android.os.Handler; import android.os.Handler;
import android.util.ArrayMap; import android.os.HandlerThread;
import android.os.Process;
/** /**
* 连接服务端管理cim核心处理类管理连接以及消息处理 * 连接服务端管理cim核心处理类管理连接以及消息处理
@ -59,38 +62,42 @@ import android.util.ArrayMap;
*/ */
class CIMConnectorManager { class CIMConnectorManager {
private static CIMConnectorManager manager;
private final int READ_BUFFER_SIZE = 2048; private final int READ_BUFFER_SIZE = 2048;
private final int WRITE_BUFFER_SIZE = 1024; private final int WRITE_BUFFER_SIZE = 1024;
private final int READ_IDLE_TIME = 120 * 1000;// private final int READ_IDLE_TIME = 120 * 1000;
private final int HEARBEAT_TIME_OUT = (READ_IDLE_TIME + 10) * 1000;// 收到服务端心跳请求超时时间 毫秒
private final String KEY_LAST_HEART_TIME = "KEY_LAST_HEART_TIME"; private final int CONNECT_ALIVE_TIME_OUT = 150 * 1000;
private final AtomicLong LAST_READ_TIME = new AtomicLong(0);
private final AtomicBoolean CONNECTING_FLAG = new AtomicBoolean(false) ;
private final CIMLogger LOGGER = CIMLogger.getLogger();
private static final HandlerThread IDLE_HANDLER_THREAD = new HandlerThread("READ-IDLE", Process.THREAD_PRIORITY_BACKGROUND);
private Selector selector; private Selector selector;
private SocketChannel socketChannel ; private SocketChannel socketChannel ;
private Context context; private Context context;
private static CIMConnectorManager manager;
private CIMLogger logger = CIMLogger.getLogger();
private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
private ArrayMap<String, Object> attr = new ArrayMap<>();
private ExecutorService workerExecutor = Executors.newFixedThreadPool(1); private ExecutorService workerExecutor = Executors.newFixedThreadPool(1);
private ExecutorService bossExecutor = Executors.newFixedThreadPool(1); private ExecutorService bossExecutor = Executors.newFixedThreadPool(1);
private final AtomicBoolean CONNECTING_FLAG = new AtomicBoolean(false) ;
private ClientMessageEncoder messageEncoder = new ClientMessageEncoder(); private ClientMessageEncoder messageEncoder = new ClientMessageEncoder();
private ClientMessageDecoder messageDecoder = new ClientMessageDecoder(); private ClientMessageDecoder messageDecoder = new ClientMessageDecoder();
static {
private CIMConnectorManager(Context ctx) { IDLE_HANDLER_THREAD.start();
context = ctx; }
private CIMConnectorManager(Context context) {
this.context = context;
makeNioConnector(); makeNioConnector();
} }
@ -112,14 +119,10 @@ class CIMConnectorManager {
selector.wakeup(); selector.wakeup();
socketChannel.register(selector, SelectionKey.OP_CONNECT); socketChannel.register(selector, SelectionKey.OP_CONNECT);
}catch(Exception e) { }catch(Exception ignore) {}
}
} }
public synchronized static CIMConnectorManager getManager(Context context) { public synchronized static CIMConnectorManager getManager(Context context) {
if (manager == null) { if (manager == null) {
@ -129,8 +132,6 @@ class CIMConnectorManager {
return manager; return manager;
} }
public void connect(final String host, final int port) { public void connect(final String host, final int port) {
@ -155,85 +156,100 @@ class CIMConnectorManager {
makeNioConnector(); makeNioConnector();
} }
bossExecutor.execute(new Runnable() { workerExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
LOGGER.startConnect(host, port);
final InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
logger.startConnect(remoteAddress);
CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_CONNECTION_STATE, false); CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_CONNECTION_STATE, false);
try { try {
socketChannel.connect(remoteAddress);
}catch(Exception e){ socketChannel.connect(new InetSocketAddress(host, port));
closeSession();
return; while (socketChannel.isOpen()) {
}
selector.select();
workerExecutor.execute(new Runnable() { if(!selector.isOpen()) {
@Override break;
public void run() { }
while (socketChannel.isOpen()) {
for(SelectionKey key : selector.selectedKeys()){
try {
selector.select(); if((key.interestOps() & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT && socketChannel.finishConnect()) {
for(SelectionKey key : selector.selectedKeys()){ handelConnectedEvent();
if (key.isConnectable() && socketChannel.finishConnect()) { continue;
handelConnectionEvent(); }
continue;
} if((key.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
handelSocketReadEvent();
if (key.isReadable()) { }
handelReadEvent(); }
} }
}
}catch(ConnectException ignore){
} catch (Exception e) { handleConnectAbortedEvent();
if(e instanceof ConnectException) { }catch(IllegalArgumentException ignore){
handleConnectFailure(remoteAddress); handleConnectAbortedEvent();
}else { }catch(IOException ignore) {
closeSession(); handelDisconnectedEvent();
} }catch(ClosedSelectorException ignore) {}
}
}
}
});
} }
}); });
} }
Handler idleHandler = new Handler() { private Handler idleHandler = new Handler(IDLE_HANDLER_THREAD.getLooper()) {
public void handleMessage(android.os.Message m) { public void handleMessage(android.os.Message m) {
sessionIdle(); sessionIdle();
} }
}; };
private void handelConnectionEvent() throws Exception { private void handelDisconnectedEvent() {
CONNECTING_FLAG.set(false); CONNECTING_FLAG.set(false);
socketChannel.register(selector, SelectionKey.OP_READ); closeSession();
}
private void handleConnectAbortedEvent() {
CONNECTING_FLAG.set(false);
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
LOGGER.connectFailure(interval);
Intent intent = new Intent();
intent.setPackage(context.getPackageName());
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra("interval", interval);
context.sendBroadcast(intent);
}
private void handelConnectedEvent() throws IOException {
CONNECTING_FLAG.set(false);
socketChannel.register(selector, SelectionKey.OP_READ);
sessionCreated(); sessionCreated();
idleHandler.sendEmptyMessageDelayed(0, READ_IDLE_TIME); idleHandler.sendEmptyMessageDelayed(0, READ_IDLE_TIME);
} }
private void handelReadEvent() throws Exception { private void handelSocketReadEvent() throws IOException {
idleHandler.removeCallbacksAndMessages(null); LAST_READ_TIME.set(System.currentTimeMillis());
idleHandler.removeMessages(0);
idleHandler.sendEmptyMessageDelayed(0, READ_IDLE_TIME); idleHandler.sendEmptyMessageDelayed(0, READ_IDLE_TIME);
int result = 0; int result = 0;
while((result = socketChannel.read(readBuffer)) > 0) { while((result = socketChannel.read(readBuffer)) > 0) {
if(readBuffer.position() == readBuffer.capacity()) { if(readBuffer.position() == readBuffer.capacity()) {
extemdByteBuffer(); extendByteBuffer();
} }
} }
@ -250,11 +266,12 @@ class CIMConnectorManager {
return; return;
} }
LOGGER.messageReceived(socketChannel,message);
logger.messageReceived(socketChannel,message);
if(isRequest(message)) { if(isHeartbeatRequest(message)) {
send(getResponse());
send(getHeartbeatResponse());
return; return;
} }
@ -262,7 +279,7 @@ class CIMConnectorManager {
} }
private void extemdByteBuffer() { private void extendByteBuffer() {
ByteBuffer newBuffer = ByteBuffer.allocate(readBuffer.capacity() + READ_BUFFER_SIZE / 2); ByteBuffer newBuffer = ByteBuffer.allocate(readBuffer.capacity() + READ_BUFFER_SIZE / 2);
readBuffer.position(0); readBuffer.position(0);
@ -272,23 +289,6 @@ class CIMConnectorManager {
readBuffer = newBuffer; readBuffer = newBuffer;
} }
private void handleConnectFailure(InetSocketAddress remoteAddress) {
CONNECTING_FLAG.set(false);
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
logger.connectFailure(remoteAddress, interval);
Intent intent = new Intent();
intent.setPackage(context.getPackageName());
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra("interval", interval);
context.sendBroadcast(intent);
}
public void send(final SentBody body) { public void send(final SentBody body) {
@ -338,7 +338,7 @@ class CIMConnectorManager {
try { try {
socketChannel.write(messageEncoder.encode(body)); socketChannel.write(messageEncoder.encode(body));
messageSent(body); messageSent(body);
} catch (IOException e) { } catch (IOException ignore) {
closeSession(); closeSession();
} }
} }
@ -347,11 +347,10 @@ class CIMConnectorManager {
} }
public void sessionCreated() throws Exception { public void sessionCreated() {
LOGGER.sessionCreated(socketChannel);
logger.sessionCreated(socketChannel);
setLastHeartbeatTime(); LAST_READ_TIME.set(System.currentTimeMillis());
Intent intent = new Intent(); Intent intent = new Intent();
intent.setPackage(context.getPackageName()); intent.setPackage(context.getPackageName());
@ -362,16 +361,18 @@ class CIMConnectorManager {
public void sessionClosed() { public void sessionClosed() {
idleHandler.removeMessages(0);
LAST_READ_TIME.set(0);
LOGGER.sessionClosed(socketChannel);
readBuffer.clear(); readBuffer.clear();
if(readBuffer.capacity() > READ_BUFFER_SIZE) { if(readBuffer.capacity() > READ_BUFFER_SIZE) {
readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
} }
attr.clear();
logger.sessionClosed(socketChannel);
closeSelector(); closeSelector();
Intent intent = new Intent(); Intent intent = new Intent();
@ -383,14 +384,13 @@ class CIMConnectorManager {
public void sessionIdle() { public void sessionIdle() {
logger.sessionIdle(socketChannel); LOGGER.sessionIdle(socketChannel);
/** /**
* 用于解决wifi情况下偶而路由器与服务器断开连接时客户端并没及时收到关闭事件 导致这样的情况下当前连接无效也不会重连的问题 * 用于解决wifi情况下偶而路由器与服务器断开连接时客户端并没及时收到关闭事件 导致这样的情况下当前连接无效也不会重连的问题
* *
*/ */
long lastHeartbeatTime = getLastHeartbeatTime(); if (System.currentTimeMillis() - LAST_READ_TIME.get() >= CONNECT_ALIVE_TIME_OUT) {
if (System.currentTimeMillis() - lastHeartbeatTime >= HEARBEAT_TIME_OUT) {
closeSession(); closeSession();
} }
} }
@ -420,7 +420,7 @@ class CIMConnectorManager {
public void messageSent(Object message) { public void messageSent(Object message) {
logger.messageSent(socketChannel, message); LOGGER.messageSent(socketChannel, message);
if (message instanceof SentBody) { if (message instanceof SentBody) {
Intent intent = new Intent(); Intent intent = new Intent();
@ -431,41 +431,23 @@ class CIMConnectorManager {
} }
} }
private void setLastHeartbeatTime() {
attr.put(KEY_LAST_HEART_TIME, System.currentTimeMillis());
}
private long getLastHeartbeatTime() {
long time = 0;
Object value = attr.get(KEY_LAST_HEART_TIME);
if (value != null) {
time = Long.parseLong(value.toString());
}
return time;
}
public static boolean isNetworkConnected(Context context) { public static boolean isNetworkConnected(Context context) {
try { try {
ConnectivityManager nw = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE); ConnectivityManager nw = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo networkInfo = nw.getActiveNetworkInfo(); NetworkInfo networkInfo = nw.getActiveNetworkInfo();
return networkInfo != null; return networkInfo != null;
} catch (Exception e) { } catch (Exception ignore) {
} }
return false; return false;
} }
public HeartbeatResponse getResponse() { public HeartbeatResponse getHeartbeatResponse() {
return HeartbeatResponse.getInstance(); return HeartbeatResponse.getInstance();
} }
public boolean isRequest(Object data) { public boolean isHeartbeatRequest(Object data) {
setLastHeartbeatTime();
return data instanceof HeartbeatRequest; return data instanceof HeartbeatRequest;
} }
@ -490,12 +472,9 @@ class CIMConnectorManager {
} }
public void closeSelector() { public void closeSelector() {
if (selector != null) { try {
try { selector.close();
selector.close(); } catch (IOException ignore) {}
} catch (IOException ignore) {
}
}
} }
} }

View File

@ -41,10 +41,11 @@ public abstract class CIMEventBroadcastReceiver extends BroadcastReceiver {
protected Context context; protected Context context;
@SuppressWarnings("deprecation")
@Override @Override
public void onReceive(Context ctx, Intent intent) { public void onReceive(Context context, Intent intent) {
context = ctx; this.context = context;
/* /*
* 操作事件广播用于提高service存活率 * 操作事件广播用于提高service存活率

View File

@ -30,6 +30,7 @@ import android.content.pm.PackageManager.NameNotFoundException;
import android.os.Build; import android.os.Build;
import android.text.TextUtils; import android.text.TextUtils;
import com.farsunset.cim.sdk.android.coder.CIMLogger;
import com.farsunset.cim.sdk.android.constant.CIMConstant; import com.farsunset.cim.sdk.android.constant.CIMConstant;
import com.farsunset.cim.sdk.android.model.SentBody; import com.farsunset.cim.sdk.android.model.SentBody;
@ -67,12 +68,17 @@ public class CIMPushManager {
} }
private static void connect(Context context, String ip, int port, boolean autoBind, long delayedTime) { private static void connect(Context context, String host, int port, boolean autoBind, long delayedTime) {
if(TextUtils.isEmpty(host) || port == 0) {
CIMLogger.getLogger().invalidHostPort(host, port);
return;
}
CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_DESTROYED, false); CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_CIM_DESTROYED, false);
CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_MANUAL_STOP, false); CIMCacheManager.putBoolean(context, CIMCacheManager.KEY_MANUAL_STOP, false);
CIMCacheManager.putString(context, CIMCacheManager.KEY_CIM_SERVIER_HOST, ip); CIMCacheManager.putString(context, CIMCacheManager.KEY_CIM_SERVIER_HOST, host);
CIMCacheManager.putInt(context, CIMCacheManager.KEY_CIM_SERVIER_PORT, port); CIMCacheManager.putInt(context, CIMCacheManager.KEY_CIM_SERVIER_PORT, port);
if (!autoBind) { if (!autoBind) {
@ -80,7 +86,7 @@ public class CIMPushManager {
} }
Intent serviceIntent = new Intent(context, CIMPushService.class); Intent serviceIntent = new Intent(context, CIMPushService.class);
serviceIntent.putExtra(CIMCacheManager.KEY_CIM_SERVIER_HOST, ip); serviceIntent.putExtra(CIMCacheManager.KEY_CIM_SERVIER_HOST, host);
serviceIntent.putExtra(CIMCacheManager.KEY_CIM_SERVIER_PORT, port); serviceIntent.putExtra(CIMCacheManager.KEY_CIM_SERVIER_PORT, port);
serviceIntent.putExtra(CIMPushService.KEY_DELAYED_TIME, delayedTime); serviceIntent.putExtra(CIMPushService.KEY_DELAYED_TIME, delayedTime);
serviceIntent.setAction(ACTION_CREATE_CIM_CONNECTION); serviceIntent.setAction(ACTION_CREATE_CIM_CONNECTION);
@ -134,7 +140,7 @@ public class CIMPushManager {
String deviceId = CIMCacheManager.getString(context, CIMCacheManager.KEY_DEVICE_ID); String deviceId = CIMCacheManager.getString(context, CIMCacheManager.KEY_DEVICE_ID);
if (TextUtils.isEmpty(deviceId)) { if (TextUtils.isEmpty(deviceId)) {
deviceId = UUID.randomUUID().toString().replaceAll("-", ""); deviceId = UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
CIMCacheManager.putString(context, CIMCacheManager.KEY_DEVICE_ID, deviceId); CIMCacheManager.putString(context, CIMCacheManager.KEY_DEVICE_ID, deviceId);
} }
@ -245,7 +251,7 @@ public class CIMPushManager {
try { try {
PackageInfo mPackageInfo = context.getPackageManager().getPackageInfo(context.getPackageName(), 0); PackageInfo mPackageInfo = context.getPackageManager().getPackageInfo(context.getPackageName(), 0);
versionName = mPackageInfo.versionName; versionName = mPackageInfo.versionName;
} catch (NameNotFoundException e) { } catch (NameNotFoundException ignore) {
} }
return versionName; return versionName;
} }

View File

@ -211,7 +211,6 @@ public class CIMPushService extends Service {
IntentFilter intentFilter = new IntentFilter(); IntentFilter intentFilter = new IntentFilter();
intentFilter.addAction(Intent.ACTION_POWER_CONNECTED); intentFilter.addAction(Intent.ACTION_POWER_CONNECTED);
intentFilter.addAction(Intent.ACTION_POWER_DISCONNECTED); intentFilter.addAction(Intent.ACTION_POWER_DISCONNECTED);
intentFilter.addAction(Intent.ACTION_SCREEN_ON);
intentFilter.addAction(Intent.ACTION_USER_PRESENT); intentFilter.addAction(Intent.ACTION_USER_PRESENT);
return intentFilter; return intentFilter;
} }

View File

@ -21,8 +21,6 @@
*/ */
package com.farsunset.cim.sdk.android.coder; package com.farsunset.cim.sdk.android.coder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import android.util.Log; import android.util.Log;
@ -53,43 +51,49 @@ public class CIMLogger {
public void messageReceived(SocketChannel session, Object message) { public void messageReceived(SocketChannel session, Object message) {
if(debug) { if(debug) {
Log.i(TAG,String.format("RECEIVED" + getSessionInfo(session) + "\n%s", message)); Log.i(TAG,String.format("[RECEIVED]" + getSessionInfo(session) + "\n%s", message));
} }
} }
public void messageSent(SocketChannel session, Object message) { public void messageSent(SocketChannel session, Object message) {
if(debug) { if(debug) {
Log.i(TAG,String.format("SENT" + getSessionInfo(session) + "\n%s", message)); Log.i(TAG,String.format("[ SENT ]" + getSessionInfo(session) + "\n%s", message));
} }
} }
public void sessionCreated( SocketChannel session) throws Exception { public void sessionCreated( SocketChannel session) {
if(debug) { if(debug) {
Log.i(TAG,"OPENED" + getSessionInfo(session)); Log.i(TAG,"[ OPENED ]" + getSessionInfo(session));
} }
} }
public void sessionIdle( SocketChannel session) { public void sessionIdle( SocketChannel session) {
if(debug) { if(debug) {
Log.d(TAG,"IDLE READ" + getSessionInfo(session)); Log.d(TAG,"[ IDLE ]" + getSessionInfo(session));
} }
} }
public void sessionClosed( SocketChannel session) { public void sessionClosed( SocketChannel session) {
if(debug) { if(debug) {
Log.w(TAG,"CLOSED ID = " + session.hashCode()); Log.w(TAG,"[ CLOSED ] ID = " + session.hashCode());
} }
} }
public void connectFailure(InetSocketAddress remoteAddress,long interval) { public void connectFailure(long interval) {
if(debug) { if(debug) {
Log.d(TAG,"CONNECT FAILURE TRY RECONNECT AFTER " + interval +"ms"); Log.d(TAG,"CONNECT FAILURE, TRY RECONNECT AFTER " + interval +"ms");
} }
} }
public void startConnect(InetSocketAddress remoteAddress) { public void startConnect(String host , int port) {
if(debug) { if(debug) {
Log.i(TAG,"START CONNECT REMOTE HOST: " + remoteAddress.toString()); Log.i(TAG,"START CONNECT REMOTE HOST:" + host + " PORT:" + port);
}
}
public void invalidHostPort(String host , int port) {
if(debug) {
Log.d(TAG,"INVALID SOCKET ADDRESS -> HOST:" + host + " PORT:" + port);
} }
} }

View File

@ -39,7 +39,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class ClientMessageDecoder { public class ClientMessageDecoder {
public Object doDecode(ByteBuffer iobuffer) throws Exception { public Object doDecode(ByteBuffer iobuffer) {
/** /**
* 消息头3位 * 消息头3位
@ -68,7 +68,11 @@ public class ClientMessageDecoder {
iobuffer.position(0); iobuffer.position(0);
return mappingMessageObject(dataBytes, conetnType); try {
return mappingMessageObject(dataBytes, conetnType);
} catch (InvalidProtocolBufferException e) {
return null;
}
} }

View File

@ -26,13 +26,11 @@ import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.StandardSocketOptions; import java.net.StandardSocketOptions;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Random; import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -49,7 +47,6 @@ import com.farsunset.cim.sdk.client.model.Message;
import com.farsunset.cim.sdk.client.model.ReplyBody; import com.farsunset.cim.sdk.client.model.ReplyBody;
import com.farsunset.cim.sdk.client.model.SentBody; import com.farsunset.cim.sdk.client.model.SentBody;
/** /**
* 连接服务端管理cim核心处理类管理连接以及消息处理 * 连接服务端管理cim核心处理类管理连接以及消息处理
@ -58,43 +55,32 @@ import com.farsunset.cim.sdk.client.model.SentBody;
*/ */
class CIMConnectorManager { class CIMConnectorManager {
private static CIMConnectorManager manager;
private final int READ_BUFFER_SIZE = 2048; private final int READ_BUFFER_SIZE = 2048;
private final int WRITE_BUFFER_SIZE = 1024; private final int WRITE_BUFFER_SIZE = 1024;
private final int READ_IDLE_TIME = 120 * 1000;// private final AtomicBoolean CONNECTING_FLAG = new AtomicBoolean(false) ;
private final int HEARBEAT_TIME_OUT = (READ_IDLE_TIME + 10) * 1000;// 收到服务端心跳请求超时时间 毫秒
private final CIMLogger LOGGER = CIMLogger.getLogger();
private final String KEY_LAST_HEART_TIME = "KEY_LAST_HEART_TIME";
private Selector selector; private Selector selector;
private SocketChannel socketChannel ; private SocketChannel socketChannel ;
private static CIMConnectorManager manager;
private CIMLogger logger = CIMLogger.getLogger();
private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); private ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
private HashMap<String, Object> attr = new HashMap<>();
private ExecutorService eventExecutor = Executors.newFixedThreadPool(1);
private ExecutorService workerExecutor = Executors.newFixedThreadPool(1); private ExecutorService workerExecutor = Executors.newFixedThreadPool(1);
private ExecutorService bossExecutor = Executors.newFixedThreadPool(1); private ExecutorService bossExecutor = Executors.newFixedThreadPool(1);
private ExecutorService eventExecutor = Executors.newFixedThreadPool(1);
private final AtomicBoolean CONNECTING_FLAG = new AtomicBoolean(false) ;
private ClientMessageEncoder messageEncoder = new ClientMessageEncoder(); private ClientMessageEncoder messageEncoder = new ClientMessageEncoder();
private ClientMessageDecoder messageDecoder = new ClientMessageDecoder(); private ClientMessageDecoder messageDecoder = new ClientMessageDecoder();
private Timer idleHandler = new Timer();;
private ReadIdleTask idleTask = new ReadIdleTask();;
private CIMConnectorManager() { private CIMConnectorManager() {
makeNioConnector(); makeNioConnector();
} }
private void makeNioConnector() { private void makeNioConnector() {
try { try {
if(socketChannel == null || !socketChannel.isOpen()) { if(socketChannel == null || !socketChannel.isOpen()) {
@ -111,15 +97,12 @@ class CIMConnectorManager {
} }
selector.wakeup(); selector.wakeup();
socketChannel.register(selector, SelectionKey.OP_CONNECT); socketChannel.register(selector, SelectionKey.OP_CONNECT);
}catch(Exception e) {
}catch(Exception ignore) {}
}
} }
public synchronized static CIMConnectorManager getManager() { public synchronized static CIMConnectorManager getManager() {
if (manager == null) { if (manager == null) {
@ -129,13 +112,12 @@ class CIMConnectorManager {
return manager; return manager;
} }
public void connect(final String host, final int port) { public void connect(final String host, final int port) {
if (CONNECTING_FLAG.get() || isConnected()) { boolean isConnected = isConnected();
if (CONNECTING_FLAG.get() || isConnected) {
return; return;
} }
@ -145,83 +127,87 @@ class CIMConnectorManager {
makeNioConnector(); makeNioConnector();
} }
bossExecutor.execute(new Runnable() { workerExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
LOGGER.startConnect(host, port);
final InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
logger.startConnect(remoteAddress);
CIMCacheManager.getInstance().putBoolean(CIMCacheManager.KEY_CIM_CONNECTION_STATE, false); CIMCacheManager.getInstance().putBoolean(CIMCacheManager.KEY_CIM_CONNECTION_STATE, false);
try {
try {
socketChannel.connect(remoteAddress); socketChannel.connect(new InetSocketAddress(host, port));
}catch(Exception e) {
closeSession(); while (socketChannel.isOpen()) {
return;
} selector.select();
if(!selector.isOpen()) {
workerExecutor.execute(new Runnable() { break;
@Override }
public void run() {
while (socketChannel.isOpen()) { for(SelectionKey key : selector.selectedKeys()){
try { if((key.interestOps() & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT && socketChannel.finishConnect()) {
selector.select(); handelConnectedEvent();
for(SelectionKey key : selector.selectedKeys()){ continue;
if (key.isConnectable() && socketChannel.finishConnect()) { }
handelConnectionEvent();
continue; if((key.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
} handelSocketReadEvent();
}
if (key.isReadable()) { }
handelReadEvent(); }
}
} }catch(ConnectException ignore){
handleConnectAbortedEvent();
} catch (Exception e) { }catch(IllegalArgumentException ignore){
if(e instanceof ConnectException) { handleConnectAbortedEvent();
handleConnectFailure(remoteAddress); }catch(IOException ignore) {
}else { handelDisconnectedEvent();
closeSession(); }catch(ClosedSelectorException ignore) {}
}
}
}
}
});
} }
}); });
} }
private void handelConnectionEvent() throws Exception {
private void handelDisconnectedEvent() {
CONNECTING_FLAG.set(false); CONNECTING_FLAG.set(false);
socketChannel.register(selector, SelectionKey.OP_READ); closeSession();
sessionCreated();
idleHandler.schedule(idleTask = new ReadIdleTask(), READ_IDLE_TIME);
} }
private void handelReadEvent() throws Exception { private void handleConnectAbortedEvent() {
if(idleTask != null) { CONNECTING_FLAG.set(false);
idleTask.cancel();
} long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
idleHandler.schedule(idleTask = new ReadIdleTask(), READ_IDLE_TIME); LOGGER.connectFailure(interval);
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra("interval", interval);
sendBroadcast(intent);
}
private void handelConnectedEvent() throws IOException {
CONNECTING_FLAG.set(false);
socketChannel.register(selector, SelectionKey.OP_READ);
sessionCreated();
}
private void handelSocketReadEvent() throws IOException {
int result = 0; int result = 0;
while((result = socketChannel.read(readBuffer)) > 0) { while((result = socketChannel.read(readBuffer)) > 0) {
if(readBuffer.position() == readBuffer.capacity()) { if(readBuffer.position() == readBuffer.capacity()) {
extemdByteBuffer(); extendByteBuffer();
} }
} }
@ -238,11 +224,12 @@ class CIMConnectorManager {
return; return;
} }
LOGGER.messageReceived(socketChannel,message);
logger.messageReceived(socketChannel,message);
if(isRequest(message)) { if(isHeartbeatRequest(message)) {
send(getResponse());
send(getHeartbeatResponse());
return; return;
} }
@ -250,7 +237,7 @@ class CIMConnectorManager {
} }
private void extemdByteBuffer() { private void extendByteBuffer() {
ByteBuffer newBuffer = ByteBuffer.allocate(readBuffer.capacity() + READ_BUFFER_SIZE / 2); ByteBuffer newBuffer = ByteBuffer.allocate(readBuffer.capacity() + READ_BUFFER_SIZE / 2);
readBuffer.position(0); readBuffer.position(0);
@ -260,22 +247,6 @@ class CIMConnectorManager {
readBuffer = newBuffer; readBuffer = newBuffer;
} }
private void handleConnectFailure(InetSocketAddress remoteAddress) {
CONNECTING_FLAG.set(false);
long interval = CIMConstant.RECONN_INTERVAL_TIME - (5 * 1000 - new Random().nextInt(15 * 1000));
logger.connectFailure(remoteAddress, interval);
Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_FAILED);
intent.putExtra("interval", interval);
sendBroadcast(intent);
}
public void send(final SentBody body) { public void send(final SentBody body) {
@ -307,7 +278,6 @@ class CIMConnectorManager {
intent.putExtra(Exception.class.getName(), exceptionName); intent.putExtra(Exception.class.getName(), exceptionName);
intent.putExtra(SentBody.class.getName(), body); intent.putExtra(SentBody.class.getName(), body);
sendBroadcast(intent); sendBroadcast(intent);
}else { }else {
messageSent(body); messageSent(body);
} }
@ -325,8 +295,8 @@ class CIMConnectorManager {
try { try {
socketChannel.write(messageEncoder.encode(body)); socketChannel.write(messageEncoder.encode(body));
messageSent(body); messageSent(body);
} catch (IOException e) { } catch (IOException ignore) {
closeSession(); closeSession();
} }
} }
}); });
@ -334,12 +304,10 @@ class CIMConnectorManager {
} }
public void sessionCreated() throws Exception { public void sessionCreated() {
logger.sessionCreated(socketChannel); LOGGER.sessionCreated(socketChannel);
setLastHeartbeatTime();
Intent intent = new Intent(); Intent intent = new Intent();
intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_SUCCESSED); intent.setAction(CIMConstant.IntentAction.ACTION_CONNECTION_SUCCESSED);
sendBroadcast(intent); sendBroadcast(intent);
@ -348,16 +316,14 @@ class CIMConnectorManager {
public void sessionClosed() { public void sessionClosed() {
LOGGER.sessionClosed(socketChannel);
readBuffer.clear(); readBuffer.clear();
if(readBuffer.capacity() > READ_BUFFER_SIZE) { if(readBuffer.capacity() > READ_BUFFER_SIZE) {
readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
} }
attr.clear();
logger.sessionClosed(socketChannel);
closeSelector(); closeSelector();
Intent intent = new Intent(); Intent intent = new Intent();
@ -365,22 +331,7 @@ class CIMConnectorManager {
sendBroadcast(intent); sendBroadcast(intent);
} }
public void sessionIdle() {
logger.sessionIdle(socketChannel);
/**
* 用于解决wifi情况下偶而路由器与服务器断开连接时客户端并没及时收到关闭事件 导致这样的情况下当前连接无效也不会重连的问题
*
*/
long lastHeartbeatTime = getLastHeartbeatTime();
if (System.currentTimeMillis() - lastHeartbeatTime >= HEARBEAT_TIME_OUT) {
closeSession();
}
}
public void messageReceived(Object obj) { public void messageReceived(Object obj) {
if (obj instanceof Message) { if (obj instanceof Message) {
@ -403,7 +354,7 @@ class CIMConnectorManager {
public void messageSent(Object message) { public void messageSent(Object message) {
logger.messageSent(socketChannel, message); LOGGER.messageSent(socketChannel, message);
if (message instanceof SentBody) { if (message instanceof SentBody) {
Intent intent = new Intent(); Intent intent = new Intent();
@ -413,30 +364,11 @@ class CIMConnectorManager {
} }
} }
public HeartbeatResponse getHeartbeatResponse() {
private void setLastHeartbeatTime() {
attr.put(KEY_LAST_HEART_TIME, System.currentTimeMillis());
}
private long getLastHeartbeatTime() {
long time = 0;
Object value = attr.get(KEY_LAST_HEART_TIME);
if (value != null) {
time = Long.parseLong(value.toString());
}
return time;
}
public HeartbeatResponse getResponse() {
return HeartbeatResponse.getInstance(); return HeartbeatResponse.getInstance();
} }
public boolean isRequest(Object data) { public boolean isHeartbeatRequest(Object data) {
setLastHeartbeatTime();
return data instanceof HeartbeatRequest; return data instanceof HeartbeatRequest;
} }
@ -461,13 +393,11 @@ class CIMConnectorManager {
} }
public void closeSelector() { public void closeSelector() {
if (selector != null) { try {
try { selector.close();
selector.close(); } catch (IOException ignore) {}
} catch (IOException ignore) {
}
}
} }
private void sendBroadcast(final Intent intent) { private void sendBroadcast(final Intent intent) {
eventExecutor.execute(new Runnable() { eventExecutor.execute(new Runnable() {
@ -477,14 +407,5 @@ class CIMConnectorManager {
} }
}); });
} }
private class ReadIdleTask extends TimerTask{
@Override
public void run() {
sessionIdle();
}
}
} }

View File

@ -21,8 +21,6 @@
*/ */
package com.farsunset.cim.sdk.client; package com.farsunset.cim.sdk.client;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Properties; import java.util.Properties;
import java.util.UUID; import java.util.UUID;
@ -106,8 +104,8 @@ public class CIMPushManager {
sent.setKey(CIMConstant.RequestKey.CLIENT_BIND); sent.setKey(CIMConstant.RequestKey.CLIENT_BIND);
sent.put("account", account); sent.put("account", account);
sent.put("deviceId", getDeviceId()); sent.put("deviceId", getDeviceId());
sent.put("channel", sysPro.getProperty("os.name")); sent.put("channel", "java");
sent.put("device", System.getenv().get("COMPUTERNAME")); sent.put("device", sysPro.getProperty("os.name"));
sent.put("version", getClientVersion()); sent.put("version", getClientVersion());
sent.put("osVersion", sysPro.getProperty("os.version")); sent.put("osVersion", sysPro.getProperty("os.version"));
sendRequest(sent); sendRequest(sent);
@ -251,30 +249,13 @@ public class CIMPushManager {
} }
private static String getDeviceId() { private static String getDeviceId() {
InetAddress ia;
try { String deviceId = System.getProperties().getProperty(CIMConstant.ConfigKey.CLIENT_DEVICEID);
ia = InetAddress.getLocalHost();
byte[] mac = NetworkInterface.getByInetAddress(ia).getHardwareAddress(); if(deviceId == null) {
StringBuffer sb = new StringBuffer(""); deviceId = UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
for (int i = 0; i < mac.length; i++) { System.getProperties().put(CIMConstant.ConfigKey.CLIENT_DEVICEID, deviceId);
if (i != 0) {
sb.append("-");
}
// 字节转换为整数
int temp = mac[i] & 0xff;
String str = Integer.toHexString(temp);
if (str.length() == 1) {
sb.append("0" + str);
} else {
sb.append(str);
}
}
return UUID.fromString(sb.toString()).toString().replaceAll("-", "").toUpperCase();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} }
return deviceId;
return null;
} }
} }

View File

@ -22,7 +22,6 @@
package com.farsunset.cim.sdk.client.coder; package com.farsunset.cim.sdk.client.coder;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -56,7 +55,7 @@ public class CIMLogger {
LOGGER.info(String.format("SENT" + getSessionInfo(session) + "\n%s", message)); LOGGER.info(String.format("SENT" + getSessionInfo(session) + "\n%s", message));
} }
public void sessionCreated( SocketChannel session) throws Exception { public void sessionCreated( SocketChannel session) {
LOGGER.info("OPENED" + getSessionInfo(session)); LOGGER.info("OPENED" + getSessionInfo(session));
} }
@ -69,12 +68,12 @@ public class CIMLogger {
} }
public void connectFailure(InetSocketAddress remoteAddress,long interval) { public void connectFailure(long interval) {
LOGGER.debug("CONNECT FAILURE TRY RECONNECT AFTER " + interval +"ms"); LOGGER.debug("CONNECT FAILURE TRY RECONNECT AFTER " + interval +"ms");
} }
public void startConnect(InetSocketAddress remoteAddress) { public void startConnect(String host , int port) {
LOGGER.info("START CONNECT REMOTE HOST: " + remoteAddress.toString()); LOGGER.info("START CONNECT REMOTE HOST:" + host + " PORT:" + port);
} }
public void connectState(boolean isConnected) { public void connectState(boolean isConnected) {

View File

@ -38,7 +38,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class ClientMessageDecoder { public class ClientMessageDecoder {
public Object doDecode(ByteBuffer iobuffer) throws Exception { public Object doDecode(ByteBuffer iobuffer) {
/** /**
* 消息头3位 * 消息头3位
@ -67,7 +67,12 @@ public class ClientMessageDecoder {
iobuffer.position(0); iobuffer.position(0);
return mappingMessageObject(dataBytes, conetnType); try {
return mappingMessageObject(dataBytes, conetnType);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
return null;
}
} }

View File

@ -51,6 +51,7 @@ public interface CIMConstant {
public static String DEVICE_MODEL = "client.model"; public static String DEVICE_MODEL = "client.model";
public static String CLIENT_VERSION = "client.version"; public static String CLIENT_VERSION = "client.version";
public static String CLIENT_ACCOUNT = "client.account"; public static String CLIENT_ACCOUNT = "client.account";
public static String CLIENT_DEVICEID = "client.deviceid";
} }

View File

@ -0,0 +1,32 @@
/**
* Copyright 2013-2019 Xia Jun(3979434@qq.com).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************************
* *
* Website : http://www.farsunset.com *
* *
***************************************************************************************
*/
package com.farsunset.cim.sdk.server.exception;
public class ServerSocketBindException extends RuntimeException {
private static final long serialVersionUID = 1L;
public ServerSocketBindException(int port,Throwable cause) {
super("cim server bind failed, port:" + port,cause);
}
}

View File

@ -1,5 +1,5 @@
/** /**
* Copyright 2013-2023 Xia Jun(3979434@qq.com). * Copyright 2013-2019 Xia Jun(3979434@qq.com).
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -40,6 +40,7 @@ import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import com.farsunset.cim.sdk.server.constant.CIMConstant; import com.farsunset.cim.sdk.server.constant.CIMConstant;
import com.farsunset.cim.sdk.server.exception.ServerSocketBindException;
import com.farsunset.cim.sdk.server.filter.CIMLoggingFilter; import com.farsunset.cim.sdk.server.filter.CIMLoggingFilter;
import com.farsunset.cim.sdk.server.filter.ServerMessageCodecFactory; import com.farsunset.cim.sdk.server.filter.ServerMessageCodecFactory;
import com.farsunset.cim.sdk.server.model.HeartbeatRequest; import com.farsunset.cim.sdk.server.model.HeartbeatRequest;
@ -57,7 +58,7 @@ public class CIMNioSocketAcceptor extends IoHandlerAdapter implements KeepAliveM
private final int TIME_OUT = 10; // private final int TIME_OUT = 10; //
private final int READ_BUFFER_SIZE = 1024; // byte private final int READ_BUFFER_SIZE = 1024; // byte
public void bind() throws IOException { public void bind() {
/** /**
* 预制websocket握手请求的处理 * 预制websocket握手请求的处理
@ -85,15 +86,22 @@ public class CIMNioSocketAcceptor extends IoHandlerAdapter implements KeepAliveM
acceptor.getFilterChain().addLast("heartbeat", keepAliveFilter); acceptor.getFilterChain().addLast("heartbeat", keepAliveFilter);
acceptor.getFilterChain().addLast("executor", new ExecutorFilter(executor)); acceptor.getFilterChain().addLast("executor", new ExecutorFilter(executor));
acceptor.setHandler(this); acceptor.setHandler(this);
try {
acceptor.bind(new InetSocketAddress(port)); acceptor.bind(new InetSocketAddress(port));
} catch (IOException ignore) {
throw new ServerSocketBindException(port,ignore);
}
} }
public void unbind() { public void destroy() {
acceptor.unbind(); if(acceptor == null) {
acceptor.dispose(); return;
} }
try {
acceptor.unbind();
acceptor.dispose();
}catch(Exception ignore) {}
}
/** /**
* 设置应用层的sentbody处理handler * 设置应用层的sentbody处理handler
* @param outerRequestHandler * @param outerRequestHandler

View File

@ -42,9 +42,7 @@ public class WebsocketHandler implements CIMRequestHandler {
md.update(secKey.getBytes("iso-8859-1"), 0, secKey.length()); md.update(secKey.getBytes("iso-8859-1"), 0, secKey.length());
byte[] sha1Hash = md.digest(); byte[] sha1Hash = md.digest();
secKey = new String(org.apache.mina.util.Base64.encodeBase64(sha1Hash)); secKey = new String(org.apache.mina.util.Base64.encodeBase64(sha1Hash));
} catch (Exception e) { } catch (Exception ignore) {}
e.printStackTrace();
}
session.write(new HandshakerResponse(secKey)); session.write(new HandshakerResponse(secKey));
} }
} }

View File

@ -1,5 +1,5 @@
/** /**
* Copyright 2013-2023 Xia Jun(3979434@qq.com). * Copyright 2013-2019 Xia Jun(3979434@qq.com).
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.

View File

@ -21,7 +21,6 @@
*/ */
package com.farsunset.cim.sdk.server.handler; package com.farsunset.cim.sdk.server.handler;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -59,6 +58,10 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
private CIMRequestHandler outerRequestHandler; private CIMRequestHandler outerRequestHandler;
private ConcurrentHashMap<String,Channel> channelGroup = new ConcurrentHashMap<String,Channel>(); private ConcurrentHashMap<String,Channel> channelGroup = new ConcurrentHashMap<String,Channel>();
private int port; private int port;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
// 连接空闲时间 // 连接空闲时间
public static final int READ_IDLE_TIME = 150;// public static final int READ_IDLE_TIME = 150;//
@ -68,7 +71,7 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
public static final int PING_TIME_OUT = 30;// 心跳响应 超时为30秒 public static final int PING_TIME_OUT = 30;// 心跳响应 超时为30秒
public void bind() throws IOException { public void bind() {
/** /**
* 预制websocket握手请求的处理 * 预制websocket握手请求的处理
@ -77,8 +80,8 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
innerHandlerMap.put(CIMConstant.CLIENT_HEARTBEAT, new HeartbeatHandler()); innerHandlerMap.put(CIMConstant.CLIENT_HEARTBEAT, new HeartbeatHandler());
ServerBootstrap bootstrap = new ServerBootstrap(); ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(); bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup();
bootstrap.group(bossGroup, workerGroup); bootstrap.group(bossGroup, workerGroup);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
@ -98,11 +101,24 @@ public class CIMNioSocketAcceptor extends SimpleChannelInboundHandler<SentBody>
ChannelFuture channelFuture = bootstrap.bind(port).syncUninterruptibly(); ChannelFuture channelFuture = bootstrap.bind(port).syncUninterruptibly();
channelFuture.channel().closeFuture().addListener(future -> { channelFuture.channel().closeFuture().addListener(future -> {
bossGroup.shutdownGracefully(); destroy();
workerGroup.shutdownGracefully();
}); });
} }
public void destroy() {
if(bossGroup != null && !bossGroup.isShuttingDown() && !bossGroup.isShutdown() ) {
try {bossGroup.shutdownGracefully();}catch(Exception ignore) {}
return;
}
if(workerGroup != null && !workerGroup.isShuttingDown() && !workerGroup.isShutdown() ) {
try {workerGroup.shutdownGracefully();}catch(Exception ignore) {}
return;
}
}
/** /**
* 设置应用层的sentbody处理handler * 设置应用层的sentbody处理handler

View File

@ -44,9 +44,7 @@ public class WebsocketHandler implements CIMRequestHandler {
md.update(secKey.getBytes("iso-8859-1"), 0, secKey.length()); md.update(secKey.getBytes("iso-8859-1"), 0, secKey.length());
byte[] sha1Hash = md.digest(); byte[] sha1Hash = md.digest();
secKey = new String(Base64.getEncoder().encode(sha1Hash)); secKey = new String(Base64.getEncoder().encode(sha1Hash));
} catch (Exception e) { } catch (Exception ignore) {}
e.printStackTrace();
}
session.write(new HandshakerResponse(secKey)); session.write(new HandshakerResponse(secKey));
} }
} }

View File

@ -1,5 +1,5 @@
/** /**
* Copyright 2013-2023 Xia Jun(3979434@qq.com). * Copyright 2013-2019 Xia Jun(3979434@qq.com).
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.

View File

@ -2,8 +2,9 @@
<classpath> <classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
<classpathentry kind="src" path="src"/> <classpathentry kind="src" path="src"/>
<classpathentry kind="lib" path="lib/cim-java-sdk-3.7.0.jar"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.7.5.jar"/> <classpathentry kind="lib" path="lib/slf4j-api-1.7.5.jar"/>
<classpathentry kind="lib" path="lib/slf4j-nop-1.7.5.jar"/> <classpathentry kind="lib" path="lib/slf4j-nop-1.7.5.jar"/>
<classpathentry kind="lib" path="lib/protobuf-java-3.7.0.jar"/>
<classpathentry kind="lib" path="lib/cim-java-sdk-3.7.0.jar"/>
<classpathentry kind="output" path="bin"/> <classpathentry kind="output" path="bin"/>
</classpath> </classpath>