Project

Profile

Help

Issue #1891 ยป import_test.py

yuzheng, 05/04/2016 09:01 AM

 
import sys
import os
import functools
import json
from kobo.threads import run_in_threads
import rcm_pa_tool.pulpadmin
from rcm_pa_tool.pulpadmin import PulpAdmin

pulp_url = "localhost"
pulp_port = "443"
pulp_username = "admin"
pulp_password = "admin"

tasks = []

def import_unit(pa, task, upload, num):
print "Uploading [%4s/%4s]: %s" % (num, task.pool.queue_total,upload)
ret = pa.proxy.content._import(REPO_ID="all-rpm-content",
upload_id=upload,
unit_metadata={'checksumtype': 'sha256'},
unit_key=None,
unit_type_id="rpm").json()
tasks.extend(pa.get_async_task_id(ret))


def main():
pa = PulpAdmin("https", pulp_url, username=pulp_username, password=pulp_password, port=pulp_port)
uploads = os.listdir("/var/lib/pulp/uploads/")
threads_upload = 5
print "Uploading in %s threads" % threads_upload
run_in_threads(functools.partial(import_unit, pa), uploads, threads=threads_upload)

print "waiting for tasks %s" % tasks
pa.wait_for_tasks(tasks)
print "awaited tasks finished"


if __name__ == "__main__":
main()
    (1-1/1)