Asynchronous IO คืออะไร? และตัวอย่างการใช้ asyncio ใน Python

What is Asynchronous IO? And The asyncio Examples in Python

Nopnithi Khaokaew (Game)
6 min readMay 26, 2020

— — — — — — — — — — — — — — —
สารบัญเนื้อหาทั้งหมด (My Contents)
— — — — — — — — — — — — — — —

รู้จักกับ Asynchronous Programming

Asynchronous programming เป็นหนึ่งใน concurrent programming โดยเป็นวิธีการที่สามารถทำงานกับ request จำนวนมากได้ในเวลาเดียวกัน แต่ไม่ใช่การใช้หลาย process หรือ thread แยกกันไปทำงานนะครับ หลักการคือ CPU จะแว๊บไปทำงานอื่น ๆ ในขณะที่กำลังรอ response จากงานใด ๆ อยู่

ตัวอย่าง: ซักผ้า 2 ตะกร้า (การทำงานของ Asynchronous I/O)

สมมุติว่าผมต้องการซักผ้า 2 ตะกร้า (ตะกร้า A และ B) ผมก็เริ่มต้นโดยเดินไปหยอดเหรียญเพื่อซักผ้าตะกร้า A และในระหว่างที่รอผ้าตะกร้า A ซักอยู่ ผมก็เดินไปหยอดเหรียญเพื่อซักผ้าตะกร้า B ต่อทันที ความหมายคือผมพยายามจะไม่อยู่เฉยในระหว่างที่ต้องรอนั่นเอง

จะเห็นว่าการทำงานลักษณะนี้จะใช้ CPU แค่ core เดียวเท่านั้น (ซึ่งในเรื่องก็คือตัวผมเอง) โดยมันจะเหมาะกับงานประเภท I/O bound task หรือก็คือ task ที่เรา request อะไรสักอย่างไปแล้วรอปลายทางประมวลผลเพื่อ response กลับมา (แม้ในระดับ millisecond) เช่นพวกงาน network หรือ database เป็นต้น

Concept ของ Asynchronous I/O บน Python

Python จะมีโมดูล asyncio เป็นตัวช่วยสำหรับการเขียน asynchronous programming อยู่แล้ว มีมาตั้งแต่เวอร์ชั่น 3.4 โน่นแหละ แต่ทุกเวอร์ชั่นก็มักจะมีการเปลี่ยนแปลงไปจนทำให้หลายคนเกิดความสับสนกันมานักต่อนักละ(ผมก็ด้วย) ดังนั้นบอกก่อนว่าตัวอย่างทั้งหมดในบล็อกนี้ผมใช้ Python 3.8.3 ซึ่งเป็นตัวล่าสุด ณ วันที่เขียนแล้ว เผื่อใครจะลองรันตามเดี๋ยวไม่เวิร์คแล้วมาด่าผมอีก

ไปเริ่มเรียนรู้เกี่ยวกับ concept กันก่อน

Event Loop คืออะไร?

Event loop คือ loop ที่ใช้สำหรับรัน task นั่นแหละครับ และต้องเป็น task ที่สามารถรันได้อิสระจาก task อื่นด้วยจะมาพัวพันกันไม่ได้ และในหนึ่งช่วงเวลาจะมีเพียงแค่ task เดียวที่กำลังถูกรันอยู่

จุดสำคัญคือเมื่อ task ที่กำลังรันอยู่เข้าสู่ waiting period เช่น อยู่ระหว่างรอ response ก็จะหยุด task นั้นไว้แล้วสลับเอา task อื่นขึ้นมารันแทน (เป็นได้ทั้ง start task ใหม่หรือ resume task ที่ได้ response แล้ว) ซึ่งไอ้การเรียก task ที่ได้ response แล้วกลับมาทำงานต่อเรียกว่า callback และเจ้า event loop นี้จะดำเนินไปจนกว่า task ทั้งหมดจะเสร็จสิ้น

Coroutines คืออะไร?

Coroutine คือ ฟังก์ชั่นพิเศษใน Python โดยมันสามารถหยุดการทำงานก่อนที่จะถึงการ return ค่าได้ และมันยังสามารถส่งผ่านการควบคุม(โดย CPU)ไปยัง coroutine อื่น ๆ ได้ด้วย หรือพูดง่าย ๆ ว่า CPU สามารถสลับการทำงานไปมาระหว่าง coroutine เหล่านี้ได้นั่นเอง

