mirror of
https://github.com/wbt5/real-url.git
synced 2025-08-02 15:44:49 +08:00
202 lines
5.6 KiB
Python
202 lines
5.6 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
# filename: __rpc.py
|
|
|
|
# Tencent is pleased to support the open source community by making Tars available.
|
|
#
|
|
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
|
|
#
|
|
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
|
|
# in compliance with the License. You may obtain a copy of the License at
|
|
#
|
|
# https://opensource.org/licenses/BSD-3-Clause
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software distributed
|
|
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
|
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations under the License.
|
|
#
|
|
|
|
'''
|
|
@version: 0.01
|
|
@brief: 异步rpc实现
|
|
'''
|
|
|
|
import threading
|
|
import Queue
|
|
from __logger import tarsLogger
|
|
from __packet import ResponsePacket
|
|
from __servantproxy import ServantProxy
|
|
|
|
|
|
class AsyncProcThread:
|
|
'''
|
|
@brief: 异步调用线程管理类
|
|
'''
|
|
|
|
def __init__(self):
|
|
tarsLogger.debug('AsyncProcThread:__init__')
|
|
self.__initialize = False
|
|
self.__runners = []
|
|
self.__queue = None
|
|
self.__nrunner = 0
|
|
self.__popTimeout = 0.1
|
|
|
|
def __del__(self):
|
|
tarsLogger.debug('AsyncProcThread:__del__')
|
|
|
|
def initialize(self, nrunner=3):
|
|
'''
|
|
@brief: 使用AsyncProcThread前必须先调用此函数
|
|
@param nrunner: 异步线程个数
|
|
@type nrunner: int
|
|
@return: None
|
|
@rtype: None
|
|
'''
|
|
tarsLogger.debug('AsyncProcThread:initialize')
|
|
if self.__initialize:
|
|
return
|
|
self.__nrunner = nrunner
|
|
self.__queue = Queue.Queue()
|
|
self.__initialize = True
|
|
|
|
def terminate(self):
|
|
'''
|
|
@brief: 关闭所有异步线程
|
|
@return: None
|
|
@rtype: None
|
|
'''
|
|
tarsLogger.debug('AsyncProcThread:terminate')
|
|
|
|
for runner in self.__runners:
|
|
runner.terminate()
|
|
|
|
for runner in self.__runners:
|
|
runner.join()
|
|
self.__runners = []
|
|
|
|
def put(self, reqmsg):
|
|
'''
|
|
@brief: 处理数据入队列
|
|
@param reqmsg: 待处理数据
|
|
@type reqmsg: ReqMessage
|
|
@return: None
|
|
@rtype: None
|
|
'''
|
|
tarsLogger.debug('AsyncProcThread:put')
|
|
# 异步请求超时
|
|
if not reqmsg.response:
|
|
reqmsg.response = ResponsePacket()
|
|
reqmsg.response.iVerson = reqmsg.request.iVerson
|
|
reqmsg.response.cPacketType = reqmsg.request.cPacketType
|
|
reqmsg.response.iRequestId = reqmsg.request.iRequestId
|
|
reqmsg.response.iRet = ServantProxy.TARSASYNCCALLTIMEOUT
|
|
|
|
self.__queue.put(reqmsg)
|
|
|
|
def pop(self):
|
|
'''
|
|
@brief: 处理数据出队列
|
|
@return: ReqMessage
|
|
@rtype: ReqMessage
|
|
'''
|
|
# tarsLogger.debug('AsyncProcThread:pop')
|
|
ret = None
|
|
try:
|
|
ret = self.__queue.get(True, self.__popTimeout)
|
|
except Queue.Empty:
|
|
pass
|
|
return ret
|
|
|
|
def start(self):
|
|
'''
|
|
@brief: 启动异步线程
|
|
@return: None
|
|
@rtype: None
|
|
'''
|
|
tarsLogger.debug('AsyncProcThread:start')
|
|
for i in xrange(self.__nrunner):
|
|
runner = AsyncProcThreadRunner()
|
|
runner.initialize(self)
|
|
runner.start()
|
|
self.__runners.append(runner)
|
|
|
|
|
|
class AsyncProcThreadRunner(threading.Thread):
|
|
'''
|
|
@brief: 异步调用线程
|
|
'''
|
|
|
|
def __init__(self):
|
|
tarsLogger.debug('AsyncProcThreadRunner:__init__')
|
|
super(AsyncProcThreadRunner, self).__init__()
|
|
# threading.Thread.__init__(self)
|
|
self.__terminate = False
|
|
self.__initialize = False
|
|
self.__procQueue = None
|
|
|
|
def __del__(self):
|
|
tarsLogger.debug('AsyncProcThreadRunner:__del__')
|
|
|
|
def initialize(self, queue):
|
|
'''
|
|
@brief: 使用AsyncProcThreadRunner前必须调用此函数
|
|
@param queue: 有pop()的类,用于提取待处理数据
|
|
@type queue: AsyncProcThread
|
|
@return: None
|
|
@rtype: None
|
|
'''
|
|
tarsLogger.debug('AsyncProcThreadRunner:initialize')
|
|
self.__procQueue = queue
|
|
|
|
def terminate(self):
|
|
'''
|
|
@brief: 关闭线程
|
|
@return: None
|
|
@rtype: None
|
|
'''
|
|
tarsLogger.debug('AsyncProcThreadRunner:terminate')
|
|
self.__terminate = True
|
|
|
|
def run(self):
|
|
'''
|
|
@brief: 线程启动函数,执行异步调用
|
|
'''
|
|
tarsLogger.debug('AsyncProcThreadRunner:run')
|
|
while not self.__terminate:
|
|
if self.__terminate:
|
|
break
|
|
reqmsg = self.__procQueue.pop()
|
|
if not reqmsg or not reqmsg.callback:
|
|
continue
|
|
|
|
if reqmsg.adapter:
|
|
succ = reqmsg.response.iRet == ServantProxy.TARSSERVERSUCCESS
|
|
reqmsg.adapter.finishInvoke(succ)
|
|
|
|
try:
|
|
reqmsg.callback.onDispatch(reqmsg)
|
|
except Exception, msg:
|
|
tarsLogger.error('AsyncProcThread excepttion: %s', msg)
|
|
|
|
tarsLogger.debug('AsyncProcThreadRunner:run finished')
|
|
|
|
|
|
class ServantProxyCallback(object):
|
|
'''
|
|
@brief: 异步回调对象基类
|
|
'''
|
|
|
|
def __init__(self):
|
|
tarsLogger.debug('ServantProxyCallback:__init__')
|
|
|
|
def onDispatch(reqmsg):
|
|
'''
|
|
@brief: 分配响应报文到对应的回调函数
|
|
@param queue: 有pop()的类,用于提取待处理数据
|
|
@type queue: AsyncProcThread
|
|
@return: None
|
|
@rtype: None
|
|
'''
|
|
raise NotImplementedError()
|