python-并发和线程

并发

并发和并行的区别

并行、parallel

​ 同时做某些事情,可以互不干扰的同一时刻做几件事

并发、concurrency

​ 一个时段内有事情要处理。


并发的解决

1、队列、缓冲区

使用队列,先进先出,解决了资源使用的问题。排成的队列,其实就是一个缓冲地带,就是缓冲区。(队列的作用:解耦,缓冲)

2、争抢

通过争抢,当一个抢到时就会触发一种类似锁机制,抢到资源就上锁,排他性的锁。这也是一种高并发解决方案,但是这样就有可能会有些会很长时间都抢不到。

3、预处理

一种提前加载用户需要的数据的思路,预处理思想,缓存常用。(要考虑到冷、热数据的问题,经常访问的数据可以先预加载)

4、并行

一般日常可以通过购买服务器,或多开进程、线程实现并行处理,来解决并发问题。注意:这些都是水平拓展的思想。

注:
如果线程在单CPU上处理,就不是并行了。
但是多数服务器都是多CPU的,服务的部署往往是多机的、分布式的,这都是并行处理。

5、提速

提高单个CPU性能,或单个服务器安装更多的CPU。
这是一种垂直扩展思想。

6、消息中间件

一般就是在程序之前实现的技术。

常见的消息中间件有RabbitMQ、ActiveMQ(Apache)、RocketMQ(阿里Apache)、kafka(Apache)等

总结:一般来说不同的并发场景使用不同的策略,而策略可能是多种方式的优化组合。


进程和线程

在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个程序的执行实例就是一个进程。
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

进程和程序的关系
程序是源代码编译后的文件,而这些文件存放在磁盘上。当程序被操作系统加载到内存中,就是进程,进程中存放着指令和数据(资源),它也是线程的容器。

Linux进程有父进程、子进程,Windows的进程是平等关系。

线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。
一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。
在许多系统中,创建一个线程比创建一个进程快10-100倍。

进程、线程的理解
现代操作系统提出进程的概念,每一个进程都认为自己独占所有的计算机硬件资源。
进程就是独立的王国,进程间不可以随便的共享数据。
线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈

线程的状态

状态 含义
就绪(Ready) 线程能够运行,但在等待被调度。可能线程刚刚创建启动,或刚刚从阻塞中恢复,或者被其他线程抢占
运行(Running) 线程正在运行
阻塞(Blocked) 线程等待外部事件发生而无法运行,如I/O操作
终止(Terminated) 线程完成,或退出,或被取消

Python中的进程和线程


进程会启动一个解释器进程,线程共享一个解释器进程

Python的线程开发


Python的线程开发使用标准库threading

Thread类

1
2
3
#签名
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
参数名 含义
target 线程调用的对象,就是目标函数
name 为线程起的名字
args 为目标函数传递实参,元组
kwargs 为目标函数关键字传参,字典

线程启动

1
2
3
4
5
6
7
8
9
import threading

# 最简单的线程程序
def worker():
print("I'm working")
print('Fineshed')

t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() # 启动

通过threading.Thread创建一个线程对象,target是目标函数,name可以指定名称。
但是线程没有启动,需要调用start方法。
线程之所以执行函数,是因为线程中就是执行代码的,而最简单的封装就是函数,所以其实还是函数调用,当函数执行完,线程就退出,当主线程结束后,程序也就执行完毕。

线程退出

Python没有提供线程退出的方法,线程在下面情况时退出。

1、线程函数内语句执行完毕
2、线程函数中抛出未处理的异常

Python的线程没有优先级,没有线程组的概念,也不能被销毁、停止、挂起、那也没有恢复、中断了。

threading的属性和方法

名称 含义
current_thread() 返回当前线程对象
main_thread() 返回主线程对象
active_count() 当前处于alive状态的线程个数
enumerate() 返回所有活着的线程的列表,不包括已经终止的线程和未开始的线程
get_ident() 返回当前线程的ID,非0整数

active_count、enumerate方法返回的值还包括主线程。

