jortel@redhat.com, 09/06/2017 05:49 PM

# Downloading

In pulp3, there are two competing technologies and designs being considered. For the purposes of this discussion we'll name them **Jupiter** and **Saturn**. The *Jupiter* solution is based on *concurrent.futures* and the *Saturn* solution is based on *asyncio*. In addition to the underlying technology difference, the solutions meet the requirements in different ways. The *Jupiter* solution includes more classes, provides more abstraction, and supports customization through delegation and object composition. The *Saturn* solution meets the requirements with the fewest classes possible and minimum abstraction. Customization is supported through subclassing.

The three actors for our use cases are the *Importer*, the *Streamer*, and the Plugin Writer. The *ChangeSet* shares a subset of the Streamer requirements but is not included in this discussion.

## Design Goals & Constraints

The requirements define the minimum criteria to be satisfied by both solutions. The design constraints and goals define <span class="underline">how</span> the requirements are met.

**jupiter**:

  - constraints:
      - object oriented
      - support semantic versioning
  - goals:
      - encapsulate underlying technologies
      - consistent interface across downloads: standard arguments, return values, and raised exceptions
      - delegation pattern for common customization:
          - handling of downloaded bits delegated to *Writers*
          - validation delegated to *Validations*
          - optional digest and size calculation delegated to *DownloadMonitor*
          - error handling delegated to *Event* handlers
      - external participation in the download process through defined event registration and callbacks
      - delegate concurrency to the standard lib (*concurrent.futures*)
      - delegate protocol implementation to client libs

**saturn**:

  - constraints:
      - object oriented
      - support semantic versioning
  - goals:
      - direct exposure of client libs
      - minimum encapsulation of underlying technologies
      - minimum # of first class concepts (classes) and abstractions
      - minimum # of lines of code to maintain
      - delegate concurrency to the standard lib (*asyncio*)
      - delegate protocol implementation to client libs

## Use Cases

### Importer

As an importer, I need to download single files.

**jupiter**:

~~~python
download = HttpDownload(
    url=url,
    writer=FileWriter(path),
    timeout=Timeout(connect=10, read=15),
    user=User(name='elmer', password='...'),
    ssl=SSL(ca_certificate='path-to-certificate',
            client_certificate='path-to-certificate',
            client_key='path-to-key',
            validation=True),
    proxy_url='http://user:password@gateway.org')

try:
    download()
except DownloadError:
    # An error occurred.
    ...
else:
    # Go read the downloaded file \o/
    ...
~~~

**saturn**:

~~~python
ssl_context = ssl.create_default_context(cafile='path-to-CA_certificate')
ssl_context.load_cert_chain('path-to-CLIENT_certificate', 'path-to-CLIENT_key')

connector = aiohttp.TCPConnector(verify_ssl=True, ssl_context=ssl_context)

session = aiohttp.ClientSession(
    connector=connector,
    read_timeout=15,
    auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))

downloader_obj = HttpDownloader(
    session,
    url,
    proxy='http://gateway.org',
    proxy_auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))

downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        # An error occurred.
        ...
~~~

Question: How can the connect timeout be set in aiohttp?

-----

As an importer, I can leverage all settings supported by the underlying protocol-specific client lib.

**jupiter**:

Commonly used settings are supported by the abstraction. Additional settings could be supported by subclassing.

~~~python
class SpecialDownload(HttpDownload):

    def _settings(self):
        settings = super()._settings()
        settings['special'] = <special value>
        return settings
~~~

**saturn**:

The underlying client lib's arguments are directly exposed.

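
As a rough sketch of what that direct exposure could look like (the `HttpDownloader` signature here is an assumption, not settled API), extra keyword arguments are simply forwarded to the client lib:

~~~python
class HttpDownloader:
    """Sketch (assumed API): client lib settings pass straight through."""

    def __init__(self, session, url, **client_kwargs):
        # Anything aiohttp's request methods accept (proxy=, headers=, ...)
        # is stored and forwarded untouched.
        self.session = session
        self.url = url
        self.client_kwargs = client_kwargs

    async def run(self):
        # Forward the stored settings directly to the client lib.
        async with self.session.get(self.url, **self.client_kwargs) as response:
            response.raise_for_status()
            return await response.read()
~~~
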
-----

As an importer, I can create an Artifact with a downloaded file using the size and digests calculated during the download.

**jupiter**:

Using the optional *DownloadMonitor* to collect statistics such as size and to calculate digests.

~~~python
download = HttpDownload(...)
monitor = DownloadMonitor(download)
...  # perform download.
artifact = Artifact(**monitor.facts())
artifact.save()
~~~

**saturn**:

The *size* and all *digests* are always calculated.

~~~python
downloader_obj = HttpDownloader(...)
...  # perform download.
result = task.result()
artifact = Artifact(**result.artifact_attributes)
artifact.save()
~~~
-----

As an importer, I need to download files concurrently.

**jupiter**:

Using the *Batch* to run the downloads concurrently. Only 3 downloads are in memory at once.

~~~python
downloads = (HttpDownload(...) for _ in range(10))

with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            # An error occurred.
            ...
        else:
            # Use the downloaded file \o/
            ...
~~~

**saturn**:

Using the asyncio event loop directly. This example does not restrict the number of downloads in memory at once.

~~~python
downloaders = (HttpDownloader(...) for _ in range(10))

loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([d.run() for d in downloaders]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        # An error occurred.
        ...
~~~
-----

As an importer, I want to validate downloaded files.

**jupiter**:

Supported by adding provided or custom validations to the download. A validation error raises *ValidationError*, which is a *DownloadError*.

~~~python
download = HttpDownload(...)
download.append(DigestValidation('sha256', '0x1234'))

try:
    download()
except DownloadError:
    # An error occurred.
    ...
~~~

**saturn**:

Supported by passing the *expected_digests* dictionary and catching *DigestValidationError*.

~~~python
downloader_obj = HttpDownloader(..., expected_digests={'sha256': '0x1234'})

downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except (aiohttp.ClientError, DigestValidationError):
        # An error occurred.
        ...
~~~
-----

As an importer, I am not required to keep all content (units) and artifacts in memory to support concurrent downloading.

**jupiter**:

Using the *Batch* to run the downloads concurrently. The input to the batch can be a *generator* and the number of downloads in memory is limited by the *backlog* argument.

~~~python
downloads = (HttpDownload(...) for _ in range(10))

with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            # An error occurred.
            ...
        else:
            # Use the downloaded file \o/
            ...
~~~

**saturn**:

@bmbouters: please describe and provide examples.

~~~python
~~~
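
Pending that description, one possible sketch (an assumption, not the committed design): feed downloaders from a generator and cap the number of in-flight tasks, mirroring *Batch*'s *backlog*:

~~~python
import asyncio

async def download_all(downloaders, backlog=3):
    # Keep at most `backlog` downloads (and their objects) alive at once;
    # `downloaders` may be a generator, so nothing else is held in memory.
    pending = set()
    results = []
    for downloader in downloaders:
        pending.add(asyncio.ensure_future(downloader.run()))
        if len(pending) >= backlog:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            results.extend(task.result() for task in done)
    if pending:
        done, _ = await asyncio.wait(pending)
        results.extend(task.result() for task in done)
    return results
~~~
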
-----

As an importer, I need a way to link a downloaded file to an artifact without keeping all content units and artifacts in memory.

**jupiter**:

Using the *Batch* to run the downloads concurrently and specifying the *backlog* to limit the number of downloads in memory. See other examples.

The *Download.attachment* provides linkage to objects, like Artifacts, that are related to the download.

~~~python
download = HttpDownload(...)
download.attachment = Artifact(...)
~~~

**saturn**:

@bmbouters: please describe and provide examples.

~~~python
~~~
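
Pending that description, one possible sketch (an assumption): pair each download with its attachment by wrapping the coroutine, so the link travels with the task instead of living in a side table:

~~~python
import asyncio

async def run_with_attachment(downloader, attachment):
    # Return the attachment (e.g. an Artifact) alongside the result so the
    # caller can link the two without holding every pair in memory.
    result = await downloader.run()
    return attachment, result
~~~
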
-----

As an importer, I can perform concurrent downloading using a synchronous pattern.

**jupiter**:

Using the *Batch*. See other examples.

**saturn**:

Using either the *GroupDownloader* or the asyncio loop directly. See other examples.

-----

As an importer, concurrent downloads must share resources such as sessions, connection pools, and auth tokens across individual downloads.

**jupiter**:

The *Download.context* is designed to support this. The *shared* context can be used to safely share anything. This includes python-requests sessions (using a Cache), auth tokens, and resolved mirror lists. The sharing is done through collaboration. When it's appropriate for individual downloads to share things, an external actor like the Batch or the Streamer will ensure that all of the download objects have the same context.

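
A minimal sketch of what such a shared context could look like (the class below is an assumption, inferred from how *Download.context* is used in the mirror list examples: lazy attributes guarded by a lock):

~~~python
import threading

class Context:
    # Sketch (assumed API): attribute storage shared by collaborating
    # downloads, guarded by an RLock via the `with` statement.

    def __init__(self, **properties):
        self.__dict__['_lock'] = threading.RLock()
        self.__dict__['properties'] = dict(properties)

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *unused):
        self._lock.release()

    def __getattr__(self, name):
        # Only called for names not found normally (e.g. 'mirrors').
        try:
            return self.properties[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self.properties[name] = value
~~~
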

**saturn**:

Each downloader could define a class attribute. This global can be used to share anything. This includes python-requests sessions (using a Cache), auth tokens, and resolved mirror lists. The sharing is done through collaboration. Sharing is global and unconditional.

Question: how will thread safety be provided? The streamer will have multiple Twisted threads using these downloaders.

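
A sketch of the class attribute approach (hypothetical; the lock shown is one possible answer to the thread safety question, not a decided design):

~~~python
import threading

class HttpDownloader:
    # Sketch (hypothetical): one session shared by all instances.
    _session = None
    _session_lock = threading.Lock()

    @classmethod
    def shared_session(cls):
        # Guarded creation so concurrent threads get the same session.
        with cls._session_lock:
            if cls._session is None:
                cls._session = object()  # stands in for aiohttp.ClientSession()
            return cls._session
~~~
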
-----

As an importer, I can customize how downloading is performed. For example, to support mirror lists.

**jupiter**:

All download objects can be customized in one of two ways: first, by delegation using *events*; second, by subclassing.

Delegation example:

~~~python
class MirrorDelegate:
    # Any download can delegate mirror list resolution
    # and hunting to this object.

    def __init__(self):
        self.mirrors = iter([])

    def attach(self, download):
        download.register(Event.PREPARED, self.on_prepare)
        download.register(Event.ERROR, self.on_error)

    def on_prepare(self, event):
        # Resolve the mirror list URL.
        # May already be stored in the context or need to be downloaded and parsed.
        with event.download.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = event.download.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.content()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with # of mirrors.
        event.download.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start
        event.download.url = next(self.mirrors)

    def on_error(self, event):
        try:
            event.download.url = next(self.mirrors)
        except StopIteration:
            # no more mirrors
            pass
        else:
            event.repaired = True


# importer
def get_download(...):
    download = Factory.build(...)
    delegate = MirrorDelegate()
    delegate.attach(download)
    return download
~~~

Subclass example:

~~~python
class MirrorDownload(HttpDownload):
    # Support HTTP/HTTPS mirror list downloading.

    def _prepare(self):
        super()._prepare()
        # Resolve the mirror list URL.
        # May already be stored in the context or need to be downloaded and parsed.
        with self.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = self.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.content()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with # of mirrors.
        self.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start
        self.url = next(self.mirrors)

    def _on_error(self, event):
        super()._on_error(event)
        try:
            self.url = next(self.mirrors)
        except StopIteration:
            # no more mirrors
            return False
        else:
            return True


# importer
def get_download(...):
    # Factory needs to support custom class.
    ...
~~~

**saturn**:

~~~python
~~~
-----

As an importer, concurrent downloading must limit the number of simultaneous connections. Downloading 5k artifacts cannot open 5k connections.

**jupiter**:

This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See the resource sharing and concurrency limiting use cases.

**saturn**:

This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See the resource sharing and concurrency limiting use cases.

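
Beyond pool limits in the client lib, a protocol-agnostic cap can be sketched with a semaphore (an illustration, not settled API):

~~~python
import asyncio

async def bounded_run(downloader, semaphore):
    # Acquire a slot before downloading; at most `limit` downloads
    # (and therefore connections) are active at once.
    async with semaphore:
        return await downloader.run()

async def download_bounded(downloaders, limit=10):
    semaphore = asyncio.Semaphore(limit)
    tasks = [bounded_run(d, semaphore) for d in downloaders]
    return await asyncio.gather(*tasks)
~~~
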
-----

As an importer, I can terminate concurrent downloading at any point and not leak resources.

**jupiter**:

The loop using the iterator returned by *Batch* can be safely exited at any point, and all resources are then free to be garbage collected.

**saturn**:

The loop driving the asyncio tasks can be safely exited at any point, and all resources are then free to be garbage collected.

-----

As an importer, I can download using any protocol, starting with HTTP/HTTPS and eventually FTP.

**jupiter**:

Classes extending *Download* may implement any protocol. HTTP/HTTPS is supported by *HttpDownload*. See other use case examples.

**saturn**:

HTTP/HTTPS is supported by *HttpDownloader*. See other use case examples.

-----

### Streamer

As the streamer, I need to download files related to published artifacts and metadata but delegate *the implementation* (protocol, settings, credentials) to the importer. The implementation must be a black box.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~
-----

As the streamer, I can download using any protocol supported by the importer.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~
-----

As the streamer, I want to validate downloaded files.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~
-----

As the streamer, concurrent downloads must share resources such as sessions, connection pools, and auth tokens across individual downloads without having knowledge of such things.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~
-----

As the streamer, I need to support complex downloading such as mirror lists. This complexity must be delegated to the importer.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~
-----

As the streamer, I need to bridge the downloaded bit stream to the Twisted response. The file is not written to disk.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~
-----

As the streamer, I need to forward HTTP headers from the download response to the Twisted response.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~
-----

As the streamer, I can download using (the same) custom logic as the importer, such as supporting mirror lists.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----