700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Python爬虫实战-官方API怎么用?结合Socket实现斗鱼实时弹幕抓取-最新API

Python爬虫实战-官方API怎么用?结合Socket实现斗鱼实时弹幕抓取-最新API

时间:2018-09-28 17:39:33

相关推荐

Python爬虫实战-官方API怎么用?结合Socket实现斗鱼实时弹幕抓取-最新API

目录

一、原理概述及结果展示

1.1 计算机网络基础知识

1.2 结果展示

二、开发环境

2.1 基础环境及相关文档

2.2 Windows环境下nc的配置(因为windows不支持nc,linux用户可以直接跳过这一步)

2.3 测试工具可用性(开启两个cmd窗口,可以实现nc窗口与telnet窗口互相通信即可)

三、斗鱼弹幕抓取文档及源码

3.1 Socket编程-asyncore模块基础模板的使用

3.2 斗鱼弹幕抓取实战

一、原理概述及结果展示

1.1 计算机网络基础知识

Socket:在计算机通信领域,socket 被翻译为“套接字”,它是计算机之间进行通信的一种约定或一种方式。通过 socket 这种约定,一台计算机可以接收其他计算机的数据,也可以向其他计算机发送数据。

nc命令:nc是netcat的简写,有着网络界的瑞士军刀美誉。因为它短小精悍、功能实用,被设计为一个简单、可靠的网络工具。

(1)实现任意TCP/UDP端口的侦听,nc可以作为server以TCP或UDP方式侦听指定端口

(2)端口的扫描,nc可以作为client发起TCP或UDP连接

(3)机器之间传输文件

(4)机器之间网络测速

telnet命令:telnet就是查看某个端口是否可访问。我们在搞开发的时候,经常要用的端口就是 8080。那么你可以启动服务器,用telnet 去查看这个端口是否可用。

1.2 结果展示

后期可以自行尝试存入数据库,以:进行正则即可

二、开发环境

2.1 基础环境及相关文档

软件环境:Windows10 + Pycharm

requirements.txt内容:(先创建同名文档,然后直接终端使用pip install -r requirements.txt)

certifi==.4.5.1wincertstore==0.2

斗鱼官方API文档(暂时未针对个人开发者开放):弹幕相关API

项目本质:使用Python作为telnet客户端,与斗鱼服务器进行通信。

2.2 Windows环境下nc的配置(因为windows不支持nc,linux用户可以直接跳过这一步)

下载netcat:netcat下载地址

配置环境变量(将解压后的文件中的exe文件加入Path路径)

开启telnet服务(Windows默认关闭)

2.3 测试工具可用性(开启两个cmd窗口,可以实现nc窗口与telnet窗口互相通信即可)

三、斗鱼弹幕抓取文档及源码

3.1 Socket编程-asyncore模块基础模板的使用

# socket.py#!/usr/bin/env python# -*- encoding: utf-8 -*-'''@Author : {Jack Zhao}@Time : /5/12 9:52@Contact : {zc_dlmu@}@Desc : Socket客户端的标准实现'''import asyncoreimport sys# 定义类,并继承自 asyncore.dispatcherclass SocketClient(asyncore.dispatcher):# 实现类中的回调函数代码def __init__(self,host,port):# 调用父类方法asyncore.dispatcher.__init__(self)# 创建Socket对象self.create_socket()# 链接服务器address = (host,port)self.connect(address)def handle_connect(self):'''连接成功回调该函数:return:'''print('连接成功')def writable(self):'''描述是否有数据要被发送到服务器:return: True表示可写,不实现默认返回True,为True时调用handle.write函数'''return Falsedef handle_write(self):'''writable函数返回值为True时触发,编写send方法发送数据:return:'''# 内部实现对服务器发送数据的代码# send方法发送数据,参数是字节数据self.send('hello world\n'.encode('utf-8'))def readable(self):'''描述是否有数据要从服务器中被读取:return: True表示可读,不实现默认返回True,为True时调用handle.read函数'''return Truedef handle_read(self):'''readable为True时触发,实际接收,通常编写recv方法:return:'''# 主动接收数据,参数是需要接收数据的长度# 返回数据是字节数据result = self.recv(1024)print(result)def handle_error(self):'''运行出错回调函数'''# t错误类型,e错误说明,trace错误追踪t,e,trace = sys.exc_info()# print(t,e,trace)self.close()def handle_close(self):'''连接关闭时触发'''print('连接关闭')self.close()# 创建对象,并且执行asyncore.loop进入运行循环if __name__ == '__main__':client = SocketClient('127.0.0.1',9000)# 开始进行运行循环,5s触发一次asyncore.loop(timeout=5)