Task คืออะไร?

ในโลกของ Asycn IO นั้น task ก็คืองานหรือสิ่งที่เราต้องทำ โดย task จะเกิดขึ้นมาจาก coroutine (ใช้ asyncio.create_task() ในการสร้าง)

Code ตัวอย่าง

ใครใช้ Git ก็จัดไปตามนี้เลย

git clone https://github.com/nopnithi/python_ssh_tutorial.git

เสร็จแล้วก็ install พวก library ต่าง ๆ จากไฟล์ requirements.txt

pip3 install -r requirements.txt

ติดตั้ง asyncio Library ก่อน

ถ้าใคร clone repository ผมไปแล้วติดตั้งผ่าน requirements.txt แล้วก็ข้ามไป ถ้าใครยังก็จัดไปตามนี้ครับ

pip3 install asyncio

วิธีการสร้าง Coroutine ใน Python

คีย์เวิร์ด async def ใช้สำหรับสร้างฟังก์ชั่นที่เป็น coroutine

# asyncio_tutorial/example1.pyimport asyncio

async def wash(basket):
print(f'Washing Machine ({basket}): Put the coin')
print(f'Washing Machine ({basket}): Start washing...')
await asyncio.sleep(5)
print(f'Washing Machine ({basket}): Finished washing')
return f'{basket} is completed'

ตอนนี้ผมได้ coroutine แล้ว และถ้าผมเรียกใช้งาน wash() แบบปกติเหมือนฟังก์ชั่นทั่วไปมันจะไม่ทำงาน เราจะต้องใช้ asyncio.run() เพื่อเรียกใช้งานแทน ซึ่งเบื้องหลังของ asyncio.run() จะทำการแปลง coroutine ให้เป็น task แล้วจึงรันอีกที

รัน Coroutine ตรง ๆ ด้วย asyncio.run()

# asyncio_tutorial/example2.pyimport asyncio
import time

async def wash(basket):
print(f'Washing Machine ({basket}): Put the coin')
print(f'Washing Machine ({basket}): Start washing...')
await asyncio.sleep(5)
print(f'Washing Machine ({basket}): Finished washing')
return f'{basket} is completed'

if __name__ == '__main__':
t1 = time.time()
asyncio.run(wash('Basket A'))
t2 = time.time() - t1
print(f'Executed in {t2:0.2f} seconds.')

จากตัวอย่างด้านบนนี้ผมรัน wash() ซึ่งเป็น coroutine ด้วย asyncio.run() ก็จะได้

Washing Machine (Basket A): Put the coin
Washing Machine (Basket A): Start washing...
Washing Machine (Basket A): Finished washing
Executed in 5.00 seconds.

เขียน Python Code จาก ตัวอย่าง: ซักผ้า 2 ตะกร้า

ทีนี้ผมจะลองเขียน Python code เพื่อทำตามตัวอย่างซักผ้าที่เคยยกมาให้ดู โดยสมมุติว่าการซักผ้าหนึ่งรอบเราจะใช้เวลา 5 วินาทีเหมือนเดิม

ไม่ใช้ Async IO

# asyncio_tutorial/example3.pyimport time

def wash(basket):
print(f'Washing Machine ({basket}): Put the coin')
print(f'Washing Machine ({basket}): Start washing...')
time.sleep(5)
print(f'Washing Machine ({basket}): Finished washing')
return f'{basket} is completed'

def main():
for basket in ['Basket A', 'Basket B']:
wash(basket)

if __name__ == '__main__':
t1 = time.time()
main()
t2 = time.time() - t1
print(f'Executed in {t2:0.2f} seconds.')

จะได้

Washing Machine (Basket A): Put the coin
Washing Machine (Basket A): Start washing...
Washing Machine (Basket A): Finished washing
Washing Machine (Basket B): Put the coin
Washing Machine (Basket B): Start washing...
Washing Machine (Basket B): Finished washing
Executed in 10.00 seconds.

สังเกตว่าใช้เวลาไปทั้งสิ้น 10 วินาที เพราะต้องรอผ้าตะกร้า A เสร็จก่อนแล้วจึงเริ่มซักผ้าตะกร้า B ต่อ ยิ่งมีผ้าหลายตะกร้าก็ใช้เวลา จำนวนตะกร้า x 5 วินาที โดยประมาณ

