lwzm
11/7/2017 - 4:55 AM

tornado gen and async

#!/usr/bin/env python3

import asyncio
import functools
import threading
import time

from concurrent.futures import ThreadPoolExecutor
from tornado.web import RequestHandler, Application

# Shared worker pool (at most 10 threads); Delay.get submits blocking calls to it.
pool = ThreadPoolExecutor(10)
# Registry of decorated calls currently executing: Thread -> (args, kwds).
threads_running = {}


def thread_spy(fn):
    """Decorate *fn* so that each call is visible in ``threads_running``.

    While *fn* runs, ``threads_running`` maps the calling thread to the
    ``(args, kwds)`` pair of the call.  The entry is removed again when the
    call finishes, whether it returns or raises.

    Decorated functions may call each other on the same thread: the outer
    call's entry is restored when the inner call ends, instead of being
    dropped (which previously made the outer cleanup raise ``KeyError``).
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwds):
        thr = threading.current_thread()
        # Remember an enclosing decorated call on this thread, if any.
        nested = thr in threads_running
        outer_call = threads_running.get(thr)
        threads_running[thr] = args, kwds
        try:
            return fn(*args, **kwds)
        finally:
            if nested:
                threads_running[thr] = outer_call
            else:
                # Tolerate a missing key so cleanup never masks fn's outcome.
                threads_running.pop(thr, None)
    return wrapper


@thread_spy
def sleep(n):
    """Block the calling thread for *n* seconds, then return *n*.

    The current thread is printed before sleeping and printed again,
    together with *n*, afterwards.  ``1 / n`` is evaluated after the sleep,
    so ``n == 0`` raises ``ZeroDivisionError`` before the second print.
    """
    me = threading.current_thread()
    print(me)
    time.sleep(n)
    _ = 1 / n  # result is discarded; only the possible ZeroDivisionError matters
    print(me, n)
    return n


class Delay(RequestHandler):
    """Run the blocking ``sleep`` in the thread pool and reply with its result."""

    async def get(self, delay=1.0):
        """Sleep *delay* seconds in a pool thread, then write the value back.

        *delay* is converted with ``float``.  The wait on the worker is
        limited to 10 seconds via ``asyncio.wait_for``.
        """
        seconds = float(delay)
        job = pool.submit(sleep, seconds)
        # Bridge the concurrent.futures future into an awaitable asyncio one.
        awaitable = asyncio.wrap_future(job)
        result = await asyncio.wait_for(awaitable, 10.0)
        self.write(str(result))


class Info(RequestHandler):
    """Report which pool threads are currently inside a spied call."""

    def get(self):
        """Write a plain-text summary of ``threads_running``.

        The first line is ``<tracked calls> / <active thread count>``; each
        following line shows one thread and the ``(args, kwds)`` it runs with.
        """
        self.set_header("Content-Type", "text/plain; charset=UTF-8")
        # Snapshot first: worker threads add and remove entries while this
        # handler runs, and iterating the live dict could raise
        # "RuntimeError: dictionary changed size during iteration".
        snapshot = list(threads_running.items())
        # print() only needs a write() method, so the handler acts as the file.
        print(len(snapshot), "/", threading.active_count(), file=self)
        for thread, call in snapshot:
            print(thread, call, file=self)


# URL routing: "/<value>" passes the captured text to Delay.get as `delay`;
# the bare "/" is handled by Info.
app = Application([
    (r"/(.+)", Delay),
    (r"/", Info),
])


# Script entry point: listen on TCP port 1111 and run the IOLoop until stopped.
if __name__ == "__main__":
    from tornado.ioloop import IOLoop
    # xheaders=True: take the client address/scheme from proxy-supplied headers.
    app.listen(1111, xheaders=True)
    IOLoop.current().start()
#!/usr/bin/env python3

import asyncio

import tornado.web


class Delay(tornado.web.RequestHandler):
    """Reply after a non-blocking pause on the event loop."""

    async def get(self, delay=1.0):
        """Wait ``float(delay)`` seconds with ``asyncio.sleep``, then echo *delay*.

        The value written back is ``str(delay)`` of the argument as received,
        not of the converted float.
        """
        seconds = float(delay)
        await asyncio.sleep(seconds)
        self.write(str(delay))


# URL routing: "/<value>" passes the captured text to Delay.get as `delay`;
# the bare "/" uses Delay.get's default delay.
app = tornado.web.Application([
    (r"/(.+)", Delay),
    (r"/", Delay),
])


# Script entry point: listen on TCP port 1111 and run the IOLoop until stopped.
if __name__ == "__main__":
    from tornado.ioloop import IOLoop
    # xheaders=True: take the client address/scheme from proxy-supplied headers.
    app.listen(1111, xheaders=True)
    IOLoop.current().start()
# Backend group reached over a unix-domain socket (placeholder path below;
# presumably the socket the Tornado script binds via its --sock option).
upstream gen_sock {
    server unix:/your/unix-socket-path;
    # Cache up to 512 idle connections to the backend per worker process.
    keepalive 512;
}

# HTTPS virtual host for gen.* that proxies every request to the gen_sock upstream.
server {

    server_name  gen.*;
    listen       443 ssl http2;

    # Allow the backend to stay silent for up to an hour before nginx gives up
    # on the response (requests may wait a long time for their data).
    proxy_read_timeout 3600;

    location / {
        proxy_pass http://gen_sock;

        # The upstream's "keepalive" pool is only used when nginx talks
        # HTTP/1.1 to the backend and does not forward "Connection: close".
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        # The backend runs with xheaders=True, so hand it the real client
        # address and scheme instead of those of the proxy connection.
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
#!/usr/bin/env python3

import asyncio
import collections

import tornado.web


class Base(tornado.web.RequestHandler):
    """Handler base class with a class-wide registry of pending futures per token."""

    # token -> futures created by promise() that have not been completed yet.
    # Stored on the class, so every handler instance shares the same registry.
    _futures = collections.defaultdict(list)

    def set_default_headers(self):
        """Serve every response as UTF-8 plain text."""
        self.set_header("Content-Type", "text/plain; charset=UTF-8")

    @classmethod
    def complete(cls, token, msg):
        """Resolve every future waiting on *token* with *msg*.

        The token's entry is removed from the registry rather than left
        behind as an empty list; otherwise each distinct token ever seen
        would stay in ``_futures`` forever.
        """
        # pop() with a default also avoids creating an entry for unknown tokens.
        waiters = cls._futures.pop(token, ())
        for future in waiters:
            # set_result() raises on a future that is already done or
            # cancelled; skip those so the remaining waiters still get msg.
            if not future.done():
                future.set_result(msg)

    @classmethod
    def promise(cls, token):
        """Return a new future that complete(token, ...) will resolve."""
        future = asyncio.Future()
        cls._futures[token].append(future)
        return future


class Handler(Base):
    """GET waits for a message on a token; POST delivers one."""

    async def get(self, token):
        """Wait until something is posted to *token*, pause one second, reply with it."""
        message = await self.promise(token)
        await asyncio.sleep(1)
        self.write(message)

    def post(self, token):
        """Hand the raw request body to every GET currently waiting on *token*."""
        payload = self.request.body
        self.complete(token, payload)


# URL routing: everything after the leading "/" is captured and passed to
# Handler.get / Handler.post as `token`.
app = tornado.web.Application([
    (r"/(.+)", Handler),
])


# Script entry point: serve the app on a unix-domain socket instead of a TCP port.
if __name__ == "__main__":
    from tornado.options import define, parse_command_line, options
    from tornado.httpserver import HTTPServer
    from tornado.netutil import bind_unix_socket
    from tornado.ioloop import IOLoop
    # --sock selects the socket path; it defaults to ".sock".
    define("sock", default=".sock")
    parse_command_line()
    # xheaders=True: take the client address/scheme from proxy-supplied headers.
    server = HTTPServer(app, xheaders=True)
    # The socket is created with mode 0o666.
    server.add_socket(bind_unix_socket(options.sock, 0o666))
    IOLoop.current().start()
class RemoteDebugHandler(BaseHandler):
    """GET blocks until the next POST; the POST body is sent to every waiting GET."""

    # Futures of GET requests still waiting; shared by all instances via the class.
    _futures = []

    @tornado.gen.coroutine
    def get(self):
        """Register a future, wait for a POST to resolve it, reply with the text."""
        waiter = tornado.concurrent.Future()
        self._futures.append(waiter)
        text = yield waiter
        self.set_header("Content-Type", "text/plain; charset=UTF-8")
        self.write(text)

    def post(self):
        """Decode the request body and deliver it to all waiting GETs, newest first."""
        text = self.request.body.decode()
        waiters = self._futures
        # Drain the shared list so each waiter is resolved exactly once.
        while waiters:
            waiter = waiters.pop()
            waiter.set_result(text)