memo

2016-06-05

python の asyncio について調べる その3

前回 で、 transport/protocol の挙動を見た。

今回は実際に意味のある protocol を実装して見つつ、 イベント駆動とコールバック関数、そして Future の辺りまで説明したい。

(ものすごい雑に) HTTP を実装してみる

とりあえず例として、 (ものすごい雑に) HTTP を実装してみる。

import asyncio
import io


# 雑 HTTP client 実装
class SimpleHTTPClientProtocol(asyncio.Protocol):
    def __init__(self, host, path):
        self.host = host
        self.path = path
        self.buffer = io.BytesIO()

    def connection_made(self, transport):
        # サーバーに接続されたら、まず HTTP リクエストを transport に書き込む。
        data = (
            'GET {path} HTTP/1.1\r\n'
            'Host: {host}\r\n'
            'Connection: close\r\n'
            '\r\n'
        ).format(path=self.path, host=self.host).encode('utf-8')

        transport.write(data)

    def data_received(self, data):
        # transport から受け取ったデータは、一旦バッファに蓄える。
        self.buffer.write(data)

    def eof_received(self):
        # 全てのデータを受信し終わったら、ヘッダとボディを分ける。
        # XXX: もちろん、正しくはちゃんとヘッダをパースしたりとか
        #      そもそもボディは必要分だけ読みだすとか色々あるのだけど、
        #      ここでは雑に処理する。
        data = self.buffer.getvalue()
        header, body = data.split(b'\r\n\r\n', 1)

        # 受信したレスポンスボディを画面に出力して、終了する。
        print(body.decode('utf-8'))
        loop.stop()


loop = asyncio.get_event_loop()

host = 'localhost'
path = '/index.html'

# XXX: ensure_future はまだおまじないのまま。
asyncio.ensure_future(
    loop.create_connection(
        lambda: SimpleHTTPClientProtocol(host, path), host, 80))

loop.run_forever()

これを実行すると、 http://localhost/index.html を画面に表示し、終了する。

再利用性を考える

さて、 HTTP を利用するために毎回このような protocol クラスを定義するのは面倒なので、 この protocol をライブラリとして再利用できるようにすることを考える。

上記 protocol ではレスポンス受信後の処理が画面表示に固定されてしまっているが、 ここはライブラリ利用者が自由に決めたい部分である。

ライブラリ利用者がクラスの挙動を自由にカスタマイズする方法としては、継承とメソッド上書きという手がある。

import asyncio
import io


class SimpleHTTPClientProtocol(asyncio.Protocol):
    def __init__(self, host, path):
        self.host = host
        self.path = path
        self.buffer = io.BytesIO()

    def connection_made(self, transport):
        self.transport = transport

        data = (
            'GET {path} HTTP/1.1\r\n'
            'Host: {host}\r\n'
            'Connection: close\r\n'
            '\r\n'
        ).format(path=self.path, host=self.host).encode('utf-8')

        self.transport.write(data)

    def data_received(self, data):
        self.buffer.write(data)

    def eof_received(self):
        data = self.buffer.getvalue()
        header, body = data.split(b'\r\n\r\n', 1)

        # レスポンスボディの処理を別メソッド呼び出しに分離
        self.body_received(body)

    def body_received(self, body):
        # ライブラリユーザーは、このメソッドを上書きすることで
        # 好きな処理を行うことができる。
        raise NotImplementedError


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # ライブラリユーザーは SimpleHTTPClientProtocol を継承して使う
    class MyHTTPClientProtocol(SimpleHTTPClientProtocol):
        def body_received(self, body):
            print(body.decode('utf-8'))
            loop.stop()

    host = 'localhost'
    path = '/index.html'

    asyncio.ensure_future(
        loop.create_connection(
            # SimpleHTTPClientProtocol の代わりに、自分で定義した子クラスを使って接続する
            lambda: MyHTTPClientProtocol(host, path), host, 80))

    loop.run_forever()

コールバック関数