ใช้ Async IO

# asyncio_tutorial/example4.pyimport asyncio
import time

async def wash(basket):
print(f'Washing Machine ({basket}): Put the coin')
print(f'Washing Machine ({basket}): Start washing...')
await asyncio.sleep(5)
print(f'Washing Machine ({basket}): Finished washing')
return f'{basket} is completed'

async def main():
await asyncio.gather(wash('Basket A'), wash('Basket B'))

if __name__ == '__main__':
t1 = time.time()
asyncio.run(main())
t2 = time.time() - t1
print(f'Executed in {t2:0.2f} seconds.')

จะได้

Washing Machine (Basket A): Put the coin
Washing Machine (Basket A): Start washing...
Washing Machine (Basket B): Put the coin
Washing Machine (Basket B): Start washing...
Washing Machine (Basket A): Finished washing
Washing Machine (Basket B): Finished washing
Executed in 5.01 seconds.

เมื่อดู result จะเห็นว่าการทำงานเป็นไปตามที่ผมอธิบายในตัวอย่างเลย คือหลังจากที่ผมหยอดเหรียญเพื่อซักผ้าตะกร้า A แล้วพอถึงขั้นตอนของการรอปุ๊ป ผมก็ไปหยอดเหรียญเพื่อซักผ้าตะกร้า B ต่อทันที สังเกตว่าเวลาทั้งหมดใช้ไปแค่ 5 วินาทีเท่านั้นเอง

จาก code นี้ต่อให้ผมมี 100,000 ตะกร้าก็จะใช้เวลาไปเพียงแค่ 8.58 วินาทีเท่านั้นเอง (ทดสอบบน laptop ของผม)

วิธีสร้าง Task

ทีนี้เรายังสามารถสร้าง task จาก coroutine ขึ้นมาได้เอง นั่นก็คือใช้ asyncio.create_task() แบบนี้

# asyncio_tutorial/example5.pyimport asyncio

async def wash(basket):
print(f'Washing Machine ({basket}): Put the coin')
print(f'Washing Machine ({basket}): Start washing...')
await asyncio.sleep(5)
print(f'Washing Machine ({basket}): Finished washing')
return f'{basket} is completed'

async def main():
coro = wash('Basket A')
print(coro)
print(type(coro))
task = asyncio.create_task(coro)
print(task)
print(type(task))
result = await task
print(result)

if __name__ == '__main__':
asyncio.run(main())

จะได้ output มาแบบนี้

<coroutine object wash at 0x10d1755c0>
<class 'coroutine'>
<Task pending name='Task-2' coro=<wash() running at /Users/nopnithi/projects/asyncio_tutorial/example5.py:4>>
<class '_asyncio.Task'>
Washing Machine (Basket A): Put the coin
Washing Machine (Basket A): Start washing...
Washing Machine (Basket A): Finished washing
Basket A is completed

ผมพยายาม print ให้ดูนะครับว่าแต่ละอันมันคืออะไร เช่น

coro = wash('Basket A')

ตัวแปร coro เก็บ coroutine object และจากนั้นผมเอา coro ไปสร้าง task ด้วย asyncio.create_task() ด้วยบรรทัดด้านล่างนี้

task = asyncio.create_task(coro)

ซึ่งพอ print ออกมาก็จะเห็นว่ามันเป็น _asyncio.Task

result = await task

สุดท้ายผมอยากให้ task มันทำงานทันทีและ return ค่าออกมา ผมจึงใช้คีย์เวิร์ด await ตามด้วย task object (ซึ่งกรณีนี้มันชื่อ task พอดี) แล้วนำไปเก็บไว้ในตัวแปร result ซึ่งผลก็คือจะได้เป็น completed (string) นั่นแหละ

การรัน Task หรือ Coroutine พร้อมกันหลายตัว

เราสามารถใช้ method ในการรัน task หรือ coroutine พร้อมกันหลายงานได้ โดยผมจะขอยกตัวอย่างมา 3 ตัว คือ

  1. asyncio.gather()
  2. asyncio.wait()
  3. asyncio.as_completed()

