在开始之前我们先了解接个概念,出于理解的目的,都属于口头定义。

  • 程序:一堆代码以文本形式存入一个文档,可以被解释或者编译后让计算机去执行。

  • 进程:
    • 是程序运行的一个状态
    • 因为程序在运行,所以包含程序运行的时候的地址空间,内存,数据栈等
    • 每个进程由自己完全独立的运行环境
    • 多进程共享数据是一个问题,一般要借助于一个独立于进程的内容

    进程可以看做是你打开某一个程序后的运行状态,比如打开QQ那就启动了一个QQ进程,如果再打开一个QQ,你虽然 电脑上运行了两个QQ,但这两个QQ之间的运行数据其实不共享,你登录的也是两个号码。

  • 线程:
    • 把一个进程,也就是一个程序的的运行细分成几个片段,每一个片段理解成一个线程,比如我教python是一个 进程的话,那么备课,上课,讲作业,答疑可以看做教python这件事的子任务,也可以理解成都是教python这个进程的线程。
    • 一个进程的多个线程间共享共享同一套数据和上下文运行环境
    • 因为贡献共同一套资源,可能会存在资源的共享互斥问题,比如我备课要用电脑,上课也要用电脑,如果我一边上课 一边备课的话,我的电脑到底应该打开上课的课件还是出现备课的内容?
  • 全局解释器锁(GIL:GlobalInterpreterLock):
    • Python开发的时候为了彻底解决多线程共享资源的问题,cPython解释器禁止真正的多线程,即一个CPU内不允许同时 运行两个线程,注意是同时,这个机制成为GIL,如果是单核CPU问题还不太大,但CPU跨入多核时代就显得限制太多了
  • 举一个栗子:
    • 把程序运行当成做饭就好了,做一桌饭就是一个进程, 一般只有一个,必须要同时准备两桌饭的情况少
    • 做饭的妈妈就是CPU,在做饭过程中,红烧肉,炒白菜,炖肘子就是三个线程
    • 效率最低的就是做好红烧肉再炒白菜,炒好白菜在准备做炖肘子,这样其实三个线程每个执行完后再执行,等于不分线程,即单线程
    • 在做饭过程中我们使用同一套资源,比如案板,菜刀,锅等,还有同一个妈妈
    • 妈妈都是超人,她的正常顺序是红烧肉放上水炖着就收拾肘子,红烧肉好了把肘子炖上后开始洗菜切菜,肘子好了直接开炒白菜
    • 如果按照方案2做的饭时间缩短好多,前提是提高了资源(厨房用具)的利用率和妈妈的或火气值(CPU的温度,CPU利用率)
    • 如果很着急,可以让爸爸也进厨房帮忙(增加一个CPU,多核), 爸爸炒菜做红烧肉,妈妈炖肘子
    • 一般程序没有GIL,所以爸爸妈妈可以同时在厨房干活,但Python这个厨房有GIL锁,规定一个时间厨房里只能有一个人,这样 如果一定要让爸爸帮忙,只能妈妈出来爸爸再进去(不是真正的多核多线程)

多线程很多时候我们会感觉是这个程序好几个任务在”同时”执行, 但其实是CPU利用自己超高的执行速度把时间片段 分化的很细,每个任务轮流去执行,只不过轮换的速度极快,你不会感觉到卡顿。

thread包实现多线程

thread这个模块可以实现多线程,因为历史原因,thread也有些问题,所以并不推荐,但这个模块对 多线程的使用比较底层,python3中把这个模块改成了_thread

下面我用代码展示thread模块的时候用。

顺序执行案例

time模块中有个函数ctime可以用来得到当前的时间,我们程序每次执行开头和结束得到这个时间,然后 就可以大致测量一个函数的执行时间。

看下面代码:

'''
利用time函数,生成两个函数
顺序调用
计算总的运行时间
'''
import time

def loop1():
    print('Start loop 1 at :', time.ctime())
    time.sleep(4)
    print('End loop 1 at:', time.ctime())

def loop2():
    print('Start loop 2 at :', time.ctime())
    time.sleep(2)
    print('End loop 2 at:', time.ctime())

def main():
    print("Starting at:", time.ctime())
    loop1()
    loop2()
    print("All done at:", time.ctime())

if __name__ == '__main__':
    main()

函数loop1loop2是顺序执行,程序总执行时间也基本等于这两个函数时间的和。

程序执行结果:

Starting at: Tue Apr 27 14:41:56 2021
Start loop 1 at : Tue Apr 27 14:41:56 2021
End loop 1 at: Tue Apr 27 14:42:00 2021
Start loop 2 at : Tue Apr 27 14:42:00 2021
End loop 2 at: Tue Apr 27 14:42:02 2021
All done at: Tue Apr 27 14:42:02 2021

第一个多线程案例

如果我们把这两个函数改成度线程,则在这个程序执行中,这是一个进程,但在进程里有两个线程,线程是 并行执行的,所以程序总执行时间应该大体等于这两个函数执行时间最长的时间。

如果使用_thread创建多线程,则需要用函数start_new_thread创建一个县城并执行, 这个函数的参数有两个,第一个是多线程中需要执行的任务,是一个函数,需要注意的是调用格式,即只写入 函数的名称即可, 第二个参数是元祖,用来 表示执行函数的时候需要的参数,如果调用函数不需要参数,则写空元祖。

在需要注意的是,我们在下面的代码中创建了两个线程,这两个线程叫子线程,那么下面代码的执行,其实也是 一个线程,我们叫主线程,简而言之,是主线程创建了子线程,创建完子线程后主线程如果执行完毕,可能造成一些问题, 所以在代码中为了不让主线程结束执行,我们给加了一个死循环。

我们把上面程序改造成度线程,多线程利用_thread模块实现:

'''
利用time函数,生成两个函数
顺序调用
计算总的运行时间
'''
import time
import _thread as thread

def loop1():
    print('Start loop 1 at :', time.ctime())
    time.sleep(4)
    print('End loop 1 at:', time.ctime())

def loop2():
    print('Start loop 2 at :', time.ctime())
    time.sleep(2)
    print('End loop 2 at:', time.ctime())

def main():
    print("Starting at:", time.ctime())
    # 启动多线程的意思是用多线程去执行某个函数
    # 启动多线程函数为start_new_thead
    # 参数两个,一个是需要运行的函数名,第二是函数的参数,作为元祖使用,为空则使用空元祖
    # 注意:如果函数只有一个参数,需要参数后由一个逗号
    
    # 线程开始创建后开始执行
    thread.start_new_thread(loop1, ())
    thread.start_new_thread(loop2, ())

    print("All done at:", time.ctime())


if __name__ == '__main__':
    main()
    # 下面这个死循环要有,否则主线程结束后得不到我们的结果
    while True:
        time.sleep(1)

上面代码的执行结果如下:

Starting at: Tue Apr 27 14:47:57 2021
All done at: Tue Apr 27 14:47:57 2021
Start loop 1 at :Start loop 2 at : Tue Apr 27 14:47:57 2021
Tue Apr 27 14:47:57 2021
End loop 2 at: Tue Apr 27 14:47:59 2021
End loop 1 at: Tue Apr 27 14:48:01 2021

上面代码子线程loop1loop2执行时间分别是4秒和2秒,而主线程只是启动了两个线程然后就结束了, 理论上执行时间可以忽略掉,所以,这个程序总共执行时间基本上是 max(4,2)

子线程带参数调用

下面案例展示了一个启动一个需要参数的子线程的案例,其余的都一样,不做过多解释。

# 练习带参数的多线程启动方法
import time
import _thread as thread

def loop1(in1):
    print('Start loop 1 at :', time.ctime())
    print("我是参数 ",in1)
    time.sleep(4)
    print('End loop 1 at:', time.ctime())

def loop2(in1, in2):
    print('Start loop 2 at :', time.ctime())
    print("我是参数 " ,in1 , "和参数  ", in2)
    time.sleep(2)
    print('End loop 2 at:', time.ctime())

