1
0
mirror of https://github.com/wbt5/real-url.git synced 2025-08-02 15:44:49 +08:00
2020-06-18 10:39:11 +08:00

202 lines
5.6 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __rpc.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: 异步rpc实现
'''
import threading
import Queue
from __logger import tarsLogger
from __packet import ResponsePacket
from __servantproxy import ServantProxy
class AsyncProcThread:
    '''
    @brief: Manager for the asynchronous-call worker threads. It owns the
            queue of request messages awaiting callback dispatch and the
            AsyncProcThreadRunner threads that drain that queue.
    '''

    def __init__(self):
        tarsLogger.debug('AsyncProcThread:__init__')
        self.__queue = None        # created by initialize()
        self.__runners = []        # runner threads created by start()
        self.__nrunner = 0         # number of runners start() will create
        self.__popTimeout = 0.1    # seconds pop() waits for an item
        self.__initialize = False  # True once initialize() has run

    def __del__(self):
        tarsLogger.debug('AsyncProcThread:__del__')

    def initialize(self, nrunner=3):
        '''
        @brief: Must be called before the AsyncProcThread is used. Creates
                the message queue and records the number of runners.
                Calls after the first one are ignored.
        @param nrunner: number of asynchronous worker threads
        @type nrunner: int
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('AsyncProcThread:initialize')
        if not self.__initialize:
            self.__nrunner = nrunner
            self.__queue = Queue.Queue()
            self.__initialize = True

    def terminate(self):
        '''
        @brief: Stop all asynchronous worker threads and wait for them
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('AsyncProcThread:terminate')
        workers = self.__runners
        # Flag every runner before joining any of them, so that all of
        # them see the stop request before the first join() blocks.
        for worker in workers:
            worker.terminate()
        for worker in workers:
            worker.join()
        self.__runners = []

    def put(self, reqmsg):
        '''
        @brief: Enqueue a request message for callback processing
        @param reqmsg: message to be processed
        @type reqmsg: ReqMessage
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('AsyncProcThread:put')
        # A message without a response is an asynchronous request that
        # timed out: synthesize a response carrying the timeout code.
        if not reqmsg.response:
            reqmsg.response = rsp = ResponsePacket()
            req = reqmsg.request
            # NOTE(review): 'iVerson' is the spelling used by this file;
            # confirm it matches the field name in the packet classes.
            rsp.iVerson = req.iVerson
            rsp.cPacketType = req.cPacketType
            rsp.iRequestId = req.iRequestId
            rsp.iRet = ServantProxy.TARSASYNCCALLTIMEOUT
        self.__queue.put(reqmsg)

    def pop(self):
        '''
        @brief: Dequeue the next request message, waiting at most the
                configured pop timeout
        @return: the next message, or None if the queue stayed empty
        @rtype: ReqMessage
        '''
        # No debug logging here; the runners call pop() in a loop, so it
        # would presumably flood the log.
        try:
            return self.__queue.get(True, self.__popTimeout)
        except Queue.Empty:
            return None

    def start(self):
        '''
        @brief: Create and start the asynchronous worker threads. The
                count is the one given to initialize(); no thread is
                started if initialize() has not been called.
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('AsyncProcThread:start')
        for _ in range(self.__nrunner):
            worker = AsyncProcThreadRunner()
            worker.initialize(self)
            worker.start()
            self.__runners.append(worker)
class AsyncProcThreadRunner(threading.Thread):
    '''
    @brief: Worker thread that performs asynchronous-call callbacks. It
            repeatedly pops request messages from its source and hands
            each one to the message's callback object.
    '''

    def __init__(self):
        tarsLogger.debug('AsyncProcThreadRunner:__init__')
        super(AsyncProcThreadRunner, self).__init__()
        self.__terminate = False   # set by terminate() to stop run()
        self.__initialize = False  # set here; not read within this class
        self.__procQueue = None    # message source, set by initialize()

    def __del__(self):
        tarsLogger.debug('AsyncProcThreadRunner:__del__')

    def initialize(self, queue):
        '''
        @brief: Must be called before the AsyncProcThreadRunner is started
        @param queue: object providing pop(), used to fetch the messages
                      to process
        @type queue: AsyncProcThread
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('AsyncProcThreadRunner:initialize')
        self.__procQueue = queue

    def terminate(self):
        '''
        @brief: Ask the thread to stop; run() leaves its loop the next
                time it checks the flag
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('AsyncProcThreadRunner:terminate')
        self.__terminate = True

    def run(self):
        '''
        @brief: Thread entry point; dispatches asynchronous callbacks
                until terminate() is called
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('AsyncProcThreadRunner:run')
        while not self.__terminate:
            reqmsg = self.__procQueue.pop()
            # pop() may come back empty-handed; messages without a
            # callback have nothing to dispatch.
            # NOTE(review): such messages are skipped before
            # adapter.finishInvoke() is reached - confirm that is intended.
            if not reqmsg or not reqmsg.callback:
                continue

            if reqmsg.adapter:
                # Report to the adapter whether the call succeeded.
                succ = reqmsg.response.iRet == ServantProxy.TARSSERVERSUCCESS
                reqmsg.adapter.finishInvoke(succ)

            try:
                reqmsg.callback.onDispatch(reqmsg)
            except Exception as msg:
                # A failing user callback must not kill the worker thread.
                # The log text is kept verbatim.
                tarsLogger.error('AsyncProcThread excepttion: %s', msg)

        tarsLogger.debug('AsyncProcThreadRunner:run finished')
class ServantProxyCallback(object):
    '''
    @brief: Base class for asynchronous callback objects
    '''

    def __init__(self):
        tarsLogger.debug('ServantProxyCallback:__init__')

    def onDispatch(self, reqmsg):
        '''
        @brief: Dispatch a response message to the matching callback
                function. This base implementation only raises;
                subclasses are expected to override it.
        @param reqmsg: request message whose response is to be dispatched
        @type reqmsg: ReqMessage
        @return: None
        @rtype: None
        @raise NotImplementedError: always, in this base class
        '''
        raise NotImplementedError()