Twisted の Producer/Consumer
Twisted には Producer/Consumer という仕組みがあって、 でかいデータのやり取りをうまく出来るようになっている。
どうやらデータを書き込む先である Consumer がデータを生成する元である Producer に対して、 「バッファいっぱいだからちょっと待って」「バッファ空いたからデータ送ってきていいよ」 というようなやり取りをするためのインターフェースっぽい。
けど ドキュメント 読んでもよく分からんので、ひたすら null を返すだけのウェブサービスを書いてみる。
# vim: fineencoding=utf-8 """指定されたバイト数だけ null を返すウェブサービス """ from twisted.internet import interfaces from twisted.web import resource, server from zope.interface import implementer class ZeroResource(resource.Resource): defaultSize = 1024 def getChild(self, path, request): if path == '': return self return resource.NoResource() def render_GET(self, request): try: size = int(request.args.get('size')[0]) except (TypeError, IndexError): size = self.defaultSize producer = ZeroProducer(request, size) request.setHeader('Content-Type', 'application/octet-stream') request.setHeader('Content-Length', str(size)) if request.method == 'HEAD': return '' producer.start() return server.NOT_DONE_YET render_HEAD = render_GET @implementer(interfaces.IPullProducer) class ZeroProducer(object): bufSize = 4096 def __init__(self, request, size): self.request = request self.size = size self._sentSize = 0 def start(self): # Consumer である request に自身を登録 self.request.registerProducer(self, False) # 以下2つが IPullProducer に必要なメソッド def stopProducing(self): self.request = None def resumeProducing(self): # Consumer のバッファに空きがあると呼ばれるメソッド if not self.request: return remaining = self.size - self._sentSize if remaining > self.bufSize: # Consumer にデータ書き込み self.request.write(b'\0' * self.bufSize) self._sentSize += self.bufSize else: self.request.write(b'\0' * remaining) self._sentSize += remaining # Consumer への登録取り消し self.request.unregisterProducer() # HTTP response 終了 self.request.finish() self.stopProducing() if __name__ == '__main__': import sys from twisted.python import log log.startLogging(sys.stderr) from twisted.internet import reactor site = server.Site(ZeroResource()) reactor.listenTCP(8080, site) reactor.run()
twisted.web の Request class は IConsumer を実装しているので、 null を生成する IPullProducer を実装してつなげる。
これで、
curl --limit-rate 1 -N http://localhost:8080/?size=1073741824
のように、1Gのデータを1バイトずつ読み出されたとしてもメモリ使用量は一定複数同時に接続されたとしても、どれかが待たされたりすることはない