1
0
mirror of https://github.com/wbt5/real-url.git synced 2025-06-17 08:25:25 +08:00
This commit is contained in:
wbt5 2020-06-18 10:34:09 +08:00
parent 30d835b62c
commit 8ba799f3f6
19 changed files with 4353 additions and 0 deletions

View File

@ -0,0 +1,69 @@
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
from core import tarscore
class EndpointF(tarscore.struct):
__tars_class__ = "register.EndpointF"
def __init__(self):
self.host = ""
self.port = 0
self.timeout = 0
self.istcp = 0
self.grid = 0
self.groupworkid = 0
self.grouprealid = 0
self.setId = ""
self.qos = 0
self.bakFlag = 0
self.weight = 0
self.weightType = 0
@staticmethod
def writeTo(oos, value):
oos.write(tarscore.string, 0, value.host)
oos.write(tarscore.int32, 1, value.port)
oos.write(tarscore.int32, 2, value.timeout)
oos.write(tarscore.int32, 3, value.istcp)
oos.write(tarscore.int32, 4, value.grid)
oos.write(tarscore.int32, 5, value.groupworkid)
oos.write(tarscore.int32, 6, value.grouprealid)
oos.write(tarscore.string, 7, value.setId)
oos.write(tarscore.int32, 8, value.qos)
oos.write(tarscore.int32, 9, value.bakFlag)
oos.write(tarscore.int32, 11, value.weight)
oos.write(tarscore.int32, 12, value.weightType)
@staticmethod
def readFrom(ios):
value = EndpointF()
value.host = ios.read(tarscore.string, 0, True, value.host)
value.port = ios.read(tarscore.int32, 1, True, value.port)
value.timeout = ios.read(tarscore.int32, 2, True, value.timeout)
value.istcp = ios.read(tarscore.int32, 3, True, value.istcp)
value.grid = ios.read(tarscore.int32, 4, True, value.grid)
value.groupworkid = ios.read(
tarscore.int32, 5, False, value.groupworkid)
value.grouprealid = ios.read(
tarscore.int32, 6, False, value.grouprealid)
value.setId = ios.read(tarscore.string, 7, False, value.setId)
value.qos = ios.read(tarscore.int32, 8, False, value.qos)
value.bakFlag = ios.read(tarscore.int32, 9, False, value.bakFlag)
value.weight = ios.read(tarscore.int32, 11, False, value.weight)
value.weightType = ios.read(
tarscore.int32, 12, False, value.weightType)
return value

View File

