Files de tâches et tâches récurrentes avec Celery


Quand on a à traiter des choses bloquantes, avec des dépendances, des flux complexes ou des actions répétitives, créer des files d’attente peut se révéler très judicieux.

Par exemple lancer la génération d’un gros zip sur le clic d’un utilisateur, télécharger plein fichiers en parallèle pour son site de cul, lancer des calculs sur plusieurs machines et récupérer le résultat, encoder des videos en arrière plan, etc.

Le problème, c’est que fabriquer des files d’attente à la main, ça mène généralement à une grosse galère. La première boîte dans laquelle j’ai travaillé avait tout un système de queues à base de PHP + SQL fait à la main qui tapait dans du MySQL, c’était pas marrant du tout

Je fais une pause, et je note que le potentiel de jeux de mots sur cet article est fortement élevé. Mais je resterai fort.

Locking, priorité, dépendance, asynchronicité, concurrence, sérialisation, encoding, stockage, accessibilité, load balancing… Toutes ces problématiques sont bien vicieuses et chronophages. Il vaut mieux utiliser une lib solide et éprouvée.

Je resterai fort.

Kombu est une telle lib, mais elle est lourde et complexe à utiliser. J’avais fais le choix de la prendre pour un gros projet avec Max, je le regrette sur le long terme : c’est dur à maintenir et à faire évoluer. Le code est vraiment pas sympa.

Heureusement il existe une bibliothèque qui se met au dessus de Kombu pour et nous expose juste les fonctionnalités que l’on souhaite : celery.

Fort.

Celery est simple pour démarrer, mais très puissant si on rentre dans le détail, et croyez moi, le détail, on peut y rentrer très très profondément.

F…

Installation

Qui dit file d’attente, dit stockage. Il faut bien mettre les tâches quelque part et communiquer avec ce quelque part. En base de données ? Dans un gestionnaire de messages ? En mémoire ? Dans un cache ?

Celery résout le problème en proposant la même interface, quelque soit le support. Actuellement, on peut utiliser :

  • RabbitMQ
  • Redis
  • MongoDB
  • Beanstalk CouchDB
  • SQLAlchemy ou l’ORM Django (et donc toutes les bases de données supportées comme Sqlite, MySQL, PostGres…)
  • Amazon SQS

Dans notre exemple, nous allons le faire avec Redis car :

  • Redis est très simple à installer et à configurer.
  • Nous on utilise déjà du Redis partout.
  • Aucun risque de locking.

Pour ceux qui ont pas redis, c’est généralement dans les dépôts. Par exemple sur Ubuntu :

sudo apt-get install redis-server

Il n’y a rien à faire de plus, ça tourne, c’est configuré avec des valeurs par défaut qui sont saines. Je vous l’ai dis, redis, c’est fantastiquement bien foutu.

Ensuite on install celery et la lib d’accès à redis en Python qui porte un nom très original :

pip install celery redis

Ca devrait compiler un peu, et comme hab avec les extensions en C, assurez vous d’avoir un compilateur et les headers en place comme indiqué dans l’article sur pip.

Ensuite on peut créer ses tâches. Créez un module, par exemple tasks.py :

import urllib2
 
from collections import Counter
 
from celery import Celery
 
# Configuration de celery. Ceci peut aussi se faire dans un fichier de config.
# Ici on dit à celery que pour le module 'tasks', on va utiliser redis
# comme broker (passeur de massage) et comme result backend (stockage du
# resultat des tâches).
celery = Celery('tasks', broker='redis://localhost', backend='redis://localhost')
 
 
# Et voici notre première tâche. C'est une fonction Python normale, décorée
# avec un decorateur de celery. Elle prend une URL, et calcule le nombre
# de lettre "e" qu'il y a dans la page.
@celery.task
def ecount(url):
    return Counter(urllib2.urlopen(url).read())['e']

On lance ensuite le processus celery dans un terminal (en production, mettez ça dans supervisord ou systemd pour que ça démarre automatiquement) :

[test] sam ~/Bureau/celery_test $ celery -A tasks worker -B --loglevel=info

 -------------- celery@sam v3.0.21 (Chiastic Slide)
---- **** -----
--- * ***  * -- Linux-3.2.0-48-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> broker:      redis://localhost:6379//
- ** ---------- .> app:         tasks:0x2a2fa50
- ** ---------- .> concurrency: 4 (processes)
- *** --- * --- .> events:      OFF (enable -E to monitor this worker)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery:      exchange:celery(direct) binding:celery


