Issue #4018 (closed)
ArtifactDownloader stage does not accept further work if blocked in download
Description
Problem Statement
The ArtifactDownloader has two locations where it can block: dequeuing from
the upstream stage queue and waiting for downloads to finish.
When downloads are ongoing, no new work can be accepted (even when below
the limit of concurrent downloads).
For example, this test:
async def test_sparse_batches_dont_block_stage(self):
    def queue_content_with_a_single_download(in_q, batchsize=100, delay=100):
        """
        Queue a batch of `batchsize` declarative_content instances. Only the
        first one triggers a download of duration `delay`.
        """
        self.queue_dc(in_q, delays=[delay])
        for i in range(batchsize - 1):
            self.queue_dc(in_q, download=False)

    in_q = asyncio.Queue()
    out_q = asyncio.Queue()
    download_task = self.loop.create_task(self.download_task(in_q, out_q))

    # First 100 declarative content instances
    queue_content_with_a_single_download(in_q)

    # After 0.5 seconds, the first content unit is downloading
    await self.advance(0.5)
    self.assertEqual(DownloaderMock.running, 1)
    self.assertEqual(out_q.qsize(), 99)

    # at 0.5 seconds next batch arrives (last batch)
    queue_content_with_a_single_download(in_q)
    in_q.put_nowait(None)

    # at 1.0 seconds, two downloads are running
    await self.advance(0.5)
    self.assertEqual(DownloaderMock.running, 2)
    self.assertEqual(out_q.qsize(), 99)

    # at 101 seconds, stage should have completed
    await self.advance(100)
    self.assertEqual(DownloaderMock.running, 0)
    self.assertEqual(DownloaderMock.downloads, 2)
    self.assertEqual(download_task.result(), DownloaderMock.downloads)
    self.assertEqual(in_q.qsize(), 0)
    self.assertEqual(out_q.qsize(), 201)
In the current implementation, this fails with:

        # at 1.0 seconds, two downloads are running
        await self.advance(0.5)
>       self.assertEqual(DownloaderMock.running, 2)
E       AssertionError: 1 != 2
Proposal
A fix is to wait for both upstream content units and finished downloads at a
single location. I have a running version of this ArtifactDownloader; it needs some cleanup before posting, though.
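The proposal can be sketched as a single `asyncio.wait` that blocks on both the upstream dequeue and the in-flight download tasks at once, so content that needs no download keeps flowing while downloads are still running. This is a simplified illustration, not the actual Pulp code; `FakeContent`, `single_wait_stage`, and the lack of a concurrency cap are all invented for the sketch:

```python
import asyncio

class FakeContent:
    """Stand-in for a declarative content unit (illustrative only)."""
    def __init__(self, needs_download, delay=0.01):
        self.needs_download = needs_download
        self.delay = delay

    async def download(self):
        await asyncio.sleep(self.delay)  # simulate a slow download
        return self

async def single_wait_stage(in_q, out_q):
    # One asyncio.wait blocks on BOTH the upstream dequeue and the
    # in-flight downloads, so new work is accepted while downloads run.
    downloads = set()
    upstream = asyncio.ensure_future(in_q.get())
    shutdown = False
    while not shutdown or downloads:
        aws = downloads | (set() if shutdown else {upstream})
        done, _ = await asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED)
        if upstream in done:
            item = upstream.result()
            if item is None:  # None marks the end of the upstream stage
                shutdown = True
            else:
                if item.needs_download:
                    downloads.add(asyncio.ensure_future(item.download()))
                else:
                    await out_q.put(item)  # forward immediately
                upstream = asyncio.ensure_future(in_q.get())
        for t in done & downloads:
            downloads.remove(t)
            await out_q.put(t.result())
    await out_q.put(None)

async def demo():
    in_q, out_q = asyncio.Queue(), asyncio.Queue()
    for needs in (True, False, False, True, False):
        in_q.put_nowait(FakeContent(needs))
    in_q.put_nowait(None)
    await single_wait_stage(in_q, out_q)
    return out_q.qsize()

result = asyncio.run(demo())
print(result)  # 6: five content units plus the terminating None
```

With this shape, the failing test above would pass: the three no-download units are forwarded while the slow downloads are still in flight, and a second download can start as soon as it arrives.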
Updated by bmbouter about 6 years ago
We probably should make sure there is a maximum amount of items held at the downloader stage though. This is for items that have been taken out of the queue but aren't yet able to be downloaded. Does that make sense? Is that related to what you're writing about?
Ping me for review or help whenever you have something to share!
Updated by gmbnomis about 6 years ago
bmbouter wrote:
We probably should make sure there is a maximum amount of items held at the downloader stage though. This is for items that have been taken out of the queue but aren't yet able to be downloaded. Does that make sense? Is that related to what you're writing about?
Yes, that makes perfect sense and I want to retain that. In my opinion, the ArtifactDownloader should:
- Limit the number of content units being worked on
- Limit the number of concurrent downloads
- Whenever the limit of concurrent downloads is not reached, pick up some work from one of the content units being worked on (if there is one)
- Whenever the maximum number of content units being worked on is not reached, get a new content unit. If it needs no download, forward it directly to the out_q. Otherwise, add it to the pool of content units being worked on (and start downloading if possible).
The current implementation limits the number of content units being worked on by ceasing to fetch content units when saturated. This is an implicit limit, but it is not the main problem (making it more explicit would be nice, though).
The main problem I see is: if ArtifactDownloader works on downloads (even if it is only a single one), no new content is being looked at. In the worst case, if every content batch being processed contains only one content unit with one artifact to download, it will download one artifact at a time.
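The two separate limits described in the list above might be enforced roughly like this (a hypothetical sketch under invented names; `MAX_CONTENT`, `MAX_DOWNLOADS`, and the use of a plain float as "download delay" are all assumptions for illustration):

```python
import asyncio

MAX_CONTENT = 4    # limit on content units being worked on
MAX_DOWNLOADS = 2  # limit on concurrent downloads

async def stage(in_q, out_q):
    sem = asyncio.Semaphore(MAX_DOWNLOADS)  # shared download limit
    in_flight = set()

    async def work_on(delay):
        async with sem:                 # download only when below the limit
            await asyncio.sleep(delay)  # stand-in for a real download
        await out_q.put(delay)

    while True:
        unit = await in_q.get()
        if unit is None:                # end of upstream stage
            break
        if unit == 0:
            await out_q.put(unit)       # needs no download: forward directly
            continue
        # Cap the pool of content units being worked on
        while len(in_flight) >= MAX_CONTENT:
            done, in_flight = await asyncio.wait(
                in_flight, return_when=asyncio.FIRST_COMPLETED)
        in_flight.add(asyncio.ensure_future(work_on(unit)))
    if in_flight:
        await asyncio.wait(in_flight)
    await out_q.put(None)

async def demo():
    in_q, out_q = asyncio.Queue(), asyncio.Queue()
    for unit in (0.01, 0, 0, 0.01, 0):
        in_q.put_nowait(unit)
    in_q.put_nowait(None)
    await stage(in_q, out_q)
    return out_q.qsize()

total = asyncio.run(demo())
print(total)  # 6: five units plus the terminating None
```

Because downloads run as independent tasks, fetching the next content unit is never blocked by an ongoing download, only by the two explicit limits.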
Ping me for review or help whenever you have something to share!
Sure, will do.
Added by gmbnomis about 6 years ago
Revision d413ef60 | View on GitHub
Implement single blocking asyncio.wait for ArtifactDownloader
The ArtifactDownloader had two locations where it could block: dequeue from the upstream stage queue and waiting for downloads to finish.
When downloads were ongoing, no new work could be accepted (even when below the limit of concurrent downloads).
Fix this by waiting on both upstream content units and downloads at the same time. Additionally, refactor the huge method into a "run" class with state. This allows the functionality to be split up into smaller methods without passing a large set of parameters around.
Restructure the tasks: Tasks are handling one content instance in the form of a single coroutine now. The coroutine computes the artifacts to download, downloads them and updates the content instance. The overall limit on the number of concurrent downloads is restricted by a common semaphore.
Add unit tests using simulated downloads and clocks to verify the dynamic behavior of ArtifactDownloader.
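The per-content coroutine this commit describes might look roughly like the following. This is a hypothetical sketch, not the actual Pulp code: `handle_content`, the dict-based content shape, and the "file" marker are invented; only the structure (compute artifacts, download under a common semaphore, update, forward) follows the commit message:

```python
import asyncio

async def handle_content(content, semaphore, out_q):
    # One coroutine per content instance: compute the artifacts that
    # still need downloading, fetch each one under a semaphore that
    # caps download concurrency globally, then forward the content.
    to_download = [a for a in content["artifacts"] if a["file"] is None]
    for artifact in to_download:
        async with semaphore:               # global download limit
            await asyncio.sleep(0)          # stand-in for a downloader
            artifact["file"] = "downloaded" # update the artifact in place
    await out_q.put(content)

async def demo():
    sem = asyncio.Semaphore(2)
    out_q = asyncio.Queue()
    contents = [{"artifacts": [{"file": None}, {"file": "cached"}]}
                for _ in range(3)]
    # All content coroutines run concurrently; the semaphore alone
    # limits how many downloads are in flight at once.
    await asyncio.gather(*(handle_content(c, sem, out_q) for c in contents))
    return out_q.qsize()

n = asyncio.run(demo())
print(n)  # 3: every content instance reaches the out queue
```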
Updated by gmbnomis about 6 years ago
PR is at: https://github.com/pulp/pulp/pull/3661
Updated by gmbnomis about 6 years ago
- Status changed from ASSIGNED to MODIFIED
Applied in changeset pulp|d413ef60ff511bcb9bdfd3fdb2d65953be2b1862.
Updated by bmbouter almost 5 years ago
- Status changed from MODIFIED to CLOSED - CURRENTRELEASE
closes #4018 https://pulp.plan.io/issues/4018