diff --git a/danmu/danmaku/tars/EndpointF.py b/danmu/danmaku/tars/EndpointF.py new file mode 100644 index 0000000..72fa6b3 --- /dev/null +++ b/danmu/danmaku/tars/EndpointF.py @@ -0,0 +1,69 @@ +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +from core import tarscore + + +class EndpointF(tarscore.struct): + __tars_class__ = "register.EndpointF" + + def __init__(self): + self.host = "" + self.port = 0 + self.timeout = 0 + self.istcp = 0 + self.grid = 0 + self.groupworkid = 0 + self.grouprealid = 0 + self.setId = "" + self.qos = 0 + self.bakFlag = 0 + self.weight = 0 + self.weightType = 0 + + @staticmethod + def writeTo(oos, value): + oos.write(tarscore.string, 0, value.host) + oos.write(tarscore.int32, 1, value.port) + oos.write(tarscore.int32, 2, value.timeout) + oos.write(tarscore.int32, 3, value.istcp) + oos.write(tarscore.int32, 4, value.grid) + oos.write(tarscore.int32, 5, value.groupworkid) + oos.write(tarscore.int32, 6, value.grouprealid) + oos.write(tarscore.string, 7, value.setId) + oos.write(tarscore.int32, 8, value.qos) + oos.write(tarscore.int32, 9, value.bakFlag) + oos.write(tarscore.int32, 11, value.weight) + oos.write(tarscore.int32, 12, value.weightType) + + @staticmethod + def readFrom(ios): + value = EndpointF() + value.host = ios.read(tarscore.string, 0, True, value.host) + value.port = ios.read(tarscore.int32, 1, True, value.port) + value.timeout = ios.read(tarscore.int32, 2, True, value.timeout) + value.istcp = ios.read(tarscore.int32, 3, True, value.istcp) + value.grid = ios.read(tarscore.int32, 4, True, value.grid) + value.groupworkid = ios.read( + tarscore.int32, 5, False, value.groupworkid) + value.grouprealid = ios.read( + tarscore.int32, 6, False, value.grouprealid) + value.setId = ios.read(tarscore.string, 7, False, value.setId) + value.qos = ios.read(tarscore.int32, 8, False, value.qos) + value.bakFlag = ios.read(tarscore.int32, 9, False, value.bakFlag) + value.weight = ios.read(tarscore.int32, 11, False, value.weight) + value.weightType = ios.read( + tarscore.int32, 12, False, value.weightType) + return value diff --git a/danmu/danmaku/tars/QueryF.py b/danmu/danmaku/tars/QueryF.py new file mode 100644 index 0000000..2a78267 --- /dev/null +++ b/danmu/danmaku/tars/QueryF.py @@ -0,0 +1,276 @@ +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +# from tars.core import tarscore; +# from tars.core import ServantProxy; +# from tars.core import ServantProxyCallback; +# from com.qq.register.EndpointF import *; +from .__init__ import tarscore +from .__servantproxy import ServantProxy +from .__async import ServantProxyCallback +from .EndpointF import EndpointF + +import time + +# proxy for client + + +class QueryFProxy(ServantProxy): + def findObjectById(self, id, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + + rsp = self.tars_invoke(ServantProxy.TARSNORMAL, + "findObjectById", oos.getBuffer(), context, None) + + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.vctclass(EndpointF), 0, True) + + return (ret) + + def async_findObjectById(self, callback, id, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + + self.tars_invoke_async( + ServantProxy.TARSNORMAL, "findObjectById", oos.getBuffer(), context, None, callback) + + def findObjectById4Any(self, id, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + + rsp = self.tars_invoke( + ServantProxy.TARSNORMAL, "findObjectById4Any", oos.getBuffer(), context, None) + + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + + return (ret, activeEp, inactiveEp) + + def async_findObjectById4Any(self, callback, id, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + + self.tars_invoke_async( + ServantProxy.TARSNORMAL, "findObjectById4Any", oos.getBuffer(), context, None, callback) + + def findObjectById4All(self, id, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + + rsp = self.tars_invoke( + ServantProxy.TARSNORMAL, "findObjectById4All", oos.getBuffer(), context, None) + + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + + return (ret, activeEp, inactiveEp) + + def async_findObjectById4All(self, callback, id, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + + self.tars_invoke_async( + ServantProxy.TARSNORMAL, "findObjectById4All", oos.getBuffer(), context, None, callback) + + def findObjectByIdInSameGroup(self, id, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + rsp = self.tars_invoke( + ServantProxy.TARSNORMAL, "findObjectByIdInSameGroup", oos.getBuffer(), context, None) + + startDecodeTime = time.time() + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + endDecodeTime = time.time() + return (ret, activeEp, inactiveEp, (endDecodeTime - startDecodeTime)) + + def async_findObjectByIdInSameGroup(self, callback, id, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + + self.tars_invoke_async( + ServantProxy.TARSNORMAL, "findObjectByIdInSameGroup", oos.getBuffer(), context, None, callback) + + def findObjectByIdInSameStation(self, id, sStation, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + oos.write(tarscore.string, 2, sStation) + + rsp = self.tars_invoke( + ServantProxy.TARSNORMAL, "findObjectByIdInSameStation", oos.getBuffer(), context, None) + + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 4, True) + + return (ret, activeEp, inactiveEp) + + def async_findObjectByIdInSameStation(self, callback, id, sStation, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + oos.write(tarscore.string, 2, sStation) + + self.tars_invoke_async( + ServantProxy.TARSNORMAL, "findObjectByIdInSameStation", oos.getBuffer(), context, None, callback) + + def findObjectByIdInSameSet(self, id, setId, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + oos.write(tarscore.string, 2, setId) + + rsp = self.tars_invoke( + ServantProxy.TARSNORMAL, "findObjectByIdInSameSet", oos.getBuffer(), context, None) + + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 4, True) + + return (ret, activeEp, inactiveEp) + + def async_findObjectByIdInSameSet(self, callback, id, setId, context=ServantProxy.mapcls_context()): + oos = tarscore.TarsOutputStream() + oos.write(tarscore.string, 1, id) + oos.write(tarscore.string, 2, setId) + + self.tars_invoke_async( + ServantProxy.TARSNORMAL, "findObjectByIdInSameSet", oos.getBuffer(), context, None, callback) + + +# ======================================================== +# callback of async proxy for client +# ======================================================== +class QueryFPrxCallback(ServantProxyCallback): + def __init__(self): + ServantProxyCallback.__init__(self) + self.callback_map = { + 'findObjectById': self.__invoke_findObjectById, + 'findObjectById4Any': self.__invoke_findObjectById4Any, + 'findObjectById4All': self.__invoke_findObjectById4All, + 'findObjectByIdInSameGroup': self.__invoke_findObjectByIdInSameGroup, + 'findObjectByIdInSameStation': self.__invoke_findObjectByIdInSameStation, + 'findObjectByIdInSameSet': self.__invoke_findObjectByIdInSameSet + } + + def callback_findObjectById(self, ret): + raise NotImplementedError() + + def callback_findObjectById_exception(self, ret): + raise NotImplementedError() + + def callback_findObjectById4Any(self, ret, activeEp, inactiveEp): + raise NotImplementedError() + + def callback_findObjectById4Any_exception(self, ret): + raise NotImplementedError() + + def callback_findObjectById4All(self, ret, activeEp, inactiveEp): + raise NotImplementedError() + + def callback_findObjectById4All_exception(self, ret): + raise NotImplementedError() + + def callback_findObjectByIdInSameGroup(self, ret, activeEp, inactiveEp): + raise NotImplementedError() + + def callback_findObjectByIdInSameGroup_exception(self, ret): + raise NotImplementedError() + + def callback_findObjectByIdInSameStation(self, ret, activeEp, inactiveEp): + raise NotImplementedError() + + def callback_findObjectByIdInSameStation_exception(self, ret): + raise NotImplementedError() + + def callback_findObjectByIdInSameSet(self, ret, activeEp, inactiveEp): + raise NotImplementedError() + + def callback_findObjectByIdInSameSet_exception(self, ret): + raise NotImplementedError() + + def __invoke_findObjectById(self, reqmsg): + rsp = reqmsg.response + if rsp.iRet != ServantProxy.TARSSERVERSUCCESS: + self.callback_findObjectById_exception(rsp.iRet) + return rsp.iRet + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.vctclass(EndpointF), 0, True) + self.callback_findObjectById(ret) + + def __invoke_findObjectById4Any(self, reqmsg): + rsp = reqmsg.response + if rsp.iRet != ServantProxy.TARSSERVERSUCCESS: + self.callback_findObjectById4Any_exception(rsp.iRet) + return rsp.iRet + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + self.callback_findObjectById4Any(ret, activeEp, inactiveEp) + + def __invoke_findObjectById4All(self, reqmsg): + rsp = reqmsg.response + if rsp.iRet != ServantProxy.TARSSERVERSUCCESS: + self.callback_findObjectById4All_exception(rsp.iRet) + return rsp.iRet + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + self.callback_findObjectById4All(ret, activeEp, inactiveEp) + + def __invoke_findObjectByIdInSameGroup(self, reqmsg): + rsp = reqmsg.response + if rsp.iRet != ServantProxy.TARSSERVERSUCCESS: + self.callback_findObjectByIdInSameGroup_exception(rsp.iRet) + return rsp.iRet + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 2, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + self.callback_findObjectByIdInSameGroup(ret, activeEp, inactiveEp) + + def __invoke_findObjectByIdInSameStation(self, reqmsg): + rsp = reqmsg.response + if rsp.iRet != ServantProxy.TARSSERVERSUCCESS: + self.callback_findObjectByIdInSameStation_exception(rsp.iRet) + return rsp.iRet + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 4, True) + self.callback_findObjectByIdInSameStation(ret, activeEp, inactiveEp) + + def __invoke_findObjectByIdInSameSet(self, reqmsg): + rsp = reqmsg.response + if rsp.iRet != ServantProxy.TARSSERVERSUCCESS: + self.callback_findObjectByIdInSameSet_exception(rsp.iRet) + return rsp.iRet + ios = tarscore.TarsInputStream(rsp.sBuffer) + ret = ios.read(tarscore.int32, 0, True) + activeEp = ios.read(tarscore.vctclass(EndpointF), 3, True) + inactiveEp = ios.read(tarscore.vctclass(EndpointF), 4, True) + self.callback_findObjectByIdInSameSet(ret, activeEp, inactiveEp) + + def onDispatch(self, reqmsg): + self.callback_map[reqmsg.request.sFuncName](reqmsg) diff --git a/danmu/danmaku/tars/__TimeoutQueue.py b/danmu/danmaku/tars/__TimeoutQueue.py new file mode 100644 index 0000000..65bde43 --- /dev/null +++ b/danmu/danmaku/tars/__TimeoutQueue.py @@ -0,0 +1,300 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# filename: __timeQueue.py + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +''' +@version: 0.01 +@brief: 请求响应报文和超时队列 +''' + +import threading +import time +import struct + +from .__logger import tarsLogger +from .__tars import TarsInputStream +from .__tars import TarsOutputStream +from .__packet import RequestPacket +from .__packet import ResponsePacket +from .__util import (NewLock, LockGuard) + + +class ReqMessage: + ''' + @brief: 请求响应报文,保存一个请求响应所需要的数据 + ''' + SYNC_CALL = 1 + ASYNC_CALL = 2 + ONE_WAY = 3 + + def __init__(self): + self.type = ReqMessage.SYNC_CALL + self.servant = None + self.lock = None + self.adapter = None + self.request = None + self.response = None + self.callback = None + self.begtime = None + self.endtime = None + self.isHash = False + self.isConHash = False + self.hashCode = 0 + + def packReq(self): + ''' + @brief: 序列化请求报文 + @return: 序列化后的请求报文 + @rtype: str + ''' + if not self.request: + return '' + oos = TarsOutputStream() + RequestPacket.writeTo(oos, self.request) + reqpkt = oos.getBuffer() + plen = len(reqpkt) + 4 + reqpkt = struct.pack('!i', plen) + reqpkt + return reqpkt + + @staticmethod + def unpackRspList(buf): + ''' + @brief: 解码响应报文 + @param buf: 多个序列化后的响应报文数据 + @type buf: str + @return: 解码出来的响应报文和解码的buffer长度 + @rtype: rsplist: 装有ResponsePacket的list + unpacklen: int + ''' + rsplist = [] + if not buf: + return rsplist + + unpacklen = 0 + buf = buffer(buf) + while True: + if len(buf) - unpacklen < 4: + break + packsize = buf[unpacklen: unpacklen+4] + packsize, = struct.unpack_from('!i', packsize) + if len(buf) < unpacklen + packsize: + break + + ios = TarsInputStream(buf[unpacklen+4: unpacklen+packsize]) + rsp = ResponsePacket.readFrom(ios) + rsplist.append(rsp) + unpacklen += packsize + + return rsplist, unpacklen + +# 超时队列,加锁,线程安全 + + +class TimeoutQueue: + ''' + @brief: 超时队列,加锁,线程安全 + 可以像队列一样FIFO,也可以像字典一样按key取item + @todo: 限制队列长度 + ''' + + def __init__(self, timeout=3): + self.__uniqId = 0 + # self.__lock = threading.Lock() + self.__lock = NewLock() + self.__data = {} + self.__queue = [] + self.__timeout = timeout + + def getTimeout(self): + ''' + @brief: 获取超时时间,单位为s + @return: 超时时间 + @rtype: float + ''' + return self.__timeout + + def setTimeout(self, timeout): + ''' + @brief: 设置超时时间,单位为s + @param timeout: 超时时间 + @type timeout: float + @return: None + @rtype: None + ''' + self.__timeout = timeout + + def size(self): + ''' + @brief: 获取队列长度 + @return: 队列长度 + @rtype: int + ''' + # self.__lock.acquire() + lock = LockGuard(self.__lock) + ret = len(self.__data) + # self.__lock.release() + return ret + + def generateId(self): + ''' + @brief: 生成唯一id,0 < id < 2 ** 32 + @return: id + @rtype: int + ''' + # self.__lock.acquire() + lock = LockGuard(self.__lock) + ret = self.__uniqId + ret = (ret + 1) % 0x7FFFFFFF + while ret <= 0: + ret = (ret + 1) % 0x7FFFFFFF + self.__uniqId = ret + # self.__lock.release() + return ret + + def pop(self, uniqId=0, erase=True): + ''' + @brief: 弹出item + @param uniqId: item的id,如果为0,按FIFO弹出 + @type uniqId: int + @param erase: 弹出后是否从字典里删除item + @type erase: bool + @return: item + @rtype: any type + ''' + ret = None + + # self.__lock.acquire() + lock = LockGuard(self.__lock) + + if not uniqId: + if len(self.__queue): + uniqId = self.__queue.pop(0) + if uniqId: + if erase: + ret = self.__data.pop(uniqId, None) + else: + ret = self.__data.get(uniqId, None) + + # self.__lock.release() + + return ret[0] if ret else None + + def push(self, item, uniqId): + ''' + @brief: 数据入队列,如果队列已经有了uniqId,插入失败 + @param item: 插入的数据 + @type item: any type + @return: 插入是否成功 + @rtype: bool + ''' + begtime = time.time() + ret = True + # self.__lock.acquire() + lock = LockGuard(self.__lock) + + if uniqId in self.__data: + ret = False + else: + self.__data[uniqId] = [item, begtime] + self.__queue.append(uniqId) + # self.__lock.release() + return ret + + def peek(self, uniqId): + ''' + @brief: 根据uniqId获取item,不会删除item + @param uniqId: item的id + @type uniqId: int + @return: item + @rtype: any type + ''' + # self.__lock.acquire() + lock = LockGuard(self.__lock) + + ret = self.__data.get(uniqId, None) + # self.__lock.release() + if not ret: + return None + return ret[0] + + def timeout(self): + ''' + @brief: 检测是否有item超时,如果有就删除 + @return: None + @rtype: None + ''' + endtime = time.time() + # self.__lock.acquire() + lock = LockGuard(self.__lock) + + # 处理异常情况,防止死锁 + try: + new_data = {} + for uniqId, item in self.__data.items(): + if endtime - item[1] < self.__timeout: + new_data[uniqId] = item + else: + tarsLogger.debug( + 'TimeoutQueue:timeout remove id : %d' % uniqId) + self.__data = new_data + finally: + # self.__lock.release() + pass + + +class QueueTimeout(threading.Thread): + """ + 超时线程,定时触发超时事件 + """ + + def __init__(self, timeout=0.1): + # threading.Thread.__init__(self) + tarsLogger.debug('QueueTimeout:__init__') + super(QueueTimeout, self).__init__() + self.timeout = timeout + self.__terminate = False + self.__handler = None + self.__lock = threading.Condition() + + def terminate(self): + tarsLogger.debug('QueueTimeout:terminate') + self.__terminate = True + self.__lock.acquire() + self.__lock.notifyAll() + self.__lock.release() + + def setHandler(self, handler): + self.__handler = handler + + def run(self): + while not self.__terminate: + try: + self.__lock.acquire() + self.__lock.wait(self.timeout) + self.__lock.release() + if self.__terminate: + break + self.__handler() + except Exception as msg: + tarsLogger.error('QueueTimeout:run exception : %s', msg) + + tarsLogger.debug('QueueTimeout:run finished') + + +if __name__ == '__main__': + pass diff --git a/danmu/danmaku/tars/__adapterproxy.py b/danmu/danmaku/tars/__adapterproxy.py new file mode 100644 index 0000000..7785344 --- /dev/null +++ b/danmu/danmaku/tars/__adapterproxy.py @@ -0,0 +1,703 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# filename: __adapterproxymanager.py_compiler + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +''' +@version: 0.01 +@brief: 将rpc部分中的adapterproxymanager抽离出来,实现不同的负载均衡 +''' + +from enum import Enum +import random +import socket +import select +import os +import time + + +from .__util import (LockGuard, NewLock, ConsistentHashNew) +from .__trans import EndPointInfo +from .__logger import tarsLogger +from . import exception +from .__trans import TcpTransceiver +from .__TimeoutQueue import ReqMessage +from .exception import TarsException + + +# 因为循环import的问题只能放这里,不能放文件开始处 +from .QueryF import QueryFProxy +from .QueryF import QueryFPrxCallback + + +class AdapterProxy: + ''' + @brief: 每一个Adapter管理一个服务端端口的连接,数据收发 + ''' + + def __init__(self): + tarsLogger.debug('AdapterProxy:__init__') + self.__closeTrans = False + self.__trans = None + self.__object = None + self.__reactor = None + self.__lock = None + self.__asyncProc = None + self.__activeStateInReg = True + + @property + def activatestateinreg(self): + return self.__activeStateInReg + + @activatestateinreg.setter + def activatestateinreg(self, value): + self.__activeStateInReg = value + + def __del__(self): + tarsLogger.debug('AdapterProxy:__del__') + + def initialize(self, endPointInfo, objectProxy, reactor, asyncProc): + ''' + @brief: 初始化 + @param endPointInfo: 连接对端信息 + @type endPointInfo: EndPointInfo + @type objectProxy: ObjectProxy + @type reactor: FDReactor + @type asyncProc: AsyncProcThread + ''' + tarsLogger.debug('AdapterProxy:initialize') + self.__closeTrans = False + self.__trans = TcpTransceiver(endPointInfo) + self.__object = objectProxy + self.__reactor = reactor + # self.__lock = threading.Lock() + self.__lock = NewLock() + self.__asyncProc = asyncProc + + def terminate(self): + ''' + @brief: 关闭 + ''' + tarsLogger.debug('AdapterProxy:terminate') + self.setCloseTrans(True) + + def trans(self): + ''' + @brief: 获取传输类 + @return: 负责网络传输的trans + @rtype: Transceiver + ''' + return self.__trans + + def invoke(self, reqmsg): + ''' + @brief: 远程过程调用处理方法 + @param reqmsg: 请求响应报文 + @type reqmsg: ReqMessage + @return: 错误码:0表示成功,-1表示连接失败 + @rtype: int + ''' + tarsLogger.debug('AdapterProxy:invoke') + assert(self.__trans) + + if (not self.__trans.hasConnected() and + not self.__trans.isConnecting): + # -1表示连接失败 + return -1 + + reqmsg.request.iRequestId = self.__object.getTimeoutQueue().generateId() + self.__object.getTimeoutQueue().push(reqmsg, reqmsg.request.iRequestId) + + self.__reactor.notify(self) + + return 0 + + def finished(self, rsp): + ''' + @brief: 远程过程调用返回处理 + @param rsp: 响应报文 + @type rsp: ResponsePacket + @return: 函数是否执行成功 + @rtype: bool + ''' + tarsLogger.debug('AdapterProxy:finished') + reqmsg = self.__object.getTimeoutQueue().pop(rsp.iRequestId) + if not reqmsg: + tarsLogger.error( + 'finished, can not get ReqMessage, may be timeout, id: %d', + rsp.iRequestId) + return False + + reqmsg.response = rsp + if reqmsg.type == ReqMessage.SYNC_CALL: + return reqmsg.servant._finished(reqmsg) + elif reqmsg.callback: + self.__asyncProc.put(reqmsg) + return True + + tarsLogger.error('finished, adapter proxy finish fail, id: %d, ret: %d', + rsp.iRequestId, rsp.iRet) + return False + + # 检测连接是否失败,失效时重连 + def checkActive(self, forceConnect=False): + ''' + @brief: 检测连接是否失效 + @param forceConnect: 是否强制发起连接,为true时不对状态进行判断就发起连接 + @type forceConnect: bool + @return: 连接是否有效 + @rtype: bool + ''' + tarsLogger.debug('AdapterProxy:checkActive') + # self.__lock.acquire() + lock = LockGuard(self.__lock) + tarsLogger.info('checkActive, %s, forceConnect: %s', + self.__trans.getEndPointInfo(), forceConnect) + + if not self.__trans.isConnecting() and not self.__trans.hasConnected(): + self.doReconnect() + + # self.__lock.release() + return self.__trans.isConnecting() or self.__trans.hasConnected() + + def doReconnect(self): + ''' + @brief: 重新发起连接 + @return: None + @rtype: None + ''' + tarsLogger.debug('AdapterProxy:doReconnect') + assert(self.__trans) + + self.__trans.reInit() + tarsLogger.info('doReconnect, connect: %s, fd:%d', + self.__trans.getEndPointInfo(), + self.__trans.getFd()) + + self.__reactor.registerAdapter(self, select.EPOLLIN | select.EPOLLOUT) + + def sendRequest(self): + ''' + @brief: 把队列中的请求放到Transceiver的发送缓存里 + @return: 放入缓存的数据长度 + @rtype: int + ''' + tarsLogger.debug('AdapterProxy:sendRequest') + if not self.__trans.hasConnected(): + return False + + reqmsg = self.__object.popRequest() + blen = 0 + while reqmsg: + reqmsg.adapter = self + buf = reqmsg.packReq() + self.__trans.writeToSendBuf(buf) + tarsLogger.info('sendRequest, id: %d, len: %d', + reqmsg.request.iRequestId, len(buf)) + blen += len(buf) + # 合并一次发送的包 最大合并至8k 提高异步时客户端效率? + if (self.__trans.getEndPointInfo().getConnType() == EndPointInfo.SOCK_UDP + or blen > 8192): + break + reqmsg = self.__object.popRequest() + + return blen + + def finishConnect(self): + ''' + @brief: 使用的非阻塞socket连接不能立刻判断是否连接成功, + 在epoll响应后调用此函数处理connect结束后的操作 + @return: 是否连接成功 + @rtype: bool + ''' + tarsLogger.debug('AdapterProxy:finishConnect') + success = True + errmsg = '' + try: + ret = self.__trans.getSock().getsockopt( + socket.SOL_SOCKET, socket.SO_ERROR) + if ret: + success = False + errmsg = os.strerror(ret) + except Exception as msg: + errmsg = msg + success = False + + if not success: + self.__reactor.unregisterAdapter( + self, socket.EPOLLIN | socket.EPOLLOUT) + self.__trans.close() + self.__trans.setConnFailed() + tarsLogger.error( + 'AdapterProxy finishConnect, exception: %s, error: %s', + self.__trans.getEndPointInfo(), errmsg) + return False + self.__trans.setConnected() + self.__reactor.notify(self) + tarsLogger.info('AdapterProxy finishConnect, connect %s success', + self.__trans.getEndPointInfo()) + return True + + def finishInvoke(self, isTimeout): + pass + + # 弹出请求报文 + def popRequest(self): + pass + + def shouldCloseTrans(self): + ''' + @brief: 是否设置关闭连接 + @return: 关闭连接的flag的值 + @rtype: bool + ''' + return self.__closeTrans + + def setCloseTrans(self, closeTrans): + ''' + @brief: 设置关闭连接flag的值 + @param closeTrans: 是否关闭连接 + @type closeTrans: bool + @return: None + @rtype: None + ''' + self.__closeTrans = closeTrans + + +class QueryRegisterCallback(QueryFPrxCallback): + def __init__(self, adpManager): + self.__adpManager = adpManager + super(QueryRegisterCallback, self).__init__() + # QueryFPrxCallback.__init__(self) + + def callback_findObjectById4All(self, ret, activeEp, inactiveEp): + eplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType) + for x in activeEp if ret == 0 and x.istcp] + ieplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType) + for x in inactiveEp if ret == 0 and x.istcp] + self.__adpManager.setEndpoints(eplist, ieplist) + + def callback_findObjectById4All_exception(self, ret): + tarsLogger.error('callback_findObjectById4All_exception ret: %d', ret) + + +class EndpointWeightType(Enum): + E_LOOP = 0 + E_STATIC_WEIGHT = 1 + + +class AdapterProxyManager: + ''' + @brief: 管理Adapter + ''' + + def __init__(self): + tarsLogger.debug('AdapterProxyManager:__init__') + self.__comm = None + self.__object = None + # __adps的key=str(EndPointInfo) value=[EndPointInfo, AdapterProxy, cnt] + # cnt是访问次数 + self.__adps = {} + self.__iadps = {} + self.__newLock = None + self.__isDirectProxy = True + self.__lastFreshTime = 0 + self.__queryRegisterCallback = QueryRegisterCallback(self) + self.__regAdapterProxyDict = {} + self.__lastConHashPrxList = [] + self.__consistentHashWeight = None + self.__weightType = EndpointWeightType.E_LOOP + self.__update = True + self.__lastWeightedProxyData = {} + + def initialize(self, comm, objectProxy, eplist): + ''' + @brief: 初始化 + ''' + tarsLogger.debug('AdapterProxyManager:initialize') + self.__comm = comm + self.__object = objectProxy + self.__newLock = NewLock() + + self.__isDirectProxy = len(eplist) > 0 + if self.__isDirectProxy: + self.setEndpoints(eplist, {}) + else: + self.refreshEndpoints() + + def terminate(self): + ''' + @brief: 释放资源 + ''' + tarsLogger.debug('AdapterProxyManager:terminate') + # self.__lock.acquire() + lock = LockGuard(self.__newLock) + for ep, epinfo in self.__adps.items(): + epinfo[1].terminate() + self.__adps = {} + self.__lock.release() + + def refreshEndpoints(self): + ''' + @brief: 刷新服务器列表 + @return: 新的服务列表 + @rtype: EndPointInfo列表 + ''' + tarsLogger.debug('AdapterProxyManager:refreshEndpoints') + if self.__isDirectProxy: + return + + interval = self.__comm.getProperty( + 'refresh-endpoint-interval', float) / 1000 + locator = self.__comm.getProperty('locator') + + if '@' not in locator: + raise exception.TarsRegistryException( + 'locator is not valid: ' + locator) + + now = time.time() + last = self.__lastFreshTime + epSize = len(self.__adps) + if last + interval < now or (epSize <= 0 and last + 2 < now): + queryFPrx = self.__comm.stringToProxy(QueryFProxy, locator) + # 首次访问是同步调用,之后访问是异步调用 + if epSize == 0 or last == 0: + ret, activeEps, inactiveEps = ( + queryFPrx.findObjectById4All(self.__object.name())) + # 目前只支持TCP + eplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType) + for x in activeEps if ret == 0 and x.istcp] + ieplist = [EndPointInfo(x.host, x.port, x.timeout, x.weight, x.weightType) + for x in inactiveEps if ret == 0 and x.istcp] + self.setEndpoints(eplist, ieplist) + else: + queryFPrx.async_findObjectById4All(self.__queryRegisterCallback, + self.__object.name()) + self.__lastFreshTime = now + + def getEndpoints(self): + ''' + @brief: 获取可用服务列表 如果启用分组,只返回同分组的服务端ip + @return: 获取节点列表 + @rtype: EndPointInfo列表 + ''' + tarsLogger.debug('AdapterProxyManager:getEndpoints') + # self.__lock.acquire() + lock = LockGuard(self.__newLock) + ret = [x[1][0] for x in list(self.__adps.items())] + # self.__lock.release() + + return ret + + def setEndpoints(self, eplist, ieplist): + ''' + @brief: 设置服务端信息 + @para eplist: 活跃的被调节点列表 + @para ieplist: 不活跃的被调节点列表 + ''' + tarsLogger.debug('AdapterProxyManager:setEndpoints') + adps = {} + iadps = {} + comm = self.__comm + isNeedNotify = False + # self.__lock.acquire() + lock = LockGuard(self.__newLock) + isStartStatic = True + + for ep in eplist: + if ep.getWeightType() == 0: + isStartStatic = False + epstr = str(ep) + if epstr in self.__adps: + adps[epstr] = self.__adps[epstr] + continue + isNeedNotify = True + self.__update = True + adapter = AdapterProxy() + adapter.initialize(ep, self.__object, + comm.getReactor(), comm.getAsyncProc()) + adapter.activatestateinreg = True + adps[epstr] = [ep, adapter, 0] + self.__adps, adps = adps, self.__adps + + for iep in ieplist: + iepstr = str(iep) + if iepstr in self.__iadps: + iadps[iepstr] = self.__iadps[iepstr] + continue + isNeedNotify = True + adapter = AdapterProxy() + adapter.initialize(iep, self.__object, + comm.getReactor(), comm.getAsyncProc()) + adapter.activatestateinreg = False + iadps[iepstr] = [iep, adapter, 0] + self.__iadps, iadps = iadps, self.__iadps + + if isStartStatic: + self.__weightType = EndpointWeightType.E_STATIC_WEIGHT + else: + self.__weightType = EndpointWeightType.E_LOOP + + # self.__lock.release() + if isNeedNotify: + self.__notifyEndpoints(self.__adps, self.__iadps) + # 关闭已经失效的连接 + for ep in adps: + if ep not in self.__adps: + adps[ep][1].terminate() + + def __notifyEndpoints(self, actives, inactives): + # self.__lock.acquire() + lock = LockGuard(self.__newLock) + self.__regAdapterProxyDict.clear() + self.__regAdapterProxyDict.update(actives) + self.__regAdapterProxyDict.update(inactives) + # self.__lock.release() + + def __getNextValidProxy(self): + ''' + @brief: 刷新本地缓存列表,如果服务下线了,要求删除本地缓存 + @return: + @rtype: EndPointInfo列表 + @todo: 优化负载均衡算法 + ''' + tarsLogger.debug('AdapterProxyManager:getNextValidProxy') + lock = LockGuard(self.__newLock) + if len(self.__adps) == 0: + raise TarsException("the activate adapter proxy is empty") + + sortedActivateAdp = sorted( + list(self.__adps.items()), key=lambda item: item[1][2]) + # self.refreshEndpoints() + # self.__lock.acquire() + sortedActivateAdpSize = len(sortedActivateAdp) + + while sortedActivateAdpSize != 0: + if sortedActivateAdp[0][1][1].checkActive(): + self.__adps[sortedActivateAdp[0][0]][2] += 1 + # 返回的是 adapterProxy + return self.__adps[sortedActivateAdp[0][0]][1] + sortedActivateAdp.pop(0) + sortedActivateAdpSize -= 1 + # 随机重连一个可用节点 + adpPrx = list(self.__adps.items())[random.randint( + 0, len(self.__adps))][1][1] + adpPrx.checkActive() + return None + # self.__lock.release() + + def __getHashProxy(self, reqmsg): + if self.__weightType == EndpointWeightType.E_LOOP: + if reqmsg.isConHash: + return self.__getConHashProxyForNormal(reqmsg.hashCode) + else: + return self.__getHashProxyForNormal(reqmsg.hashCode) + else: + if reqmsg.isConHash: + return self.__getConHashProxyForWeight(reqmsg.hashCode) + else: + return self.__getHashProxyForWeight(reqmsg.hashCode) + + def __getHashProxyForNormal(self, hashCode): + tarsLogger.debug('AdapterProxyManager:getHashProxyForNormal') + # self.__lock.acquire() + lock = LockGuard(self.__newLock) + regAdapterProxyList = sorted( + list(self.__regAdapterProxyDict.items()), key=lambda item: item[0]) + + allPrxSize = len(regAdapterProxyList) + if allPrxSize == 0: + raise TarsException("the adapter proxy is empty") + hashNum = hashCode % allPrxSize + + if regAdapterProxyList[hashNum][1][1].activatestateinreg and regAdapterProxyList[hashNum][1][1].checkActive(): + epstr = regAdapterProxyList[hashNum][0] + self.__regAdapterProxyDict[epstr][2] += 1 + if epstr in self.__adps: + self.__adps[epstr][2] += 1 + elif epstr in self.__iadps: + self.__iadps[epstr][2] += 1 + return self.__regAdapterProxyDict[epstr][1] + else: + if len(self.__adps) == 0: + raise TarsException("the activate adapter proxy is empty") + activeProxyList = list(self.__adps.items()) + actPrxSize = len(activeProxyList) + while actPrxSize != 0: + hashNum = hashCode % actPrxSize + if activeProxyList[hashNum][1][1].checkActive(): + self.__adps[activeProxyList[hashNum][0]][2] += 1 + return self.__adps[activeProxyList[hashNum][0]][1] + activeProxyList.pop(hashNum) + actPrxSize -= 1 + # 随机重连一个可用节点 + adpPrx = list(self.__adps.items())[random.randint( + 0, len(self.__adps))][1][1] + adpPrx.checkActive() + return None + + def __getConHashProxyForNormal(self, hashCode): + tarsLogger.debug('AdapterProxyManager:getConHashProxyForNormal') + lock = LockGuard(self.__newLock) + if len(self.__regAdapterProxyDict) == 0: + raise TarsException("the adapter proxy is empty") + if self.__consistentHashWeight is None or self.__checkConHashChange(self.__lastConHashPrxList): + self.__updateConHashProxyWeighted() + + if len(self.__consistentHashWeight.nodes) > 0: + conHashIndex = self.__consistentHashWeight.getNode(hashCode) + if conHashIndex in self.__regAdapterProxyDict and self.__regAdapterProxyDict[conHashIndex][1].activatestateinreg and self.__regAdapterProxyDict[conHashIndex][1].checkActive(): + self.__regAdapterProxyDict[conHashIndex][2] += 1 + if conHashIndex in self.__adps: + self.__adps[conHashIndex][2] += 1 + elif conHashIndex in self.__iadps: + self.__iadps[conHashIndex][2] += 1 + return self.__regAdapterProxyDict[conHashIndex][1] + else: + if len(self.__adps) == 0: + raise TarsException("the activate adapter proxy is empty") + activeProxyList = list(self.__adps.items()) + actPrxSize = len(activeProxyList) + while actPrxSize != 0: + hashNum = hashCode % actPrxSize + if activeProxyList[hashNum][1][1].checkActive(): + self.__adps[activeProxyList[hashNum][0]][2] += 1 + return self.__adps[activeProxyList[hashNum][0]][1] + activeProxyList.pop(hashNum) + actPrxSize -= 1 + # 随机重连一个可用节点 + adpPrx = list(self.__adps.items())[random.randint( + 0, len(self.__adps))][1][1] + adpPrx.checkActive() + return None + pass + else: + return self.__getHashProxyForNormal(hashCode) + + def __getHashProxyForWeight(self, hashCode): + return None + pass + + def __getConHashProxyForWeight(self, hashCode): + return None + pass + + def __checkConHashChange(self, lastConHashPrxList): + tarsLogger.debug('AdapterProxyManager:checkConHashChange') + lock = LockGuard(self.__newLock) + if len(lastConHashPrxList) != len(self.__regAdapterProxyDict): + return True + regAdapterProxyList = sorted( + list(self.__regAdapterProxyDict.items()), key=lambda item: item[0]) + regAdapterProxyListSize = len(regAdapterProxyList) + for index in range(regAdapterProxyListSize): + if cmp(lastConHashPrxList[index][0], regAdapterProxyList[index][0]) != 0: + return True + return False + + def __updateConHashProxyWeighted(self): + tarsLogger.debug('AdapterProxyManager:updateConHashProxyWeighted') + lock = LockGuard(self.__newLock) + if len(self.__regAdapterProxyDict) == 0: + raise TarsException("the adapter proxy is empty") + self.__lastConHashPrxList = sorted( + list(self.__regAdapterProxyDict.items()), key=lambda item: item[0]) + nodes = [] + for var in self.__lastConHashPrxList: + nodes.append(var[0]) + if self.__consistentHashWeight is None: + self.__consistentHashWeight = ConsistentHashNew(nodes) + else: + theOldActiveNodes = [ + var for var in nodes if var in self.__consistentHashWeight.nodes] + + theOldInactiveNodes = [ + var for var in self.__consistentHashWeight.nodes if var not in theOldActiveNodes] + for var in theOldInactiveNodes: + self.__consistentHashWeight.removeNode(var) + + theNewActiveNodes = [ + var for var in nodes if var not in theOldActiveNodes] + for var in theNewActiveNodes: + self.__consistentHashWeight.addNode(var) + + self.__consistentHashWeight.nodes = nodes + pass + + def __getWeightedProxy(self): + tarsLogger.debug('AdapterProxyManager:getWeightedProxy') + lock = LockGuard(self.__newLock) + if len(self.__adps) == 0: + raise TarsException("the activate adapter proxy is empty") + + if self.__update is True: + self.__lastWeightedProxyData.clear() + weightedProxyData = {} + minWeight = (list(self.__adps.items())[0][1][0]).getWeight() + for item in list(self.__adps.items()): + weight = (item[1][0].getWeight()) + weightedProxyData[item[0]] = (weight) + if minWeight > weight: + minWeight = weight + + if minWeight <= 0: + addWeight = -minWeight + 1 + for item in list(weightedProxyData.items()): + item[1] += addWeight + + self.__update = False + self.__lastWeightedProxyData = weightedProxyData + + weightedProxyData = self.__lastWeightedProxyData + while len(weightedProxyData) > 0: + total = sum(weightedProxyData.values()) + rand = random.randint(1, total) + temp = 0 + for item in list(weightedProxyData.items()): + temp += item[1] + if rand <= temp: + if self.__adps[item[0]][1].checkActive(): + self.__adps[item[0]][2] += 1 + return self.__adps[item[0]][1] + else: + weightedProxyData.pop(item[0]) + break + # 没有一个活跃的节点 + # 随机重连一个可用节点 + adpPrx = list(self.__adps.items())[random.randint( + 0, len(self.__adps))][1][1] + adpPrx.checkActive() + return None + + def selectAdapterProxy(self, reqmsg): + ''' + @brief: 刷新本地缓存列表,如果服务下线了,要求删除本地缓存,通过一定算法返回AdapterProxy + @param: reqmsg:请求响应报文 + @type reqmsg: ReqMessage + @return: + @rtype: EndPointInfo列表 + @todo: 优化负载均衡算法 + ''' + tarsLogger.debug('AdapterProxyManager:selectAdapterProxy') + self.refreshEndpoints() + if reqmsg.isHash: + return self.__getHashProxy(reqmsg) + else: + if self.__weightType == EndpointWeightType.E_STATIC_WEIGHT: + return self.__getWeightedProxy() + else: + return self.__getNextValidProxy() diff --git a/danmu/danmaku/tars/__async.py b/danmu/danmaku/tars/__async.py new file mode 100644 index 0000000..2964890 --- /dev/null +++ b/danmu/danmaku/tars/__async.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# filename: __rpc.py + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +''' +@version: 0.01 +@brief: 异步rpc实现 +''' + +import threading +import Queue +from __logger import tarsLogger +from __packet import ResponsePacket +from __servantproxy import ServantProxy + + +class AsyncProcThread: + ''' + @brief: 异步调用线程管理类 + ''' + + def __init__(self): + tarsLogger.debug('AsyncProcThread:__init__') + self.__initialize = False + self.__runners = [] + self.__queue = None + self.__nrunner = 0 + self.__popTimeout = 0.1 + + def __del__(self): + tarsLogger.debug('AsyncProcThread:__del__') + + def initialize(self, nrunner=3): + ''' + @brief: 使用AsyncProcThread前必须先调用此函数 + @param nrunner: 异步线程个数 + @type nrunner: int + @return: None + @rtype: None + ''' + tarsLogger.debug('AsyncProcThread:initialize') + if self.__initialize: + return + self.__nrunner = nrunner + self.__queue = Queue.Queue() + self.__initialize = True + + def terminate(self): + ''' + @brief: 关闭所有异步线程 + @return: None + @rtype: None + ''' + tarsLogger.debug('AsyncProcThread:terminate') + + for runner in self.__runners: + runner.terminate() + + for runner in self.__runners: + runner.join() + self.__runners = [] + + def put(self, reqmsg): + ''' + @brief: 处理数据入队列 + @param reqmsg: 待处理数据 + @type reqmsg: ReqMessage + @return: None + @rtype: None + ''' + tarsLogger.debug('AsyncProcThread:put') + # 异步请求超时 + if not reqmsg.response: + reqmsg.response = ResponsePacket() + reqmsg.response.iVerson = reqmsg.request.iVerson + reqmsg.response.cPacketType = reqmsg.request.cPacketType + reqmsg.response.iRequestId = reqmsg.request.iRequestId + reqmsg.response.iRet = ServantProxy.TARSASYNCCALLTIMEOUT + + self.__queue.put(reqmsg) + + def pop(self): + ''' + @brief: 处理数据出队列 + @return: ReqMessage + @rtype: ReqMessage + ''' + # tarsLogger.debug('AsyncProcThread:pop') + ret = None + try: + ret = self.__queue.get(True, self.__popTimeout) + except Queue.Empty: + pass + return ret + + def start(self): + ''' + @brief: 启动异步线程 + @return: None + @rtype: None + ''' + tarsLogger.debug('AsyncProcThread:start') + for i in xrange(self.__nrunner): + runner = AsyncProcThreadRunner() + runner.initialize(self) + runner.start() + self.__runners.append(runner) + + +class AsyncProcThreadRunner(threading.Thread): + ''' + @brief: 异步调用线程 + ''' + + def __init__(self): + tarsLogger.debug('AsyncProcThreadRunner:__init__') + super(AsyncProcThreadRunner, self).__init__() + # threading.Thread.__init__(self) + self.__terminate = False + self.__initialize = False + self.__procQueue = None + + def __del__(self): + tarsLogger.debug('AsyncProcThreadRunner:__del__') + + def initialize(self, queue): + ''' + @brief: 使用AsyncProcThreadRunner前必须调用此函数 + @param queue: 有pop()的类,用于提取待处理数据 + @type queue: AsyncProcThread + @return: None + @rtype: None + ''' + tarsLogger.debug('AsyncProcThreadRunner:initialize') + self.__procQueue = queue + + def terminate(self): + ''' + @brief: 关闭线程 + @return: None + @rtype: None + ''' + tarsLogger.debug('AsyncProcThreadRunner:terminate') + self.__terminate = True + + def run(self): + ''' + @brief: 线程启动函数,执行异步调用 + ''' + tarsLogger.debug('AsyncProcThreadRunner:run') + while not self.__terminate: + if self.__terminate: + break + reqmsg = self.__procQueue.pop() + if not reqmsg or not reqmsg.callback: + continue + + if reqmsg.adapter: + succ = reqmsg.response.iRet == ServantProxy.TARSSERVERSUCCESS + reqmsg.adapter.finishInvoke(succ) + + try: + reqmsg.callback.onDispatch(reqmsg) + except Exception, msg: + tarsLogger.error('AsyncProcThread excepttion: %s', msg) + + tarsLogger.debug('AsyncProcThreadRunner:run finished') + + +class ServantProxyCallback(object): + ''' + @brief: 异步回调对象基类 + ''' + + def __init__(self): + tarsLogger.debug('ServantProxyCallback:__init__') + + def onDispatch(reqmsg): + ''' + @brief: 分配响应报文到对应的回调函数 + @param queue: 有pop()的类,用于提取待处理数据 + @type queue: AsyncProcThread + @return: None + @rtype: None + ''' + raise NotImplementedError() diff --git a/danmu/danmaku/tars/__init__.py b/danmu/danmaku/tars/__init__.py new file mode 100644 index 0000000..65dcb9f --- /dev/null +++ b/danmu/danmaku/tars/__init__.py @@ -0,0 +1,85 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- + + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +__version__ = "0.0.1" + +from .__util import util +from .__tars import TarsInputStream +from .__tars import TarsOutputStream +from .__tup import TarsUniPacket + + +class tarscore: + class TarsInputStream(TarsInputStream): + pass + + class TarsOutputStream(TarsOutputStream): + pass + + class TarsUniPacket(TarsUniPacket): + pass + + class boolean(util.boolean): + pass + + class int8(util.int8): + pass + + class uint8(util.uint8): + pass + + class int16(util.int16): + pass + + class uint16(util.uint16): + pass + + class int32(util.int32): + pass + + class uint32(util.uint32): + pass + + class int64(util.int64): + pass + + class float(util.float): + pass + + class double(util.double): + pass + + class bytes(util.bytes): + pass + + class string(util.string): + pass + + class struct(util.struct): + pass + + @staticmethod + def mapclass(ktype, vtype): return util.mapclass(ktype, vtype) + + @staticmethod + def vctclass(vtype): return util.vectorclass(vtype) + + @staticmethod + def printHex(buff): util.printHex(buff) + diff --git a/danmu/danmaku/tars/__logger.py b/danmu/danmaku/tars/__logger.py new file mode 100644 index 0000000..88b5275 --- /dev/null +++ b/danmu/danmaku/tars/__logger.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# filename: __logger.py + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +''' +@version: 0.01 +@brief: 日志模块 +''' + +# 仅用于调试 + +import logging +from logging.handlers import RotatingFileHandler +import os +import re + +tarsLogger = logging.getLogger('TARS client') +strToLoggingLevel = { + "critical": logging.CRITICAL, + "error": logging.ERROR, + "warn": logging.WARNING, + "info": logging.INFO, + "debug": logging.DEBUG, + "none": logging.NOTSET +} +#console = logging.StreamHandler() +# console.setLevel(logging.DEBUG) +#filelog = logging.FileHandler('tars.log') +# filelog.setLevel(logging.DEBUG) +#formatter = logging.Formatter('%(asctime)s | %(levelname)8s | [%(name)s] %(message)s', '%Y-%m-%d %H:%M:%S') +# console.setFormatter(formatter) +# filelog.setFormatter(formatter) +# tarsLogger.addHandler(console) +# tarsLogger.addHandler(filelog) +# tarsLogger.setLevel(logging.DEBUG) +# tarsLogger.setLevel(logging.INFO) +# tarsLogger.setLevel(logging.ERROR) + + +def createLogFile(filename): + if filename.endswith('/'): + raise ValueError("The logfile is a dir not a file") + if os.path.exists(filename) and os.path.isfile(filename): + pass + else: + fileComposition = str.split(filename, '/') + print(fileComposition) + currentFile = '' + for item in fileComposition: + if item == fileComposition[-1]: + currentFile += item + if not os.path.exists(currentFile) or not os.path.isfile(currentFile): + while True: + try: + os.mknod(currentFile) + break + except OSError as msg: + errno = re.findall(r"\d+", str(msg)) + if len(errno) > 0 and errno[0] == '17': + currentFile += '.log' + continue + break + currentFile += (item + '/') + if not os.path.exists(currentFile): + os.mkdir(currentFile) + + +def initLog(logpath, logsize, lognum, loglevel): + createLogFile(logpath) + handler = RotatingFileHandler(filename=logpath, maxBytes=logsize, + backupCount=lognum) + formatter = logging.Formatter( + '%(asctime)s | %(levelname)6s | [%(filename)18s:%(lineno)4d] | [%(thread)d] %(message)s', '%Y-%m-%d %H:%M:%S') + handler.setFormatter(formatter) + tarsLogger.addHandler(handler) + if loglevel in strToLoggingLevel: + tarsLogger.setLevel(strToLoggingLevel[loglevel]) + else: + tarsLogger.setLevel(strToLoggingLevel["error"]) + + +if __name__ == '__main__': + tarsLogger.debug('debug log') + tarsLogger.info('info log') + tarsLogger.warning('warning log') + tarsLogger.error('error log') + tarsLogger.critical('critical log') diff --git a/danmu/danmaku/tars/__packet.py b/danmu/danmaku/tars/__packet.py new file mode 100644 index 0000000..91ca184 --- /dev/null +++ b/danmu/danmaku/tars/__packet.py @@ -0,0 +1,104 @@ +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + + +from .__util import util + + +class RequestPacket(util.struct): + mapcls_context = util.mapclass(util.string, util.string) + mapcls_status = util.mapclass(util.string, util.string) + + def __init__(self): + self.iVersion = 0 + self.cPacketType = 0 + self.iMessageType = 0 + self.iRequestId = 0 + self.sServantName = '' + self.sFuncName = '' + self.sBuffer = bytes() + self.iTimeout = 0 + self.context = RequestPacket.mapcls_context() + self.status = RequestPacket.mapcls_status() + + @staticmethod + def writeTo(oos, value): + oos.write(util.int16, 1, value.iVersion) + oos.write(util.int8, 2, value.cPacketType) + oos.write(util.int32, 3, value.iMessageType) + oos.write(util.int32, 4, value.iRequestId) + oos.write(util.string, 5, value.sServantName) + oos.write(util.string, 6, value.sFuncName) + oos.write(util.bytes, 7, value.sBuffer) + oos.write(util.int32, 8, value.iTimeout) + oos.write(RequestPacket.mapcls_context, 9, value.context) + oos.write(RequestPacket.mapcls_status, 10, value.status) + + @staticmethod + def readFrom(ios): + value = RequestPacket() + value.iVersion = ios.read(util.int16, 1, True, 0) + print(("iVersion = %d" % value.iVersion)) + value.cPacketType = ios.read(util.int8, 2, True, 0) + print(("cPackerType = %d" % value.cPacketType)) + value.iMessageType = ios.read(util.int32, 3, True, 0) + print(("iMessageType = %d" % value.iMessageType)) + value.iRequestId = ios.read(util.int32, 4, True, 0) + print(("iRequestId = %d" % value.iRequestId)) + value.sServantName = ios.read(util.string, 5, True, '22222222') + value.sFuncName = ios.read(util.string, 6, True, '') + value.sBuffer = ios.read(util.bytes, 7, True, value.sBuffer) + value.iTimeout = ios.read(util.int32, 8, True, 0) + value.context = ios.read( + RequestPacket.mapcls_context, 9, True, value.context) + value.status = ios.read( + RequestPacket.mapcls_status, 10, True, value.status) + return value + + +class ResponsePacket(util.struct): + __tars_class__ = "tars.RpcMessage.ResponsePacket" + mapcls_status = util.mapclass(util.string, util.string) + + def __init__(self): + self.iVersion = 0 + self.cPacketType = 0 + self.iRequestId = 0 + self.iMessageType = 0 + self.iRet = 0 + self.sBuffer = bytes() + self.status = RequestPacket.mapcls_status() + + @staticmethod + def writeTo(oos, value): + oos.write(util.int16, 1, value.iVersion) + oos.write(util.int8, 2, value.cPacketType) + oos.write(util.int32, 3, value.iRequestId) + oos.write(util.int32, 4, value.iMessageType) + oos.write(util.int32, 5, value.iRet) + oos.write(util.bytes, 6, value.sBuffer) + oos.write(value.mapcls_status, 7, value.status) + + @staticmethod + def readFrom(ios): + value = ResponsePacket() + value.iVersion = ios.read(util.int16, 1, True) + value.cPacketType = ios.read(util.int8, 2, True) + value.iRequestId = ios.read(util.int32, 3, True) + value.iMessageType = ios.read(util.int32, 4, True) + value.iRet = ios.read(util.int32, 5, True) + value.sBuffer = ios.read(util.bytes, 6, True) + value.status = ios.read(value.mapcls_status, 7, True) + return value diff --git a/danmu/danmaku/tars/__rpc.py b/danmu/danmaku/tars/__rpc.py new file mode 100644 index 0000000..f7568f2 --- /dev/null +++ b/danmu/danmaku/tars/__rpc.py @@ -0,0 +1,441 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# filename: __rpc.py + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + + +''' +@version: 0.01 +@brief: rpc调用逻辑实现 +''' + +import time +import argparse + +from .__logger import tarsLogger +from .__logger import initLog +from .__trans import EndPointInfo +from .__TimeoutQueue import TimeoutQueue +from .__TimeoutQueue import QueueTimeout +from .__trans import FDReactor +from .__adapterproxy import AdapterProxyManager +from .__servantproxy import ServantProxy +from .exception import (TarsException) +from .__async import AsyncProcThread + + +class Communicator: + ''' + @brief: 通讯器,创建和维护ServantProxy、ObjectProxy、FDReactor线程和超时线程 + ''' + default_config = {'tars': + {'application': + {'client': + {'async-invoke-timeout': 20000, + 'asyncthread': 0, + 'locator': '', + 'loglevel': 'error', + 'logpath': 'tars.log', + 'logsize': 15728640, + 'lognum': 0, + 'refresh-endpoint-interval': 60000, + 'sync-invoke-timeout': 5000}}}} + + def __init__(self, config={}): + tarsLogger.debug('Communicator:__init__') + self.__terminate = False + self.__initialize = False + self.__objects = {} + self.__servants = {} + self.__reactor = None + self.__qTimeout = None + self.__asyncProc = None + self.__config = Communicator.default_config.copy() + self.__config.update(config) + self.initialize() + + def __del__(self): + tarsLogger.debug('Communicator:__del__') + + def initialize(self): + ''' + @brief: 使用通讯器前必须先调用此函数 + ''' + tarsLogger.debug('Communicator:initialize') + if self.__initialize: + return + logpath = self.getProperty('logpath') + logsize = self.getProperty('logsize', int) + lognum = self.getProperty('lognum', int) + loglevel = self.getProperty('loglevel') + initLog(logpath, logsize, lognum, loglevel) + + self.__reactor = FDReactor() + self.__reactor.initialize() + self.__reactor.start() + + self.__qTimeout = QueueTimeout() + self.__qTimeout.setHandler(self.handleTimeout) + self.__qTimeout.start() + + async_num = self.getProperty('asyncthread', int) + self.__asyncProc = AsyncProcThread() + self.__asyncProc.initialize(async_num) + self.__asyncProc.start() + + self.__initialize = True + + def terminate(self): + ''' + @brief: 不再使用通讯器需调用此函数释放资源 + ''' + tarsLogger.debug('Communicator:terminate') + + if not self.__initialize: + return + + self.__reactor.terminate() + self.__qTimeout.terminate() + self.__asyncProc.terminate() + + for objName in self.__servants: + self.__servants[objName]._terminate() + + for objName in self.__objects: + self.__objects[objName].terminate() + + self.__objects = {} + self.__servants = {} + self.__reactor = None + self.__initialize = False + + def parseConnAddr(self, connAddr): + ''' + @brief: 解析connAddr字符串 + @param connAddr: 连接地址 + @type connAddr: str + @return: 解析结果 + @rtype: dict, key是str,val里name是str, + timeout是float,endpoint是EndPointInfo的list + ''' + tarsLogger.debug('Communicator:parseConnAddr') + connAddr = connAddr.strip() + connInfo = { + 'name': '', + 'timeout': -1, + 'endpoint': [] + } + if '@' not in connAddr: + connInfo['name'] = connAddr + return connInfo + + try: + tks = connAddr.split('@') + connInfo['name'] = tks[0] + tks = tks[1].lower().split(':') + parser = argparse.ArgumentParser(add_help=False) + parser.add_argument('-h') + parser.add_argument('-p') + parser.add_argument('-t') + for tk in tks: + argv = tk.split() + if argv[0] != 'tcp': + raise TarsException( + 'unsupport transmission protocal : %s' % connInfo['name']) + mes = parser.parse_args(argv[1:]) + try: + ip = mes.h if mes.h is not None else '' + port = int(mes.p) if mes.p is not None else '-1' + timeout = int(mes.t) if mes.t is not None else '-1' + connInfo['endpoint'].append( + EndPointInfo(ip, port, timeout)) + except Exception: + raise TarsException('Unrecognized option : %s' % mes) + except TarsException: + raise + + except Exception as exp: + raise TarsException(exp) + + return connInfo + + def getReactor(self): + ''' + @brief: 获取reactor + ''' + return self.__reactor + + def getAsyncProc(self): + ''' + @brief: 获取asyncProc + ''' + return self.__asyncProc + + def getProperty(self, name, dt_type=str): + ''' + @brief: 获取配置 + @param name: 配置名称 + @type name: str + @param dt_type: 数据类型 + @type name: type + @return: 配置内容 + @rtype: str + ''' + try: + ret = self.__config['tars']['application']['client'][name] + ret = dt_type(ret) + except: + ret = Communicator.default_config['tars']['application']['client'][name] + + return ret + + def setProperty(self, name, value): + ''' + @brief: 修改配置 + @param name: 配置名称 + @type propertys: str + @param value: 配置内容 + @type propertys: str + @return: 设置是否成功 + @rtype: bool + ''' + try: + self.__config['tars']['application']['client'][name] = value + return True + except: + return False + + def setPropertys(self, propertys): + ''' + @brief: 修改配置 + @param propertys: 配置集合 + @type propertys: map, key type: str, value type: str + @return: 无 + @rtype: None + ''' + pass + + def updateConfig(self): + ''' + @brief: 重新设置配置 + ''' + + def stringToProxy(self, servantProxy, connAddr): + ''' + @brief: 初始化ServantProxy + @param connAddr: 服务器地址信息 + @type connAddr: str + @param servant: servant proxy + @type servant: ServantProxy子类 + @return: 无 + @rtype: None + @note: 如果connAddr的ServantObj连接过,返回连接过的ServantProxy + 如果没有连接过,用参数servant初始化,返回servant + ''' + tarsLogger.debug('Communicator:stringToProxy') + + connInfo = self.parseConnAddr(connAddr) + objName = connInfo['name'] + if objName in self.__servants: + return self.__servants[objName] + + objectPrx = ObjectProxy() + objectPrx.initialize(self, connInfo) + + servantPrx = servantProxy() + servantPrx._initialize(self.__reactor, objectPrx) + self.__objects[objName] = objectPrx + self.__servants[objName] = servantPrx + return servantPrx + + def handleTimeout(self): + ''' + @brief: 处理超时事件 + @return: 无 + @rtype: None + ''' + # tarsLogger.debug('Communicator:handleTimeout') + for obj in self.__objects.values(): + obj.handleQueueTimeout() + + +class ObjectProxy: + ''' + @brief: 一个object name在一个Communicator里有一个objectproxy + 管理收发的消息队列 + ''' + DEFAULT_TIMEOUT = 3.0 + + def __init__(self): + tarsLogger.debug('ObjectProxy:__init__') + self.__name = '' + self.__timeout = ObjectProxy.DEFAULT_TIMEOUT + self.__comm = None + self.__epi = None + self.__adpmanager = None + self.__timeoutQueue = None + # self.__adapter = None + self.__initialize = False + + def __del__(self): + tarsLogger.debug('ObjectProxy:__del__') + + def initialize(self, comm, connInfo): + ''' + @brief: 初始化,使用ObjectProxy前必须调用 + @param comm: 通讯器 + @type comm: Communicator + @param connInfo: 连接信息 + @type comm: dict + @return: None + @rtype: None + ''' + if self.__initialize: + return + tarsLogger.debug('ObjectProxy:initialize') + self.__comm = comm + # async-invoke-timeout来设置队列时间 + async_timeout = self.__comm.getProperty( + 'async-invoke-timeout', float) / 1000 + self.__timeoutQueue = TimeoutQueue(async_timeout) + + self.__name = connInfo['name'] + + self.__timeout = self.__comm.getProperty( + 'sync-invoke-timeout', float) / 1000 + + # 通过Communicator的配置设置超时 + # 不再通过连接信息的-t来设置 + # if connInfo['timeout'] != -1: + # self.__timeout = connInfo['timeout'] + eplist = connInfo['endpoint'] + + self.__adpmanager = AdapterProxyManager() + self.__adpmanager.initialize(comm, self, eplist) + + self.__initialize = True + + def terminate(self): + ''' + @brief: 回收资源,不再使用ObjectProxy时调用 + @return: None + @rtype: None + ''' + tarsLogger.debug('ObjectProxy:terminate') + self.__timeoutQueue = None + self.__adpmanager.terminate() + self.__initialize = False + + def name(self): + ''' + @brief: 获取object name + @return: object name + @rtype: str + ''' + return self.__name + + # def setTimeout(self, timeout): + # ''' + # @brief: 设置超时 + # @param timeout: 超时时间,单位为s + # @type timeout: float + # @return: None + # @rtype: None + # ''' + # self.__timeout = timeout + # self.__timeoutQueue.setTimeout(timeout) + + def timeout(self): + ''' + @brief: 获取超时时间 + @return: 超时时间,单位为s + @rtype: float + ''' + return self.__timeout + + def getTimeoutQueue(self): + ''' + @brief: 获取超时队列 + @return: 超时队列 + @rtype: TimeoutQueue + ''' + return self.__timeoutQueue + + def handleQueueTimeout(self): + ''' + @brief: 超时事件发生时处理超时事务 + @return: None + @rtype: None + ''' + # tarsLogger.debug('ObjectProxy:handleQueueTimeout') + self.__timeoutQueue.timeout() + + def invoke(self, reqmsg): + ''' + @brief: 远程过程调用 + @param reqmsg: 请求响应报文 + @type reqmsg: ReqMessage + @return: 错误码 + @rtype: + ''' + tarsLogger.debug('ObjectProxy:invoke, objname: %s, func: %s', + self.__name, reqmsg.request.sFuncName) + # 负载均衡 + # adapter = self.__adpmanager.getNextValidProxy() + adapter = self.__adpmanager.selectAdapterProxy(reqmsg) + if not adapter: + tarsLogger.error("invoke %s, select adapter proxy return None", + self.__name) + return -2 + + adapter.checkActive(True) + reqmsg.adapter = adapter + return adapter.invoke(reqmsg) + + # 弹出请求报文 + def popRequest(self): + ''' + @brief: 返回消息队列里的请求响应报文,FIFO + 不删除TimeoutQueue里的数据,响应时要用 + @return: 请求响应报文 + @rtype: ReqMessage + ''' + return self.__timeoutQueue.pop(erase=False) + + +if __name__ == '__main__': + connAddr = "apptest.lightServer.lightServantObj@tcp -h 10.130.64.220 -p 10001 -t 10000" + connAddr = 'MTT.BookMarksUnifyServer.BookMarksUnifyObj@tcp -h 172.17.149.77 -t 60000 -p 10023' + comm = Communicator() + comm.initialize() + servant = ServantProxy() + servant = comm.stringToProxy(connAddr, servant) + print(servant.tars_timeout()) + try: + rsp = servant.tars_invoke( + ServantProxy.TARSNORMAL, "test", '', ServantProxy.mapcls_context(), None) + print('Servant invoke success, request id: %d, iRet: %d' % ( + rsp.iRequestId, rsp.iRet)) + except Exception as msg: + print(msg) + finally: + servant.tars_terminate() + time.sleep(2) + print('app closing ...') + comm.terminate() + time.sleep(2) + print('cpp closed') diff --git a/danmu/danmaku/tars/__servantproxy.py b/danmu/danmaku/tars/__servantproxy.py new file mode 100644 index 0000000..5baeef3 --- /dev/null +++ b/danmu/danmaku/tars/__servantproxy.py @@ -0,0 +1,358 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# filename: __servantproxy.py + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + + +''' +@version: 0.01 +@brief: rpc抽离出servantproxy +''' +import threading +import time + +from __logger import tarsLogger +from __util import util +from __packet import RequestPacket +# from __packet import ResponsePacket +from __TimeoutQueue import ReqMessage +import exception +from exception import TarsException + + +class ServantProxy(object): + ''' + @brief: 1、远程对象的本地代理 + 2、同名servant在一个通信器中最多只有一个实例 + 3、防止和用户在Tars中定义的函数名冲突,接口以tars_开头 + ''' + + # 服务器响应的错误码 + TARSSERVERSUCCESS = 0 # 服务器端处理成功 + TARSSERVERDECODEERR = -1 # 服务器端解码异常 + TARSSERVERENCODEERR = -2 # 服务器端编码异常 + TARSSERVERNOFUNCERR = -3 # 服务器端没有该函数 + TARSSERVERNOSERVANTERR = -4 # 服务器端五该Servant对象 + TARSSERVERRESETGRID = -5 # 服务器端灰度状态不一致 + TARSSERVERQUEUETIMEOUT = -6 # 服务器队列超过限制 + TARSASYNCCALLTIMEOUT = -7 # 异步调用超时 + TARSPROXYCONNECTERR = -8 # proxy链接异常 + TARSSERVERUNKNOWNERR = -99 # 服务器端未知异常 + + TARSVERSION = 1 + TUPVERSION = 2 + TUPVERSION2 = 3 + + TARSNORMAL = 0 + TARSONEWAY = 1 + + TARSMESSAGETYPENULL = 0 + TARSMESSAGETYPEHASH = 1 + TARSMESSAGETYPEGRID = 2 + TARSMESSAGETYPEDYED = 4 + TARSMESSAGETYPESAMPLE = 8 + TARSMESSAGETYPEASYNC = 16 + + mapcls_context = util.mapclass(util.string, util.string) + + def __init__(self): + tarsLogger.debug('ServantProxy:__init__') + self.__reactor = None + self.__object = None + self.__initialize = False + + def __del__(self): + tarsLogger.debug('ServantProxy:__del__') + + def _initialize(self, reactor, obj): + ''' + @brief: 初始化函数,需要调用才能使用ServantProxy + @param reactor: 网络管理的reactor实例 + @type reactor: FDReactor + @return: None + @rtype: None + ''' + tarsLogger.debug('ServantProxy:_initialize') + + assert(reactor and obj) + if self.__initialize: + return + self.__reactor = reactor + self.__object = obj + self.__initialize = True + + def _terminate(self): + ''' + @brief: 不再使用ServantProxy时调用,会释放相应资源 + @return: None + @rtype: None + ''' + tarsLogger.debug('ServantProxy:_terminate') + self.__object = None + self.__reactor = None + self.__initialize = False + + def tars_name(self): + ''' + @brief: 获取ServantProxy的名字 + @return: ServantProxy的名字 + @rtype: str + ''' + return self.__object.name() + + def tars_timeout(self): + ''' + @brief: 获取超时时间,单位是ms + @return: 超时时间 + @rtype: int + ''' + # 默认的为3S = ObjectProxy.DEFAULT_TIMEOUT + return int(self.__timeout() * 1000) + + def tars_ping(self): + pass + + # def tars_initialize(self): + # pass + + # def tars_terminate(self): + # pass + + def tars_invoke(self, cPacketType, sFuncName, sBuffer, context, status): + ''' + @brief: TARS协议同步方法调用 + @param cPacketType: 请求包类型 + @type cPacketType: int + @param sFuncName: 调用函数名 + @type sFuncName: str + @param sBuffer: 序列化后的发送参数 + @type sBuffer: str + @param context: 上下文件信息 + @type context: ServantProxy.mapcls_context + @param status: 状态信息 + @type status: + @return: 响应报文 + @rtype: ResponsePacket + ''' + tarsLogger.debug('ServantProxy:tars_invoke, func: %s', sFuncName) + req = RequestPacket() + req.iVersion = ServantProxy.TARSVERSION + req.cPacketType = cPacketType + req.iMessageType = ServantProxy.TARSMESSAGETYPENULL + req.iRequestId = 0 + req.sServantName = self.tars_name() + req.sFuncName = sFuncName + req.sBuffer = sBuffer + req.iTimeout = self.tars_timeout() + + reqmsg = ReqMessage() + reqmsg.type = ReqMessage.SYNC_CALL + reqmsg.servant = self + reqmsg.lock = threading.Condition() + reqmsg.request = req + reqmsg.begtime = time.time() + # # test + reqmsg.isHash = True + reqmsg.isConHash = True + reqmsg.hashCode = 123456 + + rsp = None + try: + rsp = self.__invoke(reqmsg) + except exception.TarsSyncCallTimeoutException: + if reqmsg.adapter: + reqmsg.adapter.finishInvoke(True) + raise + except TarsException: + raise + except: + raise TarsException('ServantProxy::tars_invoke excpetion') + + if reqmsg.adapter: + reqmsg.adapter.finishInvoke(False) + + return rsp + + def tars_invoke_async(self, cPacketType, sFuncName, sBuffer, + context, status, callback): + ''' + @brief: TARS协议同步方法调用 + @param cPacketType: 请求包类型 + @type cPacketType: int + @param sFuncName: 调用函数名 + @type sFuncName: str + @param sBuffer: 序列化后的发送参数 + @type sBuffer: str + @param context: 上下文件信息 + @type context: ServantProxy.mapcls_context + @param status: 状态信息 + @type status: + @param callback: 异步调用回调对象 + @type callback: ServantProxyCallback的子类 + @return: 响应报文 + @rtype: ResponsePacket + ''' + tarsLogger.debug('ServantProxy:tars_invoke') + req = RequestPacket() + req.iVersion = ServantProxy.TARSVERSION + req.cPacketType = cPacketType if callback else ServantProxy.TARSONEWAY + req.iMessageType = ServantProxy.TARSMESSAGETYPENULL + req.iRequestId = 0 + req.sServantName = self.tars_name() + req.sFuncName = sFuncName + req.sBuffer = sBuffer + req.iTimeout = self.tars_timeout() + + reqmsg = ReqMessage() + reqmsg.type = ReqMessage.ASYNC_CALL if callback else ReqMessage.ONE_WAY + reqmsg.callback = callback + reqmsg.servant = self + reqmsg.request = req + reqmsg.begtime = time.time() + + rsp = None + try: + rsp = self.__invoke(reqmsg) + except TarsException: + raise + except Exception: + raise TarsException('ServantProxy::tars_invoke excpetion') + + if reqmsg.adapter: + reqmsg.adapter.finishInvoke(False) + + return rsp + + def __timeout(self): + ''' + @brief: 获取超时时间,单位是s + @return: 超时时间 + @rtype: float + ''' + return self.__object.timeout() + + def __invoke(self, reqmsg): + ''' + @brief: 远程过程调用 + @param reqmsg: 请求数据 + @type reqmsg: ReqMessage + @return: 调用成功或失败 + @rtype: bool + ''' + tarsLogger.debug('ServantProxy:invoke, func: %s', + reqmsg.request.sFuncName) + ret = self.__object.invoke(reqmsg) + if ret == -2: + errmsg = ('ServantProxy::invoke fail, no valid servant,' + + ' servant name : %s, function name : %s' % + (reqmsg.request.sServantName, + reqmsg.request.sFuncName)) + raise TarsException(errmsg) + if ret == -1: + errmsg = ('ServantProxy::invoke connect fail,' + + ' servant name : %s, function name : %s, adapter : %s' % + (reqmsg.request.sServantName, + reqmsg.request.sFuncName, + reqmsg.adapter.getEndPointInfo())) + raise TarsException(errmsg) + elif ret != 0: + errmsg = ('ServantProxy::invoke unknown fail, ' + + 'Servant name : %s, function name : %s' % + (reqmsg.request.sServantName, + reqmsg.request.sFuncName)) + raise TarsException(errmsg) + + if reqmsg.type == ReqMessage.SYNC_CALL: + reqmsg.lock.acquire() + reqmsg.lock.wait(self.__timeout()) + reqmsg.lock.release() + + if not reqmsg.response: + errmsg = ('ServantProxy::invoke timeout: %d, servant name' + ': %s, adapter: %s, request id: %d' % ( + self.tars_timeout(), + self.tars_name(), + reqmsg.adapter.trans().getEndPointInfo(), + reqmsg.request.iRequestId)) + raise exception.TarsSyncCallTimeoutException(errmsg) + elif reqmsg.response.iRet == ServantProxy.TARSSERVERSUCCESS: + return reqmsg.response + else: + errmsg = 'servant name: %s, function name: %s' % ( + self.tars_name(), reqmsg.request.sFuncName) + self.tarsRaiseException(reqmsg.response.iRet, errmsg) + + def _finished(self, reqmsg): + ''' + @brief: 通知远程过程调用线程响应报文到了 + @param reqmsg: 请求响应报文 + @type reqmsg: ReqMessage + @return: 函数执行成功或失败 + @rtype: bool + ''' + tarsLogger.debug('ServantProxy:finished') + if not reqmsg.lock: + return False + reqmsg.lock.acquire() + reqmsg.lock.notifyAll() + reqmsg.lock.release() + return True + + def tarsRaiseException(self, errno, desc): + ''' + @brief: 服务器调用失败,根据服务端给的错误码抛出异常 + @param errno: 错误码 + @type errno: int + @param desc: 错误描述 + @type desc: str + @return: 没有返回值,函数会抛出异常 + @rtype: + ''' + if errno == ServantProxy.TARSSERVERSUCCESS: + return + + elif errno == ServantProxy.TARSSERVERDECODEERR: + raise exception.TarsServerDecodeException( + "server decode exception: errno: %d, msg: %s" % (errno, desc)) + + elif errno == ServantProxy.TARSSERVERENCODEERR: + raise exception.TarsServerEncodeException( + "server encode exception: errno: %d, msg: %s" % (errno, desc)) + + elif errno == ServantProxy.TARSSERVERNOFUNCERR: + raise exception.TarsServerNoFuncException( + "server function mismatch exception: errno: %d, msg: %s" % (errno, desc)) + + elif errno == ServantProxy.TARSSERVERNOSERVANTERR: + raise exception.TarsServerNoServantException( + "server servant mismatch exception: errno: %d, msg: %s" % (errno, desc)) + + elif errno == ServantProxy.TARSSERVERRESETGRID: + raise exception.TarsServerResetGridException( + "server reset grid exception: errno: %d, msg: %s" % (errno, desc)) + + elif errno == ServantProxy.TARSSERVERQUEUETIMEOUT: + raise exception.TarsServerQueueTimeoutException( + "server queue timeout exception: errno: %d, msg: %s" % (errno, desc)) + + elif errno == ServantProxy.TARSPROXYCONNECTERR: + raise exception.TarsServerQueueTimeoutException( + "server connection lost: errno: %d, msg: %s" % (errno, desc)) + + else: + raise exception.TarsServerUnknownException( + "server unknown exception: errno: %d, msg: %s" % (errno, desc)) diff --git a/danmu/danmaku/tars/__tars.py b/danmu/danmaku/tars/__tars.py new file mode 100644 index 0000000..520dc4a --- /dev/null +++ b/danmu/danmaku/tars/__tars.py @@ -0,0 +1,546 @@ +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +import struct +from .__util import util +from .exception import * + + +class BinBuffer: + def __init__(self, buff=bytes()): + self.buffer = buff + self.position = 0 + + def writeBuf(self, buff): + self.buffer += buff + + def getBuffer(self): + return self.buffer + + def length(self): + return len(self.buffer) + + +class DataHead: + EN_INT8 = 0 + EN_INT16 = 1 + EN_INT32 = 2 + EN_INT64 = 3 + EN_FLOAT = 4 + EN_DOUBLE = 5 + EN_STRING1 = 6 + EN_STRING4 = 7 + EN_MAP = 8 + EN_LIST = 9 + EN_STRUCTBEGIN = 10 + EN_STRUCTEND = 11 + EN_ZERO = 12 + EN_BYTES = 13 + + @staticmethod + def writeTo(buff, tag, vtype): + if tag < 15: + helper = (tag << 4) | vtype + buff.writeBuf(struct.pack('!B', helper)) + else: + helper = (0xF0 | vtype) << 8 | tag + buff.writeBuf(struct.pack('!H', helper)) + + +class TarsOutputStream(object): + def __init__(self): + self.__buffer = BinBuffer() + + def __writeBoolean(self, tag, value): + self.__writeInt8(tag, int(value)) + + def __writeInt8(self, tag, value): + if value == 0: + DataHead.writeTo(self.__buffer, tag, DataHead.EN_ZERO) + else: + DataHead.writeTo(self.__buffer, tag, DataHead.EN_INT8) + self.__buffer.writeBuf(struct.pack('!b', value)) + + def __writeInt16(self, tag, value): + if value >= -128 and value <= 127: + self.__writeInt8(tag, value) + else: + DataHead.writeTo(self.__buffer, tag, DataHead.EN_INT16) + self.__buffer.writeBuf(struct.pack('!h', value)) + + def __writeInt32(self, tag, value): + if value >= -32768 and value <= 32767: + self.__writeInt16(tag, value) + else: + DataHead.writeTo(self.__buffer, tag, DataHead.EN_INT32) + self.__buffer.writeBuf(struct.pack('!i', value)) + + def __writeInt64(self, tag, value): + if value >= (-2147483648) and value <= 2147483647: + self.__writeInt32(tag, value) + else: + DataHead.writeTo(self.__buffer, tag, DataHead.EN_INT64) + self.__buffer.writeBuf(struct.pack('!q', value)) + + def __writeFloat(self, tag, value): + DataHead.writeTo(self.__buffer, tag, DataHead.EN_FLOAT) + self.__buffer.writeBuf(struct.pack('!f', value)) + + def __writeDouble(self, tag, value): + DataHead.writeTo(self.__buffer, tag, DataHead.EN_DOUBLE) + self.__buffer.writeBuf(struct.pack('!d', value)) + + def __writeString(self, tag, value): + length = len(value) + if length <= 255: + DataHead.writeTo(self.__buffer, tag, DataHead.EN_STRING1) + self.__buffer.writeBuf(struct.pack('!B', length)) + self.__buffer.writeBuf(str.encode(value)) + else: + DataHead.writeTo(self.__buffer, tag, DataHead.EN_STRING4) + self.__buffer.writeBuf(struct.pack('!I', length)) + self.__buffer.writeBuf(str.encode(value)) + + def __writeBytes(self, tag, value): + DataHead.writeTo(self.__buffer, tag, DataHead.EN_BYTES) + DataHead.writeTo(self.__buffer, 0, DataHead.EN_INT8) + length = len(value) + self.__writeInt32(0, length) + self.__buffer.buffer += value + self.__buffer.position += length + + def __writeMap(self, coder, tag, value): + DataHead.writeTo(self.__buffer, tag, DataHead.EN_MAP) + self.__writeInt32(0, len(value)) + for key in value: + self.write(coder.ktype, 0, key) + self.write(coder.vtype, 1, value.get(key)) + + def __writeVector(self, coder, tag, value): + DataHead.writeTo(self.__buffer, tag, DataHead.EN_LIST) + n = len(value) + self.__writeInt32(0, n) + for i in range(0, n): + self.write(value.vtype, 0, value[i]) + + def __writeStruct(self, coder, tag, value): + DataHead.writeTo(self.__buffer, tag, DataHead.EN_STRUCTBEGIN) + value.writeTo(self, value) + DataHead.writeTo(self.__buffer, 0, DataHead.EN_STRUCTEND) + + def write(self, coder, tag, value): + if coder.__tars_index__ == 999: + self.__writeBoolean(tag, value) + elif coder.__tars_index__ == 0: + self.__writeInt8(tag, value) + elif coder.__tars_index__ == 1: + self.__writeInt16(tag, value) + elif coder.__tars_index__ == 2: + self.__writeInt32(tag, value) + elif coder.__tars_index__ == 3: + self.__writeInt64(tag, value) + elif coder.__tars_index__ == 4: + self.__writeFloat(tag, value) + elif coder.__tars_index__ == 5: + self.__writeDouble(tag, value) + elif coder.__tars_index__ == 13: + self.__writeBytes(tag, value) + elif coder.__tars_index__ == 67: + self.__writeString(tag, value) + elif coder.__tars_index__ == 8: + self.__writeMap(coder, tag, value) + elif coder.__tars_index__ == 9: + self.__writeVector(coder, tag, value) + elif coder.__tars_index__ == 1011: + self.__writeStruct(coder, tag, value) + else: + raise TarsTarsUnsupportType( + "tars unsupport data type:" % coder.__tars_index__) + + def getBuffer(self): + return self.__buffer.getBuffer() + + def printHex(self): + util.printHex(self.__buffer.getBuffer()) + + +class TarsInputStream(object): + def __init__(self, buff): + self.__buffer = BinBuffer(buff) + + def __peekFrom(self): + helper, = struct.unpack_from( + '!B', self.__buffer.buffer, self.__buffer.position) + t = (helper & 0xF0) >> 4 + p = (helper & 0x0F) + l = 1 + if t >= 15: + l = 2 + t, = struct.unpack_from( + '!B', self.__buffer.buffer, self.__buffer.position + 1) + return (t, p, l) + + def __readFrom(self): + t, p, l = self.__peekFrom() + self.__buffer.position += l + return (t, p, l) + + def __skipToStructEnd(self): + t, p, l = self.__readFrom() + while p != DataHead.EN_STRUCTEND: + self.__skipField(p) + t, p, l = self.__readFrom() + + def __skipField(self, p): + if p == DataHead.EN_INT8: + self.__buffer.position += 1 + elif p == DataHead.EN_INT16: + self.__buffer.position += 2 + elif p == DataHead.EN_INT32: + self.__buffer.position += 4 + elif p == DataHead.EN_INT64: + self.__buffer.position += 8 + elif p == DataHead.EN_FLOAT: + self.__buffer.position += 4 + elif p == DataHead.EN_DOUBLE: + self.__buffer.position += 8 + elif p == DataHead.EN_STRING1: + length, = struct.unpack_from( + '!B', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += length + 1 + elif p == DataHead.EN_STRING4: + length, = struct.unpack_from( + '!i', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += length + 4 + elif p == DataHead.EN_MAP: + size = self.__readInt32(0, True) + for i in range(0, size * 2): + ti, pi, li = self.__readFrom() + self.__skipField(pi) + elif p == DataHead.EN_LIST: + size = self.__readInt32(0, True) + for i in range(0, size): + ti, pi, li = self.__readFrom() + self.__skipField(pi) + elif p == DataHead.EN_BYTES: + ti, pi, li = self.__readFrom() + if pi != DataHead.EN_INT8: + raise TarsTarsDecodeInvalidValue( + "skipField with invalid type, type value: %d, %d." % (p, pi)) + size = self.__readInt32(0, True) + self.__buffer.position += size + elif p == DataHead.EN_STRUCTBEGIN: + self.__skipToStructEnd() + elif p == DataHead.EN_STRUCTEND: + pass + #self.__buffer.position += length + 1; + elif p == DataHead.EN_ZERO: + pass + #self.__buffer.position += length + 1; + else: + raise TarsTarsDecodeMismatch( + "skipField with invalid type, type value:%d" % p) + + def __skipToTag(self, tag): + length = self.__buffer.length() + while self.__buffer.position < length: + t, p, l = self.__peekFrom() + if tag <= t or p == DataHead.EN_STRUCTEND: + return False if (p == DataHead.EN_STRUCTEND) else (t == tag) + + self.__buffer.position += l + self.__skipField(p) + return False + + def __readBoolean(self, tag, require, default=None): + v = self.__readInt8(tag, require) + if v is None: + return default + else: + return (v != 0) + + def __readInt8(self, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_ZERO: + return 0 + elif p == DataHead.EN_INT8: + value, = struct.unpack_from( + '!b', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 1 + return value + else: + raise TarsTarsDecodeMismatch( + "read 'Char' type mismatch, tag: %d , get type: %d." % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readInt16(self, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_ZERO: + return 0 + elif p == DataHead.EN_INT8: + value, = struct.unpack_from( + '!b', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 1 + return value + elif p == DataHead.EN_INT16: + value, = struct.unpack_from( + '!h', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 2 + return value + else: + raise TarsTarsDecodeMismatch( + "read 'Short' type mismatch, tag: %d , get type: %d." % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readInt32(self, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_ZERO: + return 0 + elif p == DataHead.EN_INT8: + value, = struct.unpack_from( + '!b', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 1 + return value + elif p == DataHead.EN_INT16: + value, = struct.unpack_from( + '!h', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 2 + return value + elif p == DataHead.EN_INT32: + value, = struct.unpack_from( + '!i', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 4 + return value + else: + raise TarsTarsDecodeMismatch( + "read 'Int32' type mismatch, tag: %d, get type: %d." % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readInt64(self, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_ZERO: + return 0 + elif p == DataHead.EN_INT8: + value, = struct.unpack_from( + '!b', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 1 + return value + elif p == DataHead.EN_INT16: + value, = struct.unpack_from( + '!h', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 2 + return value + elif p == DataHead.EN_INT32: + value, = struct.unpack_from( + '!i', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 4 + return value + elif p == DataHead.EN_INT64: + value, = struct.unpack_from( + '!q', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 8 + return value + else: + raise TarsTarsDecodeMismatch( + "read 'Int64' type mismatch, tag: %d, get type: %d." % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readString(self, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_STRING1: + length, = struct.unpack_from( + '!B', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 1 + value, = struct.unpack_from( + str(length) + "s", self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += length + return value + elif p == DataHead.EN_STRING4: + length, = struct.unpack_from( + '!i', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 4 + value, = struct.unpack_from( + str(length) + "s", self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += length + return value + else: + raise TarsTarsDecodeMismatch( + "read 'string' type mismatch, tag: %d, get type: %d." % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readBytes(self, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_BYTES: + ti, pi, li = self.__readFrom() + if pi != DataHead.EN_INT8: + raise TarsTarsDecodeMismatch( + "type mismatch, tag: %d, type: %d, %d" % (tag, p, pi)) + size = self.__readInt32(0, True) + value, = struct.unpack_from( + str(size) + 's', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += size + return value + else: + raise TarsTarsDecodeMismatch( + "type mismatch, tag: %d, type: %d" % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readFloat(self, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_ZERO: + return 0 + elif p == DataHead.EN_FLOAT: + value, = struct.unpack_from( + '!f', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 4 + return value + else: + raise TarsTarsDecodeMismatch( + "read 'Float' type mismatch, tag: %d, get type: %d." % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readDouble(self, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_ZERO: + return 0 + elif p == DataHead.EN_FLOAT: + value, = struct.unpack_from( + '!f', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 4 + return value + elif p == DataHead.EN_DOUBLE: + value, = struct.unpack_from( + '!d', self.__buffer.buffer, self.__buffer.position) + self.__buffer.position += 8 + return value + else: + raise TarsTarsDecodeMismatch( + "read 'Double' type mismatch, tag: %d, get type: %d." % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readStruct(self, coder, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p != DataHead.EN_STRUCTBEGIN: + raise TarsTarsDecodeMismatch( + "read 'struct' type mismatch, tag: %d, get type: %d." % (tag, p)) + value = coder.readFrom(self) + self.__skipToStructEnd() + return value + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readMap(self, coder, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_MAP: + size = self.__readInt32(0, True) + omap = coder() + for i in range(0, size): + k = self.read(coder.ktype, 0, True) + v = self.read(coder.vtype, 1, True) + omap[k] = v + return omap + else: + raise TarsTarsDecodeMismatch( + "read 'map' type mismatch, tag: %d, get type: %d." % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def __readVector(self, coder, tag, require, default=None): + if self.__skipToTag(tag): + t, p, l = self.__readFrom() + if p == DataHead.EN_LIST: + size = self.__readInt32(0, True) + value = coder() + for i in range(0, size): + k = self.read(coder.vtype, 0, True) + value.append(k) + return value + else: + raise TarsTarsDecodeMismatch( + "read 'vector' type mismatch, tag: %d, get type: %d." % (tag, p)) + elif require: + raise TarsTarsDecodeRequireNotExist( + "require field not exist, tag: %d" % tag) + return default + + def read(self, coder, tag, require, default=None): + if coder.__tars_index__ == 999: + return self.__readBoolean(tag, require, default) + elif coder.__tars_index__ == 0: + return self.__readInt8(tag, require, default) + elif coder.__tars_index__ == 1: + return self.__readInt16(tag, require, default) + elif coder.__tars_index__ == 2: + return self.__readInt32(tag, require, default) + elif coder.__tars_index__ == 3: + return self.__readInt64(tag, require, default) + elif coder.__tars_index__ == 4: + return self.__readFloat(tag, require, default) + elif coder.__tars_index__ == 5: + return self.__readDouble(tag, require, default) + elif coder.__tars_index__ == 13: + return self.__readBytes(tag, require, default) + elif coder.__tars_index__ == 67: + return self.__readString(tag, require, default) + elif coder.__tars_index__ == 8: + return self.__readMap(coder, tag, require, default) + elif coder.__tars_index__ == 9: + return self.__readVector(coder, tag, require, default) + elif coder.__tars_index__ == 1011: + return self.__readStruct(coder, tag, require, default) + else: + raise TarsTarsUnsupportType( + "tars unsupport data type:" % coder.__tars_index__) + + def printHex(self): + util.printHex(self.__buffer.buffer) diff --git a/danmu/danmaku/tars/__trans.py b/danmu/danmaku/tars/__trans.py new file mode 100644 index 0000000..1df078f --- /dev/null +++ b/danmu/danmaku/tars/__trans.py @@ -0,0 +1,575 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# filename: __trans.py + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + + +''' +@version: 0.01 +@brief: 网络相关模块 +''' + +import socket +import select +import errno +import threading + +from .__logger import tarsLogger +from .__TimeoutQueue import ReqMessage + + +class EndPointInfo: + ''' + @brief: 保存每个连接端口的信息 + ''' + SOCK_TCP = 'TCP' + SOCK_UDP = 'UDP' + + def __init__(self, + ip='', + port=0, + timeout=-1, + weight=0, + weightType=0, + connType=SOCK_TCP): + self.__ip = ip + self.__port = port + self.__timeout = timeout + self.__connType = connType + self.__weightType = weightType + self.__weight = weight + + def getIp(self): + return self.__ip + + def getPort(self): + return self.__port + + def getConnType(self): + ''' + @return: 传输层连接类型 + @rtype: EndPointInfo.SOCK_TCP 或 EndPointInfo.SOCK_UDP + ''' + return self.__connType + + def getWeightType(self): + return self.__weightType + + def getWeight(self): + return self.__weight + + def __str__(self): + return '%s %s:%s %d:%d' % (self.__connType, self.__ip, self.__port, self.__weightType, self.__weight) + + +class Transceiver: + ''' + @brief: 网络传输基类,提供网络send/recv接口 + ''' + CONNECTED = 0 + CONNECTING = 1 + UNCONNECTED = 2 + + def __init__(self, endPointInfo): + tarsLogger.debug('Transceiver:__init__, %s', endPointInfo) + self.__epi = endPointInfo + self.__sock = None + self.__connStatus = Transceiver.UNCONNECTED + self.__connFailed = False + # 这两个变量要给子类用,不能用name mangling隐藏 + self._sendBuff = '' + self._recvBuf = '' + + def __del__(self): + tarsLogger.debug('Transceiver:__del__') + self.close() + + def getSock(self): + ''' + @return: socket对象 + @rtype: socket.socket + ''' + return self.__sock + + def getFd(self): + ''' + @brief: 获取socket的文件描述符 + @return: 如果self.__sock没有建立返回-1 + @rtype: int + ''' + if self.__sock: + return self.__sock.fileno() + else: + return -1 + + def getEndPointInfo(self): + ''' + @return: 端口信息 + @rtype: EndPointInfo + ''' + return self.__epi + + def isValid(self): + ''' + @return: 是否创建了socket + @rtype: bool + ''' + return self.__sock is not None + + def hasConnected(self): + ''' + @return: 是否连接上了 + @rtype: bool + ''' + return self.isValid() and self.__connStatus == Transceiver.CONNECTED + + def isConnFailed(self): + ''' + @return: 是否连接失败 + @rtype: bool + ''' + return self.__connFailed + + def isConnecting(self): + ''' + @return: 是否正在连接 + @rtype: bool + ''' + return self.isValid() and self.__connStatus == Transceiver.CONNECTING + + def setConnFailed(self): + ''' + @brief: 设置为连接失败 + @return: None + @rtype: None + ''' + self.__connFailed = True + self.__connStatus = Transceiver.UNCONNECTED + + def setConnected(self): + ''' + @brief: 设置为连接完 + @return: None + @rtype: None + ''' + self.__connFailed = False + self.__connStatus = Transceiver.CONNECTED + + def close(self): + ''' + @brief: 关闭连接 + @return: None + @rtype: None + @note: 多次调用不会有问题 + ''' + tarsLogger.debug('Transceiver:close') + if not self.isValid(): + return + self.__sock.close() + self.__sock = None + self.__connStatus = Transceiver.UNCONNECTED + self.__connFailed = False + self._sendBuff = '' + self._recvBuf = '' + tarsLogger.info('trans close : %s' % self.__epi) + + def writeToSendBuf(self, msg): + ''' + @brief: 把数据添加到send buffer里 + @param msg: 发送的数据 + @type msg: str + @return: None + @rtype: None + @note: 没有加锁,多线程调用会有race conditions + ''' + self._sendBuff += msg + + def recv(self, bufsize, flag=0): + raise NotImplementedError() + + def send(self, buf, flag=0): + raise NotImplementedError() + + def doResponse(self): + raise NotImplementedError() + + def doRequest(self): + ''' + @brief: 将请求数据发送出去 + @return: 发送的字节数 + @rtype: int + ''' + tarsLogger.debug('Transceiver:doRequest') + if not self.isValid(): + return -1 + + nbytes = 0 + buf = buffer(self._sendBuff) + while True: + if not buf: + break + ret = self.send(buf[nbytes:]) + if ret > 0: + nbytes += ret + else: + break + + # 发送前面的字节后将后面的字节拷贝上来 + self._sendBuff = buf[nbytes:] + return nbytes + + def reInit(self): + ''' + @brief: 初始化socket,并连接服务器 + @return: 成功返回0,失败返回-1 + @rtype: int + ''' + tarsLogger.debug('Transceiver:reInit') + assert(self.isValid() is False) + if self.__epi.getConnType() != EndPointInfo.SOCK_TCP: + return -1 + try: + self.__sock = socket.socket() + self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.__sock.setblocking(0) + self.__sock.connect((self.__epi.getIp(), self.__epi.getPort())) + self.__connStatus = Transceiver.CONNECTED + except socket.error as msg: + if msg.errno == errno.EINPROGRESS: + self.__connStatus = Transceiver.CONNECTING + else: + tarsLogger.info('reInit, %s, faild!, %s', + self.__epi, msg) + self.__sock = None + return -1 + tarsLogger.info('reInit, connect: %s, fd: %d', + self.__epi, self.getFd()) + return 0 + + +class TcpTransceiver(Transceiver): + ''' + @brief: TCP传输实现 + ''' + + def send(self, buf, flag=0): + ''' + @brief: 实现tcp的发送 + @param buf: 发送的数据 + @type buf: str + @param flag: 发送标志 + @param flag: int + @return: 发送字节数 + @rtype: int + ''' + tarsLogger.debug('TcpTransceiver:send') + if not self.isValid(): + return -1 + + nbytes = 0 + try: + nbytes = self.getSock().send(buf, flag) + tarsLogger.info('tcp send, fd: %d, %s, len: %d', + self.getFd(), self.getEndPointInfo(), nbytes) + except socket.error as msg: + if msg.errno != errno.EAGAIN: + tarsLogger.error('tcp send, fd: %d, %s, fail!, %s, close', + self.getFd(), self.getEndPointInfo(), msg) + self.close() + return 0 + return nbytes + + def recv(self, bufsize, flag=0): + ''' + @brief: 实现tcp的recv + @param bufsize: 接收大小 + @type bufsize: int + @param flag: 接收标志 + @param flag: int + @return: 接收的内容,接收出错返回None + @rtype: str + ''' + tarsLogger.debug('TcpTransceiver:recv') + assert(self.isValid()) + + buf = '' + try: + buf = self.getSock().recv(bufsize, flag) + if len(buf) == 0: + tarsLogger.info('tcp recv, fd: %d, %s, recv 0 bytes, close', + self.getFd(), self.getEndPointInfo()) + self.close() + return None + except socket.error as msg: + if msg.errno != errno.EAGAIN: + tarsLogger.info('tcp recv, fd: %d, %s, faild!, %s, close', + self.getFd(), self.getEndPointInfo(), msg) + self.close() + return None + + tarsLogger.info('tcp recv, fd: %d, %s, nbytes: %d', + self.getFd(), self.getEndPointInfo(), len(buf)) + return buf + + def doResponse(self): + ''' + @brief: 处理接收的数据 + @return: 返回响应报文的列表,如果出错返回None + @rtype: list: ResponsePacket + ''' + tarsLogger.debug('TcpTransceiver:doResponse') + if not self.isValid(): + return None + + bufs = [self._recvBuf] + while True: + buf = self.recv(8292) + if not buf: + break + bufs.append(buf) + self._recvBuf = ''.join(bufs) + tarsLogger.info('tcp doResponse, fd: %d, recvbuf: %d', + self.getFd(), len(self._recvBuf)) + + if not self._recvBuf: + return None + + rsplist = None + try: + rsplist, bufsize = ReqMessage.unpackRspList(self._recvBuf) + self._recvBuf = self._recvBuf[bufsize:] + except Exception as msg: + tarsLogger.error( + 'tcp doResponse, fd: %d, %s, tcp recv unpack error: %s', + self.getFd(), self.getEndPointInfo(), msg) + self.close() + + return rsplist + + +class FDReactor(threading.Thread): + ''' + @brief: 监听FD事件并解发注册的handle + ''' + + def __init__(self): + tarsLogger.debug('FDReactor:__init__') + # threading.Thread.__init__(self) + super(FDReactor, self).__init__() + self.__terminate = False + self.__ep = None + self.__shutdown = None + # {fd : adapterproxy} + self.__adapterTab = {} + + def __del__(self): + tarsLogger.debug('FDReactor:__del__') + self.__ep.close() + self.__shutdown.close() + self.__ep = None + self.__shutdown = None + + def initialize(self): + ''' + @brief: 初始化,使用FDReactor前必须调用 + @return: None + @rtype: None + ''' + tarsLogger.debug('FDReactor:initialize') + self.__ep = select.epoll() + self.__shutdown = socket.socket() + self.__ep.register(self.__shutdown.fileno(), + select.EPOLLET | select.EPOLLIN) + tarsLogger.debug('FDReactor init, shutdown fd : %d', + self.__shutdown.fileno()) + + def terminate(self): + ''' + @brief: 结束FDReactor的线程 + @return: None + @rtype: None + ''' + tarsLogger.debug('FDReactor:terminate') + self.__terminate = True + self.__ep.modify(self.__shutdown.fileno(), select.EPOLLOUT) + self.__adapterTab = {} + + def handle(self, adapter, events): + ''' + @brief: 处理epoll事件 + @param adapter: 事件对应的adapter + @type adapter: AdapterProxy + @param events: epoll事件 + @param events: int + @return: None + @rtype: None + ''' + tarsLogger.debug('FDReactor:handle events : %d', events) + assert(adapter) + + try: + if events == 0: + return + + if events & (select.EPOLLERR | select.EPOLLHUP): + tarsLogger.debug('FDReactor::handle EPOLLERR or EPOLLHUP: %s', + adapter.trans().getEndPointInfo()) + adapter.trans().close() + return + + if adapter.shouldCloseTrans(): + tarsLogger.debug('FDReactor::handle should close trans: %s', + adapter.trans().getEndPointInfo()) + adapter.setCloseTrans(False) + adapter.trans().close() + return + + if adapter.trans().isConnecting(): + if not adapter.finishConnect(): + return + + if events & select.EPOLLIN: + self.handleInput(adapter) + + if events & select.EPOLLOUT: + self.handleOutput(adapter) + + except Exception as msg: + tarsLogger.error('FDReactor handle exception: %s', msg) + + def handleExcept(self): + pass + + def handleInput(self, adapter): + ''' + @brief: 处理接收事件 + @param adapter: 事件对应的adapter + @type adapter: AdapterProxy + @return: None + @rtype: None + ''' + + tarsLogger.debug('FDReactor:handleInput') + if not adapter.trans().isValid(): + return + + rsplist = adapter.trans().doResponse() + if not rsplist: + return + for rsp in rsplist: + adapter.finished(rsp) + + def handleOutput(self, adapter): + ''' + @brief: 处理发送事件 + @param adapter: 事件对应的adapter + @type adapter: AdapterProxy + @return: None + @rtype: None + ''' + tarsLogger.debug('FDReactor:handleOutput') + if not adapter.trans().isValid(): + return + while adapter.trans().doRequest() >= 0 and adapter.sendRequest(): + pass + + def notify(self, adapter): + ''' + @brief: 更新adapter对应的fd的epoll状态 + @return: None + @rtype: None + @note: FDReactor使用的epoll是EPOLLET模式,同一事件只通知一次 + 希望某一事件再次通知需调用此函数 + ''' + tarsLogger.debug('FDReactor:notify') + fd = adapter.trans().getFd() + if fd != -1: + self.__ep.modify(fd, + select.EPOLLET | select.EPOLLOUT | select.EPOLLIN) + + def registerAdapter(self, adapter, events): + ''' + @brief: 注册adapter + @param adapter: 收发事件处理类 + @type adapter: AdapterProxy + @param events: 注册事件 + @type events: int + @return: None + @rtype: None + ''' + tarsLogger.debug('FDReactor:registerAdapter events : %d', events) + events |= select.EPOLLET + try: + self.__ep.unregister(adapter.trans().getFd()) + except: + pass + self.__ep.register(adapter.trans().getFd(), events) + self.__adapterTab[adapter.trans().getFd()] = adapter + + def unregisterAdapter(self, adapter): + ''' + @brief: 注销adapter + @param adapter: 收发事件处理类 + @type adapter: AdapterProxy + @return: None + @rtype: None + ''' + tarsLogger.debug('FDReactor:registerAdapter') + self.__ep.unregister(adapter.trans().getFd()) + self.__adapterTab.pop(adapter.trans().getFd(), None) + + def run(self): + ''' + @brief: 线程启动函数,循环监听网络事件 + ''' + tarsLogger.debug('FDReactor:run') + + while not self.__terminate: + try: + eplist = self.__ep.poll(1) + if eplist: + tarsLogger.debug('FDReactor run get eplist : %s, terminate : %s', str( + eplist), self.__terminate) + if self.__terminate: + tarsLogger.debug('FDReactor terminate') + break + for fd, events in eplist: + adapter = self.__adapterTab.get(fd, None) + if not adapter: + continue + self.handle(adapter, events) + except Exception as msg: + tarsLogger.error('FDReactor run exception: %s', msg) + + tarsLogger.debug('FDReactor:run finished') + + +if __name__ == '__main__': + print('hello world') + epi = EndPointInfo('127.0.0.1', 1313) + print(epi) + trans = TcpTransceiver(epi) + print(trans.getSock()) + print(trans.getFd()) + print(trans.reInit()) + print(trans.isConnecting()) + print(trans.hasConnected()) + buf = 'hello world' + print(trans.send(buf)) + buf = trans.recv(1024) + print(buf) + trans.close() diff --git a/danmu/danmaku/tars/__tup.py b/danmu/danmaku/tars/__tup.py new file mode 100644 index 0000000..6f71879 --- /dev/null +++ b/danmu/danmaku/tars/__tup.py @@ -0,0 +1,118 @@ +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +import struct +import string +from .__util import util +from .__tars import TarsOutputStream +from .__tars import TarsInputStream +from .__packet import RequestPacket + + +class TarsUniPacket(object): + def __init__(self): + self.__mapa = util.mapclass(util.string, util.bytes) + self.__mapv = util.mapclass(util.string, self.__mapa) + self.__buffer = self.__mapv() + self.__code = RequestPacket() + + # @property + # def version(self): + # return self.__code.iVersion + + # @version.setter + # def version(self, value): + # self.__code.iVersion = value + + @property + def servant(self): + return self.__code.sServantName + + @servant.setter + def servant(self, value): + self.__code.sServantName = value + + @property + def func(self): + return self.__code.sFuncName + + @func.setter + def func(self, value): + self.__code.sFuncName = value + + @property + def requestid(self): + return self.__code.iRequestId + + @requestid.setter + def requestid(self, value): + self.__code.iRequestId = value + + @property + def result_code(self): + if ("STATUS_RESULT_CODE" in self.__code.status) == False: + return 0 + + return string.atoi(self.__code.status["STATUS_RESULT_CODE"]) + + @property + def result_desc(self): + if ("STATUS_RESULT_DESC" in self.__code.status) == False: + return '' + + return self.__code.status["STATUS_RESULT_DESC"] + + def put(self, vtype, name, value): + oos = TarsOutputStream() + oos.write(vtype, 0, value) + self.__buffer[name] = {vtype.__tars_class__: oos.getBuffer()} + + def get(self, vtype, name): + if (name in self.__buffer) == False: + raise Exception("UniAttribute not found key:%s,type:%s" % + (name, vtype.__tars_class__)) + + t = self.__buffer[name] + if (vtype.__tars_class__ in t) == False: + raise Exception("UniAttribute not found type:" + + vtype.__tars_class__) + + o = TarsInputStream(t[vtype.__tars_class__]) + return o.read(vtype, 0, True) + + def encode(self): + oos = TarsOutputStream() + oos.write(self.__mapv, 0, self.__buffer) + + self.__code.iVersion = 2 + self.__code.sBuffer = oos.getBuffer() + + sos = TarsOutputStream() + RequestPacket.writeTo(sos, self.__code) + + return struct.pack('!i', 4 + len(sos.getBuffer())) + sos.getBuffer() + + def decode(self, buf): + ois = TarsInputStream(buf[4:]) + self.__code = RequestPacket.readFrom(ois) + + sis = TarsInputStream(self.__code.sBuffer) + self.__buffer = sis.read(self.__mapv, 0, True) + + def clear(self): + self.__code.__init__() + + def haskey(self, name): + return name in self.__buffer diff --git a/danmu/danmaku/tars/__util.py b/danmu/danmaku/tars/__util.py new file mode 100644 index 0000000..baa12fb --- /dev/null +++ b/danmu/danmaku/tars/__util.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + + +import sys +from threading import Lock +import hashlib +from xml.etree import cElementTree as ET +from .exception import TarsException + + +class util: + @staticmethod + def printHex(buff): + count = 0 + for c in buff: + sys.stdout.write("0X%02X " % ord(c)) + count += 1 + if count % 16 == 0: + sys.stdout.write("\n") + sys.stdout.write("\n") + sys.stdout.flush() + + @staticmethod + def mapclass(ktype, vtype): + class mapklass(dict): + def size(self): return len(self) + setattr(mapklass, '__tars_index__', 8) + setattr(mapklass, '__tars_class__', "map<" + + ktype.__tars_class__ + "," + vtype.__tars_class__ + ">") + setattr(mapklass, 'ktype', ktype) + setattr(mapklass, 'vtype', vtype) + return mapklass + + @staticmethod + def vectorclass(vtype): + class klass(list): + def size(self): return len(self) + setattr(klass, '__tars_index__', 9) + setattr(klass, '__tars_class__', "list<" + vtype.__tars_class__ + ">") + setattr(klass, 'vtype', vtype) + return klass + + class boolean: + __tars_index__ = 999 + __tars_class__ = "bool" + + class int8: + __tars_index__ = 0 + __tars_class__ = "char" + + class uint8: + __tars_index__ = 1 + __tars_class__ = "short" + + class int16: + __tars_index__ = 1 + __tars_class__ = "short" + + class uint16: + __tars_index__ = 2 + __tars_class__ = "int32" + + class int32: + __tars_index__ = 2 + __tars_class__ = "int32" + + class uint32: + __tars_index__ = 3 + __tars_class__ = "int64" + + class int64: + __tars_index__ = 3 + __tars_class__ = "int64" + + class float: + __tars_index__ = 4 + __tars_class__ = "float" + + class double: + __tars_index__ = 5 + __tars_class__ = "double" + + class bytes: + __tars_index__ = 13 + __tars_class__ = "list" + + class string: + __tars_index__ = 67 + __tars_class__ = "string" + + class struct: + __tars_index__ = 1011 + + +def xml2dict(node, dic={}): + ''' + @brief: 将xml解析树转成字典 + @param node: 树的根节点 + @type node: cElementTree.Element + @param dic: 存储信息的字典 + @type dic: dict + @return: 转换好的字典 + @rtype: dict + ''' + dic[node.tag] = ndic = {} + [xml2dict(child, ndic) for child in node.getchildren() if child != node] + ndic.update([list(map(str.strip, exp.split('=')[:2])) + for exp in node.text.splitlines() if '=' in exp]) + return dic + + +def configParse(filename): + ''' + @brief: 解析tars配置文件 + @param filename: 文件名 + @type filename: str + @return: 解析出来的配置信息 + @rtype: dict + ''' + tree = ET.parse(filename) + return xml2dict(tree.getroot()) + + +class NewLock(object): + def __init__(self): + self.__count = 0 + self.__lock = Lock() + self.__lockForCount = Lock() + pass + + def newAcquire(self): + self.__lockForCount.acquire() + self.__count += 1 + if self.__count == 1: + self.__lock.acquire() + self.__lockForCount.release() + pass + + def newRelease(self): + self.__lockForCount.acquire() + self.__count -= 1 + if self.__count == 0: + self.__lock.release() + self.__lockForCount.release() + + +class LockGuard(object): + def __init__(self, newLock): + self.__newLock = newLock + self.__newLock.newAcquire() + + def __del__(self): + self.__newLock.newRelease() + + +class ConsistentHashNew(object): + def __init__(self, nodes=None, nodeNumber=3): + """ + :param nodes: 服务器的节点的epstr列表 + :param n_number: 一个节点对应的虚拟节点数量 + :return: + """ + self.__nodes = nodes + self.__nodeNumber = nodeNumber # 每一个节点对应多少个虚拟节点,这里默认是3个 + self.__nodeDict = dict() # 用于记录虚拟节点的hash值与服务器epstr的对应关系 + self.__sortListForKey = [] # 用于存放所有的虚拟节点的hash值,这里需要保持排序,以找出对应的服务器 + if nodes: + for node in nodes: + self.addNode(node) + + @property + def nodes(self): + return self.__nodes + + @nodes.setter + def nodes(self, value): + self.__nodes = value + + def addNode(self, node): + """ + 添加node,首先要根据虚拟节点的数目,创建所有的虚拟节点,并将其与对应的node对应起来 + 当然还需要将虚拟节点的hash值放到排序的里面 + 这里在添加了节点之后,需要保持虚拟节点hash值的顺序 + :param node: + :return: + """ + for i in range(self.__nodeNumber): + nodeStr = "%s%s" % (node, i) + key = self.__genKey(nodeStr) + self.__nodeDict[key] = node + self.__sortListForKey.append(key) + self.__sortListForKey.sort() + + def removeNode(self, node): + """ + 这里一个节点的退出,需要将这个节点的所有的虚拟节点都删除 + :param node: + :return: + """ + for i in range(self.__nodeNumber): + nodeStr = "%s%s" % (node, i) + key = self.__genKey(nodeStr) + del self.__nodeDict[key] + self.__sortListForKey.remove(key) + + def getNode(self, key): + """ + 返回这个字符串应该对应的node,这里先求出字符串的hash值,然后找到第一个小于等于的虚拟节点,然后返回node + 如果hash值大于所有的节点,那么用第一个虚拟节点 + :param : hashNum or keyStr + :return: + """ + keyStr = '' + if isinstance(key, int): + keyStr = "the keyStr is %d" % key + elif isinstance(key, type('a')): + keyStr = key + else: + raise TarsException("the hash code has wrong type") + if self.__sortListForKey: + key = self.__genKey(keyStr) + for keyItem in self.__sortListForKey: + if key <= keyItem: + return self.__nodeDict[keyItem] + return self.__nodeDict[self.__sortListForKey[0]] + else: + return None + + def __genKey(self, keyStr): + """ + 通过key,返回当前key的hash值,这里采用md5 + :param key: + :return: + """ + md5Str = hashlib.md5(keyStr).hexdigest() + return int(md5Str, 16) diff --git a/danmu/danmaku/tars/core.py b/danmu/danmaku/tars/core.py new file mode 100644 index 0000000..3af030d --- /dev/null +++ b/danmu/danmaku/tars/core.py @@ -0,0 +1,91 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- + + +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +__version__ = "0.0.1" + +from __util import util +from __tars import TarsInputStream +from __tars import TarsOutputStream +from __tup import TarsUniPacket + + +class tarscore: + class TarsInputStream(TarsInputStream): + pass + + class TarsOutputStream(TarsOutputStream): + pass + + class TarsUniPacket(TarsUniPacket): + pass + + class boolean(util.boolean): + pass + + class int8(util.int8): + pass + + class uint8(util.uint8): + pass + + class int16(util.int16): + pass + + class uint16(util.uint16): + pass + + class int32(util.int32): + pass + + class uint32(util.uint32): + pass + + class int64(util.int64): + pass + + class float(util.float): + pass + + class double(util.double): + pass + + class bytes(util.bytes): + pass + + class string(util.string): + pass + + class struct(util.struct): + pass + + @staticmethod + def mapclass(ktype, vtype): return util.mapclass(ktype, vtype) + + @staticmethod + def vctclass(vtype): return util.vectorclass(vtype) + + @staticmethod + def printHex(buff): util.printHex(buff) + + +# 被用户引用 +from __util import configParse +from __rpc import Communicator +from exception import * +from __logger import tarsLogger diff --git a/danmu/danmaku/tars/exception.py b/danmu/danmaku/tars/exception.py new file mode 100644 index 0000000..ee61cde --- /dev/null +++ b/danmu/danmaku/tars/exception.py @@ -0,0 +1,36 @@ +# Tencent is pleased to support the open source community by making Tars available. +# +# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved. +# +# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# https://opensource.org/licenses/BSD-3-Clause +# +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +class TarsException(Exception): pass + +class TarsTarsDecodeRequireNotExist(TarsException): pass +class TarsTarsDecodeMismatch(TarsException): pass +class TarsTarsDecodeInvalidValue(TarsException): pass +class TarsTarsUnsupportType(TarsException): pass + +class TarsNetConnectException(TarsException): pass +class TarsNetConnectLostException(TarsException): pass +class TarsNetSocketException(TarsException): pass +class TarsProxyDecodeException(TarsException): pass +class TarsProxyEncodeException(TarsException): pass +class TarsServerEncodeException(TarsException): pass +class TarsServerDecodeException(TarsException): pass +class TarsServerNoFuncException(TarsException): pass +class TarsServerNoServantException(TarsException): pass +class TarsServerQueueTimeoutException(TarsException): pass +class TarsServerUnknownException(TarsException): pass +class TarsSyncCallTimeoutException(TarsException): pass +class TarsRegistryException(TarsException): pass +class TarsServerResetGridException(TarsException): pass diff --git a/danmu/danmaku/tars/tars/EndpointF.tars b/danmu/danmaku/tars/tars/EndpointF.tars new file mode 100644 index 0000000..c2d8d5d --- /dev/null +++ b/danmu/danmaku/tars/tars/EndpointF.tars @@ -0,0 +1,25 @@ + +module register +{ + /** + * ˿Ϣ + */ + struct EndpointF + { + 0 require string host; + 1 require int port; + 2 require int timeout; + 3 require int istcp; + 4 require int grid; + 5 optional int groupworkid; + 6 optional int grouprealid; + 7 optional string setId; + 8 optional int qos; + 9 optional int bakFlag; + 11 optional int weight; + 12 optional int weightType; + }; + key[EndpointF, host, port, timeout, istcp, grid, qos, weight, weightType]; +}; + + diff --git a/danmu/danmaku/tars/tars/QueryF.tars b/danmu/danmaku/tars/tars/QueryF.tars new file mode 100644 index 0000000..06cb2e9 --- /dev/null +++ b/danmu/danmaku/tars/tars/QueryF.tars @@ -0,0 +1,70 @@ +#include "EndpointF.tars" + +module register +{ + /** + * ȡendpointqueryӿ + */ + + interface QueryF + { + /** idȡ + * + * @param id + * + * @return иöĻendpointб + */ + vector findObjectById(string id); + + /**idȡж,ͷǻ + * + * @param id + * @param activeEp endpointб + * @param inactiveEp Ǵendpointб + * @return: 0-ɹ others-ʧ + */ + int findObjectById4Any(string id, out vector activeEp, out vector inactiveEp); + + /** idȡendpointб,ͬfindObjectByIdInSameGroup + * + * @param id + * @param activeEp endpointб + * @param inactiveEp Ǵendpointб + * @return: 0-ɹ others-ʧ + */ + int findObjectById4All(string id, out vector activeEp, out vector inactiveEp); + + /** idȡͬendpointб + * + * @param id + * @param activeEp endpointб + * @param inactiveEp Ǵendpointб + * @return: 0-ɹ others-ʧ + */ + int findObjectByIdInSameGroup(string id, out vector activeEp, out vector inactiveEp); + + + /** idȡָصendpointб + * + * @param id + * @param activeEp endpointб + * @param inactiveEp Ǵendpointб + * @return: 0-ɹ others-ʧ + */ + int findObjectByIdInSameStation(string id, string sStation, out vector activeEp, out vector inactiveEp); + + /** idȡͬendpointб + * + * @param id + * @param setId setȫ,ʽΪsetname.setarea.setgroup + * @param activeEp endpointб + * @param inactiveEp Ǵendpointб + * @return: 0-ɹ others-ʧ + */ + int findObjectByIdInSameSet(string id, string setId, out vector activeEp, out vector inactiveEp); + + }; + +}; + + diff --git a/danmu/danmaku/tars/tars/__init__.py b/danmu/danmaku/tars/tars/__init__.py new file mode 100644 index 0000000..e69de29