さて、これで HTTP protocol クラスが再利用可能になったのだが、 HTTP 接続毎に毎回子クラスを定義するというのは面倒だ。

定義するべき処理 (メソッド) が一つしかないというのであれば、コールバック関数を渡すという API も考えられる。

import asyncio
import io


class SimpleHTTPClientProtocol(asyncio.Protocol):
    def __init__(self, host, path, callback):
        self.host = host
        self.path = path

        # レスポンスボディの処理をコールバック関数として受け取っておく
        self.callback = callback

        self.buffer = io.BytesIO()

    def connection_made(self, transport):
        self.transport = transport

        data = (
            'GET {path} HTTP/1.1\r\n'
            'Host: {host}\r\n'
            'Connection: close\r\n'
            '\r\n'
        ).format(path=self.path, host=self.host).encode('utf-8')

        self.transport.write(data)

    def data_received(self, data):
        self.buffer.write(data)

    def eof_received(self):
        data = self.buffer.getvalue()
        header, body = data.split(b'\r\n\r\n', 1)

        self.body_received(body)

    def body_received(self, body):
        # 保持しておいたコールバック関数を呼び出す。
        self.callback(body)


# create_connection の呼び出しも含めてヘルパー関数を用意しておく
def simple_http(host, path, callback):
    loop = asyncio.get_event_loop()

    asyncio.ensure_future(
        loop.create_connection(
            # SimpleHTTPClientProtocol に、
            # ライブラリユーザーから受け取ったコールバックを渡す
            lambda: SimpleHTTPClientProtocol(host, path, callback),
            host, 80))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # ライブラリユーザーはコールバック関数を作成して使う
    def my_callback(body):
        print(body.decode('utf-8'))
        loop.stop()

    host = 'localhost'
    path = '/index.html'

    simple_http(host, path, my_callback)

    loop.run_forever()

Future

さて、この場面のような、一度きりしか呼ばれない (呼ばれるべきではない) コールバックを管理する方法として、 Future というものが利用できる。

使ってみよう。

import asyncio
import io


class SimpleHTTPClientProtocol(asyncio.Protocol):
    def __init__(self, host, path, future):
        self.host = host
        self.path = path

        # 受け取った future を保持しておく
        self.future = future

        self.buffer = io.BytesIO()

    def connection_made(self, transport):
        self.transport = transport

        data = (
            'GET {path} HTTP/1.1\r\n'
            'Host: {host}\r\n'
            'Connection: close\r\n'
            '\r\n'
        ).format(path=self.path, host=self.host).encode('utf-8')

        self.transport.write(data)

    def data_received(self, data):
        self.buffer.write(data)

    def eof_received(self):
        data = self.buffer.getvalue()
        header, body = data.split(b'\r\n\r\n', 1)

        self.body_received(body)

    def body_received(self, body):
        # ボディを受け取ったら、 future の結果としてセットする。
        self.future.set_result(body)


# 今度はコールバックを受け取らず、 future を返す関数として定義する。
def simple_http(host, path):
    # 結果の管理に使う future を作成する
    future = asyncio.Future()

    loop = asyncio.get_event_loop()

    asyncio.ensure_future(
        loop.create_connection(
            # SimpleHTTPClientProtocol に future を渡す。
            lambda: SimpleHTTPClientProtocol(host, path, future),
            host, 80))

    # Protocol に渡したものと同じ future を結果として返す。
    return future


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    def my_callback(future):
        # future から結果を取り出し、処理する
        body = future.result()

        print(body.decode('utf-8'))
        loop.stop()

    host = 'localhost'
    path = '/index.html'

    # ここではコールバック関数を渡さず、
    future = simple_http(host, path)
    # 受け取った future にコールバックをセットする。
    future.add_done_callback(my_callback)

    loop.run_forever()

... まあ、正直普通にコールバックを渡す場合と比べてあまり変わっていない気がする。

Future は coroutine と組み合わせることで面白い使い方ができる。次はその辺説明したい。