Python-并发编程

并发编程

并发编程

Python对并发编程的支持

单线程:不加改造的程序

多线程并发:threading

多核cpu并行:multiprocessing

异步IO:asyncio

使用LOCK对资源加锁,防止冲突访问

使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式

使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果

使用subprocess启动外部程序的进程,并进行输入输出交互

怎样选择多线程、多进程和多协程

Python并发编程有三种方式:多线程(Thread)、多进程(Process)、多协程(Coroutine)

什么是CPU密集型计算、IO密集型计算?

IO密集型计算:IO密集型指的是系统运作大部分的情况时CPU在等I/O的读写操作,CPU占用率较低。

例如:文件处理程序、网络爬虫程序、读写数据库程序

CPU密集型计算:CPU密集型也叫计算密集型,是指I/O在很短时间内就可以完成,而CPU需要大量的计算和处理,特点是CPU占用率相当高。

例如:压缩解压缩、加密解密、正则表达式搜索

三种执行方式的区别

如何选择对应技术?

GIL

Python速度慢的两大原因

  1. Python是动态类型语言,需要边解释边执行
  2. GIL(Python无法利用多核CPU并发执行)

GIL是什么

全局解释器锁(GIL)是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核处理器上,使用GIL的解释器也只允许同一时间执行一个线程。

Python设计初期为了规避并发问题引入了GIL,现在想取出却取出不掉了。

怎样规避GIL带来的限制

  1. 多线程Threading​机制依然有用,用于IO密集型计算是仍然可以大幅提升速度。但是用于CPU密集型计算时,只会更加拖慢速度
  2. 使用multiprocessing​的多进程机制实现并行计算、利用多核CPU优势。为了应对GIL的问题,​Python​提供了multiprocessing

Python创建多线程的方式

import threading

# 创建线程
# target=函数名, args=需要传递的参数,是个元组
t = threading.Thread(target=func, args=(100,200))

# 启动线程
t.start()
# 等待结束
t.join()

python实现生产-消费者架构的爬虫

多组件的Pipeline技术架构

​​

生产者消费者爬虫的架构

多线程数据通信的queue.Queue

用于多线程之间的、线程安全(并发访问不会冲突)的数据通信

import queue

# 创建Queue
q = queue.Queue()

# 添加元素
item = []
q.put(item)

# 获取元素
item = q.get()

# 查询状态
# 查看元素的多少
q.qsize()
# 判断是否为空
q.empty()
# 判断是否已满
q.full()

代码编写实现生产者消费者爬虫
import requests
from bs4 import BeautifulSoup

urls = [
    f"https://www.cnblogs.com/#p{page}"
    for page in range(1, 50+1)
]

def craw(url):
    r = requests.get(url)
    return r.text

def parse(html):
    # class="post-item-title"
    soup = BeautifulSoup(html, "html.parser")
    links = soup.find_all("a", class_="post-item-title")
    return  [(link["href"],link.get_text()) for link in links]

if __name__ == '__main__':
    for result in parse(craw(urls[2])):
        print(result)

import queue
import random
import time
import threading
import blog_spider

def do_craw(url_queue:queue.Queue, html_queue:queue.Queue):
    while True:
        url = url_queue.get()
        html = blog_spider.craw(url)
        html_queue.put(html)
        print(threading.current_thread().name, f"craw {url}",
              "url_queue.size=",url_queue.qsize())
        time.sleep(random.randint(1,2))

def do_parse(html_queue:queue.Queue, fout):
    while True:
        html = html_queue.get()
        results = blog_spider.parse(html)
        for result in results:
            fout.write(str(result) + "\n")
        print(threading.current_thread().name, f"results.size", len(results),
              "html_queue.size=", html_queue.qsize())
        time.sleep(random.randint(1, 2))

if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in blog_spider.urls:
        url_queue.put(url)

    for idx in range(3):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue),
                             name=f"craw{idx}")
        t.start()

    fout = open("02.data.txt", "w", encoding="utf-8")
    for idx in range(2):
        t = threading.Thread(target=do_parse, args=(html_queue, fout),
                             name=f"parse{idx}")
        t.start()

线程安全

线程安全概念介绍

线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,是程序功能正确完成。

但是由于线程的执行随时会发生切换,会造成了不可预料的结果,出现线程不安全。

比如对于一个取钱程序:如果同时有两笔钱在进行交易,那么可能发生冲突

Lock用于解决线程安全问题

用法1:try-finally模式

import threading

lock = threading.Lock()

lock.acquire()
try:
    # do something
finally:
    lock.release()

用法2:with模式

import threading

lock = threading.Lock()

with lock:
    # do something

示例:

import threading
import time

lock = threading.Lock()
class Account:
    def __init__(self, balance):
        self.balance = balance

