# Downloading
In pulp3, there are two competing technologies and designs being considered. For the purposes of the discussion we'll name them **Jupiter** and **Saturn**. The *Jupiter* solution is based on *concurrent.futures* and the *Saturn* solution is based on *asyncio*. In addition to the underlying technology difference, the solutions meet the requirements in different ways. The *Jupiter* solution includes more classes, provides more abstraction and supports customization through delegation and object composition. The *Saturn* solution meets the requirements with the fewest classes possible and minimum abstraction. Customization is supported through subclassing.

The three actors for our use cases are the *Importer*, the *Streamer* and the Plugin Writer. The *ChangeSet* shares a subset of the Streamer requirements but is not included in this discussion.

## Design Goals & Constraints

The requirements define the minimum criteria to be satisfied by both solutions. The design constraints and goals define *how* the requirements are met.

**jupiter**:

  - constraints:
      - object oriented
      - support semantic versioning
  - goals:
      - encapsulate underlying technologies
      - consistent interface across downloads: standard arguments, return values and raised exceptions
      - delegation pattern for common customization:
          - handling of downloaded bits delegated to *Writers*
          - validation delegated to *Validations*
          - optional digest and size calculation delegated to *DownloadMonitor*
          - error handling delegated to *Event* handlers
      - external participation in the download process through defined event registration and callbacks
      - delegate concurrency to the standard lib (*concurrent.futures*)
      - delegate protocol implementation to client libs

**saturn**:

  - constraints:
      - object oriented
      - support semantic versioning
  - goals:
      - direct exposure of client libs
      - minimum encapsulation of underlying technologies
      - minimum number of first-class concepts (classes) and abstractions
      - minimum number of lines of code to maintain
      - delegate concurrency to the standard lib (*asyncio*)
      - delegate protocol implementation to client libs
## Use Cases

### Importer

As an importer, I need to download single files.

**jupiter**:

~~~python
download = HttpDownload(
    url=url,
    writer=FileWriter(path),
    timeout=Timeout(connect=10, read=15),
    user=User(name='elmer', password='...'),
    ssl=SSL(ca_certificate='path-to-certificate',
            client_certificate='path-to-certificate',
            client_key='path-to-key',
            validation=True),
    proxy_url='http://user:password@gateway.org')

try:
    download()
except DownloadError:
    # An error occurred.
else:
    # Go read the downloaded file \o/
~~~
**saturn**:
~~~python
import asyncio
import ssl

import aiohttp

ssl_context = ssl.create_default_context(cafile='path-to-CA_certificate')
ssl_context.load_cert_chain('path-to-CLIENT_certificate', 'path-to-CLIENT_key')

connector = aiohttp.TCPConnector(verify_ssl=True, ssl_context=ssl_context)

session = aiohttp.ClientSession(
    connector=connector,
    read_timeout=15,
    auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))

downloader_obj = HttpDownloader(
    session,
    url,
    proxy='http://gateway.org',
    proxy_auth=aiohttp.BasicAuth('elmer', password='...', encoding='utf-8'))

downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        # An error occurred.
~~~

Question: How can the connect timeout be set in aiohttp?

-----
As an importer, I can leverage all settings supported by the underlying protocol-specific client lib.

**jupiter**:

Commonly used settings are supported by the abstraction. Additional settings could be supported by subclassing.

~~~python
class SpecialDownload(HttpDownload):

    def _settings(self):
        settings = super()._settings()
        settings['special'] = <special value>
        return settings
~~~

**saturn**:

The underlying client lib arguments are directly exposed.
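For illustration, a minimal sketch of what that direct exposure could look like, assuming HttpDownloader simply forwards the aiohttp session and its settings unchanged (the specific settings shown are illustrative, not part of the proposal):

~~~python
import aiohttp

# Any aiohttp.ClientSession setting is passed straight through; there is no
# abstraction layer between the plugin writer and the client lib.
session = aiohttp.ClientSession(
    headers={'User-Agent': 'pulp'},   # arbitrary aiohttp-supported settings
    read_timeout=15)

downloader_obj = HttpDownloader(session, url)
~~~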
-----
As an importer, I can create an Artifact with a downloaded file using the size and digests calculated during the download.

**jupiter**:

Using the optional *DownloadMonitor* to collect statistics such as size and to calculate digests.

~~~python
download = HttpDownload(...)
monitor = DownloadMonitor(download)
...  # perform download.
artifact = Artifact(**monitor.facts())
artifact.save()
~~~
**saturn**:

The *size* and all *digests* are always calculated.

~~~python
downloader_obj = HttpDownloader(...)
...  # perform download.
result = task.result()
artifact = Artifact(**result.artifact_attributes)
artifact.save()
~~~
-----

As an importer, I need to download files concurrently.

**jupiter**:

Using the *Batch* to run the downloads concurrently. Only 3 downloads are in memory at once.

~~~python
downloads = (HttpDownload(...) for _ in range(10))

with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            # An error occurred.
        else:
            # Use the downloaded file \o/
~~~
**saturn**:

Using the asyncio event loop directly. This example does not restrict the number of downloads in memory at once.

~~~python
downloaders = (HttpDownloader(...) for _ in range(10))

loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([d.run() for d in downloaders]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except aiohttp.ClientError:
        # An error occurred.
~~~
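If the number of in-flight downloads needs to be bounded, one possible approach (a sketch, not part of the proposal) is to wrap each downloader in an *asyncio.Semaphore*; the limit of 3 below mirrors the *Batch* backlog used in the jupiter example and is an assumption:

~~~python
import asyncio

semaphore = asyncio.Semaphore(3)  # at most 3 downloads in flight at once

async def bounded(downloader):
    # Acquire a slot before running the download; release it when done.
    async with semaphore:
        return await downloader.run()

downloaders = [HttpDownloader(...) for _ in range(10)]
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([bounded(d) for d in downloaders]))
~~~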
-----
As an importer, I want to validate downloaded files.

**jupiter**:

Supported by adding provided or custom validations to the download. A validation error raises *ValidationError*, which is a *DownloadError*.

~~~python
download = HttpDownload(...)
download.append(DigestValidation('sha256', '0x1234'))

try:
    download()
except DownloadError:
    # An error occurred.
~~~
**saturn**:

Supported by passing the *expected_digests* dictionary and catching *DigestValidationError*.

~~~python
downloader_obj = HttpDownloader(..., expected_digests={'sha256': '0x1234'})

downloader_coroutine = downloader_obj.run()
loop = asyncio.get_event_loop()
done, not_done = loop.run_until_complete(asyncio.wait([downloader_coroutine]))
for task in done:
    try:
        result = task.result()  # This is a DownloadResult
    except (aiohttp.ClientError, DigestValidationError):
        # An error occurred.
~~~
-----

As an importer, I am not required to keep all content (units) and artifacts in memory to support concurrent downloading.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
As an importer, I need a way to link a downloaded file to an artifact without keeping all content units and artifacts in memory.

**jupiter**:

Using the *Batch* to run the downloads concurrently. Only 3 downloads are in memory at once.

~~~python
downloads = (HttpDownload(...) for _ in range(10))

with Batch(downloads, backlog=3) as batch:
    for plan in batch():
        try:
            plan.result()
        except DownloadError:
            # An error occurred.
        else:
            # Use the downloaded file \o/
~~~
**saturn**:

Using the *GroupDownloader*?

~~~python
~~~

-----
As an importer, I can perform concurrent downloading using a synchronous pattern.

**jupiter**:

Using the *Batch*. See other examples.

**saturn**:

Using either the *GroupDownloader* or the asyncio loop directly. See other examples.

-----
As an importer, concurrent downloads must share resources such as sessions, connection pools and auth tokens across individual downloads.

**jupiter**:

The *Download.context* is designed to support this. The *shared* context can be used to safely share anything. This includes python-requests sessions (using a Cache), auth tokens and resolved mirror lists. The sharing is done through collaboration. When it's appropriate for individual downloads to share things, an external actor like the *Batch* or the *Streamer* will ensure that all of the download objects have the same context.
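A minimal sketch of how that collaboration could look; the *Context* shown here is illustrative, not a published API from either proposal:

~~~python
from threading import RLock


class Context:
    # Arbitrary shared state (sessions, tokens, mirror lists) guarded by a lock.

    def __init__(self, **attributes):
        self.__dict__.update(attributes)
        self._mutex = RLock()

    def __enter__(self):
        self._mutex.acquire()
        return self

    def __exit__(self, *unused):
        self._mutex.release()


# An external actor (Batch or Streamer) assigns one shared context to every download.
downloads = [HttpDownload(...) for _ in range(10)]
context = Context()
for download in downloads:
    download.context = context
~~~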
**saturn**:

Each downloader could define a class attribute. This global can be used to share anything. This includes python-requests sessions (using a Cache), auth tokens and resolved mirror lists. The sharing is done through collaboration. Sharing is global and unconditional.
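For example, a rough illustration of the class-attribute approach (not the actual HttpDownloader implementation, just a sketch of the idea):

~~~python
import aiohttp


class HttpDownloader:
    # The session lives on the class, so every instance shares it unconditionally.
    session = None

    def __init__(self, url, **options):
        if HttpDownloader.session is None:
            HttpDownloader.session = aiohttp.ClientSession()
        self.url = url
        self.options = options
~~~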
Question: how will thread safety be provided? The streamer will have multiple Twisted threads using these downloaders.

-----
As an importer, I can customize how downloading is performed. For example, to support mirror lists.

**jupiter**:

All download objects can be customized in one of two ways: first, by delegation using *events*; second, by subclassing.

Delegation example:

~~~python
class MirrorDelegate:
    # Any download can delegate mirror list resolution
    # and hunting to this object.

    def __init__(self):
        self.mirrors = iter([])

    def attach(self, download):
        download.register(Event.PREPARED, self.on_prepare)
        download.register(Event.ERROR, self.on_error)

    def on_prepare(self, event):
        # Resolve the mirror list URL.
        # May already be stored in the context or need to be downloaded and parsed.
        with event.download.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = event.download.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.content()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with # of mirrors.
        event.download.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start
        event.download.url = next(self.mirrors)

    def on_error(self, event):
        try:
            event.download.url = next(self.mirrors)
        except StopIteration:
            # no more mirrors
            pass
        else:
            event.repaired = True


# importer
def get_download(...):
    download = Factory.build(...)
    delegate = MirrorDelegate()
    delegate.attach(download)
~~~
Subclass example:

~~~python
class MirrorDownload(HttpDownload):
    # Support HTTP/HTTPS mirror list downloading.

    def _prepare(self):
        super()._prepare()
        # Resolve the mirror list URL.
        # May already be stored in the context or need to be downloaded and parsed.
        with self.context as context:
            try:
                mirrors = context.mirrors
            except AttributeError:
                download = self.clone()
                download.writer = BufferWriter()
                download()
                _list = download.writer.content()
                mirrors = [u.strip() for u in _list.split('\n') if u.strip()]
                context.mirrors = mirrors
        # Align retries with # of mirrors.
        self.retries = len(mirrors)
        self.mirrors = iter(mirrors)
        # Start
        self.url = next(self.mirrors)

    def _on_error(self, event):
        super()._on_error(event)
        try:
            self.url = next(self.mirrors)
        except StopIteration:
            # no more mirrors
            return False
        else:
            return True


# importer
def get_download(...):
    # Factory needs to support custom class.
~~~
**saturn**:

~~~python
~~~

-----

As an importer, concurrent downloading must limit the number of simultaneous connections. Downloading 5k artifacts cannot open 5k connections.

**jupiter**:

This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases.

**saturn**:

This is supported by sharing connection pools and limiting the total number of downloads in progress concurrently. See resource sharing and concurrency limiting use cases.
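For the saturn design, one concrete way to cap connections is aiohttp's connector limit; a minimal sketch, with an assumed limit of 20 and an assumed list of URLs:

~~~python
import aiohttp

# One shared session whose connector caps the number of simultaneous connections.
connector = aiohttp.TCPConnector(limit=20)
session = aiohttp.ClientSession(connector=connector)

downloaders = [HttpDownloader(session, url) for url in urls]
~~~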
-----

As an importer, I can terminate concurrent downloading at any point and not leak resources.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
As an importer, I can download using any protocol, starting with HTTP/HTTPS and FTP.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
### Streamer

As the streamer, I need to download files related to published artifacts and metadata but delegate *the implementation* (protocol, settings, credentials) to the importer. The implementation must be a black box.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
As the streamer, I can download using any protocol supported by the importer.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
As the streamer, I want to validate downloaded files.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
As the streamer, concurrent downloads must share resources such as sessions, connection pools and auth tokens across individual downloads without having knowledge of such things.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
As the streamer, I need to support complex downloading such as mirror lists. This complexity must be delegated to the importer.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
As the streamer, I need to bridge the downloaded bit stream to the Twisted response. The file is not written to disk.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
As the streamer, I need to forward HTTP headers from the download response to the Twisted response.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----
As the streamer, I can download using (the same) custom logic as the importer, such as supporting mirror lists.

**jupiter**:

~~~python
~~~

**saturn**:

~~~python
~~~

-----