Task #8495: [EPIC] As a user, I have a new-improved tasking system with new-style workers
As a user, I have new pulp workers that do not require a resource manager
See the epic description for a high-level overview. This is the primary implementation story for all of the sub-tasks.
Add args and kwargs to the Task object
Add Task.args and Task.kwargs and have both enqueue_with_reservation create those fields.
These will be JSON-serialized types, which is safer than, for example, pickle. The tasking system is expected to pass only JSON-serializable arguments.
These will not be included in the user-facing TaskSerializer for data security reasons. Additionally, it's not very helpful for users to be able to query on the args or kwargs of a task anyway.
These two fields will be nullable because not all tasks will have args or kwargs.
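As a rough illustration of the JSON-only constraint, the sketch below serializes task arguments and rejects anything that is not JSON serializable. The function name `serialize_task_arguments` is hypothetical and not part of Pulp; `None` is passed through to mirror the nullable fields.

```python
import json


def serialize_task_arguments(args=None, kwargs=None):
    """Hypothetical helper: return (args_json, kwargs_json) as JSON strings.

    None is kept as None, mirroring nullable Task.args / Task.kwargs fields.
    """

    def dump(value, name):
        if value is None:
            return None
        try:
            return json.dumps(value)
        except (TypeError, ValueError) as exc:
            # json.dumps raises TypeError for unsupported types and
            # ValueError for circular references.
            raise ValueError(f"Task {name} must be JSON serializable: {exc}") from exc

    args_json = dump(list(args) if args is not None else None, "args")
    kwargs_json = dump(dict(kwargs) if kwargs is not None else None, "kwargs")
    return args_json, kwargs_json
```

Unlike pickle, loading these values back can only ever produce plain lists, dicts, strings, numbers, booleans, and nulls.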
Enable users to switch to the new or old style
To do this, add the new USE_NEW_WORKER_TYPE setting, which, if True, will cause Pulp to not dispatch to RQ. It will be False by default.
This setting will be used by the enqueue_with_reservation interfaces. If USE_NEW_WORKER_TYPE=False (the default), it will write the task to the db as usual (only now with args and kwargs) and dispatch to RQ as usual. If USE_NEW_WORKER_TYPE=True, it will only write the task to the db (with args and kwargs now).
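The branching described above could look roughly like the following sketch. The function `enqueue_task` and its `save_to_db` and `dispatch_to_rq` callables are hypothetical stand-ins for the real persistence and RQ dispatch code, which are not shown in this story.

```python
def enqueue_task(task_record, use_new_worker_type, save_to_db, dispatch_to_rq):
    """Hypothetical sketch of the USE_NEW_WORKER_TYPE switch.

    The task is always written to the db (with args and kwargs);
    it is dispatched to RQ only when the old-style workers are in use.
    """
    save_to_db(task_record)
    if not use_new_worker_type:
        dispatch_to_rq(task_record)
    return task_record
```

With the setting enabled, the db row is the only signal; new-style workers are expected to find the task by inspecting the Task table.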
Create the new worker entry point
The new worker will be a click-based entry point runnable with the name
Have the workers heartbeat and record themselves in the db
The worker should write its existence into the
Worker table and should use the same naming convention as the old-style, i.e.
Additionally, the worker needs to heartbeat regularly. It should heartbeat 3 times within one worker timeout, which is defined by the existing WORKER_TTL setting. Three heartbeats per timeout is what has worked well for the RQ workers.
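A minimal sketch of the heartbeat timing, assuming WORKER_TTL is expressed in seconds. The names `heartbeat_period` and `heartbeat_loop` are hypothetical, and `record_heartbeat` stands in for whatever updates the worker's row in the Worker table.

```python
import time

HEARTBEATS_PER_TTL = 3  # the "3" heartbeats per worker timeout from the text


def heartbeat_period(worker_ttl_seconds):
    """Seconds to wait between heartbeats so that 3 fit into one timeout."""
    return worker_ttl_seconds / HEARTBEATS_PER_TTL


def heartbeat_loop(record_heartbeat, worker_ttl_seconds, sleep=time.sleep, max_beats=None):
    """Call record_heartbeat() once per period; max_beats=None runs forever."""
    period = heartbeat_period(worker_ttl_seconds)
    beats = 0
    while max_beats is None or beats < max_beats:
        record_heartbeat()
        beats += 1
        sleep(period)
```

For example, with a WORKER_TTL of 30 seconds the worker would record a heartbeat every 10 seconds.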
Distributed worker cleanup
Workers need to periodically check on each other to handle the case where a worker is killed, for example by the OOM killer, while its task is still in the running state. When a worker has stopped heartbeating and shut down non-gracefully, any task in the running state that is assigned to it must be set to Failed.
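The cleanup rule can be sketched with plain in-memory data as below. This is only an illustration: the dict shapes, the state labels, and the function name `fail_tasks_of_missing_workers` are assumptions, and a real implementation would query the Worker and Task tables instead.

```python
from datetime import datetime, timedelta

RUNNING = "running"  # hypothetical state labels used for this sketch
FAILED = "failed"


def fail_tasks_of_missing_workers(last_heartbeats, tasks, now, worker_ttl_seconds):
    """Mark running tasks as failed when their worker has missed its timeout.

    last_heartbeats: dict of worker name -> datetime of last heartbeat
    tasks: list of dicts with "state" and "worker" keys (mutated in place)
    Returns the list of tasks that were set to failed.
    """
    ttl = timedelta(seconds=worker_ttl_seconds)
    missing = {name for name, seen in last_heartbeats.items() if now - seen > ttl}
    failed = []
    for task in tasks:
        if task["state"] == RUNNING and task["worker"] in missing:
            task["state"] = FAILED
            failed.append(task)
    return failed
```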
Distributed algorithm for workers coordinating tasks to work on safely
This is a two-part process:
- Find the next safe task
- Use PostgreSQL advisory locks on the needed resources to ensure no other tasks are concurrently writing to them
Finding the next safe task
Each worker will find the next task for itself by inspecting the Task table. Two all-SQL approaches were considered: one inspired by stakehouse, the other a SQL graph-based approach. However, for simplicity, bmbouter and mdellweg want to start with a simple application-based approach. This can be improved over time for additional speed gains.
- Query the Task table for tasks in the running state, add all of their resource string names to a set.
- Query the Task table for tasks in the waiting state, ordered with oldest first.
- Read the resources from the Tasks and add them to the set.
- Attempt to lock-and-run the first Task that has resources not currently in the set.
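The steps above can be sketched as a single pass over in-memory task records. This is one reading of the list, not a confirmed implementation: it assumes a waiting task that cannot run has its resources added to the set so that younger tasks needing the same resources do not jump ahead of it. The dict keys and the `try_lock_and_run` callable are hypothetical.

```python
def run_next_safe_task(tasks, try_lock_and_run):
    """Pick the oldest waiting task whose resources are all free.

    tasks: list of dicts with "state", "created" and "resources" keys
    try_lock_and_run: callable(task) -> bool, True if the locks were
        acquired and the task was run (hypothetical stand-in).
    Returns the task that was run, or None if nothing was safe.
    """
    # Step 1: resources of running tasks are unavailable.
    blocked = set()
    for task in tasks:
        if task["state"] == "running":
            blocked.update(task["resources"])

    # Step 2: waiting tasks, oldest first.
    waiting = sorted(
        (t for t in tasks if t["state"] == "waiting"),
        key=lambda t: t["created"],
    )

    # Steps 3 and 4: run the first task with no blocked resources; every
    # task that is skipped blocks its resources for the younger tasks.
    for task in waiting:
        if blocked.isdisjoint(task["resources"]) and try_lock_and_run(task):
            return task
        blocked.update(task["resources"])
    return None
```

In this sketch a waiting task that needs 'salt' and 'pepper' while 'salt' is busy also holds back a younger task that only needs 'pepper', which preserves the oldest-first ordering per resource.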
The Locking Algorithm
PostgreSQL advisory locks will be used, with a single lock on each resource. Pulp supports multi-resource lock requests, so to avoid deadlock all workers must acquire the locks in the same order. For example, if a task requires ['salt', 'pepper', 'cumin'], all workers will try to lock 'salt', then 'pepper', then 'cumin', in that order. The list of required resources is stored in the Task data and is ordered, so that order will be used.
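A sketch of the ordered acquisition, using an in-memory stand-in instead of a database so it can run on its own. The `InMemoryLocks` class and `acquire_in_order` function are hypothetical; in PostgreSQL the corresponding session-level calls would be `pg_try_advisory_lock` and `pg_advisory_unlock`. The sketch assumes non-blocking lock attempts and releases everything it holds if any resource is unavailable.

```python
class InMemoryLocks:
    """Hypothetical stand-in for session-level advisory locks."""

    def __init__(self):
        self.held = set()

    def try_lock(self, resource):
        # Non-blocking: report failure instead of waiting.
        if resource in self.held:
            return False
        self.held.add(resource)
        return True

    def unlock(self, resource):
        self.held.discard(resource)


def acquire_in_order(resources, locks):
    """Lock resources in the given order; all or nothing.

    Returns True if every lock was acquired. On the first failure the
    locks taken so far are released in reverse order and False is returned.
    """
    acquired = []
    for resource in resources:
        if locks.try_lock(resource):
            acquired.append(resource)
        else:
            for taken in reversed(acquired):
                locks.unlock(taken)
            return False
    return True
```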
PostgreSQL advisory locks are used for two reasons:
- They can be tied to the session level and are not related to transactions. These locks need to exist for long periods of time, longer than a single transaction can reasonably hold. Additionally, tasking code needs to be free to start and commit transactions.
- They auto-cleanup once the session ends (worker disconnect) or they are explicitly released. This is a wonderful property, and one the RQ design has struggled to provide by attempting to have cleanup occur in application code.
PostgreSQL advisory locks are technically a single 64-bit key value, which can be thought of as an 8-byte integer. A hash will be used to produce those 8-byte integers as follows:

    from hashlib import blake2s

    resource = 'somestring'
    # 8-byte digest of the resource name
    the_digest = blake2s(resource.encode(), digest_size=8).digest()
    # Interpret the 8 bytes as one big-endian integer
    the_int = int.from_bytes(the_digest, byteorder='big')
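One detail worth considering when the key is handed to PostgreSQL: the single-key advisory lock functions take a `bigint`, which is a signed 64-bit integer, while the unsigned interpretation of 8 bytes can exceed its maximum. The sketch below is a variation on the hash above that reads the digest as a signed integer. The function name `resource_to_lock_key` is hypothetical, and whether the real implementation handles the range this way is an assumption.

```python
from hashlib import blake2s


def resource_to_lock_key(resource: str) -> int:
    """Map a resource name to an integer that fits a signed 64-bit bigint."""
    digest = blake2s(resource.encode(), digest_size=8).digest()
    # signed=True keeps the result within -2**63 .. 2**63 - 1
    return int.from_bytes(digest, byteorder="big", signed=True)
```

Because this is a hash, two different resource names could in principle map to the same key. That would make unrelated tasks wait on each other, but it would not let two tasks write to the same resource concurrently.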
The worker must fork prior to running the task code. This fork-per-task model guarantees that memory is released after each Task.
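A minimal, POSIX-only sketch of running task code in a forked child, assuming Python 3.9 or newer for `os.waitstatus_to_exitcode`. The function name `run_in_child` is hypothetical, and the real worker may well use a different mechanism for forking.

```python
import os


def run_in_child(func, *args, **kwargs):
    """Fork, run func in the child, and return the child's exit code.

    0 means func returned normally, 1 means it raised. A negative value
    means the child was killed by a signal (for example by the OOM killer).
    """
    pid = os.fork()
    if pid == 0:
        # Child process: run the task code, then exit without returning
        # into the parent's control flow.
        exit_code = 1
        try:
            func(*args, **kwargs)
            exit_code = 0
        except BaseException:
            exit_code = 1
        finally:
            os._exit(exit_code)
    # Parent process: wait for the child; its memory is returned to the
    # operating system when it exits.
    _, status = os.waitpid(pid, 0)
    return os.waitstatus_to_exitcode(status)
```

The parent worker process stays small because anything the task allocates lives only in the child.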
- Add docs for the
- Update the Architecture and Deploying docs to identify the new-style and old-style options and different requirements.
- Update the Installation Instruction docs to identify the new-style and old-style options and different requirements.