爬虫
前言:谢谢D4wn哥推荐的课程,以及分享的笔记。很好吃,孩子已经吃了三斤。
【【爬虫1000集】目前B站最完整的爬虫教程,包含所有干货内容!这还没人看,我不更了!】
【Python + Selenium Web自动化 2022更新版教程 自动化测试 软件测试 爬虫】
一个简单爬虫程序(p3/4)
from urllib.request import urlopen
#请求&接收请求
url = "http://www.baidu.com"
resp = urlopen(url)
with open("baidu.html",mode="w",encoding="utf-8") as f:
#读取到的页面源代码
#一定一定记得utf-8,不然乱码
f.write(resp.read().decode("utf-8"))
print("baidu over!")
web前置基础知识
web请求过程剖析 (p5/6)
- 服务器渲染:在服务器那边直接把数据和html整合在一起,统一返回给浏览器。在页面源代码中,能看到数据。
- 客户端渲染:第一次请求只返回一个html骨架,第二次请求拿到数据,再进行数据展示。在页面源代码中,看不到数据。
可F12在Network里查看
HTTP协议(p7/8)
详细请见计网,这里大概记一些要用到的
请求头里的一些重要内容(爬虫需要):User-Agent,Referer,cookie
请求行->请求方式 url 协议版本
请求头->放一些服务器要用的附加信息
请求体->请求参数
响应头里的一些重要内容(爬虫需要):cookie,各种莫名其妙的字符串(一般是token字样,用于反爬)
状态行->协议 状态码
响应头->放一些客户端需要的附加信息
响应体->服务器返回的真正客户端要用的内容(HTML json)等
Requests
GET爬虫-搜狗搜索及User-Agent处理反爬(p9/10)
import requests
query = "周杰伦"
url = f'https://www.sogou.com/web?query={query}'
headers_dic = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.62"
}
resp = requests.get(url,headers=headers_dic) #处理一个小小滴反爬
print(resp.text) #拿到页面源代码
POST爬虫-百度翻译实例
import requests
#这个地址有说法。是F12之后在网络里找到翻译结果对应的请求
url = 'https://fanyi.baidu.com/sug'
s = input("翻译内容")
dat = {
"kw": s
}
#POST请求发送的数据,必须放在字典中,通过data发送
resp = requests.post(url, data=dat)
print(resp.json()) #将返回的内容直接处理成json
#关掉resp,防止再次请求的时候被拦捏
resp.close()
GET爬虫-豆瓣电影排行实例(p12/13)
import requests
#同样是F12在网络里看见的数据包
#合理利用自带过滤能快速定位到数据捏
url = 'https://movie.douban.com/j/chart/top_list'
#参数太多,重新封装参数
param = {
"type": "24",
"interval_id": "100:90",
"action":"",
"start": 0,
"limit": 20,
}
#反爬。被拦截了首先考虑User-Agent捏
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.62"
}
resp = requests.get(url, params=param, headers=header)
print(resp.json())
数据解析
利用Requests,我们基本上掌握了抓取整个页面的技能,但是在大多数情况下,我们只需要其中的一小部分,这就涉及到了数据提取的问题。
接下来会展示三种解析方式,这三种方式可以混合使用:
- re解析
- bs4解析
- xpath解析
re解析
正则表达式(p17-20)
Regular Expression,正则表达式,一种使用表达式的方式对字符串进行匹配的语法规则。
正则的优点:速度快、效率高、准确性高
正则的缺点:新手上手难度高
正则表达式在线测试:在线正则表达式测试 (oschina.net)
元字符:用于匹配的,具有固定含义的特殊符号。默认只匹配一个字符
常用元字符:
. 匹配除了换行符以外的所有字符
\w 匹配字母或数字或下划线
\s 匹配任意的空白符
\d 匹配数字
\n 匹配一个换行符
\t 匹配一个制表符
^ 匹配字符串的开始
$ 匹配字符串的结尾
\W 匹配非字母、非数字、非下划线
\D 匹配非数字
\S 匹配非空白符
a|b 匹配字符a或者b
() 匹配括号内的表达式 ,也表示一个组
[...] 匹配字符组中的字符,eg:[A-Za-z0-9]
[^...] 匹配除了字符组中字符的所有字符
量词:控制前面的元字符出现的次数。eg: \d+
* 重复零次或更多次
+ 重复一次或更多次
? 重复零次或一次
{n} 重复n次
{n,} 重复n次或更多次
{n,m} 重复n到m次
贪婪匹配和惰性匹配
.* 贪婪匹配,尽可能多的匹配
.*? 惰性匹配,尽可能少的匹配v
re模块(p21/22)
- 常用匹配方式
import re
# findall: 匹配字符串中所有符合正则的内容
lst = re.findall(r"\d+","我的电话号是:123,你的电话是:234")
print(lst)
# finditer: 匹配字符串中所有的内容,返回的是迭代器。从迭代器里拿到内容需要.group()
# 最重要的一个捏
it = re.finditer(r"\d+","我的电话号是:123,你的电话是:234")
for i in it:
print(i.group())
# search: 全文匹配,找到一个结果返回。返回结果是match对象,从中拿数据需要.group()
s = re.search(r"\d+","我的电话号是:123,你的电话是:234")
print(s.group())
# match: 从头匹配,找到一个结果返回。返回结果是match对象,从中拿数据需要.group()
s2 = re.match(r"\d+","我的电话号是:123,你的电话是:234")
print("this is empty")
- 预加载
import re
# 预加载正则表达式
obj = re.compile(r"\d+")
ret = obj.findall("我的电话号是:123,你的电话是:234")
print(ret)
- 从匹配到的内容中,进一步提取内容
import re
# (?P<分组名字>正则) 可以单独的从正则匹配的内容中,进一步提取xxx内容
str1 = """
<div class='jay'><span id='1'>周杰伦</span></div>
<div class='jj'><span id='2'>林俊杰</span></div>
<div class='eas'><span id='3'>陈奕迅</span></div>
"""
obj_str = re.compile(r"<div class='.*?'><span id='(?P<id>\d+)'>(?P<name>.*?)</span></div>", re.S) # re.S: 让.能匹配换行符
result = obj_str.finditer(str1)
for it in result:
print(it.group("name"))
print(it.group("id"))
Re模块实践-豆瓣电影top250
思路:
-
拿到网页源代码
-
通过re来提取想要的有效信息
import requests
import re
import csv
url = 'https://movie.douban.com/top250'
def spider(page):
param = {
'start': page,
'filter': ''
}
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.62"
}
resp = requests.get(url, headers=header, params=param)
page_content = resp.text
# 解析数据
obj = re.compile(r'<li>.*?<div class="item">.*?<span class="title">(?P<name>.*?)'
r'</span>.*?<p class="">.*?<br>(?P<year>.*?)'
r' .*?<span class="rating_num" property="v:average">(?P<rating>.*?)'
r'</span>.*?<span>(?P<num>.*?)人评价', re.S)
result = obj.finditer(page_content)
# 写入文件
f = open("data.csv", mode="a", encoding="utf-8", newline='')
csvwriter = csv.writer(f)
for it in result:
# print(it.group("name"))
# print(it.group("year").strip())
# print(it.group("rating"))
# print(it.group("num"))
dic = it.groupdict()
dic['year'] = dic['year'].strip()
csvwriter.writerow(dic.values())
f.close()
resp.close()
for i in range(10):
page = i*25
spider(page)
print("over!")
Re模块实践-电影天堂
- 定位到2023比看片
- 从2023必看片里提取到子页面的连接地址
- 请求子页面的连接地址,拿到我们想要的下载地址
import requests
import re
domain = 'https://www.dytt89.com/'
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/114.0'
}
resp = requests.get(domain, verify=False, headers=header)
resp.encoding = 'gb2312' #默认utf-8,但这网站不是。所以需要处理编码问题
#print(resp.text)
obj1 = re.compile(r"2023必看热片.*?<ul>(?P<ul>.*?)</ul>", re.S)
obj2 = re.compile(r"<a href='(?P<href>.*?)'", re.S)
obj3 = re.compile(r'片 名(?P<movie>.*?)<br />.*?<td style="WORD-WRAP: break-word" bgcolor="#fdfddf"><a href="(?P<download>.*?)"', re.S)
result1 = obj1.finditer(resp.text)
child_href_list = []
for it in result1:
ul = it.group('ul')
#print(ul)
# 提取跳转用的子页面 url
result2 = obj2.finditer(ul)
for itt in result2:
# 拼接子页面的url: 域名+子页面地址
child_href = domain + itt.group('href').strip('/')
#print(child_href)
child_href_list.append(child_href) # 把子页面保存起来
# 提取子页面内容
for href in child_href_list:
child_resp = requests.get(href)
child_resp.encoding = 'gb2312'
#print(child_resp.text)
result3 = obj3.search(child_resp.text)
print(result3.group('movie'))
print(result3.group('download'))
resp.close()
bs4解析
前置知识:HTML语法
<h1>love you </h1>
<h2 align="center">love you </h2>
# h2: 标签
# align: 属性
# center: 属性值
# <标签 熟悉="属性值">被标记的内容</标签>
# 第二种类型:自带闭合的标签
# <img src="xxx.jgp"/>
# <标签 />
<div id="1" class="h1">周杰伦</div>
<div id="2" class="h2">林俊杰</div>
<div id="3" class="h2">陈奕迅</div>
<div id="4" class="h3">唐老鸭</div>
<div id="4" class="h4">米老鼠</div>
# 通过标签名称来拿数据
# div -> id:3 => 陈奕迅
# div -> class:h4 => 米老鼠
# 这就是bs4的思路
bs4的基本使用-北京新发地市场
# 网站更新了,原视频中的方法不能用
from bs4 import BeautifulSoup
resp = '123'
# 解析数据
# 1.把源码交给BeautifulSoup处理,生成bs对象
page = BeautifulSoup(resp, 'html.parser') # 指定html解析器
# 2.从bs对象中查找数据
# find(标签, 属性=值),只返回第一个
# find_all(标签, 属性=值)
# table = page.find('table', class_='hq_table') # class是python关键字,所以通过下划线进行区分
table = page.find('table', attrs={"class": 'hq_table'}) # 和上一行是一个意思,也可避免关键字class
# 然后继续剥离标签,直至取到数据捏
tr = table.text # .text表示取被标签标记的内容
bs4实践-抓取优美图库图片
- 拿到主页面源码,然后提取到子页面的连接地址,href
- 通过href拿到子页面源码,从中找到图片的下载地址,img->src
- 下载图片
import time
import requests
from bs4 import BeautifulSoup
url = 'https://www.umei.cc/bizhitupian/weimeibizhi/'
resp = requests.get(url)
resp.encoding = 'utf-8' # 处理乱码
#print(resp.text)
main_page = BeautifulSoup(resp.text, 'html.parser')
list = main_page.find('div', class_="item_list infinite_scroll").find_all(class_="item_b clearfix")
alist = []
for i in list:
alist.append(i.find('a'))
#print(alist)
for a in alist:
href = 'https://www.umei.cc' +a.get('href') # 直接通过get拿到属性的值
#print(href)
# 拿到子页面源码
child_page_resp = requests.get(href)
child_page_resp.encoding = 'utf-8'
child_page_text = child_page_resp.text
child_page = BeautifulSoup(child_page_text, 'html.parser')
# 拿到下载链接
img = child_page.find('div', class_='big-pic').find('img')
src = img.get('src')
#print(src)
# 下载图片
img_resp = requests.get(src)
#img_resp.content # 这里拿到的是字节!
img_name = src.split('/')[-1] # 拿到url中最后一个'/'之后的内容作为名字
with open('img/' + img_name, mode="wb") as f:
f.write(img_resp.content)
print('over', img_name)
time.sleep(1)
print('all over')
resp.close()
xpath解析
xpath是在xml文档中搜索内容的一门语言
html是xml的一个子集
简单的xpath用法
from lxml import etree
xml = """
<book>
<id>1</id>
<name>野花香</name>
<price>123元</price>
<author>
<nick id="1">作者1</nick>
<nick id="2">作者2</nick>
<nick class="joy">周杰伦</nick>
<nick class="jj">林俊杰</nick>
<div>
<nick>热热热热1</nick>
</div>
<span>
<nick>热热热热2</nick>
</span>
</author>
<nick>哈哈</nick>
</book>
"""
tree = etree.XML(xml)
#result = tree.xpath("/book") # /表示层级关系,第一个'/'是根节点
#result = tree.xpath("/book/name/text()") # text()那文本
#result = tree.xpath("/book/author//nick/text()") # 俩'//',在内部找出全部nick,不止搜索子节点
#result = tree.xpath("/book/author/*/nick/text()") # * 表示通配符
#result = tree.xpath("/book/author/nick[1]/text()") # xpath从1开始
#result = tree.xpath("/book/author/nick[@id='1']/text()") # 通过元素值筛选。一定注意'@'
author_list = result = tree.xpath('/book/author/nick')
for li in author_list:
result = li.xpath("./text()") # 在author里继续去查找,是相对查找
print(result)
result2 = li.xpath("./@id") # 拿属性的值:@属性
print(result2)
xpath实践-猪八戒
- 拿到页面源代码
- 提取和解析数据
除了第一行的六个公司之外,剩下的公司在转成xml的时候出现了一点问题捏。
import requests
from lxml import etree
url = 'https://www.zbj.com/search/service/?kw=saas&r=2'
resp = requests.get(url)
#print(resp.text)
html = etree.HTML(resp.text)
# 拿到每一个服务器商的div
divs = html.xpath('/html/body/div[@id="__nuxt"]/div/div/div[3]/div/div[4]/div/div[2]/div[1]/div')
#print(divs)
#t = etree.tostring(divs[7], encoding='utf-8', pretty_print=True)
#print(t)
for div in divs:
#price = div.xpath('./div/div[3]/div[1]/span/text()')[0].strip('¥')
#title = 'sass'.join(div.xpath('./div/div[3]/div[2]/a/text()'))
#com = div.xpath('./div/a/div[2]/div[1]/div/text()')[0].strip()
price = div.xpath('./div/div[3]/div[1]/span/text()')
title = div.xpath('./div/div[3]/div[2]/a/text()')
com = div.xpath('./div/a/div[2]/div[1]/div/text()')
#print(price)
print(price,title,com)
requests进阶
模拟浏览器登陆->处理cookie
- 登陆->得到cookie
- 带着cookie去请求到书架的url
- 获取书架上的内容
我们可以使用session进行请求——>session可以理解为是一连串的请求,同时在这个过程中cookie不会丢失(即不需要重复登陆)
法一:使用session()
方法
import requests
# 会话
session = requests.session()
# 先登陆
url = ''
# data里具体放什么,要看登陆的时候向服务器传递了什么
data = {
'log_name': 'name',
'password': '123'
}
session.post(url, data=data)
#resp = session.post(url, data=data)
#print(resp.cookies) # 看cookie
# 做其他操作
resp = session.get('child_page_url')
法二:直接复制cookie
import requests
header = {
"cookie": "your_cookie"
}
resp = requests.get('url', headers=header)
防盗链处理->解决反爬
浏览器显示页面和F12查看器里的代码是实时且一一对应的:即修改F12里的代码,浏览器显示的会同时更改。但是页面源代码可以和它们不同。浏览器显示页面其实是,页面源代码运行之后的产生东西。
反爬虫也可能检测请求头里的Referer
import requests
url = 'https://www.pearvideo.com/video_1707809'
contID = url.split("_")[1]
# print(contID)
videoStatuUrl = f'https://www.pearvideo.com/videoStatus.jsp?contId={contID}&mrd=0.8524528462435543'
header = {
'Referer': url,
# 防盗链
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.69'
}
resp = requests.get(videoStatuUrl, headers=header)
# 拼接视频下载地址
dic = resp.json()
srcUrl = dic["videoInfo"]['videos']['srcUrl']
systemTime = dic['systemTime']
srcUrl = srcUrl.replace(systemTime, 'cont-'+contID)
# print(srcUrl)
# 下载视频
with open('a.mp4', mode='wb') as f:
f.write(requests.get(srcUrl).content)
代理->防止被封ip(不推荐捏)
这网站里,类型为透明的一般可以用
import requests
# 112.111.1.217:4430
proxie = {
# 这里版本不同可能需要其他写法才能用,自己试一试
# 'https': 'https://112.111.1.217:4430'
'https': '112.111.1.217:4430'
}
url = 'https://www.baidu.com'
resp = requests.get(url, proxies=proxie)
resp.encoding = 'utf-8'
print(resp.text)
综合练习-网易云评论
- 找到未加密的参数
- 想办法把参数进行加密(必须参考原站逻辑),params,encSecKey
- 请求网易服务器,拿到评论信息
内涵json
逆向内容,详见原视频
# 1. 找到未加密的参数 # windows.asrsea函数加的密
# 2. 想办法把参数进行加密(必须参考原站逻辑),params=>encText,encSecKey+>encSecKey
import json
from Crypto.Cipher import AES
from base64 import b64encode
import requests
url = 'https://music.163.com/weapi/comment/resource/comments/get?csrf_token='
data = {
'csrf_token': "",
'cursor': "-1",
'offset': "0",
'orderType': "1",
'pageNo': "1",
'pageSize': "20",
'rid': "R_SO_4_1325905146",
'threadId': "R_SO_4_1325905146"
}
e = '010001'
f = '00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7'
g = '0CoJUm6Qyw8W8jud'
i = "3epjsTwgpTTPTnHM"
def get_encSecKey():
return '427b0cdb3d45c95b878bb67f044cd301acbaf556ad4b08b4730bf5ed1170abd6a855b0ce37a3b68467bb3badfb889e757923886aec0dec75b90b024ccfc9e7d035ba278bd1198d9ac1cc834d46b369d836d8f3e0b2098be9491dc5336a0e03ba9e3cc313ac81508ab8fbe2ba900d81626a6f8657881cf065de64c8cc04c930bd'
def get_params(data): # data需要是字符串
first = enc_params(data, g)
second = enc_params(first, i)
return second #返回params
def to_16(data):
pad = 16 - len(data)%16
data += chr(pad) * pad
return data
def enc_params(data, key):
iv = "0102030405060708"
data = to_16(data)
aes = AES.new(key=key.encode('utf-8'), IV=iv.encode('utf-8'), mode=AES.MODE_CBC) # 加密配置
bs = aes.encrypt(data.encode('utf-8')) # 加密,加密内容的长度必须是16的倍数。且这个结果不能直接被utf-8翻译,所以要先Base64解密
return str(b64encode(bs), 'utf-8')
resp = requests.post(url,data={
"params": get_params(json.dumps(data)),
"encSecKey": get_encSecKey()
})
print(resp.text)
提速(有独立笔记)
进程:进程是资源单位,每一个进程至少要有一个线程。启动的每个程序,默认都会有一个主线程
线程:线程是执行单位
多线程
有两种写法:
from threading import Thread
def func(name):
for i in range(100):
print(name,i)
if __name__ == '__main__':
t1 = Thread(target=func,args= ('haha',)) # 创建线程并给线程安排任务。args必须是元组,所以不要忘了逗号
t1.start() # 开始执行该线程
t2 = Thread(target=func, args=('wuwu',))
t2.start() # 开始执行该线程
for i in range(100):
print('main',i)
from threading import Thread
class MyTread(Thread):
def run(self):
for i in range(100):
print('子线程', i)
if __name__ == '__main__':
t = MyTread()
t.start()
for i in range(100):
print('main', i)
多进程
比多线程消耗更多,不推荐
线程池/进程池
简介
一次性开辟一些线程,用户直接给线程池提交任务,线程任务的调度交给线程池来完成。进程同理
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def func(name):
for i in range(1000):
print(name, i)
if __name__ == '__main__':
# 创建线程池
with ThreadPoolExecutor(50) as t:
for i in range(100):
t.submit(func, name=f"线程{i}")
# 等待线程池中的任务全部执行完毕,才继续执行(守护)
print('over')
线程池实践-还是看看菜价捏
import csv
import requests
from concurrent.futures import ThreadPoolExecutor
f = open('data.csv', mode='w', encoding='utf-8', newline='')
csvwriter = csv.writer(f)
def download_page(i):
url = 'http://www.xinfadi.com.cn/getPriceData.html'
resp = requests.post(url)
dic = resp.json()
start = 20 * i
end = 20 * i + 20
for i in range(start, end):
name = dic['list'][i]['prodName']
avgPrice = dic['list'][i]['avgPrice']
txt = [name,avgPrice]
csvwriter.writerow(txt)
return 1
if __name__ == '__main__':
#for i in range(0,1000):
# download_page(i)
with ThreadPoolExecutor(50) as t:
for i in range(200):
t.submit(download_page, i)
print('all_over!')
协程
简介
import time
def func():
print('haha')
time.sleep(3) # 让线程处于阻塞状态,此时cpu不为我工作。
print('wohaha')
if __name__ == '__main__':
func()
同样的,input()/requests.get()也是阻塞状态。一般情况下,当程序处于IO操作时,线程都会处于阻塞状态。
协程:当程序遇见了IO操作时,可以选择性的切换到其他任务上。以上都是在单线程的条件下。
import asyncio
import time
async def func1():
print('hello,jam')
#time.sleep(3) # 当程序出现了同步操作的时候,异步就中断了
await asyncio.sleep(3) # 异步操作
print('hello,jam')
async def func2():
print('hello,jame')
#time.sleep(2)
await asyncio.sleep(2)
print('hello,jame')
async def func3():
print('hello,jim')
#time.sleep(5)
await asyncio.sleep(5)
print('hello,jim')
async def main():
# 推荐写法
tasks = [
asyncio.create_task(func1()), # 把协程任务包装成协程对象
asyncio.create_task(func2()),
asyncio.create_task(func3()),
]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
aiohttp模块实践-优美图库
import asyncio
import aiohttp
urls = [
'https://www.umei.cc/d/file/20230906/bec2a6b127c47b8d6f53f36e1a875f07.jpeg',
'https://www.umei.cc/d/file/20230906/3ff61c3ea61f07c98fb3afd8aff40bf8.jpeg',
'https://www.umei.cc/d/file/20230906/ab4f722dddfee80d1dcf114ac653d05a.jpg'
]
async def aiodownload(url):
name = url.rsplit('/', 1)[1]
# 发送请求,得到图片内容后保存文件
# s = aiohttp.ClientSession() <==> requests
# s.get() <==> requests.get()
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
# resp.content.read() <==> resp.content
# resp.text() <==> resp.text
# 注意和requests模块有一点细微的差别
# 写入文件可以学一个模块aiofile
with open(name, mode='wb') as f:
f.write(await resp.content.read()) # 读取内容是异步的,需要挂起
await asyncio.sleep(0.25)
print(name,'over')
async def main():
tasks = []
for url in urls:
tasks.append(asyncio.create_task(aiodownload(url)))
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
使用aiohttp
模块时可能出现以下报错:
解决方法:高级客户端用法 — aiohttp 4.0.0a2.dev0 文档
异步文件写入:aiofiles
模块
async with aiofiles.open('filename', mode='w', encoding='utf-8') as f:
await f.write(content)
爬视频
一般视频网站如何处理视频:
用户上传视频 -> 转码(把视频做处理,处理成多种清晰度,2k/1080/标清) -> 切片处理(把单个文件进行拆分,放在M3U格式的文件里)
M3U文件经过utf-8编码之后就是M3U8文件。如何解读M3U8文件:
基本知道这些就够了。
抓取一个视频:
- 找到M3U8文件
- 通过M3U8下载ts文件
- 通过各种手段(不仅是编程手段)把ts文件合并为一个mp4文件
合并软件:QuickTime
编程合并:
import os
def merge_ts():
# mac: cat 1.ts 2.ts 3.ts > xxx.mp4
# windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
lst = []
with open('m3u8.txt', mode='r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
line = line.strip()
lst.append(f'video/{line}')
s = '+'.join(lst)
os.system(f'copy /b {s} video.mp4')
协程实践-扒一本书
import requests
import asyncio
import aiohttp
import aiofiles
from lxml import etree
def get_chapter_url(url):
resp = requests.get(url)
resp.encoding = 'gbk'
tree = etree.HTML(resp.text)
href_list = tree.xpath('//div[@class="section-box"]/ul/li/a/@href')
# print(href_list)
return href_list
async def download_one(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
page_source = await resp.text()
# 定位标题和内容
tree = etree.HTML(page_source)
title = tree.xpath('//div[@class="reader-main"]/h1/text()')[0].strip()
content = tree.xpath('//div[@class="content"]/text()')
# 处理
content = ''.join(content).replace('\xa0','')
# print(content)
async with aiofiles.open(f'./盗墓笔记/{title}', mode='w', encoding='utf-8') as f:
await f.write(content)
await asyncio.sleep(0.25)
print(f'{title} 下载完毕')
async def download(url, href_list):
tasks = []
for href in href_list:
href = url + href
task = asyncio.create_task(download_one(href))
tasks.append(task)
#break
await asyncio.wait(tasks)
def main():
# 1.拿到页面中每个章节的url
url = 'https://www.zanghaihua.org/book/40527/'
href_list = get_chapter_url(url)
# 2.协程下载
asyncio.run(download(url, href_list)) # 运行协程任务
if __name__ == '__main__':
main()
Selenium(有独立笔记)
Selenium概述
Selenium是一个Web自动化测试工具。
Selenium可以根据我们的指令,让浏览器自动加载页面,获取需要的数据,甚至页面截屏,或者判断网站上某些动作是否发生。
Selenium自己不带浏览器,它需要与第三方浏览器结合在一起才能使用。
官方文档:Selenium 浏览器自动化项目 | Selenium
浏览器驱动安装
浏览器驱动用于Selenium操控本地浏览器执行自动化操作
Chrome驱动下载镜像站:CNPM Binaries Mirror (npmmirror.com)
注意需要下载和本机Chrome版本一样的驱动,也要和本机操作系统一致。
如何查看自己的Chrome版本:怎么查看Google Chrome浏览器版本-百度经验 (baidu.com)
from selenium.webdriver import Chrome
# 创建浏览器对象
web = Chrome()
web.get('http://baidu.com')
入门实践-扒拉钩网
包含操作:定位元素、输入内容/按键
import time
from selenium.webdriver import Chrome
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
# 创建浏览器对象
web = Chrome()
web.get('http://lagou.com')
# 找到元素并点击它
el = web.find_element(by=By.XPATH, value='//*[@id="changeCityBox"]/p[1]/a')
el.click() # 点击事件
# 让浏览器缓一会儿,免得处理速度跟不上加载速度,报错
time.sleep(1)
# 找到输入框 => 输入python => 按回车
web.find_element(by=By.XPATH, value='//*[@id="search_input"]').send_keys('python', Keys.ENTER)
# 找到数据存放内容
li_list = web.find_elements(by=By.XPATH, value='//*[@id="jobList"]/div[1]/div')
for li in li_list:
job_name = li.find_element(by=By.XPATH, value='.//*[@id="openWinPostion"]').text
job_price = li.find_element(by=By.XPATH, value='.//div[@class="p-bom__JlNur"]/span').text
company = li.find_element(by=By.XPATH, value='.//div[@class="company-name__2-SjF"]/a').text
print(job_name, job_price, company)
窗口切换操作
from selenium.webdriver import Chrome
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import time
# 创建浏览器对象
web = Chrome()
web.get('http://lagou.com')
# 找到元素并点击它
el = web.find_element(by=By.XPATH, value='//*[@id="changeCityBox"]/p[1]/a')
el.click() # 点击事件
# 让浏览器缓一会儿,免得处理速度跟不上加载速度,报错
time.sleep(1)
# 找到输入框 => 输入python => 按回车
web.find_element(by=By.XPATH, value='//*[@id="search_input"]').send_keys('python', Keys.ENTER)
time.sleep(1)
web.find_element(by=By.XPATH, value='//*[@id="openWinPostion"]').click()
# 在selenium眼中,新窗口是不默认切换的
# 所以要进行窗口切换
web.switch_to.window(web.window_handles[-1])
# 在新窗口里提取内容
job_detail = web.find_element(by=By.XPATH, value='//*[@id="job_detail"]/dd[2]/div').text
print(job_detail)
# 关窗口,然后切换回原窗口
web.close()
web.switch_to.window(web.window_handles[0])
print(web.find_element(by=By.XPATH, value='//*[@id="openWinPostion"]').text)
遇到iframe怎么办捏(内嵌html):先切换到iframe,再读取里面的内容(以iframe为准)
# 定位iframe,然后切换
iframe = web.find_element(by=By.XPATH, value='//*[@id="player_iframe"]')
web.switch_to.frame(iframe)
# 从iframe切出来。切换回原页面
web.switch_to.default_content()
实践-艺恩年度票房
import time
from selenium.webdriver import Chrome
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from selenium.webdriver.chrome.options import Options
# 准备参数配置
opt = Options()
opt.add_argument('--headless')
opt.add_argument('--disbale-gpu')
web = Chrome(options=opt) # 在后台运行selenium
web.get('https://www.endata.com.cn/BoxOffice/BO/Year/index.html')
# 定位到下拉列表
sel_el = web.find_element(by=By.XPATH, value='//*[@id="OptionDate"]')
# 对元素进行包装,包装成下拉菜单
sel = Select(sel_el)
# 让浏览器调整选项
for i in range(len(sel.options)): # i就是每一个下拉框选项的索引位置
sel.select_by_index(i) # 按照索引进行切换
time.sleep(2)
table = web.find_element(by=By.XPATH, value='//*[@id="TableList"]/table')
print(table.text)
web.close()
拿页面源代码
print(web.page_source)
拖拽滑块(拖拽到底)
btn = web.find_element(by=By.XPATH, value='//*[@id="btn"]')
ActionChains(web).drag_and_drop_by_offset(btn, 300, 0).perform()
被反爬了可以试试:
# 在控制台里输入:window.navigator.webdriver
# 若为true,进行如下处理(Chrome版本不同可能操作不同):
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
opt = Options()
opt.add_argument("--disable-blink-features=AutomationControlled")
web = Chrome(options=opt)
web.get("https://www.baidu.com")
处理验证码:借助第三方api,比如”超级鹰“
Scrapy
特点:速度快、简单、可扩展性强
Scrapy工作流程
spider
将起始url
构造成request
对象,并且将request
对象传递给scheduler
engine
从scheduler
中获取到request
对象,并且将request
对象交给downloader
- 由
downloader
发送请求来获取到页面源代码,并封装成response
对象返回给engine
engine
将获取到的response
对象传递给spider
spider
对数据进行解析,并返回给引擎- 如果返回的数据是子页面的
url
,则重复步骤2;如果是数据,则engine
会将数据交给pipeline
进行存储
Scrapy入门-4399
创建项目
scrapy startproject {project_name}
项目结构
game(01):项目文件夹
game(02):根目录
spider:爬虫主体,进行数据解析
items:封装大的数据的时候
middlewares:中间件
pipelines:数据存储
setting:scrapy整体的配置信息
cfg:项目的默认配置、布置的端口
创建spider
scrapy genspider {spider_name} {allowed_domains}
# allowed_domains是用来限制爬虫爬取范围的。爬虫只会抓取这个域名下的网页
# 注意输入命令的目录
然后就能发现spider
文件夹下多了一个xiao.py
。点开看看结构:
我们主要需要写的就是parse方法。啊,还要注意起始url
对不对
编写parse方法
def parse(self, response): # parse方法默认是用来处理解析的
# 看看页面源代码有没有问题
#print(response.text)
# 提取数据
# scrapy已经帮你打包好了,所以不用再from xml什么的
# 获取到页面中所有的游戏名字。非常简便喵
# 用xpath进行解析。xpath默认返回Selector对象,所以需要extract提取内容
#txt = response.xpath("//ul[@class='n-game cf']/li/a/b/text()").extract()
#还可以用css解析:response.css()
#print(txt)
li_list = response.xpath("//ul[@class='n-game cf']/li")
for li in li_list:
name = li.xpath('./a/b/text()').extract_first() # extract_first提取内容,如果没有返回NONE,不会报错
categroy = li.xpath('./em/a/text()').extract_first()
date = li.xpath('./em/text()').extract_first()
dic = {
"name": name,
"categroy": categroy,
"date": date
}
print(dic)
# 用yield将数据传输给管道
yield dic # 省内存
运行spider
scrapy crawl {spider_name}
发现会打印很多日志。嗯,不想看怎么办:在settings.py
文件里加入LOG_LEVEL = 'WARNING'
编写管道
管道在pipelines.py
文件里。并且管道默认是不开启的,需要去settings.py
里开启:取消掉ITEM_PIPELINES
的注释
管道这里就简单的打印一下数据吧
class GamePipeline: # 随便定义一个类
def process_item(self, item, spider): # 定死的,不能改。处理数据专用方法:item->数据;spider->爬虫
print(item)
return item # return到下一个管道