memo

2014-02-10

Twisted の Producer/Consumer

Twisted には Producer/Consumer という仕組みがあって、 でかいデータのやり取りをうまく出来るようになっている。

どうやらデータを書き込む先である Consumer がデータを生成する元である Producer に対して、 「バッファいっぱいだからちょっと待って」「バッファ空いたからデータ送ってきていいよ」 というようなやり取りをするためのインターフェースっぽい。

けど ドキュメント 読んでもよく分からんので、ひたすら null を返すだけのウェブサービスを書いてみる。

# vim: fineencoding=utf-8
"""指定されたバイト数だけ null を返すウェブサービス
"""

from twisted.internet import interfaces
from twisted.web import resource, server
from zope.interface import implementer


class ZeroResource(resource.Resource):
    defaultSize = 1024

    def getChild(self, path, request):
        if path == '':
            return self

        return resource.NoResource()

    def render_GET(self, request):
        try:
            size = int(request.args.get('size')[0])

        except (TypeError, IndexError):
            size = self.defaultSize

        producer = ZeroProducer(request, size)

        request.setHeader('Content-Type', 'application/octet-stream')
        request.setHeader('Content-Length', str(size))

        if request.method == 'HEAD':
            return ''

        producer.start()

        return server.NOT_DONE_YET

    render_HEAD = render_GET


@implementer(interfaces.IPullProducer)
class ZeroProducer(object):
    bufSize = 4096

    def __init__(self, request, size):
        self.request = request
        self.size = size

        self._sentSize = 0

    def start(self):
        # Consumer である request に自身を登録
        self.request.registerProducer(self, False)

    # 以下2つが IPullProducer に必要なメソッド
    def stopProducing(self):
        self.request = None

    def resumeProducing(self):
        # Consumer のバッファに空きがあると呼ばれるメソッド
        if not self.request:
            return

        remaining = self.size - self._sentSize

        if remaining > self.bufSize:
            # Consumer にデータ書き込み
            self.request.write(b'\0' * self.bufSize)
            self._sentSize += self.bufSize

        else:
            self.request.write(b'\0' * remaining)
            self._sentSize += remaining
            # Consumer への登録取り消し
            self.request.unregisterProducer()
            # HTTP response 終了
            self.request.finish()
            self.stopProducing()


if __name__ == '__main__':
    import sys
    from twisted.python import log
    log.startLogging(sys.stderr)

    from twisted.internet import reactor

    site = server.Site(ZeroResource())
    reactor.listenTCP(8080, site)

    reactor.run()

twisted.web の Request class は IConsumer を実装しているので、 null を生成する IPullProducer を実装してつなげる。

これで、

  • curl --limit-rate 1 -N http://localhost:8080/?size=1073741824 のように、1Gのデータを1バイトずつ読み出されたとしてもメモリ使用量は一定

  • 複数同時に接続されたとしても、どれかが待たされたりすることはない