1
0
mirror of https://github.com/wbt5/real-url.git synced 2025-08-02 15:44:49 +08:00
2020-06-18 10:39:11 +08:00

442 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __rpc.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: RPC invocation logic implementation
'''
import time
import argparse
from .__logger import tarsLogger
from .__logger import initLog
from .__trans import EndPointInfo
from .__TimeoutQueue import TimeoutQueue
from .__TimeoutQueue import QueueTimeout
from .__trans import FDReactor
from .__adapterproxy import AdapterProxyManager
from .__servantproxy import ServantProxy
from .exception import (TarsException)
from .__async import AsyncProcThread
class Communicator:
    '''
    @brief: The communicator creates and maintains ServantProxy and
            ObjectProxy instances, the FDReactor thread and the timeout
            thread.
    '''
    # Built-in client settings. getProperty() falls back to these values
    # whenever a key is missing from (or not convertible in) the instance
    # configuration, so this dict must never be mutated through an instance.
    default_config = {'tars':
                      {'application':
                       {'client':
                        {'async-invoke-timeout': 20000,
                         'asyncthread': 0,
                         'locator': '',
                         'loglevel': 'error',
                         'logpath': 'tars.log',
                         'logsize': 15728640,
                         'lognum': 0,
                         'refresh-endpoint-interval': 60000,
                         'sync-invoke-timeout': 5000}}}}

    def __init__(self, config=None):
        '''
        @brief: Build the instance configuration and initialize the
                communicator right away.
        @param config: optional overrides laid over default_config
                       (top-level keys replace the defaults wholesale)
        @type config: dict or None
        '''
        tarsLogger.debug('Communicator:__init__')
        self.__terminate = False
        self.__initialize = False
        self.__objects = {}
        self.__servants = {}
        self.__reactor = None
        self.__qTimeout = None
        self.__asyncProc = None
        self.__config = Communicator._buildConfig(config)
        self.initialize()

    def __del__(self):
        tarsLogger.debug('Communicator:__del__')

    @staticmethod
    def _buildConfig(config=None):
        '''
        @brief: Create a per-instance configuration dict.
        @param config: optional overrides, applied with dict.update
        @type config: dict or None
        @return: a fresh configuration that shares no nested dict with
                 Communicator.default_config
        @rtype: dict
        '''
        # Local import keeps this block self-contained.
        import copy
        # A shallow copy would share the nested 'client' dict with the
        # class attribute, so setProperty() on one instance would silently
        # change the defaults seen by every other instance.
        merged = copy.deepcopy(Communicator.default_config)
        if config:
            merged.update(config)
        return merged

    def initialize(self):
        '''
        @brief: Must be called before the communicator is used. The
                constructor already calls it; further calls are no-ops
                while the communicator is initialized.
        '''
        tarsLogger.debug('Communicator:initialize')
        if self.__initialize:
            return
        logpath = self.getProperty('logpath')
        logsize = self.getProperty('logsize', int)
        lognum = self.getProperty('lognum', int)
        loglevel = self.getProperty('loglevel')
        initLog(logpath, logsize, lognum, loglevel)

        self.__reactor = FDReactor()
        self.__reactor.initialize()
        self.__reactor.start()

        self.__qTimeout = QueueTimeout()
        self.__qTimeout.setHandler(self.handleTimeout)
        self.__qTimeout.start()

        async_num = self.getProperty('asyncthread', int)
        self.__asyncProc = AsyncProcThread()
        self.__asyncProc.initialize(async_num)
        self.__asyncProc.start()

        self.__initialize = True

    def terminate(self):
        '''
        @brief: Release resources; call this when the communicator is no
                longer needed. Does nothing if not initialized.
        '''
        tarsLogger.debug('Communicator:terminate')
        if not self.__initialize:
            return
        self.__reactor.terminate()
        self.__qTimeout.terminate()
        self.__asyncProc.terminate()
        for objName in self.__servants:
            self.__servants[objName]._terminate()
        for objName in self.__objects:
            self.__objects[objName].terminate()
        self.__objects = {}
        self.__servants = {}
        self.__reactor = None
        self.__initialize = False

    def parseConnAddr(self, connAddr):
        '''
        @brief: Parse a connection address string of the form
                "ObjName@tcp -h host -p port -t timeout[:tcp ...]".
        @param connAddr: connection address
        @type connAddr: str
        @return: parse result with keys 'name' (str), 'timeout' (always -1
                 here) and 'endpoint' (list of EndPointInfo)
        @rtype: dict
        @raise TarsException: on a non-tcp protocol, an unknown option or
                              a non-numeric port/timeout
        '''
        tarsLogger.debug('Communicator:parseConnAddr')
        connAddr = connAddr.strip()
        connInfo = {
            'name': '',
            'timeout': -1,
            'endpoint': []
        }
        # Without '@' the whole string is just the object name.
        if '@' not in connAddr:
            connInfo['name'] = connAddr
            return connInfo

        try:
            tks = connAddr.split('@')
            connInfo['name'] = tks[0]
            # Endpoints are ':'-separated; matching is case-insensitive.
            tks = tks[1].lower().split(':')
            # add_help=False frees '-h' so it can mean "host".
            parser = argparse.ArgumentParser(add_help=False)
            parser.add_argument('-h')
            parser.add_argument('-p')
            parser.add_argument('-t')
            for tk in tks:
                argv = tk.split()
                if argv[0] != 'tcp':
                    raise TarsException(
                        'unsupport transmission protocal : %s' % connInfo['name'])
                try:
                    mes = parser.parse_args(argv[1:])
                except SystemExit:
                    # argparse reports bad options by exiting the process;
                    # surface that as a normal, catchable error instead.
                    raise TarsException('Unrecognized option : %s' % tk)
                try:
                    ip = mes.h if mes.h is not None else ''
                    # Use integer -1 (not the string '-1') so the fallback
                    # has the same type as a parsed value.
                    port = int(mes.p) if mes.p is not None else -1
                    timeout = int(mes.t) if mes.t is not None else -1
                    connInfo['endpoint'].append(
                        EndPointInfo(ip, port, timeout))
                except Exception:
                    raise TarsException('Unrecognized option : %s' % mes)
        except TarsException:
            raise
        except Exception as exp:
            raise TarsException(exp)
        return connInfo

    def getReactor(self):
        '''
        @brief: Get the reactor.
        '''
        return self.__reactor

    def getAsyncProc(self):
        '''
        @brief: Get the async processing thread object.
        '''
        return self.__asyncProc

    def getProperty(self, name, dt_type=str):
        '''
        @brief: Read a client configuration value.
        @param name: configuration key
        @type name: str
        @param dt_type: callable used to convert the stored value
        @type dt_type: type
        @return: the converted value; if the key is missing from the
                 instance configuration or the conversion fails, the
                 unconverted value from default_config
        @raise KeyError: if the key is unknown to default_config as well
        '''
        try:
            ret = self.__config['tars']['application']['client'][name]
            ret = dt_type(ret)
        except Exception:
            ret = Communicator.default_config['tars']['application']['client'][name]
        return ret

    def setProperty(self, name, value):
        '''
        @brief: Change a client configuration value.
        @param name: configuration key
        @type name: str
        @param value: new value
        @type value: str
        @return: True on success, False if the nested configuration path
                 does not exist
        @rtype: bool
        '''
        try:
            self.__config['tars']['application']['client'][name] = value
            return True
        except Exception:
            return False

    def setPropertys(self, propertys):
        '''
        @brief: Change several configuration values (not implemented;
                currently a no-op).
        @param propertys: configuration collection
        @type propertys: map, key type: str, value type: str
        @return: None
        @rtype: None
        '''
        pass

    def updateConfig(self):
        '''
        @brief: Re-apply the configuration (not implemented; currently a
                no-op).
        '''

    def stringToProxy(self, servantProxy, connAddr):
        '''
        @brief: Create (or look up) the servant proxy for an address.
        @param servantProxy: proxy factory, called with no arguments to
                             create the proxy
        @type servantProxy: ServantProxy subclass
        @param connAddr: server address information
        @type connAddr: str
        @return: the proxy already registered under the object name if
                 there is one, otherwise a newly created and initialized
                 proxy
        '''
        tarsLogger.debug('Communicator:stringToProxy')

        connInfo = self.parseConnAddr(connAddr)
        objName = connInfo['name']
        if objName in self.__servants:
            return self.__servants[objName]

        objectPrx = ObjectProxy()
        objectPrx.initialize(self, connInfo)

        servantPrx = servantProxy()
        servantPrx._initialize(self.__reactor, objectPrx)
        self.__objects[objName] = objectPrx
        self.__servants[objName] = servantPrx
        return servantPrx

    def handleTimeout(self):
        '''
        @brief: Handle a timeout event by forwarding it to every
                registered ObjectProxy.
        @return: None
        @rtype: None
        '''
        # tarsLogger.debug('Communicator:handleTimeout')
        # Iterate over a snapshot: this handler is registered with the
        # QueueTimeout object, and stringToProxy() may add entries to the
        # dict while the loop is running.
        for obj in list(self.__objects.values()):
            obj.handleQueueTimeout()
class ObjectProxy:
    '''
    @brief: Each object name has one ObjectProxy inside a Communicator;
            it manages the queue of sent/received messages.
    '''
    DEFAULT_TIMEOUT = 3.0

    def __init__(self):
        tarsLogger.debug('ObjectProxy:__init__')
        self.__name = ''
        self.__timeout = ObjectProxy.DEFAULT_TIMEOUT
        self.__comm = None
        self.__epi = None
        self.__adpmanager = None
        self.__timeoutQueue = None
        # self.__adapter = None
        self.__initialize = False

    def __del__(self):
        tarsLogger.debug('ObjectProxy:__del__')

    def initialize(self, comm, connInfo):
        '''
        @brief: Initialize; must be called before the ObjectProxy is used.
                Repeated calls are no-ops while initialized.
        @param comm: communicator
        @type comm: Communicator
        @param connInfo: connection information (reads the 'name' and
                         'endpoint' keys)
        @type connInfo: dict
        @return: None
        @rtype: None
        '''
        if self.__initialize:
            return
        tarsLogger.debug('ObjectProxy:initialize')
        self.__comm = comm
        # The queue timeout comes from async-invoke-timeout; the
        # configured value is divided by 1000 before use.
        async_timeout = self.__comm.getProperty(
            'async-invoke-timeout', float) / 1000
        self.__timeoutQueue = TimeoutQueue(async_timeout)

        self.__name = connInfo['name']

        self.__timeout = self.__comm.getProperty(
            'sync-invoke-timeout', float) / 1000

        # The timeout is taken from the Communicator configuration;
        # it is no longer set from the -t option of the connection info.
        # if connInfo['timeout'] != -1:
        #     self.__timeout = connInfo['timeout']
        eplist = connInfo['endpoint']

        self.__adpmanager = AdapterProxyManager()
        self.__adpmanager.initialize(comm, self, eplist)

        self.__initialize = True

    def terminate(self):
        '''
        @brief: Release resources; call when the ObjectProxy is no longer
                used. Safe to call on a proxy that was never initialized.
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('ObjectProxy:terminate')
        self.__timeoutQueue = None
        # The manager only exists after initialize().
        if self.__adpmanager is not None:
            self.__adpmanager.terminate()
        self.__initialize = False

    def name(self):
        '''
        @brief: Get the object name.
        @return: object name
        @rtype: str
        '''
        return self.__name

    # def setTimeout(self, timeout):
    #     '''
    #     @brief: Set the timeout.
    #     @param timeout: timeout in seconds
    #     @type timeout: float
    #     @return: None
    #     @rtype: None
    #     '''
    #     self.__timeout = timeout
    #     self.__timeoutQueue.setTimeout(timeout)

    def timeout(self):
        '''
        @brief: Get the timeout.
        @return: timeout in seconds
        @rtype: float
        '''
        return self.__timeout

    def getTimeoutQueue(self):
        '''
        @brief: Get the timeout queue.
        @return: timeout queue (None before initialize() and after
                 terminate())
        @rtype: TimeoutQueue
        '''
        return self.__timeoutQueue

    def handleQueueTimeout(self):
        '''
        @brief: Process timed-out entries when a timeout event occurs.
        @return: None
        @rtype: None
        '''
        # tarsLogger.debug('ObjectProxy:handleQueueTimeout')
        # Take a local reference: terminate() resets the attribute to
        # None, and a timeout event may still be delivered after that.
        queue = self.__timeoutQueue
        if queue is not None:
            queue.timeout()

    def invoke(self, reqmsg):
        '''
        @brief: Perform a remote procedure call.
        @param reqmsg: request/response message
        @type reqmsg: ReqMessage
        @return: the result of the adapter's invoke(), or -2 when no
                 adapter proxy could be selected
        '''
        tarsLogger.debug('ObjectProxy:invoke, objname: %s, func: %s',
                         self.__name, reqmsg.request.sFuncName)
        # Load balancing
        # adapter = self.__adpmanager.getNextValidProxy()
        adapter = self.__adpmanager.selectAdapterProxy(reqmsg)
        if not adapter:
            tarsLogger.error("invoke %s, select adapter proxy return None",
                             self.__name)
            return -2

        adapter.checkActive(True)
        reqmsg.adapter = adapter
        return adapter.invoke(reqmsg)

    # Pop a request message
    def popRequest(self):
        '''
        @brief: Return a request/response message from the message queue
                (FIFO). The entry is not erased from the TimeoutQueue
                because it is still needed when the response arrives.
        @return: request/response message
        @rtype: ReqMessage
        '''
        return self.__timeoutQueue.pop(erase=False)
if __name__ == '__main__':
    # Manual smoke test: connect to a hard-coded servant, issue one call
    # and shut everything down again.
    # Alternative address used during development:
    # connAddr = "apptest.lightServer.lightServantObj@tcp -h 10.130.64.220 -p 10001 -t 10000"
    connAddr = 'MTT.BookMarksUnifyServer.BookMarksUnifyObj@tcp -h 172.17.149.77 -t 60000 -p 10023'
    # The constructor already calls initialize(), so no explicit call is
    # needed here.
    comm = Communicator()
    # stringToProxy() takes the proxy class first and the address second;
    # it instantiates the class itself.
    servant = comm.stringToProxy(ServantProxy, connAddr)
    print(servant.tars_timeout())
    try:
        rsp = servant.tars_invoke(
            ServantProxy.TARSNORMAL, "test", '', ServantProxy.mapcls_context(), None)
        print('Servant invoke success, request id: %d, iRet: %d' % (
            rsp.iRequestId, rsp.iRet))
    except Exception as msg:
        print(msg)
    finally:
        servant.tars_terminate()
    time.sleep(2)
    print('app closing ...')
    comm.terminate()
    time.sleep(2)
    print('cpp closed')