Post

(python) asyncio

asyncio

실행 흐름
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

  

async def sub(n):
await asyncio.sleep(n)
print("sub{}".format(n))

  


  

async def comain():
times = [1, 2, 2]
fts = [asyncio.ensure\_future(sub(t)) for t in times]
for f in asyncio.as\_completed(fts):
x = await f
print("comain")




loop = asyncio.get\_event\_loop()
loop.run\_until\_complete(comain())  # comain()이 Future로 wrapping되며 Task schedule.
loop.close()

1
2
3
4
5
6
7
8
9
10
11
12
13
========== result ==========
17:18:03.944503 sub1
17:18:04.946825 sub2
17:18:04.946891 sub2
comain
========control flow========
comain{ await sub(1) }
 sub(1){ await asyncio.sleep(1) }
 comain{ await sub(2) }
 sub(2){ await asyncio.sleep(2) }
 comain{ await sub(2) }
 sub(2){ await asyncio.sleep(2) }

코루틴은 함수와 다르다. 코루틴 cortn()을 호출하면 실행되는 것이 아니라, 코루틴 객체를 반환한다.

Note ) callback 을 연결할 수도 있다.

Note ) 이렇게 get_event_loop()를 반환받는 것 보다, asyncio.run()을 쓰는게 더 낫다. 문서 최상단 링크 참조.

ensure_future(coro) == create_task (in 3.7+ 이름만 다르다)

coroutine object를 받아 Future로 wrapping하며 이 Task를 schedule.

분명 fts[0]만 실행했는데 sub1까지 같이 실행됐다. ensure\_future()로 Task가 schedule되어 있기 때문에 직접 호출하지 않아도 await를 만나면 무조건 다음 Task를 실행하기 때문인 것으로 보인다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def comain():
fts = [asyncio.ensure\_future(sub2()), asyncio.ensure\_future(sub1())]
pprint.pprint(asyncio.Task.all\_tasks())
x = await fts[0]
print("comain")

  

{<Task pending coro=<comain() running at co.py:18> cb=[\_run\_until\_complete\_cb() at /usr/lib/python3.5/asyncio/base\_events.py:176]>,
<Task pending coro=<sub2() running at co.py:9>>,
<Task pending coro=<sub1() running at co.py:4>>}
sub1
sub2
comain

as_completed(fts)

Futures로 이루어진 generator를 반환한다. 원래는 Futures를 받도록 되어 있어 반드시 ensure\_future(coro)와 함께 사용해야 하지만 그냥 coroutine object를 넘겨도 되는 듯?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def comain():
fts = [sub2(), sub1()]
pprint.pprint(asyncio.Task.all\_tasks())
for f in asyncio.as\_completed(fts):
print("==========================")
pprint.pprint(asyncio.Task.all\_tasks())
x = await f
print("comain")

  

{<Task pending coro=<comain() running at co.py:18> cb=[\_run\_until\_complete\_cb() at /usr/lib/python3.5/asyncio/base\_events.py:176]>}
==========================
{<Task pending coro=<comain() running at co.py:21> cb=[\_run\_until\_complete\_cb() at /usr/lib/python3.5/asyncio/base\_events.py:176]>,
<Task pending coro=<sub1() running at co.py:4> cb=[as\_completed.<locals>.\_on\_completion() at /usr/lib/python3.5/asyncio/tasks.py:486]>,
<Task pending coro=<sub2() running at co.py:9> cb=[as\_completed.<locals>.\_on\_completion() at /usr/lib/python3.5/asyncio/tasks.py:486]>}
sub1
==========================
{<Task pending coro=<comain() running at co.py:21> cb=[\_run\_until\_complete\_cb() at /usr/lib/python3.5/asyncio/base\_events.py:176]>,
<Task finished coro=<sub1() done, defined at co.py:4> result=None>,
<Task pending coro=<sub2() running at co.py:10> wait\_for=<Future pending cb=[Task.\_wakeup()]> cb=[as\_completed.<locals>.\_on\_completion() at /usr/lib/python3.5/asyncio/tasks.py:486]>}
sub2
comain

