1
0
mirror of https://github.com/wbt5/real-url.git synced 2025-08-02 15:44:49 +08:00
2020-06-18 10:39:11 +08:00

576 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: __trans.py
# Tencent is pleased to support the open source community by making Tars available.
#
# Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
#
# Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
'''
@version: 0.01
@brief: Networking module: endpoint description, socket transceivers and
        the epoll-based FD reactor thread.
'''
import socket
import select
import errno
import threading
from .__logger import tarsLogger
from .__TimeoutQueue import ReqMessage
class EndPointInfo:
    '''
    @brief: Value holder describing one connection endpoint: address,
            transport type and the weight settings passed by the caller.
    '''
    SOCK_TCP = 'TCP'
    SOCK_UDP = 'UDP'

    def __init__(self,
                 ip='',
                 port=0,
                 timeout=-1,
                 weight=0,
                 weightType=0,
                 connType=SOCK_TCP):
        '''
        @brief: Store the endpoint attributes exactly as given.
        @param ip: host address of the endpoint
        @param port: port of the endpoint
        @param timeout: stored as-is; no accessor is exposed by this class
        @param weight: weight value, stored as-is
        @param weightType: weight type value, stored as-is
        @param connType: EndPointInfo.SOCK_TCP or EndPointInfo.SOCK_UDP
        '''
        # Address part.
        self.__ip, self.__port = ip, port
        # Transport part.
        self.__connType, self.__timeout = connType, timeout
        # Weight part.
        self.__weight, self.__weightType = weight, weightType

    def getIp(self):
        '''
        @return: the ip passed to the constructor
        '''
        return self.__ip

    def getPort(self):
        '''
        @return: the port passed to the constructor
        '''
        return self.__port

    def getConnType(self):
        '''
        @return: transport-layer connection type
        @rtype: EndPointInfo.SOCK_TCP or EndPointInfo.SOCK_UDP
        '''
        return self.__connType

    def getWeightType(self):
        '''
        @return: the weightType passed to the constructor
        '''
        return self.__weightType

    def getWeight(self):
        '''
        @return: the weight passed to the constructor
        '''
        return self.__weight

    def __str__(self):
        '''
        @return: "<connType> <ip>:<port> <weightType>:<weight>"
        @rtype: str
        '''
        fields = (self.__connType,
                  self.__ip,
                  self.__port,
                  self.__weightType,
                  self.__weight)
        return '%s %s:%s %d:%d' % fields
