Django 6.0 introduces a built-in background tasks framework in django.tasks. But don't expect to phase out Celery, Huey or other preferred solutions just yet.
The release notes are quite clear on this:
Django handles task creation and queuing, but does not provide a worker mechanism to run tasks. Execution must be managed by external infrastructure, such as a separate process or service.
The main purpose of the new django.tasks module is to provide a common API for task queues implementations. Jake Howard is the driving force behind this enhancement. Check out the introduction on the Django forum.
His reference implementation, and simultaneously a backport for earlier versions of Django, is available as django-tasks on GitHub.
But let's ignore that and play with the more minimal version included in Django 6.0 instead. By creating our very own backend and worker.
We're going to create an app to send notifications to phones and other devices using ntfy.sh. (I'm a fan!)
If you prefer to dive into the code yourself, check out the final version of the project on GitHub.
All that's required to send a notification to your phone using nfty is:
https://ntfy.sh/<yourtopic>The free version only provides public topics and messages. Meaning anyone can see the stuff you're sending if they subscribe to the topic. For our purpose we can simply create a topic with a randomized name, like a UUID.
The project's settings expect the URL from step 4 to be supplied as an environment variable. For example:
NTFY_URL=https://ntfy.sh/062519693d9c4913826f0a39aeea8a4c
Here's our function that does the heavy lifting:
import httpx from django.conf import settings def send_notification(message: str, title: str | None): # Pass the title if specified. headers = {"title": title} if title else {} httpx.post( settings.NTFY_URL, content=message, headers=headers, )
Really. That's all there is to it to start sending and receiving notifications.

You really should have a look at the Django documentation on the Task framework for details, but we'll save you a bit of time and give a quick primer.
This is the main goal of the new framework: defining tasks using Django's standard API, rather than using task queue specific decorators, or other methods.
So here it goes:
# ... from django.tasks import task @task def send_notification(message: str, title: str | None): # ...as before
Our function is now a task. In fact it's a django.tasks.Task.
You cannot call send_notification directly anymore. Tasks can only be run by using the enqueue method. It might not be the behavior you'd expect or want, but this seems to be the best option. This design eliminates the possibility of accidentally invoking a task in-process, rather than in the background.
The task decorator allows you to specify the task's priority, queue name and backend name. You can override these settings with the using method, which returns a new django.tasks.Task instance.
If you need more control over task behavior, you can set takes_context to True in the decorator and add context as the first argument. This context currently provides you with access to the task result and thereby useful information like the number of attempts.
There's no way of defining retries and backoffs, or other fancy things you might expect from a full-blown task queue implementation. But that's not what this is. You can easily add your own retry logic by inspecting the task context if needed.
Adding a task to the queue is easy:
task_result = send_notification.enqueue( message="Season's greeting!", title="Santa has something to tell you" )
This is where things start to fall short. At least right now. Django 6.0 will ship with the ImmediateBackend and the DummyBackend. The first will execute the task immediately, while the latter will not execute the task at all.
Which is why our project includes a (demo) backend backed by the database and a worker process!
If you're not going to wait around for the result, you can get a hold of it later on using its id. Simply call get_result(result_id) on your task.
Our project includes a view that's polled periodically for outstanding results using htmx.