[Tasks]
  . tasks.ecount

[2013-07-26 13:22:21,631: INFO/Beat] Celerybeat: Starting..
[2013-07-26 13:04:51,274: WARNING/MainProcess] celery@sam ready.
[2013-07-26 13:04:51,280: INFO/MainProcess] consumer: Connected to redis://localhost:6379//.

-A précise le module à importer, -B démarre le beat (on verra ça plus tard), worker dit à celery que démarrer des processus de consommation de files d’attente (par défaut 4 qui travaillent en parallèle), et --loglevel=info va nous permettre d’avoir un affichage verbeux pour comprendre ce qui se passe.

Votre file d’attente est prête, et frétille d’impatience.

Lancer une tâche

A partir de là, vous pouvez envoyer des tâches dans la file d’attente, depuis n’importe où :

  • Un script.
  • Un serveur Web (par exemple une vue Django).
  • Un programme sur un autre serveur (même si il faudrait alors configurer redis pour qu’il écoute sur les ports extérieurs, ce qui n’est pas le cas ici par simplicité).
  • etc

Plusieurs programmes peuvent envoyer plein de tâches, en même temps, et elles vont se loger dans la file d’attente, sans bloquer le programme qui les a envoyé.

Par exemple, depuis le shell :

>>> from tasks import ecount
>>> res = ecount.delay('http://danstonchat.com')

Ceci ne bloque pas mon shell, la ligne s’exécute immédiatement. La fonction ecount n’est pas appelée depuis le shell, elle est dans la file d’attente et sera appelée par un des processus (les fameux ‘worker’) qui consomment la queue. Du côté de la file, on peut voir dans le log :

[2013-07-26 14:18:08,609: INFO/MainProcess] Got task from broker: tasks.ecount[599a52ea-ef6b-4499-981d-cd17fab592df]
[2013-07-26 14:18:09,070: INFO/MainProcess] Task tasks.ecount[599a52ea-ef6b-4499-981d-cd17fab592df] succeeded in 0.446974039078s: 1242

On a donc notre tâche qui a bien été traitée.

On peut récupérer le résultat dans le shell :

>>> res.state
'PENDING'

Ah… La tâche n’est pas encore terminée. Et un peu plus tard :

>>> res.state
'SUCCESS'
>>> res.result
1242

Lancer une tâche est bien entendu peu intéressant, les listes d’attente sont vraiment sympa quand on a plein de tâches à lancer, par plein de processus différents :

results = [ecount.delay(url) for url in ('http://google.com', 'http://sametmax.com', 'http://sebsauvage.com', 'http://multiboards.com', 'http://0bin.net', 'http://danstonchat.com')]
[2013-07-26 14:25:46,646: INFO/MainProcess] Got task from broker: tasks.ecount[5d072a7b-29f8-4ea6-8d92-6a4c1740d724]
[2013-07-26 14:25:46,649: INFO/MainProcess] Got task from broker: tasks.ecount[402f6a4f-6b35-4f62-a786-9a5ba27707d2]
[2013-07-26 14:25:46,650: INFO/MainProcess] Got task from broker: tasks.ecount[bbe46b1b-4719-4c42-bd2f-21e4d72e613e]
[2013-07-26 14:25:46,652: INFO/MainProcess] Got task from broker: tasks.ecount[8fb35186-66e2-4eae-a40c-fc42e500ab9d]
[2013-07-26 14:25:46,653: INFO/MainProcess] Got task from broker: tasks.ecount[fc63f5db-8ade-4383-b719-c3d6390ca246]
[2013-07-26 14:25:46,654: INFO/MainProcess] Got task from broker: tasks.ecount[8434e21d-79ea-4559-a90e-92e2bc2b9dc7]
[2013-07-26 14:25:47,144: INFO/MainProcess] Task tasks.ecount[bbe46b1b-4719-4c42-bd2f-21e4d72e613e] succeeded in 0.479865789413s: 27
[2013-07-26 14:25:47,242: INFO/MainProcess] Task tasks.ecount[5d072a7b-29f8-4ea6-8d92-6a4c1740d724] succeeded in 0.578661203384s: 609
[2013-07-26 14:25:47,501: INFO/MainProcess] Task tasks.ecount[fc63f5db-8ade-4383-b719-c3d6390ca246] succeeded in 0.35736989975s: 263
[2013-07-26 14:25:47,645: INFO/MainProcess] Task tasks.ecount[8434e21d-79ea-4559-a90e-92e2bc2b9dc7] succeeded in 0.403187036514s: 1270
[2013-07-26 14:25:47,815: INFO/MainProcess] Task tasks.ecount[8fb35186-66e2-4eae-a40c-fc42e500ab9d] succeeded in 1.14100408554s: 23
[2013-07-26 14:25:49,010: INFO/MainProcess] Task tasks.ecount[402f6a4f-6b35-4f62-a786-9a5ba27707d2] succeeded in 2.34633708s: 3158

