並列度の制限を python (asyncio) で
さっきのを asyncio を使って書いてみる:
import asyncio
import functools


class ConcurrentManager(object):
    """Limit how many invocations of wrapped coroutine functions run at once.

    All functions wrapped by the same instance share one semaphore, so the
    combined in-flight count never exceeds *concurrency*.
    """

    def __init__(self, concurrency):
        # asyncio.Semaphore is the public name; asyncio.locks.Semaphore is
        # the same class via a non-public module path.
        self._sem = asyncio.Semaphore(concurrency)

    def wrap(self, func):
        """Return *func* wrapped so that at most `concurrency` calls run concurrently.

        *func* must be a coroutine function; the wrapper awaits it while
        holding the shared semaphore.
        """
        assert asyncio.iscoroutinefunction(func)

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # `async with` acquires the semaphore and guarantees release
            # even if func raises — replaces manual acquire/try/finally.
            async with self._sem:
                return await func(*args, **kwargs)

        return wrapper
で同じく、ドメインごとに同時アクセス数を制限する例:
import asyncio
import urllib.parse

import aiohttp


async def _fetch_page(url):
    """Fetch *url* with aiohttp and return the response body.

    NOTE(review): `aiohttp.request('GET', url)` matches the original example;
    modern aiohttp versions prefer `ClientSession` — confirm against the
    installed aiohttp version.
    """
    response = await aiohttp.request('GET', url)
    assert response.status == 200
    return await response.read()


# One ConcurrentManager per domain, created lazily on first access.
_cm_per_domain = {}


async def fetch_page(url):
    """Fetch *url*, allowing at most 2 concurrent requests per domain."""
    domain = urllib.parse.urlparse(url).netloc
    try:
        cm = _cm_per_domain[domain]
    except KeyError:
        # First request for this domain: create its limiter (EAFP).
        cm = _cm_per_domain[domain] = ConcurrentManager(2)
    return await cm.wrap(_fetch_page)(url)


async def _main():
    # Two domains x four pages; per-domain limit keeps at most 2 in flight each.
    fs = [
        fetch_page('http://localhost:8000/1'),
        fetch_page('http://localhost:8001/1'),
        fetch_page('http://localhost:8000/2'),
        fetch_page('http://localhost:8001/2'),
        fetch_page('http://localhost:8000/3'),
        fetch_page('http://localhost:8001/3'),
        fetch_page('http://localhost:8000/4'),
        fetch_page('http://localhost:8001/4'),
    ]
    return await asyncio.gather(*fs)


# asyncio.run replaces the deprecated get_event_loop()/run_until_complete pair.
contents = asyncio.run(_main())
print(contents)
似たようなユーティリティクラス・関数が提供されていたので、だいたいそのまま。