엄범


파이썬의 코루틴과 태스크 API

파이썬의 이벤트 루프 API


asyncio

3.4    generator based coroutine

```python

import asyncio


@asyncio.coroutine

def cortn():

    yield from asyncio.sleep(1.0)

    print('Hello, world!')

 

loop = asyncio.get_event_loop()

loop.run_until_complete(cortn())

loop.close()

```

Note ) ``python await == yield from`` 이다.

3.5 ~    native coroutine


실행 흐름

```python

import asyncio


async def sub(n):

    await asyncio.sleep(n)

    print("sub{}".format(n))



async def comain():

    times = [2, 1]

    fts = [asyncio.ensure_future(sub(t)) for t in times]

    for f in asyncio.as_completed(fts):

        x = await f

    print("comain")

    

 

loop = asyncio.get_event_loop()

loop.run_until_complete(comain())  # comain()이 Future로 wrapping되며 Task schedule.

loop.close()

```
```python
========== result ==========
sub1
sub2
comain
========control flow========
comain{ await sub(2) } 
→ sub(2){ await asyncio.sleep(2) }
→ comain{ await sub(1) } 
→ sub(1){ await asyncio.sleep(1) }
```

코루틴은 함수와 다르다. 코루틴 ``c cortn()``을 호출하면 실행되는 것이 아니라, 코루틴 객체를 반환한다.

Note ) callback을 연결할 수도 있다.

Note ) 이렇게 get_event_loop()를 반환받는 것 보다, asyncio.run()을 쓰는게 더 낫다. 문서 최상단 링크 참조.


ensure_future(coro)    == create_task (in 3.7+ 이름만 다르다)

coroutine object를 받아 Future로 wrapping하며 이 Task를 schedule.

분명 `` fts[0]``만 실행했는데 sub1까지 같이 실행됐다. `` ensure_future()``로 Task가 schedule되어 있기 때문에 직접 호출하지 않아도 ``python await``를 만나면 무조건 다음 Task를 실행하기 때문인 것으로 보인다.

```python

async def comain():
    fts = [asyncio.ensure_future(sub2()), asyncio.ensure_future(sub1())]
    pprint.pprint(asyncio.Task.all_tasks())
    x = await fts[0]
    print("comain")

{<Task pending coro=<comain() running at co.py:18> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:176]>,
 <Task pending coro=<sub2() running at co.py:9>>,
 <Task pending coro=<sub1() running at co.py:4>>}
sub1
sub2
comain

```


as_completed(fts)

Futures로 이루어진 generator를 반환한다. 원래는 Futures를 받도록 되어 있어 반드시 `` ensure_future(coro)``와 함께 사용해야 하지만 그냥 coroutine object를 넘겨도 되는 듯?

```python

async def comain():

    fts = [sub2(), sub1()]

    pprint.pprint(asyncio.Task.all_tasks())

    for f in asyncio.as_completed(fts):

        print("==========================")

        pprint.pprint(asyncio.Task.all_tasks())

        x = await f

    print("comain")


{<Task pending coro=<comain() running at co.py:18> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:176]>}

==========================

{<Task pending coro=<comain() running at co.py:21> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:176]>,

 <Task pending coro=<sub1() running at co.py:4> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.5/asyncio/tasks.py:486]>,

 <Task pending coro=<sub2() running at co.py:9> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.5/asyncio/tasks.py:486]>}

sub1

==========================

{<Task pending coro=<comain() running at co.py:21> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:176]>,

 <Task finished coro=<sub1() done, defined at co.py:4> result=None>,

 <Task pending coro=<sub2() running at co.py:10> wait_for=<Future pending cb=[Task._wakeup()]> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.5/asyncio/tasks.py:486]>}

sub2

comain

```


gather(*coros_or_fts)

각 코루틴의 리턴값을 한 번에 리스트 형식으로 반환받고 싶을 때.


run_in_executor(None, func, *args)

비동기 함수가 아닌 blocking 함수를 다른 thread에서 실행시켜 비동기처럼 사용할 수 있도록 해준다. ( multi-threading )

`` None``을 적으면 기본 스레드풀을 사용한다.

``python **kwarg``를 가진 함수는 그냥 넘길 수는 없고 `` functools.partial()``을 이용해서 넘길 수 있는 형태로 변환해야한다.


run_coroutine_threadsafe(coro, loop)

`` run_in_executor``와 반대로, EventLoop가 돌아가는 main thread가 아닌 다른 thread에서 coroutine object를 schedule하고 싶을 때 사용한다.
Note ) To handle signals and to execute subprocesses, the event loop must be run in the main thread.

async with / async for

비동기 클래스 만들기

https://dojang.io/mod/page/view.php?id=1167


Socket Programming

Low-level socket operations

반드시 다음을 수행해주어야 함. 이를 설정하지 않으면 timeout 때문에 `` asyncio.sock_recv()``같은 비동기 함수를 사용해도 blocking된다.

```python

sock.setblocking(False)    OR    sock.settimeout(0.0)

```


Streams

고수준 IO API. 그냥 raw asyncio socket을 사용하는 것 보다 이를 사용하는 것이 더 편하다.

socket 뿐만 아니라 다른 곳에도 stream을 연결해 IO할 수 있음.


timeout

```python
as_completed(fts, *, loop=None, timeout=None)
wait(fts, *, loop=None, timeout=None, return_when=ALL_COMPLETED) # run_until_complete 대신.
wait_for(ft, timeout, *, loop=None)    # single future
future.result(timeout)
```