Car du coup on sait que ces multiples tâches ne vont pas bloquer le processus en cour, mais qu’en plus la charge sera répartie sur le nombre de workers qu’on a décidé au départ, ni plus (surcharge du serveur), ni moins (traitement trop lent).

Comment je sais quand une tâche est terminée ?

On peut attendre que la tâche soit terminée :

>>> print res.wait()
9999

Mais ce n’est pas vraiment le but. On cherche avant tout à ce que les tâches soient non bloquantes, et exécutées dans un processus à part voir potentiellement distribuées sur plusieurs serveurs.

Par ailleurs, Celery n’est pas un remplacement d’un système de traitement asynchrone comme Tornado ou NodeJS, il n’est pas fait pour envoyer des réponses asynchrones à l’utilisateur. Il est fait pour faire des tâches en background, répartir la charge et ordonner le traitement. Bien entendu, on peut faire communiquer un système asynchrone avec celery comme ici ou ici, mais c’est une autre histoire.

Concentrons nous sur les tâches.

La question de “Comment je sais quand une tâche est terminée ?” est souvent traduisible par “comment je réagis à une tâche pour lancer du code quand elle s’est terminée sans erreur ?”.

Et là, il y une solution toute simple :

res = tache1.s(arg1, arg2) | tache2.s() | tache3.s(arg1)

Ceci va créer une chaîne de tâches. Quand la première se termine, la deuxième se lance en recevant le résultat de la première en argument.

s() fabrique une sous-tâche, c’est à dire une tâche à envoyer dans la file plus tard avec des arguments pré-enregistrés. Dans notre exemple, celery va lancer tache1 avec deux arguments, puis si ça marche, va appeler tache2 en lui passant le résultat de tache1 comme argument, puis si ça marche, va appeler tache3 avec le résultat de tache2 en premier argument et arg1 en second argument.

En fait, celery vient avec tout un tas d’outils pour exécuter des tâches dépendantes les unes des autres : par groupes, par chaînes, par morceaux, etc. Mais de toute façon, vous pouvez appeler une tâche… à l’intérieur d’une autre tâche. Donc parti de là vous pouvez faire pas mal de choses.

Comment je fais pour faire une tâche récurrente ?

C’est là qu’intervient le “beat” dont j’ai parlé tout à l’heure. Avec cette option, celery va vérifier toutes les secondes si il n’y a pas une tâche répétitive à lancer, et la mettre dans une file d’attente, à la manière d’un cron.

Il suffit de définir une tâche comme periodic_task pour qu’elle soit lancée régulièrement.

import smtplib
 
from celery.schedules import crontab
from celery.decorators import periodic_task
 
# va executer la tâche à 5h30, 13h30 et 23h30 tous les lundi
# run_every accepte aussi un timedelta, pour par exemple dire "toutes les 10m"
@periodic_task(run_every=crontab(hour='5,13,23', minute=30, day_of_week='monday'))
def is_alive():
    """
        Vérifie que le blog est toujours en ligne, et si ce n'est pas le cas,
        envoie un mail en panique.
    """
    if urllib2.urlopen('http://sametmax.com').code != 200:
        mail = 'lesametlemax__AT__gmail.com'.replace('__AT__', '@')
        server = smtplib.SMTP('smtp.gmail.com:587')
        server.starttls()
        server.login('root', 'admin123')
        server.sendmail(mail, mail, msg)
        server.quit()

Il y a de bons exemples sur la syntaxe sur crontab() dans la doc.

D’une manière générale, la doc de Celery est très très riche, donc plongez vous dedans si cet article ne répond pas à vos besoins, car si ça peut être mis dans une file, ça peut être fait par Celery.

Note de fin

Celery n’autoreload pas le code, donc redémarrez les workers à chaque fois que vous modifiez vos tasks.

