memo

2016-06-07

python の asyncio について調べる その5

今回は Stream について調べる。

Stream は coroutine ベースの API でコネクションへの読み書きが行える機能で、 transport/protocol の上に実装されている。

とりあえず使ってみる

またいつものように、サーバーから返ってきたデータを画面に出すクライアントを書いてみる。

import asyncio
import time


def hello_client():
    # open_connection() 関数によって、
    # 読み込み用・書き込み用のオブジェクトが得られる。
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 9999)
    print('connected', reader, writer)

    while True:
        # coroutine ベースの API ということで、 yield することでデータを読み出せる
        data = yield from reader.read(1024)

        if not data and reader.at_eof():
            print('eof')
            break

        print(int(time.time()))
        print(data)
        print('end')


loop = asyncio.get_event_loop()
# そういえば future/coroutine についての知識を得たので、
# 「 future/coroutine が最後まで走り切るまで event loop を動かす」という機能、
# run_until_complete() を使ってみる。
# わざわざ自分で loop.stop() を呼ぶ手間が省ける。
loop.run_until_complete(hello_client())

実行してみる:

$ python3.5 hello-client4.py
connected <StreamReader t=<_SelectorSocketTransport fd=7 read=polling write=<idle, bufsize=0>>> <StreamWriter transport=<_SelectorSocketTransport fd=7 read=polling write=<idle, bufsize=0>> reader=<StreamReader t=<_SelectorSocketTransport fd=7 read=polling write=<idle, bufsize=0>>>>
1465311510
b'hello world\n'
end
1465311511
b'hello world\n'
end
1465311512
b'hello world\n'
end
eof

内部実装について調べる

さて、では実際にどのような実装になっているのか少し見てみる。

open_connection()StreamReaderStreamWriter の2つのインスタンスを返してくる。

この内、 StreamWriter の方はどうも transport の書き込み用 API を公開するだけの wrapper のようだった。

StreamReader について見る。

StreamReader.read() は、まず内部バッファを確認し、あればそのデータをそのまま返す。

バッファが空の場合、内部で待ち受け用の future を作成し、それを yield することでバッファが埋まるのを待つ。 StreamReader は、表には出てこないが内部で使われている protocol, StreamReaderProtocol からデータを受け取る。 (StreamReaderProtocol は通常の protocol 同様 transport から .data_received() でデータを受け取っている。) データ受け取り時に待ち受け用の future に結果がセットされるので、 .read() の処理が再開され、データが呼び出し元に返ることになる。