分析Python感知線程狀態(tài)的解決方案之Event與信號(hào)量
利用Threading庫我們可以很方便地創(chuàng)建線程,讓它按照我們的想法執(zhí)行我們想讓它執(zhí)行的事情,從而加快程序運(yùn)行的效率。然而有一點(diǎn)坑爹的是,線程創(chuàng)建之后,就交給了操作系統(tǒng)執(zhí)行,我們無法直接結(jié)束一個(gè)線程,也無法給它發(fā)送信號(hào),無法調(diào)整它的調(diào)度,也沒有其他高級(jí)操作。如果想要相關(guān)的功能,只能自己開發(fā)。
怎么開發(fā)呢?
我們創(chuàng)建線程的時(shí)候指定了target等于一個(gè)我們想讓它執(zhí)行的函數(shù),這個(gè)函數(shù)并不一定是全局函數(shù),實(shí)際上也可以是一個(gè)對(duì)象中的函數(shù)。如果是對(duì)象中的函數(shù),那么我們就可以在這個(gè)函數(shù)當(dāng)中獲取到對(duì)象中的其他信息,我們可以利用這一點(diǎn)來實(shí)現(xiàn)手動(dòng)控制線程的停止。
說起來好像不太好理解,但是看下代碼真的非常簡(jiǎn)單:
import timefrom threading import Threadclass TaskWithSwitch: def __init__(self):self._running = True def terminate(self):self._running = False def run(self, n):while self._running and n > 0: print(’Running {}’.format(n)) n -= 1 time.sleep(1)c = TaskWithSwitch()t = Thread(target=c.run, args=(10, ))t.start()c.terminate()t.join()
如果你運(yùn)行這段代碼,會(huì)發(fā)現(xiàn)屏幕上只輸出了10,因?yàn)槲覀儗running這個(gè)字段置為False之后,下次循環(huán)的時(shí)候不再滿足循環(huán)條件,它就會(huì)自己退出了。
如果我們想要用多線程來讀取IO,由于IO可能存在堵塞,所以可能會(huì)出現(xiàn)線程一直無法返回的情況。也就是說我們?cè)谘h(huán)內(nèi)部卡死了,這個(gè)時(shí)候單純用_running來判斷還是不夠的,我們需要在線程內(nèi)部設(shè)置計(jì)時(shí)器,防止循環(huán)內(nèi)部的卡死。
class IOTask: def __init__(self):self._running = True def terminate(self):self._running = False def run(self, sock):# 在socket中設(shè)置計(jì)時(shí)器sock.settimeout(10)while self._running: try:# 由于設(shè)置了計(jì)時(shí)器,所以這里不會(huì)永久等待data = sock.recv(1024)break except socket.timeout:continuereturn二、線程信號(hào)的傳遞
我們之所以如此費(fèi)勁才能控制線程的運(yùn)行,主要原因是線程的狀態(tài)是不可知的,并且我們無法直接操作它,因?yàn)樗潜徊僮飨到y(tǒng)管理的。我們運(yùn)行的主線程和創(chuàng)建出來的線程是獨(dú)立的,兩者之間并沒有從屬關(guān)系,所以想要實(shí)現(xiàn)對(duì)線程的狀態(tài)進(jìn)行控制,往往需要我們通過其他手段來實(shí)現(xiàn)。
我們來思考一個(gè)場(chǎng)景,假設(shè)我們有一個(gè)任務(wù),需要在另外一個(gè)線程運(yùn)行結(jié)束之后才能開始執(zhí)行。要想要實(shí)現(xiàn)這一點(diǎn),就必須對(duì)線程的狀態(tài)有所感知,需要其他線程傳遞出信號(hào)來才行。我們可以使用threading中的Event工具來實(shí)現(xiàn)這一點(diǎn)。Event工具就是可以用來傳遞信號(hào)的,就好像是一個(gè)開關(guān),當(dāng)一個(gè)線程執(zhí)行完成之后,會(huì)去啟動(dòng)這個(gè)開關(guān)。而這個(gè)開關(guān)控制著另外一段邏輯的運(yùn)行。
我們來看下樣例代碼:
import timefrom threading import Thread, Eventdef run_in_thread(): time.sleep(1) print(’Thread is running’)t = Thread(target=run_in_thread)t.start()print(’Main thread print’)
我們?cè)诰€程里面就只做了輸出一行提示符,沒有其他任何邏輯。由于我們?cè)趓un_in_thread函數(shù)當(dāng)中沉睡了1s,所以一定是先輸出Main thread print再輸出的Thread is running。假設(shè)這個(gè)線程是一個(gè)很重要的任務(wù),我們希望主線程能夠等待它運(yùn)行到一個(gè)階段再往下執(zhí)行,我們應(yīng)該怎么辦呢?
注意,這里說的是運(yùn)行到一個(gè)階段,并不是運(yùn)行結(jié)束。運(yùn)行結(jié)束我們很好處理,可以通過join來完成。但如果不是運(yùn)行結(jié)束,而是運(yùn)行完成了某一個(gè)階段,當(dāng)然通過join也可以,但是會(huì)損害整體的效率。這個(gè)時(shí)候我們就必須要用上Event了。加上Event之后,我們?cè)賮砜聪麓a:
import timefrom threading import Thread, Eventdef run_in_thread(event): time.sleep(1) print(’Thread is running’) # set一下event,這樣外面wait的部分就會(huì)被啟動(dòng) event.set()# 初始化Eventevent = Event()t = Thread(target=run_in_thread, args=(event, ))t.start()# event等待setevent.wait()print(’Main thread print’)
整體的邏輯沒有太多的修改,主要的是增加了幾行關(guān)于Event的使用代碼。
我們?nèi)绻玫紼vent,最好在代碼當(dāng)中只使用一次。當(dāng)然通過Event中的clear方法我們可以重置Event的值,但問題是我們沒辦法保證重置的這個(gè)邏輯會(huì)在wait之前執(zhí)行。如果是在之后執(zhí)行的,那么就會(huì)問題,并且在debug的時(shí)候會(huì)異常痛苦,因?yàn)閎ug不是必現(xiàn)的,而是有時(shí)候會(huì)出現(xiàn)有時(shí)候不會(huì)出現(xiàn)。這種情況往往都是因?yàn)槎嗑€程的使用問題。
所以如果要多次使用開關(guān)和信號(hào)的話,不要使用Event,可以使用信號(hào)量。
三、信號(hào)量Event的問題在于如果多個(gè)線程在等待Event的發(fā)生,當(dāng)它一旦被set的時(shí)候,那么這些線程都會(huì)同時(shí)執(zhí)行。但有時(shí)候我們并不希望這樣,我們希望可以控制這些線程一個(gè)一個(gè)地運(yùn)行。如果想要做到這一點(diǎn),Event就無法滿足了,而需要使用信號(hào)量。
信號(hào)量和Event的使用方法類似,不同的是,信號(hào)量可以保證每次只會(huì)啟動(dòng)一個(gè)線程。因?yàn)檫@兩者的底層邏輯不太一致,對(duì)于Event來說,它更像是一個(gè)開關(guān)。一旦開關(guān)啟動(dòng),所有和這個(gè)開關(guān)關(guān)聯(lián)的邏輯都會(huì)同時(shí)執(zhí)行。而信號(hào)量則像是許可證,只有拿到許可證的線程才能執(zhí)行工作,并且許可證一次只發(fā)一張。
想要使用信號(hào)量并不需要自己開發(fā),thread庫當(dāng)中為我們提供了現(xiàn)成的工具——Semaphore,我們來看它的使用代碼:
# 工作線程def worker(n, sema): # 等待信號(hào)量 sema.acquire() print(’Working’, n)# 初始化sema = threading.Semaphore(0)nworkers = 10for n in range(nworkers): t = threading.Thread(target=worker, args=(n, sema,)) t.start()
在上面的代碼當(dāng)中我們創(chuàng)建了10個(gè)線程,雖然這些線程都被啟動(dòng)了,但是都不會(huì)執(zhí)行邏輯,因?yàn)閟ema.acquire是一個(gè)阻塞方法,沒有監(jiān)聽到信號(hào)量是會(huì)一直掛起等待。
當(dāng)我們釋放信號(hào)量之后,線程被啟動(dòng),才開始了執(zhí)行。我們每釋放一個(gè)信號(hào),則會(huì)多啟動(dòng)一個(gè)線程。這里面的邏輯應(yīng)該不難理解。
四、總結(jié)在并發(fā)場(chǎng)景當(dāng)中,多線程的使用絕不是多啟動(dòng)幾個(gè)線程做不同的任務(wù)而已,我們需要線程間協(xié)作,需要同步、獲取它們的狀態(tài),這是非常不容易的。一不小心就會(huì)出現(xiàn)幽靈bug,時(shí)顯時(shí)隱,這也是并發(fā)問題讓人頭疼的主要原因。
以上就是分析Python感知線程狀態(tài)的解決方案之Event與信號(hào)量的詳細(xì)內(nèi)容,更多關(guān)于Python 感知線程狀態(tài) Event與信號(hào)量的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. 推薦一個(gè)好看Table表格的css樣式代碼詳解2. 基于Surprise協(xié)同過濾實(shí)現(xiàn)短視頻推薦方法示例3. 不使用XMLHttpRequest對(duì)象實(shí)現(xiàn)Ajax效果的方法小結(jié)4. vue-electron中修改表格內(nèi)容并修改樣式5. 微信小程序?qū)崿F(xiàn)商品分類頁過程結(jié)束6. 以PHP代碼為實(shí)例詳解RabbitMQ消息隊(duì)列中間件的6種模式7. ASP新手必備的基礎(chǔ)知識(shí)8. AJAX實(shí)現(xiàn)文件上傳功能報(bào)錯(cuò)Current request is not a multipart request詳解9. PHP獲取時(shí)間戳等相關(guān)函數(shù)匯總10. ASP常用日期格式化函數(shù) FormatDate()
