⚲
Project
Profile
Help
Tour this page
Frequent Planio Questions
Learning Resources
Sign in
Register
Switch Planio account
Open in Planio App
Share current page
Search
:
Projects
All Projects
Help
Tour this page
Frequent Planio Questions
Learning Resources
Sign in
Register
Switch Planio account
Open in Planio App
Share current page
Pulp
Overview
Roadmap
Issues
Gantt
Agile board
Calendar
Wiki
Repository
Download (1.19 KB)
Issue #1891
» import_test.py
yuzheng
, 05/04/2016 09:01 AM
import
sys
import
os
import
functools
import
json
from
kobo.threads
import
run_in_threads
import
rcm_pa_tool.pulpadmin
from
rcm_pa_tool.pulpadmin
import
PulpAdmin
# Connection settings handed to PulpAdmin(...) in main().
pulp_url = "localhost"
pulp_port = "443"  # kept as a string, exactly as it is passed to PulpAdmin
pulp_username = "admin"
pulp_password = "admin"

# Shared accumulator: import_unit() extends it with whatever
# pa.get_async_task_id() returns, and main() passes it to pa.wait_for_tasks().
tasks = []
def import_unit(pa, task, upload, num):
    """Import one uploaded file into the "all-rpm-content" repository.

    main() binds ``pa`` with functools.partial and hands the resulting
    callable to run_in_threads, which is presumably what supplies ``task``,
    ``upload`` and ``num`` for each work item (confirm against kobo.threads).

    Args:
        pa: Client object. Must provide ``proxy.content._import(...)`` whose
            result has a ``json()`` method, and ``get_async_task_id(...)``.
        task: Worker task object; only ``task.pool.queue_total`` is read,
            for the progress message.
        upload: Value sent as ``upload_id`` in the import request.
        num: Counter shown in the progress message.

    Side effects:
        Prints a progress line and extends the module-level ``tasks`` list
        with the result of ``pa.get_async_task_id``.
    """
    # Parenthesised single-argument print: prints the same text under the
    # Python 2 print statement and is also valid as a Python 3 function call.
    print("Uploading [%4s/%4s]: %s" % (num, task.pool.queue_total, upload))

    ret = pa.proxy.content._import(
        REPO_ID="all-rpm-content",
        upload_id=upload,
        unit_metadata={'checksumtype': 'sha256'},
        unit_key=None,
        unit_type_id="rpm"
    ).json()

    # NOTE(review): this runs in several worker threads and appends to a
    # shared module-level list without a lock -- confirm that is acceptable.
    tasks.extend(pa.get_async_task_id(ret))
def main():
    """Import every entry of /var/lib/pulp/uploads/ and wait for the tasks.

    Builds a PulpAdmin client from the module-level connection settings,
    lists the upload directory, runs import_unit over the entries in 5
    threads, then passes the collected ``tasks`` list to
    ``pa.wait_for_tasks``.
    """
    pa = PulpAdmin(
        "https",
        pulp_url,
        username=pulp_username,
        password=pulp_password,
        port=pulp_port
    )

    # Each directory entry name is passed to import_unit as the upload id.
    uploads = os.listdir("/var/lib/pulp/uploads/")
    threads_upload = 5

    # Parenthesised single-argument prints: same output under the Python 2
    # print statement, and valid function calls on Python 3.
    print("Uploading in %s threads" % threads_upload)
    run_in_threads(
        functools.partial(import_unit, pa),
        uploads,
        threads=threads_upload
    )

    print("waiting for tasks %s" % tasks)
    pa.wait_for_tasks(tasks)
    print("awaited tasks finished")
# Run the import only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
(1-1/1)
Loading...