@ -0,0 +1,276 @@
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# from tars.core import tarscore;
# from tars.core import ServantProxy;
# from tars.core import ServantProxyCallback;
# from com.qq.register.EndpointF import *;
from .__init__ import tarscore
from .__servantproxy import ServantProxy
from .__async import ServantProxyCallback
from .EndpointF import EndpointF
import time
# proxy for client
class QueryFProxy(ServantProxy):
def findObjectById(self, id, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
rsp = self.tars_invoke(ServantProxy.TARSNORMAL,
"findObjectById", oos.getBuffer(), context, None)
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.vctclass(EndpointF), 0, True)
return (ret)
def async_findObjectById(self, callback, id, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
self.tars_invoke_async(
ServantProxy.TARSNORMAL, "findObjectById", oos.getBuffer(), context, None, callback)
def findObjectById4Any(self, id, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
rsp = self.tars_invoke(
ServantProxy.TARSNORMAL, "findObjectById4Any", oos.getBuffer(), context, None)
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
return (ret, activeEp, inactiveEp)
def async_findObjectById4Any(self, callback, id, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
self.tars_invoke_async(
ServantProxy.TARSNORMAL, "findObjectById4Any", oos.getBuffer(), context, None, callback)
def findObjectById4All(self, id, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
rsp = self.tars_invoke(
ServantProxy.TARSNORMAL, "findObjectById4All", oos.getBuffer(), context, None)
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
return (ret, activeEp, inactiveEp)
def async_findObjectById4All(self, callback, id, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
self.tars_invoke_async(
ServantProxy.TARSNORMAL, "findObjectById4All", oos.getBuffer(), context, None, callback)
def findObjectByIdInSameGroup(self, id, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
rsp = self.tars_invoke(
ServantProxy.TARSNORMAL, "findObjectByIdInSameGroup", oos.getBuffer(), context, None)
startDecodeTime = time.time()
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
endDecodeTime = time.time()
return (ret, activeEp, inactiveEp, (endDecodeTime - startDecodeTime))
def async_findObjectByIdInSameGroup(self, callback, id, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
self.tars_invoke_async(
ServantProxy.TARSNORMAL, "findObjectByIdInSameGroup", oos.getBuffer(), context, None, callback)
def findObjectByIdInSameStation(self, id, sStation, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
oos.write(tarscore.string, 2, sStation)
rsp = self.tars_invoke(
ServantProxy.TARSNORMAL, "findObjectByIdInSameStation", oos.getBuffer(), context, None)
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 4, True)
return (ret, activeEp, inactiveEp)
def async_findObjectByIdInSameStation(self, callback, id, sStation, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
oos.write(tarscore.string, 2, sStation)
self.tars_invoke_async(
ServantProxy.TARSNORMAL, "findObjectByIdInSameStation", oos.getBuffer(), context, None, callback)
def findObjectByIdInSameSet(self, id, setId, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
oos.write(tarscore.string, 2, setId)
rsp = self.tars_invoke(
ServantProxy.TARSNORMAL, "findObjectByIdInSameSet", oos.getBuffer(), context, None)
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 4, True)
return (ret, activeEp, inactiveEp)
def async_findObjectByIdInSameSet(self, callback, id, setId, context=ServantProxy.mapcls_context()):
oos = tarscore.TarsOutputStream()
oos.write(tarscore.string, 1, id)
oos.write(tarscore.string, 2, setId)
self.tars_invoke_async(
ServantProxy.TARSNORMAL, "findObjectByIdInSameSet", oos.getBuffer(), context, None, callback)
# ========================================================
# callback of async proxy for client
# ========================================================
class QueryFPrxCallback(ServantProxyCallback):
def __init__(self):
ServantProxyCallback.__init__(self)
self.callback_map = {
'findObjectById': self.__invoke_findObjectById,
'findObjectById4Any': self.__invoke_findObjectById4Any,
'findObjectById4All': self.__invoke_findObjectById4All,
'findObjectByIdInSameGroup': self.__invoke_findObjectByIdInSameGroup,
'findObjectByIdInSameStation': self.__invoke_findObjectByIdInSameStation,
'findObjectByIdInSameSet': self.__invoke_findObjectByIdInSameSet
}
def callback_findObjectById(self, ret):
raise NotImplementedError()
def callback_findObjectById_exception(self, ret):
raise NotImplementedError()
def callback_findObjectById4Any(self, ret, activeEp, inactiveEp):
raise NotImplementedError()
def callback_findObjectById4Any_exception(self, ret):
raise NotImplementedError()
def callback_findObjectById4All(self, ret, activeEp, inactiveEp):
raise NotImplementedError()
def callback_findObjectById4All_exception(self, ret):
raise NotImplementedError()
def callback_findObjectByIdInSameGroup(self, ret, activeEp, inactiveEp):
raise NotImplementedError()
def callback_findObjectByIdInSameGroup_exception(self, ret):
raise NotImplementedError()
def callback_findObjectByIdInSameStation(self, ret, activeEp, inactiveEp):
raise NotImplementedError()
def callback_findObjectByIdInSameStation_exception(self, ret):
raise NotImplementedError()
def callback_findObjectByIdInSameSet(self, ret, activeEp, inactiveEp):
raise NotImplementedError()
def callback_findObjectByIdInSameSet_exception(self, ret):
raise NotImplementedError()
def __invoke_findObjectById(self, reqmsg):
rsp = reqmsg.response
if rsp.iRet != ServantProxy.TARSSERVERSUCCESS:
self.callback_findObjectById_exception(rsp.iRet)
return rsp.iRet
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.vctclass(EndpointF), 0, True)
self.callback_findObjectById(ret)
def __invoke_findObjectById4Any(self, reqmsg):
rsp = reqmsg.response
if rsp.iRet != ServantProxy.TARSSERVERSUCCESS:
self.callback_findObjectById4Any_exception(rsp.iRet)
return rsp.iRet
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
self.callback_findObjectById4Any(ret, activeEp, inactiveEp)
def __invoke_findObjectById4All(self, reqmsg):
rsp = reqmsg.response
if rsp.iRet != ServantProxy.TARSSERVERSUCCESS:
self.callback_findObjectById4All_exception(rsp.iRet)
return rsp.iRet
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
self.callback_findObjectById4All(ret, activeEp, inactiveEp)
def __invoke_findObjectByIdInSameGroup(self, reqmsg):
rsp = reqmsg.response
if rsp.iRet != ServantProxy.TARSSERVERSUCCESS:
self.callback_findObjectByIdInSameGroup_exception(rsp.iRet)
return rsp.iRet
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
self.callback_findObjectByIdInSameGroup(ret, activeEp, inactiveEp)
def __invoke_findObjectByIdInSameStation(self, reqmsg):
rsp = reqmsg.response
if rsp.iRet != ServantProxy.TARSSERVERSUCCESS:
self.callback_findObjectByIdInSameStation_exception(rsp.iRet)
return rsp.iRet
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 4, True)
self.callback_findObjectByIdInSameStation(ret, activeEp, inactiveEp)
def __invoke_findObjectByIdInSameSet(self, reqmsg):
rsp = reqmsg.response
if rsp.iRet != ServantProxy.TARSSERVERSUCCESS:
self.callback_findObjectByIdInSameSet_exception(rsp.iRet)
return rsp.iRet
ios = tarscore.TarsInputStream(rsp.sBuffer)
ret = ios.read(tarscore.int32, 0, True)
activeEp = ios.read(tarscore.vctclass(EndpointF), 3, True)
inactiveEp = ios.read(tarscore.vctclass(EndpointF), 4, True)
self.callback_findObjectByIdInSameSet(ret, activeEp, inactiveEp)
def onDispatch(self, reqmsg):
self.callback_map[reqmsg.request.sFuncName](reqmsg)

View File

@ -0,0 +1,300 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __timeQueue.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: 请求响应报文和超时队列
'''
import threading
import time
import struct
from .__logger import tarsLogger
from .__tars import TarsInputStream
from .__tars import TarsOutputStream
from .__packet import RequestPacket
from .__packet import ResponsePacket
from .__util import (NewLock, LockGuard)
class ReqMessage:
'''
@brief: 请求响应报文保存一个请求响应所需要的数据
'''
SYNC_CALL = 1
ASYNC_CALL = 2
ONE_WAY = 3
def __init__(self):
self.type = ReqMessage.SYNC_CALL
self.servant = None
self.lock = None
self.adapter = None
self.request = None
self.response = None
self.callback = None
self.begtime = None
self.endtime = None
self.isHash = False
self.isConHash = False
self.hashCode = 0
def packReq(self):
'''
@brief: 序列化请求报文
@return: 序列化后的请求报文
@rtype: str
'''
if not self.request:
return ''
oos = TarsOutputStream()
RequestPacket.writeTo(oos, self.request)
reqpkt = oos.getBuffer()
plen = len(reqpkt) + 4
reqpkt = struct.pack('!i', plen) + reqpkt
return reqpkt
@staticmethod
def unpackRspList(buf):
'''
@brief: 解码响应报文
@param buf: 多个序列化后的响应报文数据
@type buf: str
@return: 解码出来的响应报文和解码的buffer长度
@rtype: rsplist: 装有ResponsePacket的list
unpacklen: int
'''
rsplist = []
if not buf:
return rsplist
unpacklen = 0
buf = buffer(buf)
while True:
if len(buf) - unpacklen < 4:
break
packsize = buf[unpacklen: unpacklen+4]
packsize, = struct.unpack_from('!i', packsize)
if len(buf) < unpacklen + packsize:
break
ios = TarsInputStream(buf[unpacklen+4: unpacklen+packsize])
rsp = ResponsePacket.readFrom(ios)
rsplist.append(rsp)
unpacklen += packsize
return rsplist, unpacklen
# 超时队列,加锁,线程安全
class TimeoutQueue:
'''
@brief: 超时队列加锁线程安全
可以像队列一样FIFO也可以像字典一样按key取item
@todo: 限制队列长度
'''
def __init__(self, timeout=3):
self.__uniqId = 0
# self.__lock = threading.Lock()
self.__lock = NewLock()
self.__data = {}
self.__queue = []
self.__timeout = timeout
def getTimeout(self):
'''
@brief: 获取超时时间单位为s
@return: 超时时间
@rtype: float
'''
return self.__timeout
def setTimeout(self, timeout):
'''
@brief: 设置超时时间单位为s
@param timeout: 超时时间
@type timeout: float
@return: None
@rtype: None
'''
self.__timeout = timeout
def size(self):
'''
@brief: 获取队列长度
@return: 队列长度
@rtype: int
'''
# self.__lock.acquire()
lock = LockGuard(self.__lock)
ret = len(self.__data)
# self.__lock.release()
return ret
def generateId(self):
'''
@brief: 生成唯一id0 < id < 2 ** 32
@return: id
@rtype: int
'''
# self.__lock.acquire()
lock = LockGuard(self.__lock)
ret = self.__uniqId
ret = (ret + 1) % 0x7FFFFFFF
while ret <= 0:
ret = (ret + 1) % 0x7FFFFFFF
self.__uniqId = ret
# self.__lock.release()
return ret
def pop(self, uniqId=0, erase=True):
'''
@brief: 弹出item
@param uniqId: item的id如果为0按FIFO弹出
@type uniqId: int
@param erase: 弹出后是否从字典里删除item
@type erase: bool
@return: item
@rtype: any type
'''
ret = None
# self.__lock.acquire()
lock = LockGuard(self.__lock)
if not uniqId:
if len(self.__queue):
uniqId = self.__queue.pop(0)
if uniqId:
if erase:
ret = self.__data.pop(uniqId, None)
else:
ret = self.__data.get(uniqId, None)
# self.__lock.release()
return ret[0] if ret else None
def push(self, item, uniqId):
'''
@brief: 数据入队列如果队列已经有了uniqId插入失败
@param item: 插入的数据
@type item: any type
@return: 插入是否成功
@rtype: bool
'''
begtime = time.time()
ret = True
# self.__lock.acquire()
lock = LockGuard(self.__lock)
if uniqId in self.__data:
ret = False
else:
self.__data[uniqId] = [item, begtime]
self.__queue.append(uniqId)
# self.__lock.release()
return ret
def peek(self, uniqId):
'''
@brief: 根据uniqId获取item不会删除item
@param uniqId: item的id
@type uniqId: int
@return: item
@rtype: any type
'''
# self.__lock.acquire()
lock = LockGuard(self.__lock)
ret = self.__data.get(uniqId, None)
# self.__lock.release()
if not ret:
return None
return ret[0]
def timeout(self):
'''
@brief: 检测是否有item超时如果有就删除
@return: None
@rtype: None
'''
endtime = time.time()
# self.__lock.acquire()
lock = LockGuard(self.__lock)
# 处理异常情况,防止死锁
try:
new_data = {}
for uniqId, item in self.__data.items():
if endtime - item[1] < self.__timeout:
new_data[uniqId] = item
else:
tarsLogger.debug(
'TimeoutQueue:timeout remove id : %d' % uniqId)
self.__data = new_data
finally:
# self.__lock.release()
pass
class QueueTimeout(threading.Thread):
"""
超时线程定时触发超时事件
"""
def __init__(self, timeout=0.1):
# threading.Thread.__init__(self)
tarsLogger.debug('QueueTimeout:__init__')
super(QueueTimeout, self).__init__()
self.timeout = timeout
self.__terminate = False
self.__handler = None
self.__lock = threading.Condition()
def terminate(self):
tarsLogger.debug('QueueTimeout:terminate')
self.__terminate = True
self.__lock.acquire()
self.__lock.notifyAll()
self.__lock.release()
def setHandler(self, handler):
self.__handler = handler
def run(self):
while not self.__terminate:
try:
self.__lock.acquire()
self.__lock.wait(self.timeout)
self.__lock.release()
if self.__terminate:
break
self.__handler()
except Exception as msg:
tarsLogger.error('QueueTimeout:run exception : %s', msg)
tarsLogger.debug('QueueTimeout:run finished')
if __name__ == '__main__':
pass

View File

@ -0,0 +1,703 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __adapterproxymanager.py_compiler
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: 将rpc部分中的adapterproxymanager抽离出来实现不同的负载均衡
'''
from enum import Enum
import random
import socket
import select
import os
import time
from .__util import (LockGuard, NewLock, ConsistentHashNew)
from .__trans import EndPointInfo
from .__logger import tarsLogger
from . import exception
from .__trans import TcpTransceiver
from .__TimeoutQueue import ReqMessage
from .exception import TarsException
# 因为循环import的问题只能放这里不能放文件开始处
from .QueryF import QueryFProxy
from .QueryF import QueryFPrxCallback
class AdapterProxy:
'''
@brief: 每一个Adapter管理一个服务端端口的连接数据收发
'''
def __init__(self):
tarsLogger.debug('AdapterProxy:__init__')
self.__closeTrans = False
self.__trans = None
self.__object = None
self.__reactor = None
self.__lock = None
self.__asyncProc = None
self.__activeStateInReg = True
@property
def activatestateinreg(self):
return self.__activeStateInReg
@activatestateinreg.setter
def activatestateinreg(self, value):
self.__activeStateInReg = value
def __del__(self):
tarsLogger.debug('AdapterProxy:__del__')
def initialize(self, endPointInfo, objectProxy, reactor, asyncProc):
'''
@brief: 初始化
@param endPointInfo: 连接对端信息
@type endPointInfo: EndPointInfo
@type objectProxy: ObjectProxy
@type reactor: FDReactor
@type asyncProc: AsyncProcThread
'''
tarsLogger.debug('AdapterProxy:initialize')
self.__closeTrans = False
self.__trans = TcpTransceiver(endPointInfo)
self.__object = objectProxy
self.__reactor = reactor
# self.__lock = threading.Lock()
self.__lock = NewLock()
self.__asyncProc = asyncProc
def terminate(self):
'''
@brief: 关闭
'''
tarsLogger.debug('AdapterProxy:terminate')
self.setCloseTrans(True)
def trans(self):
'''
@brief: 获取传输类
@return: 负责网络传输的trans
@rtype: Transceiver
'''
return self.__trans
def invoke(self, reqmsg):
'''
@brief: 远程过程调用处理方法
@param reqmsg: 请求响应报文
@type reqmsg: ReqMessage
@return: 错误码0表示成功-1表示连接失败
@rtype: int
'''
tarsLogger.debug('AdapterProxy:invoke')
assert(self.__trans)
if (not self.__trans.hasConnected() and
not self.__trans.isConnecting):
# -1表示连接失败
return -1
reqmsg.request.iRequestId = self.__object.getTimeoutQueue().generateId()
self.__object.getTimeoutQueue().push(reqmsg, reqmsg.request.iRequestId)
self.__reactor.notify(self)
return 0
def finished(self, rsp):
'''
@brief: 远程过程调用返回处理
@param rsp: 响应报文
@type rsp: ResponsePacket
@return: 函数是否执行成功
@rtype: bool
'''
tarsLogger.debug('AdapterProxy:finished')
reqmsg = self.__object.getTimeoutQueue().pop(rsp.iRequestId)
if not reqmsg:
tarsLogger.error(
'finished, can not get ReqMessage, may be timeout, id: %d',
rsp.iRequestId)
return False
reqmsg.response = rsp
if reqmsg.type == ReqMessage.SYNC_CALL:
return reqmsg.servant._finished(reqmsg)
elif reqmsg.callback:
self.__asyncProc.put(reqmsg)
return True
tarsLogger.error('finished, adapter proxy finish fail, id: %d, ret: %d',
rsp.iRequestId, rsp.iRet)
return False
# 检测连接是否失败,失效时重连
def checkActive(self, forceConnect=False):
'''
@brief: 检测连接是否失效
@param forceConnect: 是否强制发起连接为true时不对状态进行判断就发起连接
@type forceConnect: bool
@return: 连接是否有效
@rtype: bool
'''
tarsLogger.debug('AdapterProxy:checkActive')
# self.__lock.acquire()
lock = LockGuard(self.__lock)
tarsLogger.info('checkActive, %s, forceConnect: %s',
self.__trans.getEndPointInfo(), forceConnect)
if not self.__trans.isConnecting() and not self.__trans.hasConnected():
self.doReconnect()
# self.__lock.release()
return self.__trans.isConnecting() or self.__trans.hasConnected()
def doReconnect(self):
'''
@brief: 重新发起连接
@return: None
@rtype: None
'''
tarsLogger.debug('AdapterProxy:doReconnect')
assert(self.__trans)
self.__trans.reInit()
tarsLogger.info('doReconnect, connect: %s, fd:%d',
self.__trans.getEndPointInfo(),
self.__trans.getFd())
self.__reactor.registerAdapter(self, select.EPOLLIN | select.EPOLLOUT)
def sendRequest(self):
'''
@brief: 把队列中的请求放到Transceiver的发送缓存里
@return: 放入缓存的数据长度
@rtype: int
'''
tarsLogger.debug('AdapterProxy:sendRequest')
if not self.__trans.hasConnected():
return False
reqmsg = self.__object.popRequest()
blen = 0
while reqmsg:
reqmsg.adapter = self
buf = reqmsg.packReq()
self.__trans.writeToSendBuf(buf)
tarsLogger.info('sendRequest, id: %d, len: %d',
reqmsg.request.iRequestId, len(buf))
blen += len(buf)
# 合并一次发送的包 最大合并至8k 提高异步时客户端效率?
if (self.__trans.getEndPointInfo().getConnType() == EndPointInfo.SOCK_UDP
or blen > 8192):
break
reqmsg = self.__object.popRequest()
return blen
def finishConnect(self):
'''
@brief: 使用的非阻塞socket连接不能立刻判断是否连接成功
在epoll响应后调用此函数处理connect结束后的操作
@return: 是否连接成功
@rtype: bool
'''
tarsLogger.debug('AdapterProxy:finishConnect')
success = True
errmsg = ''
try:
ret = self.__trans.getSock().getsockopt(
socket.SOL_SOCKET, socket.SO_ERROR)
if ret:
success = False
errmsg = os.strerror(ret)
except Exception as msg:
errmsg = msg
success = False
if not success:
self.__reactor.unregisterAdapter(
self, socket.EPOLLIN | socket.EPOLLOUT)
self.__trans.close()
self.__trans.setConnFailed()
tarsLogger.error(
'AdapterProxy finishConnect, exception: %s, error: %s',
self.__trans.getEndPointInfo(), errmsg)
return False
self.__trans.setConnected()
self.__reactor.notify(self)
tarsLogger.info('AdapterProxy finishConnect, connect %s success',
self.__trans.getEndPointInfo())
return True
def finishInvoke(self, isTimeout):
pass
# 弹出请求报文
def popRequest(self):
pass
def shouldCloseTrans(self):
'''
@brief: 是否设置关闭连接
@return: 关闭连接的flag的值
@rtype: bool
'''
return self.__closeTrans
def setCloseTrans(self, closeTrans):
'''
@brief: 设置关闭连接flag的值
@param closeTrans: 是否关闭连接
@type closeTrans: bool
@return: None
@rtype: None
'''
self.__closeTrans = closeTrans
class QueryRegisterCallback(QueryFPrxCallback):
def __init__(self, adpManager):
self.__adpManager = adpManager
super(QueryRegisterCallback, self).__init__()
# QueryFPrxCallback.__init__(self)
def callback_findObjectById4All(self, ret, activeEp, inactiveEp):
eplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType)
for x in activeEp if ret == 0 and x.istcp]
ieplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType)
for x in inactiveEp if ret == 0 and x.istcp]
self.__adpManager.setEndpoints(eplist, ieplist)
def callback_findObjectById4All_exception(self, ret):
tarsLogger.error('callback_findObjectById4All_exception ret: %d', ret)
class EndpointWeightType(Enum):
E_LOOP = 0
E_STATIC_WEIGHT = 1
class AdapterProxyManager:
'''
@brief: 管理Adapter
'''
def __init__(self):
tarsLogger.debug('AdapterProxyManager:__init__')
self.__comm = None
self.__object = None
# __adps的key=str(EndPointInfo) value=[EndPointInfo, AdapterProxy, cnt]
# cnt是访问次数
self.__adps = {}
self.__iadps = {}
self.__newLock = None
self.__isDirectProxy = True
self.__lastFreshTime = 0
self.__queryRegisterCallback = QueryRegisterCallback(self)
self.__regAdapterProxyDict = {}
self.__lastConHashPrxList = []
self.__consistentHashWeight = None
self.__weightType = EndpointWeightType.E_LOOP
self.__update = True
self.__lastWeightedProxyData = {}
def initialize(self, comm, objectProxy, eplist):
'''
@brief: 初始化
'''
tarsLogger.debug('AdapterProxyManager:initialize')
self.__comm = comm
self.__object = objectProxy
self.__newLock = NewLock()
self.__isDirectProxy = len(eplist) > 0
if self.__isDirectProxy:
self.setEndpoints(eplist, {})
else:
self.refreshEndpoints()
def terminate(self):
'''
@brief: 释放资源
'''
tarsLogger.debug('AdapterProxyManager:terminate')
# self.__lock.acquire()
lock = LockGuard(self.__newLock)
for ep, epinfo in self.__adps.items():
epinfo[1].terminate()
self.__adps = {}
self.__lock.release()
def refreshEndpoints(self):
'''
@brief: 刷新服务器列表
@return: 新的服务列表
@rtype: EndPointInfo列表
'''
tarsLogger.debug('AdapterProxyManager:refreshEndpoints')
if self.__isDirectProxy:
return
interval = self.__comm.getProperty(
'refresh-endpoint-interval', float) / 1000
locator = self.__comm.getProperty('locator')
if '@' not in locator:
raise exception.TarsRegistryException(
'locator is not valid: ' + locator)
now = time.time()
last = self.__lastFreshTime
epSize = len(self.__adps)
if last + interval < now or (epSize <= 0 and last + 2 < now):
queryFPrx = self.__comm.stringToProxy(QueryFProxy, locator)
# 首次访问是同步调用,之后访问是异步调用
if epSize == 0 or last == 0:
ret, activeEps, inactiveEps = (
queryFPrx.findObjectById4All(self.__object.name()))
# 目前只支持TCP
eplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType)
for x in activeEps if ret == 0 and x.istcp]
ieplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType)
for x in inactiveEps if ret == 0 and x.istcp]
self.setEndpoints(eplist, ieplist)
else:
queryFPrx.async_findObjectById4All(self.__queryRegisterCallback,
self.__object.name())
self.__lastFreshTime = now
def getEndpoints(self):
'''
@brief: 获取可用服务列表 如果启用分组,只返回同分组的服务端ip
@return: 获取节点列表
@rtype: EndPointInfo列表
'''
tarsLogger.debug('AdapterProxyManager:getEndpoints')
# self.__lock.acquire()
lock = LockGuard(self.__newLock)
ret = [x[1][0] for x in list(self.__adps.items())]
# self.__lock.release()
return ret
def setEndpoints(self, eplist, ieplist):
'''
@brief: 设置服务端信息
@para eplist: 活跃的被调节点列表
@para ieplist: 不活跃的被调节点列表
'''
tarsLogger.debug('AdapterProxyManager:setEndpoints')
adps = {}
iadps = {}
comm = self.__comm
isNeedNotify = False
# self.__lock.acquire()
lock = LockGuard(self.__newLock)
isStartStatic = True
for ep in eplist:
if ep.getWeightType() == 0:
isStartStatic = False
epstr = str(ep)
if epstr in self.__adps:
adps[epstr] = self.__adps[epstr]
continue
isNeedNotify = True
self.__update = True
adapter = AdapterProxy()
adapter.initialize(ep, self.__object,
comm.getReactor(), comm.getAsyncProc())
adapter.activatestateinreg = True
adps[epstr] = [ep, adapter, 0]
self.__adps, adps = adps, self.__adps
for iep in ieplist:
iepstr = str(iep)
if iepstr in self.__iadps:
iadps[iepstr] = self.__iadps[iepstr]
continue
isNeedNotify = True
adapter = AdapterProxy()
adapter.initialize(iep, self.__object,
comm.getReactor(), comm.getAsyncProc())
adapter.activatestateinreg = False
iadps[iepstr] = [iep, adapter, 0]
self.__iadps, iadps = iadps, self.__iadps
if isStartStatic:
self.__weightType = EndpointWeightType.E_STATIC_WEIGHT
else:
self.__weightType = EndpointWeightType.E_LOOP
# self.__lock.release()
if isNeedNotify:
self.__notifyEndpoints(self.__adps, self.__iadps)
# 关闭已经失效的连接
for ep in adps:
if ep not in self.__adps:
adps[ep][1].terminate()
def __notifyEndpoints(self, actives, inactives):
# self.__lock.acquire()
lock = LockGuard(self.__newLock)
self.__regAdapterProxyDict.clear()
self.__regAdapterProxyDict.update(actives)
self.__regAdapterProxyDict.update(inactives)
# self.__lock.release()
def __getNextValidProxy(self):
'''
@brief: 刷新本地缓存列表如果服务下线了要求删除本地缓存
@return:
@rtype: EndPointInfo列表
@todo: 优化负载均衡算法
'''
tarsLogger.debug('AdapterProxyManager:getNextValidProxy')
lock = LockGuard(self.__newLock)
if len(self.__adps) == 0:
raise TarsException("the activate adapter proxy is empty")
sortedActivateAdp = sorted(
list(self.__adps.items()), key=lambda item: item[1][2])
# self.refreshEndpoints()
# self.__lock.acquire()
sortedActivateAdpSize = len(sortedActivateAdp)
while sortedActivateAdpSize != 0:
if sortedActivateAdp[0][1][1].checkActive():
self.__adps[sortedActivateAdp[0][0]][2] += 1
# 返回的是 adapterProxy
return self.__adps[sortedActivateAdp[0][0]][1]
sortedActivateAdp.pop(0)
sortedActivateAdpSize -= 1
# 随机重连一个可用节点
adpPrx = list(self.__adps.items())[random.randint(
0, len(self.__adps))][1][1]
adpPrx.checkActive()
return None
# self.__lock.release()
def __getHashProxy(self, reqmsg):
if self.__weightType == EndpointWeightType.E_LOOP:
if reqmsg.isConHash:
return self.__getConHashProxyForNormal(reqmsg.hashCode)
else:
return self.__getHashProxyForNormal(reqmsg.hashCode)
else:
if reqmsg.isConHash:
return self.__getConHashProxyForWeight(reqmsg.hashCode)
else:
return self.__getHashProxyForWeight(reqmsg.hashCode)
def __getHashProxyForNormal(self, hashCode):
tarsLogger.debug('AdapterProxyManager:getHashProxyForNormal')
# self.__lock.acquire()
lock = LockGuard(self.__newLock)
regAdapterProxyList = sorted(
list(self.__regAdapterProxyDict.items()), key=lambda item: item[0])
allPrxSize = len(regAdapterProxyList)
if allPrxSize == 0:
raise TarsException("the adapter proxy is empty")
hashNum = hashCode % allPrxSize
if regAdapterProxyList[hashNum][1][1].activatestateinreg and regAdapterProxyList[hashNum][1][1].checkActive():
epstr = regAdapterProxyList[hashNum][0]
self.__regAdapterProxyDict[epstr][2] += 1
if epstr in self.__adps:
self.__adps[epstr][2] += 1
elif epstr in self.__iadps:
self.__iadps[epstr][2] += 1
return self.__regAdapterProxyDict[epstr][1]
else:
if len(self.__adps) == 0:
raise TarsException("the activate adapter proxy is empty")
activeProxyList = list(self.__adps.items())
actPrxSize = len(activeProxyList)
while actPrxSize != 0:
hashNum = hashCode % actPrxSize
if activeProxyList[hashNum][1][1].checkActive():
self.__adps[activeProxyList[hashNum][0]][2] += 1
return self.__adps[activeProxyList[hashNum][0]][1]
activeProxyList.pop(hashNum)
actPrxSize -= 1
# 随机重连一个可用节点
adpPrx = list(self.__adps.items())[random.randint(
0, len(self.__adps))][1][1]
adpPrx.checkActive()
return None
def __getConHashProxyForNormal(self, hashCode):
tarsLogger.debug('AdapterProxyManager:getConHashProxyForNormal')
lock = LockGuard(self.__newLock)
if len(self.__regAdapterProxyDict) == 0:
raise TarsException("the adapter proxy is empty")
if self.__consistentHashWeight is None or self.__checkConHashChange(self.__lastConHashPrxList):
self.__updateConHashProxyWeighted()
if len(self.__consistentHashWeight.nodes) > 0:
conHashIndex = self.__consistentHashWeight.getNode(hashCode)
if conHashIndex in self.__regAdapterProxyDict and self.__regAdapterProxyDict[conHashIndex][1].activatestateinreg and self.__regAdapterProxyDict[conHashIndex][1].checkActive():
self.__regAdapterProxyDict[conHashIndex][2] += 1
if conHashIndex in self.__adps:
self.__adps[conHashIndex][2] += 1
elif conHashIndex in self.__iadps:
self.__iadps[conHashIndex][2] += 1
return self.__regAdapterProxyDict[conHashIndex][1]
else:
if len(self.__adps) == 0:
raise TarsException("the activate adapter proxy is empty")
activeProxyList = list(self.__adps.items())
actPrxSize = len(activeProxyList)
while actPrxSize != 0:
hashNum = hashCode % actPrxSize
if activeProxyList[hashNum][1][1].checkActive():
self.__adps[activeProxyList[hashNum][0]][2] += 1
return self.__adps[activeProxyList[hashNum][0]][1]
activeProxyList.pop(hashNum)
actPrxSize -= 1
# 随机重连一个可用节点
adpPrx = list(self.__adps.items())[random.randint(
0, len(self.__adps))][1][1]
adpPrx.checkActive()
return None
pass
else:
return self.__getHashProxyForNormal(hashCode)
def __getHashProxyForWeight(self, hashCode):
return None
pass
def __getConHashProxyForWeight(self, hashCode):
return None
pass
def __checkConHashChange(self, lastConHashPrxList):
tarsLogger.debug('AdapterProxyManager:checkConHashChange')
lock = LockGuard(self.__newLock)
if len(lastConHashPrxList) != len(self.__regAdapterProxyDict):
return True
regAdapterProxyList = sorted(
list(self.__regAdapterProxyDict.items()), key=lambda item: item[0])
regAdapterProxyListSize = len(regAdapterProxyList)
for index in range(regAdapterProxyListSize):
if cmp(lastConHashPrxList[index][0], regAdapterProxyList[index][0]) != 0:
return True
return False
def __updateConHashProxyWeighted(self):
tarsLogger.debug('AdapterProxyManager:updateConHashProxyWeighted')
lock = LockGuard(self.__newLock)
if len(self.__regAdapterProxyDict) == 0:
raise TarsException("the adapter proxy is empty")
self.__lastConHashPrxList = sorted(
list(self.__regAdapterProxyDict.items()), key=lambda item: item[0])
nodes = []
for var in self.__lastConHashPrxList:
nodes.append(var[0])
if self.__consistentHashWeight is None:
self.__consistentHashWeight = ConsistentHashNew(nodes)
else:
theOldActiveNodes = [
var for var in nodes if var in self.__consistentHashWeight.nodes]
theOldInactiveNodes = [
var for var in self.__consistentHashWeight.nodes if var not in theOldActiveNodes]
for var in theOldInactiveNodes:
self.__consistentHashWeight.removeNode(var)
theNewActiveNodes = [
var for var in nodes if var not in theOldActiveNodes]
for var in theNewActiveNodes:
self.__consistentHashWeight.addNode(var)
self.__consistentHashWeight.nodes = nodes
pass
def __getWeightedProxy(self):
tarsLogger.debug('AdapterProxyManager:getWeightedProxy')
lock = LockGuard(self.__newLock)
if len(self.__adps) == 0:
raise TarsException("the activate adapter proxy is empty")
if self.__update is True:
self.__lastWeightedProxyData.clear()
weightedProxyData = {}
minWeight = (list(self.__adps.items())[0][1][0]).getWeight()
for item in list(self.__adps.items()):
weight = (item[1][0].getWeight())
weightedProxyData[item[0]] = (weight)
if minWeight > weight:
minWeight = weight
if minWeight <= 0:
addWeight = -minWeight + 1
for item in list(weightedProxyData.items()):
item[1] += addWeight
self.__update = False
self.__lastWeightedProxyData = weightedProxyData
weightedProxyData = self.__lastWeightedProxyData
while len(weightedProxyData) > 0:
total = sum(weightedProxyData.values())
rand = random.randint(1, total)
temp = 0
for item in list(weightedProxyData.items()):
temp += item[1]
if rand <= temp:
if self.__adps[item[0]][1].checkActive():
self.__adps[item[0]][2] += 1
return self.__adps[item[0]][1]
else:
weightedProxyData.pop(item[0])
break
# 没有一个活跃的节点
# 随机重连一个可用节点
adpPrx = list(self.__adps.items())[random.randint(
0, len(self.__adps))][1][1]
adpPrx.checkActive()
return None
def selectAdapterProxy(self, reqmsg):
'''
@brief: 刷新本地缓存列表如果服务下线了要求删除本地缓存通过一定算法返回AdapterProxy
@param: reqmsg:请求响应报文
@type reqmsg: ReqMessage
@return:
@rtype: EndPointInfo列表
@todo: 优化负载均衡算法
'''
tarsLogger.debug('AdapterProxyManager:selectAdapterProxy')
self.refreshEndpoints()
if reqmsg.isHash:
return self.__getHashProxy(reqmsg)
else:
if self.__weightType == EndpointWeightType.E_STATIC_WEIGHT:
return self.__getWeightedProxy()
else:
return self.__getNextValidProxy()

