Python 并发总结,多线程,多进程,异步IO

2019-11-28 来源:  发布在  https://www.cnblogs.com/junmoxiao/p/11948993.html

1 测量函数运行时间
import time
def profile(func):
    """Decorator that prints how long each call to *func* takes.

    The wrapped function's return value is passed through unchanged.

    Args:
        func: Callable to be timed.

    Returns:
        A wrapper with the same name and signature behaviour as *func*.
    """
    import functools
    import time

    # wraps() keeps func's name/qualname, which also keeps the decorated
    # function picklable when it is handed to multiprocessing.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print('COST: {}'.format(end - start))
        # Hand the result back: without this every decorated function
        # would return None to its callers.
        return result
    return wrapper

def _fib(n):
    """Return the n-th Fibonacci number (1-indexed) by naive recursion."""
    if n <= 2:
        return 1
    return _fib(n - 1) + _fib(n - 2)


@profile
def fib(n):
    """Timed entry point for the naive recursive Fibonacci computation.

    Only this outer call is decorated: decorating the recursive function
    itself would time and print every single recursive call.
    """
    return _fib(n)


fib(35)
 

2 启动多个线程,并等待完成
 
2.1 使用threading.enumerate()
import threading
# Start the worker threads without keeping our own list of them.
for i in range(2):
    t = threading.Thread(target=fib, args=(35,))
    t.start()

# current_thread() is the supported spelling; the camelCase currentThread()
# alias is deprecated since Python 3.10.
main_thread = threading.current_thread()

# enumerate() lists every live thread, including the main one, which has to
# be skipped because a thread cannot join itself.
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
2.2 先保存启动的线程
# Keep a reference to every started thread so each one can be joined later.
# NOTE: foo is the worker function defined in section 3 below.
threads = []
for i in range(5):
    # Use the qualified name: only the ``threading`` module has been imported
    # at this point, so a bare ``Thread`` would raise NameError.
    t = threading.Thread(target=foo, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
 

3 使用信号量,限制同时能有几个线程访问临界区
from threading import Semaphore
import time

# At most three threads may be inside the guarded region at the same time.
sema = Semaphore(3)


def foo(tid, max_wait=2):
    """Hold the semaphore for a random time, logging acquire and release.

    Args:
        tid: Identifier printed in the log lines.
        max_wait: Upper bound, in seconds, of the random hold time.
    """
    # Imported locally so the snippet is self-contained; random() was
    # previously called without ever being imported.
    from random import random

    with sema:
        print('{} acquire sema'.format(tid))
        wt = random() * max_wait
        time.sleep(wt)
        print('{} release sema'.format(tid))
 

4 锁,相当于信号量为1的情况
from threading import Thread, Lock

value = 0  # shared counter, only modified while ``lock`` is held
lock = Lock()


def getlock():
    """Increment the shared ``value`` by one while holding ``lock``."""
    # ``value`` is rebound below, so it must be declared global; ``lock`` is
    # only read and needs no declaration.
    global value
    with lock:
        new = value + 1
        # Sleep between the read and the write to widen the window in which
        # an unsynchronised update would be lost.
        time.sleep(0.001)
        value = new
 

5 可重入锁RLock
    同一个线程可以多次调用 acquire() 而不会被阻塞;release() 的调用次数必须与 acquire() 的调用次数相同,锁才会真正被释放

6 条件 Condition
一个线程发出信号,另一个线程等待信号
常用于生产者-消费者模型
import time
import threading

def consumer(cond):
    """Block until *cond* is notified, then report that the resource is ready.

    NOTE: wait() is called without a predicate, so a notification sent
    before this thread starts waiting is missed.

    Args:
        cond: Condition object shared with the producer.
    """
    # current_thread() replaces the deprecated currentThread() alias.
    t = threading.current_thread()
    with cond:
        cond.wait()
        print("{}: Resource is available to consumer".format(t.name))

def producer(cond):
    """Announce the resource and wake every thread waiting on *cond*.

    Args:
        cond: Condition object shared with the consumers.
    """
    # current_thread() / notify_all() replace the camelCase aliases that are
    # deprecated since Python 3.10.
    t = threading.current_thread()
    with cond:
        print("{}: Making resource available".format(t.name))
        cond.notify_all()

# One shared condition: two consumers wait on it, one producer notifies it.
condition = threading.Condition()
c1, c2 = (
    threading.Thread(name=label, target=consumer, args=(condition,))
    for label in ('c1', 'c2')
)
p = threading.Thread(name='p', target=producer, args=(condition,))

# Both consumers are started before the producer.
for thread in (c1, c2, p):
    thread.start()
 

7 事件 Event
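用法和 Condition 类似,区别在于 Event 内部维护一个标志位:set() 之后 wait() 会立即返回,直到调用 clear() 重置;而 Condition 的通知不会被保存,线程必须先 wait() 才能收到通知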
import time
import threading
from random import randint

TIMEOUT = 2  # seconds each event.wait() call may block


def consumer(event, l):
    """Loop forever, popping one item from *l* whenever *event* is set.

    Args:
        event: Event that signals an item has been appended.
        l: Shared list the items are popped from.
    """
    t = threading.current_thread()
    while True:
        event_is_set = event.wait(TIMEOUT)
        if event_is_set:
            try:
                integer = l.pop()
                # print() call form works on Python 3 (and Python 2).
                print('{} popped from list by {}'.format(integer, t.name))
                event.clear()  # reset the event state
            except IndexError:  # tolerate an empty list right after start-up
                pass

def producer(event, l):
    """Loop forever, appending a random integer to *l* once per second.

    Args:
        event: Event that is set after every append.
        l: Shared list the integers are appended to.
    """
    t = threading.current_thread()
    while True:
        integer = randint(10, 100)
        l.append(integer)
        # print() call form works on Python 3 (and Python 2).
        print('{} appended to list by {}'.format(integer, t.name))
        event.set()  # signal the event
        time.sleep(1)

event = threading.Event()
l = []
shared = (event, l)  # every thread receives the same event and list

# Two consumers followed by one producer, started in that order.
threads = [
    threading.Thread(name=name, target=consumer, args=shared)
    for name in ('consumer1', 'consumer2')
]
p = threading.Thread(name='producer1', target=producer, args=shared)
threads.append(p)

for t in threads:
    t.start()

for t in threads:
    t.join()
 

8 线程队列 
线程队列有task_done() 和 join()
标准库里的例子
通过向队列放入结束标志 None 来通知 worker 退出;注意如果 do_work 一直阻塞,worker 就无法结束,这种情况需要给 do_work 加上超时
import queue
def worker():
    """Process items from the shared queue until a None sentinel arrives.

    Every processed item is acknowledged with task_done(); the sentinel
    itself is not.
    """
    item = q.get()
    while item is not None:
        do_work(item)
        q.task_done()
        item = q.get()
q = queue.Queue()

# Launch the worker threads.
threads = [threading.Thread(target=worker) for _ in range(num_worker_threads)]
for t in threads:
    t.start()

# Feed the work items, then block until each one has been marked done.
for item in source():
    q.put(item)
q.join()

# One None sentinel per worker tells each of them to leave its loop.
for _ in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()
 

9 优先级队列 PriorityQueue
import threading
from random import randint
from queue import PriorityQueue

# Shared queue; entries are (priority, func, arg) tuples.
q = PriorityQueue()


def double(n):
    """Return *n* multiplied by two."""
    doubled = n * 2
    return doubled

def producer():
    """Put six tasks with random priorities (0-100) on the shared queue."""
    for _ in range(6):
        pri = randint(0, 100)
        print('put :{}'.format(pri))
        q.put((pri, double, pri))  # (priority, func, args)

def consumer():
    """Drain the shared queue, running each task and pausing 0.1s after it.

    Returns as soon as the queue is found empty.
    """
    while not q.empty():
        pri, task, arg = q.get()
        print('[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)))
        q.task_done()
        time.sleep(0.1)

# Start the producer, wait one second, then start the consumer.
threading.Thread(target=producer).start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()
 

10 线程池
当线程执行相同的任务时用线程池
10.1 multiprocessing.pool 中的线程池
from multiprocessing.pool import ThreadPool
# The with-block shuts the pool's worker threads down once the map is done,
# instead of leaving them running for the rest of the program.
with ThreadPool(5) as pool:
    squares = pool.map(lambda x: x ** 2, range(5))
10.2 multiprocessing.dummy
from multiprocessing.dummy import Pool
10.3 concurrent.futures.ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import urllib.request

URLS = ['http://www.baidu.com', 'http://www.hao123.com']


def load_url(url, timeout):
    """Fetch *url* and return the raw response body.

    Args:
        url: Address to download.
        timeout: Timeout in seconds passed to urlopen().

    Returns:
        The bytes read from the response.
    """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with ThreadPoolExecutor(max_workers=5) as executor:
    # Map each future back to its URL so results can be reported by address.
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    # as_completed() yields futures in the order they finish.
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            # result() re-raises whatever load_url raised in the worker.
            data = future.result()
        except Exception as exc:
            print("%r generated an exception: %s" % (url, exc))
        else:
            print("%r page is %d bytes" % (url, len(data)))
 

11 启动多进程,等待多个进程结束
import multiprocessing
# The __main__ guard is needed when child processes are created with the
# "spawn" start method: each child re-imports this module and must not start
# further processes while doing so.
if __name__ == "__main__":
    jobs = []
    for i in range(2):
        p = multiprocessing.Process(target=fib, args=(12,))
        p.start()
        jobs.append(p)
    # Wait for every child process to finish.
    for p in jobs:
        p.join()
 

12 进程池
12.1 multiprocessing.Pool
from multiprocessing import Pool
# Guarded so that worker processes re-importing this module do not create
# pools of their own; the with-block releases the workers after the map.
if __name__ == "__main__":
    with Pool(2) as pool:
        pool.map(fib, [36] * 2)
 
12.2 concurrent.futures.ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import math

PRIMES = [112272535095293, 112582705942171]


def is_prime(n):
    """Return True if the integer *n* is prime, using odd trial division.

    Args:
        n: Integer to test; values below 2 are never prime.
    """
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    # Bound the search with integer arithmetic (i * i <= n) instead of
    # math.sqrt(): the float square root is inexact above 2**53 and raises
    # OverflowError for very large ints.
    i = 3
    while i * i <= n:
        if n % i == 0:
            return False
        i += 2
    return True

if __name__ == "__main__":
    # Check every number in a pool of worker processes; map() yields the
    # results in the same order as PRIMES, so they can be paired with zip().
    with ProcessPoolExecutor() as executor:
        verdicts = executor.map(is_prime, PRIMES)
        for number, prime in zip(PRIMES, verdicts):
            print("%d is prime: %s" % (number, prime))
 

 
13 asyncio
 
13.1 最基本的示例,单个任务
import asyncio

async def hello(delay=1):
    """Print a greeting, pause for *delay* seconds, then print again.

    Args:
        delay: Seconds to sleep between the two messages (default 1).
    """
    print("Hello world!")
    # Awaiting the sleep hands control back to the event loop.
    await asyncio.sleep(delay)
    print("Hello again")

# asyncio.run() creates a fresh event loop, runs the coroutine to completion
# and closes the loop; calling get_event_loop() with no running loop is
# deprecated.
asyncio.run(hello())
13.2 最基本的示例,多个任务
import asyncio

async def hello(delay=1):
    """Print a greeting, pause for *delay* seconds, then print again.

    Args:
        delay: Seconds to sleep between the two messages (default 1).
    """
    print("Hello world!")
    # Awaiting the sleep lets other tasks run in the meantime.
    await asyncio.sleep(delay)
    print("Hello again")

async def main():
    """Run two hello() coroutines concurrently and wait for both."""
    # gather() accepts coroutines directly; passing bare coroutines to
    # asyncio.wait() was removed in Python 3.11.
    await asyncio.gather(hello(), hello())


# asyncio.run() creates the event loop, runs main() and closes the loop.
asyncio.run(main())
 
13.3 结合httpx 执行多个任务并接收返回结果
httpx 接口和 requests基本一致
import asyncio
import httpx

async def get_url():
    """Request http://www.baidu.com and return the HTTP status code."""
    # httpx's top-level get() is synchronous and cannot be awaited;
    # awaitable requests go through AsyncClient.
    async with httpx.AsyncClient() as client:
        r = await client.get("http://www.baidu.com")
    return r.status_code

async def main():
    """Run ten get_url() requests concurrently and return their results.

    gather() returns the results in the order the coroutines were passed in.
    """
    return await asyncio.gather(*(get_url() for _ in range(10)))


# asyncio.run() creates the event loop, runs main() and closes the loop.
results = asyncio.run(main())

for num, result in enumerate(results):
    print(num, result)
 
 
 

相关文章