Thread实例的属性和方法

名称 含义
name 只是一个名字,只是一个标识,名称可以重名。getName(),setName(),获取设置这个名词
ident 线程ID,它是非0整数。线程启动后才会有ID,否则为None。线程退出,此ID依旧可以访问。此ID可以重复使用
is_alive() 返回线程是否活着

注意:线程的name这是一个名称,可以重复;ID必须唯一,但可以在线程退出后再利用。

start方法和run方法

名称 含义
start() 启动线程。每一个线程必须且只能执行该方法一次
run() 运行线程函数

虽然说,start()方法会调用run()方法,而run()方法可以运行函数。
但是在使用start()方法启动线程,是启动了一个新的线程,而使用run方法只是在主线程中调用了一个普通的函数而已,并没有启动新的线程。


多线程

一个进程中如果有多个线程,就是多线程,实现一种并发。

当使用start方法启动线程后,进程内有多个活动的线程并行的工作,就是多线程。

一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程。一个进程至少有一个主线程。
其他线程称为工作线程。


线程安全

线程安全,就是线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的。

当在线程中使用print函数的时候,可以让它不打印换行,这样就可以避免print函数线程不安全;还可以使用标准库中的logging模块,日志处理模块,线程安全的。

1
2
3
4
5
6
7
8
9
10
11
import threading
import logging

def worker():
for x in range(20):
logging.warning("{} is running".format(threading.current_thread().name))

for x in range(1, 5):
name = "worker-{}".format(x)
t = threading.Thread(target=worker, name=name)
t.start()

daemon线程和non-daemon线程

进程靠线程执行代码,至少有一个主线程,其它线程是工作线程。

主线程是第一个启动的线程。

父线程:如果线程A中启动了一个线程B,A就是B的父线程。

子线程:B就是A的子线程。

python中,构造线程的时候,可以设置daemon属性,这个属性必须在start方法前设置好。

1
2
3
4
5
6
# 源码Thread的__init__方法中
if daemon is not None:
self._daemonic = daemon # 用户设定bool值
else:
self._daemonic = current_thread().daemon
self._ident = None

线程daemon属性,如果设定就是用户的设置,否则就取当前线程的daemon值。
主线程是non-daemon线程,即daemon = False。

名称 含义
daemon属性 表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常
isDaemon() 是否是daemon线程
setDaemon 设置为daemon线程,必须在start方法之前设置

总结:线程具有daemon属性,可以显示设置为True或False,也可以不设置,则取默认值None
如果不设置daemon,就取当前线程的daemon来设置它。

主线程是non-daemon线程,即daemon = False。
从主线程创建的所有线程的不设置daemon属性,则默认都是daemon = False,也就是non-daemon线程。

Python程序在没有活着的non-daemon线程运行时退出,也就是剩下的只能是daemon线程,主线程才能退出,否则主线程就只能等待。

如果有non-daemon线程的时候,主线程退出时,也不会杀掉所有daemon线程,直到所有non-daemon线程全部结束,如果还有daemon线程,主线程需要退出,会结束所有daemon线程,退出。


join方法

join(timeout=None),是线程的标准方法之一。
一个线程中调用另一个线程的join方法,调用者将被阻塞,直到被调用线程终止。
一个线程可以被join多次。
timeout参数指定调用者等待多久,没有设置超时,就一直等到被调用线程结束。
一个线程调用谁的join方法,就是join谁,就要等谁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time
import threading


def foo(n):
for i in range(n):
print(i)
time.sleep(0.3)

t1 = threading.Thread(target=foo, args=(10,), daemon=True)
t1.start()
t1.join()

print("Main Thread Exiting")

使用了join方法后,daemon线程执行完了,主线程才退出。


daemon线程的应用场景

这个概念唯一的作用就是,当你把一个线程设置为 daemon,它会随主线程的退出而退出。

主要应用场景有:

1、后台任务。如发送心跳包、监控,这种场景最多。