def main():
    print("Starting at:", time.ctime())
    # 启动多线程的意思是用多线程去执行某个函数
    # 启动多线程函数为start_new_thead
    # 参数两个,一个是需要运行的函数名,第二是函数的参数作为元祖使用,为空则使用空元祖
    # 注意:如果函数只有一个参数,需要参数后由一个逗号
    thread.start_new_thread(loop1,("王老大", ))
    thread.start_new_thread(loop2,("王大鹏", "王晓鹏"))

    print("All done at:", time.ctime())

if __name__ == "__main__":
    main()
    while True:
        time.sleep(10)

因为_thread已不再推荐使用,所以我们不在深入讲解,下面讲解常用的多线程调用方法。

threading的使用

threading是我们多线程代码中常用到的模块,我们在本章中进行介绍。

threading的基本使用

threading模块使用多线程的过程一般分为这两步:

  • 直接利用threading.Thread生成Thread实例

      t = threading.Thread(target=xxx, args=(xxx,))
    
  • 启动生成的实例去执行

       # 启动多线程
       t.start()
       # 等待多线程执行完成
       t.join()
    

我们还是从一个案例开始, 其中threading.Thread实例生成至少需要两个参数:target和arg:

  • target:多线程的执行代码块,这里是一个函数
  • args:启动多线程代码的参数元祖

一旦生成Thread实例后,需要利用:

  • Thread.start: 启动多线程的执行
  • Thread.join: 告诉主线程等待多线程执行完毕

正是因为有了Thread.join函数,在主线程的代码中才不再需要死循环来强制主线程的执行时间长过子线程的执行。

代码如下,指向效果跟上面的代码一致:

import time
# 导入多线程处理包
import threading

def loop1(in1):
    print('Start loop 1 at :', time.ctime())
    print("我是参数 ",in1)
    time.sleep(4)
    print('End loop 1 at:', time.ctime())

def loop2(in1, in2):
    print('Start loop 2 at :', time.ctime())
    print("我是参数 " ,in1 , "和参数  ", in2)
    time.sleep(2)
    print('End loop 2 at:', time.ctime())

def main():
    print("Starting at:", time.ctime())
    # 生成threading.Thread实例
    t1 = threading.Thread(target=loop1, args=("王老大",))
    t1.start()

    t2 = threading.Thread(target=loop2, args=("王大鹏", "王小鹏"))
    t2.start()

    # 等待执行完毕
    t1.join()
    t2.join()

    print("All done at:", time.ctime())

if __name__ == "__main__":
    main()
    while True:
        time.sleep(10)

threaidng的守护线程daemon

守护线程就是看线程离开主线程后是否能独立运行:

  • 如果在程序中将子线程设置成守护现成,则子线程会在主线程结束的时候自动退出
  • 一般认为,守护线程不中要或者不允许离开主线程独立运行
  • 守护线程案例能否有效果跟环境相关

下面案例没有设置守护进程,则子线程在主线程结束后也能运行:

import time
import threading

def fun():
    print("Start fun")
    time.sleep(2)
    print("end fun")

print("Main thread")

t1 = threading.Thread(target=fun, args=() )
t1.start()

time.sleep(1)
print("Main thread end") 运行结果如下:

Main thread
Start fun
Main thread end
end fun

下面案例是一个守护进程的案例,注意已经设置了守护进程:

import time
import threading

def fun():
    print("Start fun")
    time.sleep(2)
    print("end fun")

print("Main thread")

t1 = threading.Thread(target=fun, args=() )
t1.setDaemon(True)
t1.start()

time.sleep(1)
print("Main thread end")

执行后结果如下:

Main thread
Start fun
Main thread end

除此之外线程还有一些常见的属性,在实际使用中可以利用:

  • threading.currentThread:返回当前线程变量
  • threading.enumerate:返回一个包含正在运行的线程的list,正在运行的线程指的是线程启动后,结束前的状态
  • threading.activeCount: 返回正在运行的线程数量,效果跟 len(threading.enumerate)相同
  • thr.setName: 给线程设置名字
  • thr.getName: 得到线程的名字

    import time
    import threading

    def loop1():
        # ctime 得到当前时间
        print('Start loop 1 at :', time.ctime())
        # 睡眠多长时间,单位是秒
        time.sleep(6)
        print('End loop 1 at:', time.ctime())

    def loop2():
        # ctime 得到当前时间
        print('Start loop 2 at :', time.ctime())
        # 睡眠多长时间,单位是秒
        time.sleep(1)
        print('End loop 2 at:', time.ctime())


    def loop3():
        # ctime 得到当前时间
        print('Start loop 3 at :', time.ctime())
        # 睡眠多长时间,单位是秒
        time.sleep(5)
        print('End loop 3 at:', time.ctime())

    def main():
        print("Starting at:", time.ctime())
        # 生成threading.Thread实例
        t1 = threading.Thread(target=loop1, args=( ))
        # setName是给每一个子线程设置一个名字
        t1.setName("THR_1")
        t1.start()

        t2 = threading.Thread(target=loop2, args=( ))
        t2.setName("THR_2")
        t2.start()

        t3 = threading.Thread(target=loop3, args=( ))
        t3.setName("THR_3")
        t3.start()

        # 预期3秒后,thread2已经自动结束,
        time.sleep(3)
        # enumerate 得到正在运行子线程,即子线程1和子线程3
        for thr in threading.enumerate():
            # getName能够得到线程的名字
            print("正在运行的线程名字是: {0}".format(thr.getName()))

        print("正在运行的子线程数量为: {0}".format(threading.activeCount()))

        print("All done at:", time.ctime())

    if __name__ == "__main__":
        main()
        # 一定要有while语句
        # 因为启动多线程后本程序就作为主线程存在
        # 如果主线程执行完毕,则子线程可能也需要终止
        while True:
            time.sleep(10)
               

执行结果如下:

('Starting at:', 'Tue May 11 15:11:36 2021')
('Start loop 2 at :', 'Tue May 11 15:11:36 2021'()
'Start loop 3 at :' , ('Tue May 11 15:11:36 2021''Start loop 1 at :')
, 'Tue May 11 15:11:36 2021')
('End loop 2 at:', 'Tue May 11 15:11:37 2021')
正在运行的线程名字是: MainThread
正在运行的线程名字是: pydevd.Writer
正在运行的线程名字是: pydevd.Reader
正在运行的线程名字是: pydevd.CommandThread
正在运行的线程名字是: THR_3
正在运行的线程名字是: THR_1
正在运行的子线程数量为: 6
('All done at:', 'Tue May 11 15:11:39 2021')
('End loop 3 at:', 'Tue May 11 15:11:41 2021')
('End loop 1 at:', 'Tue May 11 15:11:42 2021')

threading的面向对象写法

有时候线程可以时候用面向对象技术,这样会最大程度利用OOP的好处,特别是子线程需要处理的内容比较复杂的时候。 使用方法可以直接继承threading.Thread,这时候只要重写run函数就可以,或者也可以 把子线程携程一个类,然后调用类的实例的方法。

下面案例直接重写threading.run,继承threading.Thread:

import threading
import time

# 1. 类需要继承自threading.Thread
class MyThread(threading.Thread):
    def __init__(self, arg):
        super(MyThread, self).__init__()
        self.arg = arg

    # 2 必须重写run函数,run函数代表的是真正执行的功能
    def  run(self):
        time.sleep(2)
        print("The args for this class is {0}".format(self.arg))

for i in range(5):
    t = MyThread(i)
    t.start()
    t.join()

print("Main thread is done!!!!!!!!")

下面这个案例的写法比较成熟:

#coding=utf-8
import threading
from time import sleep, ctime

loop = [4,2]

class ThreadFunc:

    def __init__(self, name):
        self.name = name

    def loop(self, nloop, nsec):
        '''
        :param nloop: loop函数的名称
        :param nsec: 系统休眠时间
        :return:
        '''
        print('Start loop ', nloop, 'at ', ctime())
        sleep(nsec)
        print('Done loop ', nloop, ' at ', ctime())

def main():
    print("Starting at: ", ctime())

    # ThreadFunc("loop").loop 跟一下两个式子相等:
    # t = ThreadFunc("loop")
    # t.loop
    # 以下t1 和  t2的定义方式相等
    t = ThreadFunc("loop")
    t1 = threading.Thread( target = t.loop, args=("LOOP1", 4))
    # 下面这种写法更西方人,工业化一点
    t2 = threading.Thread( target = ThreadFunc('loop').loop, args=("LOOP2", 2))

    # 常见错误写法
    #t1 = threading.Thread(target=ThreadFunc('loop').loop(100,4))
    #t2 = threading.Thread(target=ThreadFunc('loop').loop(100,2))

    t1.start()
    t2.start()

    t1.join( )
    t2.join()


    print("ALL done at: ", ctime())