def draw(account, amount):
    with lock:
        if account.balance >= amount:
        # 被阻塞一定会切换线程,确保一定冲突
            time.sleep(0.1)
            print(threading.current_thread().name, "successful")
            account.balance -= amount
            print(threading.current_thread().name, "余额", account.balance)
        else:
            print(threading.current_thread().name, "取钱失败,余额不足")

if __name__ == '__main__':
    account = Account(1000)
    ta = threading.Thread(name="ta", target=draw, args=(account, 800))
    ta2 = threading.Thread(name="ta2", target=draw, args=(account, 800))
    ta.start()
    ta2.start()

线程池

线程池的原理

新建和终止要花费很多资源,所以如果重用线程,就可以减去新建/终止的开销。

线程池里有新建好的线程,如果有新任务则加入任务队列,线程池则会从任务队列里取出任务,挨个执行。

使用线程池的好处

  1. 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源;
  2. 适用常见:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
  3. 防御功能:能优先避免系统因为创建线程过多,而导致系统负荷过大响应变慢等问题
  4. 代码优势:使用线程池的语法比自己新建线程执行线程更加简洁

ThreadPoolExecutor使用语法

# 按顺序打印
for future in futures:
    print(future.result())
# 先完成了先打印
for future in as_completed(futures):
    print(future.result())

# 整体提交
htmls = pool.map(blog_spider.craw, blog_spider.urls)
# 单个提交
future = pool.submit(blog_spider.parse, html)
import concurrent.futures
import blog_spider

#craw
with concurrent.futures.ThreadPoolExecutor() as pool:
    # 整体提交
    htmls = pool.map(blog_spider.craw, blog_spider.urls)
    htmls = list(zip(blog_spider.urls, htmls))
    for url, html in htmls:
        print(url, len(html))

print("craw over")

#parse
with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}
    for url, html in htmls:
        # 单个提交
        future = pool.submit(blog_spider.parse, html)
        futures[future] = url

    #for future,url in futures.items():
    #    print(url, future.result())
    for future in concurrent.futures.as_completed(futures):
        url = futures[future]
        print(url, future.result())

多进程multiprocessing

为什么使用多进程multiprocessing

有了多线程threading模块,为什么还要用多进程multiprocessing?因为如果遇到了CPU密集型计算,多线程反而会降低执行速度。

多进程在使用上也和多线程十分相似

代码实践

import math
import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

PRIMES = [112272535095293] * 100

def is_prime(n):
    if n<2:
        return False
    if n == 2:
        return True
    if n%2==0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n+1, 2):
        if n%i == 0:
            return False
    return True

def single_thread():
    for number in  PRIMES:
        is_prime(number)

def multi_thread():
    with ThreadPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)

def multi_process():
    with ProcessPoolExecutor() as pool:
        pool.map(is_prime, PRIMES)

if __name__ == '__main__':
    start = time.time()
    single_thread()
    end = time.time()
    print("single_thread, ",end-start,"s")

    start = time.time()
    multi_thread()
    end = time.time()
    print("multi_thread, ", end - start, "s")

    start = time.time()
    multi_process()
    end = time.time()
    print("multi_process, ", end - start, "s")

协程爬虫

简介

协程:在单线程内实现并发

核心原理:

  1. 用一个超级循环(其实就是while true循环)
  2. 配合IO多路复用原理(IO时CPU可以干其他事情)

注意:想要使用异步IO编程,那么依赖库必须支持异步IO

代码示例

import asyncio
import time
import aiohttp
import blog_spider

# 定义了一个协程
async def async_craw(url):
    print("craw url:",url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url: {url},{len(result)}")

# 创建超级循环
loop = asyncio.get_event_loop()

# 使用协程函数定义一个task列表
tasks = [loop.create_task(async_craw(url))
    for url in blog_spider.urls]

start = time.time()
# 等待tasks完成
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time:",end - start)

用信号量控制爬虫并发

信号量(Semaphore)又称为信号量、旗语。是一个同步对象,用于保持在0至指定最大值之间的一个计数值。

使用方式1:

sem = asyncio.Semaphore(10)

async with sem:
    # 协程语句

使用方式2:

sem = asyncio.Semaphore(10)

await sem.acquire()
try:
    # 协程语句
finally:
    sem.release()

爬虫示例

import asyncio
import time
import aiohttp
import blog_spider

semaphore = asyncio.Semaphore(10)

# 定义了一个协程
async def async_craw(url):
    async with semaphore:
        print("craw url:",url)
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                result = await resp.text()
                await asyncio.sleep(5)
                print(f"craw url: {url},{len(result)}")

# 创建超级循环
loop = asyncio.get_event_loop()

# 使用协程函数定义一个task列表
tasks = [loop.create_task(async_craw(url))
    for url in blog_spider.urls]

start = time.time()
# 等待tasks完成
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time:",end - start)
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