diff --git a/README.md b/README.md index 53a2de8..08a22fc 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ #服务端集群配置方案 ###服务端修改 -1.多台服务器集群配置,首先需要重写SessionManager接口,用户登录时,将账号和服务器IP 存入数据库中,这样就可以统计各台服务器接受的连接数量。 +1.多台服务器集群配置,首先需要重写SessionManager接口(参考com.farsunset.ichat.cim.session.ClusterSessionManager.java),用户登录时,将账号和服务器IP 存入数据库中,这样就可以统计各台服务器接受的连接数量。 2.客户端连接服务器时,服务端为客户端动态分配 服务器IP,每次分配 较为空闲的服务器IP 3.服务端接受消息后 通过接收者账号查询出对应的Iosession,和 登录的 服务器IP,然后将消息信息传往目标服务器处理发送 diff --git a/cim-core/.classpath b/cim-core/.classpath index 9b3987b..08ed94c 100644 --- a/cim-core/.classpath +++ b/cim-core/.classpath @@ -4,6 +4,7 @@ - + + diff --git a/cim-core/src/com/farsunset/cim/client/android/CIMPushManager.java b/cim-core/src/com/farsunset/cim/client/android/CIMPushManager.java index 9230d80..d07c327 100644 --- a/cim-core/src/com/farsunset/cim/client/android/CIMPushManager.java +++ b/cim-core/src/com/farsunset/cim/client/android/CIMPushManager.java @@ -89,13 +89,15 @@ public class CIMPushManager { } protected static void setAccount(Context context){ - String account = CIMDataConfig.getString(context,CIMDataConfig.KEY_ACCOUNT); setAccount(context,account); } - + protected static void clearAccount(Context context){ + + CIMDataConfig.putString(context,CIMDataConfig.KEY_ACCOUNT, null); + } /** * 发送一个CIM请求 diff --git a/cim-core/src/com/farsunset/cim/nio/constant/CIMConstant.java b/cim-core/src/com/farsunset/cim/nio/constant/CIMConstant.java index 5df4d38..aba6580 100644 --- a/cim-core/src/com/farsunset/cim/nio/constant/CIMConstant.java +++ b/cim-core/src/com/farsunset/cim/nio/constant/CIMConstant.java @@ -30,6 +30,12 @@ public interface CIMConstant { public static int CIM_DEFAULT_MESSAGE_ORDER=1; + + public static final String SESSION_KEY ="account"; + + public static final String HEARTBEAT_KEY ="heartbeat"; + + /** * 对应ichat 中 spring-cim.xml > bean:mainIoHandler >handlers * 为 服务端处理对应的handlers,应该继承与com.farsunset.cim.nio.handle.AbstractHandler diff --git a/cim-core/src/com/farsunset/cim/nio/handle/CIMRequestHandler.java b/cim-core/src/com/farsunset/cim/nio/handle/CIMRequestHandler.java index b314c02..b02accf 100644 --- a/cim-core/src/com/farsunset/cim/nio/handle/CIMRequestHandler.java +++ b/cim-core/src/com/farsunset/cim/nio/handle/CIMRequestHandler.java @@ -5,12 +5,11 @@ package com.farsunset.cim.nio.handle; * 请求处理接口,所有的请求实现必须实现此接口 * @author 3979434@qq.com */ -import org.apache.mina.core.session.IoSession; - import com.farsunset.cim.nio.mutual.ReplyBody; import com.farsunset.cim.nio.mutual.SentBody; +import com.farsunset.cim.nio.session.CIMSession; public interface CIMRequestHandler { - public abstract ReplyBody process(IoSession ios,SentBody message); + public abstract ReplyBody process(CIMSession session,SentBody message); } \ No newline at end of file diff --git a/cim-core/src/com/farsunset/cim/nio/handle/HeartbeatHandler.java b/cim-core/src/com/farsunset/cim/nio/handle/HeartbeatHandler.java index df96ef2..3bab701 100644 --- a/cim-core/src/com/farsunset/cim/nio/handle/HeartbeatHandler.java +++ b/cim-core/src/com/farsunset/cim/nio/handle/HeartbeatHandler.java @@ -2,12 +2,11 @@ package com.farsunset.cim.nio.handle; import org.apache.log4j.Logger; -import org.apache.mina.core.session.IoSession; import com.farsunset.cim.nio.constant.CIMConstant; -import com.farsunset.cim.nio.handle.CIMRequestHandler; import com.farsunset.cim.nio.mutual.ReplyBody; import com.farsunset.cim.nio.mutual.SentBody; +import com.farsunset.cim.nio.session.CIMSession; /** *客户端心跳实现 @@ -18,14 +17,13 @@ public class HeartbeatHandler implements CIMRequestHandler { protected final Logger logger = Logger.getLogger(HeartbeatHandler.class); - public ReplyBody process(IoSession session, SentBody message) { + public ReplyBody process(CIMSession session, SentBody message) { logger.warn("heartbeat... from "+session.getRemoteAddress().toString()); ReplyBody reply = new ReplyBody(); reply.setKey(CIMConstant.RequestKey.CLIENT_HEARTBEAT); reply.setCode(CIMConstant.ReturnCode.CODE_200); - session.setAttribute("heartbeat", System.currentTimeMillis()); - + session.setHeartbeat(System.currentTimeMillis()); return reply; } diff --git a/cim-core/src/com/farsunset/cim/nio/handle/MainIOHandler.java b/cim-core/src/com/farsunset/cim/nio/handle/MainIOHandler.java index efc45fa..9b3dccf 100644 --- a/cim-core/src/com/farsunset/cim/nio/handle/MainIOHandler.java +++ b/cim-core/src/com/farsunset/cim/nio/handle/MainIOHandler.java @@ -1,3 +1,4 @@ + package com.farsunset.cim.nio.handle; import java.util.HashMap; @@ -10,6 +11,7 @@ import org.apache.mina.core.session.IoSession; import com.farsunset.cim.nio.constant.CIMConstant; import com.farsunset.cim.nio.mutual.ReplyBody; import com.farsunset.cim.nio.mutual.SentBody; +import com.farsunset.cim.nio.session.CIMSession; /** * @@ -18,7 +20,7 @@ import com.farsunset.cim.nio.mutual.SentBody; */ public class MainIOHandler extends IoHandlerAdapter { - protected final Logger logger = Logger.getLogger(HeartbeatHandler.class); + protected final Logger logger = Logger.getLogger(MainIOHandler.class); private HashMap handlers = new HashMap(); @@ -26,7 +28,6 @@ public class MainIOHandler extends IoHandlerAdapter { public void sessionCreated(IoSession session) throws Exception { logger.warn("sessionCreated()... from "+session.getRemoteAddress().toString()); - } @@ -35,10 +36,11 @@ public class MainIOHandler extends IoHandlerAdapter { } - public void messageReceived(IoSession session, Object message) + public void messageReceived(IoSession ios, Object message) throws Exception { logger.debug("message: " + message.toString()); + CIMSession cimSession =new CIMSession(ios); ReplyBody reply = new ReplyBody(); SentBody body = (SentBody) message; String key = body.getKey(); @@ -46,30 +48,34 @@ public class MainIOHandler extends IoHandlerAdapter { CIMRequestHandler handler = handlers.get(key); if (handler == null) { reply.setCode(CIMConstant.ReturnCode.CODE_405); - reply.setMessage("KEY ["+key+"] 服务端未定义"); + reply.setCode("KEY ["+key+"] 服务端未定义"); } else { - reply = handler.process(session, body); + reply = handler.process(cimSession, body); } if(reply!=null) { reply.setKey(key); - session.write(reply); + cimSession.write(reply); logger.debug("-----------------------process done. reply: " + reply.toString()); } - - + + + //设置心跳时间 + cimSession.setAttribute(CIMConstant.HEARTBEAT_KEY, System.currentTimeMillis()); } /** */ - public void sessionClosed(IoSession session) throws Exception { + public void sessionClosed(IoSession ios) throws Exception { + + CIMSession cimSession =new CIMSession(ios); try{ - logger.warn("sessionClosed()... from "+session.getRemoteAddress()); + logger.warn("sessionClosed()... from "+cimSession.getRemoteAddress()); CIMRequestHandler handler = handlers.get("sessionClosedHander"); - if(handler!=null && session.containsAttribute("account")) + if(handler!=null && cimSession.containsAttribute(CIMConstant.SESSION_KEY)) { - handler.process(session, null); + handler.process(cimSession, null); } } catch(Exception e) @@ -83,9 +89,17 @@ public class MainIOHandler extends IoHandlerAdapter { public void sessionIdle(IoSession session, IdleStatus status) throws Exception { logger.warn("sessionIdle()... from "+session.getRemoteAddress().toString()); - if(!session.containsAttribute("account")) + if(!session.containsAttribute(CIMConstant.SESSION_KEY)) { session.close(true); + }else + { + //如果5分钟之内客户端没有发送心态,则可能客户端断网,关闭连接 + Object heartbeat = session.getAttribute(CIMConstant.HEARTBEAT_KEY); + if(heartbeat!=null && System.currentTimeMillis()-Long.valueOf(heartbeat.toString()) >= 300000) + { + session.close(false); + } } } @@ -95,11 +109,15 @@ public class MainIOHandler extends IoHandlerAdapter { throws Exception { logger.error("exceptionCaught()... from "+session.getRemoteAddress()); logger.error(cause); + cause.printStackTrace(); } /** */ public void messageSent(IoSession session, Object message) throws Exception { + + //设置心跳时间 + session.setAttribute(CIMConstant.HEARTBEAT_KEY, System.currentTimeMillis()); } diff --git a/cim-core/src/com/farsunset/cim/nio/session/CIMSession.java b/cim-core/src/com/farsunset/cim/nio/session/CIMSession.java new file mode 100644 index 0000000..0161272 --- /dev/null +++ b/cim-core/src/com/farsunset/cim/nio/session/CIMSession.java @@ -0,0 +1,232 @@ +package com.farsunset.cim.nio.session; + +import java.io.Serializable; +import java.net.InetAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.concurrent.TimeUnit; +import org.apache.mina.core.future.WriteFuture; +import org.apache.mina.core.session.IoSession; + +import com.farsunset.cim.nio.constant.CIMConstant; + +/** + * IoSession包装类,集群时 将此对象存入表中 + * + * @author 3979434@qq.com + */ + +public class CIMSession implements Serializable{ + + /** + * + */ + private static final long serialVersionUID = 1L; + public static String ID = "ID"; + public static String HOST = "HOST"; + + private IoSession session; + + private String id;//session全局ID + private String deviceId;//客户端设备ID + private String host;//session绑定的服务器IP + private String account;//session绑定的账号 + private String channel;//终端设备类型 + private String deviceModel;//终端设备型号 + + private Long bindTime;//登录时间 + + private Long heartbeat;//心跳时间 + + public CIMSession(IoSession session) { + this.session = session; + } + + public CIMSession() + { + + } + + + + + public String getAccount() { + return account; + } + + public void setAccount(String account) { + if(session!=null) + { + session.setAttribute(CIMConstant.SESSION_KEY, account); + } + this.account = account; + } + + + public String getId() { + return id; + } + + + + public String getDeviceId() { + return deviceId; + } + + + public String getChannel() { + return channel; + } + + public void setChannel(String channel) { + this.channel = channel; + } + + public String getDeviceModel() { + return deviceModel; + } + + public void setDeviceModel(String deviceModel) { + this.deviceModel = deviceModel; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + + + public void setId(String id) { + this.id = id; + } + + + + public String getHost() { + return host; + } + + + + public Long getBindTime() { + return bindTime; + } + + public void setBindTime(Long bindTime) { + this.bindTime = bindTime; + } + + public Long getHeartbeat() { + return heartbeat; + } + + public void setHeartbeat(Long heartbeat) { + this.heartbeat = heartbeat; + if(session!=null) + { + session.setAttribute(CIMConstant.HEARTBEAT_KEY, heartbeat); + } + } + + public void setHost(String host) { + this.host = host; + } + + + + public void setAttribute(String key, Object value) { + if(session!=null) + session.setAttribute(key, value); + } + + + public boolean containsAttribute(String key) { + if(session!=null) + return session.containsAttribute(key); + return false; + } + + public Object getAttribute(String key) { + if(session!=null) + return session.getAttribute(key); + return null; + } + + public void removeAttribute(String key) { + if(session!=null) + session.removeAttribute(key); + } + + public SocketAddress getRemoteAddress() { + if(session!=null) + return session.getRemoteAddress(); + return null; + } + + public boolean write(Object msg) { + if(session!=null) + { + WriteFuture wf = session.write(msg); + wf.awaitUninterruptibly(5, TimeUnit.SECONDS); + return wf.isWritten(); + } + return false; + } + + public boolean isConnected() { + if(session!=null) + return session.isConnected(); + return false; + } + + public boolean isLocalhost() + { + + try { + String ip = InetAddress.getLocalHost().getHostAddress(); + return ip.equals(host) && session!=null; + } catch (UnknownHostException e) { + e.printStackTrace(); + } + return false; + + } + + + public void close(boolean immediately) { + if(session!=null) + session.close(immediately); + } + + + public boolean equals(Object o) { + + if (o instanceof CIMSession) { + + CIMSession t = (CIMSession) o; + if(!t.isLocalhost()) + { + return false; + } + if (t.session.getId() == session.getId()&& t.host.equals(host)) { + return true; + } + return false; + } else { + return false; + } + + } + + public void setIoSession(IoSession session) { + this.session = session; + } + + public IoSession getIoSession() { + return session; + } + + + + +} \ No newline at end of file diff --git a/cim-core/src/com/farsunset/cim/nio/session/DefaultSessionManager.java b/cim-core/src/com/farsunset/cim/nio/session/DefaultSessionManager.java index b5491f2..6bf11ed 100644 --- a/cim-core/src/com/farsunset/cim/nio/session/DefaultSessionManager.java +++ b/cim-core/src/com/farsunset/cim/nio/session/DefaultSessionManager.java @@ -5,20 +5,17 @@ import java.util.Collection; import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.log4j.Logger; -import org.apache.mina.core.session.IoSession; +import com.farsunset.cim.nio.constant.CIMConstant; /** * 自带默认 session管理实现, 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理 - * + *服务器集群时 须要将CIMSession 信息存入数据库或者nosql 等 第三方存储空间中,便于所有服务器都可以访问 * @author farsunset (3979434@qq.com) */ public class DefaultSessionManager implements SessionManager{ - protected final Logger logger = Logger.getLogger(DefaultSessionManager.class); - static final String SESSION_KEY ="account"; - private static HashMap sessions =new HashMap(); + private static HashMap sessions =new HashMap(); private static final AtomicInteger connectionsCounter = new AtomicInteger(0); @@ -28,10 +25,10 @@ public class DefaultSessionManager implements SessionManager{ /** * */ - public void addSession(String account,IoSession session) { + public void addSession(String account,CIMSession session) { if(session!=null) { - session.setAttribute(SESSION_KEY, account); + session.setAttribute(CIMConstant.SESSION_KEY, account); sessions.put(account, session); connectionsCounter.incrementAndGet(); } @@ -39,7 +36,7 @@ public class DefaultSessionManager implements SessionManager{ } - public IoSession getSession(String account) { + public CIMSession getSession(String account) { return sessions.get(account); @@ -48,14 +45,14 @@ public class DefaultSessionManager implements SessionManager{ - public Collection getSessions() { + public Collection getSessions() { return sessions.values(); } - public void removeSession(IoSession session) { + public void removeSession(CIMSession session) { - sessions.remove(session.getAttribute(SESSION_KEY)); + sessions.remove(session.getAttribute(CIMConstant.SESSION_KEY)); } @@ -66,15 +63,15 @@ public class DefaultSessionManager implements SessionManager{ } - public boolean containsIoSession(IoSession ios) + public boolean containsCIMSession(CIMSession ios) { - return sessions.containsKey(ios.getAttribute(SESSION_KEY)) || sessions.containsValue(ios); + return sessions.containsKey(ios.getAttribute(CIMConstant.SESSION_KEY)) || sessions.containsValue(ios); } - public String getAccount(IoSession ios) + public String getAccount(CIMSession ios) { - if(ios.getAttribute(SESSION_KEY)==null) + if(ios.getAttribute(CIMConstant.SESSION_KEY)==null) { for(String key:sessions.keySet()) { @@ -85,7 +82,7 @@ public class DefaultSessionManager implements SessionManager{ } }else { - return ios.getAttribute(SESSION_KEY).toString(); + return ios.getAttribute(CIMConstant.SESSION_KEY).toString(); } return null; diff --git a/cim-core/src/com/farsunset/cim/nio/session/SessionManager.java b/cim-core/src/com/farsunset/cim/nio/session/SessionManager.java index f8cc043..8c01dcf 100644 --- a/cim-core/src/com/farsunset/cim/nio/session/SessionManager.java +++ b/cim-core/src/com/farsunset/cim/nio/session/SessionManager.java @@ -3,7 +3,6 @@ package com.farsunset.cim.nio.session; import java.util.Collection; -import org.apache.mina.core.session.IoSession; /** * 客户端的 session管理接口 @@ -17,26 +16,26 @@ public interface SessionManager { /** * 添加新的session */ - public void addSession(String account,IoSession session); + public void addSession(String account,CIMSession session); /** * * @param account 客户端session的 key 一般可用 用户账号来对应session * @return */ - IoSession getSession(String account); + CIMSession getSession(String account); /** * 获取所有session * @return */ - public Collection getSessions(); + public Collection getSessions(); /** * 删除session * @param session */ - public void removeSession(IoSession session) ; + public void removeSession(CIMSession session) ; /** @@ -49,11 +48,11 @@ public interface SessionManager { * session是否存在 * @param session */ - public boolean containsIoSession(IoSession ios); + public boolean containsCIMSession(CIMSession ios); /** * session获取对应的 用户 key * @param session */ - public String getAccount(IoSession ios); + public String getAccount(CIMSession ios); } \ No newline at end of file diff --git a/ichat-server/.classpath b/ichat-server/.classpath index a2865b1..843570c 100644 --- a/ichat-server/.classpath +++ b/ichat-server/.classpath @@ -37,5 +37,7 @@ + + diff --git a/ichat-server/WebRoot/WEB-INF/lib/cim-core.1.5.jar b/ichat-server/WebRoot/WEB-INF/lib/cim-core.1.5.jar index d551ac0..227af62 100644 Binary files a/ichat-server/WebRoot/WEB-INF/lib/cim-core.1.5.jar and b/ichat-server/WebRoot/WEB-INF/lib/cim-core.1.5.jar differ diff --git a/ichat-server/WebRoot/WEB-INF/lib/httpclient-4.3.4.jar b/ichat-server/WebRoot/WEB-INF/lib/httpclient-4.3.4.jar new file mode 100644 index 0000000..f8a7afe Binary files /dev/null and b/ichat-server/WebRoot/WEB-INF/lib/httpclient-4.3.4.jar differ diff --git a/ichat-server/WebRoot/WEB-INF/lib/httpcore-4.3.2.jar b/ichat-server/WebRoot/WEB-INF/lib/httpcore-4.3.2.jar new file mode 100644 index 0000000..813ec23 Binary files /dev/null and b/ichat-server/WebRoot/WEB-INF/lib/httpcore-4.3.2.jar differ diff --git a/ichat-server/WebRoot/WEB-INF/lib/mina-core-2.0.7.jar b/ichat-server/WebRoot/WEB-INF/lib/mina-core-2.0.7.jar index 5ec135b..02c515e 100644 Binary files a/ichat-server/WebRoot/WEB-INF/lib/mina-core-2.0.7.jar and b/ichat-server/WebRoot/WEB-INF/lib/mina-core-2.0.7.jar differ diff --git a/ichat-server/WebRoot/console/session/manage.jsp b/ichat-server/WebRoot/console/session/manage.jsp index 0aff9e1..d704517 100644 --- a/ichat-server/WebRoot/console/session/manage.jsp +++ b/ichat-server/WebRoot/console/session/manage.jsp @@ -1,14 +1,14 @@ <%@ page language="java" pageEncoding="utf-8"%> <%@ page import="java.util.Collection"%> <%@ page import="com.farsunset.ichat.common.util.StringUtil"%> -<%@ page import="org.apache.mina.core.session.IoSession"%> +<%@ page import="com.farsunset.cim.nio.session.CIMSession"%> <% String path = request.getContextPath(); String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path; - Collection sessionList = (Collection)request.getAttribute("sessionList"); + Collection sessionList = (Collection)request.getAttribute("sessionList"); %> @@ -91,40 +91,43 @@ - 账号 - 终端 - 在线时长 - 心跳时间 - 操作 + 账号 + 终端来源 + 终端ID + 终端型号 + 在线时长 + 操作 <% - for(IoSession ios:sessionList) + for(CIMSession ios:sessionList) { %> - "> + - <%=ios.getAttribute("account") %> + <%=ios.getAccount() %> - <%=ios.getAttribute("channel") %> + <%=ios.getChannel() %> - <%=(System.currentTimeMillis()-Long.valueOf(ios.getAttribute("loginTime").toString()))/1000 %>秒 + <%=ios.getDeviceId() %> - <%if(ios.getAttribute("heartbeat")!=null){ %> - <%=StringUtil.transformDateTime(Long.valueOf(ios.getAttribute("heartbeat").toString())) %> - <%} %> + <%=ios.getDeviceModel() %> + + <%=(System.currentTimeMillis()-ios.getBindTime())/1000 %>秒 + +
- - + +
diff --git a/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/BindHandler.java b/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/BindHandler.java index a56185b..11d5b1f 100644 --- a/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/BindHandler.java +++ b/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/BindHandler.java @@ -1,55 +1,74 @@ + package com.farsunset.ichat.cim.handler; +import java.net.InetAddress; +import java.util.UUID; import org.apache.log4j.Logger; -import org.apache.mina.core.session.IoSession; import com.farsunset.cim.nio.constant.CIMConstant; import com.farsunset.cim.nio.handle.CIMRequestHandler; import com.farsunset.cim.nio.mutual.Message; import com.farsunset.cim.nio.mutual.ReplyBody; import com.farsunset.cim.nio.mutual.SentBody; +import com.farsunset.cim.nio.session.CIMSession; import com.farsunset.cim.nio.session.DefaultSessionManager; import com.farsunset.ichat.common.util.ContextHolder; + /** - * 绑定账号到服务端实现 + * 账号绑定实现 * * @author - */ + */ public class BindHandler implements CIMRequestHandler { protected final Logger logger = Logger.getLogger(BindHandler.class); - - public ReplyBody process(IoSession newSession, SentBody message) { - + public ReplyBody process(CIMSession newSession, SentBody message) { ReplyBody reply = new ReplyBody(); DefaultSessionManager sessionManager= ((DefaultSessionManager) ContextHolder.getBean("defaultSessionManager")); try { String account = message.get("account"); - newSession.setAttribute("channel", message.get("channel")); - newSession.setAttribute("deviceId", message.get("deviceId")); - newSession.setAttribute("device", message.get("device")); + newSession.setAccount(account); + newSession.setDeviceId(message.get("deviceId")); + newSession.setId(UUID.randomUUID().toString()); + newSession.setHost(InetAddress.getLocalHost().getHostAddress()); + newSession.setChannel( message.get("channel")); + newSession.setDeviceModel(message.get("device")); /** * 由于客户端断线服务端可能会无法获知的情况,客户端重连时,需要关闭旧的连接 */ - IoSession oldSession = sessionManager.getSession(account); + CIMSession oldSession = sessionManager.getSession(account); if(oldSession!=null) { //如果是账号已经在另一台终端登录。则让另一个终端下线 - if(oldSession.getAttribute("deviceId")!=null&&!oldSession.getAttribute("deviceId").equals(newSession.getAttribute("deviceId"))) + if((oldSession.getDeviceId()!=null&&!oldSession.getDeviceId().equals(newSession.getDeviceId()) + ||!oldSession.equals(newSession))) { - oldSession.removeAttribute("account"); + + + oldSession.removeAttribute(CIMConstant.SESSION_KEY); Message msg = new Message(); msg.setType(CIMConstant.MessageType.TYPE_999);//强行下线消息类型 msg.setReceiver(account); - oldSession.write(msg); - oldSession.close(true); + if(!oldSession.isLocalhost()) + { + + /* + 判断当前session是否连接于本台服务器,如不是发往目标服务器处理 + MessageDispatcher.execute(msg, oldSession.getHost()); + */ + }else + { + oldSession.write(msg); + oldSession.close(true); + oldSession = null; + } oldSession = null; } @@ -57,23 +76,20 @@ public class BindHandler implements CIMRequestHandler { if(oldSession==null) { //第一次设置心跳时间为登录时间 - newSession.setAttribute("heartbeat", System.currentTimeMillis()); - newSession.setAttribute("loginTime", System.currentTimeMillis()); + newSession.setBindTime(System.currentTimeMillis()); + newSession.setHeartbeat(System.currentTimeMillis()); sessionManager.addSession(account, newSession); - - //设置在线状态 - reply.setCode(CIMConstant.ReturnCode.CODE_200); - + } - + reply.setCode(CIMConstant.ReturnCode.CODE_200); } catch (Exception e) { reply.setCode(CIMConstant.ReturnCode.CODE_500); e.printStackTrace(); } - logger.debug("auth :account:" +message.get("account")+"-----------------------------" +reply.getCode()); + logger.debug("bind :account:" +message.get("account")+"-----------------------------" +reply.getCode()); return reply; } diff --git a/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/LogoutHandler.java b/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/LogoutHandler.java index 1978be0..a593c51 100644 --- a/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/LogoutHandler.java +++ b/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/LogoutHandler.java @@ -1,11 +1,11 @@ package com.farsunset.ichat.cim.handler; -import org.apache.mina.core.session.IoSession; - +import com.farsunset.cim.nio.constant.CIMConstant; import com.farsunset.cim.nio.handle.CIMRequestHandler; import com.farsunset.cim.nio.mutual.ReplyBody; import com.farsunset.cim.nio.mutual.SentBody; +import com.farsunset.cim.nio.session.CIMSession; import com.farsunset.cim.nio.session.DefaultSessionManager; import com.farsunset.ichat.common.util.ContextHolder; @@ -17,13 +17,13 @@ import com.farsunset.ichat.common.util.ContextHolder; */ public class LogoutHandler implements CIMRequestHandler { - public ReplyBody process(IoSession ios, SentBody message) { + public ReplyBody process(CIMSession ios, SentBody message) { DefaultSessionManager sessionManager = ((DefaultSessionManager) ContextHolder.getBean("defaultSessionManager")); - String account =ios.getAttribute("account").toString(); - ios.removeAttribute("account"); + String account =ios.getAttribute(CIMConstant.SESSION_KEY).toString(); + ios.removeAttribute(CIMConstant.SESSION_KEY); ios.close(true); sessionManager.removeSession(account); diff --git a/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/SessionClosedHandler.java b/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/SessionClosedHandler.java index 3d220e4..ddae3d7 100644 --- a/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/SessionClosedHandler.java +++ b/ichat-server/src/main/java/com/farsunset/ichat/cim/handler/SessionClosedHandler.java @@ -2,11 +2,12 @@ package com.farsunset.ichat.cim.handler; import org.apache.log4j.Logger; -import org.apache.mina.core.session.IoSession; +import com.farsunset.cim.nio.constant.CIMConstant; import com.farsunset.cim.nio.handle.CIMRequestHandler; import com.farsunset.cim.nio.mutual.ReplyBody; import com.farsunset.cim.nio.mutual.SentBody; +import com.farsunset.cim.nio.session.CIMSession; import com.farsunset.cim.nio.session.DefaultSessionManager; import com.farsunset.ichat.common.util.ContextHolder; @@ -19,17 +20,16 @@ public class SessionClosedHandler implements CIMRequestHandler { protected final Logger logger = Logger.getLogger(SessionClosedHandler.class); - public ReplyBody process(IoSession ios, SentBody message) { + public ReplyBody process(CIMSession ios, SentBody message) { DefaultSessionManager sessionManager = ((DefaultSessionManager) ContextHolder.getBean("defaultSessionManager")); - if(ios.getAttribute("account")==null) + if(ios.getAttribute(CIMConstant.SESSION_KEY)==null) { return null; } - String account = ios.getAttribute("account").toString(); - + String account = ios.getAttribute(CIMConstant.SESSION_KEY).toString(); sessionManager.removeSession(account); return null; diff --git a/ichat-server/src/main/java/com/farsunset/ichat/cim/push/DefaultMessagePusher.java b/ichat-server/src/main/java/com/farsunset/ichat/cim/push/DefaultMessagePusher.java index 2a93c2f..6ac7e31 100644 --- a/ichat-server/src/main/java/com/farsunset/ichat/cim/push/DefaultMessagePusher.java +++ b/ichat-server/src/main/java/com/farsunset/ichat/cim/push/DefaultMessagePusher.java @@ -3,9 +3,9 @@ package com.farsunset.ichat.cim.push; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.mina.core.session.IoSession; import com.farsunset.cim.nio.mutual.Message; +import com.farsunset.cim.nio.session.CIMSession; import com.farsunset.cim.nio.session.DefaultSessionManager; /** @@ -30,9 +30,15 @@ public class DefaultMessagePusher implements CIMMessagePusher { * @param msg */ public void pushMessageToUser(Message msg) { - IoSession session = sessionManager.getSession(msg.getReceiver()); + CIMSession session = sessionManager.getSession(msg.getReceiver()); - //服务器集群时,可以在此 判断当前session是否连接于本台服务器,如果是,继续往下走,如果不是,将此消息发往当前session连接的服务器并 return + /*服务器集群时,可以在此 判断当前session是否连接于本台服务器,如果是,继续往下走,如果不是,将此消息发往当前session连接的服务器并 return + if(!session.isLocalhost()){//判断当前session是否连接于本台服务器,如不是 + + MessageDispatcher.execute(msg, session.getHost()); + return; + } + */ if (session != null && session.isConnected()) { diff --git a/ichat-server/src/main/java/com/farsunset/ichat/cim/session/ClusterSessionManager.java b/ichat-server/src/main/java/com/farsunset/ichat/cim/session/ClusterSessionManager.java new file mode 100644 index 0000000..17e24a4 --- /dev/null +++ b/ichat-server/src/main/java/com/farsunset/ichat/cim/session/ClusterSessionManager.java @@ -0,0 +1,95 @@ + +package com.farsunset.ichat.cim.session; + +import java.util.Collection; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.mina.core.session.IoSession; + +import com.farsunset.cim.nio.constant.CIMConstant; +import com.farsunset.cim.nio.session.CIMSession; +import com.farsunset.cim.nio.session.SessionManager; + +/** + * 集群 session管理实现示例, 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理 + *服务器集群时 须要将CIMSession 信息存入数据库或者nosql 等 第三方存储空间中,便于所有服务器都可以访问 + * @author farsunset (3979434@qq.com) + */ +public class ClusterSessionManager implements SessionManager{ + + + private static HashMap sessions =new HashMap(); + + + private static final AtomicInteger connectionsCounter = new AtomicInteger(0); + + + + /** + * + */ + public void addSession(String account,CIMSession session) { + if(session!=null) + { + session.setAttribute(CIMConstant.SESSION_KEY, account); + sessions.put(account, session.getIoSession()); + connectionsCounter.incrementAndGet(); + } + + /** + * 下面 将session 存入数据库 + */ + + } + + + public CIMSession getSession(String account) { + + /*//这里查询数据库 + CIMSession session = database.getSession(account); + session.setIoSession(sessions.get(account)); + + return session;*/ + return null; + } + + + + public Collection getSessions() { + /*//这里查询数据库 + return database.getSessions();*/ + return null; + } + + public void removeSession(CIMSession session) { + + + sessions.remove(session.getAccount()); + //database.removeSession(session.getAttribute(CIMConstant.SESSION_KEY));*/ + } + + + public void removeSession(String account) { + + //database.removeSession(account);*/ + sessions.remove(account); + + } + + + public boolean containsCIMSession(CIMSession ios) + { + //return database.containsCIMSession(session.getAccount()); + return sessions.containsKey(ios.getAttribute(CIMConstant.SESSION_KEY)) || sessions.containsValue(ios); + } + + + public String getAccount(CIMSession ios) + { + return ios.getAccount(); + } + + + +} diff --git a/ichat-server/src/main/java/com/farsunset/ichat/common/util/MessageDispatcher.java b/ichat-server/src/main/java/com/farsunset/ichat/common/util/MessageDispatcher.java new file mode 100644 index 0000000..3c820ff --- /dev/null +++ b/ichat-server/src/main/java/com/farsunset/ichat/common/util/MessageDispatcher.java @@ -0,0 +1,74 @@ + +package com.farsunset.ichat.common.util; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.http.HttpEntity; +import org.apache.http.NameValuePair; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; + +import com.farsunset.cim.nio.mutual.Message; + + +public class MessageDispatcher { + private static BlockingQueue queue = new LinkedBlockingQueue(); + private static ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 20, TimeUnit.SECONDS,queue);; + final static String sendUrl = "http://%1$s:8080/ichat-servier/cgi/message_send.api"; + + public static void execute(final Message msg,final String ip) + { + executor.execute(new Runnable() { + @Override + public void run() { + try { + httpPost(String.format(sendUrl,ip),msg); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + + + private static String httpPost(String url,Message msg) throws Exception + { + CloseableHttpClient httpclient = HttpClients.createDefault(); + HttpPost httpPost = new HttpPost(url); + List nvps = new ArrayList (); + nvps.add(new BasicNameValuePair("mid", msg.getMid())); + nvps.add(new BasicNameValuePair("type", msg.getType())); + nvps.add(new BasicNameValuePair("title", msg.getTitle())); + nvps.add(new BasicNameValuePair("content",msg.getContent())); + nvps.add(new BasicNameValuePair("sender", msg.getSender())); + nvps.add(new BasicNameValuePair("receiver",msg.getReceiver())); + nvps.add(new BasicNameValuePair("file",msg.getFile())); + nvps.add(new BasicNameValuePair("fileType",msg.getFileType())); + nvps.add(new BasicNameValuePair("timestamp",String.valueOf(msg.getTimestamp()))); + httpPost.setEntity(new UrlEncodedFormEntity(nvps)); + CloseableHttpResponse response2 = httpclient.execute(httpPost); + String data = null; + try { + System.out.println(response2.getStatusLine()); + HttpEntity entity2 = response2.getEntity(); + data = EntityUtils.toString(entity2); + } finally { + response2.close(); + } + + + + return data; + } + + +} diff --git a/mchat-simple/src/com/farsunset/ichat/example/app/Constant.java b/mchat-simple/src/com/farsunset/ichat/example/app/Constant.java index 9024801..4546e1e 100644 --- a/mchat-simple/src/com/farsunset/ichat/example/app/Constant.java +++ b/mchat-simple/src/com/farsunset/ichat/example/app/Constant.java @@ -12,9 +12,9 @@ package com.farsunset.ichat.example.app; public interface Constant { //服务端web地址 - public static final String SERVER_URL = "http://192.168.0.141:8080/ichat-server"; + public static final String SERVER_URL = "http://192.168.0.107:8080/ichat-server"; //服务端IP地址 - public static final String CIM_SERVER_HOST = "192.168.0.141"; + public static final String CIM_SERVER_HOST = "192.168.0.107"; //注意,这里的端口不是tomcat的端口,CIM端口在服务端spring-cim.xml中配置的,没改动就使用默认的23456 public static final int CIM_SERVER_PORT = 23456; @@ -37,8 +37,6 @@ public interface Constant { public static final String STATUS_0 = "0"; //消息已经读取 public static final String STATUS_1 = "1"; - - } }