Python面试题整理01——GIL全局解释器锁+多线程/多进程+race condition
Python面试题整理01——GIL全局解释器锁+多线程/多进程+race condition

Python面试题整理01——GIL全局解释器锁+多线程/多进程+race condition

GIL

GIL(Global Interpreter Lock, 全局解释器锁),其实并非Python的特点。但是因为CPython解释器的广泛应用,导致这个问题多在Python面试中会被问及。

GIL更多的是一种工程上的取舍,算不上优点或者缺点。GIL主要目的是简化缓存管理,当一个线程获取GIL时,其他线程必须等待线程释放GIL才能继续执行。这样的可以避免多个线程同时修改一个对象的引用计数,比如变量赋值,导致内存泄漏或者对象被错误回收。同时GIL非常的简单安全,实现成本比给每个对象加锁低了很多。而且很多C扩展库本身不是线程安全,GIL保证了调用这些库时的安全性。

不过这带来的问题就是CPython利用多核CPU的能力大大削弱,比如计算密集型的任务:数学运算、图像处理、矩阵计算、视频解码等等,即便CPU有16核,启动16个线程运算,最后也无法并行运行,只能并发运行。不过补充一点,GIL对于I/O密集型任务:网络爬虫、读写文件、数据库查询影响并不是很大。

python3.13中的free-threaded模式已经在尝试移除GIL,3.13中包含了实验性的no-GIL构建版本。

线程/进程

线程和进程,以及并行和并发是不同的概念。

线程是CPU调度的最小单位。同一个进程内的线程共享内存空间和资源,但是各自有独立的栈、寄存器、程序计数器。粗糙点理解就是同一个世界内多个执行线路。

进程是操作系统分配资源的最小单位。每个进程是有自己的独立的虚拟内存空间的。粗糙的可以理解为具备一个独立的小世界,A进程和B进程之间内存不共享。

并发是逻辑上同时发生但是不要求真的同时执行,是一个程序设计问题。

并行则是物理上真的同时执行,是一个硬件资源问题。

python里的多线程、多进程

python中常见的多线程模块是threading,不过正如上面所说,因为GIL存在,多线程也不能跑CPU密集任务,只能处理I/O密集型任务,比如文件读写、网络请求类的。此外还可以利用异步编程asyncio来实现多线程。

python中常见的多进程模块是multiprocessing,这个单独开了几个内存空间和独立的GIL,这样就可以利用多核CPU,实现并行计算。不过创建、销毁进程的开销很大,而且进程间通讯IPC比线程间更复杂。

为了更好的体现多线程、多进程特点,遂整理演示代码。如仅仅面试,无需过于深入了解。

额外补充:threading展示

首先我们构建一个task1任务,来模拟一个需要等待的任务。

import asyncio
import multiprocessing
import threading
import time

num = 0
def task1(name):
    global num
    print(f"{name}启动!")
    for _ in range(100):
        tmp = num + 1  # 模拟参数传递
        time.sleep(0.000001)  # 模拟等待
        num = tmp
    time.sleep(1) # 模拟等待
    print(f"{name}终止!")

常规调用输出如下:

start_time = time.time()
for i in range(5):
    task1(f"任务{i}")
end_time = time.time()
print(f"耗时为{end_time - start_time},num为{num}。")
 # 输出:耗时为5.266556978225708,num为500。
 # 总结:结果正确,但是耗时太久。

想要加速,我们可以优先使用多进程,也就是threading,不过任务中需要加锁,我们构建task2任务。

# 我们用task2来模拟一个需要等待的加锁的任务
lock = threading.Lock()  # 锁
def task2(name):
    global num
    print(f"{name}启动!")
    for _ in range(100):
        with lock:
            tmp = num + 1  # 模拟参数传递
            time.sleep(0.000001)  # 模拟等待
            num = tmp
    time.sleep(1)
    print(f"{name}终止!")

启用threading:

    num = 0
    start_time = time.time()
    threads = []
    for i in range(5):
        t = threading.Thread(target=task2, args=(f"任务{i}",))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    end_time = time.time()
    print(f"耗时为{end_time - start_time},num为{num}。")
    # 输出:耗时为1.276193618774414,num为500。

其实上述代码中,多线程主要规避了最后那个time.sleep(1),也就是在无效的等待时间。在这个无效等待时间里去执行其他更有用的代码。不过需要注意lock的使用,避免出现race condition(后续补充介绍)。

额外补充:asyncio展示

我们同样构建两个任务,一个不用asyncio,一个使用asyncio。

import asyncio
import multiprocessing
import threading
import time

num = 0

# 我们用task1来模拟一个需要等待的io
def task1(name):
    global num
    print(f"{name}启动!")
    for _ in range(100):
        tmp = num + 1  # 模拟参数传递
        time.sleep(0.000001)  # 模拟等待
        num = tmp
    time.sleep(1)
    print(f"{name}终止!")


