memo

2015-08-24

Limiting concurrency with Python (asyncio)

Rewriting the earlier example, this time with asyncio:

import asyncio
import functools


class ConcurrentManager(object):
    def __init__(self, concurrency):
        # The semaphore caps how many wrapped coroutines may run at once.
        self._sem = asyncio.Semaphore(concurrency)

    def wrap(self, func):
        assert asyncio.iscoroutinefunction(func)

        @functools.wraps(func)
        @asyncio.coroutine
        def wrapper(*args, **kwargs):
            # Wait for a free slot before running the wrapped coroutine.
            yield from self._sem.acquire()
            try:
                ret = yield from func(*args, **kwargs)
            finally:
                self._sem.release()

            return ret

        return wrapper
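
A quick sanity check of the wrapper on its own (slow_task below is just a made-up stand-in for real work): with concurrency=2, the six calls should run two at a time, so the whole batch takes roughly three seconds instead of one.

import asyncio


@asyncio.coroutine
def slow_task(i):
    # Pretend work; the start prints make the batches of two visible.
    print('start', i)
    yield from asyncio.sleep(1)
    return i

cm = ConcurrentManager(2)
limited = cm.wrap(slow_task)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(
    asyncio.gather(*(limited(i) for i in range(6))))
print(results)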

And, as before, an example that limits the number of concurrent requests per domain:

# Assumes ConcurrentManager from the snippet above is in scope.
import asyncio
import aiohttp
import urllib.parse

@asyncio.coroutine
def _fetch_page(url):
    # Fetch url and return the response body; only a 200 response is accepted.
    response = yield from aiohttp.request('GET', url)
    assert response.status == 200
    return (yield from response.read())

# One ConcurrentManager per domain (netloc), created lazily on first use.
_cm_per_domain = {}

@asyncio.coroutine
def fetch_page(url):
    domain = urllib.parse.urlparse(url).netloc
    try:
        cm = _cm_per_domain[domain]
    except KeyError:
        # Allow at most 2 concurrent requests per domain.
        cm = _cm_per_domain[domain] = ConcurrentManager(2)

    return (yield from cm.wrap(_fetch_page)(url))

fs = [
    fetch_page('http://localhost:8000/1'),
    fetch_page('http://localhost:8001/1'),
    fetch_page('http://localhost:8000/2'),
    fetch_page('http://localhost:8001/2'),
    fetch_page('http://localhost:8000/3'),
    fetch_page('http://localhost:8001/3'),
    fetch_page('http://localhost:8000/4'),
    fetch_page('http://localhost:8001/4'),
]

loop = asyncio.get_event_loop()
contents = loop.run_until_complete(asyncio.gather(*fs))
print(contents)
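
(The URLs above assume something is listening on localhost:8000 and localhost:8001 and returns 200 for the paths /1 through /4; any two local test HTTP servers will do, since the assert in _fetch_page rejects anything other than 200.)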

asyncio already provides similar utility classes and functions, so the code stays mostly the same as the earlier version.
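
As a side note, asyncio's Semaphore can itself be used as a context manager from inside a coroutine (the "with (yield from lock):" pattern from the asyncio docs), which makes the acquire/try/finally/release in wrap a little shorter. A minimal sketch of that variant, same behavior as above:

import asyncio
import functools


class ConcurrentManager(object):
    def __init__(self, concurrency):
        self._sem = asyncio.Semaphore(concurrency)

    def wrap(self, func):
        assert asyncio.iscoroutinefunction(func)

        @functools.wraps(func)
        @asyncio.coroutine
        def wrapper(*args, **kwargs):
            # Acquires a slot on entry and releases it on exit, even on error.
            with (yield from self._sem):
                return (yield from func(*args, **kwargs))

        return wrapper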