View File

@ -0,0 +1,201 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __rpc.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: 异步rpc实现
'''
import threading
import Queue
from __logger import tarsLogger
from __packet import ResponsePacket
from __servantproxy import ServantProxy
class AsyncProcThread:
'''
@brief: 异步调用线程管理类
'''
def __init__(self):
tarsLogger.debug('AsyncProcThread:__init__')
self.__initialize = False
self.__runners = []
self.__queue = None
self.__nrunner = 0
self.__popTimeout = 0.1
def __del__(self):
tarsLogger.debug('AsyncProcThread:__del__')
def initialize(self, nrunner=3):
'''
@brief: 使用AsyncProcThread前必须先调用此函数
@param nrunner: 异步线程个数
@type nrunner: int
@return: None
@rtype: None
'''
tarsLogger.debug('AsyncProcThread:initialize')
if self.__initialize:
return
self.__nrunner = nrunner
self.__queue = Queue.Queue()
self.__initialize = True
def terminate(self):
'''
@brief: 关闭所有异步线程
@return: None
@rtype: None
'''
tarsLogger.debug('AsyncProcThread:terminate')
for runner in self.__runners:
runner.terminate()
for runner in self.__runners:
runner.join()
self.__runners = []
def put(self, reqmsg):
'''
@brief: 处理数据入队列
@param reqmsg: 待处理数据
@type reqmsg: ReqMessage
@return: None
@rtype: None
'''
tarsLogger.debug('AsyncProcThread:put')
# 异步请求超时
if not reqmsg.response:
reqmsg.response = ResponsePacket()
reqmsg.response.iVerson = reqmsg.request.iVerson
reqmsg.response.cPacketType = reqmsg.request.cPacketType
reqmsg.response.iRequestId = reqmsg.request.iRequestId
reqmsg.response.iRet = ServantProxy.TARSASYNCCALLTIMEOUT
self.__queue.put(reqmsg)
def pop(self):
'''
@brief: 处理数据出队列
@return: ReqMessage
@rtype: ReqMessage
'''
# tarsLogger.debug('AsyncProcThread:pop')
ret = None
try:
ret = self.__queue.get(True, self.__popTimeout)
except Queue.Empty:
pass
return ret
def start(self):
'''
@brief: 启动异步线程
@return: None
@rtype: None
'''
tarsLogger.debug('AsyncProcThread:start')
for i in xrange(self.__nrunner):
runner = AsyncProcThreadRunner()
runner.initialize(self)
runner.start()
self.__runners.append(runner)
class AsyncProcThreadRunner(threading.Thread):
'''
@brief: 异步调用线程
'''
def __init__(self):
tarsLogger.debug('AsyncProcThreadRunner:__init__')
super(AsyncProcThreadRunner, self).__init__()
# threading.Thread.__init__(self)
self.__terminate = False
self.__initialize = False
self.__procQueue = None
def __del__(self):
tarsLogger.debug('AsyncProcThreadRunner:__del__')
def initialize(self, queue):
'''
@brief: 使用AsyncProcThreadRunner前必须调用此函数
@param queue: 有pop()的类用于提取待处理数据
@type queue: AsyncProcThread
@return: None
@rtype: None
'''
tarsLogger.debug('AsyncProcThreadRunner:initialize')
self.__procQueue = queue
def terminate(self):
'''
@brief: 关闭线程
@return: None
@rtype: None
'''
tarsLogger.debug('AsyncProcThreadRunner:terminate')
self.__terminate = True
def run(self):
'''
@brief: 线程启动函数执行异步调用
'''
tarsLogger.debug('AsyncProcThreadRunner:run')
while not self.__terminate:
if self.__terminate:
break
reqmsg = self.__procQueue.pop()
if not reqmsg or not reqmsg.callback:
continue
if reqmsg.adapter:
succ = reqmsg.response.iRet == ServantProxy.TARSSERVERSUCCESS
reqmsg.adapter.finishInvoke(succ)
try:
reqmsg.callback.onDispatch(reqmsg)
except Exception, msg:
tarsLogger.error('AsyncProcThread excepttion: %s', msg)
tarsLogger.debug('AsyncProcThreadRunner:run finished')
class ServantProxyCallback(object):
'''
@brief: 异步回调对象基类
'''
def __init__(self):
tarsLogger.debug('ServantProxyCallback:__init__')
def onDispatch(reqmsg):
'''
@brief: 分配响应报文到对应的回调函数
@param queue: 有pop()的类用于提取待处理数据
@type queue: AsyncProcThread
@return: None
@rtype: None
'''
raise NotImplementedError()

View File

@ -0,0 +1,85 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
__version__ = "0.0.1"
from .__util import util
from .__tars import TarsInputStream
from .__tars import TarsOutputStream
from .__tup import TarsUniPacket
class tarscore:
class TarsInputStream(TarsInputStream):
pass
class TarsOutputStream(TarsOutputStream):
pass
class TarsUniPacket(TarsUniPacket):
pass
class boolean(util.boolean):
pass
class int8(util.int8):
pass
class uint8(util.uint8):
pass
class int16(util.int16):
pass
class uint16(util.uint16):
pass
class int32(util.int32):
pass
class uint32(util.uint32):
pass
class int64(util.int64):
pass
class float(util.float):
pass
class double(util.double):
pass
class bytes(util.bytes):
pass
class string(util.string):
pass
class struct(util.struct):
pass
@staticmethod
def mapclass(ktype, vtype): return util.mapclass(ktype, vtype)
@staticmethod
def vctclass(vtype): return util.vectorclass(vtype)
@staticmethod
def printHex(buff): util.printHex(buff)

View File

@ -0,0 +1,103 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __logger.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: 日志模块
'''
# 仅用于调试
import logging
from logging.handlers import RotatingFileHandler
import os
import re
tarsLogger = logging.getLogger('TARS client')
strToLoggingLevel = {
"critical": logging.CRITICAL,
"error": logging.ERROR,
"warn": logging.WARNING,
"info": logging.INFO,
"debug": logging.DEBUG,
"none": logging.NOTSET
}
#console = logging.StreamHandler()
# console.setLevel(logging.DEBUG)
#filelog = logging.FileHandler('tars.log')
# filelog.setLevel(logging.DEBUG)
#formatter = logging.Formatter('%(asctime)s | %(levelname)8s | [%(name)s] %(message)s', '%Y-%m-%d %H:%M:%S')
# console.setFormatter(formatter)
# filelog.setFormatter(formatter)
# tarsLogger.addHandler(console)
# tarsLogger.addHandler(filelog)
# tarsLogger.setLevel(logging.DEBUG)
# tarsLogger.setLevel(logging.INFO)
# tarsLogger.setLevel(logging.ERROR)
def createLogFile(filename):
if filename.endswith('/'):
raise ValueError("The logfile is a dir not a file")
if os.path.exists(filename) and os.path.isfile(filename):
pass
else:
fileComposition = str.split(filename, '/')
print(fileComposition)
currentFile = ''
for item in fileComposition:
if item == fileComposition[-1]:
currentFile += item
if not os.path.exists(currentFile) or not os.path.isfile(currentFile):
while True:
try:
os.mknod(currentFile)
break
except OSError as msg:
errno = re.findall(r"\d+", str(msg))
if len(errno) > 0 and errno[0] == '17':
currentFile += '.log'
continue
break
currentFile += (item + '/')
if not os.path.exists(currentFile):
os.mkdir(currentFile)
def initLog(logpath, logsize, lognum, loglevel):
createLogFile(logpath)
handler = RotatingFileHandler(filename=logpath, maxBytes=logsize,
backupCount=lognum)
formatter = logging.Formatter(
'%(asctime)s | %(levelname)6s | [%(filename)18s:%(lineno)4d] | [%(thread)d] %(message)s', '%Y-%m-%d %H:%M:%S')
handler.setFormatter(formatter)
tarsLogger.addHandler(handler)
if loglevel in strToLoggingLevel:
tarsLogger.setLevel(strToLoggingLevel[loglevel])
else:
tarsLogger.setLevel(strToLoggingLevel["error"])
if __name__ == '__main__':
tarsLogger.debug('debug log')
tarsLogger.info('info log')
tarsLogger.warning('warning log')
tarsLogger.error('error log')
tarsLogger.critical('critical log')

