Downloading

In pulp3, there are two competing technologies and designs being considered. For the purposes of this discussion, we'll name them Jupiter and Saturn. The Jupiter solution is based on concurrent.futures and the Saturn solution is based on asyncio. In addition to the underlying technology difference, the solutions meet the requirements in different ways. The Jupiter solution includes more classes, provides more abstraction, and supports customization through delegation and object composition. The Saturn solution meets the requirements with the fewest classes possible and minimum abstraction. Customization is supported through subclassing.

The three actors for our use cases are the Importer, the Streamer and the Plugin Writer. The ChangeSet shares a subset of the Streamer requirements but is not included in this discussion.

Design Goals & Constraints

The requirements define the minimum criteria to be satisfied by both solutions. The design constraints and goals define how the requirements are met.

jupiter:

  • constraints:
      • object oriented
      • support semantic versioning
  • goals:
      • encapsulate underlying technologies
      • consistent interface across downloads: standard arguments, return values and raised exceptions.
      • delegation pattern for common customization:
          • handling of downloaded bits delegated to Writers
          • validation delegated to Validations
          • optional digest and size calculation delegated to DownloadMonitor
          • error handling delegated to Event handlers
      • external participation in the download process through defined event registration and callbacks.
      • delegate concurrency to the standard lib (concurrent.futures).
      • delegate protocol implementation to client libs.
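The concurrency delegation named in the last goals can be sketched with the standard library alone. The download callable here is a stand-in for a Download object, not the real class:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Stand-in for a Download callable; a real Download would fetch its URL.
def download(url):
    return 'downloaded: %s' % url

urls = ['http://example.org/%d' % n for n in range(4)]

# The executor owns the threads; the design only submits callables to it.
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(download, url) for url in urls]
    results = sorted(f.result() for f in as_completed(futures))
```

Because concurrency lives entirely in the executor, the Download objects themselves stay synchronous and simple.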

saturn:

  • constraints:
      • object oriented
      • support semantic versioning
  • goals:
      • direct exposure of client libs.
      • minimum encapsulation of underlying technologies.
      • minimum # of first class concepts (classes) and abstractions.
      • minimum # of lines of code to maintain.
      • delegate concurrency to the standard lib (asyncio).
      • delegate protocol implementation to client libs.
  • delegate protocol implementation to client libs.
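The asyncio delegation can be sketched the same way; run() here is a stand-in coroutine for HttpDownloader.run(). The examples below drive coroutines with loop.run_until_complete(asyncio.wait(...)); asyncio.gather() used here is an equivalent entry point for this sketch:

```python
import asyncio

# Stand-in coroutine for HttpDownloader.run(); the real one would fetch a URL.
async def run(url):
    await asyncio.sleep(0)
    return 'downloaded: %s' % url

async def main():
    urls = ['http://example.org/%d' % n for n in range(4)]
    # gather() schedules all coroutines concurrently on one event loop.
    return await asyncio.gather(*(run(u) for u in urls))

results = asyncio.run(main())
```

As with Jupiter, concurrency lives in the standard lib; the downloader only has to be a coroutine.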

Use Cases

Importer


As an importer, I need to download single files.

jupiter:

download = HttpDownload(
    url=url,
    writer=FileWriter(path),
    timeout=Timeout(connect=10, read=15),
    user=User(name='elmer', password='...'),
    ssl=SSL(ca_certificate='path-to-certificate',
            client_certificate='path-to-certificate',
            client_key='path-to-key',
            validation=True),
    proxy_url='http://user:password@gateway.org')

try:
    download()
except DownloadError:
    # An error occurred.
else:
   # Go read the downloaded file \o/

saturn:

import ssl

ssl_context = ssl.create_default_context(cafile='path-to-CA_certificate')
ssl_context.load_cert_chain('path-to-CLIENT_certificate', 'path-to-CLIENT_key')

connector = aiohttp.TCPConnector(verify_ssl=True, ssl_context=ssl_context)

session = aiohttp.ClientSession(
    connector=connector,
    read_timeout=15,
    auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))

downloader_obj = HttpDownloader(
    session,
    url,
    proxy='http://gateway.org',
    proxy_auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))

downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        # An error occurred.

Question: How can the connect timeout be set in aiohttp?


As an importer, I can leverage all settings supported by underlying protocol specific client lib.

jupiter:

Commonly used settings are supported by the abstraction. Additional settings could be supported by subclassing.


class SpecialDownload(HttpDownload):

    def _settings(self):
        settings = super()._settings()
        settings['special'] = <special value>
        return settings

saturn:

The underlying client lib arguments are directly exposed.
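A sketch of what that pass-through could look like; PassThroughDownloader is hypothetical, standing in for HttpDownloader:

```python
class PassThroughDownloader:
    # Hypothetical stand-in for HttpDownloader: keyword arguments are
    # stored untouched and handed to the underlying client lib at run time.
    def __init__(self, url, **client_kwargs):
        self.url = url
        self.client_kwargs = client_kwargs

# Any client-lib setting can be expressed directly; nothing to subclass first.
downloader = PassThroughDownloader(
    'http://example.org/file',
    read_timeout=15,
    allow_redirects=False)
```

The trade-off is coupling: importer code that uses such settings is written against the client lib's API, not an abstraction.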


As an importer, I can create an Artifact with a downloaded file using the size and digests calculated during the download.

jupiter:

Using the optional DownloadMonitor to collect statistics such as size and to calculate digests.


download = HttpDownload(...)
monitor = DownloadMonitor(download)
...  # perform download.
artifact = Artifact(**monitor.facts())
artifact.save()

saturn:

The size and all digests are always calculated.


downloader_obj = HttpDownloader(...)
...  # perform download.
result = task.result()
artifact = Artifact(**result.artifact_attributes)
artifact.save()

As an importer, I need to download files concurrently.

jupiter:

Using the Batch to run the downloads concurrently. At most 3 downloads are in memory at once.


downloads = (HttpDownload(...) for _ in range(10))

with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            # An error occurred.
        else:
            # Use the downloaded file \o/

saturn:

Using the asyncio event loop. This example does not restrict the number of downloads in memory at once.


downloaders = (HttpDownloader(...) for _ in range(10))

loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([d.run() for d in downloaders]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        # An error occurred.

As an importer, I want to validate downloaded files.

jupiter:

Supported by adding provided or custom validations to the download. A validation error raises ValidationError, which is a DownloadError.


download = HttpDownload(...)
download.validation.append(DigestValidation('sha256', '0x1234'))

try:
    download()
except DownloadError:
    # An error occurred.

saturn:

Supported by passing the expected_digests dictionary and catching DigestValidationError.


downloader_obj = HttpDownloader(..., expected_digests={'sha256': '0x1234'})

downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except (aiohttp.ClientError, DigestValidationError):
        # An error occurred.

As an importer, I am not required to keep all content (units) and artifacts in memory to support concurrent downloading.

jupiter:

Using the Batch to run the downloads concurrently. The input to the batch can be a generator and the number of downloads in
memory is limited by the backlog argument.


downloads = (HttpDownload(...) for _ in range(10))

with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            # An error occurred.
        else:
            # Use the downloaded file \o/

saturn:

@bmbouters: please describe and provide examples.


As an importer, I need a way to link a downloaded file to an artifact without keeping all content units and artifacts in memory.

jupiter:

Using the Batch to run the downloads concurrently and specifying the backlog to limit the number of downloads in memory. See other examples.

The Download.attachment provides linkage to objects like Artifacts that are related to the download.


download = HttpDownload(...)
download.attachment = Artifact(...)

saturn:

@bmbouters: please describe and provide examples.


As an importer, I can perform concurrent downloading using a synchronous pattern.

jupiter:

Using the Batch. See other examples.

saturn:

Using the asyncio loop directly. See other examples.


As an importer, concurrent downloads must share resources such as sessions, connection pools and auth tokens across individual downloads.

jupiter:

The Download.context is designed to support this. The shared context can be used to safely share anything. This includes python-requests sessions (using a Cache), auth tokens and resolved mirror lists. The sharing is done through collaboration. When it's appropriate for individual downloads to share things, an external actor like the Batch or the Streamer will ensure that all of the download objects have the same context.

saturn:

Each downloader could define a class attribute. This global can be used to share anything, including python-requests sessions (using a Cache), auth tokens and resolved mirror lists. The sharing is done through collaboration. Sharing is global and unconditional.

Question: how will thread safety be provided? The streamer will have multiple Twisted threads using these downloaders.
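The class-attribute sharing described above can be sketched as follows; the class and the dict session are stand-ins (a real downloader would hold an aiohttp.ClientSession), and note that nothing here is thread safe without an added lock:

```python
class SharedSessionDownloader:
    # Class-level attribute: every instance sees the same session object.
    # Sharing is global and unconditional, as noted above.
    session = None

    def __init__(self, url):
        self.url = url
        if SharedSessionDownloader.session is None:
            # First downloader created establishes the shared session.
            # A dict stands in for an aiohttp.ClientSession here.
            SharedSessionDownloader.session = {'connections': []}

a = SharedSessionDownloader('http://example.org/a')
b = SharedSessionDownloader('http://example.org/b')
```

Both instances now reference the identical session object, so connection pooling and auth state are shared automatically.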


As an importer, I can customize how downloading is performed, for example to support mirror lists.

jupiter:

All download objects can be customized in one of two ways: first, by delegation using events; second, by subclassing.

Delegation example.


class MirrorDelegate:
    # Any download can delegate mirror list resolution
    # and hunting to this object.

    def __init__(self):
        self.mirrors = iter([])

    def attach(self, download):
        download.register(Event.PREPARED, self.on_prepare)
        download.register(Event.ERROR, self.on_error)

    def on_prepare(self, event):
        # Resolve the mirror list URL
        # May already be stored in the context or need to be downloaded and parsed.
        with event.download.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = event.download.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.content()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with # of mirrors.
        event.download.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start
        event.download.url = next(self.mirrors)

    def on_error(self, event):
        try:
            event.download.url = next(self.mirrors)
        except StopIteration:
            # no more mirrors
            pass
        else:
            event.repaired = True

# importer
def get_download(...):
    download = Factory.build(...)
    delegate = MirrorDelegate()
    delegate.attach(download)
    return download

Subclass example.


class MirrorDownload(HttpDownload):
    # Support HTTP/HTTPS mirror list downloading.

    def _prepare(self):
        super()._prepare()
        # Resolve the mirror list URL
        # May already be stored in the context or need to be downloaded and parsed.
        with self.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = self.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.content()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with # of mirrors.
        self.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start
        self.url = next(self.mirrors)

    def _on_error(self, event):
        super()._on_error(event)
        try:
            self.url = next(self.mirrors)
        except StopIteration:
            # no more mirrors
            return False
        else:
            return True

# importer
def get_download(...):
    # Factory needs to support custom class.

saturn:


As an importer, concurrent downloading must limit the number of simultaneous connections. Downloading 5k artifacts cannot open 5k connections.

jupiter:

This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases.

saturn:

This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases.
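On the asyncio side the cap can also be expressed directly: aiohttp's TCPConnector(limit=...) bounds connections per session, and the same effect can be sketched with a stdlib semaphore (the download coroutine below is a stand-in, not the real HttpDownloader):

```python
import asyncio

LIMIT = 3  # maximum simultaneous "connections"

async def download(url, semaphore, counters):
    async with semaphore:  # at most LIMIT bodies execute at once
        counters['active'] += 1
        counters['peak'] = max(counters['peak'], counters['active'])
        await asyncio.sleep(0)  # stand-in for the actual transfer
        counters['active'] -= 1
    return url

async def main():
    semaphore = asyncio.Semaphore(LIMIT)
    counters = {'active': 0, 'peak': 0}
    urls = ['http://example.org/%d' % n for n in range(10)]
    results = await asyncio.gather(
        *(download(u, semaphore, counters) for u in urls))
    return results, counters['peak']

results, peak = asyncio.run(main())
```

All 10 downloads are scheduled, but the semaphore ensures no more than LIMIT are ever in flight, so 5k artifacts never open 5k connections.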


As an importer, I can terminate concurrent downloading at any point and not leak resources.

jupiter:

The loop using the iterator returned by Batch can be safely exited at any point, and all resources are then free to be garbage collected.

saturn:

The asyncio event loop can be safely stopped at any point, and all resources are then free to be garbage collected.


As an importer, I can download using any protocol. Starting with HTTP/HTTPS and eventually FTP.

jupiter:

Classes extending Download may implement any protocol. HTTP/HTTPS is supported by HttpDownload. See other use case examples.

saturn:

HTTP/HTTPS is supported by HttpDownloader. See other use case examples.


Streamer


As the streamer, I need to download files related to published artifacts and metadata but delegate the implementation (protocol, settings, credentials) to the importer. The implementation must be a black-box.

jupiter:

The Download is a callable.


download = importer.get_downloader(...)
download()

saturn:

@bmbouters: please describe and provide examples.


downloader = importer.get_downloader(...)
self.not_done.append(downloader.run())

As the streamer, I want to validate downloaded files.

jupiter:

The Download may be configured by the importer with a list of Validation objects. Validation is performed on the downloaded bit stream.


download = importer.get_downloader(...)
download()

saturn:

The HttpDownloader may be configured by the importer with expected size and expected digests. Validation is performed on the downloaded bit stream.


downloader = importer.get_downloader(...)
self.not_done.append(downloader.run())

As the streamer, concurrent downloads must share resources such as sessions, connection pools and auth tokens across individual downloads without having knowledge of such things.

jupiter:

Each download may be configured with a shared context. The download objects collaborate to share resources using the context. The streamer updates each Download provided by the importer to use the same (shared) context.


download = importer.get_downloader(...)
download.context = self.context  # This is a Context.
download()

saturn:

Each downloader has a class attribute used to globally share resources.


downloader = importer.get_downloader(...)
self.not_done.append(downloader.run())

As the streamer, I need to support complex downloading such as mirror lists. This complexity must be delegated to the importer.

jupiter:

The downloader object provided by the importer will handle the mirror list.

saturn:


downloader = importer.get_downloader(...)
self.not_done.append(downloader.run())

As the streamer, I need to bridge the downloaded bit stream to the Twisted response. The file is not written to disk.

jupiter:

The Download.writer can be reassigned to any Writer implementation.


from twisted.internet import reactor

class TwistedWriter(Writer):

    def __init__(self, request):
        self.request = request

    def append(self, buffer):
        reactor.callFromThread(self.request.write, buffer)

    def close(self):
        reactor.callFromThread(self.request.finish)


download = importer.get_downloader(...)
download.writer = TwistedWriter(request)
download()

saturn:

@bmbouters: please describe and add example.


As the streamer, I need to forward HTTP headers from the download response to the twisted response.

jupiter:

The headers can be forwarded using a delegate.


class HeaderDelegate:

    def __init__(self, request):
        self.request = request

    def attach(self, download):
        download.register(Event.REPLIED, self.on_reply)

    def on_reply(self, event):
        for header, value in event.download.reply.headers:
            # do filtering here
            self.request.setHeader(header, value)

download = importer.get_downloader(...)
delegate = HeaderDelegate(request)
delegate.attach(download)
download()

saturn:

@bmbouters: please describe and add example.