if __name__ == '__main__':
    main()

多线程变量共享问题

  • 共享变量: 当多个线程同时访问一个变量的时候,会产生共享变量的问题

虽然线程访问同一变量很少”撞车”, 但这种情况并不意味着没有, 一旦两个变量”同时”对一个变量/内存进行写操作, 注意如果 只是读的话问题不大, 同时写可能就会造成一个”写了一半”, 结果另一个线程”写了另一半”, 造成结果混乱

请看下面案例:


import threading

sum = 0
loopSum = 1000000

def myAdd():
  global  sum, loopSum
  for i in range(1, loopSum):
    sum += 1

def myMinu():
  global  sum, loopSum
  for i in range(1, loopSum):
    sum -= 1

if __name__ == '__main__':
  print("Starting ....{0}".format(sum))

  # 开始多线程的实例,看执行结果是否一样
  t1 = threading.Thread(target=myAdd, args=())
  t2 = threading.Thread(target=myMinu, args=())

  t1.start()
  t2.start()

  t1.join()
  t2.join()

  print("Done .... {0}".format(sum))

上面代码一个函数负责对变量进行加操作, 一个函数负责减操作, 这两个都属于”写”操作, 在每个执行 10000次后,理论上变量等于没有改变,因为加减次数相当,但实际运行结果却是不可控的, 请多运行几次,就会 发现每次运行结果其实不一致.

通常解决上面问题就是我们常说的琐或者信号灯, 总体思想就是利用一个变量做为令牌,每次执行写操作的时候 要申请令牌, 只有拿到令牌的线程才有资格对变量进行操作,这样只要都遵循这个规矩就不会造成混乱.

线程安全

  • 如果一个资源/变量,他对于多线程来讲,不用加锁也不会引起任何问题,则称为线程安全
  • python中线程不安全变量类型: list, set, dict, 这种变量类型如果涉及到多线程使用,需要加锁控制共享问题
  • 线程安全变量类型: queue, 此种变量不会出现多线程共享问题

线程锁(Lock)

  • 是一个标志/令牌/变量,表示一个线程在占用一些资源, 线程如果相对一个资源操作, 就要查看这个锁的状态,只有锁的状态在打开状态 才可以被申请, 所谓申请就是对锁进行一个赋值,表示锁被关闭,此时意味着别的线程不能再操作共享变量

  • 使用方法

    • 上锁(lock.acquire)
    • 使用共享资源,放心的用
    • 取消锁,释放锁(lock.release)

import threading

sum = 0
loopSum = 1000000

# 定义锁
lock = threading.Lock()


def myAdd():
  global  sum, loopSum

  for i in range(1, loopSum):
    # 上锁,申请锁
    lock.acquire()
    sum += 1
    # 释放锁
    lock.release()


def myMinu():
  global  sum, loopSum
  for i in range(1, loopSum):
    lock.acquire()
    sum -= 1
    lock.release()

if __name__ == '__main__':
  print("Starting ....{0}".format(sum))

  # 开始多线程的实例,看执行结果是否一样
  t1 = threading.Thread(target=myAdd, args=())
  t2 = threading.Thread(target=myMinu, args=())

  t1.start()
  t2.start()

  t1.join()
  t2.join()

  print("Done .... {0}".format(sum))

上面案例我们定义了一把锁(lock), 每次对危险区域进行操作的时候, 这里指对共享变量操作, 需要申请锁, 锁本身 是一个实例, 他会记住你是否申请成功,如果成功,则你可以获得访问资源的权利,否则需要等待.

对上面变量加锁后, 运行结果就会正确, 不再混乱.

两个问题:

我们使用厨房做饭作为案例, 线程就是一个一个的厨师, 一个厨师肯定效率慢,此时我们会让两个厨师进厨房, 但两个厨师可能会带来 对共享变量的使用问题,比如两个厨师都想用炒锅炒菜, 此时就应该解决这个冲突,我们用锁解决:

  • 应该锁谁: 锁使用比较危险, 如果简化版,
    • 可以把整个代码上一把锁, 即一个厨师进入厨房后把厨房锁住,此时其实还是一个厨师, 另一个厨师是不会进入厨房的, 因为他没钥匙, 此时的多线程就多了个寂寞
    • 可以把每个资源,每个调料瓶,每个锅碗瓢盆都锁上, 需要的时候开锁,但这个时候效率可能反而会降低, 两个厨师光忙着开锁上锁了
    • 所以究竟对那个资源上锁是个问题, 不能太多,但太细密了可能也不好
  • 锁是什么: 锁我们在使用的时候其实理解成一个令牌, 只有申请到了锁/令牌才能进入下面的代码执行,否则只能等待

死锁问题

如果对于两个变量A,B, 两个线程one,two, 如果一个线程锁住/占用了一个变量, 同时要想完成整个任务还需要另一个变量, 这种情况就会发生死锁, 即都等待对方完成后释放资源,但如果完成任务又需要你先释放资源.

这个情况类似西游记里孙悟空去打水里妖怪的情形, 妖怪陆地上打不过孙悟空,孙悟空又不习惯水战, 于是就会出现妖怪站水里 让孙悟空下来, 孙悟空站岸上让妖怪上来,于是台词就简单变成了 “ 你上来, 你下来, 你上来,你下来, 你上来,你下来……..”

死锁出现的条件:

  1. 多个变量需要上锁
  2. 完成任务需要分批锁多余一个变量
  3. 多个线程

我们看下面案例:


import threading
import time

lock_1 = threading.Lock()
lock_2 = threading.Lock()


def func_1():
  print("func_1 starting.........")
  lock_1.acquire()
  print("func_1 申请了 lock_1....")
  time.sleep(2)
  print("func_1 等待 lock_2.......")
  lock_2.acquire()
  print("func_1 申请了 lock_2.......")

  lock_2.release()
  print("func_1 释放了 lock_2")

  lock_1.release()
  print("func_1 释放了 lock_1")

  print("func_1 done..........")


def func_2():
  print("func_2 starting.........")
  lock_2.acquire()
  print("func_2 申请了 lock_2....")
  time.sleep(4)
  print("func_2 等待 lock_1.......")
  lock_1.acquire()
  print("func_2 申请了 lock_1.......")

  lock_1.release()
  print("func_2 释放了 lock_1")

  lock_2.release()
  print("func_2 释放了 lock_2")

  print("func_2 done..........")

if __name__ == "__main__":

  print("主程序启动..............")
  t1 = threading.Thread(target=func_1, args=())
  t2 = threading.Thread(target=func_2, args=())

  t1.start()
  t2.start()

  t1.join()
  t2.join()

  print("主程序启动..............")

上面两个线程, 完成任务需要在锁住一个变量的时候申请另一个, 同样两个任务都是这种执行逻辑, 这样在多个线程执行不同的任务的时候 就可能两个多都在等待对方释放资源,于是死锁,程序没法继续执行下去.

死锁的解决

死锁产生根本原因就是多个线程都是死心眼, 等待对方资源就是死等, 你不给我我就等,直到天荒地老. 就像你追一个姑娘, 你就等, 姑娘有男朋友了你等她分手,姑娘结婚了你等她丧偶, 姑娘有娃了你等娃考上大学了姑娘(也可能变成老娘了)就自由了, 这样等下去大概率会 伤心的, 所以要想办法,而不是死等.

知道原因了解决方案也就简单多了, 别那么实诚, 设定一个时间, 在一定时间内等不到你就把手里资源释放, 在从新开始,.


import threading
import time

lock_1 = threading.Lock()
lock_2 = threading.Lock()

def func_1():
  print("func_1 starting.........")
  lock_1.acquire(timeout=4)
  print("func_1 申请了 lock_1....")
  time.sleep(2)
  print("func_1 等待 lock_2.......")

  rst = lock_2.acquire(timeout=2)
  if rst:
    print("func_1 已经得到锁 lock_2")
    lock_2.release()
    print("func_1 释放了锁 lock_2")
  else:
    print("func_1 注定没申请到lock_2.....")

  lock_1.release()
  print("func_1 释放了 lock_1")

  print("func_1 done..........")


