python の asyncio について調べる その5
今回は Stream について調べる。
Stream は coroutine ベースの API でコネクションへの読み書きが行える機能で、 transport/protocol の上に実装されている。
とりあえず使ってみる
またいつものように、サーバーから返ってきたデータを画面に出すクライアントを書いてみる。
import asyncio import time def hello_client(): # open_connection() 関数によって、 # 読み込み用・書き込み用のオブジェクトが得られる。 reader, writer = yield from asyncio.open_connection('127.0.0.1', 9999) print('connected', reader, writer) while True: # coroutine ベースの API ということで、 yield することでデータを読み出せる data = yield from reader.read(1024) if not data and reader.at_eof(): print('eof') break print(int(time.time())) print(data) print('end') loop = asyncio.get_event_loop() # そういえば future/coroutine についての知識を得たので、 # 「 future/coroutine が最後まで走り切るまで event loop を動かす」という機能、 # run_until_complete() を使ってみる。 # わざわざ自分で loop.stop() を呼ぶ手間が省ける。 loop.run_until_complete(hello_client())
実行してみる:
$ python3.5 hello-client4.py
connected <StreamReader t=<_SelectorSocketTransport fd=7 read=polling write=<idle, bufsize=0>>> <StreamWriter transport=<_SelectorSocketTransport fd=7 read=polling write=<idle, bufsize=0>> reader=<StreamReader t=<_SelectorSocketTransport fd=7 read=polling write=<idle, bufsize=0>>>>
1465311510
b'hello world\n'
end
1465311511
b'hello world\n'
end
1465311512
b'hello world\n'
end
eof
内部実装について調べる
さて、では実際にどのような実装になっているのか少し見てみる。
open_connection()
は StreamReader
と StreamWriter
の2つのインスタンスを返してくる。
この内、 StreamWriter
の方はどうも transport の書き込み用 API を公開するだけの wrapper のようだった。
StreamReader
について見る。
StreamReader.read()
は、まず内部バッファを確認し、あればそのデータをそのまま返す。
バッファが空の場合、内部で待ち受け用の future を作成し、それを yield することでバッファが埋まるのを待つ。 StreamReader
は、表には出てこないが内部で使われている protocol, StreamReaderProtocol
からデータを受け取る。 (StreamReaderProtocol
は通常の protocol 同様 transport から .data_received()
でデータを受け取っている。) データ受け取り時に待ち受け用の future に結果がセットされるので、
.read()
の処理が再開され、データが呼び出し元に返ることになる。