# Downloading

In pulp3, there are two competing technologies and designs being considered. For the purposes of the discussion we'll name them **Jupiter** and **Saturn**. The *Jupiter* solution is based on *concurrent.futures* and the *Saturn* solution is based on *asyncio*. In addition to the underlying technology difference, the solutions meet the requirements in different ways. The *Jupiter* solution includes more classes, provides more abstraction, and supports customization through delegation and object composition. The *Saturn* solution meets the requirements with the fewest classes possible and minimum abstraction. Customization is supported through subclassing.

The three actors for our use cases are the *Importer*, the *Streamer*, and the Plugin Writer. The *ChangeSet* shares a subset of the Streamer requirements but is not included in this discussion.

## Design Goals & Constraints

The requirements define the minimum criteria to be satisfied by both solutions. The design constraints and goals define *how* the requirements are met.

**jupiter**:

  - constraints:
      - object oriented
      - support semantic versioning
  - goals:
      - encapsulate underlying technologies
      - consistent interface across downloads: standard arguments, return values, and raised exceptions.
      - delegation pattern for common customization:
          - handling of downloaded bits delegated to *Writers*
          - validation delegated to *Validations*
          - optional digest and size calculation delegated to *DownloadMonitor*
          - error handling delegated to *Event* handlers.
      - external participation in the download process through defined event registration and callbacks.
      - delegate concurrency to the standard lib (*concurrent.futures*).
      - delegate protocol implementation to client libs.

**saturn**:

  - constraints:
      - object oriented
      - support semantic versioning
  - goals:
      - direct exposure of client libs.
      - minimum encapsulation of underlying technologies.
      - minimum number of first-class concepts (classes) and abstractions.
      - minimum number of lines of code to maintain.
      - delegate concurrency to the standard lib (*asyncio*).
      - delegate protocol implementation to client libs.
## Use Cases

### Importer

As an importer, I need to download single files.

**jupiter**:

~~~python
download = HttpDownload(
    url=url,
    writer=FileWriter(path),
    timeout=Timeout(connect=10, read=15),
    user=User(name='elmer', password='...'),
    ssl=SSL(ca_certificate='path-to-certificate',
            client_certificate='path-to-certificate',
            client_key='path-to-key',
            validation=True),
    proxy_url='http://user:password@gateway.org')

try:
    download()
except DownloadError:
    pass  # An error occurred.
else:
    pass  # Go read the downloaded file \o/
~~~

**saturn**:

~~~python
import asyncio
import ssl

import aiohttp

ssl_context = ssl.create_default_context(cafile='path-to-CA_certificate')
ssl_context.load_cert_chain('path-to-CLIENT_certificate', 'path-to-CLIENT_key')

connector = aiohttp.TCPConnector(verify_ssl=True, ssl_context=ssl_context)

session = aiohttp.ClientSession(
    connector=connector,
    read_timeout=15,
    auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))

downloader_obj = HttpDownloader(
    session,
    url,
    proxy='http://gateway.org',
    proxy_auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))

downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        pass  # An error occurred.
~~~

Question: How can the connect timeout be set in aiohttp?

-----

As an importer, I can leverage all settings supported by the underlying protocol-specific client lib.

**jupiter**:

Commonly used settings are supported by the abstraction. Additional settings could be supported by subclassing.

~~~python
class SpecialDownload(HttpDownload):

    def _settings(self):
        settings = super()._settings()
        settings['special'] = '<special value>'  # placeholder for any extra client lib setting
        return settings
~~~

**saturn**:

The underlying client lib arguments are directly exposed.
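
For example (a sketch only; it assumes the *HttpDownloader* signature from the earlier example), any *aiohttp* session or connector option can be applied when the caller builds the session:

~~~python
import aiohttp

# Illustrative only: every aiohttp.ClientSession / TCPConnector setting is
# available because the session is constructed by the caller.
connector = aiohttp.TCPConnector(limit=20)        # any connector option
session = aiohttp.ClientSession(
    connector=connector,
    headers={'User-Agent': 'pulp3'})              # any session option

downloader_obj = HttpDownloader(session, url)
~~~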
-----

As an importer, I can create an Artifact with a downloaded file using the size and digests calculated during the download.

**jupiter**:

Using the optional *DownloadMonitor* to collect statistics such as size and to calculate digests.

~~~python
download = HttpDownload(...)
monitor = DownloadMonitor(download)
...  # perform download.
artifact = Artifact(**monitor.facts())
artifact.save()
~~~

**saturn**:

The *size* and all *digests* are always calculated.

~~~python
downloader_obj = HttpDownloader(...)
...  # perform download.
result = task.result()
artifact = Artifact(**result.artifact_attributes)
artifact.save()
~~~
-----

As an importer, I need to download files concurrently.

**jupiter**:

Using the *Batch* to run the downloads concurrently. Only 3 downloads are in memory at once.

~~~python
downloads = (HttpDownload(...) for _ in range(10))

with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            pass  # An error occurred.
        else:
            pass  # Use the downloaded file \o/
~~~

**saturn**:

Using the asyncio run loop. This example does not restrict the number of downloads in memory at once.

~~~python
downloaders = (HttpDownloader(...) for _ in range(10))

loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([d.run() for d in downloaders]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        pass  # An error occurred.
~~~
-----

As an importer, I want to validate downloaded files.

**jupiter**:

Supported by adding provided or custom validations to the download. A validation error raises *ValidationError*, which *IsA* *DownloadError*.

~~~python
download = HttpDownload(...)
download.append(DigestValidation('sha256', '0x1234'))

try:
    download()
except DownloadError:
    pass  # An error occurred.
~~~

**saturn**:

Supported by passing the *expected_digests* dictionary and catching *DigestValidationError*.

~~~python
downloader_obj = HttpDownloader(..., expected_digests={'sha256': '0x1234'})

downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except (aiohttp.ClientError, DigestValidationError):
        pass  # An error occurred.
~~~
-----

As an importer, I am not required to keep all content (units) and artifacts in memory to support concurrent downloading.

**jupiter**:

Using the *Batch* to run the downloads concurrently. The input to the batch can be a *generator* and the number of downloads in memory is limited by the *backlog* argument.

~~~python
downloads = (HttpDownload(...) for _ in range(10))

with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            pass  # An error occurred.
        else:
            pass  # Use the downloaded file \o/
~~~

**saturn**:

@bmbouters: please describe and provide examples.

~~~python
~~~
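
One possible shape for this example (a sketch only, not from the original author; it assumes the *HttpDownloader* from the earlier examples): consume a generator of downloaders while keeping at most a fixed backlog of them in flight, so only that many downloader and artifact objects exist in memory at once.

~~~python
import asyncio

import aiohttp

BACKLOG = 3  # maximum number of downloads held in memory at once


def download_all(downloader_generator):
    # Hypothetical helper: drain the generator with a bounded in-flight window.
    downloaders = iter(downloader_generator)

    async def run():
        pending = set()
        while True:
            # Top up the window from the generator.
            while len(pending) < BACKLOG:
                try:
                    downloader_obj = next(downloaders)
                except StopIteration:
                    break
                pending.add(asyncio.ensure_future(downloader_obj.run()))
            if not pending:
                return
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                try:
                    result = task.result()  # This is a DownloadResult
                except aiohttp.ClientError:
                    pass  # An error occurred.

    asyncio.get_event_loop().run_until_complete(run())
~~~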
-----

As an importer, I need a way to link a downloaded file to an artifact without keeping all content units and artifacts in memory.

**jupiter**:

Using the *Batch* to run the downloads concurrently and specifying the *backlog* to limit the number of downloads in memory. See other examples.

The *Download.attachment* provides linkage to objects, like Artifacts, that are related to the download.

~~~python
download = HttpDownload(...)
download.attachment = Artifact(...)
~~~

**saturn**:

@bmbouters: please describe and provide examples.

~~~python
~~~
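
One possible shape for this example (a sketch only, not from the original author): wrap each downloader in a small coroutine that builds the Artifact as soon as its download finishes, so content units and artifacts never need to be held in memory all at once.

~~~python
# Hypothetical helper; `result.artifact_attributes` is taken from the earlier
# saturn example, the rest of the names are illustrative.
async def download_then_link(downloader_obj, content_unit):
    result = await downloader_obj.run()
    artifact = Artifact(**result.artifact_attributes)
    artifact.save()
    return content_unit, artifact
~~~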
-----

As an importer, I can perform concurrent downloading using a synchronous pattern.

**jupiter**:

Using the *Batch*. See other examples.

**saturn**:

Using either the *GroupDownloader* or the asyncio loop directly. See other examples.

-----

As an importer, concurrent downloads must share resources such as sessions, connection pools, and auth tokens across individual downloads.

**jupiter**:

The *Download.context* is designed to support this. The *shared* context can be used to safely share anything. This includes python-requests sessions (using a Cache), auth tokens, and resolved mirror lists. The sharing is done through collaboration. When it's appropriate for individual downloads to share things, an external actor like the Batch or the Streamer will ensure that all of the download objects have the same context.

**saturn**:

Each downloader could define a class attribute. This global can be used to share anything. This includes python-requests sessions (using a Cache), auth tokens, and resolved mirror lists. The sharing is done through collaboration. Sharing is global and unconditional.
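
A minimal sketch of what that could look like (names are illustrative, not the actual API): every downloader instance reuses one session stored on the class.

~~~python
import aiohttp

class HttpDownloader:
    # Class attribute shared by all instances: one session, one connection pool.
    shared_session = None

    @classmethod
    def get_session(cls):
        if cls.shared_session is None:
            cls.shared_session = aiohttp.ClientSession()
        return cls.shared_session
~~~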

Question: How will thread safety be provided? The streamer will have multiple Twisted threads using these downloaders.

-----

As an importer, I can customize how downloading is performed, for example to support mirror lists.

**jupiter**:

All download objects can be customized in one of two ways: first, by delegation using *events*; second, by subclassing.

Delegation example:

~~~python
class MirrorDelegate:
    # Any download can delegate mirror list resolution
    # and hunting to this object.

    def __init__(self):
        self.mirrors = iter([])

    def attach(self, download):
        download.register(Event.PREPARED, self.on_prepare)
        download.register(Event.ERROR, self.on_error)

    def on_prepare(self, event):
        # Resolve the mirror list URL.
        # May already be stored in the context or need to be downloaded and parsed.
        with event.download.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = event.download.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.content()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with the number of mirrors.
        event.download.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start with the first mirror.
        event.download.url = next(self.mirrors)

    def on_error(self, event):
        try:
            event.download.url = next(self.mirrors)
        except StopIteration:
            # No more mirrors.
            pass
        else:
            event.repaired = True


# importer
def get_download(*args, **kwargs):
    download = Factory.build(*args, **kwargs)
    delegate = MirrorDelegate()
    delegate.attach(download)
    return download
~~~

Subclass example:

~~~python
class MirrorDownload(HttpDownload):
    # Support HTTP/HTTPS mirror list downloading.

    def _prepare(self):
        super()._prepare()
        # Resolve the mirror list URL.
        # May already be stored in the context or need to be downloaded and parsed.
        with self.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = self.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.content()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with the number of mirrors.
        self.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start with the first mirror.
        self.url = next(self.mirrors)

    def _on_error(self, event):
        super()._on_error(event)
        try:
            self.url = next(self.mirrors)
        except StopIteration:
            # No more mirrors.
            return False
        else:
            return True


# importer
def get_download(*args, **kwargs):
    # Factory needs to support custom class.
    ...
~~~

**saturn**:

~~~python
~~~
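
One possible shape for this example (a sketch only, not from the original author; it assumes the *HttpDownloader* constructor and *run()* coroutine from the earlier examples, plus a mutable *url* attribute): customization is done by subclassing, so a mirror-hunting downloader might try each mirror URL in turn.

~~~python
import aiohttp

class MirrorDownloader(HttpDownloader):
    # Hypothetical subclass: hunt through a list of mirror URLs.

    def __init__(self, session, mirrors, **kwargs):
        super().__init__(session, mirrors[0], **kwargs)
        self.mirrors = mirrors

    async def run(self):
        last_error = None
        for url in self.mirrors:
            self.url = url
            try:
                return await super().run()  # DownloadResult on success
            except aiohttp.ClientError as error:
                last_error = error  # hunt to the next mirror
        raise last_error
~~~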
-----

As an importer, concurrent downloading must limit the number of simultaneous connections. Downloading 5k artifacts cannot open 5k connections.

**jupiter**:

This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases.

**saturn**:

This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases.
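
In the saturn case, for example (a sketch; the session construction follows the earlier example), the shared connector caps the number of simultaneous connections regardless of how many downloads are queued:

~~~python
import aiohttp

# At most 20 connections are ever open, even with 5k downloads queued.
connector = aiohttp.TCPConnector(limit=20)
session = aiohttp.ClientSession(connector=connector)
~~~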
-----

As an importer, I can terminate concurrent downloading at any point and not leak resources.

**jupiter**:

The loop using the iterator returned by *Batch* can be safely exited at any point, and all resources are then free to be garbage collected.

**saturn**:

The loop consuming the asyncio tasks can be safely exited at any point, and all resources are then free to be garbage collected.
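
For example, in the saturn case (a sketch; *downloaders* stands in for the downloader objects from the earlier examples), anything still pending can simply be cancelled:

~~~python
import asyncio

loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(
    asyncio.wait([d.run() for d in downloaders],
                 return_when=asyncio.FIRST_COMPLETED))
for task in not_done:
    task.cancel()  # abandon the remaining downloads; objects can then be garbage collected
~~~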
-----

As an importer, I can download using any protocol, starting with HTTP/HTTPS and eventually FTP.

**jupiter**:

Classes extending *Download* may implement any protocol. HTTP/HTTPS is supported by *HttpDownload*. See other use case examples.

**saturn**:

HTTP/HTTPS is supported by *HttpDownloader*. See other use case examples.

-----
### Streamer

As the streamer, I need to download files related to published artifacts and metadata but delegate *the implementation* (protocol, settings, credentials) to the importer. The implementation must be a black box.

**jupiter**:

The *Download* is a callable.

~~~python
download = importer.get_downloader(...)
download()
~~~

**saturn**:

@bmbouters: please describe and provide examples.

~~~python
downloader = importer.get_downloader(...)
self.not_done.append(downloader.run())
~~~

-----
As the streamer, I want to validate downloaded files.

**jupiter**:

The *Download* may be configured by the importer with a list of *Validation* objects. Validation is performed on the downloaded bit stream.

~~~python
download = importer.get_downloader(...)
download()
~~~

**saturn**:

The *HttpDownloader* may be configured by the importer with expected size and expected digests. Validation is performed on the downloaded bit stream.

~~~python
downloader = importer.get_downloader(...)
self.not_done.append(downloader.run())
~~~
-----

As the streamer, concurrent downloads must share resources such as sessions, connection pools, and auth tokens across individual downloads without having knowledge of such things.

**jupiter**:

Each download may be configured with a shared context. The download objects collaborate to share resources using the context. The streamer updates each Download provided by the importer to use the same (shared) context.

~~~python
download = importer.get_downloader(...)
download.context = self.context  # This is a Context.
download()
~~~

**saturn**:

Each downloader has a class attribute used to globally share resources.

~~~python
downloader = importer.get_downloader(...)
self.not_done.append(downloader.run())
~~~
-----

As the streamer, I need to support complex downloading such as mirror lists. This complexity must be delegated to the importer.

**jupiter**:

The downloader object provided by the importer will handle the mirror list.

**saturn**:

~~~python
downloader = importer.get_downloader(...)
self.not_done.append(downloader.run())
~~~
-----

As the streamer, I need to bridge the downloaded bit stream to the Twisted response. The file is not written to disk.

**jupiter**:

~~~python
~~~
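
One possible shape for this example (a sketch only, not from the original author; the *Writer* method names are assumed): a custom writer forwards each downloaded chunk to the twisted.web request instead of writing it to a file.

~~~python
class RequestWriter(Writer):
    # Hypothetical writer: bridge the downloaded bit stream to the Twisted response.

    def __init__(self, request):
        self.request = request  # twisted.web.http.Request

    def append(self, buffer):
        self.request.write(buffer)  # forward the chunk to the client

    def close(self):
        self.request.finish()

download = importer.get_downloader(...)
download.writer = RequestWriter(request)
download()
~~~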

**saturn**:

~~~python
~~~
-----

As the streamer, I need to forward HTTP headers from the download response to the Twisted response.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~
-----

As the streamer, I can download using (the same) custom logic as the importer, such as supporting mirror lists.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----