Project

Profile

Help

Issue #4018

closed

ArtifactDownloader stage does not accept further work if blocked in download

Added by gmbnomis over 5 years ago. Updated over 4 years ago.

Status:
CLOSED - CURRENTRELEASE
Priority:
Normal
Assignee:
Category:
-
Sprint/Milestone:
Start date:
Due date:
Estimated time:
Severity:
2. Medium
Version:
Platform Release:
OS:
Triaged:
Yes
Groomed:
No
Sprint Candidate:
No
Tags:
Sprint:
Quarter:

Description

Problem Statement

The ArtifactDownloader has two locations where it can block: dequeue from
the upstream stage queue and waiting for downloads to finish.

When downloads are ongoing, no new work can be accepted (even when below
the limit of concurrent downloads).

For example, this test:

    async def test_sparse_batches_dont_block_stage(self):

        def queue_content_with_a_single_download(in_q, batchsize=100, delay=100):
            """
            Queue a batch of `batchsize` declarative_content instances. Only the
            first one triggers a download of duration `delay`.
            """
            self.queue_dc(in_q, delays=[delay])
            for i in range(batchsize - 1):
                self.queue_dc(in_q, download=False)

        in_q = asyncio.Queue()
        out_q = asyncio.Queue()
        download_task = self.loop.create_task(self.download_task(in_q, out_q))

        # First 100 declarative content instances
        queue_content_with_a_single_download(in_q)

        # After 0.5 seconds, the first content unit is downloading
        await self.advance(0.5)
        self.assertEqual(DownloaderMock.running, 1)
        self.assertEqual(out_q.qsize(), 99)

        # at 0.5 seconds next batch arrives (last batch)
        queue_content_with_a_single_download(in_q)
        in_q.put_nowait(None)

        # at 1.0 seconds, two downloads are running
        await self.advance(0.5)
        self.assertEqual(DownloaderMock.running, 2)
        self.assertEqual(out_q.qsize(), 99)

        # at 101 seconds, stage should have completed
        await self.advance(100)

        self.assertEqual(DownloaderMock.running, 0)
        self.assertEqual(DownloaderMock.downloads, 2)
        self.assertEqual(download_task.result(), DownloaderMock.downloads)
        self.assertEqual(in_q.qsize(), 0)
        self.assertEqual(out_q.qsize(), 201)

In the current implementation, this fails with:

        # at 1.0 seconds, two downloads are running
        await self.advance(0.5)
>       self.assertEqual(DownloaderMock.running, 2)
E       AssertionError: 1 != 2

Proposal

A fix is to wait on both upstream content units and downloads at the same
location. I have a running version of this ArtifactDownloader. It needs some cleanup before posting, though.

Actions #1

Updated by CodeHeeler over 5 years ago

  • Triaged changed from No to Yes
Actions #2

Updated by gmbnomis over 5 years ago

  • Assignee set to gmbnomis
Actions #3

Updated by gmbnomis over 5 years ago

  • Status changed from NEW to ASSIGNED
Actions #4

Updated by bmbouter over 5 years ago

We probably should make sure there is a maximum amount of items held at the downloader stage though. This is for items that have been taken out of the queue but aren't yet able to be downloaded. Does that make sense? Is that related to what you're writing about?

Ping me for review or help whenever you have something to share!

Actions #5

Updated by gmbnomis over 5 years ago

bmbouter wrote:

We probably should make sure there is a maximum amount of items held at the downloader stage though. This is for items that have been taken out of the queue but aren't yet able to be downloaded. Does that make sense? Is that related to what you're writing about?

Yes, that makes perfect sense and I want to retain that. In my opinion, the ArtifactDownloader should:

- Limit the number of content unit being worked on
- Limit the number of concurrent downloads
- Whenever the limit of concurrent downloads is not reached, pick up some work from one of the content units being worked on (if there is one)
- Whenever the maximum number of content unit being worked on is not reached, get a new content unit. If it needs no download, forward it directly to the out_q. Otherwise, add it to the pool of content unit being worked on (and start downloading if possible).

The current implementation limits the number of content units being worked on by stopping to get content units when saturated. This is an implicit limit, but is not the main problem (making it more explicit would be nice though).

The main problem I see is: If ArtifactDownloader works on downloads (even if it is a only a single one), no new content is being looked at. In the worst case, if every content batch being processed contains only one content unit with one artifact to download, it will download one artifact at a time.

Ping me for review or help whenever you have something to share!

Sure, will do.

Added by gmbnomis over 5 years ago

Revision d413ef60 | View on GitHub

Implement single blocking asyncio.wait for ArtifactDownloader

The ArtifactDownloader had two locations where it could block: dequeue from the upstream stage queue and waiting for downloads to finish.

When downloads were ongoing, no new work could be accepted (even when below the limit of concurrent downloads).

Fix this by waiting on both upstream content units and downloads at the same time. Additionally, refactor the huge method into a "run" class with state. This allows to split up the functionality into smaller methods without passing a large set of parameters around.

Restructure the tasks: Tasks are handling one content instance in the form of a single coroutine now. The coroutine computes the artifacts to download, downloads them and updates the content instance. The overall limit on the number of concurrent downloads is restricted by a common semaphore.

Add unit tests using simulated downloads and clocks to verify the dynamic behavior of ArtifactDownloader.

closes #4018 https://pulp.plan.io/issues/4018

Added by gmbnomis over 5 years ago

Revision d413ef60 | View on GitHub

Implement single blocking asyncio.wait for ArtifactDownloader

The ArtifactDownloader had two locations where it could block: dequeue from the upstream stage queue and waiting for downloads to finish.

When downloads were ongoing, no new work could be accepted (even when below the limit of concurrent downloads).

Fix this by waiting on both upstream content units and downloads at the same time. Additionally, refactor the huge method into a "run" class with state. This allows to split up the functionality into smaller methods without passing a large set of parameters around.

Restructure the tasks: Tasks are handling one content instance in the form of a single coroutine now. The coroutine computes the artifacts to download, downloads them and updates the content instance. The overall limit on the number of concurrent downloads is restricted by a common semaphore.

Add unit tests using simulated downloads and clocks to verify the dynamic behavior of ArtifactDownloader.

closes #4018 https://pulp.plan.io/issues/4018

Actions #7

Updated by gmbnomis over 5 years ago

  • Status changed from ASSIGNED to MODIFIED
Actions #8

Updated by daviddavis almost 5 years ago

  • Sprint/Milestone set to 3.0.0
Actions #9

Updated by bmbouter almost 5 years ago

  • Tags deleted (Pulp 3)
Actions #10

Updated by bmbouter over 4 years ago

  • Status changed from MODIFIED to CLOSED - CURRENTRELEASE

Also available in: Atom PDF