mirror of
https://github.com/wbt5/real-url.git
synced 2025-08-02 15:44:49 +08:00
704 lines
25 KiB
Python
704 lines
25 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
# filename: __adapterproxymanager.py_compiler
|
||
|
||
# Tencent is pleased to support the open source community by making Tars available.
|
||
#
|
||
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
|
||
#
|
||
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
|
||
# in compliance with the License. You may obtain a copy of the License at
|
||
#
|
||
# https://opensource.org/licenses/BSD-3-Clause
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software distributed
|
||
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||
# specific language governing permissions and limitations under the License.
|
||
#
|
||
|
||
'''
|
||
@version: 0.01
|
||
@brief: 将rpc部分中的adapterproxymanager抽离出来,实现不同的负载均衡
|
||
'''
|
||
|
||
from enum import Enum
|
||
import random
|
||
import socket
|
||
import select
|
||
import os
|
||
import time
|
||
|
||
|
||
from .__util import (LockGuard, NewLock, ConsistentHashNew)
|
||
from .__trans import EndPointInfo
|
||
from .__logger import tarsLogger
|
||
from . import exception
|
||
from .__trans import TcpTransceiver
|
||
from .__TimeoutQueue import ReqMessage
|
||
from .exception import TarsException
|
||
|
||
|
||
# 因为循环import的问题只能放这里,不能放文件开始处
|
||
from .QueryF import QueryFProxy
|
||
from .QueryF import QueryFPrxCallback
|
||
|
||
|
||
class AdapterProxy:
|
||
'''
|
||
@brief: 每一个Adapter管理一个服务端端口的连接,数据收发
|
||
'''
|
||
|
||
def __init__(self):
|
||
tarsLogger.debug('AdapterProxy:__init__')
|
||
self.__closeTrans = False
|
||
self.__trans = None
|
||
self.__object = None
|
||
self.__reactor = None
|
||
self.__lock = None
|
||
self.__asyncProc = None
|
||
self.__activeStateInReg = True
|
||
|
||
@property
|
||
def activatestateinreg(self):
|
||
return self.__activeStateInReg
|
||
|
||
@activatestateinreg.setter
|
||
def activatestateinreg(self, value):
|
||
self.__activeStateInReg = value
|
||
|
||
def __del__(self):
|
||
tarsLogger.debug('AdapterProxy:__del__')
|
||
|
||
def initialize(self, endPointInfo, objectProxy, reactor, asyncProc):
|
||
'''
|
||
@brief: 初始化
|
||
@param endPointInfo: 连接对端信息
|
||
@type endPointInfo: EndPointInfo
|
||
@type objectProxy: ObjectProxy
|
||
@type reactor: FDReactor
|
||
@type asyncProc: AsyncProcThread
|
||
'''
|
||
tarsLogger.debug('AdapterProxy:initialize')
|
||
self.__closeTrans = False
|
||
self.__trans = TcpTransceiver(endPointInfo)
|
||
self.__object = objectProxy
|
||
self.__reactor = reactor
|
||
# self.__lock = threading.Lock()
|
||
self.__lock = NewLock()
|
||
self.__asyncProc = asyncProc
|
||
|
||
def terminate(self):
|
||
'''
|
||
@brief: 关闭
|
||
'''
|
||
tarsLogger.debug('AdapterProxy:terminate')
|
||
self.setCloseTrans(True)
|
||
|
||
def trans(self):
|
||
'''
|
||
@brief: 获取传输类
|
||
@return: 负责网络传输的trans
|
||
@rtype: Transceiver
|
||
'''
|
||
return self.__trans
|
||
|
||
def invoke(self, reqmsg):
|
||
'''
|
||
@brief: 远程过程调用处理方法
|
||
@param reqmsg: 请求响应报文
|
||
@type reqmsg: ReqMessage
|
||
@return: 错误码:0表示成功,-1表示连接失败
|
||
@rtype: int
|
||
'''
|
||
tarsLogger.debug('AdapterProxy:invoke')
|
||
assert(self.__trans)
|
||
|
||
if (not self.__trans.hasConnected() and
|
||
not self.__trans.isConnecting):
|
||
# -1表示连接失败
|
||
return -1
|
||
|
||
reqmsg.request.iRequestId = self.__object.getTimeoutQueue().generateId()
|
||
self.__object.getTimeoutQueue().push(reqmsg, reqmsg.request.iRequestId)
|
||
|
||
self.__reactor.notify(self)
|
||
|
||
return 0
|
||
|
||
def finished(self, rsp):
|
||
'''
|
||
@brief: 远程过程调用返回处理
|
||
@param rsp: 响应报文
|
||
@type rsp: ResponsePacket
|
||
@return: 函数是否执行成功
|
||
@rtype: bool
|
||
'''
|
||
tarsLogger.debug('AdapterProxy:finished')
|
||
reqmsg = self.__object.getTimeoutQueue().pop(rsp.iRequestId)
|
||
if not reqmsg:
|
||
tarsLogger.error(
|
||
'finished, can not get ReqMessage, may be timeout, id: %d',
|
||
rsp.iRequestId)
|
||
return False
|
||
|
||
reqmsg.response = rsp
|
||
if reqmsg.type == ReqMessage.SYNC_CALL:
|
||
return reqmsg.servant._finished(reqmsg)
|
||
elif reqmsg.callback:
|
||
self.__asyncProc.put(reqmsg)
|
||
return True
|
||
|
||
tarsLogger.error('finished, adapter proxy finish fail, id: %d, ret: %d',
|
||
rsp.iRequestId, rsp.iRet)
|
||
return False
|
||
|
||
# 检测连接是否失败,失效时重连
|
||
def checkActive(self, forceConnect=False):
|
||
'''
|
||
@brief: 检测连接是否失效
|
||
@param forceConnect: 是否强制发起连接,为true时不对状态进行判断就发起连接
|
||
@type forceConnect: bool
|
||
@return: 连接是否有效
|
||
@rtype: bool
|
||
'''
|
||
tarsLogger.debug('AdapterProxy:checkActive')
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__lock)
|
||
tarsLogger.info('checkActive, %s, forceConnect: %s',
|
||
self.__trans.getEndPointInfo(), forceConnect)
|
||
|
||
if not self.__trans.isConnecting() and not self.__trans.hasConnected():
|
||
self.doReconnect()
|
||
|
||
# self.__lock.release()
|
||
return self.__trans.isConnecting() or self.__trans.hasConnected()
|
||
|
||
def doReconnect(self):
|
||
'''
|
||
@brief: 重新发起连接
|
||
@return: None
|
||
@rtype: None
|
||
'''
|
||
tarsLogger.debug('AdapterProxy:doReconnect')
|
||
assert(self.__trans)
|
||
|
||
self.__trans.reInit()
|
||
tarsLogger.info('doReconnect, connect: %s, fd:%d',
|
||
self.__trans.getEndPointInfo(),
|
||
self.__trans.getFd())
|
||
|
||
self.__reactor.registerAdapter(self, select.EPOLLIN | select.EPOLLOUT)
|
||
|
||
def sendRequest(self):
|
||
'''
|
||
@brief: 把队列中的请求放到Transceiver的发送缓存里
|
||
@return: 放入缓存的数据长度
|
||
@rtype: int
|
||
'''
|
||
tarsLogger.debug('AdapterProxy:sendRequest')
|
||
if not self.__trans.hasConnected():
|
||
return False
|
||
|
||
reqmsg = self.__object.popRequest()
|
||
blen = 0
|
||
while reqmsg:
|
||
reqmsg.adapter = self
|
||
buf = reqmsg.packReq()
|
||
self.__trans.writeToSendBuf(buf)
|
||
tarsLogger.info('sendRequest, id: %d, len: %d',
|
||
reqmsg.request.iRequestId, len(buf))
|
||
blen += len(buf)
|
||
# 合并一次发送的包 最大合并至8k 提高异步时客户端效率?
|
||
if (self.__trans.getEndPointInfo().getConnType() == EndPointInfo.SOCK_UDP
|
||
or blen > 8192):
|
||
break
|
||
reqmsg = self.__object.popRequest()
|
||
|
||
return blen
|
||
|
||
def finishConnect(self):
|
||
'''
|
||
@brief: 使用的非阻塞socket连接不能立刻判断是否连接成功,
|
||
在epoll响应后调用此函数处理connect结束后的操作
|
||
@return: 是否连接成功
|
||
@rtype: bool
|
||
'''
|
||
tarsLogger.debug('AdapterProxy:finishConnect')
|
||
success = True
|
||
errmsg = ''
|
||
try:
|
||
ret = self.__trans.getSock().getsockopt(
|
||
socket.SOL_SOCKET, socket.SO_ERROR)
|
||
if ret:
|
||
success = False
|
||
errmsg = os.strerror(ret)
|
||
except Exception as msg:
|
||
errmsg = msg
|
||
success = False
|
||
|
||
if not success:
|
||
self.__reactor.unregisterAdapter(
|
||
self, socket.EPOLLIN | socket.EPOLLOUT)
|
||
self.__trans.close()
|
||
self.__trans.setConnFailed()
|
||
tarsLogger.error(
|
||
'AdapterProxy finishConnect, exception: %s, error: %s',
|
||
self.__trans.getEndPointInfo(), errmsg)
|
||
return False
|
||
self.__trans.setConnected()
|
||
self.__reactor.notify(self)
|
||
tarsLogger.info('AdapterProxy finishConnect, connect %s success',
|
||
self.__trans.getEndPointInfo())
|
||
return True
|
||
|
||
def finishInvoke(self, isTimeout):
|
||
pass
|
||
|
||
# 弹出请求报文
|
||
def popRequest(self):
|
||
pass
|
||
|
||
def shouldCloseTrans(self):
|
||
'''
|
||
@brief: 是否设置关闭连接
|
||
@return: 关闭连接的flag的值
|
||
@rtype: bool
|
||
'''
|
||
return self.__closeTrans
|
||
|
||
def setCloseTrans(self, closeTrans):
|
||
'''
|
||
@brief: 设置关闭连接flag的值
|
||
@param closeTrans: 是否关闭连接
|
||
@type closeTrans: bool
|
||
@return: None
|
||
@rtype: None
|
||
'''
|
||
self.__closeTrans = closeTrans
|
||
|
||
|
||
class QueryRegisterCallback(QueryFPrxCallback):
|
||
def __init__(self, adpManager):
|
||
self.__adpManager = adpManager
|
||
super(QueryRegisterCallback, self).__init__()
|
||
# QueryFPrxCallback.__init__(self)
|
||
|
||
def callback_findObjectById4All(self, ret, activeEp, inactiveEp):
|
||
eplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType)
|
||
for x in activeEp if ret == 0 and x.istcp]
|
||
ieplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType)
|
||
for x in inactiveEp if ret == 0 and x.istcp]
|
||
self.__adpManager.setEndpoints(eplist, ieplist)
|
||
|
||
def callback_findObjectById4All_exception(self, ret):
|
||
tarsLogger.error('callback_findObjectById4All_exception ret: %d', ret)
|
||
|
||
|
||
class EndpointWeightType(Enum):
|
||
E_LOOP = 0
|
||
E_STATIC_WEIGHT = 1
|
||
|
||
|
||
class AdapterProxyManager:
|
||
'''
|
||
@brief: 管理Adapter
|
||
'''
|
||
|
||
def __init__(self):
|
||
tarsLogger.debug('AdapterProxyManager:__init__')
|
||
self.__comm = None
|
||
self.__object = None
|
||
# __adps的key=str(EndPointInfo) value=[EndPointInfo, AdapterProxy, cnt]
|
||
# cnt是访问次数
|
||
self.__adps = {}
|
||
self.__iadps = {}
|
||
self.__newLock = None
|
||
self.__isDirectProxy = True
|
||
self.__lastFreshTime = 0
|
||
self.__queryRegisterCallback = QueryRegisterCallback(self)
|
||
self.__regAdapterProxyDict = {}
|
||
self.__lastConHashPrxList = []
|
||
self.__consistentHashWeight = None
|
||
self.__weightType = EndpointWeightType.E_LOOP
|
||
self.__update = True
|
||
self.__lastWeightedProxyData = {}
|
||
|
||
def initialize(self, comm, objectProxy, eplist):
|
||
'''
|
||
@brief: 初始化
|
||
'''
|
||
tarsLogger.debug('AdapterProxyManager:initialize')
|
||
self.__comm = comm
|
||
self.__object = objectProxy
|
||
self.__newLock = NewLock()
|
||
|
||
self.__isDirectProxy = len(eplist) > 0
|
||
if self.__isDirectProxy:
|
||
self.setEndpoints(eplist, {})
|
||
else:
|
||
self.refreshEndpoints()
|
||
|
||
def terminate(self):
|
||
'''
|
||
@brief: 释放资源
|
||
'''
|
||
tarsLogger.debug('AdapterProxyManager:terminate')
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__newLock)
|
||
for ep, epinfo in self.__adps.items():
|
||
epinfo[1].terminate()
|
||
self.__adps = {}
|
||
self.__lock.release()
|
||
|
||
def refreshEndpoints(self):
|
||
'''
|
||
@brief: 刷新服务器列表
|
||
@return: 新的服务列表
|
||
@rtype: EndPointInfo列表
|
||
'''
|
||
tarsLogger.debug('AdapterProxyManager:refreshEndpoints')
|
||
if self.__isDirectProxy:
|
||
return
|
||
|
||
interval = self.__comm.getProperty(
|
||
'refresh-endpoint-interval', float) / 1000
|
||
locator = self.__comm.getProperty('locator')
|
||
|
||
if '@' not in locator:
|
||
raise exception.TarsRegistryException(
|
||
'locator is not valid: ' + locator)
|
||
|
||
now = time.time()
|
||
last = self.__lastFreshTime
|
||
epSize = len(self.__adps)
|
||
if last + interval < now or (epSize <= 0 and last + 2 < now):
|
||
queryFPrx = self.__comm.stringToProxy(QueryFProxy, locator)
|
||
# 首次访问是同步调用,之后访问是异步调用
|
||
if epSize == 0 or last == 0:
|
||
ret, activeEps, inactiveEps = (
|
||
queryFPrx.findObjectById4All(self.__object.name()))
|
||
# 目前只支持TCP
|
||
eplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType)
|
||
for x in activeEps if ret == 0 and x.istcp]
|
||
ieplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType)
|
||
for x in inactiveEps if ret == 0 and x.istcp]
|
||
self.setEndpoints(eplist, ieplist)
|
||
else:
|
||
queryFPrx.async_findObjectById4All(self.__queryRegisterCallback,
|
||
self.__object.name())
|
||
self.__lastFreshTime = now
|
||
|
||
def getEndpoints(self):
|
||
'''
|
||
@brief: 获取可用服务列表 如果启用分组,只返回同分组的服务端ip
|
||
@return: 获取节点列表
|
||
@rtype: EndPointInfo列表
|
||
'''
|
||
tarsLogger.debug('AdapterProxyManager:getEndpoints')
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__newLock)
|
||
ret = [x[1][0] for x in list(self.__adps.items())]
|
||
# self.__lock.release()
|
||
|
||
return ret
|
||
|
||
def setEndpoints(self, eplist, ieplist):
|
||
'''
|
||
@brief: 设置服务端信息
|
||
@para eplist: 活跃的被调节点列表
|
||
@para ieplist: 不活跃的被调节点列表
|
||
'''
|
||
tarsLogger.debug('AdapterProxyManager:setEndpoints')
|
||
adps = {}
|
||
iadps = {}
|
||
comm = self.__comm
|
||
isNeedNotify = False
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__newLock)
|
||
isStartStatic = True
|
||
|
||
for ep in eplist:
|
||
if ep.getWeightType() == 0:
|
||
isStartStatic = False
|
||
epstr = str(ep)
|
||
if epstr in self.__adps:
|
||
adps[epstr] = self.__adps[epstr]
|
||
continue
|
||
isNeedNotify = True
|
||
self.__update = True
|
||
adapter = AdapterProxy()
|
||
adapter.initialize(ep, self.__object,
|
||
comm.getReactor(), comm.getAsyncProc())
|
||
adapter.activatestateinreg = True
|
||
adps[epstr] = [ep, adapter, 0]
|
||
self.__adps, adps = adps, self.__adps
|
||
|
||
for iep in ieplist:
|
||
iepstr = str(iep)
|
||
if iepstr in self.__iadps:
|
||
iadps[iepstr] = self.__iadps[iepstr]
|
||
continue
|
||
isNeedNotify = True
|
||
adapter = AdapterProxy()
|
||
adapter.initialize(iep, self.__object,
|
||
comm.getReactor(), comm.getAsyncProc())
|
||
adapter.activatestateinreg = False
|
||
iadps[iepstr] = [iep, adapter, 0]
|
||
self.__iadps, iadps = iadps, self.__iadps
|
||
|
||
if isStartStatic:
|
||
self.__weightType = EndpointWeightType.E_STATIC_WEIGHT
|
||
else:
|
||
self.__weightType = EndpointWeightType.E_LOOP
|
||
|
||
# self.__lock.release()
|
||
if isNeedNotify:
|
||
self.__notifyEndpoints(self.__adps, self.__iadps)
|
||
# 关闭已经失效的连接
|
||
for ep in adps:
|
||
if ep not in self.__adps:
|
||
adps[ep][1].terminate()
|
||
|
||
def __notifyEndpoints(self, actives, inactives):
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__newLock)
|
||
self.__regAdapterProxyDict.clear()
|
||
self.__regAdapterProxyDict.update(actives)
|
||
self.__regAdapterProxyDict.update(inactives)
|
||
# self.__lock.release()
|
||
|
||
def __getNextValidProxy(self):
|
||
'''
|
||
@brief: 刷新本地缓存列表,如果服务下线了,要求删除本地缓存
|
||
@return:
|
||
@rtype: EndPointInfo列表
|
||
@todo: 优化负载均衡算法
|
||
'''
|
||
tarsLogger.debug('AdapterProxyManager:getNextValidProxy')
|
||
lock = LockGuard(self.__newLock)
|
||
if len(self.__adps) == 0:
|
||
raise TarsException("the activate adapter proxy is empty")
|
||
|
||
sortedActivateAdp = sorted(
|
||
list(self.__adps.items()), key=lambda item: item[1][2])
|
||
# self.refreshEndpoints()
|
||
# self.__lock.acquire()
|
||
sortedActivateAdpSize = len(sortedActivateAdp)
|
||
|
||
while sortedActivateAdpSize != 0:
|
||
if sortedActivateAdp[0][1][1].checkActive():
|
||
self.__adps[sortedActivateAdp[0][0]][2] += 1
|
||
# 返回的是 adapterProxy
|
||
return self.__adps[sortedActivateAdp[0][0]][1]
|
||
sortedActivateAdp.pop(0)
|
||
sortedActivateAdpSize -= 1
|
||
# 随机重连一个可用节点
|
||
adpPrx = list(self.__adps.items())[random.randint(
|
||
0, len(self.__adps))][1][1]
|
||
adpPrx.checkActive()
|
||
return None
|
||
# self.__lock.release()
|
||
|
||
def __getHashProxy(self, reqmsg):
|
||
if self.__weightType == EndpointWeightType.E_LOOP:
|
||
if reqmsg.isConHash:
|
||
return self.__getConHashProxyForNormal(reqmsg.hashCode)
|
||
else:
|
||
return self.__getHashProxyForNormal(reqmsg.hashCode)
|
||
else:
|
||
if reqmsg.isConHash:
|
||
return self.__getConHashProxyForWeight(reqmsg.hashCode)
|
||
else:
|
||
return self.__getHashProxyForWeight(reqmsg.hashCode)
|
||
|
||
def __getHashProxyForNormal(self, hashCode):
|
||
tarsLogger.debug('AdapterProxyManager:getHashProxyForNormal')
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__newLock)
|
||
regAdapterProxyList = sorted(
|
||
list(self.__regAdapterProxyDict.items()), key=lambda item: item[0])
|
||
|
||
allPrxSize = len(regAdapterProxyList)
|
||
if allPrxSize == 0:
|
||
raise TarsException("the adapter proxy is empty")
|
||
hashNum = hashCode % allPrxSize
|
||
|
||
if regAdapterProxyList[hashNum][1][1].activatestateinreg and regAdapterProxyList[hashNum][1][1].checkActive():
|
||
epstr = regAdapterProxyList[hashNum][0]
|
||
self.__regAdapterProxyDict[epstr][2] += 1
|
||
if epstr in self.__adps:
|
||
self.__adps[epstr][2] += 1
|
||
elif epstr in self.__iadps:
|
||
self.__iadps[epstr][2] += 1
|
||
return self.__regAdapterProxyDict[epstr][1]
|
||
else:
|
||
if len(self.__adps) == 0:
|
||
raise TarsException("the activate adapter proxy is empty")
|
||
activeProxyList = list(self.__adps.items())
|
||
actPrxSize = len(activeProxyList)
|
||
while actPrxSize != 0:
|
||
hashNum = hashCode % actPrxSize
|
||
if activeProxyList[hashNum][1][1].checkActive():
|
||
self.__adps[activeProxyList[hashNum][0]][2] += 1
|
||
return self.__adps[activeProxyList[hashNum][0]][1]
|
||
activeProxyList.pop(hashNum)
|
||
actPrxSize -= 1
|
||
# 随机重连一个可用节点
|
||
adpPrx = list(self.__adps.items())[random.randint(
|
||
0, len(self.__adps))][1][1]
|
||
adpPrx.checkActive()
|
||
return None
|
||
|
||
def __getConHashProxyForNormal(self, hashCode):
|
||
tarsLogger.debug('AdapterProxyManager:getConHashProxyForNormal')
|
||
lock = LockGuard(self.__newLock)
|
||
if len(self.__regAdapterProxyDict) == 0:
|
||
raise TarsException("the adapter proxy is empty")
|
||
if self.__consistentHashWeight is None or self.__checkConHashChange(self.__lastConHashPrxList):
|
||
self.__updateConHashProxyWeighted()
|
||
|
||
if len(self.__consistentHashWeight.nodes) > 0:
|
||
conHashIndex = self.__consistentHashWeight.getNode(hashCode)
|
||
if conHashIndex in self.__regAdapterProxyDict and self.__regAdapterProxyDict[conHashIndex][1].activatestateinreg and self.__regAdapterProxyDict[conHashIndex][1].checkActive():
|
||
self.__regAdapterProxyDict[conHashIndex][2] += 1
|
||
if conHashIndex in self.__adps:
|
||
self.__adps[conHashIndex][2] += 1
|
||
elif conHashIndex in self.__iadps:
|
||
self.__iadps[conHashIndex][2] += 1
|
||
return self.__regAdapterProxyDict[conHashIndex][1]
|
||
else:
|
||
if len(self.__adps) == 0:
|
||
raise TarsException("the activate adapter proxy is empty")
|
||
activeProxyList = list(self.__adps.items())
|
||
actPrxSize = len(activeProxyList)
|
||
while actPrxSize != 0:
|
||
hashNum = hashCode % actPrxSize
|
||
if activeProxyList[hashNum][1][1].checkActive():
|
||
self.__adps[activeProxyList[hashNum][0]][2] += 1
|
||
return self.__adps[activeProxyList[hashNum][0]][1]
|
||
activeProxyList.pop(hashNum)
|
||
actPrxSize -= 1
|
||
# 随机重连一个可用节点
|
||
adpPrx = list(self.__adps.items())[random.randint(
|
||
0, len(self.__adps))][1][1]
|
||
adpPrx.checkActive()
|
||
return None
|
||
pass
|
||
else:
|
||
return self.__getHashProxyForNormal(hashCode)
|
||
|
||
def __getHashProxyForWeight(self, hashCode):
|
||
return None
|
||
pass
|
||
|
||
def __getConHashProxyForWeight(self, hashCode):
|
||
return None
|
||
pass
|
||
|
||
def __checkConHashChange(self, lastConHashPrxList):
|
||
tarsLogger.debug('AdapterProxyManager:checkConHashChange')
|
||
lock = LockGuard(self.__newLock)
|
||
if len(lastConHashPrxList) != len(self.__regAdapterProxyDict):
|
||
return True
|
||
regAdapterProxyList = sorted(
|
||
list(self.__regAdapterProxyDict.items()), key=lambda item: item[0])
|
||
regAdapterProxyListSize = len(regAdapterProxyList)
|
||
for index in range(regAdapterProxyListSize):
|
||
if cmp(lastConHashPrxList[index][0], regAdapterProxyList[index][0]) != 0:
|
||
return True
|
||
return False
|
||
|
||
def __updateConHashProxyWeighted(self):
|
||
tarsLogger.debug('AdapterProxyManager:updateConHashProxyWeighted')
|
||
lock = LockGuard(self.__newLock)
|
||
if len(self.__regAdapterProxyDict) == 0:
|
||
raise TarsException("the adapter proxy is empty")
|
||
self.__lastConHashPrxList = sorted(
|
||
list(self.__regAdapterProxyDict.items()), key=lambda item: item[0])
|
||
nodes = []
|
||
for var in self.__lastConHashPrxList:
|
||
nodes.append(var[0])
|
||
if self.__consistentHashWeight is None:
|
||
self.__consistentHashWeight = ConsistentHashNew(nodes)
|
||
else:
|
||
theOldActiveNodes = [
|
||
var for var in nodes if var in self.__consistentHashWeight.nodes]
|
||
|
||
theOldInactiveNodes = [
|
||
var for var in self.__consistentHashWeight.nodes if var not in theOldActiveNodes]
|
||
for var in theOldInactiveNodes:
|
||
self.__consistentHashWeight.removeNode(var)
|
||
|
||
theNewActiveNodes = [
|
||
var for var in nodes if var not in theOldActiveNodes]
|
||
for var in theNewActiveNodes:
|
||
self.__consistentHashWeight.addNode(var)
|
||
|
||
self.__consistentHashWeight.nodes = nodes
|
||
pass
|
||
|
||
def __getWeightedProxy(self):
|
||
tarsLogger.debug('AdapterProxyManager:getWeightedProxy')
|
||
lock = LockGuard(self.__newLock)
|
||
if len(self.__adps) == 0:
|
||
raise TarsException("the activate adapter proxy is empty")
|
||
|
||
if self.__update is True:
|
||
self.__lastWeightedProxyData.clear()
|
||
weightedProxyData = {}
|
||
minWeight = (list(self.__adps.items())[0][1][0]).getWeight()
|
||
for item in list(self.__adps.items()):
|
||
weight = (item[1][0].getWeight())
|
||
weightedProxyData[item[0]] = (weight)
|
||
if minWeight > weight:
|
||
minWeight = weight
|
||
|
||
if minWeight <= 0:
|
||
addWeight = -minWeight + 1
|
||
for item in list(weightedProxyData.items()):
|
||
item[1] += addWeight
|
||
|
||
self.__update = False
|
||
self.__lastWeightedProxyData = weightedProxyData
|
||
|
||
weightedProxyData = self.__lastWeightedProxyData
|
||
while len(weightedProxyData) > 0:
|
||
total = sum(weightedProxyData.values())
|
||
rand = random.randint(1, total)
|
||
temp = 0
|
||
for item in list(weightedProxyData.items()):
|
||
temp += item[1]
|
||
if rand <= temp:
|
||
if self.__adps[item[0]][1].checkActive():
|
||
self.__adps[item[0]][2] += 1
|
||
return self.__adps[item[0]][1]
|
||
else:
|
||
weightedProxyData.pop(item[0])
|
||
break
|
||
# 没有一个活跃的节点
|
||
# 随机重连一个可用节点
|
||
adpPrx = list(self.__adps.items())[random.randint(
|
||
0, len(self.__adps))][1][1]
|
||
adpPrx.checkActive()
|
||
return None
|
||
|
||
def selectAdapterProxy(self, reqmsg):
|
||
'''
|
||
@brief: 刷新本地缓存列表,如果服务下线了,要求删除本地缓存,通过一定算法返回AdapterProxy
|
||
@param: reqmsg:请求响应报文
|
||
@type reqmsg: ReqMessage
|
||
@return:
|
||
@rtype: EndPointInfo列表
|
||
@todo: 优化负载均衡算法
|
||
'''
|
||
tarsLogger.debug('AdapterProxyManager:selectAdapterProxy')
|
||
self.refreshEndpoints()
|
||
if reqmsg.isHash:
|
||
return self.__getHashProxy(reqmsg)
|
||
else:
|
||
if self.__weightType == EndpointWeightType.E_STATIC_WEIGHT:
|
||
return self.__getWeightedProxy()
|
||
else:
|
||
return self.__getNextValidProxy()
|