ซึ่งความแตกต่างระหว่าง 3 ตัวนี้ คือ

  • asyncio.gather() จะรันเรียงตามลำดับก่อนหลัง ส่วน asyncio.wait() และ asyncio.as_completed() นั้นไม่เรียง
  • asyncio.gather() ไม่สามารถกำหนด timeout ได้ แต่ asyncio.wait() กำหนดได้ผ่านพารามิเตอร์ timeout
  • asyncio.wait() สามารถกำหนดเงื่อนไขในการรอได้โดยใช้พารามิเตอร์ return_when
  • asyncio.gather() ไม่สามารถรับค่าเป็น list ของ coroutine/task ได้โดยตรงแบบ asyncio.wait() ให้ใช้ * หน้า list เพื่อกระจายเป็น argument แทน
  • asyncio.as_completed() จะใช้กับ for Loop ซึ่งไม่เหมือนตัวอื่น และเป็น for Loop ที่ทุกรอบเริ่มทำงานพร้อมกัน

ตัวอย่างการใช้ asyncio.gather()

# asyncio_tutorial/example6.pyimport asyncio

async def cook(food, t):
print(f'Microwave ({food}): Cooking {t} seconds...')
await asyncio.sleep(t)
print(f'Microwave ({food}): Finished cooking')
return f'{food} is completed'

async def main():
coros = [cook('Rice', 5), cook('Noodle', 3), cook('Curry', 1)]
result = await asyncio.gather(*coros)
print(result)

if __name__ == '__main__':
asyncio.run(main())

จะได้

Microwave (Rice): Cooking 5 seconds...
Microwave (Noodle): Cooking 3 seconds...
Microwave (Curry): Cooking 1 seconds...
Microwave (Curry): Finished cooking
Microwave (Noodle): Finished cooking
Microwave (Rice): Finished cooking
['Rice is completed', 'Noodle is completed', 'Curry is completed']

สังเกตว่าผมเรียงลำดับ coroutine เป็น rice, noodle และ cookie นะครับ ตัวโปรแกรมจะรอจนกว่า task ทุกอันจะรันเสร็จ ในตัวอย่างจะเห็นว่า cookie เสร็จก่อนตามด้วย noodle และ rice เป็นลำดับสุดท้าย และเมื่อลำดับสุดท้ายเสร็จก็จะ return ค่าออกมาเป็น list ซึ่งเรียงลำดับก่อนหลังถูกต้อง

ตัวอย่างการใช้ asyncio.wait()

จุดเด่นของตัวนี้คือสามารถกำหนดพารามิเตอร์ timeout ในการรอ response ได้ และเพิ่มเงื่อนไขในการรอจากพารามิเตอร์ return_when ได้ เช่น FIRST_COMPLETED, FIRST_EXCEPTION และ ALL_COMPLETED (เป็น default)

ค่าที่ return มาจาก asyncio.wait() จะเป็น Tuple ที่แยก task ที่เสร็จและไม่เสร็จ ซึ่งข้างในของแต่ละฝั่งจะเป็น Set ที่เก็บ task ไว้นั่นเอง ลองไปเล่นดูนะครับ

# asyncio_tutorial/example7.pyimport asyncio

async def cook(food, t):
print(f'Microwave ({food}): Cooking {t} seconds...')
await asyncio.sleep(t)
print(f'Microwave ({food}): Finished cooking')
return f'{food} is completed'

async def main():
coros = [cook('Rice', 5), cook('Noodle', 3), cook('Curry', 1)]
results = await asyncio.wait(coros, return_when='FIRST_COMPLETED')
print(f'Completed task: {len(results[0])}')
for completed_task in results[0]:
print(f' - {completed_task.result()}')
print(f'Uncompleted task: {len(results[1])}')

if __name__ == '__main__':
asyncio.run(main())

จะได้

Microwave (Curry): Cooking 1 seconds...
Microwave (Rice): Cooking 5 seconds...
Microwave (Noodle): Cooking 3 seconds...
Microwave (Curry): Finished cooking
Completed task: 1
- Curry is completed
Uncompleted task: 2

จะเห็นว่าผมตั้งเงื่อนไขเป็น FIRST_COMPLETED นั่นหมายความว่าพอมี task ใด task หนึ่งเสร็จก็เป็นอันจบ ทำให้ได้ completed task มาแค่ตัวเดียวคือ curry

ทีนี้ผมลองเพิ่มพารามิเตอร์ timeout เป็น 0.5 ดูนะ (default จะเป็น None)

results = await asyncio.wait(coros, return_when='FIRST_COMPLETED', timeout=0.5)

