GIL
GIL(Global Interpreter Lock, 全局解释器锁),其实并非Python的特点。但是因为CPython解释器的广泛应用,导致这个问题多在Python面试中会被问及。
GIL更多的是一种工程上的取舍,算不上优点或者缺点。GIL主要目的是简化缓存管理,当一个线程获取GIL时,其他线程必须等待线程释放GIL才能继续执行。这样的可以避免多个线程同时修改一个对象的引用计数,比如变量赋值,导致内存泄漏或者对象被错误回收。同时GIL非常的简单安全,实现成本比给每个对象加锁低了很多。而且很多C扩展库本身不是线程安全,GIL保证了调用这些库时的安全性。
不过这带来的问题就是CPython利用多核CPU的能力大大削弱,比如计算密集型的任务:数学运算、图像处理、矩阵计算、视频解码等等,即便CPU有16核,启动16个线程运算,最后也无法并行运行,只能并发运行。不过补充一点,GIL对于I/O密集型任务:网络爬虫、读写文件、数据库查询影响并不是很大。
python3.13中的free-threaded模式已经在尝试移除GIL,3.13中包含了实验性的no-GIL构建版本。
线程/进程
线程和进程,以及并行和并发是不同的概念。
线程是CPU调度的最小单位。同一个进程内的线程共享内存空间和资源,但是各自有独立的栈、寄存器、程序计数器。粗糙点理解就是同一个世界内多个执行线路。
进程是操作系统分配资源的最小单位。每个进程是有自己的独立的虚拟内存空间的。粗糙的可以理解为具备一个独立的小世界,A进程和B进程之间内存不共享。
并发是逻辑上同时发生但是不要求真的同时执行,是一个程序设计问题。
并行则是物理上真的同时执行,是一个硬件资源问题。
python里的多线程、多进程
python中常见的多线程模块是threading,不过正如上面所说,因为GIL存在,多线程也不能跑CPU密集任务,只能处理I/O密集型任务,比如文件读写、网络请求类的。此外还可以利用异步编程asyncio来实现多线程。
python中常见的多进程模块是multiprocessing,这个单独开了几个内存空间和独立的GIL,这样就可以利用多核CPU,实现并行计算。不过创建、销毁进程的开销很大,而且进程间通讯IPC比线程间更复杂。
为了更好的体现多线程、多进程特点,遂整理演示代码。如仅仅面试,无需过于深入了解。
额外补充:threading展示
首先我们构建一个task1任务,来模拟一个需要等待的任务。
import asyncio
import multiprocessing
import threading
import time
num = 0
def task1(name):
global num
print(f"{name}启动!")
for _ in range(100):
tmp = num + 1 # 模拟参数传递
time.sleep(0.000001) # 模拟等待
num = tmp
time.sleep(1) # 模拟等待
print(f"{name}终止!")
常规调用输出如下:
start_time = time.time()
for i in range(5):
task1(f"任务{i}")
end_time = time.time()
print(f"耗时为{end_time - start_time},num为{num}。")
# 输出:耗时为5.266556978225708,num为500。
# 总结:结果正确,但是耗时太久。
想要加速,我们可以优先使用多进程,也就是threading,不过任务中需要加锁,我们构建task2任务。
# 我们用task2来模拟一个需要等待的加锁的任务
lock = threading.Lock() # 锁
def task2(name):
global num
print(f"{name}启动!")
for _ in range(100):
with lock:
tmp = num + 1 # 模拟参数传递
time.sleep(0.000001) # 模拟等待
num = tmp
time.sleep(1)
print(f"{name}终止!")
启用threading:
num = 0
start_time = time.time()
threads = []
for i in range(5):
t = threading.Thread(target=task2, args=(f"任务{i}",))
t.start()
threads.append(t)
for t in threads:
t.join()
end_time = time.time()
print(f"耗时为{end_time - start_time},num为{num}。")
# 输出:耗时为1.276193618774414,num为500。
其实上述代码中,多线程主要规避了最后那个time.sleep(1),也就是在无效的等待时间。在这个无效等待时间里去执行其他更有用的代码。不过需要注意lock的使用,避免出现race condition(后续补充介绍)。
额外补充:asyncio展示
我们同样构建两个任务,一个不用asyncio,一个使用asyncio。
import asyncio
import multiprocessing
import threading
import time
num = 0
# 我们用task1来模拟一个需要等待的io
def task1(name):
global num
print(f"{name}启动!")
for _ in range(100):
tmp = num + 1 # 模拟参数传递
time.sleep(0.000001) # 模拟等待
num = tmp
time.sleep(1)
print(f"{name}终止!")
# 我们用task2来模拟一个async任务
async def task2(name):
global num
print(f"{name}启动!")
for _ in range(100):
tmp = num + 1 # 模拟参数传递
time.sleep(0.000001) # 模拟等待
num = tmp
await asyncio.sleep(1)
print(f"{name}终止!")
下面来开始执行:
if __name__ == "__main__":
print("##############测试1:asyncio测试。##############")
# #################################未启用threading###########################################
start_time = time.time()
for i in range(5):
task1(f"任务1_{i}")
end_time = time.time()
print(f"不启用asyncio,耗时为{end_time - start_time}。{num}")
# 输出:耗时为5.277087450027466。500
# #################################启用asyncio###########################################
num = 0
async def main():
start_time = time.time()
tasks = [
asyncio.create_task(task2("任务1_1")),
asyncio.create_task(task2("任务1_2")),
asyncio.create_task(task2("任务1_3")),
asyncio.create_task(task2("任务1_4")),
asyncio.create_task(task2("任务1_5")),
]
await asyncio.gather(*tasks)
end_time = time.time()
# gather表示多任务并发执行+收集结果
print(f"启用asyncio,耗时为{end_time - start_time}。{num}")
asyncio.run(main()) # 创建并运行
# 输出:启用asyncio,耗时为1.279505729675293。500
额外补充:race condition
race condition,竞态条件。这个翻译有点奇怪~
上述我们展示中,无论是threading还是asyncio,在task2中都会有一定程度的任务改造,比如lock和await,那如果我们不改造,直接使用task1会怎么样呢?
import asyncio
import multiprocessing
import threading
import time
# 我们用task1来模拟一个需要等待的任务
def task1(name):
global num
print(f"{name}启动!")
for _ in range(100):
tmp = num + 1 # 模拟参数传递
time.sleep(0.000001) # 模拟等待
num = tmp
time.sleep(1)
print(f"{name}终止!")
if __name__ == "__main__":
num = 0
start_time = time.time()
threads = []
for i in range(5):
t = threading.Thread(target=task1, args=(f"任务1_{i}",))
t.start()
threads.append(t)
for t in threads:
t.join()
end_time = time.time()
print(f"启用threading,共享变量导致race condition,耗时为{end_time - start_time},num为{num}。")
# 输出:启用threading,共享变量导致race condition,耗时为1.0609424114227295,num为100。
# 结果错误,理想状态是500,但是耗时确实变短,说明在并发执行。
可以看到,并发执行了,但是结果不太对。这是因为没有lock约束,导致一个线程计算num时候,另一个线程也在计算num。本来线程1计算num加到了98,结果线程2也在计算,赋值num为88,这就导致num被覆盖掉了。下一轮线程1计算不是在98基础上计算,而是在88基础上计算了。
额外补充:multiprocessing展示
multiprocessing是多进程计算,主要解决复杂的计算任务。因此我们构建一个相对简单的数据计算任务。
import asyncio
import threading
import time
from multiprocessing import Process, Queue
# 我们用task1来模拟一个需要复杂的计算任务
def task1():
result = 0
for i in range(100000000):
result += i*i+i-(i+1)*(i-1)
print(result)
然后运行看一下:
if __name__ == "__main__":
print("##############测试1:asyncio测试。##############"
# #################################不启用multiprocessing###########################################
start_time = time.time()
task1()
end_time = time.time()
print(f"不启用multiprocessing,耗时为{end_time - start_time}。")
# 输出:不启用multiprocessing,耗时为12.580572128295898。
然后我们拆分一下上述任务,比如之前任务计算1加到100,现在既然我们有四个进程,是不是就可以分开计算,A负责1加到25,B负责26加到50,C负责51加到75,D负责76加到100,然后把四个结果在加起来。也就是计算的任务需要我们手动拆分一下。
def task2(start,end,q):
result = 0
for i in range(start,end):
result += i*i+i-(i+1)*(i-1)
print(result)
q.put(result) # 结果回传
然后我们启用multiprocessing:
# #################################启用multiprocessing###########################################
start_time = time.time()
q = Queue() # 通信通道
processes = []
nums = 100000000
step = nums // 4 # 就是本来是100000000,现在拆分成四个任务,就是25000000每个
for i in range(4):
# 创建进程对象,任务,参数
p = Process(target=task2, args=(i * step, (i + 1) * step, q))
# 启动进程
p.start()
processes.append(p)
for p in processes:
# 等待进程结束
p.join()
result = sum(q.get() for _ in range(4)) # q.get收集结果
print("结果:", result)
end_time = time.time()
print("多进程耗时:", end_time - start_time, "秒")
# 输出:多进程耗时: 3.616527557373047 秒