1
0
mirror of https://github.com/wbt5/real-url.git synced 2025-08-01 14:48:01 +08:00
wbt5 5f7734d9c3
🐛 Fix 企鹅电竞弹幕 (#209,#140)
修复:企鹅电竞弹幕解包时,有一类消息type属性为空时导致判断出错的问题。
2021-07-05 00:33:29 +08:00

354 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import aiohttp
import struct
import json
import re
class eGame:
heartbeat = b'\x00\x00\x00\x12\x00\x12\x00\x01\x00\x07\x00\x00\x00\x01\x00\x00\x00\x00'
heartbeatInterval = 60
@staticmethod
async def get_ws_info(url):
rid = url.split('/')[-1]
page_id = aid = rid
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1'
}
async with aiohttp.ClientSession() as session:
async with session.get('https://m.egame.qq.com/live?anchorid' + rid, headers=headers) as resp:
res = await resp.text()
res_ = re.findall(r'"videoInfo":(.*),"h5Url"', res)[0]
str_id = json.loads(res_)['pid']
params = {
'param': json.dumps({"0":{"module":"pgg.ws_token_go_svr.DefObj","method":"get_token","param":{"scene_flag":16,"subinfo":{"page":{"scene":1,"page_id":int(page_id),"str_id":str(str_id),"msg_type_list":[1,2]}},"version":1,"message_seq":-1,"dc_param":{"params":{"info":{"aid":aid}},"position":{"page_id":"QG_HEARTBEAT_PAGE_LIVE_ROOM"},"refer":{}},"other_uid":0}}})
}
async with session.post('https://share.egame.qq.com/cgi-bin/pgg_async_fcgi', data=params, headers=headers) as resp:
res = json.loads(await resp.text())
token = res['data']['0']['retBody']['data']['token']
# 开始拼接reg_datas
reg_datas = []
tokenbuf = token.encode('ascii')
bodybuf = struct.pack('!Bi', 7, len(tokenbuf)) + tokenbuf
headerbuf = struct.pack('!ihhhihh', 18 + len(bodybuf), 18, 1, 1, 0, 0, 0)
data = headerbuf + bodybuf
reg_datas.append(data)
reg_datas.append(eGame.heartbeat)
return 'wss://barragepush.egame.qq.com/sub', reg_datas
@staticmethod
def decode_msg(data):
"""
type: 0、3、9用户发言7、33礼物信息29、35欢迎信息24、31系统提醒23关注信息
"""
msgs = []
msg = {}
s = MessageDecode(data)
body = s.v()['body']
if body:
bin_datas = body['bin_data']
for bin_data in bin_datas:
# if bin_data['type'] in (0, 3, 9):
if bin_data.get('type', '') in (0, 3, 9):
msg['name'] = bin_data['nick']
msg['content'] = bin_data['content']
msg['msg_type'] = 'danmaku'
else:
msg = {'name': '', 'content': '', 'msg_type': 'other'}
msgs.append(msg.copy())
return msgs
else:
msg = {'name': '', 'content': '', 'msg_type': 'None'}
msgs.append(msg.copy())
return msgs
class MessageDecode:
"""
数据解包还原JS中的操作步骤
"""
def __init__(self, data):
self.data = data
self.ie = {
'event_id': 0,
'msg_type': 1,
'bin_data': 2,
'params': 3,
'start_tm': 4,
'data_list': 6,
'end_tm': 5,
'message_seq': 7,
}
self.ne = {
'uid': 0,
'msgid': 1,
'nick': 2,
'content': 3,
'tm': 4,
'type': 5,
'scenes_flag': 6,
'ext': 7,
'send_scenes': 8
}
self.oe = {
'event_id': 0,
'event_name': 1,
'info': 2,
'params': 3,
'bin_data': 4
}
def v(self):
data = self.data
startPosition = 18
endPosition, = struct.unpack_from('!i', data, 0)
seq, = struct.unpack_from('!i', data, 10)
operation, = struct.unpack_from('!h', data, 8)
if endPosition != len(data):
raise Exception('The received packet length is abnormal')
return {
'seq': seq,
'operation': operation,
'body': self.w(operation, startPosition, endPosition, data)
}
def w(self, operation, startPosition, endPosition, data):
if operation == 3:
return self.x(startPosition, endPosition, data)
else:
return None
def x(self, startPosition, endPosition, data):
i, = struct.unpack_from('!i', data, startPosition)
n = data[startPosition: endPosition]
if len(n) >= (4 + i):
o = n[4:(4 + i)]
a = self.S(o)
y = self.ye(a)
return y
else:
return None
def ye(self, e):
return self.T({
'resultObj': e,
'template': self.ie,
'afterChange': 1,
})
def afterChange(self, e, t, i, n, o):
if t == 'bin_data':
v = []
ve = {}
for m in n:
a = self.S(e, m['ext'])
b = o['msg_type']
if b == 1:
ve = self.T({
'resultObj': a,
'template': self.ne
})
elif b == 2:
ve = self.T({
'resultObj': a,
'template': self.oe
})
v.append(ve.copy())
return v
else:
return n
def T(self, e):
i = e['resultObj']
n = e['template']
o = e.get('beforeChange', '')
r = e.get('afterChange', '')
a = {}
for s in n.keys():
for t in i[0]:
if t['tag'] == n[s]:
q = t
p = q['value']
c = q['ext']
if r:
a[s] = self.afterChange(i[1], s, c, p, a)
else:
a[s] = p
break
return a
def S(self, e, t=0):
if t == '':
t = 0
i = []
n = len(e)
while t < n:
o = self.m(e, t)
dict_ = {
'value': o['value'],
'lastPosition': o['position'],
'ext': o['ext'],
'tag': o['tag']
}
i.append(dict_.copy())
t = o['position']
return i, e
def m(self, e, t):
value = position = ext = ''
i = e
a, = struct.unpack_from('!B', i, t)
tag = (240 & a) >> 4
type = 15 & a
s_position = t + 1
if type == 0:
value, position = self.f0(i, s_position)
elif type == 1:
value, position = self.f1(i, s_position)
elif type == 2:
value, position = self.f2(i, s_position)
elif type == 3:
value, position = self.f3(i, s_position)
elif type == 6:
value, position, ext = self.f6(i, s_position)
elif type == 7:
value, position, ext = self.f7(i, s_position)
elif type == 8:
value, position = self.f8(i, s_position)
elif type == 9:
value, position = self.f9(i, s_position)
elif type == 12:
value, position = self.f12(i, s_position)
elif type == 13:
value, position = self.f13(i, s_position)
i = ''
return {
'i': i,
'tag': tag,
'type': type,
'value': value,
'position': position,
'ext': ext
}
def f0(self, e, t):
o = 1
try:
n, = struct.unpack_from('!B', e, t)
except:
n = ''
return n, t + o
def f1(self, e, t):
o = 2
try:
n, = struct.unpack_from('!H', e, t)
except:
n = ''
return n, t + o
def f2(self, e, t):
o = 4
try:
n, = struct.unpack_from('!I', e, t)
except:
n = ''
return n, t + o
def f3(self, e, t):
e = struct.unpack('!8B', e[t:t + 8])
i = (e[0] << 24) + (e[1] << 16) + (e[2] << 8) + e[3]
o = (e[4] << 24) + (e[5] << 16) + (e[6] << 8) + e[7]
value = (i << 32) + o
position = t + 8
return value, position
def f4(self, e, t):
o = 4
try:
n, = struct.unpack_from('!f', e, t)
except:
n = ''
return n, t + o
def f5(self, e, t):
o = 8
try:
n, = struct.unpack_from('!d', e, t)
except:
n = ''
return n, t + o
def f6(self, e, t):
n, = struct.unpack_from('!B', e, t)
r = t + 1
s = r + n
value = (e[r:s]).decode('utf8', errors='ignore')
return value, s, r
def f7(self, e, t):
n, = struct.unpack_from('!I', e, t)
r = t + 4
s = r + n
value = (e[r:s]).decode('utf8', errors='ignore')
return value, s, r
def f8(self, e, t):
i = {}
b = self.m(e, t)
o = b['value']
r = b['position']
while o > 0:
a = self.m(e, r)
s = self.m(e, a['position'])
if a['tag'] == 0 and s['tag'] == 1:
i[a['value']] = s['value']
r = s['position']
o -= 1
return i, r
def f9(self, e, t):
i = self.m(e, t)
n = i['value']
o = i['position']
r = []
while n > 0:
a = self.m(e, o)
r.append(a.copy())
o = a['position']
n -= 1
return r, o
def f10(self, e, t):
i = []
while True:
n = self.m(e, t)
t = n['position']
if n['type'] == 11:
return i, t
i.append(n['value'].copy())
def f11(self, e, t):
return '', t
def f12(self, e, t):
return 0, t
def f13(self, e, t):
i = self.m(e, t)
return e[(t + i['position']):i['value']], t + i['position'] + i['value']