3.2 斗鱼弹幕抓取实战

3.2.1 重要概念理解

数据发送接收流程

均为先发送数据包长度,再发送数据包本身;

数据包相关计算

心跳机制:斗鱼为了防止客户端假死,需要每45s向服务器端发送一次心跳

弹幕分组:为了防止客户端弹幕流量过载而对其进行分组(新版API文档未找到“海量分组”),不清楚是不是默认海量分组。

3.2.2 弹幕抓取实战代码

#!/usr/bin/env python# -*- encoding: utf-8 -*-'''@Author : {Jack Zhao}@Time : /5/12 10:42@Contact : {zc_dlmu@}@Desc : 实现斗鱼弹幕抓取'''import asyncoreimport sysimport threadingimport timeimport hashlibfrom queue import QueueDATA_PACKET_TYPE_SEND = 689 # 发送数据包的消息类型DATA_PACKET_TYPE_RECV = 690 # 接收数据包的消息类型def encode_content(content):'''序列化弹幕内容函数:param content:需要序列化的内容:return:'''if isinstance(content,str):return content.replace(r'@',r'@A').replace(r'/',r'@S')elif isinstance(content,dict):return r'/'.join(['{}@={}'.format(encode_content(k),encode_content(v)) for k,v in content.items()])+r'/'elif isinstance(content,list):return r'/'.join([encode_content(data) for data in content]) + r'/'return ""def decode_to_str(content):'''反序列化:param content:字符串数据:return:'''if isinstance(content,str):return content.replace(r'@S',r'/').replace(r'@A',r'@')return ''def decode_to_dict(content):'''反序列化字典数据'''ret_dict = dict()if isinstance(content,str):item_strings = content.split(r'/')for item_string in item_strings:k_v_list = item_string.split('@=')if k_v_list is not None and len(k_v_list)>1:k = k_v_list[0]v = k_v_list[1]ret_dict[decode_to_str(k)] = decode_to_str(v)return ret_dictdef decode_to_list(content):'''反序列列表数据'''ret_list = []if isinstance(content,str):# 排除最后一个空/items = content.split(r'/')for idx,item in enumerate(items):if idx<len(items)-1:ret_list.append(decode_to_str(item))return ret_listclass DataPacket():'''数据包封装'''def __init__(self,type=DATA_PACKET_TYPE_SEND,content="",data_bytes = None):# 数据包的类型,数据内容,加密字段,保留字段if data_bytes is None:self.type = typeself.content = contentself.encrypt_flag = 0self.preserve_flag = 0else:# 根据消息传递中的消息类型进行定义self.type = int.from_bytes(data_bytes[4:6],byteorder='little',signed=False)self.encrypt_flag = int.from_bytes(data_bytes[6:7],byteorder='little',signed=False)self.preserve_flag = int.from_bytes(data_bytes[7:8],byteorder='little',signed=False)# 去掉最后一位\0self.content = str(data_bytes[8:-1],encoding='utf-8')def get_length(self):'''获取当前数据包的长度(与消息请求中的一致),为以后需要发送数据包做准备:return:'''# 汇总+\0的长度return 4+2+1+1+len(self.content.encode('utf-8'))+1def get_bytes(self):'''将数据包转换为二进制数据'''data = bytes()# 构建4个字节的消息长度数据data_packet_length = self.get_length()# 把一个整型数据转换成二进制数据# 第一个参数表示需要转换的二进制数据占几个字节# 第二个参数描述字节序# 第三个参数设置是否有符号# 处理消息长度data += data_packet_length.to_bytes(4,byteorder='little',signed=False)# 处理消息类型data += self.type.to_bytes(2, byteorder='little', signed=False)# 处理加密字段data += self.encrypt_flag.to_bytes(1, byteorder='little', signed=False)# 处理保留字段data += self.preserve_flag.to_bytes(1, byteorder='little', signed=False)# 处理数据内容data += self.content.encode('utf-8')# 添加\0数据data += b'\0'return dataclass DouyuClient(asyncore.dispatcher):def __init__(self,host,port,callback=None):# 构建发送数据包的队列容器# 存放数据包对象self.send_queue = Queue()# 接收数据包对象self.recv_queue = Queue()asyncore.dispatcher.__init__(self)self.create_socket()address = (host,port)self.connect(address)self.callback = callback # 外部传入的自定义回调函数# 构建一个专门处理接收数据包容器中数据包的线程self.callback_thread = threading.Thread(target=self.do_callback)# 设置守护进程,当进程结束时让该线程结束self.callback_thread.setDaemon(True)self.callback_thread.start()# 构建心跳线程self.heart_thread =threading.Thread(target=self.do_ping)self.heart_thread.setDaemon(True)self.ping_running = Falsedef handle_connect(self):print('连接成功')self.start_ping() # 链接成功则开启心跳def writable(self):# 有数据为Truereturn self.send_queue.qsize() > 0def handle_write(self):'''先发送长度,后发送数据,且均为二进制'''# 从发送数据包队列中获取数据包对象dp = self.send_queue.get()# 获取数据包长度,并发送给服务器dp_length = dp.get_length()dp_length_data = dp_length.to_bytes(4,byteorder='little',signed=False)self.send(dp_length_data)# 发送数据包二进制数据self.send(dp.get_bytes())def readable(self):return Truedef handle_read(self):# 读取数据包长度,二进制数据data_length_data = self.recv(4)# 通过二进制获取length具体数据data_length = int.from_bytes(data_length_data,byteorder='little',signed=False)# 通过数据包长度获取数据data = self.recv(data_length)# 通过二进制数据构建数据包对象dp = DataPacket(data_bytes=data)# 把数据包放入接收数据包容器中self.recv_queue.put(dp)def handle_error(self):t,e,trace = sys.exc_info()print(e)self.close()def handle_close(self):# 程序关闭时结束心跳self.stop_ping()print('连接关闭')self.close()def login_room_id(self,room_id=1,aid=1,token=1,secret=3,timenow=round(time.time())):'''实现登录函数'''# 构建登录数据包# 斗鱼修改了api,仅针对企业用户开放,所以可以写死# aid = 1 # yourapplicaitonID# token = 2 # 请求/api/thirdPart/token,其中data值为token# timenow = round(time.time())# secret = 3 # 开发者密钥# 保存传入的部分信息以便弹幕分组使用self.room_id = room_idself.token = tokenself.time =timenow# 检验码生成auth = hashlib.md5(('{}_{}_{}_{}'.format(secret,aid,time,token)).encode('utf-8')).hexdigest()self.auth = auth#content = "type@=loginreq/roomid@={}/aid@={}/token@={}/time={}/auth={}/".format(room_id,aid,token,timenow,auth)send_data = {"type": "loginreq","roomid": str(room_id),"aid": str(aid),"token":str(token),"time":str(timenow),"auth":str(auth)}content = encode_content(send_data)login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content)# 将数据包添加到发送数据包容器中self.send_queue.put(login_dp)def join_room_group(self):'''加入弹幕分组,防止用户端过载'''passsend_data = {"type":"joingroup","rid":str(self.room_id),"token":str(self.token),"time":str(self.time),"auth":str(self.auth)}content = encode_content(send_data)dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content = content)self.send_queue.put(dp)def send_heart_data_packet(self):'''心跳机制,防止客户端假死,每45s向后台发送数据即可'''send_data = {"type":"mrkl"}content = encode_content(send_data)dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)self.send_queue.put(dp)def start_ping(self):'''开启心跳'''self.ping_running = Truedef stop_ping(self):'''结束心跳'''self.ping_running = Falsedef do_ping(self):'''执行心跳'''while True:if self.ping_running:self.send_heart_data_packet()time.sleep(40)def do_callback(self):'''专门负责处理接收数据包容器中的数据:return:'''# 从接收容器中获取数据包while True:dp = self.recv_queue.get()# 处理数据if self.callback is not None:self.callback(self,dp)self.recv_queue.task_done()def data_callback(client,dp):'''自定义回调函数:param dp: 数据包对象:return:'''resp_data = decode_to_dict(dp.content)# print(resp_data)if resp_data["type"] =="loginres":# 调用加入分组请求print("登录成功",resp_data)client.join_room_group()elif resp_data["type"] =="chatmsg":print("{}:{}".format(resp_data['nn'],resp_data['txt']))elif resp_data["type"] =="al":print("主播离开啦!")elif resp_data["type"]=="ab":print("主播回来啦!")if __name__ == '__main__':# 斗鱼第三方接入协议,/source/api/63client = DouyuClient('openapi-',80,callback=data_callback)# 这里需要输入相关的所有信息!!!# room_id aid token secret# room_id 房间号# aid 你这次任务的applicaitonID# token 登录令牌 请求/api/thirdPart/token,其中data值为token# secret = 开发者密钥client.login_room_id(room_id='',aid='',token=''.secret='')asyncore.loop(timeout=10)# data = ['1,3,5']# data1 = 'a@=@Ax/b@=y/'# data2 = '1,3,5/'# print(decode_to_list(data2))pass

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。