class Transceiver:
    '''
    @brief: Base class for network transports. Owns the socket, the
            connection state and the send/receive buffers; subclasses
            implement send(), recv() and doResponse().
    '''
    CONNECTED = 0
    CONNECTING = 1
    UNCONNECTED = 2

    def __init__(self, endPointInfo):
        '''
        @param endPointInfo: endpoint this transceiver connects to
        @type endPointInfo: EndPointInfo
        '''
        tarsLogger.debug('Transceiver:__init__, %s', endPointInfo)
        self.__epi = endPointInfo
        self.__sock = None
        self.__connStatus = Transceiver.UNCONNECTED
        self.__connFailed = False
        # Subclasses use these two buffers directly, so they are deliberately
        # not hidden by name mangling. They hold raw bytes: socket.recv()
        # returns bytes, and b'' is identical to '' on Python 2.
        self._sendBuff = b''
        self._recvBuf = b''

    def __del__(self):
        tarsLogger.debug('Transceiver:__del__')
        self.close()

    def getSock(self):
        '''
        @return: the socket object, or None when no socket exists
        @rtype: socket.socket
        '''
        return self.__sock

    def getFd(self):
        '''
        @brief: Get the file descriptor of the socket.
        @return: the descriptor, or -1 when no socket has been created
        @rtype: int
        '''
        if self.__sock:
            return self.__sock.fileno()
        else:
            return -1

    def getEndPointInfo(self):
        '''
        @return: endpoint information given at construction
        @rtype: EndPointInfo
        '''
        return self.__epi

    def isValid(self):
        '''
        @return: whether a socket has been created
        @rtype: bool
        '''
        return self.__sock is not None

    def hasConnected(self):
        '''
        @return: whether a socket exists and its state is CONNECTED
        @rtype: bool
        '''
        return self.isValid() and self.__connStatus == Transceiver.CONNECTED

    def isConnFailed(self):
        '''
        @return: whether the connection has been marked as failed
        @rtype: bool
        '''
        return self.__connFailed

    def isConnecting(self):
        '''
        @return: whether a socket exists and its state is CONNECTING
        @rtype: bool
        '''
        return self.isValid() and self.__connStatus == Transceiver.CONNECTING

    def setConnFailed(self):
        '''
        @brief: Mark the connection as failed and unconnected.
        @return: None
        @rtype: None
        '''
        self.__connFailed = True
        self.__connStatus = Transceiver.UNCONNECTED

    def setConnected(self):
        '''
        @brief: Mark the connection as established.
        @return: None
        @rtype: None
        '''
        self.__connFailed = False
        self.__connStatus = Transceiver.CONNECTED

    def close(self):
        '''
        @brief: Close the socket and reset state and buffers.
        @return: None
        @rtype: None
        @note: Safe to call repeatedly; it is a no-op without a socket.
        '''
        tarsLogger.debug('Transceiver:close')
        if not self.isValid():
            return
        self.__sock.close()
        self.__sock = None
        self.__connStatus = Transceiver.UNCONNECTED
        self.__connFailed = False
        self._sendBuff = b''
        self._recvBuf = b''
        tarsLogger.info('trans close : %s', self.__epi)

    def writeToSendBuf(self, msg):
        '''
        @brief: Append data to the send buffer.
        @param msg: data to send
        @type msg: bytes (str on Python 2)
        @return: None
        @rtype: None
        @note: No locking is done here; concurrent callers will race.
        '''
        self._sendBuff += msg

    def recv(self, bufsize, flag=0):
        raise NotImplementedError()

    def send(self, buf, flag=0):
        raise NotImplementedError()

    def doResponse(self):
        raise NotImplementedError()

    def doRequest(self):
        '''
        @brief: Send as much of the send buffer as send() accepts.
        @return: number of bytes sent, or -1 when there is no socket
        @rtype: int
        '''
        tarsLogger.debug('Transceiver:doRequest')
        if not self.isValid():
            return -1
        pending = self._sendBuff
        nbytes = 0
        if pending:
            # memoryview gives zero-copy slices on both Python 2.7 and 3;
            # the buffer() builtin used previously does not exist on Python 3.
            view = memoryview(pending)
            while nbytes < len(pending):
                ret = self.send(view[nbytes:])
                if ret <= 0:
                    break
                nbytes += ret
        # Keep only the unsent tail, sliced from the original bytes object so
        # the buffer stays a plain bytes value that writeToSendBuf can extend.
        # NOTE(review): if send() closed the transceiver mid-loop, close()
        # cleared _sendBuff and this restores the unsent tail, as the
        # original code did -- confirm that is intended.
        self._sendBuff = pending[nbytes:]
        return nbytes

    def reInit(self):
        '''
        @brief: Create a non-blocking TCP socket and start connecting to
                the endpoint.
        @return: 0 on success or when the connect is in progress,
                 -1 on failure or when the endpoint is not TCP
        @rtype: int
        '''
        tarsLogger.debug('Transceiver:reInit')
        assert(self.isValid() is False)
        if self.__epi.getConnType() != EndPointInfo.SOCK_TCP:
            return -1
        try:
            self.__sock = socket.socket()
            self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.__sock.setblocking(0)
            self.__sock.connect((self.__epi.getIp(), self.__epi.getPort()))
            self.__connStatus = Transceiver.CONNECTED
        except socket.error as msg:
            if msg.errno == errno.EINPROGRESS:
                # Expected for a non-blocking connect: it completes later.
                self.__connStatus = Transceiver.CONNECTING
            else:
                tarsLogger.info('reInit, %s, faild!, %s',
                                self.__epi, msg)
                # Close before dropping the reference so the file descriptor
                # is released immediately instead of leaking.
                if self.__sock is not None:
                    self.__sock.close()
                self.__sock = None
                return -1
        tarsLogger.info('reInit, connect: %s, fd: %d',
                        self.__epi, self.getFd())
        return 0