Attention aussi aux tâches récurrentes, la suivante peut se lancer avant que la précédente soit terminée. C’est à vous de faire des tâches idempotentes, ou alors de mettre en place un système de locking.

29 thoughts on “Files de tâches et tâches récurrentes avec Celery

  • roro

    Il n’y a plus qu’a ajouter que le céleri est une plante hermaphrodite…cqfd

  • Sam Post author

    Le broken est le système qui :

    – stocke les files d’attentes
    – s’assure que la bonne tache est dans la bonne file d’attente
    – ordonne les taches dans les files d’attentes
    – permet de récupérer la prochaine tâche à effectuer dans la file d’attente donnée

    Celery ne fait qu’appeler des fonctions du broker pour fonctionner.

    Il faut bien comprendre que le broker doit pouvoir representer une file ordonnée, y accéder de manière concurrente depuis n’importe quel système connecté en local ou à distance, mettre des labels sur les files, etc.

    Certains système, comme AMPQ, sont spécialisés là dedans, d’autre, comme les bases de données, sont détournées pour cela.

    Par exemple, dans le cadre de MongoDB, il y a une collection messages qui va contenir toutes les tâches (à la base, un broker est un livreur de message, et par dessus on créer des files de tâches, mais on peut s’en servir pour plein d’autre choses).
    Ensuite il y a une collection routing qui contient les informations pour savoir dans quelle liste mettre quelle tâche. En effet, il peut y avoir des centaines de files, avec des labels, et une tâches peut être “routée” vers une file ou une autre selon le besoin (je ne l’ai pas montré dans l’article, car c’est un usage avancé). Enfin broadcast contient les informations qui spécifient à qui propager l’information une fois qu’un message, ici notre tâche, a été traité.

    Par contre pour supprimer une tâche, t’as vraiment fais ta feignasse, ça m’a pris 3 secondes pour trouver ça sur un moteur de recherche :

    celery.control.revoke(uuid, terminate=True)

    Merci pour le signalement du lien mort.

  • roro

    @Sam:”…et par dessus on créer” –> on crée.
    Cela donne l’impression que ta pensée a été modifiée sur le parcours cerveau/main.
    Tu nous aurais pas choppé un trojan ?

  • Sam Post author

    Arf, je vais pas aller jusqu’à corriger les comments, sinon je suis parti pour l’asile.

  • Kad

    Il manque plus qu’un article sur supervisord et on pourra gérer un système rien qu’avec du python!!

  • Fred

    Merci :)
    Et +1 pour supervisord voire circus ! Perso j’utilise upstart pour lancer et manipuler mes démons (gunicorn notamment) mais je suis curieux de voir ce qui se fait à côté.

  • VonTenia

    Marrant comme timing, je viens de passer le weekend a mettre en place un systeme anti-fraude sous celery et puis je decouvre votre article maintenant…
    Comme vous j’ai choisis d’utiliser un broker redis par simplicité. Quelqu’un avec de l’experience pourrait argumenter sur la superiorité de rabitMQ ? Parce que ca a l’air quand meme beaucoup plus complexe.
    Et je plussois ce qui est dit dans ce post a propos de la superiorite de celery comparé à vouloir faire son propre systeme de traitement asynchrone : on commence par se dire que celery, un broker et tout ca c’est un peu trop compliqué pour ce qu’on veut faire et au final on se retrouve a reinventer la roue en pensant faire plus simple… Bref meme pour un petit projet n’hesitez pas, utilisez celery. Perso je fais juste des verifications de coherences et un appel à une API sur les donnees envoyées et je ne voulais pas penaliser l’experience utilisateur avec ces verifications.

  • Sam Post author

    RabbitMQ est avant tout un système de messagerie, et il est donc intéressant pour ce genre de chose. Par ailleurs, AMPQ est un protocole de PUSH, alors que sur des DB, on utilise du polling pour simuler le PUSH, du coup RabbitMQ est plus performant.

    Pour la plupart des cas, on s’en branle.

  • Lhassa

    m***** alors, je voulais vous demander comment vous gériez les files de tâches (pas les commentaires, hein…), et vous répondez avant que je questionne, trop fort.
    J’ai commencé à écrire mon propre truc, mais non seulement c’est évidement réinventer la roue (comme dit plus haut) parce que si j’y pense, d’autre aussi… mais en plus c’est nul.

    MERCI à vous, ma prochaine bouteille de rouge vous sera dédiée, en attendant de perdre du temps avec plaisir au boulot, à enfin apprendre des trucs utile…

  • Sam Post author

    C’est le même principe, mais celery support le routing, l’exécution sur plusieurs serveurs, plus de brokers, plus de support de stockages, le groupes de tâches, les tâches avec dépendances, et vient avec une grosse communauté (incluant les projets qui vont avec).

    APScHeduler peut être intéressant pour un projet simple, et être vu comme comme bottle peut être comparé à Django.

  • groug

    J’adore Sam & Max. J’ai un problème de tâche bloquante, je cherche un peu sur le net, je galère, je tombe finalement sur Celery sur SO, je fais ma désormais classique recherche “site:sametmax.com celery”, et là, bah oui, ils en avaient déjà parlé.
    Plus que le tag cloud, faudrait peut-être une page avec la liste exhaustive des tags utilisés dans les articles, ça permettrait peut-être de plus facilement chercher ce qu’on ne sait pas chercher !

    En voyant les tags, complètement hors-sujet : c’est déjà arrivé que quelqu’un tombe sur le site en cherchant poo en anglais ? Genre “poo sex”. Ce serait géant.

  • Sam Post author

    Si il y a un plugin wordpress qui fait ça automatiquement, file moi le lien.

    Pour la recherche, tu n’imagines pas les termes qui arrivent sur notre site. C’est ubuesque.

  • groug

    Genre ça, oui !
    Faudrait peut-être aussi me filer une petite dizaine de bitcoins…

  • Sam Post author

    Pas problème, donne moi les codes d’accès de ton compte de banque en ligne, et je t’envoies autant de bitcoins que tu veux.

  • mdupuy

    On apprend que le broker est un “passeur de massage”. On sent que la Thaïlande a eu une certaine influence sur le contenu de l’article.

  • Sam Post author

    ^^

    Tu ne crois pas si bien dire, on s’est tous les deux inscrit à l’école de massage thai pour une 30aine d’heures de cours. On va devenir experts en passage de massage \o/

  • francoisgfx

    Hello,

    J’aimerai créer une tache qui soit exécuté par plusieurs workers ayant différente app. Est ce que c’est possible ?

    j’ai une tache périodique qui poll des events d’un service. Je voudrais stocker chaque event dans une queue par type d’event.

    Ensuite que chaque worker se bind a une queue d’un type d’event qui l’intéresse et execute son code en fonction.

    Vue que la fonction/app a exécuter est dans le message, je ne vois pas comment je peux faire ça.

    Si vous avez une idée, je suis preneur.

    Merci

    F.

  • Sam Post author

    Pose la question sur indexerror.net, ce sera mieux adapté que les comments du blog.

  • Siltaar

    « Heureusement il existe une bibliothèque qui se met au dessus de Kombu pour et nous expose juste les fonctionnalités que l’on souhaite »

    –> pour

    Après, la vie privée des libs, qui se mets au dessus, qui expose quoi… cela ne nous, regarde pas ;-)

  • mzk

    J’arrive un peu tard, pour ramener ma fraise… J’ai mis du temps à comprendre tout ce jargon sur le multiprocessing, threading, parallélisme vs. concurrence, etc. Mais le tutoriel ici est assez fascinant et finalement très accessible ! Merci.

    Juste pour les noobs comme moi, et qui utilisent abondement Python 3.6 – comme fourni out-of-the-box par Arch Linux, voici une légère ré-écriture du script tasks.py :

    
    import urllib.request
    
    from collections import Counter
    
    from celery import Celery
    
    celery = Celery('tasks', broker='redis://localhost', backend='redis://localhost')
    
    @celery.task
    def ecount(url):
        with urllib.request.urlopen(url) as res:
            html = res.read()
        html = html.decode('utf-8')
        return Counter(html)['e']
    
    

    Voilà. Je sais pas si c’est très utile, mais si ça peut aider les bleus comme moi…

  • AnOnyme77

    Super article. J’apprends toujours des trucs super intéressants avec vous !

    Une petite erreur dans un code par contre je pense :

    @periodic_task(run_every=crontab(hour='5,13,23', minute=30, day_of_week='monday')

    Je vois ici deux parenthèses ouvrantes et une fermante :'(.

Comments are closed.

Des questions Python sans rapport avec l'article ? Posez-les sur IndexError.