並列度の制限を python (twisted) で
元ネタ: http://qiita.com/shyouhei/items/f944b345a32ed149dbc3
依存関係うんぬんのところは、単に deferred の callback chain でよさそう。
ということで、並列度の制限のところだけ考えてみる。 twisted.internet.defer
には DeferredSemaphore
とかいういかにもなモノがあるので、 これを利用して以下のようなクラスを作ってみる。
import functools from twisted.internet import defer class ConcurrentManager(object): def __init__(self, concurrency): self._sem = defer.DeferredSemaphore(concurrency) def wrap(self, func): @functools.wraps(func) @defer.inlineCallbacks def wrapper(*args, **kwargs): yield self._sem.acquire() try: ret = yield defer.maybeDeferred(func, *args, **kwargs) finally: self._sem.release() defer.returnValue(ret) return wrapper
で、これを使って例として、
HTTP リクエストを投げる処理
f1
とf2
を順番に呼ぶf1
とf2
は合わせて同時に3つまでしか走らない
というようなのを実装してみる。
from __future__ import print_function from datetime import datetime from twisted.internet import defer, reactor from twisted.web.client import getPage cm = ConcurrentManager(3) @cm.wrap @defer.inlineCallbacks def f1(n): print(datetime.now(), 'start f1({})'.format(n)) page = yield getPage('http://localhost:8000/page1') print(datetime.now(), 'end f1({})'.format(n)) defer.returnValue(n) @cm.wrap @defer.inlineCallbacks def f2(n): print(datetime.now(), 'start f2({})'.format(n)) page = yield getPage('http://localhost:8000/page2') print(datetime.now(), 'end f2({})'.format(n)) defer.returnValue(n) ds = [ f1(n).addCallback(f2) for n in range(4) ] defer.gatherResults(ds).addBoth(lambda _: reactor.stop()) reactor.run()
ランダムな秒数スリープする HTTP サーバーに対して実行してみた結果:
2015-08-24 15:38:27.602508 start f1(0)
2015-08-24 15:38:27.603131 start f1(1)
2015-08-24 15:38:27.603428 start f1(2)
2015-08-24 15:38:29.613932 end f1(1)
2015-08-24 15:38:29.614219 start f1(3)
2015-08-24 15:38:30.613483 end f1(0)
2015-08-24 15:38:30.613697 start f2(1)
2015-08-24 15:38:30.621357 end f1(3)
2015-08-24 15:38:30.621607 start f2(0)
2015-08-24 15:38:31.613802 end f1(2)
2015-08-24 15:38:31.614067 start f2(3)
2015-08-24 15:38:31.619616 end f2(1)
2015-08-24 15:38:31.619837 start f2(2)
2015-08-24 15:38:32.621757 end f2(3)
2015-08-24 15:38:32.627226 end f2(0)
2015-08-24 15:38:35.626644 end f2(2)
f1(2)
まで実行したところで一旦止まって、その後は1つ終了するごとに次が開始してる。 何となく動いていそう。
さて、上記の例ではデコレータとして使ったけど、関数実行時に wrap
してもよいのだし、 ConcurrentManager
自体も動的に生成できる。
ということで、ドメインごとに同時アクセス数を制限する例:
from __future__ import print_function from twisted.internet import defer, reactor from twisted.web.client import getPage as _getPage from twisted.python.compat import urllib_parse _cm_per_domain = {} def getPage(url): """ドメインごとの同時アクセス数制限付き getPage """ domain = urllib_parse.urlparse(url).netloc try: cm = _cm_per_domain[domain] except KeyError: cm = _cm_per_domain[domain] = ConcurrentManager(2) return cm.wrap(_getPage)(url) ds = [ getPage(b'http://localhost:8000/1'), getPage(b'http://localhost:8001/1'), getPage(b'http://localhost:8000/2'), getPage(b'http://localhost:8001/2'), getPage(b'http://localhost:8000/3'), getPage(b'http://localhost:8001/3'), getPage(b'http://localhost:8000/4'), getPage(b'http://localhost:8001/4'), ] defer.gatherResults(ds).addBoth(lambda r: (print(r), reactor.stop())) reactor.run()
思いつきで実装したわりには、案外使いでがあるのかもしれない。