memo

2016-01-26

python で同期的なコードの中で、一部非同期な処理をしたい

同期的なフレームワークを用いて書かれた処理の中で、一部だけ非同期に処理したいと思うことがある。

例えば、

  • リクエストの処理途中でメール送信を行う必要があるのだが、 メール送信処理は裏で行わせておいて、 その結果は待たずにレスポンスをさっさと返したい。

  • 複数の HTTP サーバーにリクエストを送り、その結果をまとめて処理するのだが、 リクエストは全部一度に送ってしまって、全ての結果が返ってきたところで処理を進めたい。

など。

で、 python で非同期処理といえば twisted なのだけど、 その twisted を同期的なコードから呼びやすくするためのライブラリとして crochet というものがある。

ということでこれ使えば大体解決すると思うけど、 既存の同期的な処理をそのまま非同期に実行したいのだ (twisted に合わせて書き換えたくない)、 というケースもあるかもしれないので、 twisted の thread pool を利用して同期的なコードを非同期に実行する例を書いてみる:

# vim: fileencoding=utf-8
from __future__ import print_function

from crochet import setup, run_in_reactor

setup()

import functools


def run_in_thread_pool(func):
    '''処理を thread pool 内で実行するデコレータ
    '''
    @functools.wraps(func)
    @run_in_reactor
    def wrapped(*args, **kwargs):
        from twisted.internet import threads

        return threads.deferToThread(func, *args, **kwargs)

    return wrapped


def test():
    @run_in_thread_pool
    def heavy_process():
        import threading
        import time

        # 重い処理の例として3秒スリープする
        time.sleep(3)
        print(time.ctime(), threading.current_thread().name)

        return 2

    # 重い処理を21個同時に開始する。
    # (処理自体は twisted デフォルトの thread pool size の10個ずつ走る。)
    hs = [heavy_process() for _ in range(21)]

    print('waiting...')
    # .wait() を呼び出し、処理の終了待ちと結果の受け取りをする。
    results = [h.wait() for h in hs]

    print(sum(results))


if __name__ == '__main__':
    test()

実行結果:

waiting...
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-0
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-1
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-2
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-3
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-4
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-5
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-6
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-8
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-7
Tue Jan 26 23:08:04 2016 PoolThread-twisted.internet.reactor-9
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-0
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-1
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-2
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-4
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-5
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-3
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-6
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-8
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-7
Tue Jan 26 23:08:07 2016 PoolThread-twisted.internet.reactor-9
Tue Jan 26 23:08:10 2016 PoolThread-twisted.internet.reactor-0
42

この例では非同期処理の結果を最後に集めて使っているけど、結果に興味が無いのであれば単に .wait() を呼ばなければ良い。 (ただしその場合でも、プログラム自体の終了は全ての処理が終了してからになっていた。)