ก็จะไม่มี task ไหนเสร็จเลยเพราะหมดเวลาก่อน (กูไปต่อไม่รอแล้วนะ)

Microwave (Rice): Cooking 5 seconds...
Microwave (Noodle): Cooking 3 seconds...
Microwave (Curry): Cooking 1 seconds...
Completed task: 0
Uncompleted task: 3

ตัวอย่างการใช้ asyncio.as_completed()

# asyncio_tutorial/example8.pyimport asyncio

async def cook(food, t):
print(f'Microwave ({food}): Cooking {t} seconds...')
await asyncio.sleep(t)
print(f'Microwave ({food}): Finished cooking')
return f'{food} is completed'

async def main():
coros = [cook('Rice', 3), cook('Noodle', 3), cook('Curry', 3)]
for coro in asyncio.as_completed(coros):
result = await coro
print(result)

if __name__ == '__main__':
asyncio.run(main())

จะได้

Microwave (Curry): Cooking 3 seconds...
Microwave (Rice): Cooking 3 seconds...
Microwave (Noodle): Cooking 3 seconds...
Microwave (Curry): Finished cooking
Microwave (Rice): Finished cooking
Microwave (Noodle): Finished cooking
Curry is completed
Rice is completed
Noodle is completed

ซึ่งผมลองปรับเวลาให้เท่ากันทุก task แล้วลองรันดูหลาย ๆ รอบ จะเห็นว่ามันไม่การันตีว่าจะออก task ไหนก่อนหลังเพราะ asyncio.as_completed() ไม่เรียงลำดับนั่นเอง

ทดสอบ Performance ของ Asyncio หน่อยดีกว่า

ผมเลือกใช้ task เดิม คือ wash() ที่แต่ละการซักผ้าของเราจะใช้เวลา 5 วินาที แต่คราวนี้ผมจะซักผ้าไปเลย 100,000 ตะกร้า

# asyncio_tutorial/example9.pyimport asyncio
import time

async def wash(basket):
print(f'Washing Machine ({basket}): Put the coin')
print(f'Washing Machine ({basket}): Start washing...')
await asyncio.sleep(5)
print(f'Washing Machine ({basket}): Finished washing')
return f'{basket} is completed'

async def main():
coros = []
for i in range(1, 100001):
coros.append(wash(f'Basket {i}'))
results = await asyncio.wait(coros)
print(f'Completed task: {len(results[0])}')
print(f'Uncompleted task: {len(results[1])}')

if __name__ == '__main__':
t1 = time.time()
asyncio.run(main())
t2 = time.time() - t1
print(f'Executed in {t2:0.2f} seconds.')

จะได้ output ตามนี้ (ผมตัดมาเฉพาะช่วงท้ายนะครับ) สังเกตว่าใช้เวลาไปเพียงแค่ 8.65 วินาทีที่ 100,000 tasks

...
...
Washing Machine (Basket 11105): Finished washing
Washing Machine (Basket 23133): Finished washing
Washing Machine (Basket 34789): Finished washing
Washing Machine (Basket 88824): Finished washing
Washing Machine (Basket 4689): Finished washing
Washing Machine (Basket 11106): Finished washing
Washing Machine (Basket 23134): Finished washing
Washing Machine (Basket 34790): Finished washing
Washing Machine (Basket 88825): Finished washing
Completed task: 100000
Uncompleted task: 0
Executed in 8.65 seconds.

ก่อนจากกัน

ที่จริงยังมีอีกหลายเรื่องและมี method หลายตัวที่ผมไม่ได้พูดถึงในวันนี้นะครับ แต่ด้วยความรู้ในเบื้องต้นเท่านี้เชื่อว่าน่าจะพอทำให้หลาย ๆ คนนำไปใช้ในการต่อยอดได้แล้วแหละ หวังว่าจะมีประโยชน์กับทุกคนไม่มากก็น้อยครับ

— — — — — — — — — — — — — — —
สารบัญเนื้อหาทั้งหมด (My Contents)
— — — — — — — — — — — — — — —

--

--

Nopnithi Khaokaew (Game)
Nopnithi Khaokaew (Game)

Written by Nopnithi Khaokaew (Game)

Cloud Solutions Architect & Hobbyist Developer | 6x AWS Certified, CKA, CKAD, 2x HashiCorp Certified (Terraform, Vault), etc.