memo

2015-08-24

並列度の制限を python (twisted) で

元ネタ: http://qiita.com/shyouhei/items/f944b345a32ed149dbc3

依存関係うんぬんのところは、単に deferred の callback chain でよさそう。

ということで、並列度の制限のところだけ考えてみる。 twisted.internet.defer には DeferredSemaphore とかいういかにもなモノがあるので、 これを利用して以下のようなクラスを作ってみる。

import functools

from twisted.internet import defer


class ConcurrentManager(object):
    def __init__(self, concurrency):
        self._sem = defer.DeferredSemaphore(concurrency)

    def wrap(self, func):
        @functools.wraps(func)
        @defer.inlineCallbacks
        def wrapper(*args, **kwargs):
            yield self._sem.acquire()

            try:
                ret = yield defer.maybeDeferred(func, *args, **kwargs)
            finally:
                self._sem.release()

            defer.returnValue(ret)

        return wrapper

で、これを使って例として、

  • HTTP リクエストを投げる処理 f1f2 を順番に呼ぶ

  • f1f2 は合わせて同時に3つまでしか走らない

というようなのを実装してみる。

from __future__ import print_function

from datetime import datetime
from twisted.internet import defer, reactor
from twisted.web.client import getPage

cm = ConcurrentManager(3)

@cm.wrap
@defer.inlineCallbacks
def f1(n):
    print(datetime.now(), 'start f1({})'.format(n))
    page = yield getPage('http://localhost:8000/page1')
    print(datetime.now(), 'end f1({})'.format(n))
    defer.returnValue(n)

@cm.wrap
@defer.inlineCallbacks
def f2(n):
    print(datetime.now(), 'start f2({})'.format(n))
    page = yield getPage('http://localhost:8000/page2')
    print(datetime.now(), 'end f2({})'.format(n))
    defer.returnValue(n)

ds = [
    f1(n).addCallback(f2)
    for n in range(4)
]

defer.gatherResults(ds).addBoth(lambda _: reactor.stop())
reactor.run()

ランダムな秒数スリープする HTTP サーバーに対して実行してみた結果:

2015-08-24 15:38:27.602508 start f1(0)
2015-08-24 15:38:27.603131 start f1(1)
2015-08-24 15:38:27.603428 start f1(2)
2015-08-24 15:38:29.613932 end f1(1)
2015-08-24 15:38:29.614219 start f1(3)
2015-08-24 15:38:30.613483 end f1(0)
2015-08-24 15:38:30.613697 start f2(1)
2015-08-24 15:38:30.621357 end f1(3)
2015-08-24 15:38:30.621607 start f2(0)
2015-08-24 15:38:31.613802 end f1(2)
2015-08-24 15:38:31.614067 start f2(3)
2015-08-24 15:38:31.619616 end f2(1)
2015-08-24 15:38:31.619837 start f2(2)
2015-08-24 15:38:32.621757 end f2(3)
2015-08-24 15:38:32.627226 end f2(0)
2015-08-24 15:38:35.626644 end f2(2)

f1(2) まで実行したところで一旦止まって、その後は1つ終了するごとに次が開始してる。 何となく動いていそう。

さて、上記の例ではデコレータとして使ったけど、関数実行時に wrap してもよいのだし、 ConcurrentManager 自体も動的に生成できる。

ということで、ドメインごとに同時アクセス数を制限する例:

from __future__ import print_function

from twisted.internet import defer, reactor
from twisted.web.client import getPage as _getPage
from twisted.python.compat import urllib_parse

_cm_per_domain = {}

def getPage(url):
    """ドメインごとの同時アクセス数制限付き getPage
    """
    domain = urllib_parse.urlparse(url).netloc
    try:
        cm = _cm_per_domain[domain]
    except KeyError:
        cm = _cm_per_domain[domain] = ConcurrentManager(2)

    return cm.wrap(_getPage)(url)

ds = [
    getPage(b'http://localhost:8000/1'),
    getPage(b'http://localhost:8001/1'),
    getPage(b'http://localhost:8000/2'),
    getPage(b'http://localhost:8001/2'),
    getPage(b'http://localhost:8000/3'),
    getPage(b'http://localhost:8001/3'),
    getPage(b'http://localhost:8000/4'),
    getPage(b'http://localhost:8001/4'),
]

defer.gatherResults(ds).addBoth(lambda r: (print(r), reactor.stop()))
reactor.run()

思いつきで実装したわりには、案外使いでがあるのかもしれない。