mirror of
https://github.com/wbt5/real-url.git
synced 2025-08-02 15:44:49 +08:00
301 lines
8.1 KiB
Python
301 lines
8.1 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
# filename: __timeQueue.py
|
||
|
||
# Tencent is pleased to support the open source community by making Tars available.
|
||
#
|
||
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
|
||
#
|
||
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
|
||
# in compliance with the License. You may obtain a copy of the License at
|
||
#
|
||
# https://opensource.org/licenses/BSD-3-Clause
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software distributed
|
||
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||
# specific language governing permissions and limitations under the License.
|
||
#
|
||
|
||
'''
|
||
@version: 0.01
|
||
@brief: 请求响应报文和超时队列
|
||
'''
|
||
|
||
import threading
|
||
import time
|
||
import struct
|
||
|
||
from .__logger import tarsLogger
|
||
from .__tars import TarsInputStream
|
||
from .__tars import TarsOutputStream
|
||
from .__packet import RequestPacket
|
||
from .__packet import ResponsePacket
|
||
from .__util import (NewLock, LockGuard)
|
||
|
||
|
||
class ReqMessage:
|
||
'''
|
||
@brief: 请求响应报文,保存一个请求响应所需要的数据
|
||
'''
|
||
SYNC_CALL = 1
|
||
ASYNC_CALL = 2
|
||
ONE_WAY = 3
|
||
|
||
def __init__(self):
|
||
self.type = ReqMessage.SYNC_CALL
|
||
self.servant = None
|
||
self.lock = None
|
||
self.adapter = None
|
||
self.request = None
|
||
self.response = None
|
||
self.callback = None
|
||
self.begtime = None
|
||
self.endtime = None
|
||
self.isHash = False
|
||
self.isConHash = False
|
||
self.hashCode = 0
|
||
|
||
def packReq(self):
|
||
'''
|
||
@brief: 序列化请求报文
|
||
@return: 序列化后的请求报文
|
||
@rtype: str
|
||
'''
|
||
if not self.request:
|
||
return ''
|
||
oos = TarsOutputStream()
|
||
RequestPacket.writeTo(oos, self.request)
|
||
reqpkt = oos.getBuffer()
|
||
plen = len(reqpkt) + 4
|
||
reqpkt = struct.pack('!i', plen) + reqpkt
|
||
return reqpkt
|
||
|
||
@staticmethod
|
||
def unpackRspList(buf):
|
||
'''
|
||
@brief: 解码响应报文
|
||
@param buf: 多个序列化后的响应报文数据
|
||
@type buf: str
|
||
@return: 解码出来的响应报文和解码的buffer长度
|
||
@rtype: rsplist: 装有ResponsePacket的list
|
||
unpacklen: int
|
||
'''
|
||
rsplist = []
|
||
if not buf:
|
||
return rsplist
|
||
|
||
unpacklen = 0
|
||
buf = buffer(buf)
|
||
while True:
|
||
if len(buf) - unpacklen < 4:
|
||
break
|
||
packsize = buf[unpacklen: unpacklen+4]
|
||
packsize, = struct.unpack_from('!i', packsize)
|
||
if len(buf) < unpacklen + packsize:
|
||
break
|
||
|
||
ios = TarsInputStream(buf[unpacklen+4: unpacklen+packsize])
|
||
rsp = ResponsePacket.readFrom(ios)
|
||
rsplist.append(rsp)
|
||
unpacklen += packsize
|
||
|
||
return rsplist, unpacklen
|
||
|
||
# 超时队列,加锁,线程安全
|
||
|
||
|
||
class TimeoutQueue:
|
||
'''
|
||
@brief: 超时队列,加锁,线程安全
|
||
可以像队列一样FIFO,也可以像字典一样按key取item
|
||
@todo: 限制队列长度
|
||
'''
|
||
|
||
def __init__(self, timeout=3):
|
||
self.__uniqId = 0
|
||
# self.__lock = threading.Lock()
|
||
self.__lock = NewLock()
|
||
self.__data = {}
|
||
self.__queue = []
|
||
self.__timeout = timeout
|
||
|
||
def getTimeout(self):
|
||
'''
|
||
@brief: 获取超时时间,单位为s
|
||
@return: 超时时间
|
||
@rtype: float
|
||
'''
|
||
return self.__timeout
|
||
|
||
def setTimeout(self, timeout):
|
||
'''
|
||
@brief: 设置超时时间,单位为s
|
||
@param timeout: 超时时间
|
||
@type timeout: float
|
||
@return: None
|
||
@rtype: None
|
||
'''
|
||
self.__timeout = timeout
|
||
|
||
def size(self):
|
||
'''
|
||
@brief: 获取队列长度
|
||
@return: 队列长度
|
||
@rtype: int
|
||
'''
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__lock)
|
||
ret = len(self.__data)
|
||
# self.__lock.release()
|
||
return ret
|
||
|
||
def generateId(self):
|
||
'''
|
||
@brief: 生成唯一id,0 < id < 2 ** 32
|
||
@return: id
|
||
@rtype: int
|
||
'''
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__lock)
|
||
ret = self.__uniqId
|
||
ret = (ret + 1) % 0x7FFFFFFF
|
||
while ret <= 0:
|
||
ret = (ret + 1) % 0x7FFFFFFF
|
||
self.__uniqId = ret
|
||
# self.__lock.release()
|
||
return ret
|
||
|
||
def pop(self, uniqId=0, erase=True):
|
||
'''
|
||
@brief: 弹出item
|
||
@param uniqId: item的id,如果为0,按FIFO弹出
|
||
@type uniqId: int
|
||
@param erase: 弹出后是否从字典里删除item
|
||
@type erase: bool
|
||
@return: item
|
||
@rtype: any type
|
||
'''
|
||
ret = None
|
||
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__lock)
|
||
|
||
if not uniqId:
|
||
if len(self.__queue):
|
||
uniqId = self.__queue.pop(0)
|
||
if uniqId:
|
||
if erase:
|
||
ret = self.__data.pop(uniqId, None)
|
||
else:
|
||
ret = self.__data.get(uniqId, None)
|
||
|
||
# self.__lock.release()
|
||
|
||
return ret[0] if ret else None
|
||
|
||
def push(self, item, uniqId):
|
||
'''
|
||
@brief: 数据入队列,如果队列已经有了uniqId,插入失败
|
||
@param item: 插入的数据
|
||
@type item: any type
|
||
@return: 插入是否成功
|
||
@rtype: bool
|
||
'''
|
||
begtime = time.time()
|
||
ret = True
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__lock)
|
||
|
||
if uniqId in self.__data:
|
||
ret = False
|
||
else:
|
||
self.__data[uniqId] = [item, begtime]
|
||
self.__queue.append(uniqId)
|
||
# self.__lock.release()
|
||
return ret
|
||
|
||
def peek(self, uniqId):
|
||
'''
|
||
@brief: 根据uniqId获取item,不会删除item
|
||
@param uniqId: item的id
|
||
@type uniqId: int
|
||
@return: item
|
||
@rtype: any type
|
||
'''
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__lock)
|
||
|
||
ret = self.__data.get(uniqId, None)
|
||
# self.__lock.release()
|
||
if not ret:
|
||
return None
|
||
return ret[0]
|
||
|
||
def timeout(self):
|
||
'''
|
||
@brief: 检测是否有item超时,如果有就删除
|
||
@return: None
|
||
@rtype: None
|
||
'''
|
||
endtime = time.time()
|
||
# self.__lock.acquire()
|
||
lock = LockGuard(self.__lock)
|
||
|
||
# 处理异常情况,防止死锁
|
||
try:
|
||
new_data = {}
|
||
for uniqId, item in self.__data.items():
|
||
if endtime - item[1] < self.__timeout:
|
||
new_data[uniqId] = item
|
||
else:
|
||
tarsLogger.debug(
|
||
'TimeoutQueue:timeout remove id : %d' % uniqId)
|
||
self.__data = new_data
|
||
finally:
|
||
# self.__lock.release()
|
||
pass
|
||
|
||
|
||
class QueueTimeout(threading.Thread):
|
||
"""
|
||
超时线程,定时触发超时事件
|
||
"""
|
||
|
||
def __init__(self, timeout=0.1):
|
||
# threading.Thread.__init__(self)
|
||
tarsLogger.debug('QueueTimeout:__init__')
|
||
super(QueueTimeout, self).__init__()
|
||
self.timeout = timeout
|
||
self.__terminate = False
|
||
self.__handler = None
|
||
self.__lock = threading.Condition()
|
||
|
||
def terminate(self):
|
||
tarsLogger.debug('QueueTimeout:terminate')
|
||
self.__terminate = True
|
||
self.__lock.acquire()
|
||
self.__lock.notifyAll()
|
||
self.__lock.release()
|
||
|
||
def setHandler(self, handler):
|
||
self.__handler = handler
|
||
|
||
def run(self):
|
||
while not self.__terminate:
|
||
try:
|
||
self.__lock.acquire()
|
||
self.__lock.wait(self.timeout)
|
||
self.__lock.release()
|
||
if self.__terminate:
|
||
break
|
||
self.__handler()
|
||
except Exception as msg:
|
||
tarsLogger.error('QueueTimeout:run exception : %s', msg)
|
||
|
||
tarsLogger.debug('QueueTimeout:run finished')
|
||
|
||
|
||
if __name__ == '__main__':
|
||
pass
|