服务端集群实现 相关修改

This commit is contained in:
xj753277@126.com 2014-06-13 14:04:40 +08:00
parent 174a0c2def
commit a4278c6ead
23 changed files with 551 additions and 105 deletions

View File

@ -7,7 +7,7 @@
#服务端集群配置方案
###服务端修改
1.多台服务器集群配置首先需要重写SessionManager接口用户登录时将账号和服务器IP 存入数据库中,这样就可以统计各台服务器接受的连接数量。
1.多台服务器集群配置首先需要重写SessionManager接口(参考com.farsunset.ichat.cim.session.ClusterSessionManager.java)用户登录时将账号和服务器IP 存入数据库中,这样就可以统计各台服务器接受的连接数量。
2.客户端连接服务器时,服务端为客户端动态分配 服务器IP每次分配 较为空闲的服务器IP
3.服务端接受消息后 通过接收者账号查询出对应的Iosession和 登录的 服务器IP然后将消息信息传往目标服务器处理发送

View File

@ -4,6 +4,7 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="/ichat-server/WebRoot/WEB-INF/lib/mina-core-2.0.7.jar"/>
<classpathentry kind="lib" path="/ichat-server/WebRoot/WEB-INF/lib/log4j.jar"/>
<classpathentry kind="lib" path="D:/dev/Android-SDK-Windows/platforms/android-16/android.jar"/>
<classpathentry kind="lib" path="D:/soft/adt-bundle-windows-x86_64-20140321/sdk/platforms/android-19/android.jar"/>
<classpathentry kind="lib" path="/ichat-server/WebRoot/WEB-INF/lib/commons-lang-2.3.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>

View File

@ -89,13 +89,15 @@ public class CIMPushManager {
}
protected static void setAccount(Context context){
String account = CIMDataConfig.getString(context,CIMDataConfig.KEY_ACCOUNT);
setAccount(context,account);
}
protected static void clearAccount(Context context){
CIMDataConfig.putString(context,CIMDataConfig.KEY_ACCOUNT, null);
}
/**
* 发送一个CIM请求

View File

@ -30,6 +30,12 @@ public interface CIMConstant {
public static int CIM_DEFAULT_MESSAGE_ORDER=1;
public static final String SESSION_KEY ="account";
public static final String HEARTBEAT_KEY ="heartbeat";
/**
* 对应ichat spring-cim.xml > bean:mainIoHandler >handlers
* 服务端处理对应的handlers应该继承与com.farsunset.cim.nio.handle.AbstractHandler

View File

@ -5,12 +5,11 @@ package com.farsunset.cim.nio.handle;
* 请求处理接口,所有的请求实现必须实现此接口
* @author 3979434@qq.com
*/
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.mutual.ReplyBody;
import com.farsunset.cim.nio.mutual.SentBody;
import com.farsunset.cim.nio.session.CIMSession;
public interface CIMRequestHandler {
public abstract ReplyBody process(IoSession ios,SentBody message);
public abstract ReplyBody process(CIMSession session,SentBody message);
}

View File

