Revision 24/32
jortel@redhat.com, 09/05/2017 11:24 PM
Downloading¶
In Pulp 3, two competing technologies and designs are being considered. For the purposes of this discussion we'll name them Jupiter and Saturn. The Jupiter solution is based on concurrent.futures and the Saturn solution is based on asyncio. In addition to the underlying technology difference, the solutions meet the requirements in different ways. The Jupiter solution includes more classes, provides more abstraction and supports customization through delegation and object composition. The Saturn solution meets the requirements with the fewest classes possible and minimum abstraction. Customization is supported through subclassing.
The three actors for our use cases are the Importer, the Streamer and the Plugin Writer. The ChangeSet shares a subset of the Streamer requirements but is not included in this discussion.
Design Goals & Constraints¶
The requirements define the minimum criteria to be satisfied by both solutions. The design constraints and goals define how the requirements are met.
jupiter:
  - constraints:
    - object oriented
    - support semantic versioning
  - goals:
    - encapsulate underlying technologies
    - consistent interface across downloads: standard arguments, return values and raised exceptions.
    - delegation pattern for common customization:
      - handling of downloaded bits delegated to Writers
      - validation delegated to Validations
      - optional digest and size calculation delegated to DownloadMonitor
      - error handling delegated to Event handlers.
    - external participation in the download process through defined event registration and callback.
    - delegate concurrency to standard lib (concurrent.futures).
    - delegate protocol implementation to client libs.
saturn:
  - constraints:
    - object oriented
    - support semantic versioning
  - goals:
    - direct exposure of client libs.
    - minimum encapsulation of underlying technologies.
    - minimum # of first class concepts (classes) and abstractions.
    - minimum # lines of code to maintain.
    - delegate concurrency to standard lib (asyncio).
    - delegate protocol implementation to client libs.
Use Cases¶
Importer¶
As an importer, I need to download single files.
jupiter:
download = HttpDownload(
    url=url,
    writer=FileWriter(path),
    timeout=Timeout(connect=10, read=15),
    user=User(name='elmer', password='...'),
    ssl=SSL(ca_certificate='path-to-certificate',
            client_certificate='path-to-certificate',
            client_key='path-to-key',
            validation=True),
    proxy_url='http://user:password@gateway.org')
try:
    download()
except DownloadError:
    pass  # An error occurred.
else:
    pass  # Go read the downloaded file \o/
saturn:
import asyncio
import ssl
import aiohttp
# The CA bundle is loaded as a verify location; the client certificate and key are loaded as one chain.
ssl_context = ssl.create_default_context(cafile='path-to-CA_certificate')
ssl_context.load_cert_chain('path-to-CLIENT_certificate', keyfile='path-to-CLIENT_key')
connector = aiohttp.TCPConnector(verify_ssl=True, ssl_context=ssl_context)
session = aiohttp.ClientSession(
    connector=connector,
    read_timeout=15,
    auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))
downloader_obj = HttpDownloader(
    session,
    url,
    proxy='http://gateway.org',
    proxy_auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))
downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        pass  # An error occurred.
question: How can the connect timeout be set in aiohttp?
As an importer, I can leverage all settings supported by underlying protocol specific client lib.
jupiter:
Commonly used settings are supported by the abstraction. Additional settings could be supported by subclassing.
class SpecialDownload(HttpDownload):
    def _settings(self):
        settings = super()._settings()
        settings['special'] = <special value>
        return settings
saturn:
The underlying client lib arguments are directly exposed.
As an importer, I can create an Artifact with a downloaded file using the size and digests calculated during the download.
jupiter:
Using the optional DownloadMonitor to collect statistics such as size and calculate digests.
download = HttpDownload(...)
monitor = DownloadMonitor(download)
... # perform download.
artifact = Artifact(**monitor.facts())
artifact.save()
saturn:
The size and all digests are always calculated.
downloader_obj = HttpDownloader(...)
... # perform download.
result = task.result()  # This is a DownloadResult
artifact = Artifact(**result.artifact_attributes)
artifact.save()
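Both approaches amount to computing the size and digests while the bits stream through, rather than re-reading the file afterwards. The following is a minimal sketch of that idea using only hashlib. The DigestingWriter class, its facts() method and the choice of algorithms are hypothetical and are not part of either proposed API.

```python
import hashlib
import io


class DigestingWriter:
    """Hypothetical writer: forwards chunks to a file object and tracks size and digests."""

    def __init__(self, fp, algorithms=("sha256", "sha512")):
        self.fp = fp
        self.size = 0
        self.hashers = {name: hashlib.new(name) for name in algorithms}

    def write(self, chunk):
        # Each chunk is written once and hashed once; nothing is read back from disk.
        self.fp.write(chunk)
        self.size += len(chunk)
        for hasher in self.hashers.values():
            hasher.update(chunk)

    def facts(self):
        # Keyword arguments that an Artifact-like model could accept (assumed field names).
        facts = {"size": self.size}
        facts.update({name: h.hexdigest() for name, h in self.hashers.items()})
        return facts


buffer = io.BytesIO()
writer = DigestingWriter(buffer)
for chunk in (b"hello ", b"world"):
    writer.write(chunk)
facts = writer.facts()
```

The resulting dictionary plays the role of monitor.facts() in the Jupiter example or result.artifact_attributes in the Saturn example.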
As an importer, I need to download files concurrently.
jupiter:
Using the Batch to run the downloads concurrently. Only 3 downloads in memory at once.
downloads = (HttpDownload(...) for _ in range(10))
with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            pass  # An error occurred.
        else:
            pass  # Use the downloaded file \o/
saturn:
Using the asyncio run loop. This example does not restrict the number of downloads in memory at once.
downloaders = (HttpDownloader(...) for _ in range(10))
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([d.run() for d in downloaders]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        pass  # An error occurred.
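One way the asyncio variant could restrict the number of downloads in memory is to pull coroutines lazily from the generator and keep only a fixed window of tasks in flight. This is a sketch under assumptions, not the Saturn design: fake_download stands in for HttpDownloader(...).run(), and run_bounded is a hypothetical helper.

```python
import asyncio


async def fake_download(n):
    # Stand-in for HttpDownloader(...).run(); no network is involved.
    await asyncio.sleep(0.01)
    return n


async def run_bounded(coroutines, limit):
    """Run coroutines drawn lazily from an iterable, keeping at most `limit` in flight."""
    iterator = iter(coroutines)
    pending = set()
    results = []
    peak = 0
    exhausted = False
    while True:
        # Top up the window; a coroutine object is only created when there is room for it.
        while not exhausted and len(pending) < limit:
            try:
                coroutine = next(iterator)
            except StopIteration:
                exhausted = True
            else:
                pending.add(asyncio.ensure_future(coroutine))
        if not pending:
            break
        peak = max(peak, len(pending))
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            results.append(task.result())
    return results, peak


results, peak = asyncio.run(run_bounded((fake_download(n) for n in range(10)), limit=3))
```

Here peak records the largest number of tasks that existed at once, which never exceeds the limit.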
As an importer, I want to validate downloaded files.
jupiter:
Supported by adding provided or custom validations to the download. A validation error raises ValidationError, which is a subclass of DownloadError.
download = HttpDownload(...)
download.append(DigestValidation('sha256', '0x1234'))
try:
    download()
except DownloadError:
    pass  # An error occurred.
saturn:
Supported by passing the expected_digests dictionary and catching DigestValidationError.
downloader_obj = HttpDownloader(..., expected_digests={'sha256': '0x1234'})
downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except (aiohttp.ClientError, DigestValidationError):
        pass  # An error occurred.
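In either design the digest check reduces to comparing calculated digests against expected ones and raising on the first mismatch. A minimal sketch follows. The validate_digests function is hypothetical, and DigestValidationError here is a local stand-in for the exception named in the Saturn example.

```python
import hashlib


class DigestValidationError(Exception):
    """Local stand-in for the validation error named in the examples above."""


def validate_digests(data, expected_digests):
    # expected_digests maps an algorithm name to the expected hex digest.
    for algorithm, expected in expected_digests.items():
        actual = hashlib.new(algorithm, data).hexdigest()
        if actual != expected:
            raise DigestValidationError(
                "{}: expected {}, got {}".format(algorithm, expected, actual))


payload = b"example payload"

# A matching digest passes silently.
validate_digests(payload, {"sha256": hashlib.sha256(payload).hexdigest()})

# A mismatched digest raises.
try:
    validate_digests(payload, {"sha256": "0x1234"})
except DigestValidationError as error:
    failure = str(error)
else:
    failure = None
```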
As an importer, I am not required to keep all content (units) and artifacts in memory to support concurrent downloading.
jupiter:
saturn:
As an importer, I need a way to link a downloaded file to an artifact without keeping all content units and artifacts in memory.
jupiter:
Using the Batch to run the downloads concurrently. Only 3 downloads in memory at once.
downloads = (HttpDownload(...) for _ in range(10))
with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            pass  # An error occurred.
        else:
            pass  # Use the downloaded file \o/
saturn:
Using the GroupDownloader?
As an importer, I can perform concurrent downloading using a synchronous pattern.
jupiter:
Using the Batch. See other examples.
saturn:
Using either the GroupDownloader or asyncio loop directly. See other examples.
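The synchronous pattern can be illustrated with concurrent.futures alone: a generator submits work to a thread pool, holds a bounded backlog, and yields finished futures to an ordinary for loop. This is only a sketch of the pattern. The batch function, its backlog and workers parameters, and fake_download are assumptions and do not describe the actual Batch implementation.

```python
import functools
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def fake_download(n):
    # Stand-in for calling a download object; no network is involved.
    return n * 2


def batch(callables, backlog, workers=2):
    """Yield finished futures synchronously, holding at most `backlog` submitted at once."""
    iterator = iter(callables)
    pending = set()
    exhausted = False
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while True:
            # Submit more work only while the backlog has room.
            while not exhausted and len(pending) < backlog:
                try:
                    fn = next(iterator)
                except StopIteration:
                    exhausted = True
                else:
                    pending.add(pool.submit(fn))
            if not pending:
                break
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            yield from done


downloads = (functools.partial(fake_download, n) for n in range(10))
results = sorted(future.result() for future in batch(downloads, backlog=3))
```

The caller sees a plain iterator; the concurrency stays inside the helper.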
As an importer, concurrent downloads must share resources such as sessions, connection pools and auth tokens across individual downloads.
jupiter:
The Download.context is designed to support this. The shared context can be used to safely share anything. This includes python-requests sessions (using a Cache), auth tokens and resolved mirror lists. The sharing is done through collaboration. When it's appropriate for individual downloads to share things, an external actor like the Batch or the Streamer will ensure that all of the download objects have the same context.
saturn:
Each downloader could define a class attribute. This global can be used to share anything. This includes python-requests sessions (using a Cache), auth tokens and resolved mirror lists. The sharing is done through collaboration. Sharing is global and unconditional.
Question: how will thread safety be provided? The streamer will have multiple Twisted threads using these downloaders.
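As an illustration of what thread-safe sharing could look like, the sketch below guards a shared attribute bag with a re-entrant lock and exposes it as a context manager, similar in spirit to the "with ... context as context" usage in the mirror list examples. The Context class and resolve_mirrors function are hypothetical; neither design is confirmed to work this way.

```python
import threading


class Context:
    """Hypothetical shared context: an attribute bag guarded by a re-entrant lock."""

    def __init__(self):
        self._lock = threading.RLock()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self._lock.release()
        return False


lookups = []


def resolve_mirrors(context):
    # Only the first thread to enter performs the (simulated) expensive lookup.
    with context:
        try:
            context.mirrors
        except AttributeError:
            lookups.append(1)
            context.mirrors = ["http://mirror-a.example", "http://mirror-b.example"]


shared = Context()
threads = [threading.Thread(target=resolve_mirrors, args=(shared,)) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Because the check and the assignment happen under the same lock, the lookup runs once no matter how many threads share the context.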
As an importer, I can customize how downloading is performed. For example, to support mirror lists.
jupiter:
All download objects can be customized in one of two ways. First, by delegation using events. And, second by subclassing.
Delegation example.
class MirrorDelegate:
    # Any download can delegate mirror list resolution
    # and hunting to this object.
    def __init__(self):
        self.mirrors = iter([])
    def attach(self, download):
        download.register(Event.PREPARED, self.on_prepare)
        download.register(Event.ERROR, self.on_error)
    def on_prepare(self, event):
        # Resolve the mirror list URL
        # May already be stored in the context or need to be downloaded and parsed.
        with event.download.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = event.download.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.read()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with # of mirrors.
        event.download.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start
        event.download.url = next(self.mirrors)
    def on_error(self, event):
        try:
            event.download.url = next(self.mirrors)
        except StopIteration:
            # no more mirrors
            pass
        else:
            event.repaired = True
# importer
def get_download(...):
    download = Factory.build(...)
    delegate = MirrorDelegate()
    delegate.attach(download)
    return download
Subclass example.
class MirrorDownload(HttpDownload):
    # Support HTTP/HTTPS mirror list downloading.
    def _prepare(self):
        super()._prepare()
        # Resolve the mirror list URL
        # May already be stored in the context or need to be downloaded and parsed.
        with self.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = self.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.read()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with # of mirrors.
        self.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start
        self.url = next(self.mirrors)
    def _on_error(self, event):
        super()._on_error(event)
        try:
            self.url = next(self.mirrors)
        except StopIteration:
            # no more mirrors
            return False
        else:
            return True
# importer
def get_download(...):
    # Factory needs to support custom class.
saturn:
As an importer, concurrent downloading must limit the number of simultaneous connections. Downloading 5k artifacts cannot open 5k connections.
jupiter:
This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases.
saturn:
This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases.
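For the asyncio case, one common way to cap simultaneous work is an asyncio.Semaphore shared by all downloads. The sketch below simulates this with a counter instead of real connections. The fetch and main functions and the stats dictionary are hypothetical, and a real implementation would more likely rely on the client library's connection pool limit.

```python
import asyncio


async def fetch(semaphore, stats):
    # Only `limit` coroutines can be inside this block at the same time.
    async with semaphore:
        stats["open"] += 1
        stats["peak"] = max(stats["peak"], stats["open"])
        await asyncio.sleep(0.001)  # Simulated transfer.
        stats["open"] -= 1


async def main(total, limit):
    semaphore = asyncio.Semaphore(limit)
    stats = {"open": 0, "peak": 0}
    await asyncio.gather(*(fetch(semaphore, stats) for _ in range(total)))
    return stats


stats = asyncio.run(main(total=50, limit=5))
```

With 50 simulated downloads and a limit of 5, the peak counter shows that no more than 5 were ever open together.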
As an importer, I can terminate concurrent downloading at any point and not leak resources.
jupiter:
saturn:
As an importer, I can download using any protocol. Starting with HTTP/HTTPS and FTP.
jupiter:
saturn:
Streamer¶
As the streamer, I need to download files related to published artifacts and metadata but delegate the implementation (protocol, settings, credentials) to the importer. The implementation must be a black-box.
jupiter:
saturn:
As the streamer, I can download using any protocol supported by the importer.
jupiter:
saturn:
As the streamer, I want to validate downloaded files.
jupiter:
saturn:
As the streamer, concurrent downloads must share resources such as sessions, connection pools and auth tokens across individual downloads without having knowledge of such things.
jupiter:
saturn:
As the streamer, I need to support complex downloading such as mirror lists. This complexity must be delegated to the importer.
jupiter:
saturn:
As the streamer, I need to bridge the downloaded bit stream to the Twisted response. The file is not written to disk.
jupiter:
saturn:
As the streamer, I need to forward HTTP headers from the download response to the Twisted response.
jupiter:
saturn:
As the streamer, I can download using (the same) custom logic as the importer, such as supporting mirror lists.
jupiter:
saturn:
Updated by jortel@redhat.com about 7 years ago · 24 revisions