## Background See the epic description for a high level. This is the primary implementation story of all of the sub-tasks. ## Add args and kwargs to the Task object Add Task.args and Task.kwargs and have both `dispatch` and `enqueue_with_reservation` create those fields. These will be JSON serialized types for safety versus pickle for example. It's expected that the tasking system can only pass JSON serializable arguments. These will not be included in the user facing TaskSerializer for data security reasons. Additionally it's not very helpful for users to be able to query on the args or kwargs to a task anyway. These two fields will be nullable because not all tasks will have args or kwargs. ## Enable users to switch to the new or old style To do this, add the new `USE_NEW_WORKER_TYPE` setting which if `True` will cause Pulp to not dispatch to RQ. It will be `False` by default. This setting will be used by the `dispatch` and `enqueue_with_reservation` interfaces. If `USE_NEW_WORKER_TYPE=False` (the default) it will write the task to the db as usual (only now with args and kwargs anyway) and dispatch to RQ as usual. If `USE_NEW_WORKER_TYPE=True` it will only write the task to the db (with args and kwargs now). ## Create the new worker entry point The new worker will be a [click-based entry point](https://click.palletsprojects.com/en/7.x/setuptools/#setuptools-integration) runnable with the name `pulpcore-worker`. ## Have the workers heartbeat and record themselves in the db The worker should write its existence into the `Worker` table and should use the same naming convention as the old-style, i.e. `pid@fqdn`, e.g. `email@example.com`. Additionally, the worker needs to heartbeat regularly. It should heartbeat 3 times within one worker timeout which is defined by the `WORKER_TTL` setting (which already exists). The "3" heartbeats is what has worked well for the RQ workers. ## Distributed worker cleanup Workers need to periodically check on each other to handle the case when OOM kills a worker and the task is still in the running state. When a worker has stopped heartbeating and shutdown non-gracefully any task that is in running that is assigned to it must be set to Failed. ## Distributed algorithm for workers coordinating tasks to work on safely This is a two part process: 1. Find the next safe task 2. Use postgresql advisory locks on the needed resources to ensure no other tasks are concurrently writing to them This approach was put together my @bmbouter and @mdellweg through discussion on [this hackmd with task run order examples](https://hackmd.io/Y0l9nBm9SFmPiIBXBz8lwQ) ## Finding the next safe task Each worker will find the next task for itself by inspecting the Task table. Two all-SQL based approaches were considered. One inspired by [stakehouse](https://github.com/mdellweg/steakhouse/blob/master/steakhouse/grill.py#L38). The second is a SQL graph-based approach. However, for simplicity @bmbouter and @mdellweg want to use a simple application based approach to start with. This can be improved over time for additional speed gains. 1. Query the Task table for tasks in the running state, add all of their resource string names to a set. 2. Query the Task table for tasks in the waiting state, ordered with oldest first. 3. Read the resources from the Tasks and add them to the set. 4. Attempt to lock-and-run the first Task that has resources not currently in the set. ## The Locking Algorithm [Postgresql advisory locks](https://www.postgresql.org/docs/9.4/explicit-locking.html) will be used with a single lock on each resource. Pulp supports multi-resource lock requests, so to avoid deadlock the all workers must acquire the locks in the same order. So if task requires ['salt', 'pepper', 'cumin'] all workers will try to lock 'salt' then 'pepper', then 'cumin' in that order. This "resources" required is in the Task data, and is ordered, so that order will be used. Postgresql advisory locks are used for two reasons: 1. They can be tied to the session level and not related to transactions. These locks need to exist for long periods of time, longer than a single transaction can reasonably hold. Additionally, tasking code needs to be free to start and commit transactions. 2. They auto-cleanup once the session ends (worker disconnect) or they are explicitly released. This is a wonderful property that the RQ design has struggled with by attempting to have cleanup occur in application code. Postgresql advisory locks are technically a single 64-bit key value, which can be thought of as a 8-byte integer. A hash will be used to produce those 8-byte integers as follow: ``` resource = 'somestring' from hashlib import blake2s the_digest = blake2s(resource.encode(), digest_size=8).digest() the_int = int.from_bytes(the_digest, byteorder='big') ``` ## Worker forking The worker must fork prior to running the task code. This post-forker model is beneficial to guarantee memory to be released with each Task. ## Docs 1. Add docs for the `USE_NEW_WORKER_TYPE` setting. 2. Update the [Architecture and Deploying docs](https://docs.pulpproject.org/pulpcore/components.html) to identify the new-style and old-style options and different requirements. 3. Update the [Installation Instruction docs](https://docs.pulpproject.org/pulpcore/installation/instructions.html) to identify the new-style and old-style options and different requirements. ## Benchmarking The same [benchmark tests](https://hackmd.io/DV633ocwTnShsI8LdajshA) should be run against the new worker style. At its heart this uses [this script](https://gist.github.com/bmbouter/3e6ca6723a54978c340afbf4aecb63e3). It should be demonstrated that tasking throughput scales linearly with worker count.