Python中threading庫實(shí)現(xiàn)線程鎖與釋放鎖
前文提到threading庫在多線程時(shí),對(duì)同一資源的訪問容易導(dǎo)致破壞與丟失數(shù)據(jù)。為了保證安全的訪問一個(gè)資源對(duì)象,我們需要?jiǎng)?chuàng)建鎖。
示例如下:
import threadingimport timeclass AddThread(): def __init__(self, start=0):self.lock = threading.Lock()self.value = start def increment(self):print('Wait Lock')self.lock.acquire()try: print('Acquire Lock') self.value += 1 print(self.value)finally: self.lock.release()def worker(a): time.sleep(1) a.increment()addThread = AddThread()for i in range(3): t = threading.Thread(target=worker, args=(addThread,)) t.start()
運(yùn)行之后,效果如下:
acquire()會(huì)通過鎖進(jìn)行阻塞其他線程執(zhí)行中間段,release()釋放鎖,可以看到,基本都是獲得鎖之后才執(zhí)行。避免了多個(gè)線程同時(shí)改變其資源對(duì)象,不會(huì)造成混亂。
判斷是否有另一個(gè)線程請(qǐng)求鎖要確定是否有另一個(gè)線程請(qǐng)求鎖而不影響當(dāng)前的線程,可以設(shè)置acquire()的參數(shù)blocking=False。
示例如下:
import threadingimport timedef worker2(lock): print('worker2 Wait Lock') while True:lock.acquire()try: print('Holding') time.sleep(0.5)finally: print('not Holding') lock.release()time.sleep(0.5)def worker1(lock): print('worker1 Wait Lock') num_acquire = 0 value = 0 while num_acquire < 3:time.sleep(0.5)have_it = lock.acquire(blocking=False)try: value += 1 print(value) print('Acquire Lock') if have_it:num_acquire += 1finally: print('release Lock') if have_it:lock.release()lock = threading.Lock()word2Thread = threading.Thread( target=worker2, name=’work2’, args=(lock,))word2Thread.start()word1Thread = threading.Thread( target=worker1, name=’work1’, args=(lock,))word1Thread.start()
運(yùn)行之后,效果如下:
這里,我們需要迭代很多次,work1才能獲取3次鎖。但是嘗試了很8次。
with lock前文,我們通過lock.acquire()與lock.release()實(shí)現(xiàn)了鎖的獲取與釋放,但其實(shí)我們Python還給我們提供了一個(gè)更簡(jiǎn)單的語法,通過with lock來獲取與釋放鎖。
示例如下:
import threadingimport timeclass AddThread(): def __init__(self, start=0):self.lock = threading.Lock()self.value = start def increment(self):print('Wait Lock')with self.lock: print('lock acquire') self.value += 1 print(self.value)print('lock release')def worker(a): time.sleep(1) a.increment()addThread = AddThread()for i in range(3): t = threading.Thread(target=worker, args=(addThread,)) t.start()
這里,我們只是將最上面的例子改變了一下。效果如下:
需要注意的是,正常的Lock對(duì)象不能請(qǐng)求多次,即使是由同一個(gè)線程請(qǐng)求也不例外。如果同一個(gè)調(diào)用鏈中的多個(gè)函數(shù)訪問一個(gè)鎖,則會(huì)發(fā)生意外。如果期望在同一個(gè)線程的不同代碼需要重新獲得鎖,那么這種情況下使用RLock。
同步線程Condition在實(shí)際的操作中,我們還可以使用Condition對(duì)象來同步線程。由于Condition使用了一個(gè)Lock,所以它可以綁定到一個(gè)共享資源,允許多個(gè)線程等待資源的更新。
示例如下:
import threadingimport timedef consumer(cond): print('waitCon') with cond:cond.wait()print(’獲取更新的資源’)def producer(cond): print('worker') with cond:print(’更新資源’)cond.notifyAll()cond = threading.Condition()t1 = threading.Thread(name=’t1’, target=consumer, args=(cond,))t2 = threading.Thread(name=’t2’, target=consumer, args=(cond,))t3 = threading.Thread(name=’t3’, target=producer, args=(cond,))t1.start()time.sleep(0.2)t2.start()time.sleep(0.2)t3.start()
運(yùn)行之后,效果如下:
這里,我們通過producer線程處理完成之后調(diào)用notifyAll(),consumer等線程等到了它的更新,可以類比為觀察者模式。這里是,當(dāng)一個(gè)線程用完資源之后時(shí),則會(huì)自動(dòng)通知依賴它的所有線程。
屏障(barrier)屏障是另一種線程的同步機(jī)制。barrier會(huì)建立一個(gè)控制點(diǎn),所有參與的線程會(huì)在這里阻塞,直到所有這些參與方都到達(dá)這一點(diǎn)。采用這種方法,線程可以單獨(dú)啟動(dòng)然后暫停,直到所有線程都準(zhǔn)備好了才可以繼續(xù)。
示例如下:
import threadingimport timedef worker(barrier): print(threading.current_thread().getName(), 'worker') worker_id = barrier.wait() print(threading.current_thread().getName(), worker_id)threads = []barrier = threading.Barrier(3)for i in range(3): threads.append(threading.Thread( name='t' + str(i), target=worker, args=(barrier,)) )for t in threads: print(t.name, ’starting’) t.start() time.sleep(0.1)for t in threads: t.join()
運(yùn)行之后,效果如下:
從控制臺(tái)的輸出會(huì)發(fā)發(fā)現(xiàn),barrier.wait()會(huì)阻塞線程,直到所有線程被創(chuàng)建后,才同時(shí)釋放越過這個(gè)控制點(diǎn)繼續(xù)執(zhí)行。wait()的返回值指示了釋放的參與線程數(shù),可以用來限制一些線程做清理資源等動(dòng)作。
當(dāng)然屏障Barrier還有一個(gè)abort()方法,該方法可以使所有等待線程接收一個(gè)BroKenBarrierError。如果線程在wait()上被阻塞而停止處理,會(huì)產(chǎn)生這個(gè)異常,通過except可以完成清理工作。
有限資源的并發(fā)訪問除了多線程可能訪問同一個(gè)資源之外,有時(shí)候?yàn)榱诵阅埽覀円矔?huì)限制多線程訪問同一個(gè)資源的數(shù)量。例如,線程池支持同時(shí)連接,但數(shù)據(jù)可能是固定的,或者一個(gè)網(wǎng)絡(luò)APP提供的并發(fā)下載數(shù)支持固定數(shù)目。這些連接就可以使用Semaphore來管理。
示例如下:
import threadingimport timeclass WorkerThread(threading.Thread): def __init__(self):super(WorkerThread, self).__init__()self.lock = threading.Lock()self.value = 0 def increment(self):with self.lock: self.value += 1 print(self.value)def worker(s, pool): with s:print(threading.current_thread().getName())pool.increment()time.sleep(1)pool.increment()pool = WorkerThread()s = threading.Semaphore(2)for i in range(5): t = threading.Thread(name='t' + str(i),target=worker,args=(s, pool,) ) t.start()
運(yùn)行之后,效果如下:
從圖片雖然能看所有輸出,但無法看到其停頓的事件。讀者自己運(yùn)行會(huì)發(fā)現(xiàn),每次頂多只有兩個(gè)線程在工作,是因?yàn)槲覀冊(cè)O(shè)置了threading.Semaphore(2)。
隱藏資源在實(shí)際的項(xiàng)目中,有些資源需要鎖定以便于多個(gè)線程使用,而另外一些資源則需要保護(hù),以使它們對(duì)并非使這些資源的所有者的線程隱藏。
local()函數(shù)會(huì)創(chuàng)建一個(gè)對(duì)象,它能夠隱藏值,使其在不同的線程中無法被看到。示例如下:
import threadingimport randomdef show_data(data): try:result = data.value except AttributeError:print(threading.current_thread().getName(), 'No value') else:print(threading.current_thread().getName(), 'value=', result)def worker(data): show_data(data) data.value = random.randint(1, 100) show_data(data)local_data = threading.local()show_data(local_data)local_data.value = 1000show_data(local_data)for i in range(2): t = threading.Thread(name='t' + str(i),target=worker,args=(local_data,) ) t.start()
運(yùn)行之后,效果如下:
這里local_data.value對(duì)所有線程都不可見,除非在某個(gè)線程中設(shè)置了這個(gè)屬性,這個(gè)線程才能看到它。
到此這篇關(guān)于Python中threading庫實(shí)現(xiàn)線程鎖與釋放鎖的文章就介紹到這了,更多相關(guān)Python 線程鎖與釋放鎖內(nèi)容請(qǐng)搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!
相關(guān)文章:
1. Hybris在idea中debug配置方法詳解2. 在idea中為注釋標(biāo)記作者日期操作3. jsp cookie+session實(shí)現(xiàn)簡(jiǎn)易自動(dòng)登錄4. XPath入門 - XSL教程 - 35. 通過CSS數(shù)學(xué)函數(shù)實(shí)現(xiàn)動(dòng)畫特效6. .NET Core Web APi類庫內(nèi)嵌運(yùn)行的方法7. .NET6使用ImageSharp實(shí)現(xiàn)給圖片添加水印8. ASP.NET MVC實(shí)現(xiàn)橫向展示購物車9. ASP.NET MVC使用Boostrap實(shí)現(xiàn)產(chǎn)品展示、查詢、排序、分頁10. .net如何優(yōu)雅的使用EFCore實(shí)例詳解
