Project

Profile

Help

Downloading » History » Revision 30

Revision 29 (jortel@redhat.com, 09/06/2017 06:20 PM) → Revision 30/32 (jortel@redhat.com, 09/06/2017 06:44 PM)

# Downloading 

 In pulp3, there are two competing technologies and designs being considered. For the purposes of the discussion we'll name them **Jupiter** and **Saturn**. The *Jupiter* solution is based on *concurrent.futures* and the Saturn solution is based on *asyncio*. In addition to the underlying technology difference, the solutions meet the requirements in different ways. The *Jupiter* solution includes more classes, provides more abstraction and supports customization through delegation and object composition. The *Saturn* solution meets the requirements with the fewest classes possible and minimum abstraction. Customization is supported though subclassing. 

 The three actors for our use cases is the *Importer*, *Streamer* and Plugin Writer. The *ChangeSet* shares a subset of the Streamer requirements but not included in this discussion. 

 ## Design Goals & Constraints 

 The requirements define the minimum criteria to be satisfied by both solutions. The design constrains and goals define <span class="underline">how</span> the requirements are met. 

 **juniper**: 

   - constraints: 

 >     - object oriented 
 >     - support semantic versioning 

   - goals 

 >     - encapsulate underlying technologies 
 >     - consistent interface across downloads. standard arguments, return values and raised exceptions. 
 >     - delegation pattern for common customization: 
 > 
 >>     - handling of downloaded bits to *Writers* 
 >>     - validation delegated to *Validations* 
 >>     - optional digest and size calculation delegated to *DownloadMonitor* 
 >>     - error handling delegated to *Event* handlers. 
 > 
 >     - external participation of download process through defined event registration and callback. 
 >     - delegate concurrency to standard lib (*concurrent.futures*). 
 >     - delegate protocol implementation to client libs. 

 **saturn**: 

   - constraints: 

 >     - object oriented 
 >     - support semantic versioning 

   - goals 

 >     - direct exposure of client libs. 
 >     - minimum encapsulation of underlying technologies. 
 >     - minimum \# of first class concepts (classes) and abstractions. 
 >     - minimum \# lines of code to maintain. 
 >     - delegate concurrency to standard lib (*asyncio*). 
 >     - delegate protocol implementation to client libs. 

 ## Use Cases 

 ### Importer 

 ----- 

 #### As an importer, I need to download single files. 

 **jupiter**: 

 ~~~python 
 download = HttpDownload( 
     url=url, 
     writer=FileWriter(path), 
     timeout=Timeout(connect=10, read=15), 
     user=User(name='elmer', password='...'), 
     ssl=SSL(ca_certificate='path-to-certificate', 
             client_certificate='path-to-certificate', 
             client_key='path-to-key', 
             validation=True), 
     proxy_url='http://user:password@gateway.org') 

 try: 
     download() 
 except DownloadError: 
     # An error occurred. 
 else: 
    # Go read the downloaded file \o/ 
 ~~~ 

 **saturn**: 

 ~~~python 
 ssl_context = aiohttpSSLContext() 
 ssl_context.load_cert_chain('path-to-CA_certificate') 
 ssl_context.load_cert_chain('path-to-CLIENT_certificate') 
 ssl_context.load_cert_chain('path-to-CLIENT_key') 

 connector=aiohttp.TCPConnector(verify_ssl=True, ssl_context=ssl_context) 

 session = aiohttp.ClientSession( 
     connector=connector, 
     read_timeout=15, 
     auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8')) 

 downloader_obj = HttpDownloader( 
     session, 
     url, 
     proxy='http://gateway.org', 
     proxy_auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8') 

 downloader_coroutine = downloader_obj.run() 
 loop = asyncio._get_running_loop() 
 done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine])) 
 for task in done: 
     try: 
         result = task.result()    # This is a DownloadResult 
     except aiohttp.ClientError: 
         # An error occurred. 
 ~~~ 

 question: How can the connect timeout be set in aiohttp? 

 ----- 

 #### As an importer, I can leverage all settings supported by underlying protocol specific client lib. 

 **jupiter**: 

 Commonly used settings supported by abstraction. Additional settings could be supported by subclassing. 

 ~~~python 

 class SpecialDownload(HttpDownload): 

     def _settings(self): 
         settings = super()._settings() 
         settings['special'] = <special value> 
         return settings 
 ~~~ 

 **saturn**: 

 The underlying client lib arguments directly exposed. 

 ----- 

 #### As an importer, I can create an Artifact with a downloaded file using the size and digests calculated during the download. 

 **jupiter**: 

 Using the optional *DownloadMonitor* to collect statistics such as size and calculate digests. 

 ~~~python 

 download = HttpDownload(..) 
 monitor = DownloadMonitor(download) 
 ...    # perform download. 
 artifact = Artifact(**monitor.facts()) 
 artifact.save() 
 ~~~ 

 **saturn**: 

 The *size* and all *digests* always calculated. 

 ~~~python 

 downloader_obj = HttpDownloader(...) 
 ...    # perform download. 
 result = task.result() 
 artifact = Artifact(**result.artifact_attributes) 
 artifact.save() 
 ~~~ 

 ----- 

 #### As an importer, I need to download files concurrently. 

 **jupiter**: 

 Using the *Batch* to run the downloads concurrently. Only 3 downloads in memory at once. 

 ~~~python 

 downloads = (HttpDownload(...) for _ in range(10)) 

 with Batch(downloads, backlog=3) as batch: 
     for plan in batch(): 
         try: 
             plan.result() 
         except DownloadError: 
             # An error occurred. 
         else: 
             # Use the downloaded file \o/ 
 ~~~ 

 **saturn**: 

 Using the asyncio run loop. This example does not restrict the number of downloads in memory at once. 

 ~~~python 

 downloaders = (HttpDownloader...) for _ in range(10)) 

 loop = asyncio._get_running_loop() 
 done, not_done = loop.run_until_complete(asyncio.wait([d.run() for d in downloaders])) 
 for task in done: 
     try: 
         result = task.result()    # This is a DownloadResult 
     except aiohttp.ClientError: 
         # An error occurred. 
 ~~~ 

 ----- 

 #### As an importer, I want to validate downloaded files. 

 **jupiter**: 

 Supported by adding provided or custom validations to the download. A validation error raises *ValidationError* which *IsA* *DownloadError*. 

 ~~~python 

 download = HttpDownload(...) 
 download.append(DigestValidation('sha256', '0x1234')) 

 try: 
     download() 
 except DownloadError: 
     # An error occurred. 
 ~~~ 

 **saturn**: 

 Supported by passing the *expected_digests* dictionary and catching *DigestValidationError*. 

 ~~~python 

 downloader_obj = HttpDownloader(..., expected_digests={'sha256': '0x1234'}) 

 downloader_coroutine = downloader_obj.run() 
 loop = asyncio._get_running_loop() 
 done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine])) 
 for task in done: 
     try: 
         result = task.result()    # This is a DownloadResult 
     except (aiohttp.ClientError, DigestValidationError): 
         # An error occurred. 
 ~~~ 

 ----- 

 #### As an importer, I am not required to keep all content (units) and artifacts in memory to support concurrent downloading. 

 **jupiter**: 

 Using the *Batch* to run the downloads concurrently. The input to the batch can be a *generator* and the number of downloads in   
 memory is limited by the *backlog* argument. 

 ~~~python 

 downloads = (HttpDownload(...) for _ in range(10)) 

 with Batch(downloads, backlog=3) as batch: 
     for plan in batch(): 
         try: 
             plan.result() 
         except DownloadError: 
             # An error occurred. 
         else: 
             # Use the downloaded file \o/ 
 ~~~ 

 **saturn**: 

 @bmbouters: please describe and provide examples. 

 ~~~python 
 ~~~ 

 ----- 

 #### As an importer, I need a way to link a downloaded file to an artifact without keeping all content units and artifacts in memory. 

 **jupiter**: 

 Using the *Batch* to run the downloads concurrently and specifying the *backlog* to limit the number of downloads in memory. See other examples. 

 The Download.attachment provides linkage to objects like Artifacts that are related to the download. 

 ~~~python 

 download = HttpDownload(...) 
 download.attachment = Artifact(..) 
 ~~~ 

 **saturn**: 

 @bmbouters: please describe and provide examples. 

 ~~~python 
 ~~~ 

 ----- 

 #### As an importer, I can perform concurrent downloading using a synchronous pattern. 

 **jupiter**: 

 Using the *Batch*. See other examples. 

 **saturn**: 

 Using either the *GroupDownloader* or asyncio loop directly. See other examples. 

 ----- 

 #### As an importer, concurrent downloads must share resources such as sessions,connection pools and auth tokens across individual downloads. 

 **jupiter**: 

 The Download.context is designed to support this. The *shared* context can be used to safely share anything This includes python-requests sessions (using a Cache), auth tokens and resolved mirror lists. The sharing is done through collaboration. When it's appropriate for individual downloads to share things, an external actor like the Batch or the Streamer will ensure that all of the download   
 objects have the same context. 

 **saturn**: 

 Each downloader could define a class attribute. This global can be used share anything. This includes python-requests sessions (using a Cache), auth tokens and resolved mirror lists. The sharing is done through collaboration. Sharing is global and unconditional. 

 Question: how will thread safety be provided? The streamer will have multiple twisted threads using these downloaders. 

 ----- 

 #### As an importer I can customize how downloading is performed. For example, to support mirror lists 

 **jupiter**: 

 All download objects can be customized in one of two ways. First, by delegation using *events*. And, second by subclassing. 

 Delegation example. 

 ~~~python 

 class MirrorDelegate: 
     # Any download can delegate mirror list resolution 
     # and hunting to this object. 

     def __init__(self): 
         self.mirrors = iter([]) 

     def attach(self, download): 
         download.register(Event.PREPARED, self.on_prepare) 
         download.register(Event.ERROR, self.on_error) 

     def on_prepare(self, event): 
         # Resolve the mirror list URL 
         # May already be stored in the context or need to be downloaded and parsed. 
         with event.download.context as context: 
             try: 
                 mirrors = context.mirrors 
             except AttributeError: 
                 download = event.download.clone() 
                 download.writer = BufferWriter() 
                 download() 
                 _list = download.writer.content() 
                 mirrors = [u.strip() for u in _list.split('\n') if u.strip()] 
                 context.mirrors = mirrors 
         # Align retries with # of mirrors. 
         event.download.retries = len(mirrors) 
         self.mirrors = iter(mirrors) 
         # Start 
         event.download.url = next(self.mirrors) 

     def on_error(self, event): 
         try: 
             event.download.url = next(self.mirrors) 
         except StopIteration: 
             # no more mirrors 
             pass 
         else: 
             event.repaired = True 

 # importer 
 def get_download(...): 
     download = Factory.build(...) 
     delegate = MirrorDelegate() 
     delegate.attach(download) 
 ~~~ 

 Subclass example. 

 ~~~python 

 class MirrorDownload(HttpDownload): 
     # Support HTTP/HTTPS mirror list downloading. 

     def _prepare(self): 
         super()._prepare() 
         # Resolve the mirror list URL 
         # May already be stored in the context or need to be downloaded and parsed. 
         with self.context as context: 
             try: 
                 mirrors = context.mirrors 
             except AttributeError: 
                 download = self.clone() 
                 download.writer = BufferWriter() 
                 download() 
                 _list = download.writer.content() 
                 mirrors = [u.strip() for u in _list.split('\n') if u.strip()] 
                 context.mirrors = mirrors 
         # Align retries with # of mirrors. 
         self.retries = len(mirrors) 
         self.mirrors = iter(mirrors) 
         # Start 
         self.url = next(self.mirrors) 

     def _on_error(self, event): 
         super()._on_error(event) 
         try: 
             self.url = next(self.mirrors) 
         except StopIteration: 
             # no more mirrors 
             return False 
         else: 
             return True 

 # importer 
 def get_download(...): 
     # Factory needs to support custom class. 
 ~~~ 

 **saturn**: 

 ~~~python 
 ~~~ 

 ----- 

 #### As an importer, concurrent downloading must limit the number of simultaneous connections. Downloading 5k artifacts cannot open 5k connections. 

 **jupiter**: 

 This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases. 

 **saturn**: 

 This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases. 

 ----- 

 #### As an importer, I can terminate concurrent downlading at any point and not leak resources. 

 **jupiter**: 

 The loop using the iterator returned by *Batch* can be safely exited at any point and all resources are then free to be garbage collected. 

 **saturn**: 

 The loop using the asyncio loop can be safely exited at any point and all resources are then free to be garbage collected. 

 ----- 

 #### As an importer, I can download using any protocol. Starting with HTTP/HTTPS and eventually FTP. 

 **jupiter**: 

 Classes extending *Download* may implement any protocol. HTTP/HTTPS is supported by *HttpDownload*. See other use case examples. 

 **saturn**: 

 HTTP/HTTPS is supported by *HttpDownloader*. See other use case examples. 

 ----- 

 ### Streamer 

 ----- 

 #### As the streamer, I need to download files related to published artifacts and metadata but delegate *the implementation* (protocol, settings, credentials) to the importer. The implementation must be a black-box. 

 **jupiter**: 

 The *Download* is a callable. 

 ~~~python 

 download = importer.get_downloader(...) 
 download() 
 ~~~ 

 **saturn**: 

 @bmbouters: please describe and provide examples. 

 ~~~python 

 downloader = importer.get_downloader(...) 
 self.not_done.append(downloader.run()) 
 ~~~ 

  
 \--- 

 ----- 

 #### As the streamer, I want to validate downloaded files. 

 **jupiter**: 

 The *Download* may be configured by the importer with a list of *Validation* objects. Validation is performed on the downloaded bit stream. 

 ~~~python 

 download = importer.get_downloader(...) 
 download() 
 ~~~ 

 **saturn**: 

 The *HttpDownloader* may be configured by the importer with expected size and expected digests. Validation is performed on the downloaded bit stream. 

 ~~~python 

 downloader = importer.get_downloader(...) 
 self.not_done.append(downloader.run()) 
 ~~~ 

 ----- 

 #### As the streamer, concurrent downloads must share resources such as sessions,connection pools and auth tokens across individual downloads without having knowledge of such things. 

 **jupiter**: 

 Each download may be configured with a shared context. The download objects collaborate to share resources using the context. The streamer updates each Download provided by the importer to use the same (shared) context. 

 ~~~python 

 download = importer.get_downloader(...) 
 download.context = self.context    # This is a Context. 
 download() 
 ~~~ 

 **saturn**: 

 Each downloader has a class attribute used to globally share resources. 

 ~~~python 

 downloader = importer.get_downloader(...) 
 self.not_done.append(downloader.run()) 
 ~~~ 

 ----- 

 #### As the streamer, I need to support complex downloading such as mirror lists. This complexity must be delegated to the importer. 

 **jupiter**: 

 The downloader object provided by the importer will handle the mirror list. 

 **saturn**: 

 ~~~python 

 downloader = importer.get_downloader(...) 
 self.not_done.append(downloader.run()) 
 ~~~ 

 ----- 

 #### As the streamer, I need to bridge the downloaded bit stream to the Twisted response. The file is not written to disk. 

 **jupiter**: 

 The Download.writer can be reassigned to the base *Writer*. 

 ~~~python 

 class TwistedWriter(Writer): 

     def __init__(self, request): 
         self.request = request 

     def append(self, buffer): 
         reactor.callFromThread(self.request.write, buffer) 

     def close(self): 
         reactor.callFromThread(self.request.finish) 


 download = importer.get_downloader(...) 
 download.writer = TwistedWriter(request) 
 download() 
 ~~~ 

 **saturn**: 

 @bmbouters: please describe and add example. 

 ~~~python 
 ~~~ 

 ----- 

 #### As the streamer, I need to forward HTTP headers from the download response to the twisted response. 

 **jupiter**: 

 The headers can be forwarded using a delegate. ~~~python 
 ~~~ 

 **saturn**: 

 ~~~python 
 ~~~ 

 class HeaderDelegate: 

     def __init__(self, request): 
         self.request = request 

     def attach(download): 
         download.register(Event.REPLIED, self.on_reply) 

     def on_reply(event): 
         for header, value in event.download.reply.headers: 
             # do filtering here 
             self.request.setHeader(header, value) ----- 

 #### As the streamer, I can download = importer.get_downloader(...) using (the same) custom logic as the importer such as supporting mirror lists 

 **jupiter**: 

 ~~~python 
 delegate = HeaderDelegate(request) 
 delegate.attach(download) 
 download() 
 ~~~ 

 **saturn**: 

 @bmbouters: please describe and add example. 

 ~~~python 
 ~~~ 

 -----