View File

@ -0,0 +1,104 @@
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
from .__util import util
class RequestPacket(util.struct):
mapcls_context = util.mapclass(util.string, util.string)
mapcls_status = util.mapclass(util.string, util.string)
def __init__(self):
self.iVersion = 0
self.cPacketType = 0
self.iMessageType = 0
self.iRequestId = 0
self.sServantName = ''
self.sFuncName = ''
self.sBuffer = bytes()
self.iTimeout = 0
self.context = RequestPacket.mapcls_context()
self.status = RequestPacket.mapcls_status()
@staticmethod
def writeTo(oos, value):
oos.write(util.int16, 1, value.iVersion)
oos.write(util.int8, 2, value.cPacketType)
oos.write(util.int32, 3, value.iMessageType)
oos.write(util.int32, 4, value.iRequestId)
oos.write(util.string, 5, value.sServantName)
oos.write(util.string, 6, value.sFuncName)
oos.write(util.bytes, 7, value.sBuffer)
oos.write(util.int32, 8, value.iTimeout)
oos.write(RequestPacket.mapcls_context, 9, value.context)
oos.write(RequestPacket.mapcls_status, 10, value.status)
@staticmethod
def readFrom(ios):
value = RequestPacket()
value.iVersion = ios.read(util.int16, 1, True, 0)
print(("iVersion = %d" % value.iVersion))
value.cPacketType = ios.read(util.int8, 2, True, 0)
print(("cPackerType = %d" % value.cPacketType))
value.iMessageType = ios.read(util.int32, 3, True, 0)
print(("iMessageType = %d" % value.iMessageType))
value.iRequestId = ios.read(util.int32, 4, True, 0)
print(("iRequestId = %d" % value.iRequestId))
value.sServantName = ios.read(util.string, 5, True, '22222222')
value.sFuncName = ios.read(util.string, 6, True, '')
value.sBuffer = ios.read(util.bytes, 7, True, value.sBuffer)
value.iTimeout = ios.read(util.int32, 8, True, 0)
value.context = ios.read(
RequestPacket.mapcls_context, 9, True, value.context)
value.status = ios.read(
RequestPacket.mapcls_status, 10, True, value.status)
return value
class ResponsePacket(util.struct):
__tars_class__ = "tars.RpcMessage.ResponsePacket"
mapcls_status = util.mapclass(util.string, util.string)
def __init__(self):
self.iVersion = 0
self.cPacketType = 0
self.iRequestId = 0
self.iMessageType = 0
self.iRet = 0
self.sBuffer = bytes()
self.status = RequestPacket.mapcls_status()
@staticmethod
def writeTo(oos, value):
oos.write(util.int16, 1, value.iVersion)
oos.write(util.int8, 2, value.cPacketType)
oos.write(util.int32, 3, value.iRequestId)
oos.write(util.int32, 4, value.iMessageType)
oos.write(util.int32, 5, value.iRet)
oos.write(util.bytes, 6, value.sBuffer)
oos.write(value.mapcls_status, 7, value.status)
@staticmethod
def readFrom(ios):
value = ResponsePacket()
value.iVersion = ios.read(util.int16, 1, True)
value.cPacketType = ios.read(util.int8, 2, True)
value.iRequestId = ios.read(util.int32, 3, True)
value.iMessageType = ios.read(util.int32, 4, True)
value.iRet = ios.read(util.int32, 5, True)
value.sBuffer = ios.read(util.bytes, 6, True)
value.status = ios.read(value.mapcls_status, 7, True)
return value

441
danmu/danmaku/tars/__rpc.py Normal file
View File

@ -0,0 +1,441 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __rpc.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: rpc调用逻辑实现
'''
import time
import argparse
from .__logger import tarsLogger
from .__logger import initLog
from .__trans import EndPointInfo
from .__TimeoutQueue import TimeoutQueue
from .__TimeoutQueue import QueueTimeout
from .__trans import FDReactor
from .__adapterproxy import AdapterProxyManager
from .__servantproxy import ServantProxy
from .exception import (TarsException)
from .__async import AsyncProcThread
class Communicator:
'''
@brief: 通讯器创建和维护ServantProxyObjectProxyFDReactor线程和超时线程
'''
default_config = {'tars':
{'application':
{'client':
{'async-invoke-timeout': 20000,
'asyncthread': 0,
'locator': '',
'loglevel': 'error',
'logpath': 'tars.log',
'logsize': 15728640,
'lognum': 0,
'refresh-endpoint-interval': 60000,
'sync-invoke-timeout': 5000}}}}
def __init__(self, config={}):
tarsLogger.debug('Communicator:__init__')
self.__terminate = False
self.__initialize = False
self.__objects = {}
self.__servants = {}
self.__reactor = None
self.__qTimeout = None
self.__asyncProc = None
self.__config = Communicator.default_config.copy()
self.__config.update(config)
self.initialize()
def __del__(self):
tarsLogger.debug('Communicator:__del__')
def initialize(self):
'''
@brief: 使用通讯器前必须先调用此函数
'''
tarsLogger.debug('Communicator:initialize')
if self.__initialize:
return
logpath = self.getProperty('logpath')
logsize = self.getProperty('logsize', int)
lognum = self.getProperty('lognum', int)
loglevel = self.getProperty('loglevel')
initLog(logpath, logsize, lognum, loglevel)
self.__reactor = FDReactor()
self.__reactor.initialize()
self.__reactor.start()
self.__qTimeout = QueueTimeout()
self.__qTimeout.setHandler(self.handleTimeout)
self.__qTimeout.start()
async_num = self.getProperty('asyncthread', int)
self.__asyncProc = AsyncProcThread()
self.__asyncProc.initialize(async_num)
self.__asyncProc.start()
self.__initialize = True
def terminate(self):
'''
@brief: 不再使用通讯器需调用此函数释放资源
'''
tarsLogger.debug('Communicator:terminate')
if not self.__initialize:
return
self.__reactor.terminate()
self.__qTimeout.terminate()
self.__asyncProc.terminate()
for objName in self.__servants:
self.__servants[objName]._terminate()
for objName in self.__objects:
self.__objects[objName].terminate()
self.__objects = {}
self.__servants = {}
self.__reactor = None
self.__initialize = False
def parseConnAddr(self, connAddr):
'''
@brief: 解析connAddr字符串
@param connAddr: 连接地址
@type connAddr: str
@return: 解析结果
@rtype: dict, key是strval里name是str
timeout是floatendpoint是EndPointInfo的list
'''
tarsLogger.debug('Communicator:parseConnAddr')
connAddr = connAddr.strip()
connInfo = {
'name': '',
'timeout': -1,
'endpoint': []
}
if '@' not in connAddr:
connInfo['name'] = connAddr
return connInfo
try:
tks = connAddr.split('@')
connInfo['name'] = tks[0]
tks = tks[1].lower().split(':')
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('-h')
parser.add_argument('-p')
parser.add_argument('-t')
for tk in tks:
argv = tk.split()
if argv[0] != 'tcp':
raise TarsException(
'unsupport transmission protocal : %s' % connInfo['name'])
mes = parser.parse_args(argv[1:])
try:
ip = mes.h if mes.h is not None else ''
port = int(mes.p) if mes.p is not None else '-1'
timeout = int(mes.t) if mes.t is not None else '-1'
connInfo['endpoint'].append(
EndPointInfo(ip, port, timeout))
except Exception:
raise TarsException('Unrecognized option : %s' % mes)
except TarsException:
raise
except Exception as exp:
raise TarsException(exp)
return connInfo
def getReactor(self):
'''
@brief: 获取reactor
'''
return self.__reactor
def getAsyncProc(self):
'''
@brief: 获取asyncProc
'''
return self.__asyncProc
def getProperty(self, name, dt_type=str):
'''
@brief: 获取配置
@param name: 配置名称
@type name: str
@param dt_type: 数据类型
@type name: type
@return: 配置内容
@rtype: str
'''
try:
ret = self.__config['tars']['application']['client'][name]
ret = dt_type(ret)
except:
ret = Communicator.default_config['tars']['application']['client'][name]
return ret
def setProperty(self, name, value):
'''
@brief: 修改配置
@param name: 配置名称
@type propertys: str
@param value: 配置内容
@type propertys: str
@return: 设置是否成功
@rtype: bool
'''
try:
self.__config['tars']['application']['client'][name] = value
return True
except:
return False
def setPropertys(self, propertys):
'''
@brief: 修改配置
@param propertys: 配置集合
@type propertys: map, key type: str, value type: str
@return:
@rtype: None
'''
pass
def updateConfig(self):
'''
@brief: 重新设置配置
'''
def stringToProxy(self, servantProxy, connAddr):
'''
@brief: 初始化ServantProxy
@param connAddr: 服务器地址信息
@type connAddr: str
@param servant: servant proxy
@type servant: ServantProxy子类
@return:
@rtype: None
@note: 如果connAddr的ServantObj连接过返回连接过的ServantProxy
如果没有连接过用参数servant初始化返回servant
'''
tarsLogger.debug('Communicator:stringToProxy')
connInfo = self.parseConnAddr(connAddr)
objName = connInfo['name']
if objName in self.__servants:
return self.__servants[objName]
objectPrx = ObjectProxy()
objectPrx.initialize(self, connInfo)
servantPrx = servantProxy()
servantPrx._initialize(self.__reactor, objectPrx)
self.__objects[objName] = objectPrx
self.__servants[objName] = servantPrx
return servantPrx
def handleTimeout(self):
'''
@brief: 处理超时事件
@return:
@rtype: None
'''
# tarsLogger.debug('Communicator:handleTimeout')
for obj in self.__objects.values():
obj.handleQueueTimeout()
class ObjectProxy:
'''
@brief: 一个object name在一个Communicator里有一个objectproxy
管理收发的消息队列
'''
DEFAULT_TIMEOUT = 3.0
def __init__(self):
tarsLogger.debug('ObjectProxy:__init__')
self.__name = ''
self.__timeout = ObjectProxy.DEFAULT_TIMEOUT
self.__comm = None
self.__epi = None
self.__adpmanager = None
self.__timeoutQueue = None
# self.__adapter = None
self.__initialize = False
def __del__(self):
tarsLogger.debug('ObjectProxy:__del__')
def initialize(self, comm, connInfo):
'''
@brief: 初始化使用ObjectProxy前必须调用
@param comm: 通讯器
@type comm: Communicator
@param connInfo: 连接信息
@type comm: dict
@return: None
@rtype: None
'''
if self.__initialize:
return
tarsLogger.debug('ObjectProxy:initialize')
self.__comm = comm
# async-invoke-timeout来设置队列时间
async_timeout = self.__comm.getProperty(
'async-invoke-timeout', float) / 1000
self.__timeoutQueue = TimeoutQueue(async_timeout)
self.__name = connInfo['name']
self.__timeout = self.__comm.getProperty(
'sync-invoke-timeout', float) / 1000
# 通过Communicator的配置设置超时
# 不再通过连接信息的-t来设置
# if connInfo['timeout'] != -1:
# self.__timeout = connInfo['timeout']
eplist = connInfo['endpoint']
self.__adpmanager = AdapterProxyManager()
self.__adpmanager.initialize(comm, self, eplist)
self.__initialize = True
def terminate(self):
'''
@brief: 回收资源不再使用ObjectProxy时调用
@return: None
@rtype: None
'''
tarsLogger.debug('ObjectProxy:terminate')
self.__timeoutQueue = None
self.__adpmanager.terminate()
self.__initialize = False
def name(self):
'''
@brief: 获取object name
@return: object name
@rtype: str
'''
return self.__name
# def setTimeout(self, timeout):
# '''
# @brief: 设置超时
# @param timeout: 超时时间单位为s
# @type timeout: float
# @return: None
# @rtype: None
# '''
# self.__timeout = timeout
# self.__timeoutQueue.setTimeout(timeout)
def timeout(self):
'''
@brief: 获取超时时间
@return: 超时时间单位为s
@rtype: float
'''
return self.__timeout
def getTimeoutQueue(self):
'''
@brief: 获取超时队列
@return: 超时队列
@rtype: TimeoutQueue
'''
return self.__timeoutQueue
def handleQueueTimeout(self):
'''
@brief: 超时事件发生时处理超时事务
@return: None
@rtype: None
'''
# tarsLogger.debug('ObjectProxy:handleQueueTimeout')
self.__timeoutQueue.timeout()
def invoke(self, reqmsg):
'''
@brief: 远程过程调用
@param reqmsg: 请求响应报文
@type reqmsg: ReqMessage
@return: 错误码
@rtype:
'''
tarsLogger.debug('ObjectProxy:invoke, objname: %s, func: %s',
self.__name, reqmsg.request.sFuncName)
# 负载均衡
# adapter = self.__adpmanager.getNextValidProxy()
adapter = self.__adpmanager.selectAdapterProxy(reqmsg)
if not adapter:
tarsLogger.error("invoke %s, select adapter proxy return None",
self.__name)
return -2
adapter.checkActive(True)
reqmsg.adapter = adapter
return adapter.invoke(reqmsg)
# 弹出请求报文
def popRequest(self):
'''
@brief: 返回消息队列里的请求响应报文FIFO
不删除TimeoutQueue里的数据响应时要用
@return: 请求响应报文
@rtype: ReqMessage
'''
return self.__timeoutQueue.pop(erase=False)
if __name__ == '__main__':
connAddr = "apptest.lightServer.lightServantObj@tcp -h 10.130.64.220 -p 10001 -t 10000"
connAddr = 'MTT.BookMarksUnifyServer.BookMarksUnifyObj@tcp -h 172.17.149.77 -t 60000 -p 10023'
comm = Communicator()
comm.initialize()
servant = ServantProxy()
servant = comm.stringToProxy(connAddr, servant)
print(servant.tars_timeout())
try:
rsp = servant.tars_invoke(
ServantProxy.TARSNORMAL, "test", '', ServantProxy.mapcls_context(), None)
print('Servant invoke success, request id: %d, iRet: %d' % (
rsp.iRequestId, rsp.iRet))
except Exception as msg:
print(msg)
finally:
servant.tars_terminate()
time.sleep(2)
print('app closing ...')
comm.terminate()
time.sleep(2)
print('cpp closed')

