Django Celery + RabbitMQ + Redis: Use RabbitMQ as broker and Redis as results backend

Celery is a task queue with focus on real-time processing, while also supporting task scheduling.

RabbitMQ is a message broker. This means it handles the queue of “messages” between Django and Celery.

Redis is an in-memory key-value store (REmote DIctionary Server). It is very fast, which makes it a good fit for the results backend.

Code for this tutorial is here: https://github.com/tek-shinobi/celeryDj

Install RabbitMQ

Installation: sudo apt-get install rabbitmq-server

Restarting rabbitmq-server: sudo service rabbitmq-server restart

Check the status of the server: sudo rabbitmqctl status
Running the above in a console will output a lot of text. Look for this line to make sure all is good:
Status of node rabbit@my_computer_name

For example, if my computer is named tiger, I should look for this line:
Status of node rabbit@tiger

So, at this point, we have a RabbitMQ service running.

Install Redis and redis-py

Installation: sudo apt install redis-server

Verify installation: type the redis-cli command, then type ping at the prompt that comes up:

$ redis-cli -v
redis-cli 5.0.6
$ redis-cli
127.0.0.1:6379> ping
PONG
127.0.0.1:6379>

We can see that our Redis server is ready with the reply – PONG.

NOTE: Should you ever need to restart the server, do this in a terminal:

sudo /etc/init.d/redis-server restart

After installing and starting the Redis server, let's install redis-py, the Python binding for Redis:
pipenv install redis
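
A quick way to confirm the binding works is to ping Redis from Python (a minimal sketch, assuming Redis is running on the default localhost:6379):

import redis

# connect to the local Redis server on the default port
r = redis.Redis(host='localhost', port=6379)
print(r.ping())  # prints True if the server is reachable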

Install celery

Install celery: pipenv install celery flower "celery[redis]"

Flower is a web-based tool for monitoring and administering Celery clusters. It provides detailed statistics of task progress and history, and also shows other task details such as the arguments passed, start time, and runtime.

"celery[redis]": additional celery dependencies for Redis support

Before we start configuring celery for the Django project, let's launch the celery worker process and Flower, each in its own console. Since our Django project is named mysite, the command looks like so (it needs to be launched from a console at the project path):

celery -A mysite worker -l info

Here -l sets the loglevel to info. This loglevel gives us a lot of helpful information in the console.

Now in yet another console, launch Flower at the project path:

celery -A mysite flower

Once you have launched Flower, you can open its dashboard in a browser to monitor tasks:

http://localhost:5555/dashboard

Celery Basic Setup

Basic project structure at startup:

mysite/
 |-- mysite/
 |    |
 |    |-- __init__.py
 |    |-- settings.py
 |    |-- urls.py
 |    +-- wsgi.py
 |-- manage.py
 |-- Pipfile
 |-- Pipfile.lock

We will be using RabbitMQ as the broker. Another option is Redis. I prefer RabbitMQ because celery supports it natively and it just works.
We will use Redis as the results backend. Also, we will serialize everything to JSON when saving results to Redis.

In settings.py add this celery configuration at the bottom:

# celery setup
CELERY_BROKER_URL = 'amqp://localhost'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'

CELERY_BROKER_URL = 'amqp://localhost' : specifies the connection string to the broker. Since I am running RabbitMQ locally, it points to localhost. If your broker is on a remote server, you will have its hostname or IP there instead. The amqp scheme indicates that RabbitMQ is being used as the broker.

CELERY_RESULT_BACKEND = 'redis://localhost:6379' : sets Redis as the result backend. 6379 is the default Redis port.
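
For reference, here is a sketch of what these two settings might look like with a remote broker and backend (the hosts, credentials, and vhost below are hypothetical):

# hypothetical remote broker and backend
CELERY_BROKER_URL = 'amqp://myuser:mypassword@10.0.0.5:5672/myvhost'  # RabbitMQ listens on port 5672 by default
CELERY_RESULT_BACKEND = 'redis://10.0.0.6:6379/0'  # the trailing /0 selects Redis database 0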

Adding Celery to Django project

Create a file named celery.py next to settings.py. This file will contain celery configuration for our project.

from __future__ import absolute_import  
import os  
from django.conf import settings  
from celery import Celery

# set default Django settings module for celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite_tasks', broker=settings.CELERY_BROKER_URL)
app.config_from_object('django.conf:settings', namespace='CELERY')  
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

The code above creates an instance of Celery in our project. When instantiating, we pass in a name for the list of tasks, in this case mysite_tasks, and the broker. The last line instructs celery to auto-discover all asynchronous tasks for all the applications listed under INSTALLED_APPS.

Celery will look for definitions of asynchronous tasks within a file named tasks.py in each of the application directories.

To make sure the Celery app is loaded every time Django starts, the following code should be added to mysite/__init__.py:
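
from .celery import app as celery_app

__all__ = ('celery_app',)

This is the standard snippet from the Celery documentation for Django projects; importing the app here ensures that the @shared_task decorator (used below) will find it.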

Create celery tasks

There are two ways. One is to dump all the tasks into the celery.py file itself, using the @app.task decorator, like so:

from __future__ import absolute_import  
import os  
from django.conf import settings  
from celery import Celery

# set default Django settings module for celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

app = Celery('mysite_tasks', broker=settings.CELERY_BROKER_URL)
app.config_from_object('django.conf:settings', namespace='CELERY')  
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task
def do_my_task():
    return 'celery task'

What I prefer, though, is the @shared_task decorator, used to create tasks in each app, in the corresponding tasks.py file.
The @shared_task decorator lets you create tasks without having a concrete app instance, which is a simpler API to use than @app.task.

Let's create an app inside the Django project:
python manage.py startapp my_app

First, create a services.py to hold the functions that we will later want to run via celery. This is good because we can unit test these services (see the sketch below) and later just wrap them inside celery tasks, which don't need any testing (I ignore celery's tasks.py in my pytest).

def add(x, y):
    return x + y
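
As a quick illustration of the testability point, here is a minimal test sketch for this service (assuming pytest and a hypothetical my_app/tests/test_services.py):

# my_app/tests/test_services.py (hypothetical path)
from my_app.services import add

def test_add():
    # plain unit test: no celery worker or broker needed
    assert add(3, 0) == 3
    assert add(3, 1) == 4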

Now, inside my_app's tasks.py file:

from celery import shared_task
from .services import add

@shared_task(name='sum two numbers')
def delayed_add(x, y):
    result = add(x, y)
    return result

Note: Always name the task. This way, when you look at the dashboard in Flower, you can identify the tasks by name. Here we named the task 'sum two numbers'.

In my_app/views.py

from django.shortcuts import HttpResponse

from .tasks import delayed_add

def celery_view(request):
    for counter in range(2):
        delayed_add.delay(3, counter)
    return HttpResponse("FINISH PAGE LOAD")
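
delay() is a shortcut for apply_async() with default options. If you need more control, apply_async can be called directly; for example (a sketch using the same task), to schedule the task to run 10 seconds from now:

# equivalent call via apply_async, delayed by 10 seconds
delayed_add.apply_async(args=(3, 1), countdown=10)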

in mysite/urls.py:

from django.contrib import admin
from django.urls import path

from my_app.views import celery_view

urlpatterns = [
    path('admin/', admin.site.urls),
    path('celerytask/', celery_view),
]

Now that we have everything wired up, start the Django server by running 'python manage.py runserver'.

Now go to this URL:

http://127.0.0.1:8000/celerytask/

This will load the view we created with the celery async task. You can then check the flower dashboard to see that two tasks were run.

Also, since we added Redis as the results backend, the result is also stored in Redis. The key is based on the task UUID. You can see the task UUID in the Flower dashboard.

To see the results stored in Redis, run redis-cli in a terminal and then use MGET <key> to see the stored result. You can use KEYS '*' to list all keys:

$ redis-cli
127.0.0.1:6379> KEYS '*'
1) "celery-task-meta-03ef35c8-e8c3-412a-a13d-6886e89415f1"
2) "celery-task-meta-57469f01-870b-41d5-a340-8794b5e807ba"
127.0.0.1:6379> MGET "celery-task-meta-03ef35c8-e8c3-412a-a13d-6886e89415f1"
1) "{\"status\": \"SUCCESS\", \"result\": 4, \"traceback\": null, \"children\": [], \"date_done\": \"2020-03-12T18:57:56.203213\", \"task_id\": \"03ef35c8-e8c3-412a-a13d-6886e89415f1\"}"
127.0.0.1:6379> 
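
You can also read a result from Python via the task's AsyncResult (a minimal sketch, assuming it is run in a Django shell where the Celery app is loaded; the task id below is a hypothetical UUID copied from Flower):

from my_app.tasks import delayed_add

# paste a real task id from the Flower dashboard here (this one is hypothetical)
result = delayed_add.AsyncResult('03ef35c8-e8c3-412a-a13d-6886e89415f1')
print(result.status)  # e.g. 'SUCCESS'
print(result.result)  # e.g. 4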


