Celery is a task queue with focus on real-time processing, while also supporting task scheduling.
RabbitMQ is a message broker. This means it handles the queue of “messages” between Django and Celery.
Redis is a key-value store (REmote DIctionary Server). Very fast. Used here as the results backend.
Code for this tutorial is here: https://github.com/tek-shinobi/celeryDj
Install RabbitMQ
Installation: sudo apt-get install rabbitmq-server
Restarting rabbitmq-server: sudo service rabbitmq-server restart
check status of server: sudo rabbitmqctl status
Running the above in the console will output a bunch of text. Look for this line to ensure all is good:
Status of node rabbit@my_computer_name
For example, if my computer is named tiger, I should look for this line:
Status of node rabbit@tiger
So, at this point, we have a RabbitMQ service running.
Install Redis and redis-py
Installation: sudo apt install redis-server
Verify installation: type the redis-cli command, then type ping at the prompt that comes up:
$ redis-cli -v
redis-cli 5.0.6
$ redis-cli
127.0.0.1:6379> ping
PONG
127.0.0.1:6379>
We can see that our Redis server is ready with the reply PONG.
NOTE: Should you ever need to restart the server, do this in a terminal:
sudo /etc/init.d/redis-server restart
After installing and starting the Redis server, let's install redis-py, the Python binding for Redis:
pipenv install redis
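To confirm the binding works, here is a quick sanity check from a Python shell (a minimal sketch, assuming Redis is running locally on the default port):

import redis

# connect to the local Redis server on the default port
r = redis.Redis(host='localhost', port=6379)

# ping() returns True when the server is reachable
print(r.ping())  # True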
Install celery
install celery: pipenv install celery flower "celery[redis]"
flower is the web-based tool for monitoring and administering Celery clusters. Flower provides detailed statistics of task progress and history. It also shows other task details such as the arguments passed, start time, runtime, and others.
"celery[redis]": additional celery dependencies for Redis support.
Before we start configuring celery for the Django project, let's launch the celery worker process and flower, each in its own console at the project path. Since our Django project is named mysite, the worker command looks like so:
celery -A mysite worker -l info
Here -l info sets the loglevel to info. This loglevel will give us a lot of helpful info in the console.
Now in yet another console, launch Flower at the project path:
celery -A mysite flower
Once you have launched flower, you can open its dashboard in a browser to monitor tasks:
http://localhost:5555/dashboard
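Flower listens on port 5555 by default. If that port is already taken on your machine, you can pass a different one with the --port option, for example:

celery -A mysite flower --port=5566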
Celery Basic Setup
Basic project structure at startup:
mysite/
|-- mysite/
|   |-- __init__.py
|   |-- settings.py
|   |-- urls.py
|   +-- wsgi.py
|-- manage.py
|-- Pipfile
|-- Pipfile.lock
We will be using RabbitMQ as the broker. Another option is Redis. I prefer RabbitMQ since celery supports it natively and it just works.
We will use Redis as the results backend. Also, we will serialize everything to JSON when saving results to Redis.
In settings.py add this celery configuration at the bottom:
# celery setup
CELERY_BROKER_URL = 'amqp://localhost'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_BROKER_URL = 'amqp://localhost': specifies the connection string to the broker. Since I am using localhost, it's localhost here; if your server is somewhere remote, you will have its IP (or hostname) instead. The amqp scheme indicates RabbitMQ is being used as the broker.
CELERY_RESULT_BACKEND = 'redis://localhost:6379': sets Redis as the result backend. 6379 is the default Redis port.
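If your RabbitMQ server is remote or requires authentication, the broker URL follows the usual AMQP form; the user, password, host, and vhost below are just placeholders:

CELERY_BROKER_URL = 'amqp://myuser:mypassword@192.168.1.10:5672/myvhost'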
Adding Celery to Django project
Create a file named celery.py next to settings.py. This file will contain the celery configuration for our project.
from __future__ import absolute_import
import os
from django.conf import settings
from celery import Celery

# set default Django settings module for celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite_tasks', broker=settings.CELERY_BROKER_URL)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
The code above creates an instance of Celery in our project. When instantiating, we pass in a name for the list of tasks, in this case mysite_tasks, and the broker. The last line instructs celery to auto-discover asynchronous tasks in all the applications listed under INSTALLED_APPS.
Celery will look for definitions of asynchronous tasks within a file named tasks.py in each of the application directories.
To be sure that the Celery app is loaded every time Django starts, the following code should be added to mysite/__init__.py.
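The standard snippet from the Celery documentation imports the app whenever Django starts, so that @shared_task will pick it up:

from __future__ import absolute_import

# importing the celery app here ensures it is always loaded
# when Django starts, so that @shared_task will use this app
from .celery import app as celery_app

__all__ = ('celery_app',)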
Create celery tasks
There are two ways. One is to dump all the tasks in the celery.py file itself with the @app.task decorator:
from __future__ import absolute_import
import os
from django.conf import settings
from celery import Celery

# set default Django settings module for celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite_tasks', broker=settings.CELERY_BROKER_URL)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task
def do_my_task():
    return 'celery task'
What I prefer, though, is the @shared_task decorator, used to create tasks in each app, in the corresponding tasks.py file. The @shared_task decorator lets you create tasks without having any concrete app instance, which is a simpler API to use than app.task.
Let's create an app inside the Django project:
python manage.py startapp my_app
First, create a services.py to hold the functions that we will later want to run via celery. This is good because we can unit test these services and later just wrap them inside celery tasks, which don't need any testing (I ignore celery's tasks.py in my pytest).
def add(x, y):
    return x + y
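For example, a minimal pytest sketch for this service (assuming the test lives in my_app/tests.py; the service is plain Python, so no celery worker or broker is needed):

from my_app.services import add

def test_add():
    # plain function call, no broker involved
    assert add(2, 3) == 5
    assert add(-1, 1) == 0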
Now, inside your my_app's tasks.py file:
from celery import shared_task

from .services import add

@shared_task(name='sum two numbers')
def delayed_add(x, y):
    result = add(x, y)
    return result
Note: always name the task. This way, when you look at the dashboard in Flower, you can identify the tasks by name. Here we named the task 'sum two numbers'.
In my_app/views.py:
from django.http import HttpResponse

from .tasks import delayed_add

def celery_view(request):
    for counter in range(2):
        delayed_add.delay(3, counter)
    return HttpResponse("FINISH PAGE LOAD")
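As a side note, delay() is just a convenience shortcut for apply_async(). The equivalent call, which also lets you pass execution options, would be:

# same as delayed_add.delay(3, counter)
delayed_add.apply_async(args=(3, counter))

# apply_async also accepts options, e.g. run the task 10 seconds from now
delayed_add.apply_async(args=(3, counter), countdown=10)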
in mysite/urls.py:
from django.contrib import admin
from django.urls import path

from my_app.views import celery_view

urlpatterns = [
    path('admin/', admin.site.urls),
    path('celerytask/', celery_view),
]
Now that we have everything wired up, start the Django server by running python manage.py runserver.
Now go to this URL:
http://127.0.0.1:8000/celerytask/
This will load the view we created, which fires the celery async tasks. You can then check the flower dashboard to see that two tasks were run.
Also, since we added Redis as the results backend, the results are stored in Redis too. The key is the task UUID. You can see the task UUID on the Flower dashboard.
To see the results stored in Redis, run redis-cli in a terminal and then use MGET <key> to see a stored result. You can use KEYS '*' to list all keys:
$ redis-cli
127.0.0.1:6379> KEYS '*'
1) "celery-task-meta-03ef35c8-e8c3-412a-a13d-6886e89415f1"
2) "celery-task-meta-57469f01-870b-41d5-a340-8794b5e807ba"
127.0.0.1:6379> MGET "celery-task-meta-03ef35c8-e8c3-412a-a13d-6886e89415f1"
1) "{\"status\": \"SUCCESS\", \"result\": 4, \"traceback\": null, \"children\": [], \"date_done\": \"2020-03-12T18:57:56.203213\", \"task_id\": \"03ef35c8-e8c3-412a-a13d-6886e89415f1\"}"
127.0.0.1:6379>
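You can also fetch the same result from Python instead of redis-cli. In a python manage.py shell (so the Celery app and result backend settings are loaded), use AsyncResult; the UUID below is the one from the redis-cli session above:

from celery.result import AsyncResult

result = AsyncResult('03ef35c8-e8c3-412a-a13d-6886e89415f1')
print(result.status)  # 'SUCCESS'
print(result.result)  # 4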