@ -2,12 +2,11 @@
package com.farsunset.cim.nio.handle;
import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.constant.CIMConstant;
import com.farsunset.cim.nio.handle.CIMRequestHandler;
import com.farsunset.cim.nio.mutual.ReplyBody;
import com.farsunset.cim.nio.mutual.SentBody;
import com.farsunset.cim.nio.session.CIMSession;
/**
*客户端心跳实现
@ -18,14 +17,13 @@ public class HeartbeatHandler implements CIMRequestHandler {
protected final Logger logger = Logger.getLogger(HeartbeatHandler.class);
public ReplyBody process(IoSession session, SentBody message) {
public ReplyBody process(CIMSession session, SentBody message) {
logger.warn("heartbeat... from "+session.getRemoteAddress().toString());
ReplyBody reply = new ReplyBody();
reply.setKey(CIMConstant.RequestKey.CLIENT_HEARTBEAT);
reply.setCode(CIMConstant.ReturnCode.CODE_200);
session.setAttribute("heartbeat", System.currentTimeMillis());
session.setHeartbeat(System.currentTimeMillis());
return reply;
}

View File

@ -1,3 +1,4 @@
package com.farsunset.cim.nio.handle;
import java.util.HashMap;
@ -10,6 +11,7 @@ import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.constant.CIMConstant;
import com.farsunset.cim.nio.mutual.ReplyBody;
import com.farsunset.cim.nio.mutual.SentBody;
import com.farsunset.cim.nio.session.CIMSession;
/**
*
@ -18,7 +20,7 @@ import com.farsunset.cim.nio.mutual.SentBody;
*/
public class MainIOHandler extends IoHandlerAdapter {
protected final Logger logger = Logger.getLogger(HeartbeatHandler.class);
protected final Logger logger = Logger.getLogger(MainIOHandler.class);
private HashMap<String, CIMRequestHandler> handlers = new HashMap<String, CIMRequestHandler>();
@ -26,7 +28,6 @@ public class MainIOHandler extends IoHandlerAdapter {
public void sessionCreated(IoSession session) throws Exception {
logger.warn("sessionCreated()... from "+session.getRemoteAddress().toString());
}
@ -35,10 +36,11 @@ public class MainIOHandler extends IoHandlerAdapter {
}
public void messageReceived(IoSession session, Object message)
public void messageReceived(IoSession ios, Object message)
throws Exception {
logger.debug("message: " + message.toString());
CIMSession cimSession =new CIMSession(ios);
ReplyBody reply = new ReplyBody();
SentBody body = (SentBody) message;
String key = body.getKey();
@ -46,30 +48,34 @@ public class MainIOHandler extends IoHandlerAdapter {
CIMRequestHandler handler = handlers.get(key);
if (handler == null) {
reply.setCode(CIMConstant.ReturnCode.CODE_405);
reply.setMessage("KEY ["+key+"] 服务端未定义");
reply.setCode("KEY ["+key+"] 服务端未定义");
} else {
reply = handler.process(session, body);
reply = handler.process(cimSession, body);
}
if(reply!=null)
{
reply.setKey(key);
session.write(reply);
cimSession.write(reply);
logger.debug("-----------------------process done. reply: " + reply.toString());
}
//设置心跳时间
cimSession.setAttribute(CIMConstant.HEARTBEAT_KEY, System.currentTimeMillis());
}
/**
*/
public void sessionClosed(IoSession session) throws Exception {
public void sessionClosed(IoSession ios) throws Exception {
CIMSession cimSession =new CIMSession(ios);
try{
logger.warn("sessionClosed()... from "+session.getRemoteAddress());
logger.warn("sessionClosed()... from "+cimSession.getRemoteAddress());
CIMRequestHandler handler = handlers.get("sessionClosedHander");
if(handler!=null && session.containsAttribute("account"))
if(handler!=null && cimSession.containsAttribute(CIMConstant.SESSION_KEY))
{
handler.process(session, null);
handler.process(cimSession, null);
}
}
catch(Exception e)
@ -83,9 +89,17 @@ public class MainIOHandler extends IoHandlerAdapter {
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
logger.warn("sessionIdle()... from "+session.getRemoteAddress().toString());
if(!session.containsAttribute("account"))
if(!session.containsAttribute(CIMConstant.SESSION_KEY))
{
session.close(true);
}else
{
//如果5分钟之内客户端没有发送心态则可能客户端断网关闭连接
Object heartbeat = session.getAttribute(CIMConstant.HEARTBEAT_KEY);
if(heartbeat!=null && System.currentTimeMillis()-Long.valueOf(heartbeat.toString()) >= 300000)
{
session.close(false);
}
}
}
@ -95,11 +109,15 @@ public class MainIOHandler extends IoHandlerAdapter {
throws Exception {
logger.error("exceptionCaught()... from "+session.getRemoteAddress());
logger.error(cause);
cause.printStackTrace();
}
/**
*/
public void messageSent(IoSession session, Object message) throws Exception {
//设置心跳时间
session.setAttribute(CIMConstant.HEARTBEAT_KEY, System.currentTimeMillis());
}

View File

@ -0,0 +1,232 @@
package com.farsunset.cim.nio.session;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.constant.CIMConstant;
/**
* IoSession包装类,集群时 将此对象存入表中
*
* @author 3979434@qq.com
*/
public class CIMSession implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
public static String ID = "ID";
public static String HOST = "HOST";
private IoSession session;
private String id;//session全局ID
private String deviceId;//客户端设备ID
private String host;//session绑定的服务器IP
private String account;//session绑定的账号
private String channel;//终端设备类型
private String deviceModel;//终端设备型号
private Long bindTime;//登录时间
private Long heartbeat;//心跳时间
public CIMSession(IoSession session) {
this.session = session;
}
public CIMSession()
{
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
if(session!=null)
{
session.setAttribute(CIMConstant.SESSION_KEY, account);
}
this.account = account;
}
public String getId() {
return id;
}
public String getDeviceId() {
return deviceId;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
public String getDeviceModel() {
return deviceModel;
}
public void setDeviceModel(String deviceModel) {
this.deviceModel = deviceModel;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public void setId(String id) {
this.id = id;
}
public String getHost() {
return host;
}
public Long getBindTime() {
return bindTime;
}
public void setBindTime(Long bindTime) {
this.bindTime = bindTime;
}
public Long getHeartbeat() {
return heartbeat;
}
public void setHeartbeat(Long heartbeat) {
this.heartbeat = heartbeat;
if(session!=null)
{
session.setAttribute(CIMConstant.HEARTBEAT_KEY, heartbeat);
}
}
public void setHost(String host) {
this.host = host;
}
public void setAttribute(String key, Object value) {
if(session!=null)
session.setAttribute(key, value);
}
public boolean containsAttribute(String key) {
if(session!=null)
return session.containsAttribute(key);
return false;
}
public Object getAttribute(String key) {
if(session!=null)
return session.getAttribute(key);
return null;
}
public void removeAttribute(String key) {
if(session!=null)
session.removeAttribute(key);
}
public SocketAddress getRemoteAddress() {
if(session!=null)
return session.getRemoteAddress();
return null;
}
public boolean write(Object msg) {
if(session!=null)
{
WriteFuture wf = session.write(msg);
wf.awaitUninterruptibly(5, TimeUnit.SECONDS);
return wf.isWritten();
}
return false;
}
public boolean isConnected() {
if(session!=null)
return session.isConnected();
return false;
}
public boolean isLocalhost()
{
try {
String ip = InetAddress.getLocalHost().getHostAddress();
return ip.equals(host) && session!=null;
} catch (UnknownHostException e) {
e.printStackTrace();
}
return false;
}
public void close(boolean immediately) {
if(session!=null)
session.close(immediately);
}
public boolean equals(Object o) {
if (o instanceof CIMSession) {
CIMSession t = (CIMSession) o;
if(!t.isLocalhost())
{
return false;
}
if (t.session.getId() == session.getId()&& t.host.equals(host)) {
return true;
}
return false;
} else {
return false;
}
}
public void setIoSession(IoSession session) {
this.session = session;
}
public IoSession getIoSession() {
return session;
}
}

View File

@ -5,20 +5,17 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.constant.CIMConstant;
/**
* 自带默认 session管理实现 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理
*
*服务器集群时 须要将CIMSession 信息存入数据库或者nosql 第三方存储空间中便于所有服务器都可以访问
* @author farsunset (3979434@qq.com)
*/
public class DefaultSessionManager implements SessionManager{
protected final Logger logger = Logger.getLogger(DefaultSessionManager.class);
static final String SESSION_KEY ="account";
private static HashMap<String,IoSession> sessions =new HashMap<String,IoSession>();
private static HashMap<String,CIMSession> sessions =new HashMap<String,CIMSession>();
private static final AtomicInteger connectionsCounter = new AtomicInteger(0);
@ -28,10 +25,10 @@ public class DefaultSessionManager implements SessionManager{
/**
*
*/
public void addSession(String account,IoSession session) {
public void addSession(String account,CIMSession session) {
if(session!=null)
{
session.setAttribute(SESSION_KEY, account);
session.setAttribute(CIMConstant.SESSION_KEY, account);
sessions.put(account, session);
connectionsCounter.incrementAndGet();
}
@ -39,7 +36,7 @@ public class DefaultSessionManager implements SessionManager{
}
public IoSession getSession(String account) {
public CIMSession getSession(String account) {
return sessions.get(account);
@ -48,14 +45,14 @@ public class DefaultSessionManager implements SessionManager{
public Collection<IoSession> getSessions() {
public Collection<CIMSession> getSessions() {
return sessions.values();
}
public void removeSession(IoSession session) {
public void removeSession(CIMSession session) {
sessions.remove(session.getAttribute(SESSION_KEY));
sessions.remove(session.getAttribute(CIMConstant.SESSION_KEY));
}
@ -66,15 +63,15 @@ public class DefaultSessionManager implements SessionManager{
}
public boolean containsIoSession(IoSession ios)
public boolean containsCIMSession(CIMSession ios)
{
return sessions.containsKey(ios.getAttribute(SESSION_KEY)) || sessions.containsValue(ios);
return sessions.containsKey(ios.getAttribute(CIMConstant.SESSION_KEY)) || sessions.containsValue(ios);
}
public String getAccount(IoSession ios)
public String getAccount(CIMSession ios)
{
if(ios.getAttribute(SESSION_KEY)==null)
if(ios.getAttribute(CIMConstant.SESSION_KEY)==null)
{
for(String key:sessions.keySet())
{
@ -85,7 +82,7 @@ public class DefaultSessionManager implements SessionManager{
}
}else
{
return ios.getAttribute(SESSION_KEY).toString();
return ios.getAttribute(CIMConstant.SESSION_KEY).toString();
}
return null;

View File

@ -3,7 +3,6 @@ package com.farsunset.cim.nio.session;
import java.util.Collection;
import org.apache.mina.core.session.IoSession;
/**
* 客户端的 session管理接口
@ -17,26 +16,26 @@ public interface SessionManager {
/**
* 添加新的session
*/
public void addSession(String account,IoSession session);
public void addSession(String account,CIMSession session);
/**
*
* @param account 客户端session的 key 一般可用 用户账号来对应session
* @return
*/
IoSession getSession(String account);
CIMSession getSession(String account);
/**
* 获取所有session
* @return
*/
public Collection<IoSession> getSessions();
public Collection<CIMSession> getSessions();
/**
* 删除session
* @param session
*/
public void removeSession(IoSession session) ;
public void removeSession(CIMSession session) ;
/**
@ -49,11 +48,11 @@ public interface SessionManager {
* session是否存在
* @param session
*/
public boolean containsIoSession(IoSession ios);
public boolean containsCIMSession(CIMSession ios);
/**
* session获取对应的 用户 key
* @param session
*/
public String getAccount(IoSession ios);
public String getAccount(CIMSession ios);
}

View File

@ -37,5 +37,7 @@
<classpathentry kind="lib" path="WebRoot/WEB-INF/lib/xstream-1.3.jar"/>
<classpathentry kind="lib" path="WebRoot/WEB-INF/lib/xwork-core-2.1.6.jar"/>
<classpathentry kind="lib" path="WebRoot/WEB-INF/lib/cim-core.1.5.jar"/>
<classpathentry kind="lib" path="WebRoot/WEB-INF/lib/httpclient-4.3.4.jar"/>
<classpathentry kind="lib" path="WebRoot/WEB-INF/lib/httpcore-4.3.2.jar"/>
<classpathentry kind="output" path="WebRoot/WEB-INF/classes"/>
</classpath>

Binary file not shown.

Binary file not shown.

View File

@ -1,14 +1,14 @@
<%@ page language="java" pageEncoding="utf-8"%>
<%@ page import="java.util.Collection"%>
<%@ page import="com.farsunset.ichat.common.util.StringUtil"%>
<%@ page import="org.apache.mina.core.session.IoSession"%>
<%@ page import="com.farsunset.cim.nio.session.CIMSession"%>
<%
String path = request.getContextPath();
String basePath = request.getScheme() + "://"
+ request.getServerName() + ":" + request.getServerPort()
+ path;
Collection<IoSession> sessionList = (Collection<IoSession>)request.getAttribute("sessionList");
Collection<CIMSession> sessionList = (Collection<CIMSession>)request.getAttribute("sessionList");
%>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
@ -91,40 +91,43 @@
<thead>
<tr class="tableHeader">
<th width="20%">账号</th>
<th width="20%">终端</th>
<th width="20%">在线时长</th>
<th width="20%">心跳时间</th>
<th width="40%">操作</th>
<th width="15%">账号</th>
<th width="15%">终端来源</th>
<th width="15%">终端ID</th>
<th width="15%">终端型号</th>
<th width="15%">在线时长</th>
<th width="10%">操作</th>
</tr>
</thead>
<tbody id="checkPlanList">
<%
for(IoSession ios:sessionList)
for(CIMSession ios:sessionList)
{
%>
<tr id="<%=ios.getAttribute("account") %>">
<tr id="<%=ios.getAccount()%>">
<td>
<%=ios.getAttribute("account") %>
<%=ios.getAccount() %>
</td>
<td>
<%=ios.getAttribute("channel") %>
<%=ios.getChannel() %>
</td>
<td>
<%=(System.currentTimeMillis()-Long.valueOf(ios.getAttribute("loginTime").toString()))/1000 %>秒
<%=ios.getDeviceId() %>
</td>
<td>
<%if(ios.getAttribute("heartbeat")!=null){ %>
<%=StringUtil.transformDateTime(Long.valueOf(ios.getAttribute("heartbeat").toString())) %>
<%} %>
<%=ios.getDeviceModel() %>
</td>
<td>
<%=(System.currentTimeMillis()-ios.getBindTime())/1000 %>秒
</td>
<td>
<div class="btn-group btn-group-xs">
<button type="button" class="btn btn-primary" style="padding: 5px;" onclick="showMessageDialog('<%=ios.getAttribute("account") %>')">发送消息</button>
<button type="button" class="btn btn-danger" style="padding: 5px;" onclick="doOffLine('<%=ios.getAttribute("account") %>')">强制下线</button>
<button type="button" class="btn btn-primary" style="padding: 5px;" onclick="showMessageDialog('<%=ios.getAccount() %>')">发送消息</button>
<button type="button" class="btn btn-danger" style="padding: 5px;" onclick="doOffLine('<%=ios.getAccount() %>')">强制下线</button>
</div>
</td>
</tr>

View File

@ -1,55 +1,74 @@
package com.farsunset.ichat.cim.handler;
import java.net.InetAddress;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.constant.CIMConstant;
import com.farsunset.cim.nio.handle.CIMRequestHandler;
import com.farsunset.cim.nio.mutual.Message;
import com.farsunset.cim.nio.mutual.ReplyBody;
import com.farsunset.cim.nio.mutual.SentBody;
import com.farsunset.cim.nio.session.CIMSession;
import com.farsunset.cim.nio.session.DefaultSessionManager;
import com.farsunset.ichat.common.util.ContextHolder;
/**
* 绑定账号到服务端实现
* 账号绑定实现
*
* @author
*/
*/
public class BindHandler implements CIMRequestHandler {
protected final Logger logger = Logger.getLogger(BindHandler.class);
public ReplyBody process(IoSession newSession, SentBody message) {
public ReplyBody process(CIMSession newSession, SentBody message) {
ReplyBody reply = new ReplyBody();
DefaultSessionManager sessionManager= ((DefaultSessionManager) ContextHolder.getBean("defaultSessionManager"));
try {
String account = message.get("account");
newSession.setAttribute("channel", message.get("channel"));
newSession.setAttribute("deviceId", message.get("deviceId"));
newSession.setAttribute("device", message.get("device"));
newSession.setAccount(account);
newSession.setDeviceId(message.get("deviceId"));
newSession.setId(UUID.randomUUID().toString());
newSession.setHost(InetAddress.getLocalHost().getHostAddress());
newSession.setChannel( message.get("channel"));
newSession.setDeviceModel(message.get("device"));
/**
* 由于客户端断线服务端可能会无法获知的情况客户端重连时需要关闭旧的连接
*/
IoSession oldSession = sessionManager.getSession(account);
CIMSession oldSession = sessionManager.getSession(account);
if(oldSession!=null)
{
//如果是账号已经在另一台终端登录则让另一个终端下线
if(oldSession.getAttribute("deviceId")!=null&&!oldSession.getAttribute("deviceId").equals(newSession.getAttribute("deviceId")))
if((oldSession.getDeviceId()!=null&&!oldSession.getDeviceId().equals(newSession.getDeviceId())
||!oldSession.equals(newSession)))
{
oldSession.removeAttribute("account");
oldSession.removeAttribute(CIMConstant.SESSION_KEY);
Message msg = new Message();
msg.setType(CIMConstant.MessageType.TYPE_999);//强行下线消息类型
msg.setReceiver(account);
oldSession.write(msg);
oldSession.close(true);
if(!oldSession.isLocalhost())
{
/*
判断当前session是否连接于本台服务器如不是发往目标服务器处理
MessageDispatcher.execute(msg, oldSession.getHost());
*/
}else
{
oldSession.write(msg);
oldSession.close(true);
oldSession = null;
}
oldSession = null;
}
@ -57,23 +76,20 @@ public class BindHandler implements CIMRequestHandler {
if(oldSession==null)
{
//第一次设置心跳时间为登录时间
newSession.setAttribute("heartbeat", System.currentTimeMillis());
newSession.setAttribute("loginTime", System.currentTimeMillis());
newSession.setBindTime(System.currentTimeMillis());
newSession.setHeartbeat(System.currentTimeMillis());
sessionManager.addSession(account, newSession);
//设置在线状态
reply.setCode(CIMConstant.ReturnCode.CODE_200);
}
reply.setCode(CIMConstant.ReturnCode.CODE_200);
} catch (Exception e) {
reply.setCode(CIMConstant.ReturnCode.CODE_500);
e.printStackTrace();
}
logger.debug("auth :account:" +message.get("account")+"-----------------------------" +reply.getCode());
logger.debug("bind :account:" +message.get("account")+"-----------------------------" +reply.getCode());
return reply;
}

View File

@ -1,11 +1,11 @@
package com.farsunset.ichat.cim.handler;
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.constant.CIMConstant;
import com.farsunset.cim.nio.handle.CIMRequestHandler;
import com.farsunset.cim.nio.mutual.ReplyBody;
import com.farsunset.cim.nio.mutual.SentBody;
import com.farsunset.cim.nio.session.CIMSession;
import com.farsunset.cim.nio.session.DefaultSessionManager;
import com.farsunset.ichat.common.util.ContextHolder;
@ -17,13 +17,13 @@ import com.farsunset.ichat.common.util.ContextHolder;
*/
public class LogoutHandler implements CIMRequestHandler {
public ReplyBody process(IoSession ios, SentBody message) {
public ReplyBody process(CIMSession ios, SentBody message) {
DefaultSessionManager sessionManager = ((DefaultSessionManager) ContextHolder.getBean("defaultSessionManager"));
String account =ios.getAttribute("account").toString();
ios.removeAttribute("account");
String account =ios.getAttribute(CIMConstant.SESSION_KEY).toString();
ios.removeAttribute(CIMConstant.SESSION_KEY);
ios.close(true);
sessionManager.removeSession(account);

View File

@ -2,11 +2,12 @@
package com.farsunset.ichat.cim.handler;
import org.apache.log4j.Logger;
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.constant.CIMConstant;
import com.farsunset.cim.nio.handle.CIMRequestHandler;
import com.farsunset.cim.nio.mutual.ReplyBody;
import com.farsunset.cim.nio.mutual.SentBody;
import com.farsunset.cim.nio.session.CIMSession;
import com.farsunset.cim.nio.session.DefaultSessionManager;
import com.farsunset.ichat.common.util.ContextHolder;
@ -19,17 +20,16 @@ public class SessionClosedHandler implements CIMRequestHandler {
protected final Logger logger = Logger.getLogger(SessionClosedHandler.class);
public ReplyBody process(IoSession ios, SentBody message) {
public ReplyBody process(CIMSession ios, SentBody message) {
DefaultSessionManager sessionManager = ((DefaultSessionManager) ContextHolder.getBean("defaultSessionManager"));
if(ios.getAttribute("account")==null)
if(ios.getAttribute(CIMConstant.SESSION_KEY)==null)
{
return null;
}
String account = ios.getAttribute("account").toString();
String account = ios.getAttribute(CIMConstant.SESSION_KEY).toString();
sessionManager.removeSession(account);
return null;

View File

@ -3,9 +3,9 @@ package com.farsunset.ichat.cim.push;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.mutual.Message;
import com.farsunset.cim.nio.session.CIMSession;
import com.farsunset.cim.nio.session.DefaultSessionManager;
/**
@ -30,9 +30,15 @@ public class DefaultMessagePusher implements CIMMessagePusher {
* @param msg
*/
public void pushMessageToUser(Message msg) {
IoSession session = sessionManager.getSession(msg.getReceiver());
CIMSession session = sessionManager.getSession(msg.getReceiver());
//服务器集群时可以在此 判断当前session是否连接于本台服务器如果是继续往下走如果不是将此消息发往当前session连接的服务器并 return
/*服务器集群时可以在此 判断当前session是否连接于本台服务器如果是继续往下走如果不是将此消息发往当前session连接的服务器并 return
if(!session.isLocalhost()){//判断当前session是否连接于本台服务器如不是
MessageDispatcher.execute(msg, session.getHost());
return;
}
*/
if (session != null && session.isConnected()) {

View File

@ -0,0 +1,95 @@
package com.farsunset.ichat.cim.session;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.core.session.IoSession;
import com.farsunset.cim.nio.constant.CIMConstant;
import com.farsunset.cim.nio.session.CIMSession;
import com.farsunset.cim.nio.session.SessionManager;
/**
* 集群 session管理实现示例 各位可以自行实现 AbstractSessionManager接口来实现自己的 session管理
*服务器集群时 须要将CIMSession 信息存入数据库或者nosql 第三方存储空间中便于所有服务器都可以访问
* @author farsunset (3979434@qq.com)
*/
public class ClusterSessionManager implements SessionManager{
private static HashMap<String,IoSession> sessions =new HashMap<String,IoSession>();
private static final AtomicInteger connectionsCounter = new AtomicInteger(0);
/**
*
*/
public void addSession(String account,CIMSession session) {
if(session!=null)
{
session.setAttribute(CIMConstant.SESSION_KEY, account);
sessions.put(account, session.getIoSession());
connectionsCounter.incrementAndGet();
}
/**
* 下面 将session 存入数据库
*/
}
public CIMSession getSession(String account) {
/*//这里查询数据库
CIMSession session = database.getSession(account);
session.setIoSession(sessions.get(account));
return session;*/
return null;
}
public Collection<CIMSession> getSessions() {
/*//这里查询数据库
return database.getSessions();*/
return null;
}
public void removeSession(CIMSession session) {
sessions.remove(session.getAccount());
//database.removeSession(session.getAttribute(CIMConstant.SESSION_KEY));*/
}
public void removeSession(String account) {
//database.removeSession(account);*/
sessions.remove(account);
}
public boolean containsCIMSession(CIMSession ios)
{
//return database.containsCIMSession(session.getAccount());
return sessions.containsKey(ios.getAttribute(CIMConstant.SESSION_KEY)) || sessions.containsValue(ios);
}
public String getAccount(CIMSession ios)
{
return ios.getAccount();
}
}

View File

@ -0,0 +1,74 @@
package com.farsunset.ichat.common.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import com.farsunset.cim.nio.mutual.Message;
public class MessageDispatcher {
private static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 20, TimeUnit.SECONDS,queue);;
final static String sendUrl = "http://%1$s:8080/ichat-servier/cgi/message_send.api";
public static void execute(final Message msg,final String ip)
{
executor.execute(new Runnable() {
@Override
public void run() {
try {
httpPost(String.format(sendUrl,ip),msg);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
private static String httpPost(String url,Message msg) throws Exception
{
CloseableHttpClient httpclient = HttpClients.createDefault();
HttpPost httpPost = new HttpPost(url);
List <NameValuePair> nvps = new ArrayList <NameValuePair>();
nvps.add(new BasicNameValuePair("mid", msg.getMid()));
nvps.add(new BasicNameValuePair("type", msg.getType()));
nvps.add(new BasicNameValuePair("title", msg.getTitle()));
nvps.add(new BasicNameValuePair("content",msg.getContent()));
nvps.add(new BasicNameValuePair("sender", msg.getSender()));
nvps.add(new BasicNameValuePair("receiver",msg.getReceiver()));
nvps.add(new BasicNameValuePair("file",msg.getFile()));
nvps.add(new BasicNameValuePair("fileType",msg.getFileType()));
nvps.add(new BasicNameValuePair("timestamp",String.valueOf(msg.getTimestamp())));
httpPost.setEntity(new UrlEncodedFormEntity(nvps));
CloseableHttpResponse response2 = httpclient.execute(httpPost);
String data = null;
try {
System.out.println(response2.getStatusLine());
HttpEntity entity2 = response2.getEntity();
data = EntityUtils.toString(entity2);
} finally {
response2.close();
}
return data;
}
}

View File

@ -12,9 +12,9 @@ package com.farsunset.ichat.example.app;
public interface Constant {
//服务端web地址
public static final String SERVER_URL = "http://192.168.0.141:8080/ichat-server";
public static final String SERVER_URL = "http://192.168.0.107:8080/ichat-server";
//服务端IP地址
public static final String CIM_SERVER_HOST = "192.168.0.141";
public static final String CIM_SERVER_HOST = "192.168.0.107";
//注意这里的端口不是tomcat的端口CIM端口在服务端spring-cim.xml中配置的没改动就使用默认的23456
public static final int CIM_SERVER_PORT = 23456;
@ -37,8 +37,6 @@ public interface Constant {
public static final String STATUS_0 = "0";
//消息已经读取
public static final String STATUS_1 = "1";
}
}