Story #8501


Task #8495: [EPIC] As a user, I have a new-improved tasking system with new-style workers

As a user, I have new pulp workers that do not require a resource manager

Added by bmbouter about 3 years ago. Updated almost 3 years ago.

Start date:
Due date:
% Done:


Estimated time:
Platform Release:
Sprint Candidate:



See the epic description for a high level. This is the primary implementation story of all of the sub-tasks.

Add args and kwargs to the Task object

Add Task.args and Task.kwargs and have both dispatch and enqueue_with_reservation create those fields.

These will be JSON serialized types for safety versus pickle for example. It's expected that the tasking system can only pass JSON serializable arguments.

These will not be included in the user facing TaskSerializer for data security reasons. Additionally it's not very helpful for users to be able to query on the args or kwargs to a task anyway.

These two fields will be nullable because not all tasks will have args or kwargs.

Enable users to switch to the new or old style

To do this, add the new USE_NEW_WORKER_TYPE setting which if True will cause Pulp to not dispatch to RQ. It will be False by default.

This setting will be used by the dispatch and enqueue_with_reservation interfaces. If USE_NEW_WORKER_TYPE=False (the default) it will write the task to the db as usual (only now with args and kwargs anyway) and dispatch to RQ as usual. If USE_NEW_WORKER_TYPE=True it will only write the task to the db (with args and kwargs now).

Create the new worker entry point

The new worker will be a click-based entry point runnable with the name pulpcore-worker.

Have the workers heartbeat and record themselves in the db

The worker should write its existence into the Worker table and should use the same naming convention as the old-style, i.e. pid@fqdn, e.g.

Additionally, the worker needs to heartbeat regularly. It should heartbeat 3 times within one worker timeout which is defined by the WORKER_TTL setting (which already exists). The "3" heartbeats is what has worked well for the RQ workers.

Distributed worker cleanup

Workers need to periodically check on each other to handle the case when OOM kills a worker and the task is still in the running state. When a worker has stopped heartbeating and shutdown non-gracefully any task that is in running that is assigned to it must be set to Failed.

Distributed algorithm for workers coordinating tasks to work on safely

This is a two part process:

  1. Find the next safe task
  2. Use postgresql advisory locks on the needed resources to ensure no other tasks are concurrently writing to them

This approach was put together my bmbouter and mdellweg through discussion on this hackmd with task run order examples

Finding the next safe task

Each worker will find the next task for itself by inspecting the Task table. Two all-SQL based approaches were considered. One inspired by stakehouse. The second is a SQL graph-based approach. However, for simplicity bmbouter and mdellweg want to use a simple application based approach to start with. This can be improved over time for additional speed gains.

  1. Query the Task table for tasks in the running state, add all of their resource string names to a set.
  2. Query the Task table for tasks in the waiting state, ordered with oldest first.
  3. Read the resources from the Tasks and add them to the set.
  4. Attempt to lock-and-run the first Task that has resources not currently in the set.

The Locking Algorithm

Postgresql advisory locks will be used with a single lock on each resource. Pulp supports multi-resource lock requests, so to avoid deadlock the all workers must acquire the locks in the same order. So if task requires ['salt', 'pepper', 'cumin'] all workers will try to lock 'salt' then 'pepper', then 'cumin' in that order. This "resources" required is in the Task data, and is ordered, so that order will be used.

Postgresql advisory locks are used for two reasons:

  1. They can be tied to the session level and not related to transactions. These locks need to exist for long periods of time, longer than a single transaction can reasonably hold. Additionally, tasking code needs to be free to start and commit transactions.

  2. They auto-cleanup once the session ends (worker disconnect) or they are explicitly released. This is a wonderful property that the RQ design has struggled with by attempting to have cleanup occur in application code.

Postgresql advisory locks are technically a single 64-bit key value, which can be thought of as a 8-byte integer. A hash will be used to produce those 8-byte integers as follow:

resource = 'somestring'
from hashlib import blake2s
the_digest = blake2s(resource.encode(), digest_size=8).digest()
the_int = int.from_bytes(the_digest, byteorder='big')

Worker forking

The worker must fork prior to running the task code. This post-forker model is beneficial to guarantee memory to be released with each Task.


  1. Add docs for the USE_NEW_WORKER_TYPE setting.
  2. Update the Architecture and Deploying docs to identify the new-style and old-style options and different requirements.
  3. Update the Installation Instruction docs to identify the new-style and old-style options and different requirements.


The same benchmark tests should be run against the new worker style. At its heart this uses this script. It should be demonstrated that tasking throughput scales linearly with worker count.

Related issues

Blocks Pulp - Story #8502: As an installer user, I can opt-in to the new-style workersCLOSED - NOTABUG

Blocks Pulp - Story #8504: As an operator user, I can opt-in to the new-style workersCLOSED - NOTABUG

Blocked by Pulp - Task #8496: Deprecate `enqueue_with_reservation` from plugin API and introduce new `dispatch` interface to replace itCLOSED - CURRENTRELEASEbmbouter

Blocked by Pulp - Task #8721: Provide a technology agnostic entrypoint to start workersCLOSED - CURRENTRELEASEmdellweg


Also available in: Atom PDF