Celery is an asynchronous task queue/job queue based on distributed message passing. It is used to handle background tasks and manage long-running operations in a distributed system. Celery lets you run tasks in the background, separate from your main application logic, which helps maintain responsiveness and improve performance.
Distributed Task Management: Celery can distribute tasks across multiple worker nodes, allowing you to scale your task processing horizontally. This makes it suitable for large-scale applications that require high availability and scalability.
Reliability and Fault Tolerance: Celery provides mechanisms to retry tasks upon failure and to handle errors gracefully, which enhances the reliability and fault tolerance of your task processing system.
pip install celery
pip install redis # Optional, if you choose Redis as the broker
Create a Celery Configuration File
Create a file named celery.py in your Django project directory (usually alongside settings.py).
# myproject/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
Update __init__.py
Update the __init__.py file in your Django project directory to ensure Celery is loaded when Django starts.
# myproject/__init__.py
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
Configure Celery Settings
Add Celery-related settings to your Django settings.py file.
# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0' # Redis as the broker
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Redis as the result backend
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
Setting Up Redis
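Redis must be installed and running before Celery can use it. One quick way to start a local instance is with Docker (assuming Docker is available; you can also install the redis-server package from your operating system's package manager):
docker run -d --name redis -p 6379:6379 redis
You can verify it is reachable with redis-cli ping, which should reply PONG.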
Setting Up RabbitMQ
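RabbitMQ likewise needs to be installed and running. On Debian/Ubuntu it is available as the rabbitmq-server package; with Docker (again an assumption, not a requirement) a local instance can be started like this:
docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3
The pyamqp://guest@localhost// URL used below relies on RabbitMQ's default guest account, which only accepts connections from localhost.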
Choose the Broker: Update the CELERY_BROKER_URL setting in your Django settings.py to match your broker of choice. For Redis:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
For RabbitMQ:
CELERY_BROKER_URL = 'pyamqp://guest@localhost//'
Brokers: A broker is a message queue that Celery uses to send and receive messages. It acts as an intermediary between the application and Celery workers. Brokers are essential for message passing in Celery. Common brokers include Redis and RabbitMQ.
To configure the broker, set the CELERY_BROKER_URL in your Django settings.py. For Redis:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
For RabbitMQ:
CELERY_BROKER_URL = 'pyamqp://guest@localhost//'
Backends: A backend is where Celery stores the results of tasks. It allows you to retrieve the results later and check the status of tasks. Common result backends include Redis and the Django database via django-celery-results. To configure the result backend, set CELERY_RESULT_BACKEND in your Django settings.py. For Redis:
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
Workers: Workers are the processes that execute the tasks. They listen to the message broker for tasks and perform the required operations. You can run multiple workers to handle more tasks concurrently. Celery allows you to configure worker settings such as concurrency and queue bindings.
Tasks: A task is a Python function that is executed asynchronously by Celery workers. Tasks are defined using the @app.task decorator in Celery. They can be simple functions or more complex workflows.
Queues: Tasks are sent to queues, which are essentially channels or routing mechanisms for task messages. You can configure different queues for different types of tasks, allowing you to prioritize or separate tasks based on their nature or urgency.
To define a task, you use the @app.task (or @shared_task) decorator provided by Celery:
# tasks.py
from celery import shared_task
@shared_task
def add(x, y):
    return x + y
Once you’ve defined a task, you can call it asynchronously using the delay method. This method sends the task to the message broker, where it will be picked up by a worker:
# views.py
from django.http import HttpResponse

from .tasks import add

def some_view(request):
    result = add.delay(4, 6)
    # You can access the result object to check the status or get the result later
    return HttpResponse(f"Task result: {result.get()}")
To start Celery workers, use the celery command-line tool:
celery -A myproject worker --loglevel=info
You can start multiple workers or configure them to listen to specific queues using additional command-line options.
celery -A myproject worker -Q high_priority --loglevel=info
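You can also control how many tasks each worker processes concurrently and give each worker instance a unique node name when running several on the same machine; for example (the values here are illustrative):
celery -A myproject worker --concurrency=4 -n worker1@%h --loglevel=info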
Celery provides various states to help you track a task's progress, such as PENDING, STARTED, RETRY, SUCCESS, and FAILURE.
To monitor these states, you can use Celery’s result objects or monitoring tools like Celery Flower:
# views.py
from django.http import HttpResponse

from celery.result import AsyncResult

from .tasks import add

def some_view(request):
    task = add.delay(4, 6)
    result = AsyncResult(task.id)
    if result.state == 'PENDING':
        status = 'Task is pending'
    elif result.state == 'STARTED':
        status = 'Task has started'
    elif result.state == 'SUCCESS':
        status = 'Task succeeded'
    elif result.state == 'FAILURE':
        status = 'Task failed'
    else:
        status = f'Task state: {result.state}'
    return HttpResponse(f"Task status: {status}")
To get the result of a task, use the result.get() method. This method blocks until the task is finished and returns the result:
# views.py
from django.http import HttpResponse

from .tasks import add

def some_view(request):
    result = add.delay(4, 6)
    task_result = result.get(timeout=10)  # Waits up to 10 seconds for the result
    return HttpResponse(f"Task result: {task_result}")
If the task fails or takes too long, you may need to handle exceptions or timeouts.
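A minimal sketch of that error handling, reusing the add task from above: result.get() re-raises any exception raised inside the task by default, and raises celery.exceptions.TimeoutError when the timeout expires, so both can be caught in the view:
# views.py
from django.http import HttpResponse
from celery.exceptions import TimeoutError as CeleryTimeoutError

from .tasks import add

def safe_view(request):
    result = add.delay(4, 6)
    try:
        task_result = result.get(timeout=10)
    except CeleryTimeoutError:
        # The worker did not finish within the timeout
        return HttpResponse("Task did not finish in time", status=504)
    except Exception as exc:
        # get() propagates exceptions raised inside the task itself
        return HttpResponse(f"Task failed: {exc}", status=500)
    return HttpResponse(f"Task result: {task_result}")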
Celery allows you to configure retries for tasks that fail due to exceptions. You can set the max_retries parameter and use the retry method within the task to handle retries:
# tasks.py
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError

@shared_task(bind=True, max_retries=3)
def add(self, x, y):
    try:
        return x + y
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)  # Retry after 60 seconds
Celery’s periodic tasks allow you to execute tasks at regular intervals, such as hourly or daily. This is useful for recurring tasks like daily reports or cleanup operations. You can define periodic tasks using Celery Beat, which is a scheduler that kicks off tasks at specified intervals:
# tasks.py
from celery import shared_task
@shared_task
def daily_cleanup():
    # Your cleanup logic here
    pass
Install Celery Beat
Make sure you have django-celery-beat installed to use Celery Beat with Django:
pip install django-celery-beat
Add django_celery_beat to Your Installed Apps
Update your settings.py to include django_celery_beat:
INSTALLED_APPS = [
    # other apps
    'django_celery_beat',
]
Run Migrations
Apply migrations for the django_celery_beat app:
python manage.py migrate django_celery_beat
Create and Schedule Periodic Tasks
You can create and manage periodic tasks from the Django admin interface or using Django’s management commands. In the admin, add a new Periodic Task, select the registered task (e.g. tasks.daily_cleanup), and attach an interval or crontab schedule. Alternatively, you can configure periodic tasks programmatically:
# setup_periodic_tasks.py
from django_celery_beat.models import PeriodicTask, IntervalSchedule

def setup_periodic_tasks():
    schedule, created = IntervalSchedule.objects.get_or_create(
        every=24,
        period=IntervalSchedule.HOURS,
    )
    PeriodicTask.objects.create(
        interval=schedule,
        name='Daily Cleanup',
        task='tasks.daily_cleanup',
    )
Start Celery Beat
Run Celery Beat alongside your Celery workers:
celery -A myproject beat --loglevel=info
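If you manage schedules with django-celery-beat (as described above), point Beat at its database scheduler so that tasks created in the admin are picked up:
celery -A myproject beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler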
You can configure Celery to use different queues for different tasks. This allows you to prioritize or separate tasks based on their nature or urgency. Define queues in your Celery configuration:
# settings.py
CELERY_TASK_QUEUES = {
    'high_priority': {
        'exchange': 'high_priority',
        'exchange_type': 'direct',
        'binding_key': 'high_priority',
    },
    'low_priority': {
        'exchange': 'low_priority',
        'exchange_type': 'direct',
        'binding_key': 'low_priority',
    },
}
When defining tasks, you can specify which queue they should be routed to. Use the queue argument in the @app.task decorator or when calling the task (a call-time example follows the code below):
# tasks.py
from celery import shared_task

@shared_task(queue='high_priority')
def process_important_data(data):
    # Process important data
    pass

@shared_task(queue='low_priority')
def process_regular_data(data):
    # Process regular data
    pass
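To route a task at call time instead of in the decorator, pass queue to apply_async (a sketch using the tasks defined above; the payload is a placeholder for whatever data your task expects):
# anywhere the task is called, e.g. views.py
from .tasks import process_regular_data

payload = {'example': 'data'}  # placeholder payload
# Override the task's default queue for this one call
process_regular_data.apply_async(args=[payload], queue='high_priority')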
You can also route tasks dynamically by configuring routing options:
# settings.py
# With namespace='CELERY', this maps to Celery's task_routes setting
CELERY_TASK_ROUTES = {
    'tasks.process_important_data': {'queue': 'high_priority'},
    'tasks.process_regular_data': {'queue': 'low_priority'},
}
To integrate Celery with Django, you need to install Celery along with some additional packages for managing periodic tasks and storing task results.
pip install celery
pip install django-celery-beat
pip install django-celery-results
Create the Celery Configuration File
Create a file named celery.py in your Django project directory (alongside settings.py).
# myproject/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Using a string here means the worker doesn’t have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
Update Django Settings
Add Celery-related settings to your settings.py:
# myproject/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0' # or your chosen broker URL
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # or your chosen backend URL
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
Configure Django to use django-celery-beat and django-celery-results:
INSTALLED_APPS = [
    # other apps
    'django_celery_beat',
    'django_celery_results',
]
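The installed django-celery-results app can also store task results in the Django database instead of Redis; this is an alternative to the Redis backend shown above (assuming you prefer database-backed results):
# myproject/settings.py
CELERY_RESULT_BACKEND = 'django-db'  # backend provided by django-celery-results
Then apply its migrations:
python manage.py migrate django_celery_results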
Update __init__.py
Ensure Celery is loaded when Django starts:
# myproject/__init__.py
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
Choosing a Message Broker
Celery supports various brokers. The two most common are Redis and RabbitMQ.
Configuring the Message Broker in Celery and Django
Depending on your choice of broker, configure it in your settings.py. For Redis:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
For RabbitMQ:
CELERY_BROKER_URL = 'pyamqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'rpc://' # RabbitMQ can also be used as a result backend
Make sure the message broker server is running and accessible.
Tasks are defined using the @app.task decorator provided by Celery. Create a tasks.py file in your Django app directory:
# myapp/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y
In this example, add is a Celery task that takes two arguments and returns their sum. The @shared_task decorator registers the function as a Celery task without tying it to a specific app instance.
You can import and call tasks from anywhere in your Django application. For example, to call the add task from a view:
# myapp/views.py
from django.http import HttpResponse

from .tasks import add

def my_view(request):
    result = add.delay(4, 6)  # Call task asynchronously
    return HttpResponse(f"Task result: {result.get(timeout=10)}")
In this example, add.delay(4, 6) sends the add task to the Celery worker, and result.get(timeout=10) retrieves the result.
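Because result.get() blocks the request until the worker finishes, a common pattern is to return the task ID immediately and expose a second view that checks the status later. A sketch of that pattern (the task_status view and its URL wiring are hypothetical names, not part of the original example):
# myapp/views.py
from django.http import JsonResponse
from celery.result import AsyncResult

from .tasks import add

def start_task(request):
    # Kick off the task and hand the ID back to the client
    result = add.delay(4, 6)
    return JsonResponse({'task_id': result.id})

def task_status(request, task_id):
    # Look up the task by ID and report its current state
    result = AsyncResult(task_id)
    response = {'state': result.state}
    if result.successful():
        response['result'] = result.result
    return JsonResponse(response)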
To process tasks, you need to start Celery worker processes. Run the following command from your project directory:
celery -A myproject worker --loglevel=info
To ensure Celery workers start automatically with Django, you can use process management tools like supervisord, systemd, or Docker.
Example using systemd on Linux:
Create a Service File
Create a file named /etc/systemd/system/celery.service:
[Unit]
Description=Celery Service
After=network.target
[Service]
User=youruser
Group=yourgroup
WorkingDirectory=/path/to/your/project
ExecStart=/path/to/your/virtualenv/bin/celery -A myproject worker --loglevel=info
Restart=always
[Install]
WantedBy=multi-user.target
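After creating or editing the unit file, reload systemd so it picks up the new service definition:
sudo systemctl daemon-reload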
Enable and Start the Service
sudo systemctl enable celery
sudo systemctl start celery