def func_2():
  print("func_2 starting.........")
  lock_2.acquire()
  print("func_2 申请了 lock_2....")
  time.sleep(4)
  print("func_2 等待 lock_1.......")
  lock_1.acquire()
  print("func_2 申请了 lock_1.......")

  lock_1.release()
  print("func_2 释放了 lock_1")

  lock_2.release()
  print("func_2 释放了 lock_2")


  print("func_2 done..........")

if __name__ == "__main__":

  print("主程序启动..............")
  t1 = threading.Thread(target=func_1, args=())
  t2 = threading.Thread(target=func_2, args=())

  t1.start()
  t2.start()

  t1.join()
  t2.join()

  print("主程序结束..............")

注意上面代码, 每个锁在申请等等时候都加了时间,即在一定时间内申请不到就释放手里 所有资源,以便让其他线程执行下去.

信号灯(Semphore)

锁就有排他性,就一把锁, 你锁了别人就不能用,但有时候我们需要对变量/共享区域 进行限制, 比如一个共享变量最多只能5个线程使用,此时锁就不合适了,我们对于这种 允许多个线程同时使用,只需要控制最多同时共享数量的时候就需要用信号灯(semphore)


import threading
import time

# 参数定义最多几个线程同时使用资源
semaphore = threading.Semaphore(3)

def func():
  if semaphore.acquire():
    for i in range(5):
      print(threading.currentThread().getName() + ' get semaphore')
    time.sleep(15)
    semaphore.release()
    print(threading.currentThread().getName() + ' release semaphore')


for i in range(8):
  t1 = threading.Thread(target=func)
  t1.start()

Semphore使用和锁基本一直, 不同的是定义的时候需要制定最多同时容纳的线程数量

可重入锁

  • 一个锁,可以被一个线程多次申请, 注意是同一个线程
  • 主要解决递归调用的时候,需要申请锁的情况

import threading
import time

class MyThread(threading.Thread):
  def run(self):
    global num
    time.sleep(1)

    if mutex.acquire(1):
      num = num+1
      msg = self.name+' set num to '+str(num)
      mutex.acquire()
      print(msg)
      mutex.release()
      mutex.release()

num = 0

mutex = threading.RLock()


def testTh():
  for i in range(5):
    t = MyThread()
    t.start()


if __name__ == '__main__':
  testTh()

Event

在初始情况下,Event对象中的信号标志被设置为假。 如果有线程等待的一个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。 一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。

wait方法可接受一个超时参数,加入这个超时参数之后,如果阻塞时间超过这个参数设定的值之后,wait方法会返回。

Event相关方法

  • event.isSet():返回event的状态值;

  • event.wait():如果 event.isSet()==False将阻塞线程;

  • event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

  • event.clear():恢复event的状态值为False。

import threading,time
class Teacher(threading.Thread):
    def run(self):
        print("teacher:拖堂两分钟哈")
        print(event.isSet())
        event.set()
        time.sleep(5)
        print("teacher下课!!!")
        print(event.isSet())
        event.set()
class Students(threading.Thread):
    def run(self):
        event.wait()
        print("students:啊啊啊啊啊!")
        time.sleep(3)
        event.clear()
        event.wait()
        print("students:又上课了!!!!!")
 
if __name__ == "__main__":
    event = threading.Event()
    threads = []
 
    for i in range(5):
        threads.append(Students())
    threads.append(Teacher())
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("ending")

queue队列

queue 是一个用来存放变量的数据结构,特点是先进先出,内部元素排队, 可以理解成一个特殊的list, 但它是线程安全的

编程里常会出现消息队列的概念,常见的消息队列产品有kafka, rabitMQ, RocketMQ等,同类产品很多 可以把消息队列理解成扩展功能的queue

  • q = queue.Queue(maxsize = 10)
    • 创建一个“队列”对象
    • queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。
    • 可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
  • q.put(10)
    • 将一个值放入队列中
    • 调用队列对象的put()方法在队尾插入一个项目。
    • put()有两个参数,
      • 第一个item为必需的,为插入项目的值
      • 第二个block为可选参数,默认为1。
      • 如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。
      • 如果block为0,put方法将引发Full异常。
  • q.get()
    • 将一个值从队列中取出
    • 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。
    • 如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。
    • 如果队列为空且block为False,队列将引发Empty异常。
  • q.qsize(): #返回队列的大小
  • q.empty(): #如果队列为空,返回True,反之False
  • q.full(): #如果队列满了,返回True,反之False
  • q.get([block[, timeout]]): #获取队列,timeout等待时间
  • q.get_nowait(): #相当q.get(False)非阻塞
  • q.put(item): # 写入队列,timeout等待时间
  • q.put_nowait(item): #相当于q.put(item, False)
  • q.task_done(): #在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
  • q.join(): #实际上意味着等到队列为空,再执行别的操作

Queue具体使用又根据特性分成以下三个类型, 只有存取特性区分,功能和方法基本一致:

  • Queue: 普通队列, 具有FIFO特性
  • LifoQueue: 跟普通Queue相反,
  • PriorityQueue: 进出规则根据放入内容权重来

普通的queue(FIFO)

import queue
q = queue.Queue()
q.put(1)
q.put("he")
q.put({"name":"chouzi"})
 
while True:
    data = q.get()
    print(data)
    print("$$$$$$$$$$$$$$$$$")

LifoQueue

import queue
q = queue.LifoQueue()
q.put(1)
q.put("he")
q.put({"name":"chouzi"})
 
while True:
    data = q.get()
    print(data)
    print("$$$$$$$$$$$$$$$$$")

PriorityQueue

import queue
q = queue.PriorityQueue()
q.put([2,1])
q.put([1,"he"])
q.put([3,{"name":"chouzi"}])
 
while True:
    data = q.get()
    print(data[1])
    print(data[0])
    print("$$$$$$$$$$$$$$$$$")

threading.Timer

  • Timer是利用多线程,在指定时间后启动一个功能

下面代码我们定义一个多线程, 但需要一定时间后多线程才启动,而不是 我们执行start的时候直接开始, 此时就用到了Timer功能,

请看下面案例:


import threading
import time

def func():
  print("I am running.........")
  time.sleep(4)
  print("I am done......")



if __name__ == "__main__":
  t = threading.Timer(6, func)
  t.start()

  i = 0
  while True:
    print("{0}***************".format(i))
    time.sleep(3)
    i += 1

subprocess

对于多线程, 我们有一些其他的替代方案, 常见的有subprocessmultiprocess

如果需要执行其他的程序并获得结果,可能直接使用os模块来完成:

os模块关于调用其他命令/程序的处理

os模块中有些功能可以调用其他的程序或者命令, 然后读取结果.

  • os.system: 执行某一个命令, 结果直接显示出来

    
    import os
    
    os.system("ls -l ")
    

    调用这两行代码运行结果如下:

    /Users/bbb/anaconda3/bin/python3 /Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/pydevd.py --multiproc --qt-support=auto --client 127.0.0.1 --port 54534 --file /Users/bbb/baoshu/book/python/new_python/code/log/02.py
    Connected to pydev debugger (build 211.7142.13)
    total 40
    -rwxr-xr-x  1 bbb  staff   706 Feb 11 14:36 01.py
    -rwxr-xr-x  1 bbb  staff    36 Jun  4 18:52 02.py
    -rwxr-xr-x  1 bbb  staff  1891 Feb 11 14:36 03.py
    -rwxr-xr-x  1 bbb  staff     0 Feb 11 14:36 __init__.py
    -rw-r--r--  1 bbb  staff   248 Jun  2 22:47 all.log
    -rw-r--r--  1 bbb  staff   128 Jun  2 22:47 error.log
    -rwxr-xr-x  1 bbb  staff     0 Feb 11 14:36 note.md
    
    Process finished with exit code 0
    
    
  • os.popen: 跟上面命令类似,但命令执行结果需要读取出来才会显示, 作为函数执行结果返回

    import os
    
    res = os.popen("ls -l ")
    
    print(res.read())
    
    

    执行结果需要读取出来,否则看不到结果.

  • os.spawn*

    使用操作系统的fork函数, 直接启动外部命令运行,但根据函数的不同使用时稍微还有区别.

  • spawn* 可执行可执行文件和shell, system只能执行shell命令

