La programmation basée sur les entrées/sorties asynchrones en Python : cas d'utilisations, fonctionnement interne, contraintes et styles de programmation.
Temps d'exécution limité par le processeur.
Calculs bruts, encodage de données, etc.
↪ l'asynchrone ne sert à rien
Pour aller plus vite :
Temps d'exécution limité par les entrées/sorties
↪ l'asynchrone peut être utile
WebSockets → l'asynchrone est vite indispensable
Nos processus passent leur temps à attendre ? Plus assez d'espace (mémoire) pour en ajouter d'autres ?
↪ Les passer en asynchrone peut aider.
Nos processus travaillent ?
↪ Les passer en asynchrone ne va pas aider.
On peut par contre envisager d'externaliser ce travail pour les passer en asynchrone.
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
application = tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
10 000 requêtes par lots de 100 :
$ ab -n 10000 -c 100 http://127.0.0.1:8888/
Time taken for tests: 6.706 seconds
Complete requests: 10000
Requests per second: 1491.25 [#/sec] (mean)
Time per request: 67.058 [ms] (mean)
class MainHandler(tornado.web.RequestHandler):
def get(self):
cur = self.application.db.cursor()
cur.execute("SELECT 42, pg_sleep(0.300);")
result = cur.fetchone()
self.write("Result: %s" % result[0])
20 requêtes par lots de 10 :
$ ab -n 20 -c 10 http://127.0.0.1:8888/
Time taken for tests: 6.201 seconds
Complete requests: 20
Requests per second: 3.23 [#/sec] (mean)
Time per request: 3100.513 [ms] (mean)
from tornado import web, ioloop
import momoko
class MainHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
# Ancienne API de Momoko
self.application.db.execute(
'SELECT 42, pg_sleep(0.300)', callback=self._done)
def _done(self, cursor, error):
result = cursor.fetchone()
self.write("Result: %s" % result[0])
self.finish()
20 requêtes par lots de 10 :
$ ab -n 20 -c 10 http://127.0.0.1:8888/
Time taken for tests: 0.622 seconds
Complete requests: 20
Requests per second: 32.18 [#/sec] (mean)
Time per request: 310.756 [ms] (mean)
Version extrêmement simplifiée de l'IOLoop de Tornado :
def start(self):
while True:
# Appelle la fonction de polling de la plateforme
# (epoll sous Linux, kqueue sous BSD). Celle-ci
# renvoie les événements survenus sur les
# descripteurs de fichiers (sockets, etc.) surveillés
event_pairs = self._impl.poll(poll_timeout)
self._events.update(event_pairs)
while self._events:
# Appelle les handler d'événements enregistrés
fd, events = self._events.popitem()
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
Enregistrement des handlers :
def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
class MainHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
def handle_db(cursor, error):
db_value = cursor.fetchone()[0]
def handle_http(response):
json_data = json.loads(response.body.decode())
result = db_value - json_data['value']
self.write("Result: %s" % result)
self.finish()
http_client = tornado.httpclient.AsyncHTTPClient()
http_client.fetch('http://127.0.0.1:8000/', handle_http)
# Ancienne API de Momoko
self.application.db.execute(
'SELECT 42, pg_sleep(0.300)', callback=handle_db)
20 requêtes par lots de 10 :
$ ab -c 10 -n 20 http://127.0.0.1:8888/
Time taken for tests: 1.260 seconds
Requests per second: 15.88 [#/sec] (mean)
Time per request: 629.834 [ms] (mean)
class MainHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
cursor = yield self.application.db.execute(
'SELECT 42, pg_sleep(0.300)')
db_value = cursor.fetchone()[0]
http_client = tornado.httpclient.AsyncHTTPClient()
response = yield http_client.fetch('http://127.0.0.1:8000/')
json_data = json.loads(response.body.decode())
result = db_value - json_data['value']
self.write("Result: %s" % result)
self.finish()
20 requêtes par lots de 10 :
$ ab -c 10 -n 20 http://127.0.0.1:8888/
Time taken for tests: 1.281 seconds
Requests per second: 15.61 [#/sec] (mean)
Time per request: 640.527 [ms] (mean)
class MainHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
http_client = tornado.httpclient.AsyncHTTPClient()
# Lancement des requêtes en parallèle
cursor, response = yield [
self.application.db.execute('SELECT 42, pg_sleep(0.300)'),
http_client.fetch('http://127.0.0.1:8000/'),
]
db_value = cursor.fetchone()[0]
json_data = json.loads(response.body.decode())
result = db_value - json_data['value']
self.write("Result: %s" % result)
self.finish()
20 requêtes par lots de 10 :
$ ab -c 10 -n 20 http://127.0.0.1:8888/
Time taken for tests: 0.663 seconds
Requests per second: 30.15 [#/sec] (mean)
Time per request: 331.638 [ms] (mean)
# Nécessite tornado==4.3.dev1
class MainHandler(tornado.web.RequestHandler):
async def get(self):
http_client = tornado.httpclient.AsyncHTTPClient()
cursor, response = await tornado.gen.multi([
self.application.db.execute('SELECT 42, pg_sleep(0.300)'),
http_client.fetch('http://127.0.0.1:8000/')
])
db_value = cursor.fetchone()[0]
json_data = json.loads(response.body.decode())
result = db_value - json_data['value']
self.write("Result: %s" % result)
self.finish()
20 requêtes par lots de 10 :
$ ab -c 10 -n 20 http://127.0.0.1:8888/
Time taken for tests: 0.706 seconds
Requests per second: 28.35 [#/sec] (mean)
Time per request: 352.756 [ms] (mean)
↪ THR utilise Tornado pour protéger et réguler une appli Django par des mécanismes asynchrones
50 à 100 personnes prennent leur service en même temps et bombardent le système de requêtes THR utilise Tornado pour protéger et réguler un cluster django synchrone (CPU bound) par des mécanismes asynchrones tout en continuant de répondre de façon synchrone aux requêtes HTTP des clients.
Exemple : limiter le nombre de requêtes simultanées comportant un entête HTTP donné
def limit_foo(request):
return request.headers.get('Foo') or 'none'
add_max_limit('limit_foo_header', limit_foo, "bar", 2)
Requêtes ne correspondant pas au critère de limitation :
$ ab -c10 -n10 -H "Foo: baz" http://127.0.0.1:8888/|grep 'Time taken'
Time taken for tests: 1.045 seconds
Requêtes correspondant au critère :
$ ab -c10 -n10 -H "Foo: bar" http://127.0.0.1:8888/|grep 'Time taken'
Time taken for tests: 5.055 seconds
Note : le service sous-jacent met une seconde à répondre dans les deux cas.
Des dizaines de processus assimilant des données d'origines diverses : satellites, radars, observations, super-calculateurs...
Arrêter et expirer les processus sans interrompre le travail en cours.
L'intérêt de Tornado pour circus est l'utilisation des coroutines qui permet de sortir du "callback hell" dans lequel j'étais tombé à ma première tentative. Le code est bien plus lisible et bien moins bugué !
Extrait du code :
@gen.coroutine
def _stop_watchers(self, close_output_streams=False,
for_shutdown=False):
watchers = watcher_iter_func(reverse=False)
yield [
watcher._stop(close_output_streams, for_shutdown)
for watcher in watchers
]
Note : ici l'asynchrone est utilisé comme une alternative aux threads.
Quand utiliser de l'asynchrone ?
Comment coder en asynchrone ?
Table of contents | t |
---|---|
Exposé | ESC |
Autoscale | e |
Full screen slides | f |
Presenter view | p |
Source files | s |
Slide numbers | n |
Blank screen | b |
Notes | 2 |
Help | h |