2、主线程工作才有用的线程。如主线程中维护这公共的资源,主线程已经清理了,准备退出,而工作线程使用这些资源工作也没有意义了,一起退出最合适。

3、随时可以被终止的线程

如果主线程退出,想所有其它工作线程一起退出,就使用daemon=True来创建工作线程。

比如,开启一个线程定时判断WEB服务是否正常工作,主线程退出,工作线程也没有必须存在了,应该随着主线程退出一起退出。这种daemon线程一旦创建,就可以忘记它了,只用关心主线程什么时候退出就行了。
daemon线程,简化了程序员手动关闭线程的工作。

如果在non-daemon线程A中,对另一个daemon线程B使用了join方法,这个线程B设置成daemon就没有什么意义了,因为non-daemon线程A总是要等待B。

如果在一个daemon线程C中,对另一个daemon线程D使用了join方法,只能说明C要等待D,主线程退出,C和D不管是否结束,也不管它们谁等谁,都要被杀掉。


threading.local类

1
2
3
4
5
6
7
8
9
10
11
12
import threading
import time
# 局部变量实现
def worker():
x = 0
for i in range(100):
time.sleep(0.0001)
x += 1
print(threading.current_thread(), x)

for i in range(10):
threading.Thread(target=worker).start()

当想保证线程安全,可以使用局部变量来进行运算,避免错误。

当然如果想使用全局变量,那就可以使用threading下的local类,将这个类实例化得到一个全局变量,但是不同的线程使用这个对象存储的数据,其他线程看不见。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

global_date = threading.local()


def worker():
global_date.x = 0
for i in range(100):
time.sleep(0.001)
global_date.x += 1
print(threading.current_thread(), global_date.x)


for i in range(5):
threading.Thread(target=worker).start()

#<Thread(Thread-3, started 3648)> 100
#<Thread(Thread-4, started 9404)> 100
#<Thread(Thread-1, started 7472)> 100
#<Thread(Thread-5, started 10876)> 100
#<Thread(Thread-2, started 12424)> 100

threading.local类构建了一个大字典,存放所有线程相关的字典,定义如下:
{ id(thread) -> (ref(thread), thread-local dict) }
每一线程实例的id为key,元组为value。value中两部分为线程对象引用,每个线程自己的字典。

本质
运行时,threading.local实例处在不同的线程中,就从大字典中找到当前线程相关键值对中的字典,覆盖threading.local实例的 __dict__ 。
这样就可以在不同的线程中,安全地使用线程独有的数据,做到了线程间数据隔离,如同本地变量一样安全

定时器Timer/延时执行


threading.Timer继承自Thread,这个类用来定义延迟多久后执行一个函数。

class.threading.Timer(interval, function, args=None, kwargs=None)
start方法执行之后,Timer对象会处于等待状态,等待了interval秒之后,然后开始执行function函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import threading
import time
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker():
logging.info("in worker")
time.sleep(2)


t = threading.Timer(4, worker)
t.setName("timer")

t.start()

while True:
print(threading.enumerate())
time.sleep(1)
if len(threading.enumerate()) == 1:
break


#打印
[<_MainThread(MainThread, started 10416)>, <Timer(timer, started 2768)>]
[<_MainThread(MainThread, started 10416)>, <Timer(timer, started 2768)>]
[<_MainThread(MainThread, started 10416)>, <Timer(timer, started 2768)>]
[<_MainThread(MainThread, started 10416)>, <Timer(timer, started 2768)>]
2018-10-12 16:24:44,066 timer 2768 in worker
[<_MainThread(MainThread, started 10416)>, <Timer(timer, started 2768)>]
[<_MainThread(MainThread, started 10416)>, <Timer(timer, started 2768)>]

Timer提供了cancel方法,用来取消一个未执行的函数,如果上面例子中worker函数已经开始执行,cancel就没有任何效果了。

总结
Timer是线程Thread的子类,就是线程类,具有线程的能力和特征。
它的实例是能够延时执行目标函数的线程,在真正执行目标函数之前,都可以cancel它。
cancel方法本质使用Event类实现。这并不是说,线程提供了取消的方法。