subprocess

subprocess允许我们启动一个新进程, 并接管新进程的输入/输出/错误管道,并且获得返回值.

他是os.fork()os.execve() 的封装。 他启动的进程不会把父进程的模块加载一遍

  • 完全跳过线程,使用进程
  • 是派生进程的主要替代方案
  • 使用subprocess的通信机制比较少,可以使用管道或者信号机制.
  • python>=2.4 功能上用以替代上面介绍的os相关内容.

常用功能有:

  • run
  • call
  • Popen函数/类

下面我们分别介绍:

run

run函数用来调用外部命令/程序, 然后返回执行结果, 内部是Popen类实现.

  • Python >= 3.5
# 接口
def run(*popenargs,
        input=None, capture_output=False, timeout=None, check=False, **kwargs)
# popenrgs:表示要执行的命令
# timeout:设置命令超时时间。如果命令执行时间超时,子进程将被杀死,并弹出 TimeoutExpired 异常。
# check:如果该参数设置为 True,并且进程退出状态码不是 0,则弹 出 CalledProcessError 异常。
# encoding: 如果指定了该参数,则 stdin、stdout 和 stderr 可以接收字符串数据,并以该编码方式编码。否则只接收 bytes 类型的数据。
# 返回: CompletedProcess实例

下面代码中利用run来运行一个shell命令, 通过返回结果读取不同的运行结果:

import subprocess

rst = subprocess.run(["ls", "-l", "/dev/null"])

print(rst.returncode)
print(rst.stdout)

call

本身也是Popen类的实现, 阻塞进程,等待执行完毕后返回结果.

源代码为:


def call(*popenargs, timeout=None, **kwargs):
  """Run command with arguments.  Wait for command to complete or
  timeout, then return the returncode attribute.

  The arguments are the same as for the Popen constructor.  Example:

  retcode = call(["ls", "-l"])
  """

其他调用函数

-check_call: 执行命令,返回结果和状态,正常为0 ,执行错误则抛出异常

  • getstatusoutput: 返回 一个元组形式的结果,第一个元素是命令执行状态,第二个为执行结果
  • getoutput: 接受字符串形式的命令,放回执行结果
  • check_output: 执行命令,返回执行的结果,而不是打印

‘Popen’类

类构造函数

这个构造函数参数比较多,函数签名如下:


def __init__(self, args, bufsize=-1, executable=None,
             stdin=None, stdout=None, stderr=None,
             preexec_fn=None, close_fds=True,
             shell=False, cwd=None, env=None, universal_newlines=None,
             startupinfo=None, creationflags=0,
             restore_signals=True, start_new_session=False,
             pass_fds=(), *, encoding=None, errors=None, text=None)

# args: args应当是一个程序参数的序列或者是一个单独的字符串或 path-like object。 
# 默认情况下,如果 args 是序列则要运行的程序为 args 中的第一项。 如果 
# args 是字符串,则其解读依赖于具体平台

# bufsize: bufsize将在 open() 函数创建了 stdin/stdout/stderr 管道文件对象时作为对应的参数供应
# 0 表示不使用缓冲区 (读取与写入是一个系统调用并且可以返回短内容)
# 1 表示行缓冲(只有 universal_newlines=True 时才有用,例如,在文本模式中)
# 任何其他正值表示使用一个约为对应大小的缓冲区
# 负的 bufsize (默认)表示使用系统默认的 io.DEFAULT_BUFFER_SIZE。

# executable 参数指定一个要执行的替换程序。这很少需要。
# 当 shell=True, executable 替换 args 指定运行的程序。但是,原始的 args 仍然被传递给程序。
# 大多数程序将被 args 指定的程序作为命令名对待,这可以与实际运行的程序不同。
# 在 POSIX, args 名作为实际调用程序中可执行文件的显示名称,例如 ps。如果 shell=True,
# 在 POSIX, executable 参数指定用于替换默认 shell /bin/sh 的 shell。

# stdin, stdout 和 stderr: 分别指定被运行的程序的标准输入、输出和标准错误的文件句柄。
# 合法的值有 PIPE , DEVNULL , 一个存在的文件描述符(一个正整数),
# 一个存在的文件对象以及 None。 
# PIPE 表示应创建一个新的对子进程的管道。 DEVNULL 表示使用特殊的 os.devnull 文件。
# 使用默认的 None,则不进行成定向;子进程的文件流将继承自父进程。
# stderr 可设为 STDOUT,表示应用程序的标准错误数据应和标准输出一同捕获。

# preexec_fn:如果 preexec_fn 被设为一个可调用对象,此对象将在子进程刚创建时被调用。(仅 POSIX)
# 警告 preexec_fn 形参在应用程序中存在多线程时是不安全的。子进程在调用前可能死锁。如果你必须使用它,保持警惕!最小化你调用的库的数量。
# Python3.8 版更改: preexec_fn 形参在子解释器中已不再受支持。 在子解释器中使用此形参将引发 RuntimeError。 
# 这个新限制可能会影响部署在 mod_wsgi, uWSGI 和其他 嵌入式环境中的应用。

# close_fds: 如果 close_fds 为真,所有文件描述符除了 0, 1, 2 之外都会在子进程执行前关闭。
# 而当 close_fds 为假时,文件描述符遵守它们继承的标志,如文件描述符的继承所述。
# 在 Windows,如果 close_fds 为真, 则子进程不会继承任何句柄,除非在 STARTUPINFO.IpAttributeList 的 handle_list 的键中显式传递,或者通过标准句柄重定向传递。

# pass_fds: 是一个可选的在父子进程间保持打开的文件描述符序列。提供任何 pass_fds 将强制 close_fds 为 True。(仅 POSIX)

# cwd: 如果 cwd 不为 None,此函数在执行子进程前会将当前工作目录改为 cwd。 
# cwd 可以是一个字符串、字节串或路径类对象 
# 特别地,当可执行文件的路径为相对路径时,此函数会相对于*cwd* 来查找 executable (或 args 中的第一个条目)。

# restore_signals: 如果restore_signals 为 true(默认值),则 Python 设置为 SIG_IGN 的所有信号将在 exec 之前的子进程中恢复为 SIG_DFL。
# 目前,这包括 SIGPIPE, SIGXFZ 和 SIGXFSZ 信号。 (仅 POSIX)

#start_new_session: 如果 start_new_session 为 true,则 setsid() 系统调用将在子进程执行之前被执行。(仅 POSIX)

# grop:如果 group 不为 None,则 setregid() 系统调用将于子进程执行之前在下级进程中进行。
# 如果所提供的值为一个字符串,将通过 grp.getgrnam() 来查找它,并将使用 gr_gid 中的值。 
# 如果该值为一个整数,它将被原样传递。 (POSIX 专属)

# extra_groups:如果 extra_groups 不为 None,则 setgroups() 系统调用将于子进程之前在下级进程中进行。
# 在 extra_groups 中提供的字符串将通过 grp.getgrnam() 来查找,并将使用 gr_gid 中的值。 整数值将被原样传递。 (POSIX 专属)

# user:如果 user 不为 None,则 setreuid() 系统调用将于子进程执行之前在下级进程中进行。 
# 如果所提供的值为一个字符串,将通过 pwd.getpwnam() 来查找它,并将使用 pw_uid 中的值。 
# 如果该值为一个整数,它将被原样传递。 (POSIX 专属)


# umask:如果 umask 不为负值,则 umask() 系统调用将在子进程执行之前在下级进程中进行。(POSIX专属)

#env: 如果 env 不为 None,则必须为一个为新进程定义了环境变量的字典;这些用于替换继承的当前进程环境的默认行为。
# 注解:如果指定, env 必须提供所有被子进程需求的变量。
# 在 Windows,为了运行一个 side-by-side assembly ,指定的 env 必须 包含一个有效的 SystemRoot。

# encoding/errors/text:如果 encoding 或 errors 被指定,或者 text 为 true,则文件对象 stdin, stdout 和 stderr 将会以指定的编码和 errors 以文本模式打开,
# 如同常用参数所述。 

