Issue #4018
ArtifactDownloader stage does not accept further work if blocked in download
Status: closed
Severity: 2. Medium
Triaged: Yes
Groomed: No
Sprint Candidate: No
Description
Problem Statement
The ArtifactDownloader can block in two places: when dequeuing from the
upstream stage queue and when waiting for downloads to finish.
While it waits for downloads to finish, it accepts no new work, even when
the number of running downloads is below the concurrency limit.
For example, this test:
async def test_sparse_batches_dont_block_stage(self):
    def queue_content_with_a_single_download(in_q, batchsize=100, delay=100):
        """
        Queue a batch of `batchsize` declarative_content instances. Only the
        first one triggers a download of duration `delay`.
        """
        self.queue_dc(in_q, delays=[delay])
        for i in range(batchsize - 1):
            self.queue_dc(in_q, download=False)

    in_q = asyncio.Queue()
    out_q = asyncio.Queue()
    download_task = self.loop.create_task(self.download_task(in_q, out_q))

    # First 100 declarative content instances
    queue_content_with_a_single_download(in_q)

    # After 0.5 seconds, the first content unit is downloading
    await self.advance(0.5)
    self.assertEqual(DownloaderMock.running, 1)
    self.assertEqual(out_q.qsize(), 99)

    # at 0.5 seconds next batch arrives (last batch)
    queue_content_with_a_single_download(in_q)
    in_q.put_nowait(None)

    # at 1.0 seconds, two downloads are running
    await self.advance(0.5)
    self.assertEqual(DownloaderMock.running, 2)
    self.assertEqual(out_q.qsize(), 99)

    # at 101 seconds, stage should have completed
    await self.advance(100)
    self.assertEqual(DownloaderMock.running, 0)
    self.assertEqual(DownloaderMock.downloads, 2)
    self.assertEqual(download_task.result(), DownloaderMock.downloads)
    self.assertEqual(in_q.qsize(), 0)
    self.assertEqual(out_q.qsize(), 201)
In the current implementation, this fails with:
        # at 1.0 seconds, two downloads are running
        await self.advance(0.5)
>       self.assertEqual(DownloaderMock.running, 2)
E       AssertionError: 1 != 2
Proposal
A fix is to wait on both upstream content units and downloads in a single
place. I have a working version of this ArtifactDownloader, but it needs some cleanup before posting.
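To illustrate the proposal, here is a minimal sketch of a stage with a single blocking point. It is not the ArtifactDownloader code: `download`, `downloader_stage`, `demo`, the `(item, delay)` entries, and the `max_concurrent` parameter are all hypothetical names chosen for this example, and the output queue is assumed to be unbounded so that `put_nowait` never fails. The idea is to pass the pending `in_q.get()` task and all running download tasks to one `asyncio.wait` call.

```python
import asyncio


async def download(item, delay):
    """Hypothetical stand-in for a real download: it only sleeps."""
    await asyncio.sleep(delay)
    return item


async def downloader_stage(in_q, out_q, max_concurrent=2):
    """Forward (item, delay) entries from in_q to out_q; None ends the stream."""
    downloads = set()
    get_task = asyncio.ensure_future(in_q.get())
    upstream_done = False
    while not upstream_done or downloads:
        waiting = set(downloads)
        # Accept new work only while below the concurrency limit.
        if not upstream_done and len(downloads) < max_concurrent:
            waiting.add(get_task)
        # The only place where the stage blocks.
        done, _ = await asyncio.wait(waiting, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            if task is not get_task:
                # A download finished: pass the item downstream.
                downloads.discard(task)
                out_q.put_nowait(task.result())
                continue
            entry = task.result()
            if entry is None:
                upstream_done = True
                continue
            # Re-arm the dequeue so the next loop iteration can wait on it.
            get_task = asyncio.ensure_future(in_q.get())
            item, delay = entry
            if delay:
                downloads.add(asyncio.ensure_future(download(item, delay)))
            else:
                # Nothing to download: forward immediately.
                out_q.put_nowait(item)
    out_q.put_nowait(None)


async def demo():
    """Queue one slow item followed by two items that need no download."""
    in_q, out_q = asyncio.Queue(), asyncio.Queue()
    for entry in [("slow", 0.05), ("fast-1", 0), ("fast-2", 0), None]:
        in_q.put_nowait(entry)
    await downloader_stage(in_q, out_q)
    return [out_q.get_nowait() for _ in range(out_q.qsize())]
```

Running `asyncio.run(demo())` in this sketch yields the two items without downloads before the slow one, because the stage keeps dequeuing while the download is in flight.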
Implement single blocking asyncio.wait for ArtifactDownloader
The ArtifactDownloader had two locations where it could block: dequeuing from the upstream stage queue and waiting for downloads to finish.
When downloads were ongoing, no new work could be accepted (even when below the limit of concurrent downloads).
Fix this by waiting on both upstream content units and downloads at the same time. Additionally, refactor the huge method into a "run" class with state. This allows splitting the functionality into smaller methods without passing a large set of parameters around.
Restructure the tasks: each task now handles one content instance as a single coroutine. The coroutine computes the artifacts to download, downloads them, and updates the content instance. The overall number of concurrent downloads is limited by a common semaphore.
Add unit tests using simulated downloads and clocks to verify the dynamic behavior of ArtifactDownloader.
closes #4018 https://pulp.plan.io/issues/4018
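The task restructuring described in the commit message (one coroutine per content instance, with a common semaphore limiting downloads) might look roughly like the sketch below. This is an illustration under assumptions, not the committed code: `Tracker`, `download_artifact`, `handle_content`, `demo`, and the dictionary layout of a content instance are hypothetical, and the download is simulated with `asyncio.sleep`.

```python
import asyncio


class Tracker:
    """Hypothetical helper that records how many downloads run at once."""

    def __init__(self):
        self.running = 0
        self.peak = 0


async def download_artifact(url, semaphore, tracker):
    # The shared semaphore caps concurrent downloads across all content tasks.
    async with semaphore:
        tracker.running += 1
        tracker.peak = max(tracker.peak, tracker.running)
        await asyncio.sleep(0.01)  # stand-in for the network transfer
        tracker.running -= 1
    return f"data from {url}"


async def handle_content(content, semaphore, tracker):
    """One coroutine per content instance: select, download, update."""
    # Artifacts whose value is None still need to be downloaded.
    to_download = [url for url, data in content["artifacts"].items() if data is None]
    results = await asyncio.gather(
        *(download_artifact(url, semaphore, tracker) for url in to_download)
    )
    content["artifacts"].update(zip(to_download, results))
    return content


async def demo(max_concurrent=2):
    semaphore = asyncio.Semaphore(max_concurrent)
    tracker = Tracker()
    contents = [
        {"artifacts": {f"unit{i}/a": None, f"unit{i}/b": None}} for i in range(3)
    ]
    await asyncio.gather(*(handle_content(c, semaphore, tracker) for c in contents))
    return contents, tracker.peak
```

Because every content coroutine shares the same semaphore, the limit applies to the stage as a whole, not to each content instance separately.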