Story #4294

Stages API: content "discovered" from content downloaded using the ArtifactDownloader stage cannot be arbitrarily deep

Added by bmbouter over 2 years ago. Updated over 1 year ago.

Start date:
Due date:
% Done:


Estimated time:
Platform Release:
Sprint Candidate:
Sprint 48


The Problem

Consider Docker, which uses the ArtifactDownloader + ArtifactSaver to download a metadata file and save it. That metadata file identifies N more downloads, and those downloads identify more things, and so on, forming an arbitrarily large tree data structure. For terminology purposes, call each level of this tree a "layer" of discovery.

Currently the only way to have the ArtifactDownloader + ArtifactSaver stages download objects in subsequent "layers" is to make the pipeline longer and have more instances of the ArtifactDownloader + ArtifactSaver. This solution won't work in cases where the number of layers is not known and can't be known before runtime. This is the case with Maven for example.

The Solution

1. Extend DeclarativeContent to allow an asyncio.Future to be attached. FirstStage knows about this future because it creates the DeclarativeContent. Holding a reference to the Future in FirstStage allows the plugin writer to await it for a result.

2. Create a new stage in the pipeline called ResolveContentFutures that will set the result of the DeclarativeContent.future to the DeclarativeContent.content

3. When FirstStage awaits on DeclarativeContent.get_future() it will receive the fully downloaded Content Unit which it can then use to generate more DeclarativeContent objects to send down the pipeline.
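
The three steps above can be sketched roughly as follows. This is a minimal illustration, not the pulpcore-plugin implementation: the DeclarativeContent class here is a stand-in, the content units are plain dicts, and first_stage, resolve_content_futures and run_pipeline are hypothetical names. The download and save stages are omitted, so the future-resolving stage sits directly behind the first stage.

```python
import asyncio


class DeclarativeContent:
    """Stand-in for the real class: a content unit plus an optional Future."""

    def __init__(self, content):
        self.content = content
        self.future = None

    def get_future(self):
        # Create the Future lazily so flat content never needs one.
        if self.future is None:
            self.future = asyncio.get_running_loop().create_future()
        return self.future


async def first_stage(out_q):
    # Layer 1: emit the metadata unit and wait until the pipeline is done with it.
    root = DeclarativeContent({"name": "index", "children": ["a", "b"]})
    fut = root.get_future()
    await out_q.put(root)
    resolved = await fut
    # Layer 2: the resolved unit says what else needs to go down the pipeline.
    for child in resolved["children"]:
        await out_q.put(DeclarativeContent({"name": child, "children": []}))
    await out_q.put(None)  # no more content


async def resolve_content_futures(in_q, out_q):
    # Final stage: hand each finished content unit back to whoever awaits it.
    while True:
        d_content = await in_q.get()
        if d_content is None:
            break
        if d_content.future is not None:
            d_content.future.set_result(d_content.content)
        await out_q.put(d_content)
    await out_q.put(None)


async def run_pipeline():
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    await asyncio.gather(first_stage(q1), resolve_content_futures(q1, q2))
    names = []
    while not q2.empty():
        item = q2.get_nowait()
        if item is not None:
            names.append(item.content["name"])
    return names
```

Calling asyncio.run(run_pipeline()) drives both stages to completion; the pipeline stays linear and the only feedback path is the Future.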

This solution was originally inspired by an idea from mdellweg and suggested by @gmbnomis.


#1 Updated by over 2 years ago

I played around with this concept, but I found that solving the "stopping problem" was pretty tricky. Do you have in mind how the stages will know when to stop?

I found at least 1 way to do it, but as you'll see, it's not ideal.

Stopping problem: Currently, the ArtifactDownloader does not shut down ( out_q.put(None) ) until it is finished. So the problem with a loop is that it would need to stay running until both the in_q and the loop_in_q are shut down. However, this is circular, because loop_in_q won't shut down until the Custom Plugin stage shuts down, which won't happen until the ArtifactDownloader shuts down.

In my experiments, I worked around this problem by the creation of yet another q, I called it `wait_on_me_q`, which really just keeps count of all units that have left the ArtifactDownloader and have not yet left the Custom Plugin stage. This allows the ArtifactDownloader to know when to stop with something like this:

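in_q_shutdown = False
loop_in_q_shutdown = False
# Sketch only: these checks run repeatedly while the stage is active.
if await in_q.get() is None:
    in_q_shutdown = True
# Nothing in flight and nothing queued means the loop has drained.
if wait_on_me_q.empty() and loop_in_q.empty():
    loop_in_q_shutdown = True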

The reason the wait_on_me_q is necessary is that the loop_in_q may be empty and in_q shut down while some items are still in between the stages, and any one of them could trigger more items to be put onto the loop_in_q.

However, this solution isn't great for a variety of reasons, mainly that the Custom Plugin stage and the ArtifactDownloader stage need to work together in a way that seems to require too much knowledge of each other. Additionally, the pipeline creation gets awkward: ArtifactDownloader has 2 inputs and the Custom Plugin stage has 2 outputs.

#2 Updated by bmbouter over 2 years ago

Here's an idea I had to solve the stopping problem. The gist of the idea is that when looping is in use, the pipeline shuts down in two phases.

The first shutdown phase begins when the None arrives on in_q to the ArtifactDownloader, indicating that no additional DeclarativeContent objects will be arriving on in_q. Instead of passing down the None as it does today, it passes along a new object called StoppingSignal or some similar name. The existing stages will need to be updated to pass this object along from in_q to out_q, which just propagates the signal of the first phase through the pipeline. This first phase does not actually shut down ArtifactDownloader and its subsequent stages.

Eventually this StoppingSignal will make its way to a Plugin Custom Stage which is the start of the second shutdown phase. The CustomStage will put a None into loop_out_q which is the same Queue as the loop_in_q to the ArtifactDownloader stage. When a None is encountered on the loop_in_q the ArtifactDownloader can finally shut down as it normally would, and pass None down the pipeline as it normally would, which shuts down the pipeline normally all the way down.

Note that the ArtifactDownloader needs to know whether it should perform the None-to-StoppingSignal replacement or just propagate the None down like it does already. I think a flag on the ArtifactDownloader stage would allow it to know this, and plugin writers would set it when configuring a custom pipeline. It would default to False, so the normal None-only shutdown remains the default.

#3 Updated by over 2 years ago

The problem with this design is that it only allows for single depth nesting of artifacts.

Let a2 be a 2-deep nested artifact that references a1, which references a0.

a2 is the only item that comes from the First Stage.

a2 goes through the ArtifactDownloader stage. Afterwards, there are no more items from the First Stage, so in_q is finished. The ArtifactDownloader stage passes a2, then the first stopping signal. The stage is still polling loop_in_q.

a2 makes it to the Custom Plugin stage, which processes it and notices that it references a1. This stage passes a2 to the out_q, and a1 to the loop_in_q. Then, reacting to the first stopping signal, the Custom Plugin stage sends another stopping signal to the loop_in_q.

The ArtifactDownloader receives a1 from loop_in_q, downloads it, and passes it. Then it receives the second stopping signal and shuts down completely. a1 passes to the Custom Plugin stage, which notices that it references a0. It passes a1 to the out_q and a0 to loop_in_q, but the ArtifactDownloader has already shut down and a0 is never downloaded.

#4 Updated by bmbouter over 2 years ago

Thank you for pointing this out. This design is not correctly handling the case where content units received from loop_in_q and processed by ArtifactDownloader generate additional DeclarativeContent objects to be handled yet the StoppingSignal has already been sent to loop_out_q.

The adjustment is for PluginCustomStage to wait until all items that have been sent to loop_out_q have been received back through in_q and only then send None (or StoppingSignal) to loop_out_q. When ArtifactDownloader receives this None (or StoppingSignal) from loop_in_q it will shut down and send None to out_q.

The solution does require a bit more responsibility from PluginCustomStage, but I think it's a pretty straightforward pattern. What do you think about this approach? What issues come with it? What else could we do?

#5 Updated by over 2 years ago

That adjustment could work, but it could be a memory hog if it needs to keep track of which units it expects to get back. This is fundamentally the same idea as the wait_on_me counter, except that instead of keeping track of which units it expects, the counter just keeps track of how many units it expects to get back. When I was playing with it, the wait_on_me_q just stored a bunch of Nones. There is probably a better way to do it than a q if all we need is a counter.
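
If only a count is needed, a plain integer would do, since asyncio stages run in a single thread and need no locking. A minimal sketch, assuming hypothetical names (InFlightCounter, loop_can_shut_down) that appear nowhere in the experiments described above:

```python
class InFlightCounter:
    """Counts units that left the downloader but have not left the plugin stage."""

    def __init__(self):
        self._count = 0

    def sent(self):
        # Call when a unit leaves the ArtifactDownloader.
        self._count += 1

    def returned(self):
        # Call when the Custom Plugin stage is finished with a unit.
        if self._count == 0:
            raise RuntimeError("returned() called more often than sent()")
        self._count -= 1

    def idle(self):
        return self._count == 0


def loop_can_shut_down(in_q_shutdown, counter, loop_in_q_size):
    # The loop is finished only when upstream is done, nothing is in flight,
    # and nothing is waiting to be downloaded again.
    return in_q_shutdown and counter.idle() and loop_in_q_size == 0
```

This replaces the queue of Nones with constant memory, though it still couples the two stages in the way criticized earlier.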

#6 Updated by over 2 years ago

One additional implementation idea I played with was instead of adding this complexity to the ArtifactDownloader, we would make a new stage based on my confluence stage. The confluence stage is able to have multiple in_qs and a single out_q. A modified confluence stage "LoopConfluenceStage" could handle any looping and be placed in front of the ArtifactDownloader. This would separate the complexity, and also allow for loops between other sets of stages.

The part that I don't like about a LoopConfluenceStage (or the design we have talked about) is that the construction of the pipeline becomes more complex. When creating the pipeline, we won't be able to simply iterate through a list like we currently do, we will need to construct additional queues and connect them to the appropriate stages.

#7 Updated by over 2 years ago

Here's the confluence code. It's nice because it takes an arbitrarily long list of in_qs, which would be helpful if more than one stage needed to loop back.

import asyncio

from stages import Stage


class ConfluenceStage(Stage):
    """
    Waits on an arbitrary number of input queues and outputs a single queue.

    Args:
        in_q_list (list): List of incoming asyncio.Queues
        joined_out (asyncio.Queue): A single outgoing stream.
    """

    async def __call__(self, in_q_list, joined_out):
        self._pending = set()
        self._finished = set()
        # One placeholder per input queue that has not yet sent None.
        open_queues = [None for q in in_q_list]
        current_tasks = {}
        for queue in in_q_list:
            task = self.add_to_pending(queue.get())
            current_tasks[task] = queue

        while open_queues:
            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            self._finished = self._finished.union(done)

            while self._finished:
                out_task = self._finished.pop()
                if out_task.result() is None:
                    # This input queue is finished; stop reading from it.
                    current_tasks.pop(out_task)
                    open_queues.pop()
                else:
                    # Forward the item and wait for the next one on the same queue.
                    used_queue = current_tasks.pop(out_task)
                    next_task = self.add_to_pending(used_queue.get())
                    current_tasks[next_task] = used_queue
                    await joined_out.put(out_task.result())
        # After every input queue has sent its None, close this stage
        await joined_out.put(None)

    def add_to_pending(self, coro):
        task = asyncio.ensure_future(coro)
        self._pending.add(task)
        return task

#8 Updated by bmbouter over 2 years ago


That adjustment could work, but it could be a memory hog if it needs to keep track of which units it expects to get back. This is fundamentally the same idea as the wait_on_me counter, except that instead of keeping track of which units it expects, the counter just keeps track of how many units it expects to get back. When I was playing with it, the wait_on_me_q just stored a bunch of Nones. There is probably a better way to do it than a q if all we need is a counter.

The issue with keeping track of only the counts is that it won't disambiguate between DeclarativeContent originating from FirstStage and content originating from PluginCustomStage and looping back around.

Also, in terms of memory, I don't think this will cost much at all: since DeclarativeContent is an object, the set won't store a copy, only a reference. It will also be about as fast as it could be, because a set (versus a list, dict, or tuple) gives average constant-time add and remove operations in Python.
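
A small illustration of that point, using a stand-in DeclarativeContent class (not the real one) that relies on Python's default identity-based hashing:

```python
class DeclarativeContent:
    """Stand-in with default identity-based hashing, like any plain object."""

    def __init__(self, content):
        self.content = content


outstanding = set()
unit = DeclarativeContent({"name": "a1"})

outstanding.add(unit)      # stores a reference to the object, not a copy
same_object = next(iter(outstanding)) is unit

outstanding.discard(unit)  # average O(1) removal once the unit comes back
all_returned = not outstanding
```

Because membership is by identity here, the set can tell exactly which looped-back units are still outstanding, which a bare count cannot.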

#9 Updated by over 2 years ago

Ok, I think that will be fine.

The only remaining concern I have is how to construct the pipeline, but I'm willing to leave that to the implementer. I'm marking as groomed and adding to the sprint.

#10 Updated by gmbnomis over 2 years ago

To add a completely different approach:

mdellweg had a very nice and elegant idea to solve this. Judging from the comment, there should already be an implementation in pulp_deb (but I couldn't find it).

But even without having looked at an implementation, I think it will be elegant for the following reasons:
- The first stage can just decide whether a content unit is a terminal content unit or whether it may generate other content units. In the latter case, it just attaches a Future to the declarative_content and awaits it (together with all other content unit futures and the put() into the next stage)
- When a declarative_content leaves the pipeline (at the latest), its attached Future (if present) is set to done and the first stage will wake up to inspect the content unit & artifact(s).
- If there is nothing to await in the first stage anymore, it sends the terminating None. Most stages do not have to know that they are working in a complicated feedback pipeline setup. Issue #4296 needs to be addressed though (but that is the case for all solutions)
- The pipeline stays linear. Feedback is not done via queues (which may be the source for additional deadlock situations)
- All the meta-data handling and creation of declarative_content stays in the first stage and is not distributed across several stages.

#11 Updated by mdellweg over 2 years ago

Thank you gmbnomis for bringing this up. I had to move away from this implementation, because I did not see a way to work around #4296 on the plugin side.

The last version employing this method is:

The only benefit you did not mention that comes to my mind is that, because every DeclarativeContent is produced in the first stage, there is no need for multiple instances of the Artifact stages in the pipeline as in .

#12 Updated by over 2 years ago

bmbouter and I chatted about this one, and we agreed that the use of Futures is a better design pattern. What we'd like to do is incorporate this into pulpcore tooling to make this pattern simpler for plugin writers. I'm hopeful that we can fix the deadlocking issue for batches so that all plugins will be able to benefit from this.

@mdellweg++ for the slick and idiomatic design, @gmbnomis++ for bringing it up!

#13 Updated by mdellweg over 2 years ago

Can you specify what the tooling in plugin-api should look like? Then I could start to resurrect code from my old pulp_deb commit.

#14 Updated by over 2 years ago

There are 3 situations that we need to keep in mind

  1. Must work for "flat" content that is not nested
  2. Must work for nested content where the depth is known (pulp_docker, pulp_deb)
  3. Must work for nested content where the depth is not known (maven)

Given that we were just looking at the code rather than running it, the implementation I am imagining is just speculation. bmbouter may have a different approach, but this is what I'm thinking:

First, we could augment the DeclarativeContent model to have a future attribute:

Additionally, we could include the stage at the end of the pipe that sets the result of those futures.

As it is in the deb commit, I suspect that we might need some adjustments to be in place so this will work for plugins that do not have discoverable/nested content. Another potential issue is that each content type "layer" keeps a separate list of futures, which I think will be troublesome for cases where we do not know how deeply nested things are. One idea for addressing this is keeping a single list of futures in the first stage, and the type-specific logic will only act on the appropriate type, but that would be in the plugin implementation, not pulpcore-plugin.

With those components in place, the plugin-writers will need (hopefully) a less sophisticated understanding of asyncio, and they would only need to keep a list (or lists) of pending content and use the loop syntax on it:

We did have some concerns about changing the length of python lists as we iterate through them, but if that is a problem we can simply use the old syntax.

#15 Updated by mdellweg over 2 years ago

I do not see a problem with flat content, because it would not even create the Future (and would never need to know about it). So the ResolveFuture stage is a no-op for flat content.

For the unknown discoverage depth, you can easily attach a callback to the Future [0] and forget about it.
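
A minimal sketch of the callback idea, using only the standard asyncio.Future.add_done_callback call. The function name demo and the dict-shaped result are assumptions made for illustration; in the pipeline, the final stage would be the one calling set_result().

```python
import asyncio


async def demo():
    discovered = []

    def on_resolved(future):
        # Runs on the event loop once the Future has a result.
        discovered.extend(future.result()["children"])

    fut = asyncio.get_running_loop().create_future()
    fut.add_done_callback(on_resolved)

    # Done by hand here; the resolving stage would do this in the pipeline.
    fut.set_result({"name": "index", "children": ["a", "b"]})
    await asyncio.sleep(0)  # yield once so the loop can run the callback
    return discovered
```

Note that the callback is a plain function, so it cannot itself await a put() onto a queue; anything it discovers still has to be handed to a coroutine that can.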


#16 Updated by bmbouter over 2 years ago

mdellweg wrote:

For the unknown discoverage depth, you can easily attach a callback to the Future [0] and forget about it.


This works if you want to write that callback handler code and in some cases that is what the plugin writer will do.

For the unknown discovery depth though, you want the "handler" to generate new content from FirstStage, because the handler needs to make more DeclarativeContent objects that also need to be handled by the Stages pipeline. Bringing those objects back to the pipeline allows for reuse of the pipeline.

#17 Updated by mdellweg over 2 years ago

Oh, right. You need to at least keep track of those futures in the first stage to know when to send the final None.

So I guess you would loop indefinitely over a growing and shrinking list of unresolved futures and break when it becomes empty.
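
That loop could look roughly like the sketch below. Everything here is an assumption for illustration: TREE is made-up metadata reusing the a2/a1/a0 example from comment #3, DeclarativeContent is a stand-in, and last_stage collapses download, save and future resolution into one step. Batching, and with it the deadlock from #4296, is ignored.

```python
import asyncio

# Hypothetical metadata: each name maps to the names it references.
TREE = {"a2": ["a1"], "a1": ["a0"], "a0": []}


class DeclarativeContent:
    def __init__(self, name):
        self.name = name
        self.future = asyncio.get_running_loop().create_future()


async def first_stage(out_q, root):
    pending = set()

    async def emit(name):
        d_content = DeclarativeContent(name)
        pending.add(d_content.future)
        await out_q.put(d_content)

    await emit(root)
    # The set grows as new references are discovered and shrinks as
    # futures resolve; an empty set means discovery is complete.
    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        pending.difference_update(done)
        for fut in done:
            for child in fut.result():
                await emit(child)
    await out_q.put(None)  # nothing left to await: send the final None


async def last_stage(in_q, seen):
    # Stands in for download + save + resolving the attached Future.
    while True:
        d_content = await in_q.get()
        if d_content is None:
            break
        seen.append(d_content.name)
        d_content.future.set_result(TREE[d_content.name])


async def run(root):
    q, seen = asyncio.Queue(), []
    await asyncio.gather(first_stage(q, root), last_stage(q, seen))
    return seen
```

With this shape, a0 is reached even though its depth was not known up front, because the final None is only sent once no futures remain.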

#18 Updated by bmbouter over 2 years ago

  • Status changed from NEW to ASSIGNED
  • Assignee set to bmbouter

The code itself is already merged w/ the first associated commit from @mdellweg. Thanks mdellweg!

I'm taking as ASSIGNED so I can add plugin writer documentation about the pattern and what situations it's useful for.

#19 Updated by bmbouter over 2 years ago

  • Description updated (diff)

#20 Updated by bmbouter over 2 years ago

  • File deleted (loop_diagram.jpg)

#21 Updated by daviddavis over 2 years ago

  • Sprint set to Sprint 48

#22 Updated by bmbouter over 2 years ago

  • Status changed from ASSIGNED to POST

#23 Updated by bmbouter over 2 years ago

  • Status changed from POST to MODIFIED
  • % Done changed from 0 to 100

Applied in changeset commit:pulpcore-plugin|26549044c72e8205da8585e0fa51a36c274e52ff.

#24 Updated by daviddavis about 2 years ago

  • Sprint/Milestone set to 3.0.0

#25 Updated by bmbouter about 2 years ago

  • Tags deleted (Pulp 3, Pulp 3 RC Blocker)

#26 Updated by bmbouter over 1 year ago

  • Status changed from MODIFIED to CLOSED - CURRENTRELEASE
