memo

2016-06-06

python の asyncio について調べる その4

前回 の最後で書いたように、今回は coroutine と future, そして task について調べる。

coroutine

Coroutine は asyncio とか関係なく、昔から python にあるもの。 Generator の特殊な使い方という感じ。

一応復習しておく。

def my_coroutine():
    # yield から値を受け取ることができる。
    ret = yield 'ping'
    print('coroutine got', ret)

    ret = yield 'ping2'
    print('coroutine got', ret)


coro = my_coroutine()

# まずは iterator 同様 next() を使って処理を動かす。
# 最初の yield まで処理が進んで停止する。
ret = next(coro)
print('main got', ret)

# coroutine に値を入れるには、 send メソッドを呼び出す。
# これにより coroutine がまた動き出し、2個めの yield で停止する。
ret = coro.send('pong')
print('main got', ret)

try:
    coro.send('pong2')
except StopIteration:
    # coroutine が最後まで進んで終了するときは、
    # iterator 同様 StopIteration 例外が起きる
    print('coroutine stopped')

coroutine と future

さて、 future を yield する coroutine というものを考える。

  • Coroutine は future を yield した時点で停止する

  • Coroutine を呼び出している、外側の処理には future が返っている

  • Coroutine に send() することで、処理を再開させることができる

ここで、 yield されてきた future の callback で、その結果を coroutine.send() につなげてみると、どうなるか。 以下のようなコードを書いてみる。

import asyncio
import types


class Coroutine2Future(asyncio.Future):
    '''Future サブクラスの coroutine wrapper

    Coroutine を進めて、出てきた future の callback でその結果をまた coroutine に戻し、
    最終的にに出てきた結果を自身の結果とする。
    '''
    def __init__(self, coro):
        super().__init__()

        assert isinstance(coro, types.GeneratorType)
        self.coro = coro

        self._run()

    def _run(self, value=None):
        try:
            # 受け取った結果を渡して coroutine を進める
            fut = self.coro.send(value)

        except StopIteration as e:
            # coroutine が終了した場合、
            # 最後に戻ってきた値を自身の結果としてセットする。
            self.set_result(e.value)
        else:
            assert isinstance(fut, asyncio.Future)
            # coroutine から出てきた future の callback で、
            # その結果をまた自身に戻すようにする。
            # (coroutine.send に結果が渡るようにする。)
            fut.add_done_callback(lambda f: self._run(f.result()))

以下のようなコードが動作するようになる。

loop = asyncio.get_event_loop()

def my_coroutine():
    print('sleeping...')

    # 1秒後に結果がセットされる future を用意する
    fut = asyncio.Future()
    loop.call_later(1, fut.set_result, 42)

    # Future を yield する。
    ret = yield from fut
    # 非同期処理の結果が yield から戻ってきていることを確認する。
    assert ret == 42

    print('and sleeping...')
    # asyncio が提供している処理も試してみる。
    # asyncio.sleep() は第1引数秒間 sleep し、第2引数の値をそのまま戻す。
    ret = yield from asyncio.sleep(1, 'end')
    assert ret == 'end'

    print('wake up')

fut = Coroutine2Future(my_coroutine())
# 全体が走り切ったらこの future の結果がセットされるので、
# その callback で event loop を停止しておく。
fut.add_done_callback(lambda f: loop.stop())

loop.run_forever()

このように、 coroutine と future を組み合わせることで、 非同期処理の結果をコールバック関数を明示的に書く必要なく受け取ることができるようになる。

task

この Coroutine2Future と同様のものが、 asyncio にも標準で用意されている。 それが asyncio.Task

また、 future or coroutine を受け取って coroutine であれば task (future) に変換する、 というヘルパー関数、 ensure_future も用意されている。

asyncio の API で future or coroutine を受け取るようなものは、 内部でこれらを使用して cotourine を future に変換した上で処理している。


TODO: async def で定義するアレについて調べる。