View File

@ -0,0 +1,358 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __servantproxy.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: rpc抽离出servantproxy
'''
import threading
import time
from __logger import tarsLogger
from __util import util
from __packet import RequestPacket
# from __packet import ResponsePacket
from __TimeoutQueue import ReqMessage
import exception
from exception import TarsException
class ServantProxy(object):
'''
@brief: 1远程对象的本地代理
2同名servant在一个通信器中最多只有一个实例
3防止和用户在Tars中定义的函数名冲突接口以tars_开头
'''
# 服务器响应的错误码
TARSSERVERSUCCESS = 0 # 服务器端处理成功
TARSSERVERDECODEERR = -1 # 服务器端解码异常
TARSSERVERENCODEERR = -2 # 服务器端编码异常
TARSSERVERNOFUNCERR = -3 # 服务器端没有该函数
TARSSERVERNOSERVANTERR = -4 # 服务器端五该Servant对象
TARSSERVERRESETGRID = -5 # 服务器端灰度状态不一致
TARSSERVERQUEUETIMEOUT = -6 # 服务器队列超过限制
TARSASYNCCALLTIMEOUT = -7 # 异步调用超时
TARSPROXYCONNECTERR = -8 # proxy链接异常
TARSSERVERUNKNOWNERR = -99 # 服务器端未知异常
TARSVERSION = 1
TUPVERSION = 2
TUPVERSION2 = 3
TARSNORMAL = 0
TARSONEWAY = 1
TARSMESSAGETYPENULL = 0
TARSMESSAGETYPEHASH = 1
TARSMESSAGETYPEGRID = 2
TARSMESSAGETYPEDYED = 4
TARSMESSAGETYPESAMPLE = 8
TARSMESSAGETYPEASYNC = 16
mapcls_context = util.mapclass(util.string, util.string)
def __init__(self):
tarsLogger.debug('ServantProxy:__init__')
self.__reactor = None
self.__object = None
self.__initialize = False
def __del__(self):
tarsLogger.debug('ServantProxy:__del__')
def _initialize(self, reactor, obj):
'''
@brief: 初始化函数需要调用才能使用ServantProxy
@param reactor: 网络管理的reactor实例
@type reactor: FDReactor
@return: None
@rtype: None
'''
tarsLogger.debug('ServantProxy:_initialize')
assert(reactor and obj)
if self.__initialize:
return
self.__reactor = reactor
self.__object = obj
self.__initialize = True
def _terminate(self):
'''
@brief: 不再使用ServantProxy时调用会释放相应资源
@return: None
@rtype: None
'''
tarsLogger.debug('ServantProxy:_terminate')
self.__object = None
self.__reactor = None
self.__initialize = False
def tars_name(self):
'''
@brief: 获取ServantProxy的名字
@return: ServantProxy的名字
@rtype: str
'''
return self.__object.name()
def tars_timeout(self):
'''
@brief: 获取超时时间单位是ms
@return: 超时时间
@rtype: int
'''
# 默认的为3S = ObjectProxy.DEFAULT_TIMEOUT
return int(self.__timeout() * 1000)
def tars_ping(self):
pass
# def tars_initialize(self):
# pass
# def tars_terminate(self):
# pass
def tars_invoke(self, cPacketType, sFuncName, sBuffer, context, status):
'''
@brief: TARS协议同步方法调用
@param cPacketType: 请求包类型
@type cPacketType: int
@param sFuncName: 调用函数名
@type sFuncName: str
@param sBuffer: 序列化后的发送参数
@type sBuffer: str
@param context: 上下文件信息
@type context: ServantProxy.mapcls_context
@param status: 状态信息
@type status:
@return: 响应报文
@rtype: ResponsePacket
'''
tarsLogger.debug('ServantProxy:tars_invoke, func: %s', sFuncName)
req = RequestPacket()
req.iVersion = ServantProxy.TARSVERSION
req.cPacketType = cPacketType
req.iMessageType = ServantProxy.TARSMESSAGETYPENULL
req.iRequestId = 0
req.sServantName = self.tars_name()
req.sFuncName = sFuncName
req.sBuffer = sBuffer
req.iTimeout = self.tars_timeout()
reqmsg = ReqMessage()
reqmsg.type = ReqMessage.SYNC_CALL
reqmsg.servant = self
reqmsg.lock = threading.Condition()
reqmsg.request = req
reqmsg.begtime = time.time()
# # test
reqmsg.isHash = True
reqmsg.isConHash = True
reqmsg.hashCode = 123456
rsp = None
try:
rsp = self.__invoke(reqmsg)
except exception.TarsSyncCallTimeoutException:
if reqmsg.adapter:
reqmsg.adapter.finishInvoke(True)
raise
except TarsException:
raise
except:
raise TarsException('ServantProxy::tars_invoke excpetion')
if reqmsg.adapter:
reqmsg.adapter.finishInvoke(False)
return rsp
def tars_invoke_async(self, cPacketType, sFuncName, sBuffer,
context, status, callback):
'''
@brief: TARS协议同步方法调用
@param cPacketType: 请求包类型
@type cPacketType: int
@param sFuncName: 调用函数名
@type sFuncName: str
@param sBuffer: 序列化后的发送参数
@type sBuffer: str
@param context: 上下文件信息
@type context: ServantProxy.mapcls_context
@param status: 状态信息
@type status:
@param callback: 异步调用回调对象
@type callback: ServantProxyCallback的子类
@return: 响应报文
@rtype: ResponsePacket
'''
tarsLogger.debug('ServantProxy:tars_invoke')
req = RequestPacket()
req.iVersion = ServantProxy.TARSVERSION
req.cPacketType = cPacketType if callback else ServantProxy.TARSONEWAY
req.iMessageType = ServantProxy.TARSMESSAGETYPENULL
req.iRequestId = 0
req.sServantName = self.tars_name()
req.sFuncName = sFuncName
req.sBuffer = sBuffer
req.iTimeout = self.tars_timeout()
reqmsg = ReqMessage()
reqmsg.type = ReqMessage.ASYNC_CALL if callback else ReqMessage.ONE_WAY
reqmsg.callback = callback
reqmsg.servant = self
reqmsg.request = req
reqmsg.begtime = time.time()
rsp = None
try:
rsp = self.__invoke(reqmsg)
except TarsException:
raise
except Exception:
raise TarsException('ServantProxy::tars_invoke excpetion')
if reqmsg.adapter:
reqmsg.adapter.finishInvoke(False)
return rsp
def __timeout(self):
'''
@brief: 获取超时时间单位是s
@return: 超时时间
@rtype: float
'''
return self.__object.timeout()
def __invoke(self, reqmsg):
'''
@brief: 远程过程调用
@param reqmsg: 请求数据
@type reqmsg: ReqMessage
@return: 调用成功或失败
@rtype: bool
'''
tarsLogger.debug('ServantProxy:invoke, func: %s',
reqmsg.request.sFuncName)
ret = self.__object.invoke(reqmsg)
if ret == -2:
errmsg = ('ServantProxy::invoke fail, no valid servant,' +
' servant name : %s, function name : %s' %
(reqmsg.request.sServantName,
reqmsg.request.sFuncName))
raise TarsException(errmsg)
if ret == -1:
errmsg = ('ServantProxy::invoke connect fail,' +
' servant name : %s, function name : %s, adapter : %s' %
(reqmsg.request.sServantName,
reqmsg.request.sFuncName,
reqmsg.adapter.getEndPointInfo()))
raise TarsException(errmsg)
elif ret != 0:
errmsg = ('ServantProxy::invoke unknown fail, ' +
'Servant name : %s, function name : %s' %
(reqmsg.request.sServantName,
reqmsg.request.sFuncName))
raise TarsException(errmsg)
if reqmsg.type == ReqMessage.SYNC_CALL:
reqmsg.lock.acquire()
reqmsg.lock.wait(self.__timeout())
reqmsg.lock.release()
if not reqmsg.response:
errmsg = ('ServantProxy::invoke timeout: %d, servant name'
': %s, adapter: %s, request id: %d' % (
self.tars_timeout(),
self.tars_name(),
reqmsg.adapter.trans().getEndPointInfo(),
reqmsg.request.iRequestId))
raise exception.TarsSyncCallTimeoutException(errmsg)
elif reqmsg.response.iRet == ServantProxy.TARSSERVERSUCCESS:
return reqmsg.response
else:
errmsg = 'servant name: %s, function name: %s' % (
self.tars_name(), reqmsg.request.sFuncName)
self.tarsRaiseException(reqmsg.response.iRet, errmsg)
def _finished(self, reqmsg):
'''
@brief: 通知远程过程调用线程响应报文到了
@param reqmsg: 请求响应报文
@type reqmsg: ReqMessage
@return: 函数执行成功或失败
@rtype: bool
'''
tarsLogger.debug('ServantProxy:finished')
if not reqmsg.lock:
return False
reqmsg.lock.acquire()
reqmsg.lock.notifyAll()
reqmsg.lock.release()
return True
def tarsRaiseException(self, errno, desc):
'''
@brief: 服务器调用失败根据服务端给的错误码抛出异常
@param errno: 错误码
@type errno: int
@param desc: 错误描述
@type desc: str
@return: 没有返回值函数会抛出异常
@rtype:
'''
if errno == ServantProxy.TARSSERVERSUCCESS:
return
elif errno == ServantProxy.TARSSERVERDECODEERR:
raise exception.TarsServerDecodeException(
"server decode exception: errno: %d, msg: %s" % (errno, desc))
elif errno == ServantProxy.TARSSERVERENCODEERR:
raise exception.TarsServerEncodeException(
"server encode exception: errno: %d, msg: %s" % (errno, desc))
elif errno == ServantProxy.TARSSERVERNOFUNCERR:
raise exception.TarsServerNoFuncException(
"server function mismatch exception: errno: %d, msg: %s" % (errno, desc))
elif errno == ServantProxy.TARSSERVERNOSERVANTERR:
raise exception.TarsServerNoServantException(
"server servant mismatch exception: errno: %d, msg: %s" % (errno, desc))
elif errno == ServantProxy.TARSSERVERRESETGRID:
raise exception.TarsServerResetGridException(
"server reset grid exception: errno: %d, msg: %s" % (errno, desc))
elif errno == ServantProxy.TARSSERVERQUEUETIMEOUT:
raise exception.TarsServerQueueTimeoutException(
"server queue timeout exception: errno: %d, msg: %s" % (errno, desc))
elif errno == ServantProxy.TARSPROXYCONNECTERR:
raise exception.TarsServerQueueTimeoutException(
"server connection lost: errno: %d, msg: %s" % (errno, desc))
else:
raise exception.TarsServerUnknownException(
"server unknown exception: errno: %d, msg: %s" % (errno, desc))

View File

@ -0,0 +1,546 @@
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
import struct
from .__util import util
from .exception import *
class BinBuffer:
def __init__(self, buff=bytes()):
self.buffer = buff
self.position = 0
def writeBuf(self, buff):
self.buffer += buff
def getBuffer(self):
return self.buffer
def length(self):
return len(self.buffer)
class DataHead:
EN_INT8 = 0
EN_INT16 = 1
EN_INT32 = 2
EN_INT64 = 3
EN_FLOAT = 4
EN_DOUBLE = 5
EN_STRING1 = 6
EN_STRING4 = 7
EN_MAP = 8
EN_LIST = 9
EN_STRUCTBEGIN = 10
EN_STRUCTEND = 11
EN_ZERO = 12
EN_BYTES = 13
@staticmethod
def writeTo(buff, tag, vtype):
if tag < 15:
helper = (tag << 4) | vtype
buff.writeBuf(struct.pack('!B', helper))
else:
helper = (0xF0 | vtype) << 8 | tag
buff.writeBuf(struct.pack('!H', helper))
class TarsOutputStream(object):
def __init__(self):
self.__buffer = BinBuffer()
def __writeBoolean(self, tag, value):
self.__writeInt8(tag, int(value))
def __writeInt8(self, tag, value):
if value == 0:
DataHead.writeTo(self.__buffer, tag, DataHead.EN_ZERO)
else:
DataHead.writeTo(self.__buffer, tag, DataHead.EN_INT8)
self.__buffer.writeBuf(struct.pack('!b', value))
def __writeInt16(self, tag, value):
if value >= -128 and value <= 127:
self.__writeInt8(tag, value)
else:
DataHead.writeTo(self.__buffer, tag, DataHead.EN_INT16)
self.__buffer.writeBuf(struct.pack('!h', value))
def __writeInt32(self, tag, value):
if value >= -32768 and value <= 32767:
self.__writeInt16(tag, value)
else:
DataHead.writeTo(self.__buffer, tag, DataHead.EN_INT32)
self.__buffer.writeBuf(struct.pack('!i', value))
def __writeInt64(self, tag, value):
if value >= (-2147483648) and value <= 2147483647:
self.__writeInt32(tag, value)
else:
DataHead.writeTo(self.__buffer, tag, DataHead.EN_INT64)
self.__buffer.writeBuf(struct.pack('!q', value))
def __writeFloat(self, tag, value):
DataHead.writeTo(self.__buffer, tag, DataHead.EN_FLOAT)
self.__buffer.writeBuf(struct.pack('!f', value))
def __writeDouble(self, tag, value):
DataHead.writeTo(self.__buffer, tag, DataHead.EN_DOUBLE)
self.__buffer.writeBuf(struct.pack('!d', value))
def __writeString(self, tag, value):
length = len(value)
if length <= 255:
DataHead.writeTo(self.__buffer, tag, DataHead.EN_STRING1)
self.__buffer.writeBuf(struct.pack('!B', length))
self.__buffer.writeBuf(str.encode(value))
else:
DataHead.writeTo(self.__buffer, tag, DataHead.EN_STRING4)
self.__buffer.writeBuf(struct.pack('!I', length))
self.__buffer.writeBuf(str.encode(value))
def __writeBytes(self, tag, value):
DataHead.writeTo(self.__buffer, tag, DataHead.EN_BYTES)
DataHead.writeTo(self.__buffer, 0, DataHead.EN_INT8)
length = len(value)
self.__writeInt32(0, length)
self.__buffer.buffer += value
self.__buffer.position += length
def __writeMap(self, coder, tag, value):
DataHead.writeTo(self.__buffer, tag, DataHead.EN_MAP)
self.__writeInt32(0, len(value))
for key in value:
self.write(coder.ktype, 0, key)
self.write(coder.vtype, 1, value.get(key))
def __writeVector(self, coder, tag, value):
DataHead.writeTo(self.__buffer, tag, DataHead.EN_LIST)
n = len(value)
self.__writeInt32(0, n)
for i in range(0, n):
self.write(value.vtype, 0, value[i])
def __writeStruct(self, coder, tag, value):
DataHead.writeTo(self.__buffer, tag, DataHead.EN_STRUCTBEGIN)
value.writeTo(self, value)
DataHead.writeTo(self.__buffer, 0, DataHead.EN_STRUCTEND)
def write(self, coder, tag, value):
if coder.__tars_index__ == 999:
self.__writeBoolean(tag, value)
elif coder.__tars_index__ == 0:
self.__writeInt8(tag, value)
elif coder.__tars_index__ == 1:
self.__writeInt16(tag, value)
elif coder.__tars_index__ == 2:
self.__writeInt32(tag, value)
elif coder.__tars_index__ == 3:
self.__writeInt64(tag, value)
elif coder.__tars_index__ == 4:
self.__writeFloat(tag, value)
elif coder.__tars_index__ == 5:
self.__writeDouble(tag, value)
elif coder.__tars_index__ == 13:
self.__writeBytes(tag, value)
elif coder.__tars_index__ == 67:
self.__writeString(tag, value)
elif coder.__tars_index__ == 8:
self.__writeMap(coder, tag, value)
elif coder.__tars_index__ == 9:
self.__writeVector(coder, tag, value)
elif coder.__tars_index__ == 1011:
self.__writeStruct(coder, tag, value)
else:
raise TarsTarsUnsupportType(
"tars unsupport data type:" % coder.__tars_index__)
def getBuffer(self):
return self.__buffer.getBuffer()
def printHex(self):
util.printHex(self.__buffer.getBuffer())
class TarsInputStream(object):
def __init__(self, buff):
self.__buffer = BinBuffer(buff)
def __peekFrom(self):
helper, = struct.unpack_from(
'!B', self.__buffer.buffer, self.__buffer.position)
t = (helper & 0xF0) >> 4
p = (helper & 0x0F)
l = 1
if t >= 15:
l = 2
t, = struct.unpack_from(
'!B', self.__buffer.buffer, self.__buffer.position + 1)
return (t, p, l)
def __readFrom(self):
t, p, l = self.__peekFrom()
self.__buffer.position += l
return (t, p, l)
def __skipToStructEnd(self):
t, p, l = self.__readFrom()
while p != DataHead.EN_STRUCTEND:
self.__skipField(p)
t, p, l = self.__readFrom()
def __skipField(self, p):
if p == DataHead.EN_INT8:
self.__buffer.position += 1
elif p == DataHead.EN_INT16:
self.__buffer.position += 2
elif p == DataHead.EN_INT32:
self.__buffer.position += 4
elif p == DataHead.EN_INT64:
self.__buffer.position += 8
elif p == DataHead.EN_FLOAT:
self.__buffer.position += 4
elif p == DataHead.EN_DOUBLE:
self.__buffer.position += 8
elif p == DataHead.EN_STRING1:
length, = struct.unpack_from(
'!B', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += length + 1
elif p == DataHead.EN_STRING4:
length, = struct.unpack_from(
'!i', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += length + 4
elif p == DataHead.EN_MAP:
size = self.__readInt32(0, True)
for i in range(0, size * 2):
ti, pi, li = self.__readFrom()
self.__skipField(pi)
elif p == DataHead.EN_LIST:
size = self.__readInt32(0, True)
for i in range(0, size):
ti, pi, li = self.__readFrom()
self.__skipField(pi)
elif p == DataHead.EN_BYTES:
ti, pi, li = self.__readFrom()
if pi != DataHead.EN_INT8:
raise TarsTarsDecodeInvalidValue(
"skipField with invalid type, type value: %d, %d." % (p, pi))
size = self.__readInt32(0, True)
self.__buffer.position += size
elif p == DataHead.EN_STRUCTBEGIN:
self.__skipToStructEnd()
elif p == DataHead.EN_STRUCTEND:
pass
#self.__buffer.position += length + 1;
elif p == DataHead.EN_ZERO:
pass
#self.__buffer.position += length + 1;
else:
raise TarsTarsDecodeMismatch(
"skipField with invalid type, type value:%d" % p)
def __skipToTag(self, tag):
length = self.__buffer.length()
while self.__buffer.position < length:
t, p, l = self.__peekFrom()
if tag <= t or p == DataHead.EN_STRUCTEND:
return False if (p == DataHead.EN_STRUCTEND) else (t == tag)
self.__buffer.position += l
self.__skipField(p)
return False
def __readBoolean(self, tag, require, default=None):
v = self.__readInt8(tag, require)
if v is None:
return default
else:
return (v != 0)
def __readInt8(self, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_ZERO:
return 0
elif p == DataHead.EN_INT8:
value, = struct.unpack_from(
'!b', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 1
return value
else:
raise TarsTarsDecodeMismatch(
"read 'Char' type mismatch, tag: %d , get type: %d." % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readInt16(self, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_ZERO:
return 0
elif p == DataHead.EN_INT8:
value, = struct.unpack_from(
'!b', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 1
return value
elif p == DataHead.EN_INT16:
value, = struct.unpack_from(
'!h', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 2
return value
else:
raise TarsTarsDecodeMismatch(
"read 'Short' type mismatch, tag: %d , get type: %d." % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readInt32(self, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_ZERO:
return 0
elif p == DataHead.EN_INT8:
value, = struct.unpack_from(
'!b', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 1
return value
elif p == DataHead.EN_INT16:
value, = struct.unpack_from(
'!h', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 2
return value
elif p == DataHead.EN_INT32:
value, = struct.unpack_from(
'!i', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 4
return value
else:
raise TarsTarsDecodeMismatch(
"read 'Int32' type mismatch, tag: %d, get type: %d." % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readInt64(self, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_ZERO:
return 0
elif p == DataHead.EN_INT8:
value, = struct.unpack_from(
'!b', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 1
return value
elif p == DataHead.EN_INT16:
value, = struct.unpack_from(
'!h', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 2
return value
elif p == DataHead.EN_INT32:
value, = struct.unpack_from(
'!i', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 4
return value
elif p == DataHead.EN_INT64:
value, = struct.unpack_from(
'!q', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 8
return value
else:
raise TarsTarsDecodeMismatch(
"read 'Int64' type mismatch, tag: %d, get type: %d." % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readString(self, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_STRING1:
length, = struct.unpack_from(
'!B', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 1
value, = struct.unpack_from(
str(length) + "s", self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += length
return value
elif p == DataHead.EN_STRING4:
length, = struct.unpack_from(
'!i', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 4
value, = struct.unpack_from(
str(length) + "s", self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += length
return value
else:
raise TarsTarsDecodeMismatch(
"read 'string' type mismatch, tag: %d, get type: %d." % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readBytes(self, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_BYTES:
ti, pi, li = self.__readFrom()
if pi != DataHead.EN_INT8:
raise TarsTarsDecodeMismatch(
"type mismatch, tag: %d, type: %d, %d" % (tag, p, pi))
size = self.__readInt32(0, True)
value, = struct.unpack_from(
str(size) + 's', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += size
return value
else:
raise TarsTarsDecodeMismatch(
"type mismatch, tag: %d, type: %d" % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readFloat(self, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_ZERO:
return 0
elif p == DataHead.EN_FLOAT:
value, = struct.unpack_from(
'!f', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 4
return value
else:
raise TarsTarsDecodeMismatch(
"read 'Float' type mismatch, tag: %d, get type: %d." % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readDouble(self, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_ZERO:
return 0
elif p == DataHead.EN_FLOAT:
value, = struct.unpack_from(
'!f', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 4
return value
elif p == DataHead.EN_DOUBLE:
value, = struct.unpack_from(
'!d', self.__buffer.buffer, self.__buffer.position)
self.__buffer.position += 8
return value
else:
raise TarsTarsDecodeMismatch(
"read 'Double' type mismatch, tag: %d, get type: %d." % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readStruct(self, coder, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p != DataHead.EN_STRUCTBEGIN:
raise TarsTarsDecodeMismatch(
"read 'struct' type mismatch, tag: %d, get type: %d." % (tag, p))
value = coder.readFrom(self)
self.__skipToStructEnd()
return value
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readMap(self, coder, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_MAP:
size = self.__readInt32(0, True)
omap = coder()
for i in range(0, size):
k = self.read(coder.ktype, 0, True)
v = self.read(coder.vtype, 1, True)
omap[k] = v
return omap
else:
raise TarsTarsDecodeMismatch(
"read 'map' type mismatch, tag: %d, get type: %d." % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def __readVector(self, coder, tag, require, default=None):
if self.__skipToTag(tag):
t, p, l = self.__readFrom()
if p == DataHead.EN_LIST:
size = self.__readInt32(0, True)
value = coder()
for i in range(0, size):
k = self.read(coder.vtype, 0, True)
value.append(k)
return value
else:
raise TarsTarsDecodeMismatch(
"read 'vector' type mismatch, tag: %d, get type: %d." % (tag, p))
elif require:
raise TarsTarsDecodeRequireNotExist(
"require field not exist, tag: %d" % tag)
return default
def read(self, coder, tag, require, default=None):
if coder.__tars_index__ == 999:
return self.__readBoolean(tag, require, default)
elif coder.__tars_index__ == 0:
return self.__readInt8(tag, require, default)
elif coder.__tars_index__ == 1:
return self.__readInt16(tag, require, default)
elif coder.__tars_index__ == 2:
return self.__readInt32(tag, require, default)
elif coder.__tars_index__ == 3:
return self.__readInt64(tag, require, default)
elif coder.__tars_index__ == 4:
return self.__readFloat(tag, require, default)
elif coder.__tars_index__ == 5:
return self.__readDouble(tag, require, default)
elif coder.__tars_index__ == 13:
return self.__readBytes(tag, require, default)
elif coder.__tars_index__ == 67:
return self.__readString(tag, require, default)
elif coder.__tars_index__ == 8:
return self.__readMap(coder, tag, require, default)
elif coder.__tars_index__ == 9:
return self.__readVector(coder, tag, require, default)
elif coder.__tars_index__ == 1011:
return self.__readStruct(coder, tag, require, default)
else:
raise TarsTarsUnsupportType(
"tars unsupport data type:" % coder.__tars_index__)
def printHex(self):
util.printHex(self.__buffer.buffer)

View File

@ -0,0 +1,575 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __trans.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: 网络相关模块
'''
import socket
import select
import errno
import threading
from .__logger import tarsLogger
from .__TimeoutQueue import ReqMessage
class EndPointInfo:
'''
@brief: 保存每个连接端口的信息
'''
SOCK_TCP = 'TCP'
SOCK_UDP = 'UDP'
def __init__(self,
ip='',
port=0,
timeout=-1,
weight=0,
weightType=0,
connType=SOCK_TCP):
self.__ip = ip
self.__port = port
self.__timeout = timeout
self.__connType = connType
self.__weightType = weightType
self.__weight = weight
def getIp(self):
return self.__ip
def getPort(self):
return self.__port
def getConnType(self):
'''
@return: 传输层连接类型
@rtype: EndPointInfo.SOCK_TCP EndPointInfo.SOCK_UDP
'''
return self.__connType
def getWeightType(self):
return self.__weightType
def getWeight(self):
return self.__weight
def __str__(self):
return '%s %s:%s %d:%d' % (self.__connType, self.__ip, self.__port, self.__weightType, self.__weight)
class Transceiver:
'''
@brief: 网络传输基类提供网络send/recv接口
'''
CONNECTED = 0
CONNECTING = 1
UNCONNECTED = 2
def __init__(self, endPointInfo):
tarsLogger.debug('Transceiver:__init__, %s', endPointInfo)
self.__epi = endPointInfo
self.__sock = None
self.__connStatus = Transceiver.UNCONNECTED
self.__connFailed = False
# 这两个变量要给子类用不能用name mangling隐藏
self._sendBuff = ''
self._recvBuf = ''
def __del__(self):
tarsLogger.debug('Transceiver:__del__')
self.close()
def getSock(self):
'''
@return: socket对象
@rtype: socket.socket
'''
return self.__sock
def getFd(self):
'''
@brief: 获取socket的文件描述符
@return: 如果self.__sock没有建立返回-1
@rtype: int
'''
if self.__sock:
return self.__sock.fileno()
else:
return -1
def getEndPointInfo(self):
'''
@return: 端口信息
@rtype: EndPointInfo
'''
return self.__epi
def isValid(self):
'''
@return: 是否创建了socket
@rtype: bool
'''
return self.__sock is not None
def hasConnected(self):
'''
@return: 是否连接上了
@rtype: bool
'''
return self.isValid() and self.__connStatus == Transceiver.CONNECTED
def isConnFailed(self):
'''
@return: 是否连接失败
@rtype: bool
'''
return self.__connFailed
def isConnecting(self):
'''
@return: 是否正在连接
@rtype: bool
'''
return self.isValid() and self.__connStatus == Transceiver.CONNECTING
def setConnFailed(self):
'''
@brief: 设置为连接失败
@return: None
@rtype: None
'''
self.__connFailed = True
self.__connStatus = Transceiver.UNCONNECTED
def setConnected(self):
'''
@brief: 设置为连接完
@return: None
@rtype: None
'''
self.__connFailed = False
self.__connStatus = Transceiver.CONNECTED
def close(self):
'''
@brief: 关闭连接
@return: None
@rtype: None
@note: 多次调用不会有问题
'''
tarsLogger.debug('Transceiver:close')
if not self.isValid():
return
self.__sock.close()
self.__sock = None
self.__connStatus = Transceiver.UNCONNECTED
self.__connFailed = False
self._sendBuff = ''
self._recvBuf = ''
tarsLogger.info('trans close : %s' % self.__epi)
def writeToSendBuf(self, msg):
'''
@brief: 把数据添加到send buffer里
@param msg: 发送的数据
@type msg: str
@return: None
@rtype: None
@note: 没有加锁多线程调用会有race conditions
'''
self._sendBuff += msg
def recv(self, bufsize, flag=0):
raise NotImplementedError()
def send(self, buf, flag=0):
raise NotImplementedError()
def doResponse(self):
raise NotImplementedError()
def doRequest(self):
'''
@brief: 将请求数据发送出去
@return: 发送的字节数
@rtype: int
'''
tarsLogger.debug('Transceiver:doRequest')
if not self.isValid():
return -1
nbytes = 0
buf = buffer(self._sendBuff)
while True:
if not buf:
break
ret = self.send(buf[nbytes:])
if ret > 0:
nbytes += ret
else:
break
# 发送前面的字节后将后面的字节拷贝上来
self._sendBuff = buf[nbytes:]
return nbytes
def reInit(self):
'''
@brief: 初始化socket并连接服务器
@return: 成功返回0失败返回-1
@rtype: int
'''
tarsLogger.debug('Transceiver:reInit')
assert(self.isValid() is False)
if self.__epi.getConnType() != EndPointInfo.SOCK_TCP:
return -1
try:
self.__sock = socket.socket()
self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.__sock.setblocking(0)
self.__sock.connect((self.__epi.getIp(), self.__epi.getPort()))
self.__connStatus = Transceiver.CONNECTED
except socket.error as msg:
if msg.errno == errno.EINPROGRESS:
self.__connStatus = Transceiver.CONNECTING
else:
tarsLogger.info('reInit, %s, faild!, %s',
self.__epi, msg)
self.__sock = None
return -1
tarsLogger.info('reInit, connect: %s, fd: %d',
self.__epi, self.getFd())
return 0
class TcpTransceiver(Transceiver):
'''
@brief: TCP传输实现
'''
def send(self, buf, flag=0):
'''
@brief: 实现tcp的发送
@param buf: 发送的数据
@type buf: str
@param flag: 发送标志
@param flag: int
@return: 发送字节数
@rtype: int
'''
tarsLogger.debug('TcpTransceiver:send')
if not self.isValid():
return -1
nbytes = 0
try:
nbytes = self.getSock().send(buf, flag)
tarsLogger.info('tcp send, fd: %d, %s, len: %d',
self.getFd(), self.getEndPointInfo(), nbytes)
except socket.error as msg:
if msg.errno != errno.EAGAIN:
tarsLogger.error('tcp send, fd: %d, %s, fail!, %s, close',
self.getFd(), self.getEndPointInfo(), msg)
self.close()
return 0
return nbytes
def recv(self, bufsize, flag=0):
'''
@brief: 实现tcp的recv
@param bufsize: 接收大小
@type bufsize: int
@param flag: 接收标志
@param flag: int
@return: 接收的内容接收出错返回None
@rtype: str
'''
tarsLogger.debug('TcpTransceiver:recv')
assert(self.isValid())
buf = ''
try:
buf = self.getSock().recv(bufsize, flag)
if len(buf) == 0:
tarsLogger.info('tcp recv, fd: %d, %s, recv 0 bytes, close',
self.getFd(), self.getEndPointInfo())
self.close()
return None
except socket.error as msg:
if msg.errno != errno.EAGAIN:
tarsLogger.info('tcp recv, fd: %d, %s, faild!, %s, close',
self.getFd(), self.getEndPointInfo(), msg)
self.close()
return None
tarsLogger.info('tcp recv, fd: %d, %s, nbytes: %d',
self.getFd(), self.getEndPointInfo(), len(buf))
return buf
def doResponse(self):
'''
@brief: 处理接收的数据
@return: 返回响应报文的列表如果出错返回None
@rtype: list: ResponsePacket
'''
tarsLogger.debug('TcpTransceiver:doResponse')
if not self.isValid():
return None
bufs = [self._recvBuf]
while True:
buf = self.recv(8292)
if not buf:
break
bufs.append(buf)
self._recvBuf = ''.join(bufs)
tarsLogger.info('tcp doResponse, fd: %d, recvbuf: %d',
self.getFd(), len(self._recvBuf))
if not self._recvBuf:
return None
rsplist = None
try:
rsplist, bufsize = ReqMessage.unpackRspList(self._recvBuf)
self._recvBuf = self._recvBuf[bufsize:]
except Exception as msg:
tarsLogger.error(
'tcp doResponse, fd: %d, %s, tcp recv unpack error: %s',
self.getFd(), self.getEndPointInfo(), msg)
self.close()
return rsplist
class FDReactor(threading.Thread):
'''
@brief: 监听FD事件并解发注册的handle
'''
def __init__(self):
tarsLogger.debug('FDReactor:__init__')
# threading.Thread.__init__(self)
super(FDReactor, self).__init__()
self.__terminate = False
self.__ep = None
self.__shutdown = None
# {fd : adapterproxy}
self.__adapterTab = {}
def __del__(self):
tarsLogger.debug('FDReactor:__del__')
self.__ep.close()
self.__shutdown.close()
self.__ep = None
self.__shutdown = None
def initialize(self):
'''
@brief: 初始化使用FDReactor前必须调用
@return: None
@rtype: None
'''
tarsLogger.debug('FDReactor:initialize')
self.__ep = select.epoll()
self.__shutdown = socket.socket()
self.__ep.register(self.__shutdown.fileno(),
select.EPOLLET | select.EPOLLIN)
tarsLogger.debug('FDReactor init, shutdown fd : %d',
self.__shutdown.fileno())
def terminate(self):
'''
@brief: 结束FDReactor的线程
@return: None
@rtype: None
'''
tarsLogger.debug('FDReactor:terminate')
self.__terminate = True
self.__ep.modify(self.__shutdown.fileno(), select.EPOLLOUT)
self.__adapterTab = {}
def handle(self, adapter, events):
'''
@brief: 处理epoll事件
@param adapter: 事件对应的adapter
@type adapter: AdapterProxy
@param events: epoll事件
@param events: int
@return: None
@rtype: None
'''
tarsLogger.debug('FDReactor:handle events : %d', events)
assert(adapter)
try:
if events == 0:
return
if events & (select.EPOLLERR | select.EPOLLHUP):
tarsLogger.debug('FDReactor::handle EPOLLERR or EPOLLHUP: %s',
adapter.trans().getEndPointInfo())
adapter.trans().close()
return
if adapter.shouldCloseTrans():
tarsLogger.debug('FDReactor::handle should close trans: %s',
adapter.trans().getEndPointInfo())
adapter.setCloseTrans(False)
adapter.trans().close()
return
if adapter.trans().isConnecting():
if not adapter.finishConnect():
return
if events & select.EPOLLIN:
self.handleInput(adapter)
if events & select.EPOLLOUT:
self.handleOutput(adapter)
except Exception as msg:
tarsLogger.error('FDReactor handle exception: %s', msg)
def handleExcept(self):
pass
def handleInput(self, adapter):
'''
@brief: 处理接收事件
@param adapter: 事件对应的adapter
@type adapter: AdapterProxy
@return: None
@rtype: None
'''
tarsLogger.debug('FDReactor:handleInput')
if not adapter.trans().isValid():
return
rsplist = adapter.trans().doResponse()
if not rsplist:
return
for rsp in rsplist:
adapter.finished(rsp)
def handleOutput(self, adapter):
'''
@brief: 处理发送事件
@param adapter: 事件对应的adapter
@type adapter: AdapterProxy
@return: None
@rtype: None
'''
tarsLogger.debug('FDReactor:handleOutput')
if not adapter.trans().isValid():
return
while adapter.trans().doRequest() >= 0 and adapter.sendRequest():
pass
def notify(self, adapter):
'''
@brief: 更新adapter对应的fd的epoll状态
@return: None
@rtype: None
@note: FDReactor使用的epoll是EPOLLET模式同一事件只通知一次
希望某一事件再次通知需调用此函数
'''
tarsLogger.debug('FDReactor:notify')
fd = adapter.trans().getFd()
if fd != -1:
self.__ep.modify(fd,
select.EPOLLET | select.EPOLLOUT | select.EPOLLIN)
def registerAdapter(self, adapter, events):
'''
@brief: 注册adapter
@param adapter: 收发事件处理类
@type adapter: AdapterProxy
@param events: 注册事件
@type events: int
@return: None
@rtype: None
'''
tarsLogger.debug('FDReactor:registerAdapter events : %d', events)
events |= select.EPOLLET
try:
self.__ep.unregister(adapter.trans().getFd())
except:
pass
self.__ep.register(adapter.trans().getFd(), events)
self.__adapterTab[adapter.trans().getFd()] = adapter
def unregisterAdapter(self, adapter):
'''
@brief: 注销adapter
@param adapter: 收发事件处理类
@type adapter: AdapterProxy
@return: None
@rtype: None
'''
tarsLogger.debug('FDReactor:registerAdapter')
self.__ep.unregister(adapter.trans().getFd())
self.__adapterTab.pop(adapter.trans().getFd(), None)
def run(self):
'''
@brief: 线程启动函数循环监听网络事件
'''
tarsLogger.debug('FDReactor:run')
while not self.__terminate:
try:
eplist = self.__ep.poll(1)
if eplist:
tarsLogger.debug('FDReactor run get eplist : %s, terminate : %s', str(
eplist), self.__terminate)
if self.__terminate:
tarsLogger.debug('FDReactor terminate')
break
for fd, events in eplist:
adapter = self.__adapterTab.get(fd, None)
if not adapter:
continue
self.handle(adapter, events)
except Exception as msg:
tarsLogger.error('FDReactor run exception: %s', msg)
tarsLogger.debug('FDReactor:run finished')
if __name__ == '__main__':
print('hello world')
epi = EndPointInfo('127.0.0.1', 1313)
print(epi)
trans = TcpTransceiver(epi)
print(trans.getSock())
print(trans.getFd())
print(trans.reInit())
print(trans.isConnecting())
print(trans.hasConnected())
buf = 'hello world'
print(trans.send(buf))
buf = trans.recv(1024)
print(buf)
trans.close()