class TcpTransceiver(Transceiver):
    '''
    @brief: TCP implementation of the transceiver interface.
    '''

    def send(self, buf, flag=0):
        '''
        @brief: Send data over the TCP socket.
        @param buf: data to send
        @type buf: bytes-like (str on Python 2)
        @param flag: flags passed through to socket.send
        @type flag: int
        @return: number of bytes sent; 0 when the send raised (the
                 transceiver is closed unless the error was EAGAIN);
                 -1 when there is no socket
        @rtype: int
        '''
        tarsLogger.debug('TcpTransceiver:send')
        if not self.isValid():
            return -1
        nbytes = 0
        try:
            nbytes = self.getSock().send(buf, flag)
            tarsLogger.info('tcp send, fd: %d, %s, len: %d',
                            self.getFd(), self.getEndPointInfo(), nbytes)
        except socket.error as msg:
            # EAGAIN only means the kernel buffer is full; anything else is
            # treated as fatal for this connection.
            if msg.errno != errno.EAGAIN:
                tarsLogger.error('tcp send, fd: %d, %s, fail!, %s, close',
                                 self.getFd(), self.getEndPointInfo(), msg)
                self.close()
            return 0
        return nbytes

    def recv(self, bufsize, flag=0):
        '''
        @brief: Receive data from the TCP socket.
        @param bufsize: maximum number of bytes to read
        @type bufsize: int
        @param flag: flags passed through to socket.recv
        @type flag: int
        @return: received data; an empty value when no data is available
                 (EAGAIN); None when the peer closed or the receive
                 failed (the transceiver is closed in both cases)
        @rtype: bytes (str on Python 2)
        '''
        tarsLogger.debug('TcpTransceiver:recv')
        assert(self.isValid())
        buf = b''
        try:
            buf = self.getSock().recv(bufsize, flag)
            if len(buf) == 0:
                # A zero-length read on TCP means the peer closed.
                tarsLogger.info('tcp recv, fd: %d, %s, recv 0 bytes, close',
                                self.getFd(), self.getEndPointInfo())
                self.close()
                return None
        except socket.error as msg:
            if msg.errno != errno.EAGAIN:
                tarsLogger.info('tcp recv, fd: %d, %s, faild!, %s, close',
                                self.getFd(), self.getEndPointInfo(), msg)
                self.close()
                return None
        tarsLogger.info('tcp recv, fd: %d, %s, nbytes: %d',
                        self.getFd(), self.getEndPointInfo(), len(buf))
        return buf

    def doResponse(self):
        '''
        @brief: Drain the socket and unpack the buffered data.
        @return: whatever ReqMessage.unpackRspList yields as its first
                 element (a list of response packets per the original
                 docs); None when there is no socket, nothing is
                 buffered, or unpacking raised
        @rtype: list: ResponsePacket
        '''
        tarsLogger.debug('TcpTransceiver:doResponse')
        if not self.isValid():
            return None
        # Start from the leftover of the previous call. An empty leftover is
        # skipped so a '' placeholder is never joined with bytes chunks.
        chunks = [self._recvBuf] if self._recvBuf else []
        while True:
            # recv() returns a falsy value when no more data is available
            # or when it closed the connection.
            buf = self.recv(8292)
            if not buf:
                break
            chunks.append(buf)
        # socket.recv returns bytes, so the join must be a bytes join;
        # ''.join raises TypeError on Python 3.
        self._recvBuf = b''.join(chunks)
        tarsLogger.info('tcp doResponse, fd: %d, recvbuf: %d',
                        self.getFd(), len(self._recvBuf))
        if not self._recvBuf:
            return None
        rsplist = None
        try:
            rsplist, bufsize = ReqMessage.unpackRspList(self._recvBuf)
            # Keep whatever follows the consumed prefix for the next call.
            self._recvBuf = self._recvBuf[bufsize:]
        except Exception as msg:
            tarsLogger.error(
                'tcp doResponse, fd: %d, %s, tcp recv unpack error: %s',
                self.getFd(), self.getEndPointInfo(), msg)
            self.close()
        return rsplist
