python混合如何使用同步和異步函數
本文講解"python混合怎么使用同步和異步函數",希望能夠解決相關問題。
在協程函數中調用同步函數
在協程函數中直接調用同步函數會阻塞事件循環,從而影響整個程序的性能。我們先來看一個例子:
以下是使用異步 web 框架 fastapi 寫的一個例子,fastapi 是比較快,但不正確的操作將會變得很慢。
import?time from?fastapi?import?fastapi app?=?fastapi() @app.get("/") async?def?root(): ????time.sleep(10) ????return?{"message":?"hello?world"} @app.get("/health") async?def?health(): ????return?{"status":?"ok"}
上面我們寫了兩個接口,假設 root 接口函數耗時 10 秒,在這 10 秒內訪問 health 接口,想一想會發生什么?
訪問 root 接口(左),立即訪問 health 接口(右),health 接口被阻塞,直至 root 接口返回后,health 接口才成功響應。
time.sleep 就是一個「同步」函數,它會阻塞整個事件循環。
如何解決呢?想一想以前的處理方法,如果一個函數會阻塞主線程,那么就再開一個線程讓這個阻塞函數單獨運行。所以,這里也是同理,開一個線程單獨去運行那些阻塞式操作,比如讀取文件等。
loop.run_in_executor 方法將同步函數轉換為異步非阻塞方式進行處理。具體來說,loop.run_in_executor() 可以將同步函數創建為一個線程或進程,并在其中執行該函數,從而避免阻塞事件循環。
官方例子:在線程或者進程池中執行代碼。
那么,我們使用 loop.run_in_executor 改寫上面例子,如下:
import?asyncio import?time from?fastapi?import?fastapi app?=?fastapi() @app.get("/") async?def?root(): ????loop?=?asyncio.get_event_loop() ????def?do_blocking_work(): ????????time.sleep(10) ????????print("done?blocking?work!!") ????await?loop.run_in_executor(none,?do_blocking_work) ????return?{"message":?"hello?world"} @app.get("/health") async?def?health(): ????return?{"status":?"ok"}
效果如下:
root 接口被阻塞期間,health 依然正常訪問互不影響。
注意: 這里都是為了演示,實際在使用 fastapi 開發時,你可以直接將 async def root 更換成 def root ,也就是將其換成同步接口函數,fastapi 內部會自動創建線程處理這個同步接口函數??偟膩碚f,fastapi 內部也是依靠線程去處理同步函數從而避免阻塞主線程(或主線程中的事件循環)。
在同步函數中調用異步函數
協程只能在「事件循環」內被執行,且同一時刻只能有一個協程被執行。
所以,在同步函數中調用異步函數,其本質就是將協程「扔進」事件循環中,等待該協程執行完獲取結果即可。
以下這些函數,都可以實現這個效果:
- asyncio.run
- asyncio.run_coroutine_threadsafe
- loop.run_until_complete
- create_task
接下來,我們將一一講解這些方法并舉例說明。
asyncio.run這個方法使用起來最簡單,先看下如何使用,然后緊跟著講一下哪些場景不能直接使用 asyncio.run
import?asyncio async?def?do_work(): ????return?1 def?main(): ????result?=?asyncio.run(do_work()) ????print(result)??#?1 if?__name__?==?"__main__": ????main()
直接 run 就完事了,然后接受返回值即可。
但是需要,注意的是 asyncio.run 每次調用都會新開一個事件循環,當結束時自動關閉該事件循環。
一個線程內只存在一個事件循環,所以如果當前線程已經有存在的事件循環了,就不應該使用 asyncio.run 了,否則就會拋出如下異常:
runtimeerror: asyncio.run() cannot be called from a running event loop
因此,asyncio.run 用作新開一個事件循環時使用。
asyncio.run_coroutine_threadsafe向指定事件循環提交一個協程。(線程安全)
返回一個 concurrent.futures.future 以等待來自其他 os 線程的結果。
換句話說,就是將協程丟給其他線程中的事件循環去運行。
值得注意的是這里的「事件循環」應該是其他線程中的事件循環,非當前線程的事件循環。
其返回的結果是一個 future 對象,如果你需要獲取協程的執行結果可以使用 future.result() 獲取
下方給了一個例子,一共有兩個線程:thread_with_loop 和 another_thread,分別用于啟動事件循環和調用 run_coroutine_threadsafe
import?asyncio import?threading import?time loop?=?none def?get_loop(): ????global?loop ????if?loop?is?none: ????????loop?=?asyncio.new_event_loop() ????return?loop def?another_thread(): ????async?def?coro_func(): ????????return?1 ????loop?=?get_loop() ????#?將協程提交到另一個線程的事件循環中執行 ????future?=?asyncio.run_coroutine_threadsafe(coro_func(),?loop) ????#?等待協程執行結果 ????print(future.result()) ????#?停止事件循環 ????loop.call_soon_threadsafe(loop.stop) def?thread_with_loop(): ????loop?=?get_loop() ????#?啟動事件循環,確保事件循環不會退出,直到?loop.stop()?被調用 ????loop.run_forever() ????loop.close() #?啟動一個線程,線程內部啟動了一個事件循環 threading.thread(target=thread_with_loop).start() time.sleep(1) #?在主線程中啟動一個協程,?并將協程提交到另一個線程的事件循環中執行 t?=?threading.thread(target=another_thread) t.start() t.join()loop.run_until_complete
運行直到 future ( future 的實例 ) 被完成。
這個方法和 asyncio.run 類似。
具體就是傳入一個協程對象或者任務,然后可以直接拿到協程的返回值。
run_until_complete 屬于 loop 對象的方法,所以這個方法的使用前提是有一個事件循環,注意這個事件循環必須是非運行狀態,如果是運行中就會拋出如下異常:
runtimeerror: this event loop is already running
例子:
loop?=?asyncio.new_event_loop() loop.run_until_complete(do_async_work())create_task
再次準確一點:要運行一個協程函數的本質是將攜帶協程函數的任務提交至事件循環中,由事件循環發現、調度并執行。
其實一共就是滿足兩個條件:
- 任務;
- 事件循環。
我們使用 async def func 定義的函數叫做協程函數,func() 這樣調用之后返回的結果是協程對象,到這一步協程函數內的代碼都沒有被執行,直到協程對象被包裝成了任務,事件循環才會“正眼看它們”。
所以事件循環調度運行的基本單元就是任務,那為什么我們在使用 async/await 這些語句時沒有涉及到任務這個概念呢?
這是因為 await 語法糖在內部將協程對象封裝成了任務,再次強調事件循環只認識任務。
所以,想要運行一個協程對象,其實就是將協程對象封裝成一個任務,至于事件循環是如何發現、調度和執行的,這個我們不用關心。
那將協程封裝成的任務的方法有哪些呢?
- asyncio.create_task
- asyncio.ensure_future
- loop.create_task
看著有好幾個的,沒關系,我們只關心 loop.create_task,因為其他方法最終都是調用 loop.create_task。
使用起來也是很簡單的,將協程對象傳入,返回值是一個任務對象。
async?def?do_work(): ????return?222 task?=?loop.create_task(do_work())
do_work 會被異步執行,那么 do_work 的結果怎么獲取呢,task.result() 可以嗎?
分情況:
- 如果是在一個協程函數內使用 await task.result(),這是可以的;
- 如果是在普通函數內則不行。你不可能立即獲得協程函數的返回值,因為協程函數還沒有被執行呢。
asyncio.task 運行使用 add_done_callback 添加完成時的回調函數,所以我們可以「曲線救國」,使用回調函數將結果添加到隊列、future 等等。
我這里給個基于 concurrent.futures.future 獲取結果的例子,如下:
import?asyncio from?asyncio?import?task from?concurrent.futures?import?future from?fastapi?import?fastapi app?=?fastapi() loop?=?asyncio.get_event_loop() async?def?do_work1(): ????return?222 @app.get("/") def?root(): ????#?新建一個?future?對象,用于接受結果值 ????future?=?future() ????#?提交任務至事件循環 ????task?=?loop.create_task(do_work1()) ????#?回調函數 ????def?done_callback(task:?task): ????????#?設置結果 ????????future.set_result(task.result()) ????#?為這個任務添加回調函數 ????task.add_done_callback(done_callback) ????#?future.result?會被阻塞,直到有結果返回為止 ????return?future.result()??#?222
關于 "python混合怎么使用同步和異步函數" 就介紹到此。希望多多支持碩編程。