gather(*coros_or_fts)

각 코루틴의 리턴값을 한 번에 리스트 형식으로 반환받고 싶을 때.

run_in_executor(None, func, *args)

비동기 함수가 아닌 blocking 함수를 다른 thread에서 실행시켜 비동기처럼 사용할 수 있도록 해준다.( multi-threading )

None을 적으면 기본 스레드풀을 사용한다. \*\*kwarg를 가진 함수는 그냥 넘길 수는 없고 functools.partial()을 이용해서 넘길 수 있는 형태로 변환해야한다.

run_coroutine_threadsafe(coro, loop)

run\_in\_executor와 반대로, EventLoop가 돌아가는 main thread가 아닌 다른 thread에서 coroutine object를 schedule하고 싶을 때 사용한다. Note ) To handle signals and to execute subprocesses, the event loop must be run in the main thread.

async with / async for

비동기 클래스 만들기 https://dojang.io/mod/page/view.php?id=1167

Socket Programming

Low-level socket operations

반드시 다음을 수행해주어야 함. 이를 설정하지 않으면 timeout 때문에 asyncio.sock\_recv()같은 비동기 함수를 사용해도 blocking된다.

1
2
sock.setblocking(False)    OR    sock.settimeout(0.0)

Streams

고수준 IO API. 그냥 raw asyncio socket을 사용하는 것 보다 이를 사용하는 것이 더 편하다. socket 뿐만 아니라 다른 곳에도 stream을 연결해 IO할 수 있음.

timeout

1
2
3
4
5
as\_completed(fts, \*, loop=None, timeout=None)
wait(fts, \*, loop=None, timeout=None, return\_when=ALL\_COMPLETED) # run\_until\_complete 대신.
wait\_for(ft, timeout, \*, loop=None)    # single future
future.result(timeout)

이렇게 입출력이 연쇄되는 경우에는 순차적으로 실행해야 한다
1
2
3
4
5
6
7
8
async def main2():
print(f"started at {time.strftime('%X')}")
task1 = asyncio.create\_task(func\_a())
a = await task1
task2 = asyncio.create\_task(func\_b(a))
print(await task2)
print(f"finished at {time.strftime('%X')}")

  • callback 방식으로 바꿔서 생각해보면, 결과가 있어야 func_b가 실행되므로 이렇게 밖에 될 수가 없다.
  • 정 func_a와 func_b를 동시에 실행하고 싶다면 리팩토링이 필요함.
  • 이런 경우는 동시 실행 안할것이기 때문에 create_task도 필요 없다. 그냥 await만 불러도 됨.

이렇게 입출력이 순차적으로 연관되는 케이스는 스레드를 쓰든 async를 쓰든 별다른 수가 없다. async가 효과적인 순간은 함수 간 입출력 연쇄 없이 동시 실행 가능할 때. *** 또는 JS같은 non blocking 언어에서는 입출력 연쇄가 필요하다면 callback을 써야만 하므로, 이에 대한 대안으로서도 가치가 있음.

1
2
3
4
5
6
7
8
9
10
11
12
13
async def main2():
print(f"started at {time.strftime('%X')}")


task1 = asyncio.create\_task(func\_a())
task2 = asyncio.create\_task(func\_b("C"))
print(await task1)
print(await task2)

  

print(f"finished at {time.strftime('%X')}")

여기서도 마찬가지로 await task1을 하든, await task2를 하든 schedule 되어 있던 task가 모조리 실행된다. debug 찍어보면 먼저 등록된 task1 실행하다가 await sleep 하면서 task2로 넘어가서 실행함. await 만나면 코드 흐름이 다음 task로 넘어간다. 이런 식으로 여러 task를 동시에 실행하고 있음.

This post is licensed under CC BY 4.0 by the author.