The Road to Python 2 Multi Threading

Summary about Python Multi Threading

Posted by Huajian on August 23, 2022
Viewed times

Reference - basic concept, join, setDaemon

Reference - lock & Rlock

Reference - Condition

Reference - Event

Reference - Timer

Reference - Local

序言

这篇文章是对Reference系列文章的个人理解与总结

基本概念

  • 线程的状态分为以下几种
    • Runnable/Ready
    • Running
    • Blocked
      • Waiting 线程主动调用 wait() 方法,等待来自其他线程的 notify() 唤醒
      • Locked
      • Sleeping 线程主动调用sleep()join() 让自身block

来自 Reference - basic concept, join, setDaemon 的参考图 800

  • 开线程:
import threading
import time

#定义线程需要做的内容,写在函数里面
def target():
    print('当前的线程%s 在运行' % threading.current_thread().name)
    time.sleep(1)
    print('当前的线程 %s 结束' % threading.current_thread().name)

print('当前的线程 %s 在运行' % threading.current_thread().name)
t = threading.Thread(target=target,args = [])
 
t.start()  #线程启动
print('当前的线程 %s 结束' % threading.current_thread().name)

其中最重要的是:

t = threading.Thread(): 声明线程
t.start() 启动线程

join()

  • 假设我们有两个线程: 主线程和子线程。通常是子线程使用join(),使得主线程必须等待子线程完成后才可以继续。程序拓展为:
    t = threading.Thread() 声明线程
    t.start() 启动线程
    t.join(Timeout) 阻塞主线程。如设置Timeout,则只阻塞这么久

setDaemon(True)

  • 设置在线程启动前。当没有join() 时,setDaemon(True) 可以保证主线程结束时程序不会退出,直到子线程结束。程序拓展为:
    t = threading.Thread(): 声明线程
    t.setDaemon(True) 守护线程
    t.start() 启动线程

lock & Rlock

  • 互斥锁,用于解决多线程访问共享变量的问题

  • 声明锁。
    lock = threading.Lock()
    lock = threading.RLock()

  • 获得锁。使用时线程首先blocked,直到获得锁/Timeout时返回是否获得锁,如获得,则运行
    lock.acquire(Timeout)

  • 释放锁。线程结束时将锁释出
    lock.release()

  • lock 与 Rlock的区别:

    • 如果对lock连续使用release() 会出现死锁问题
    • Rlock要求acquire()release() 成对出现,较为常用

Condition

  • 是一个条件变量。
    通过设置的条件来控制线程的激活。
    Condition的对象维护了一个锁 + 一个wait池

  • 声明Condition对象 con = threading.Condition()

  • wait()方法 调用wait() 时,线程blocked并进入wait池

  • notify()方法 调用notify() 时,Condition对象选择wait池中的一个线程,让其调用acquire() 尝试获取锁

  • 典型的例子

    • 生产者消费者问题 Reference - Condition
    • 强化学习action与observation的交互
      • 在每一步 step() 中: Agent首先publish一个action到Simulator,调用wait() 暂时block自己
      • Agent的Subscriber收到来自Simulator的message,在Callback中获取observation,调用notify() 重新激活 step() 线程
      • step() 完成全部操作,返回observation得到的观测数据

Event

  • 通过一个 Flag 控制线程的状态

  • 声明Event对象
    event = threading.Event()

  • wait(timeout)
    t = threading.Thread(): 声明线程
    event.wait(Timeout) block自己,等待被唤醒

  • set(); isSet(); clear()
    event.set()Flag改为True,唤醒所有在等待的线程 event.isSet()检查标志状态 event.clear()Flag重置为False

Timer

  • 声明timer对象
    • timer = threading.Timer(interval, function, args=[ ], kwargs={ })
    • interval: 指定的时间,在interval时间后开始thread
    • funtion: 要执行的函数
    • args/kwargs: 函数的参数
  • start() & cancel()
    timer = threading.Timer(interval, function, args=[ ], kwargs={ })
    timer.start()
    timer.cancel(): 在start后,thread真正激活前,可以取消start

Local

  • 声明local对象 localManager = threading.local()
    这个对象可以被当作一个全局变量,自身有一些属性。
    每个thread对这个全局变量的读写都是隔离的。
    (需要在具体项目中才能理解)

Top
Down