class FDReactor(threading.Thread):
    '''
    @brief: Thread that polls file descriptors with epoll and dispatches
            the events to the adapter registered for each descriptor.
    '''

    def __init__(self):
        tarsLogger.debug('FDReactor:__init__')
        super(FDReactor, self).__init__()
        self.__terminate = False
        self.__ep = None
        self.__shutdown = None
        # {fd : adapterproxy}
        self.__adapterTab = {}

    def __del__(self):
        tarsLogger.debug('FDReactor:__del__')
        # Both stay None until initialize() runs, so guard each close to
        # avoid an AttributeError when an uninitialized reactor is collected.
        if self.__ep is not None:
            self.__ep.close()
        if self.__shutdown is not None:
            self.__shutdown.close()
        self.__ep = None
        self.__shutdown = None

    def initialize(self):
        '''
        @brief: Create the epoll object and the shutdown socket. Must be
                called before the reactor is used.
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('FDReactor:initialize')
        self.__ep = select.epoll()
        self.__shutdown = socket.socket()
        self.__ep.register(self.__shutdown.fileno(),
                           select.EPOLLET | select.EPOLLIN)
        tarsLogger.debug('FDReactor init, shutdown fd : %d',
                         self.__shutdown.fileno())

    def terminate(self):
        '''
        @brief: Ask the reactor thread to stop.
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('FDReactor:terminate')
        self.__terminate = True
        # Presumably switching the shutdown fd to EPOLLOUT is meant to wake
        # a pending poll() so run() can observe the flag -- TODO confirm.
        self.__ep.modify(self.__shutdown.fileno(), select.EPOLLOUT)
        self.__adapterTab = {}

    def handle(self, adapter, events):
        '''
        @brief: Handle the epoll events reported for one adapter.
        @param adapter: adapter registered for the descriptor
        @type adapter: AdapterProxy
        @param events: epoll event mask
        @type events: int
        @return: None
        @rtype: None
        @note: Any exception raised while handling is logged and swallowed.
        '''
        tarsLogger.debug('FDReactor:handle events : %d', events)
        assert(adapter)
        try:
            if events == 0:
                return
            if events & (select.EPOLLERR | select.EPOLLHUP):
                tarsLogger.debug('FDReactor::handle EPOLLERR or EPOLLHUP: %s',
                                 adapter.trans().getEndPointInfo())
                adapter.trans().close()
                return
            if adapter.shouldCloseTrans():
                tarsLogger.debug('FDReactor::handle should close trans: %s',
                                 adapter.trans().getEndPointInfo())
                adapter.setCloseTrans(False)
                adapter.trans().close()
                return
            if adapter.trans().isConnecting():
                # Do no I/O until the adapter reports the connect finished.
                if not adapter.finishConnect():
                    return
            if events & select.EPOLLIN:
                self.handleInput(adapter)
            if events & select.EPOLLOUT:
                self.handleOutput(adapter)
        except Exception as msg:
            tarsLogger.error('FDReactor handle exception: %s', msg)

    def handleExcept(self):
        pass

    def handleInput(self, adapter):
        '''
        @brief: Handle a readable event: read responses and hand each one
                to adapter.finished().
        @param adapter: adapter registered for the descriptor
        @type adapter: AdapterProxy
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('FDReactor:handleInput')
        if not adapter.trans().isValid():
            return
        rsplist = adapter.trans().doResponse()
        if not rsplist:
            return
        for rsp in rsplist:
            adapter.finished(rsp)

    def handleOutput(self, adapter):
        '''
        @brief: Handle a writable event: flush the transceiver and keep
                asking the adapter for more requests while both succeed.
        @param adapter: adapter registered for the descriptor
        @type adapter: AdapterProxy
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('FDReactor:handleOutput')
        if not adapter.trans().isValid():
            return
        while adapter.trans().doRequest() >= 0 and adapter.sendRequest():
            pass

    def notify(self, adapter):
        '''
        @brief: Re-arm the epoll registration of the adapter's descriptor.
        @return: None
        @rtype: None
        @note: The reactor registers descriptors in EPOLLET mode, where an
               event is reported only once; call this to be notified
               again.
        '''
        tarsLogger.debug('FDReactor:notify')
        fd = adapter.trans().getFd()
        if fd != -1:
            self.__ep.modify(fd,
                             select.EPOLLET | select.EPOLLOUT | select.EPOLLIN)

    def registerAdapter(self, adapter, events):
        '''
        @brief: Register an adapter for its transceiver's descriptor.
        @param adapter: handler for send/receive events
        @type adapter: AdapterProxy
        @param events: epoll events to register (EPOLLET is always added)
        @type events: int
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('FDReactor:registerAdapter events : %d', events)
        events |= select.EPOLLET
        fd = adapter.trans().getFd()
        try:
            # Best-effort removal of a stale registration for this fd.
            self.__ep.unregister(fd)
        except (EnvironmentError, ValueError):
            # The fd was not registered (or is invalid); that is expected
            # here, and register() below reports any real problem.
            pass
        self.__ep.register(fd, events)
        self.__adapterTab[fd] = adapter

    def unregisterAdapter(self, adapter):
        '''
        @brief: Unregister an adapter and drop it from the adapter table.
        @param adapter: handler for send/receive events
        @type adapter: AdapterProxy
        @return: None
        @rtype: None
        '''
        tarsLogger.debug('FDReactor:unregisterAdapter')
        fd = adapter.trans().getFd()
        self.__ep.unregister(fd)
        self.__adapterTab.pop(fd, None)

    def run(self):
        '''
        @brief: Thread entry point; polls for events until terminated.
        '''
        tarsLogger.debug('FDReactor:run')
        while not self.__terminate:
            try:
                # One-second timeout so the terminate flag is re-checked
                # even when no descriptor becomes ready.
                eplist = self.__ep.poll(1)
                if eplist:
                    tarsLogger.debug(
                        'FDReactor run get eplist : %s, terminate : %s',
                        eplist, self.__terminate)
                if self.__terminate:
                    tarsLogger.debug('FDReactor terminate')
                    break
                for fd, events in eplist:
                    adapter = self.__adapterTab.get(fd, None)
                    if not adapter:
                        continue
                    self.handle(adapter, events)
            except Exception as msg:
                tarsLogger.error('FDReactor run exception: %s', msg)
        tarsLogger.debug('FDReactor:run finished')
if __name__ == '__main__':
    # Manual smoke test: connect to a local endpoint, send a greeting and
    # print what each step returns.
    print('hello world')
    epi = EndPointInfo('127.0.0.1', 1313)
    print(epi)
    trans = TcpTransceiver(epi)
    print(trans.getSock())
    print(trans.getFd())
    print(trans.reInit())
    print(trans.isConnecting())
    print(trans.hasConnected())
    # Sockets carry bytes; a bytes literal works on both Python 2 and 3.
    buf = b'hello world'
    print(trans.send(buf))
    # send() closes the transceiver on a hard error and recv() asserts that
    # a socket exists, so only read while the transceiver is still valid.
    if trans.isValid():
        buf = trans.recv(1024)
        print(buf)
    trans.close()