118
danmu/danmaku/tars/__tup.py Normal file
View File

@ -0,0 +1,118 @@
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
import struct
import string
from .__util import util
from .__tars import TarsOutputStream
from .__tars import TarsInputStream
from .__packet import RequestPacket
class TarsUniPacket(object):
def __init__(self):
self.__mapa = util.mapclass(util.string, util.bytes)
self.__mapv = util.mapclass(util.string, self.__mapa)
self.__buffer = self.__mapv()
self.__code = RequestPacket()
# @property
# def version(self):
# return self.__code.iVersion
# @version.setter
# def version(self, value):
# self.__code.iVersion = value
@property
def servant(self):
return self.__code.sServantName
@servant.setter
def servant(self, value):
self.__code.sServantName = value
@property
def func(self):
return self.__code.sFuncName
@func.setter
def func(self, value):
self.__code.sFuncName = value
@property
def requestid(self):
return self.__code.iRequestId
@requestid.setter
def requestid(self, value):
self.__code.iRequestId = value
@property
def result_code(self):
if ("STATUS_RESULT_CODE" in self.__code.status) == False:
return 0
return string.atoi(self.__code.status["STATUS_RESULT_CODE"])
@property
def result_desc(self):
if ("STATUS_RESULT_DESC" in self.__code.status) == False:
return ''
return self.__code.status["STATUS_RESULT_DESC"]
def put(self, vtype, name, value):
oos = TarsOutputStream()
oos.write(vtype, 0, value)
self.__buffer[name] = {vtype.__tars_class__: oos.getBuffer()}
def get(self, vtype, name):
if (name in self.__buffer) == False:
raise Exception("UniAttribute not found key:%s,type:%s" %
(name, vtype.__tars_class__))
t = self.__buffer[name]
if (vtype.__tars_class__ in t) == False:
raise Exception("UniAttribute not found type:" +
vtype.__tars_class__)
o = TarsInputStream(t[vtype.__tars_class__])
return o.read(vtype, 0, True)
def encode(self):
oos = TarsOutputStream()
oos.write(self.__mapv, 0, self.__buffer)
self.__code.iVersion = 2
self.__code.sBuffer = oos.getBuffer()
sos = TarsOutputStream()
RequestPacket.writeTo(sos, self.__code)
return struct.pack('!i', 4 + len(sos.getBuffer())) + sos.getBuffer()
def decode(self, buf):
ois = TarsInputStream(buf[4:])
self.__code = RequestPacket.readFrom(ois)
sis = TarsInputStream(self.__code.sBuffer)
self.__buffer = sis.read(self.__mapv, 0, True)
def clear(self):
self.__code.__init__()
def haskey(self, name):
return name in self.__buffer