# universal_newlines 参数等同于 text 并且提供向后兼容性。默认情况下,文件对象都以二进制模式打开。

# startupinfo: 如果给出,startupinfo 将是一个将被传递给底层的 CreateProcess 函数的 STARTUPINFO 对象。
PIPE

在对subprcess的使用中可能会用到PIPE, 可以把PIPE当做一个特殊的缓冲区, 输入输出都经过这个缓存进行输送, 我们在需要的时候对这个缓冲读取或者写入操作即可.

参加下面案例,对管道PIPE的使用:

>>import subprocess
>>rst = subprocess.Popen("ls -l", shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
>>print(rst.stderr)
>><_io.BufferedReader name=12>
>>print(rst.stderr.read())
>>b''
>>print(rst.stdout.read())
>>b'total 24\ndrwxr-xr-x  28 bbb  staff    896 May 13 14:23 book\ndrwxr-xr-x   7 bbb  staff    224 May 29 22:40 qqbot\ndrwxr-xr-x  10 bbb  staff    320 May 30 11:31 tel\n-rw-r--r--   1 bbb  staff  12105 May 17 13:39 tel.zip\ndrwxr-xr-x   7 bbb  staff    224 May 26 14:57 tk\ndrwxr-xr-x   3 bbb  staff     96 May 29 10:32 yt\n'

poll函数

用来查询执行结果, 如果执行完毕则返回0, 否则是None:


import subprocess
import time

rst = subprocess.Popen("sleep 5;echo 'hello'",shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

while True:
  r = rst.poll()
  if r is None:
    print("也没个信儿")
  else:
    print("宝儿, 来信了, {}....".format(r))

  time.sleep(1)

上面执行结果是:

也没个信儿
也没个信儿
也没个信儿
也没个信儿
也没个信儿
宝儿, 来信了, 0....
宝儿, 来信了, 0....
宝儿, 来信了, 0....
宝儿, 来信了, 0....
宝儿, 来信了, 0....
宝儿, 来信了, 0....
宝儿, 来信了, 0....
wait

等待执行结果,阻塞式等待:


import subprocess
import time


print("Starting......{}".format(time.time()))
rst = subprocess.Popen("sleep 5;echo 'hello'",shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

r = rst.wait()

print(r)
print("ending......{}".format(time.time()))

上面执行结果会显示出当前时间戳, 两次打印相差5秒, 说明是阻塞了程序执行.

terminate

强行终止进程, 向进程发送结束信号SIGTERM(POSIX), 在 Windows 上则会调用 Win32 API 函数 TerminateProcess来停止子进程

下面案例用terminate结束进程后用wait拿结果,显示非正常死亡信号.


import subprocess
import time

print("Starting......{}".format(time.time()))
rst = subprocess.Popen("sleep 5;echo 'hello'",shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

rst.terminate()
r = rst.wait()

print(r)
print("ending......{}".format(time.time()))

执行结果如下, 进程执行结果是-15:

Starting......1654482969.985994
-15
ending......1654482969.992843

kill

强行终止进程, 向进程发送结束信号SIGKILL(POSIX), 在 Windows 上跟terminate一样.

communicate

与进程交互:

  • 将数据发送到 stdin
  • stdoutstderr 读取数据,直到抵达文件结尾
  • 等待进程终止并设置retuencode属性
  • input 参数应为要发送到下级进程的数据
  • 返回一个(stdout_data,stderr_data) 元组

  • 要向进程的stdin传输数据,需要通过stdin=PIPE创建此Popen 对象
  • 要从结果元组获取任何非None 值,你同样需要设置 stdout=PIPE或者 stderr=PIPE

  • 如果进程在 timeout 秒后未终止,一个TimeoutExpired异常将被抛
  • 如果超时到期,子进程不会被杀死,所以为了正确清理一个行为良好的应用程序应该杀死子进程并完成通讯。

正确用法参看下面案例:

p = subprocess.Popen(...)
try:
    outs, errs = p.communicate(timeout=15)
except TimeoutExpired:
    p.kill()
    outs, errs = p.communicate()

关于communicate另一个案例:


import subprocess
import time

p = subprocess.Popen("ifconfig",shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
rst = p.communicate()

print('STDIN: ', rst[0])

如果频繁的跟子进程通讯, 则不能使用communicate, 因为这个函数使用通道后 会自动关闭通道, 建议的方式是:

p= subprocess.Popen(command, stdin=subprocess.PIPE,stdout=subprocess.PIPE,shell=True)
p.stdin.write(some_info_to_input)
p.stdin.flush()
#......do something
try:
  #......do something
  p.stdout.readline()
  #......do something
except:
  print('IOError')
#......do something more
p.stdin.write(some_info_to_input)
p.stdin.flush()
#......do something more
Popen的属性
  • Popen.args: 参数, 传递一个程序参数的序列或者一个简单字符串

  • Popen.stdin:
    • 如果 stdin 参数为PIPE,此属性是一个类似 open()返回的可写的流对象。
    • 如果 encodingerrors 参数被指定或者 universal_newlines 参数为True,则此流是一个文本流,否则是字节流
    • 如果 stdin 参数非PIPE, 此属性为 None
  • Popen.stdout: 参考Popen.stdin
  • Popen.stderr: 参考Popen.stdin

警告: 尽量使用communicate()而非 stdin.writestdout.read 或者 stderr.read, 否则由于任意其他 OS 管道被子进程填满阻塞可能会死锁

Popen.pid: 子进程的进程号, 如果设置了shell=True,则这是生成的子 shell 的进程号

  • Popen.returncode: 进程的退出码, None表示此进程仍未结束

一个负值-N表示子进程被信号 N中断 (仅 POSIX)

subprocess常见坑

使用subprocess模块需要特别注意死锁问题, 官方也一直强调会有死锁问题

旧版本的文档明确说明会引发死锁问题, 但我的版本(3.7)中删除了这个说明,不知道是否做了改进, 尽量避免吧

  • 坑1-死锁的原因:

    管道的大小是有所限制的,当子进程一直向 stdout 管道写数据且写满的时候,子进程将发生 I/O 阻塞

    而此时父进程只是干等子进程退出,也处于阻塞状态;

    import subprocess
    
    child = subprocess.Popen(['./B'], stdout=subprocess.PIPE)
    child.wait()
    print(child.stdout)
    
  • 坑2:

    这是官方推荐的方式,但是也有坑 communicate 其实是循环读取管道中的数据(每次32768字节(32KB))并将其存在一个 list 里面, 到最后将 list 中的所有数据连接起来 b''.join(list) 返回给用户

    但如果子进程输出内容非常多甚至无限输出,则机器内存会被撑爆

    import subprocess
    child = subprocess.Popen(['./B'], stdout=subprocess.PIPE)
    stdout, stderr = child.communicate()
    
  • 比较好的方式:

    如果不可预测子进程是否会产生大量输出, 可以参考下面案例,

    当子进程的可能输出非常大的时候,直接将stdoutstderr 重定向到文件,避免了撑爆内存的问题

    outputfile = r'test.txt'
    with open(outputfile,mode='a+') as f:
      current_process = subprocess.Popen(cmd,stdout=f)
      current_process.wait()
    
    

multiprocessiong

  • 使用threadiing借口派生,使用子进程
  • 允许为多核或者多cpu派生进程,接口跟threading非常相似
  • python>=2.6

下面代码利用多进程调用函数:


import multiprocessing
from time import sleep, ctime


def clock(interval):
  while True:
    print("The time is %s" % ctime())
    sleep(interval)



if __name__ == '__main__':
  p = multiprocessing.Process(target = clock, args = (5,))
  p.start()

  while True:
    print('sleeping.......')
    sleep(1)

多进程的函数实现, 注意实现方法,跟threading类似, 实现run函数后自动会自动调用这个函数:


import multiprocessing
from time import sleep, ctime


class ClockProcess(multiprocessing.Process):
  '''
  两个函数比较重要
  1. init构造函数
  2. run
  '''

  def __init__(self, interval):
    super().__init__()
    self.interval = interval

  def run(self):
    while True:
      print("The time is %s" % ctime())
      sleep(self.interval)


if __name__ == '__main__':
  p = ClockProcess(3)
  p.start()

  while True:
    print('sleeping.......')
    sleep(1)

另一个案例:


from multiprocessing import Process
import os


def info(title):
  print(title)
  print('module name:', __name__)
  # 得到父亲进程的id
  print('parent process:', os.getppid())
  # 得到本身进程的id
  print('process id:', os.getpid())



def f(name):
  info('function f')
  print('hello', name)

if __name__ == '__main__':
  info('main line')
  p = Process(target=f, args=('bob',))
  p.start()
  p.join()

进程通讯

多线程可以使用共享资源, 但需要注意资源冲突, 但多进程直接的相互通讯就不择一些,因为进程的 隔离度比线程更高.

进程之间通讯可以使用过:

  • Queue
  • Pip
  • Manager

    进程通讯-Queue

import queue,time
import multiprocessing
def foo(q):
    time.sleep(3)
    print("son process",id(q))
    q.put(123)
    q.put("dazui")
if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=foo,args=(q,))
    p.start()
    p.join()
    print("main process",id(q))
    print(q.get())
    print(q.get())

进程通讯-Pip

Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()recv()方法(等等),如果两个进程(或线程) 尝试同时读取或写入管道的同一端,管道中的数据可能会损坏

from multiprocessing import Process,Pipe
def func(conn):
    conn.send([111,{"name":"dazui"},"hello world!"])
    response = conn.recv()
    print("response:",response)
    conn.close()
    print("q_ID2:",id(conn))
 
if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    print("q_ID1",id(child_conn))
    p = Process(target=func,args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    parent_conn.send("hello son!")
    p.join()

近程通讯-Manager

Manager负责共享变量的管理

from multiprocessing import Process,Manager
 
def func(d,l,n):
    d[n] = ""
    d[''] = 2
    l.append(n)
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        p_list = []
 
        for i in range(10):
            p = Process(target=func,args=(d,l,i))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

生产者消费者模型

这是个模型, 是计算机常用模型之一,一般用来模拟对消息/命令等的处理

加入你在一家公司, 有8个老板, 每个老板都可以给你们部门发布命令, 你们部门的任务是按时完成老板的指令, 此时8个老板不定时给你们任务,有可能会出现任务来了你们都在干活没时间搭理他的情况, 怎么办,我们让老板 把命令写成小纸条, 老板来了只是把小纸条一次放入你们指定的盒子就好,你们谁有时间就从命令盒子里拿出一个命令 执行,这样大家都能有条不紊的 干活了,这个就是生产者消费者模型, 老板是(任务)的生产者, 你们是(任务)的消费者.

实现这个模型关键是实现那个盛放命令的人盒子, 这个盒子一般具有先来的任务先处理的特性(FIFO), 这个我们案例中用 queue来实现, 在工业产品中用消息队列就好

看一个生产者消费者的模拟:


import threading
import time

# Python2
# from Queue import Queue

# Python3
import queue

# 模拟生产者
class Producer(threading.Thread):
  '''
  模拟生产者, 即不断往任务盒子里放命令
  '''
  def run(self):
    global queue
    count = 0
    while True:
      # qsize返回queue内容长度
      if queue.qsize() < 1000:
        for i in range(100):
          count = count +1
          msg = '生成产品'+str(count)
          # put是网queue中放入一个值
          queue.put(msg)
          print(msg)
      time.sleep(0.5)


class Consumer(threading.Thread):
  '''
  消费者
  即不断从任务盒子拿出任务并执行
  '''
  def run(self):
    global queue
    while True:
      if queue.qsize() > 100:
        for i in range(3):
          # get是从queue中取出一个值
          msg = self.name + '消费了 '+queue.get()
          print(msg)
      time.sleep(1)


if __name__ == '__main__':
  queue = queue.Queue()

  for i in range(500):
    queue.put('初始产品'+str(i))
  for i in range(2):
    p = Producer()
    p.start()
  for i in range(5):
    c = Consumer()
    c.start()

上面代码请仔细观察结果和代码进行对比理解.

利用多进程实现模型


import multiprocessing
from time import ctime


def consumer(input_q):
  print("Into consumer:", ctime())
  while True:
    # 处理项
    item = input_q.get()
    print ("pull", item, "out of q") # 此处替换为有用的工作
    input_q.task_done() # 发出信号通知任务完成
  print ("Out of consumer:", ctime()) ##此句未执行,因为q.join()收集到四个task_done()信号后,主进程启动,未等到print此句完成,程序就结束了


def producer(sequence, output_q):
  print ("Into procuder:", ctime())
  for item in sequence:
    output_q.put(item)
    print ("put", item, "into q")
  print ("Out of procuder:", ctime())


# 建立进程
if __name__ == '__main__':
  q = multiprocessing.JoinableQueue()
  # 运行消费者进程
  cons_p = multiprocessing.Process (target = consumer, args = (q,))
  cons_p.daemon = True
  cons_p.start()

  # 生产多个项,sequence代表要发送给消费者的项序列
  # 在实践中,这可能是生成器的输出或通过一些其他方式生产出来
  sequence = [1,2,3,4]
  producer(sequence, q)
  # 等待所有项被处理
  q.join()

模型设置哨兵用来进行特殊提示

哨兵就是某一种特殊类型/值的变量, 拿到这个变量或者值后就代表某种意思,比如循环结束

本质上哨兵是为了让队列两端的对象进行简单通信.

下面案例哨兵就是None的值


import multiprocessing
from time import ctime

# 设置哨兵问题
def consumer(input_q):
  print("Into consumer:", ctime())
  while True:
    item = input_q.get()
    if item is None:
      break
    print("pull", item, "out of q")
  print ("Out of consumer:", ctime()) ## 此句执行完成,再转入主进程


def producer(sequence, output_q):
  print ("Into procuder:", ctime())
  for item in sequence:
    output_q.put(item)
    print ("put", item, "into q")
  print ("Out of procuder:", ctime())

if __name__ == '__main__':
  q = multiprocessing.Queue()
  cons_p = multiprocessing.Process(target = consumer, args = (q,))
  cons_p.start()

  sequence = [1,2,3,4]
  producer(sequence, q)

  q.put(None)
  cons_p.join()

使用JoinableQueue

为了生产者消费者做到通信, 使用Queue的话使用哨兵, 但如果 使用JoinableQueue的话跟简介, JoinableQueue具有函数 task_done, 用来每个任务完成后通知队列.


import multiprocessing
from time import ctime


def consumer(input_q):
  print("Into consumer:", ctime())
  while True:
    # 处理项
    item = input_q.get()
    print ("pull", item, "out of q") # 此处替换为有用的工作
    input_q.task_done() # 发出信号通知任务完成
  print ("Out of consumer:", ctime()) ##此句未执行,因为q.join()收集到四个task_done()信号后,主进程启动,未等到print此句完成,程序就结束了


def producer(sequence, output_q):
  print ("Into procuder:", ctime())
  for item in sequence:
    output_q.put(item)
    print ("put", item, "into q")
  print ("Out of procuder:", ctime())


# 建立进程
if __name__ == '__main__':
  q = multiprocessing.JoinableQueue()
  # 运行消费者进程
  cons_p = multiprocessing.Process (target = consumer, args = (q,))
  cons_p.daemon = True
  cons_p.start()

  # 生产多个项,sequence代表要发送给消费者的项序列
  # 在实践中,这可能是生成器的输出或通过一些其他方式生产出来
  sequence = [1,2,3,4]
  producer(sequence, q)
  # 等待所有项被处理
  q.join()

线程池/进程池

创建线程/进程会消耗资源, 但创建多线程多进程的目的是为了提高效率, 这样就有点矛盾, 对于实际任务很’小’, 一执行就完, 但数量很多的情况, 我们需要频繁创建线程/进程, 这样显得就不太划算.

一张解决方案就是线程池/进程池, 即我提前创建固定数量的线程/进程, 此时多线程/进程只是不断的执行任务, 执行完任务后并不会 被销毁, 而是家在任务继续执行, 这样就可以尽可能降低了创建和销毁的开销.

multiprocessing.Pool

multiprocessing提供的进程池, 用来创建多进程.


def __init__(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None, context=None):
# processes:进程数量,如果 processes 是 None那么使用 os.cpu_count()返回的数量
# initializer: 如果 initializer不是 None,那么每一个工作进程在开始的时候会调用initializer(*initargs)
# maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild 默认是None,意味着只要Pool存在工作进程就会一直存活
# context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

利用进程池对象直接创建进程池即可, 池内有一定数量的闲散进程, 等待分配任务, 下面就可以进行分配任务, 池内进程在得到任务后 就可以自动执行并得到结果:

  • apply()
    • 同步阻塞执行,上一个子进程结束后才能进行下一个子进程
    • apply(func, args=(), kwds={}, callback=None, error_callback=None)
  • apply_async()
    • 异步非阻塞执行,每个子进程都是异步执行的(并行)
    • apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
  • map()
    • 同步阻塞,返回值是list类型
    • map(func, iterable, chunksize=None)
  • map_async():异步非阻塞
    • map_async(func, iterable, chunksize=None, callback=None, error_callback=None) - 返回 MapResult 实例,使用 get() 方法,获取结果(list 方法)
  • imap():内存不够用可以采用此种方式,速度慢于 map()
  • imap_unorderedimap 的无序版本(不会按照调用顺序返回,而是按照结束顺序返回),返回迭代器实例

下面案例展示了一个进程池, 并用来计算1000次数字平方操作, 操作完后执行回调函数 保存结果:

import multiprocessing

def cb(rst):
  with open("liudana.txt", "a+", encoding="utf-8") as f:
    f.write(str(rst) + "\n")

def run(n):
  return n ** 2

if __name__ == '__main__':
  pool = multiprocessing.Pool(6)
  for i in range(1000):
    pool.apply_async(run, args=(i,), callback=cb)
    # # 如有多个参数,可传一个 iterable
    # pool.apply_async(run, args=([i, 123, 456]), callback=callback)
  pool.close()
  pool.join()

下面是上面代码的改版, 使用了map_async:

import multiprocessing
import time

def run(n):
  return n ** 2

data = []

def cb(rst):
  data.append(rst)

if __name__ == '__main__':
  st = time.time()
  pool = multiprocessing.Pool(6)

  # # 总耗时:0.76s
  # f= pool.map_async(run, range(20000))
  # print(f.get())

  # 总耗时:1.98s
  for i in range(20000):
    pool.apply_async(run, args=(i,), callback=cb)

  pool.close()
  pool.join()

  print(data)

  print(f"总耗时:{time.time() - st}")

‘concurrent.futures’

concurrent.futurespython3.2 中引入的新模块, 它为异步执行可调用对象提供了高层接口,分为两类:

  • ThreadPoolExecutor:多线程编程
  • ProcessPoolExecutor:多进程编程

两者使用接口基本一致,我们以线程为例讲解.

使用思路大致是:

  1. 创建ProcessPoolExecutor
  2. submiut或类似指令提交任务
  3. 利用result收集结果

使用上主要关注两个类:

  • Executor: 负责管理进程,执行任务
  • Future: 管理任务结果

` concurrent.futures.Executor`

主要的工作函数有:

  • submit:
    • 调用对象执行fn(*args, **kwargs)
    • 返回Future 对象,可用 future.result() 获取执行结果
    • submit(fn, *args, **kwargs)
  • map:
    • 异步执行 func,并支持多次并发调用,返回一个迭代器
    • timeout 如果设置为 None 或者不指定,则不限制等待时间
    • ProcessPoolExecutor 这个方法将 iterables 划分为多块,作为独立的任务提交到进程池显著提升性能
    • map(func, *iterables, timeout=None, chunksize=1)
  • shutdown:
    • 告诉当执行器 executor 在当前所有等待的 future 对象运行完毕后,应该释放执行器用到的所有资源
    • wait=True 会等待所有 future 执行完毕,且 executor 的资源都释放完会才会返回
    • wait=False 会立即返回,executor 的资源会在 future 执行完后释放
    • shutdown(wait=True)

参看下面案例:


from concurrent.futures import ProcessPoolExecutor, as_completed

def fib(n):
  if n <= 2:
    return 1
  return fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
  n = range(20)
  with ProcessPoolExecutor(max_workers=3) as executor:
    # map 方式
    for num, result in zip(n, executor.map(fib, n)):
      print(f"fib({num})={result}")

    # # submit 方式
    # work_dict = {executor.submit(fib, i): i for i in n}
    # for future in as_completed(work_dict):
    #     num = work_dict[future]
    #     try:
    #         data = future.result()
    #     except Exception as e:
    #         print(e)
    #     else:
    #         print(f"fib({num}) = {data}")

上面代码函数as_completed是等待任务执行完毕.

Future

程序将任务通过submit方法提交给线程池之后,线程池会返回一个Future对象,该对象的作用主要是用于获取线程任务函数的返回值

Future的功能有:

  • cancel()
    • 取消该Future代表的线程任务
    • 如果该任务正在执行,不可取消,则该方法返回False
    • 程序会取消该任务,并返回True
  • result(timeout=None):
    • 获取该 Future 代表的线程任务最后返回的结果
    • 如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程
    • 其中 timeout 参数指定最多阻塞多少秒
  • add_done_callback(fn)
    • 为该 Future 代表的线程任务注册一个“回调函数”
  • done():如果该Future代表的线程任务被成功取消或执行完成,则该方法返回True

举个例子:


from concurrent.futures import ProcessPoolExecutor
import multiprocessing

import threading
import time


def async_add(n):
  s = 0
  for i in range(n):
    s = s + i
  time.sleep(1)

  print(multiprocessing.current_process().name + "执行求和操作求得的和是=" + str(s))
  return s


pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(async_add, 20)
future2 = pool.submit(async_add, 50)

# 判断future1代表的任务是否执行完
time.sleep(2)
print(future1.done())
print(future2.done())
# 查看future1代表的任务返回的结果
print('一的结果是=' + str(future1.result()))
# 查看future2代表的任务的返回结果
print('二的结果是=' + str(future2.result()))

print("----" + threading.current_thread().name + "----主线程执行结束-----")

上面代码执行结果如下:

ForkProcess-1执行求和操作求得的和是=190
ForkProcess-2执行求和操作求得的和是=1225
True
True
一的结果是=190
二的结果是=1225
----MainThread----主线程执行结束-----

上面代码会得到正确结果因为result函数是阻塞的, 所以会有结果, 如果去掉这个 函数, 结果不等出来就执行完毕了, 参考下面代码, 等待时间也做了修改:


from concurrent.futures import ProcessPoolExecutor
import multiprocessing

import threading
import time


def async_add(n):
  s = 0
  for i in range(n):
    s = s + i
  time.sleep(3)

  print(multiprocessing.current_process().name + "执行求和操作求得的和是=" + str(s))
  return s


pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(async_add, 20)
future2 = pool.submit(async_add, 50)

# 判断future1代表的任务是否执行完
time.sleep(1)
print(future1.done())
print(future2.done())

print("----" + threading.current_thread().name + "----主线程执行结束-----")

如果需要非阻塞的方式运行代码, 可以尝试添加回到函数, 执行任务完毕后自动调用回调 函数收集结果:


from concurrent.futures import ProcessPoolExecutor
import multiprocessing

import threading
import time


def async_add(n):
  s = 0
  for i in range(n):
    s = s + i
  time.sleep(3)

  print(multiprocessing.current_process().name + "执行求和操作求得的和是=" + str(s))
  return s


with ProcessPoolExecutor(max_workers=2) as pool:
  # 向线程池提交一个task
  future1 = pool.submit(async_add, 20)
  future2 = pool.submit(async_add, 50)


  # 定义获取结果的函数
  def get_result(future):
    print(multiprocessing.current_process().name + '运行结果:' + str(future.result()))


  # 查看future1代表的任务返回的结果
  future1.add_done_callback(get_result)
  # 查看future2代表的任务的返回结果
  future2.add_done_callback(get_result)
  print('------------主线程执行结束----')

上面代码执行完毕后结果如下:

------------主线程执行结束----
ForkProcess-1执行求和操作求得的和是=190
MainProcess运行结果190
ForkProcess-2执行求和操作求得的和是=1225
MainProcess运行结果1225