# 我们用task2来模拟一个async任务
async def task2(name):
    global num
    print(f"{name}启动!")
    for _ in range(100):
        tmp = num + 1  # 模拟参数传递
        time.sleep(0.000001)  # 模拟等待
        num = tmp
    await asyncio.sleep(1)
    print(f"{name}终止!")

下面来开始执行:


if __name__ == "__main__":

    print("##############测试1:asyncio测试。##############")
    # #################################未启用threading###########################################
    start_time = time.time()
    for i in range(5):
        task1(f"任务1_{i}")
    end_time = time.time()
    print(f"不启用asyncio,耗时为{end_time - start_time}。{num}")
    # 输出:耗时为5.277087450027466。500

    # #################################启用asyncio###########################################
    num = 0
    async def main():
        start_time = time.time()
        tasks = [
            asyncio.create_task(task2("任务1_1")),
            asyncio.create_task(task2("任务1_2")),
            asyncio.create_task(task2("任务1_3")),
            asyncio.create_task(task2("任务1_4")),
            asyncio.create_task(task2("任务1_5")),
        ]
        await asyncio.gather(*tasks)
        end_time = time.time()
        # gather表示多任务并发执行+收集结果
        print(f"启用asyncio,耗时为{end_time - start_time}。{num}")
    asyncio.run(main())  # 创建并运行
    # 输出:启用asyncio,耗时为1.279505729675293。500

额外补充:race condition

race condition,竞态条件。这个翻译有点奇怪~

上述我们展示中,无论是threading还是asyncio,在task2中都会有一定程度的任务改造,比如lock和await,那如果我们不改造,直接使用task1会怎么样呢?

import asyncio
import multiprocessing
import threading
import time
# 我们用task1来模拟一个需要等待的任务
def task1(name):
    global num
    print(f"{name}启动!")
    for _ in range(100):
        tmp = num + 1  # 模拟参数传递
        time.sleep(0.000001)  # 模拟等待
        num = tmp
    time.sleep(1)
    print(f"{name}终止!")    
if __name__ == "__main__":
    num = 0
    start_time = time.time()
    threads = []
    for i in range(5):
        t = threading.Thread(target=task1, args=(f"任务1_{i}",))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    end_time = time.time()
    print(f"启用threading,共享变量导致race condition,耗时为{end_time - start_time},num为{num}。")
    # 输出:启用threading,共享变量导致race condition,耗时为1.0609424114227295,num为100。
    # 结果错误,理想状态是500,但是耗时确实变短,说明在并发执行。

可以看到,并发执行了,但是结果不太对。这是因为没有lock约束,导致一个线程计算num时候,另一个线程也在计算num。本来线程1计算num加到了98,结果线程2也在计算,赋值num为88,这就导致num被覆盖掉了。下一轮线程1计算不是在98基础上计算,而是在88基础上计算了。

额外补充:multiprocessing展示

multiprocessing是多进程计算,主要解决复杂的计算任务。因此我们构建一个相对简单的数据计算任务。

import asyncio
import threading
import time
from multiprocessing import Process, Queue

# 我们用task1来模拟一个需要复杂的计算任务
def task1():
    result = 0
    for i in range(100000000):
        result += i*i+i-(i+1)*(i-1)
    print(result)

然后运行看一下:

if __name__ == "__main__":

    print("##############测试1:asyncio测试。##############"
    # #################################不启用multiprocessing###########################################
    start_time = time.time()
    task1()
    end_time = time.time()
    print(f"不启用multiprocessing,耗时为{end_time - start_time}。")
    # 输出:不启用multiprocessing,耗时为12.580572128295898。

然后我们拆分一下上述任务,比如之前任务计算1加到100,现在既然我们有四个进程,是不是就可以分开计算,A负责1加到25,B负责26加到50,C负责51加到75,D负责76加到100,然后把四个结果在加起来。也就是计算的任务需要我们手动拆分一下。

def task2(start,end,q):
    result = 0
    for i in range(start,end):
        result += i*i+i-(i+1)*(i-1)
    print(result)
    q.put(result)  # 结果回传

然后我们启用multiprocessing:

    # #################################启用multiprocessing###########################################
    start_time = time.time()
    q = Queue()  # 通信通道
    processes = []
    nums = 100000000
    step = nums // 4  # 就是本来是100000000,现在拆分成四个任务,就是25000000每个

    for i in range(4):
        # 创建进程对象,任务,参数
        p = Process(target=task2, args=(i * step, (i + 1) * step, q))
        # 启动进程
        p.start()
        processes.append(p)

    for p in processes:
        # 等待进程结束
        p.join()

    result = sum(q.get() for _ in range(4))  # q.get收集结果

    print("结果:", result)
    end_time = time.time()
    print("多进程耗时:", end_time - start_time, "秒")
    # 输出:多进程耗时: 3.616527557373047 秒

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注