线程同步

概念:线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。

不同操作系统实现技术有所不同,有临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphore)、事件Event等

Event事件


Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。

名称 含义
set() 标记设置为True
clear() 标记设置为False
is_set() 标记是否为True
wait(timeout=None) 设置等待标记为True的时长,None为无限等待。等到返回True,未等到超时了返回False

需求:

老板雇佣了一个工人,让他生产杯子,老板一直等着这个工人,直到生产了10个杯子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import logging
import time
from threading import Event, Thread

FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


def boss(event: Event):
logging.info("I'm boss, waitting for U.")
event.wait() #等待
logging.info("Good Job.")


def worker(event: Event, count=10):
logging.info("i'm working for U.")
cups = []
while True:
logging.info("make 1")
time.sleep(0.5)
cups.append(1)
if len(cups) >= count:
event.set() #通知
break
logging.info("I'm finish my job, cups = {}".format(cups))


event = Event()
t1 = Thread(target=boss, name="bose", args=(event,)).start()
t2 = Thread(target=worker, name="worker", args=(event,)).start()

总结
使用同一个Event对象的标记flag。
谁wait就是等到flag变为True,或等到超时返回False。不限制等待的个数。

wait的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from threading import Event, Thread
import logging
logging.basicConfig(level=logging.INFO)

def do(event:Event, interval:int):
while not event.wait(interval): # 条件中使用,返回True或者False
logging.info('do sth.')

e = Event()
Thread(target=do, args=(e, 3)).start()

e.wait(10) # 也可以使用time.sleep(10)
e.set()
print('main exit')


#输出

INFO:root:do sth.
INFO:root:do sth.
INFO:root:do sth.
main exit

Process finished with exit code 0

Event的wait优于time.sleep,它会更快的切换到其它线程,提高并发效率。


Lock

锁,凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以使用这个资源。

锁,一旦线程获得锁,其它试图获取锁的线程将被阻塞

名称 含义
acquire(blocking=True,timeout=-1) 默认阻塞,阻塞可以设置超时时间。非阻塞时,timeout禁止设置。成功获取锁,返回True,否则返回False
release() 释放锁。可以从任何线程调用释放。已上锁的锁,会被重置为unlocked,未上锁的锁上调用,抛RuntimeError异常。

加锁和解锁


一般来说,加锁就需要解锁,但是加锁后解锁前,还要有一些代码执行,就有可能抛异常,一旦出现异常,锁是无法释放,但是当前线程可能因为这个异常被终止了,这就产生了死锁。

加锁、解锁常用语句:
1、使用try…finally语句保证锁的释放

1
2
3
4
5
try:            
self.lock.acquire()
finally:
self.lock.release()

2、with上下文管理,锁对象支持上下文管理

1
2
3
4
self.__lock = threading.Lock() 
with self.__lock:
******

锁的应用场景


锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

如果全部都是读取同一个共享资源需要锁吗?
不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁

使用锁的注意事项:

  • 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行
    • 举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁,一辆辆过。
  • 加锁时间越短越好,不需要就立即释放锁
  • 一定要避免死锁

可重入锁RLOCK


可重入锁,是线程相关的锁。
线程A获得可重复锁,并可以在同一线程中多次成功获取,不会阻塞。最后要在线程A中做和acquire次数相同的release。release多了会报错。有个count在计数。属主owner会记录当前是谁在使用锁。

当锁未释放完,其他线程获得锁就会阻塞,直到当前持有锁的线程释放完锁。

Condition


构造方法Condition(lock=None),可以传入一个Lock或Rlock对象,默认是Rlock。

名称 含义
acquire(*args) 获取锁
wait(self,timeout = None) 等待或超时
notify(n =1) 唤醒至多指定指定个数的等待的线程,没有等待的线程就没有任何操作
notify_all() 唤醒所有等待的线程

Condition用于生产者、消费者模型,为了解决生产者消费者速度匹配的问题。