The list underneath the form shows the results for each execution of our task. When the form's submitted, a new result is added to the top of the list. Htmx is instructed to keep polling for changes as long as the result's status isn't FAILED or SUCCESSFUL.
def task_result(request, result_id, status): result = send_notification.get_result(result_id) if result.status == status: # No need to swap the result. return HttpResponse(status=204) return TemplateResponse(request, "index.html#result", {"result": result})
Wondering what index.html#results is doing? Django 6.0 also introduces template partials. In this case our view effectively sends a response containing only the template partial named result.
When you decorate a callable with task, the configured backend's task_class is used to wrap the callable. The default's django.task.Task.
That class's enqueue method will in turn invoke the configured backend's enqueue method.
Calling its get_result method is similar: call the configured backend's get_result method and pass on the result.
Since there's no workers, that's basically all a task backend needs to provide. Cool. Let's add one, shall we?
Our goals:
Our enqueue and get_result methods will return an instance of the default django.tasks.TaskResult. This determines the minimum amount of data we need to store, and we're going to do so in a model called Task.
Let's create a first draft of our Task model, based on the properties of TaskResult and Task in django.tasks (the "dataclasses"):
class Task(models.Model): priority = models.IntegerField(default=0) callable_path = models.CharField(max_length=255) backend = models.CharField(max_length=200) queue_name = models.CharField(max_length=100) run_after = models.DateTimeField(null=True, blank=True) takes_context = models.BooleanField(default=False) # Stores args and kwargs arguments = models.JSONField(null=True, blank=True) status = models.CharField( choices=TaskResultStatus.choices, max_length=10, default=TaskResultStatus.READY ) enqueued_at = models.DateTimeField() started_at = models.DateTimeField(blank=True, null=True) finished_at = models.DateTimeField(blank=True, null=True) last_attempted_at = models.DateTimeField(blank=True, null=True) return_value = models.JSONField(null=True, blank=True)
What's missing? For one, the TaskResult also includes a list of encountered errors, and ids of the workers that processed the task. Something that we could perhaps ignore.
Except the TaskResult.attempts property is based on the number of worker ids. And if you're using the task context within a task, you're bound to be relying on that type of information.
We could add these details to the Task model by adding a JSONField for each. This is the current approach in the reference implementation.
But let's be more explicit in our approach and define models for these as well. We'll record each attempt to execute a task and its potential error, linking them to the task with a foreign key:
class Error(models.Model): exception_class_path = models.TextField() traceback = models.TextField() class AttemptResultStatus(TextChoices): # Subset of TaskResultStatus. FAILED = TaskResultStatus.FAILED SUCCESSFUL = TaskResultStatus.SUCCESSFUL class Attempt(models.Model): task = models.ForeignKey(Task, related_name="attempts", on_delete=models.CASCADE) error = models.OneToOneField( Error, related_name="attempt", on_delete=models.CASCADE, null=True, blank=True ) worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID) started_at = models.DateTimeField() stopped_at = models.DateTimeField(blank=True, null=True) status = models.CharField( choices=AttemptResultStatus.choices, max_length=10, blank=True )
This setup ensures we have all necessary information to execute a task, plus we can provide every single bit of detail when a TaskResult is requested.
All fine and dandy, but we need to think about the worker's requirements as well. It needs to be able to:
We could do all of that with how it's set up right now, but I'd like to refine things a bit.
class Task(models.Model): # ... # This field is used to keep track of when to run a task (again). # run_after remains unchanged after enqueueing. available_after = models.DateTimeField() # Denormalized count of attempts. attempt_count = models.IntegerField(default=0) # Set when a worker starts processing this task. worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID, blank=True) # ...
The available_after field will contain the earliest time at which the task can be executed. If the task's run_after is specified (which can be done by using a task's... using() method), available_after is set to that value. Otherwise we're using the current datetime; all in UTC.
Once a task needs to be retried, available_after will be set to the next possible point in time the task can be executed. In other words: we can back off.
The attempt_count field makes querying for available tasks a bit easier. Any tasks with an attempt_count greater than the maximum allowed value can be ignored. Yes, their status should have been set to FAILED which means they should be excluded by default, but we could change the configuration and tweak the maximum number of attempts.
The worker_id field is filled when a worker claims a task. This, among other things, prevents any other workers from picking up the task. Assuming the worker id is unique.
Enqueueing a task could not be easier: create a Task model instance from the Task dataclass instance, save it, done! Well, at least after turning the end result into a TaskResult.
We use the string version of the model's database id as the id of the result.
Retrieving a result is likewise only a matter of loading the task and its attempts, and turning that into a TaskResult.
Here's a simplified version of our task backend as it stands:
class DatabaseBackend(BaseTaskBackend): supports_defer = True supports_async_task = False supports_get_result = True supports_priority = True def enqueue(self, task: Task, args, kwargs): self.validate_task(task) model = self.queue_store.enqueue(task, args, kwargs) task_result = TaskResult( task=task, id=str(model.pk), # ... # More properties being set # ... ) return task_result def get_result(self, result_id): return self.model_to_result( self.queue_store.get(result_id) ) def model_to_result(self, model: models.Task) -> TaskResult: ...
At lot of functionality is deferred to this queue_store property. Before we dive into that, we'll explain the configuration options for this backend.
We want to be able to specify defaults for:
math.pow(factor, attempts)These can be customized for each individual queue. So we end up with something like this in our OPTIONS:
TASKS = { "default": { "BACKEND": "messagecenter.dbtasks.backend.DatabaseBackend", "OPTIONS": { "queues": { "low_priority": { "max_attempts": 5, } }, "max_attempts": 10, "backoff_factor": 3, "purge": {"finished": "10 days", "unfinished": "20 days"}, }, } }
A task added to the low_priority queue will be attempted up to five times, with a backoff factor of 3. Other tasks will be attempted up to ten times with the same backoff factor.
The QueueStore class is a companion of our backend. It's focus is on retrieving and enqueueing tasks, checking for tasks to execute and claiming tasks.
However the main reason it's included is to simplify the worker. As we'll see, the worker gets it's own copy of the queue store, limited to the queues it needs to process.
The worker's job, at least in this project, is to provide information on outstanding tasks to the runner and to drive the processing of those tasks by the backend. Which means it looks like this:
class Worker: def __init__( self, id_: str | None, backend_name: str, only: set[str] | None, excluding: set[str] | None, ): # Grab the backend and its queue_store. self.backend = task_backends[backend_name] queue_store: QueueStore = self.backend.queue_store # Limit the queue_store to the select queues. if only or excluding: queue_store = queue_store.subset(only=only, excluding=excluding) self.queue_store = queue_store # Use or create and id. "Must" be unique. self.id = ( id_ if id_ else create_id(backend_name, queues=queue_store.queue_names) ) def has_more(self) -> bool: return self.queue_store.has_more() def process(self): with transaction.atomic(): tm = self.queue_store.claim_first_available(worker_id=self.id) if tm is not None: self.backend.process_task(tm)
All we need to do to have a functioning worker runner:
has_more.process the first available task. If not: go to 4.That's what our dbtasks_worker command does.
Our queue store provides a peek method which returns the id of the task in our queues with the most urgency; a combination of available_after, priority and attempt_count.
This lets the runner know whether there's more tasks to process. The next step is to claim one of those tasks. So we call peek again and if it returns a task id, we'll try to claim that particular task.
Here's a more basic, clearer version than the one included in our project's QueueStore:
def claim_first_available( self, worker_id: str, attempts: int = 3 ) -> models.Task | None: qs = models.Task.filter( worker_id="", status=TaskResultStatus.READY, ) for _ in range(attempts): task_id = self.peek() if not task_id: return None count = qs.filter(pk=task_id).update( worker_id=self.id_, status=TaskResultStatus.RUNNING, ) if count: return models.Task.objects.get(pk=task_id) return None
If the count is zero, we failed to claim the task. Otherwise we retrieve it from the database and can start processing.
The loop is included because we ended up here after trying to claim the task identified by peek. Which apparently has already been picked up by another worker. We might as well make the most of it and try to grab another task from the queue.
And finally the thing that actually does something!
The process_task method of our backend:
Attempt and constructs the current TaskResult.BaseException, or returning the return_value of the task when everything went according to plan.Task model, the Attempt and the TaskResult with the final details of the successful execution, or with details about the failure to do so.Again: if you want to dive into the details, have a look at the repository.
Of course this demo project leaves out all the things you really need to think hard about. Like signals for the worker. Or database transaction logic. That's not to say it's impossible. Far from it. It just wasn't the goal of this article.
The inclusion of this functionality in Django will certainly allow new libraries or adapters for existing task queues to pop up. And we'll probably soon see some complaints that django.tasks isn't extensive enough.
Because, if you're currently using the more advanced functionality of your task queue, there's probably a few things you're missing in django.tasks.
Some task queue libraries, like Celery, provide ways of combining tasks. You can feed the result of one task into another, enqueue tasks for each item in a list, and so on.
It should be clear by now that supporting this kind of orchestration isn't the goal of django.tasks. And I don't mind at all. There's no feasible way of creating a unified API to support this. I've had my share of problems with libraries that do claim to support it.
As mentioned before, there's currently no way to automatically retry a failed task, unless your backend does the heavy lifting. Like ours does.
Depending on your backend this might be easy enough to handle yourself. For example using a decorator:
def retry(func): @functools.wraps(func) def wrapper(context: TaskContext, *args, **kwargs): try: return func(context, *args, **kwargs) except BaseException as e: result = context.task_result backoff = math.pow(2, result.attempts) run_after = datetime.now(tz=UTC) + timedelta(seconds=backoff) result.task.using(run_after=run_after).enqueue(*args, **kwargs) raise e return wrapper @task(takes_context=True) @retry def send_email(context: TaskContext, to: str, subject: str, body: str): # Do your thing ...
True. But the reference implementation does provide actual workers. Be patient, or even better: start helping out!
I reckon django.tasks will soon result in covering at least the most common 80% of use cases. Yes, its API is simple and limited, but to me that's more a benefit rather than a fault. I think this is as close as you can get to a standardized approach.