Story #8501

Updated by bmbouter about 3 years ago

## Background 

 See the epic description for a high level. This is the primary implementation story of all of the sub-tasks. 

 ## Add args and kwargs to the Task object 

 Add Task.args and Task.kwargs and have both `dispatch` and `enqueue_with_reservation` create those fields. 

 These will be JSON serialized types for safety versus pickle for example. It's expected that the tasking system can only pass JSON serializable arguments. 

 These will not be included in the user facing TaskSerializer for data security reasons. Additionally it's not very helpful for users to be able to query on the args or kwargs to a task anyway. 

 These two fields will be nullable because not all tasks will have args or kwargs. 

 ## Enable users to switch to the new or old style 

 To do this, add the new    `USE_NEW_WORKER_TYPE` setting which if `True` will cause Pulp to not dispatch to RQ. It will be `False` by default. 

 This setting will be used by the `dispatch` and `enqueue_with_reservation` interfaces. If `USE_NEW_WORKER_TYPE=False` (the default) it will write the task to the db as usual (only now with args and kwargs anyway) and dispatch to RQ as usual. If `USE_NEW_WORKER_TYPE=True` it will only write the task to the db (with args and kwargs now). 

 ## Create the new worker entry point 

 The new worker will be a [click-based entry point]( runnable with the name `pulpcore-worker`. 

 ## Have the workers heartbeat and record themselves in the db 

 The worker should write its existence into the `Worker` table and should use the same naming convention as the old-style, i.e. `pid@fqdn`, e.g. ``. 

 Additionally, the worker needs to heartbeat regularly. It should heartbeat 3 times within one worker timeout which is defined by the `WORKER_TTL` setting (which already exists). The "3" heartbeats is what has worked well for the RQ workers. 

 ## Distributed worker cleanup 

 Workers need to periodically check on each other to handle the case when OOM kills a worker and the task is still in the running state. When a worker has stopped heartbeating and shutdown non-gracefully any task that is in running that is assigned to it must be set to Failed. 

 ## Distributed algorithm for workers coordinating tasks to work on safely 

 This is a two part process: 

 1. Find the next safe task 
 2. Use postgresql advisory locks on the needed resources to ensure no other tasks are concurrently writing to them 

 This approach was put together my @bmbouter and @mdellweg through discussion on [this hackmd with task run order examples]( 

 ## Finding the next safe task 

 Each worker will find the next task for itself by inspecting the Task table. 

 ## The Locking Algorithm 

 [Postgresql advisory locks]( will be used with a single lock on each resource. Pulp supports multi-resource lock requests, so to avoid deadlock the all workers must acquire the locks in the same order. So if task requires ['salt', 'pepper', 'cumin'] all workers will try to lock 'salt' then 'pepper', then 'cumin' in that order. This "resources" required is in the Task data, and is ordered, so that order will be used. 

 Postgresql advisory locks are used for two reasons: 

 1. They can be tied to the session level and not related to transactions. These locks need to exist for long periods of time, longer than a single transaction can reasonably hold. Additionally, tasking code needs to be free to start and commit transactions. 

 2. They auto-cleanup once the session ends (worker disconnect) or they are explicitly released. This is a wonderful property that the RQ design has struggled with by attempting to have cleanup occur in application code. 

 Postgresql advisory locks are technically a single 64-bit key value, which can be thought of as a 8-byte integer. A hash will be used to produce those 8-byte integers as follow: 

 resource = 'somestring' 
 from hashlib import blake2s 
 the_digest = blake2s(resource.encode(), digest_size=8).digest() 
 the_int = int.from_bytes(the_digest, byteorder='big') 

 ## Docs 

 1. Add docs for the `USE_NEW_WORKER_TYPE` setting. 
 2. Update the [Architecture and Deploying docs]( to identify the new-style and old-style options and different requirements. 
 3. Update the [Installation Instruction docs]( to identify the new-style and old-style options and different requirements. 

 ## Benchmarking 

 The same [benchmark tests]( should be run against the new worker style. At its heart this uses [this script]( It should be demonstrated that tasking throughput scales linearly with worker count.