View File

@ -0,0 +1,252 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
import sys
from threading import Lock
import hashlib
from xml.etree import cElementTree as ET
from .exception import TarsException
class util:
@staticmethod
def printHex(buff):
count = 0
for c in buff:
sys.stdout.write("0X%02X " % ord(c))
count += 1
if count % 16 == 0:
sys.stdout.write("\n")
sys.stdout.write("\n")
sys.stdout.flush()
@staticmethod
def mapclass(ktype, vtype):
class mapklass(dict):
def size(self): return len(self)
setattr(mapklass, '__tars_index__', 8)
setattr(mapklass, '__tars_class__', "map<" +
ktype.__tars_class__ + "," + vtype.__tars_class__ + ">")
setattr(mapklass, 'ktype', ktype)
setattr(mapklass, 'vtype', vtype)
return mapklass
@staticmethod
def vectorclass(vtype):
class klass(list):
def size(self): return len(self)
setattr(klass, '__tars_index__', 9)
setattr(klass, '__tars_class__', "list<" + vtype.__tars_class__ + ">")
setattr(klass, 'vtype', vtype)
return klass
class boolean:
__tars_index__ = 999
__tars_class__ = "bool"
class int8:
__tars_index__ = 0
__tars_class__ = "char"
class uint8:
__tars_index__ = 1
__tars_class__ = "short"
class int16:
__tars_index__ = 1
__tars_class__ = "short"
class uint16:
__tars_index__ = 2
__tars_class__ = "int32"
class int32:
__tars_index__ = 2
__tars_class__ = "int32"
class uint32:
__tars_index__ = 3
__tars_class__ = "int64"
class int64:
__tars_index__ = 3
__tars_class__ = "int64"
class float:
__tars_index__ = 4
__tars_class__ = "float"
class double:
__tars_index__ = 5
__tars_class__ = "double"
class bytes:
__tars_index__ = 13
__tars_class__ = "list<char>"
class string:
__tars_index__ = 67
__tars_class__ = "string"
class struct:
__tars_index__ = 1011
def xml2dict(node, dic={}):
'''
@brief: 将xml解析树转成字典
@param node: 树的根节点
@type node: cElementTree.Element
@param dic: 存储信息的字典
@type dic: dict
@return: 转换好的字典
@rtype: dict
'''
dic[node.tag] = ndic = {}
[xml2dict(child, ndic) for child in node.getchildren() if child != node]
ndic.update([list(map(str.strip, exp.split('=')[:2]))
for exp in node.text.splitlines() if '=' in exp])
return dic
def configParse(filename):
'''
@brief: 解析tars配置文件
@param filename: 文件名
@type filename: str
@return: 解析出来的配置信息
@rtype: dict
'''
tree = ET.parse(filename)
return xml2dict(tree.getroot())
class NewLock(object):
def __init__(self):
self.__count = 0
self.__lock = Lock()
self.__lockForCount = Lock()
pass
def newAcquire(self):
self.__lockForCount.acquire()
self.__count += 1
if self.__count == 1:
self.__lock.acquire()
self.__lockForCount.release()
pass
def newRelease(self):
self.__lockForCount.acquire()
self.__count -= 1
if self.__count == 0:
self.__lock.release()
self.__lockForCount.release()
class LockGuard(object):
def __init__(self, newLock):
self.__newLock = newLock
self.__newLock.newAcquire()
def __del__(self):
self.__newLock.newRelease()
class ConsistentHashNew(object):
def __init__(self, nodes=None, nodeNumber=3):
"""
:param nodes: 服务器的节点的epstr列表
:param n_number: 一个节点对应的虚拟节点数量
:return:
"""
self.__nodes = nodes
self.__nodeNumber = nodeNumber # 每一个节点对应多少个虚拟节点这里默认是3个
self.__nodeDict = dict() # 用于记录虚拟节点的hash值与服务器epstr的对应关系
self.__sortListForKey = [] # 用于存放所有的虚拟节点的hash值这里需要保持排序以找出对应的服务器
if nodes:
for node in nodes:
self.addNode(node)
@property
def nodes(self):
return self.__nodes
@nodes.setter
def nodes(self, value):
self.__nodes = value
def addNode(self, node):
"""
添加node首先要根据虚拟节点的数目创建所有的虚拟节点并将其与对应的node对应起来
当然还需要将虚拟节点的hash值放到排序的里面
这里在添加了节点之后需要保持虚拟节点hash值的顺序
:param node:
:return:
"""
for i in range(self.__nodeNumber):
nodeStr = "%s%s" % (node, i)
key = self.__genKey(nodeStr)
self.__nodeDict[key] = node
self.__sortListForKey.append(key)
self.__sortListForKey.sort()
def removeNode(self, node):
"""
这里一个节点的退出需要将这个节点的所有的虚拟节点都删除
:param node:
:return:
"""
for i in range(self.__nodeNumber):
nodeStr = "%s%s" % (node, i)
key = self.__genKey(nodeStr)
del self.__nodeDict[key]
self.__sortListForKey.remove(key)
def getNode(self, key):
"""
返回这个字符串应该对应的node这里先求出字符串的hash值然后找到第一个小于等于的虚拟节点然后返回node
如果hash值大于所有的节点那么用第一个虚拟节点
:param : hashNum or keyStr
:return:
"""
keyStr = ''
if isinstance(key, int):
keyStr = "the keyStr is %d" % key
elif isinstance(key, type('a')):
keyStr = key
else:
raise TarsException("the hash code has wrong type")
if self.__sortListForKey:
key = self.__genKey(keyStr)
for keyItem in self.__sortListForKey:
if key <= keyItem:
return self.__nodeDict[keyItem]
return self.__nodeDict[self.__sortListForKey[0]]
else:
return None
def __genKey(self, keyStr):
"""
通过key返回当前key的hash值这里采用md5
:param key:
:return:
"""
md5Str = hashlib.md5(keyStr).hexdigest()
return int(md5Str, 16)

