1
0
mirror of https://github.com/wbt5/real-url.git synced 2025-08-02 15:44:49 +08:00
zhibo-url/danmu/danmaku/tars/__TimeoutQueue.py
2020-06-18 10:39:11 +08:00

301 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __timeQueue.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: 请求响应报文和超时队列
'''
import threading
import time
import struct
from .__logger import tarsLogger
from .__tars import TarsInputStream
from .__tars import TarsOutputStream
from .__packet import RequestPacket
from .__packet import ResponsePacket
from .__util import (NewLock, LockGuard)
class ReqMessage:
'''
@brief: 请求响应报文,保存一个请求响应所需要的数据
'''
SYNC_CALL = 1
ASYNC_CALL = 2
ONE_WAY = 3
def __init__(self):
self.type = ReqMessage.SYNC_CALL
self.servant = None
self.lock = None
self.adapter = None
self.request = None
self.response = None
self.callback = None
self.begtime = None
self.endtime = None
self.isHash = False
self.isConHash = False
self.hashCode = 0
def packReq(self):
'''
@brief: 序列化请求报文
@return: 序列化后的请求报文
@rtype: str
'''
if not self.request:
return ''
oos = TarsOutputStream()
RequestPacket.writeTo(oos, self.request)
reqpkt = oos.getBuffer()
plen = len(reqpkt) + 4
reqpkt = struct.pack('!i', plen) + reqpkt
return reqpkt
@staticmethod
def unpackRspList(buf):
'''
@brief: 解码响应报文
@param buf: 多个序列化后的响应报文数据
@type buf: str
@return: 解码出来的响应报文和解码的buffer长度
@rtype: rsplist: 装有ResponsePacket的list
unpacklen: int
'''
rsplist = []
if not buf:
return rsplist
unpacklen = 0
buf = buffer(buf)
while True:
if len(buf) - unpacklen < 4:
break
packsize = buf[unpacklen: unpacklen+4]
packsize, = struct.unpack_from('!i', packsize)
if len(buf) < unpacklen + packsize:
break
ios = TarsInputStream(buf[unpacklen+4: unpacklen+packsize])
rsp = ResponsePacket.readFrom(ios)
rsplist.append(rsp)
unpacklen += packsize
return rsplist, unpacklen
# 超时队列,加锁,线程安全
class TimeoutQueue:
'''
@brief: 超时队列,加锁,线程安全
可以像队列一样FIFO也可以像字典一样按key取item
@todo: 限制队列长度
'''
def __init__(self, timeout=3):
self.__uniqId = 0
# self.__lock = threading.Lock()
self.__lock = NewLock()
self.__data = {}
self.__queue = []
self.__timeout = timeout
def getTimeout(self):
'''
@brief: 获取超时时间单位为s
@return: 超时时间
@rtype: float
'''
return self.__timeout
def setTimeout(self, timeout):
'''
@brief: 设置超时时间单位为s
@param timeout: 超时时间
@type timeout: float
@return: None
@rtype: None
'''
self.__timeout = timeout
def size(self):
'''
@brief: 获取队列长度
@return: 队列长度
@rtype: int
'''
# self.__lock.acquire()
lock = LockGuard(self.__lock)
ret = len(self.__data)
# self.__lock.release()
return ret
def generateId(self):
'''
@brief: 生成唯一id0 < id < 2 ** 32
@return: id
@rtype: int
'''
# self.__lock.acquire()
lock = LockGuard(self.__lock)
ret = self.__uniqId
ret = (ret + 1) % 0x7FFFFFFF
while ret <= 0:
ret = (ret + 1) % 0x7FFFFFFF
self.__uniqId = ret
# self.__lock.release()
return ret
def pop(self, uniqId=0, erase=True):
'''
@brief: 弹出item
@param uniqId: item的id如果为0按FIFO弹出
@type uniqId: int
@param erase: 弹出后是否从字典里删除item
@type erase: bool
@return: item
@rtype: any type
'''
ret = None
# self.__lock.acquire()
lock = LockGuard(self.__lock)
if not uniqId:
if len(self.__queue):
uniqId = self.__queue.pop(0)
if uniqId:
if erase:
ret = self.__data.pop(uniqId, None)
else:
ret = self.__data.get(uniqId, None)
# self.__lock.release()
return ret[0] if ret else None
def push(self, item, uniqId):
'''
@brief: 数据入队列如果队列已经有了uniqId插入失败
@param item: 插入的数据
@type item: any type
@return: 插入是否成功
@rtype: bool
'''
begtime = time.time()
ret = True
# self.__lock.acquire()
lock = LockGuard(self.__lock)
if uniqId in self.__data:
ret = False
else:
self.__data[uniqId] = [item, begtime]
self.__queue.append(uniqId)
# self.__lock.release()
return ret
def peek(self, uniqId):
'''
@brief: 根据uniqId获取item不会删除item
@param uniqId: item的id
@type uniqId: int
@return: item
@rtype: any type
'''
# self.__lock.acquire()
lock = LockGuard(self.__lock)
ret = self.__data.get(uniqId, None)
# self.__lock.release()
if not ret:
return None
return ret[0]
def timeout(self):
'''
@brief: 检测是否有item超时如果有就删除
@return: None
@rtype: None
'''
endtime = time.time()
# self.__lock.acquire()
lock = LockGuard(self.__lock)
# 处理异常情况,防止死锁
try:
new_data = {}
for uniqId, item in self.__data.items():
if endtime - item[1] < self.__timeout:
new_data[uniqId] = item
else:
tarsLogger.debug(
'TimeoutQueue:timeout remove id : %d' % uniqId)
self.__data = new_data
finally:
# self.__lock.release()
pass
class QueueTimeout(threading.Thread):
"""
超时线程,定时触发超时事件
"""
def __init__(self, timeout=0.1):
# threading.Thread.__init__(self)
tarsLogger.debug('QueueTimeout:__init__')
super(QueueTimeout, self).__init__()
self.timeout = timeout
self.__terminate = False
self.__handler = None
self.__lock = threading.Condition()
def terminate(self):
tarsLogger.debug('QueueTimeout:terminate')
self.__terminate = True
self.__lock.acquire()
self.__lock.notifyAll()
self.__lock.release()
def setHandler(self, handler):
self.__handler = handler
def run(self):
while not self.__terminate:
try:
self.__lock.acquire()
self.__lock.wait(self.timeout)
self.__lock.release()
if self.__terminate:
break
self.__handler()
except Exception as msg:
tarsLogger.error('QueueTimeout:run exception : %s', msg)
tarsLogger.debug('QueueTimeout:run finished')
if __name__ == '__main__':
pass