并发编程
Python对并发编程的支持
单线程:不加改造的程序
多线程并发:threading
多核cpu并行:multiprocessing
异步IO:asyncio
使用LOCK对资源加锁,防止冲突访问
使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果
使用subprocess启动外部程序的进程,并进行输入输出交互
怎样选择多线程、多进程和多协程
Python并发编程有三种方式:多线程(Thread)、多进程(Process)、多协程(Coroutine)
什么是CPU密集型计算、IO密集型计算?
IO密集型计算:IO密集型指的是系统运作大部分的情况时CPU在等I/O的读写操作,CPU占用率较低。
例如:文件处理程序、网络爬虫程序、读写数据库程序
CPU密集型计算:CPU密集型也叫计算密集型,是指I/O在很短时间内就可以完成,而CPU需要大量的计算和处理,特点是CPU占用率相当高。
例如:压缩解压缩、加密解密、正则表达式搜索
三种执行方式的区别
如何选择对应技术?
GIL
Python速度慢的两大原因
- Python是动态类型语言,需要边解释边执行
- GIL(Python无法利用多核CPU并发执行)
GIL是什么
全局解释器锁(GIL)是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核处理器上,使用GIL的解释器也只允许同一时间执行一个线程。
Python设计初期为了规避并发问题引入了GIL,现在想取出却取出不掉了。
怎样规避GIL带来的限制
- 多线程
Threading
机制依然有用,用于IO密集型计算是仍然可以大幅提升速度。但是用于CPU密集型计算时,只会更加拖慢速度 - 使用
multiprocessing
的多进程机制实现并行计算、利用多核CPU优势。为了应对GIL的问题,Python提供了multiprocessing
Python创建多线程的方式
import threading
# 创建线程
# target=函数名, args=需要传递的参数,是个元组
t = threading.Thread(target=func, args=(100,200))
# 启动线程
t.start()
# 等待结束
t.join()
python实现生产-消费者架构的爬虫
多组件的Pipeline技术架构
生产者消费者爬虫的架构
多线程数据通信的queue.Queue
用于多线程之间的、线程安全(并发访问不会冲突)的数据通信
import queue
# 创建Queue
q = queue.Queue()
# 添加元素
item = []
q.put(item)
# 获取元素
item = q.get()
# 查询状态
# 查看元素的多少
q.qsize()
# 判断是否为空
q.empty()
# 判断是否已满
q.full()
代码编写实现生产者消费者爬虫
import requests
from bs4 import BeautifulSoup
urls = [
f"https://www.cnblogs.com/#p{page}"
for page in range(1, 50+1)
]
def craw(url):
r = requests.get(url)
return r.text
def parse(html):
# class="post-item-title"
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a", class_="post-item-title")
return [(link["href"],link.get_text()) for link in links]
if __name__ == '__main__':
for result in parse(craw(urls[2])):
print(result)
import queue
import random
import time
import threading
import blog_spider
def do_craw(url_queue:queue.Queue, html_queue:queue.Queue):
while True:
url = url_queue.get()
html = blog_spider.craw(url)
html_queue.put(html)
print(threading.current_thread().name, f"craw {url}",
"url_queue.size=",url_queue.qsize())
time.sleep(random.randint(1,2))
def do_parse(html_queue:queue.Queue, fout):
while True:
html = html_queue.get()
results = blog_spider.parse(html)
for result in results:
fout.write(str(result) + "\n")
print(threading.current_thread().name, f"results.size", len(results),
"html_queue.size=", html_queue.qsize())
time.sleep(random.randint(1, 2))
if __name__ == '__main__':
url_queue = queue.Queue()
html_queue = queue.Queue()
for url in blog_spider.urls:
url_queue.put(url)
for idx in range(3):
t = threading.Thread(target=do_craw, args=(url_queue, html_queue),
name=f"craw{idx}")
t.start()
fout = open("02.data.txt", "w", encoding="utf-8")
for idx in range(2):
t = threading.Thread(target=do_parse, args=(html_queue, fout),
name=f"parse{idx}")
t.start()
线程安全
线程安全概念介绍
线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,是程序功能正确完成。
但是由于线程的执行随时会发生切换,会造成了不可预料的结果,出现线程不安全。
比如对于一个取钱程序:如果同时有两笔钱在进行交易,那么可能发生冲突
Lock用于解决线程安全问题
用法1:try-finally模式
import threading
lock = threading.Lock()
lock.acquire()
try:
# do something
finally:
lock.release()
用法2:with模式
import threading
lock = threading.Lock()
with lock:
# do something
示例:
import threading
import time
lock = threading.Lock()
class Account:
def __init__(self, balance):
self.balance = balance
def draw(account, amount):
with lock:
if account.balance >= amount:
# 被阻塞一定会切换线程,确保一定冲突
time.sleep(0.1)
print(threading.current_thread().name, "successful")
account.balance -= amount
print(threading.current_thread().name, "余额", account.balance)
else:
print(threading.current_thread().name, "取钱失败,余额不足")
if __name__ == '__main__':
account = Account(1000)
ta = threading.Thread(name="ta", target=draw, args=(account, 800))
ta2 = threading.Thread(name="ta2", target=draw, args=(account, 800))
ta.start()
ta2.start()
线程池
线程池的原理
新建和终止要花费很多资源,所以如果重用线程,就可以减去新建/终止的开销。
线程池里有新建好的线程,如果有新任务则加入任务队列,线程池则会从任务队列里取出任务,挨个执行。
使用线程池的好处
- 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源;
- 适用常见:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
- 防御功能:能优先避免系统因为创建线程过多,而导致系统负荷过大响应变慢等问题
- 代码优势:使用线程池的语法比自己新建线程执行线程更加简洁
ThreadPoolExecutor使用语法
# 按顺序打印
for future in futures:
print(future.result())
# 先完成了先打印
for future in as_completed(futures):
print(future.result())
# 整体提交
htmls = pool.map(blog_spider.craw, blog_spider.urls)
# 单个提交
future = pool.submit(blog_spider.parse, html)
import concurrent.futures
import blog_spider
#craw
with concurrent.futures.ThreadPoolExecutor() as pool:
# 整体提交
htmls = pool.map(blog_spider.craw, blog_spider.urls)
htmls = list(zip(blog_spider.urls, htmls))
for url, html in htmls:
print(url, len(html))
print("craw over")
#parse
with concurrent.futures.ThreadPoolExecutor() as pool:
futures = {}
for url, html in htmls:
# 单个提交
future = pool.submit(blog_spider.parse, html)
futures[future] = url
#for future,url in futures.items():
# print(url, future.result())
for future in concurrent.futures.as_completed(futures):
url = futures[future]
print(url, future.result())
多进程multiprocessing
为什么使用多进程multiprocessing
有了多线程threading模块,为什么还要用多进程multiprocessing?因为如果遇到了CPU密集型计算,多线程反而会降低执行速度。
多进程在使用上也和多线程十分相似
代码实践
import math
import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
PRIMES = [112272535095293] * 100
def is_prime(n):
if n<2:
return False
if n == 2:
return True
if n%2==0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n+1, 2):
if n%i == 0:
return False
return True
def single_thread():
for number in PRIMES:
is_prime(number)
def multi_thread():
with ThreadPoolExecutor() as pool:
pool.map(is_prime, PRIMES)
def multi_process():
with ProcessPoolExecutor() as pool:
pool.map(is_prime, PRIMES)
if __name__ == '__main__':
start = time.time()
single_thread()
end = time.time()
print("single_thread, ",end-start,"s")
start = time.time()
multi_thread()
end = time.time()
print("multi_thread, ", end - start, "s")
start = time.time()
multi_process()
end = time.time()
print("multi_process, ", end - start, "s")
协程爬虫
简介
协程:在单线程内实现并发
核心原理:
- 用一个超级循环(其实就是while true循环)
- 配合IO多路复用原理(IO时CPU可以干其他事情)
注意:想要使用异步IO编程,那么依赖库必须支持异步IO
代码示例
import asyncio
import time
import aiohttp
import blog_spider
# 定义了一个协程
async def async_craw(url):
print("craw url:",url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
print(f"craw url: {url},{len(result)}")
# 创建超级循环
loop = asyncio.get_event_loop()
# 使用协程函数定义一个task列表
tasks = [loop.create_task(async_craw(url))
for url in blog_spider.urls]
start = time.time()
# 等待tasks完成
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time:",end - start)
用信号量控制爬虫并发
信号量(Semaphore)又称为信号量、旗语。是一个同步对象,用于保持在0至指定最大值之间的一个计数值。
使用方式1:
sem = asyncio.Semaphore(10)
async with sem:
# 协程语句
使用方式2:
sem = asyncio.Semaphore(10)
await sem.acquire()
try:
# 协程语句
finally:
sem.release()
爬虫示例
import asyncio
import time
import aiohttp
import blog_spider
semaphore = asyncio.Semaphore(10)
# 定义了一个协程
async def async_craw(url):
async with semaphore:
print("craw url:",url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
await asyncio.sleep(5)
print(f"craw url: {url},{len(result)}")
# 创建超级循环
loop = asyncio.get_event_loop()
# 使用协程函数定义一个task列表
tasks = [loop.create_task(async_craw(url))
for url in blog_spider.urls]
start = time.time()
# 等待tasks完成
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time:",end - start)