View File

@ -0,0 +1,91 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
__version__ = "0.0.1"
from __util import util
from __tars import TarsInputStream
from __tars import TarsOutputStream
from __tup import TarsUniPacket
class tarscore:
class TarsInputStream(TarsInputStream):
pass
class TarsOutputStream(TarsOutputStream):
pass
class TarsUniPacket(TarsUniPacket):
pass
class boolean(util.boolean):
pass
class int8(util.int8):
pass
class uint8(util.uint8):
pass
class int16(util.int16):
pass
class uint16(util.uint16):
pass
class int32(util.int32):
pass
class uint32(util.uint32):
pass
class int64(util.int64):
pass
class float(util.float):
pass
class double(util.double):
pass
class bytes(util.bytes):
pass
class string(util.string):
pass
class struct(util.struct):
pass
@staticmethod
def mapclass(ktype, vtype): return util.mapclass(ktype, vtype)
@staticmethod
def vctclass(vtype): return util.vectorclass(vtype)
@staticmethod
def printHex(buff): util.printHex(buff)
# 被用户引用
from __util import configParse
from __rpc import Communicator
from exception import *
from __logger import tarsLogger

View File

@ -0,0 +1,36 @@
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
class TarsException(Exception): pass
class TarsTarsDecodeRequireNotExist(TarsException): pass
class TarsTarsDecodeMismatch(TarsException): pass
class TarsTarsDecodeInvalidValue(TarsException): pass
class TarsTarsUnsupportType(TarsException): pass
class TarsNetConnectException(TarsException): pass
class TarsNetConnectLostException(TarsException): pass
class TarsNetSocketException(TarsException): pass
class TarsProxyDecodeException(TarsException): pass
class TarsProxyEncodeException(TarsException): pass
class TarsServerEncodeException(TarsException): pass
class TarsServerDecodeException(TarsException): pass
class TarsServerNoFuncException(TarsException): pass
class TarsServerNoServantException(TarsException): pass
class TarsServerQueueTimeoutException(TarsException): pass
class TarsServerUnknownException(TarsException): pass
class TarsSyncCallTimeoutException(TarsException): pass
class TarsRegistryException(TarsException): pass
class TarsServerResetGridException(TarsException): pass

View File

@ -0,0 +1,25 @@
module register
{
/**
* ¶Ë¿ÚÐÅÏ¢
*/
struct EndpointF
{
0 require string host;
1 require int port;
2 require int timeout;
3 require int istcp;
4 require int grid;
5 optional int groupworkid;
6 optional int grouprealid;
7 optional string setId;
8 optional int qos;
9 optional int bakFlag;
11 optional int weight;
12 optional int weightType;
};
key[EndpointF, host, port, timeout, istcp, grid, qos, weight, weightType];
};

View File

@ -0,0 +1,70 @@
#include "EndpointF.tars"
module register
{
/**
* 获取对象endpoint的query接口
*/
interface QueryF
{
/** 根据id获取对象
*
* @param id 对象名称
*
* @return 返回所有该对象的活动endpoint列表
*/
vector<EndpointF> findObjectById(string id);
/**根据id获取所有对象,包括活动和非活动对象
*
* @param id 对象名称
* @param activeEp 存活endpoint列表
* @param inactiveEp 非存活endpoint列表
* @return: 0-成功 others-失败
*/
int findObjectById4Any(string id, out vector<EndpointF> activeEp, out vector<EndpointF> inactiveEp);
/** 根据id获取对象所有endpoint列表,功能同findObjectByIdInSameGroup
*
* @param id 对象名称
* @param activeEp 存活endpoint列表
* @param inactiveEp 非存活endpoint列表
* @return: 0-成功 others-失败
*/
int findObjectById4All(string id, out vector<EndpointF> activeEp, out vector<EndpointF> inactiveEp);
/** 根据id获取对象同组endpoint列表
*
* @param id 对象名称
* @param activeEp 存活endpoint列表
* @param inactiveEp 非存活endpoint列表
* @return: 0-成功 others-失败
*/
int findObjectByIdInSameGroup(string id, out vector<EndpointF> activeEp, out vector<EndpointF> inactiveEp);
/** 根据id获取对象指定归属地的endpoint列表
*
* @param id 对象名称
* @param activeEp 存活endpoint列表
* @param inactiveEp 非存活endpoint列表
* @return: 0-成功 others-失败
*/
int findObjectByIdInSameStation(string id, string sStation, out vector<EndpointF> activeEp, out vector<EndpointF> inactiveEp);
/** 根据id获取对象同组endpoint列表
*
* @param id 对象名称
* @param setId set全称,格式为setname.setarea.setgroup
* @param activeEp 存活endpoint列表
* @param inactiveEp 非存活endpoint列表
* @return: 0-成功 others-失败
*/
int findObjectByIdInSameSet(string id, string setId, out vector<EndpointF> activeEp, out vector<EndpointF> inactiveEp);
};
};

View File