|
from pulpcore.client.pulpcore import (
|
|
ApiClient as CoreApiClient,
|
|
Configuration,
|
|
TasksApi
|
|
)
|
|
from pulpcore.client.pulp_rpm import (
|
|
ApiClient as RpmApiClient,
|
|
ContentPackagesApi,
|
|
RepositoriesRpmApi,
|
|
RemotesRpmApi,
|
|
RepositoriesRpmVersionsApi,
|
|
RpmRepositorySyncURL,
|
|
PublicationsRpmApi,
|
|
DistributionsRpmApi,
|
|
)
|
|
from pulp_smash.pulp3.bindings import monitor_task
|
|
|
|
|
|
import socket
|
|
|
|
# Connection settings shared by the core and RPM API clients built below.
configuration = Configuration()
configuration.username = "admin"
configuration.password = "password"
# The API is addressed on this machine's own hostname, port 24817.
configuration.host = f"http://{socket.gethostname()}:24817"
# NOTE(review): presumably keeps "/" unescaped when hrefs are used as path
# parameters -- confirm against the generated client.
configuration.safe_chars_for_path_param = "/"

# Core client and the API built on top of it.
core_client = CoreApiClient(configuration)
tasks_api = TasksApi(core_client)

# RPM client and one API object per RPM resource type.
rpm_client = RpmApiClient(configuration)
rpm_repo_api = RepositoriesRpmApi(rpm_client)
rpm_repo_versions_api = RepositoriesRpmVersionsApi(rpm_client)
rpm_content_api = ContentPackagesApi(rpm_client)
rpm_remote_api = RemotesRpmApi(rpm_client)
rpm_publication_api = PublicationsRpmApi(rpm_client)
rpm_distributions_api = DistributionsRpmApi(rpm_client)
|
|
|
|
# Sync test
# =========

# Fixture repository URL that the remote is created with.
FIXTURE = "https://fixtures.pulpproject.org/rpm-unsigned/"

# ==================================================

RPM_REPO_URL = FIXTURE
# One name is used for both the remote and the repository.
RPM_REPO_NAME = "test"

# Payload for the remote; kept in a module-level dict so it can be reused
# when the remote is updated later in the script.
remote_details = dict(
    name=RPM_REPO_NAME,
    url=RPM_REPO_URL,
    policy="on_demand",
    tls_validation=False,
)
rpm_remote = rpm_remote_api.create(remote_details)

# Create the repository and attach the remote that was just created.
rpm_repo = rpm_repo_api.create(
    {
        "name": RPM_REPO_NAME,
        "remote": rpm_remote.pulp_href,
        "autopublish": False,
    }
)

# Look both objects up again by name and keep the first match of each listing.
rpm_repo = rpm_repo_api.list(name=RPM_REPO_NAME).results[0]
rpm_remote = rpm_remote_api.list(name=RPM_REPO_NAME).results[0]
|
|
|
|
|
|
# ======================
# Sync
# ======================

# Sync options shared by the initial sync and the re-sync below.
repository_sync_data = RpmRepositorySyncURL(mirror=True)

# Initial sync against the URL the remote was created with.
sync_response = rpm_repo_api.sync(rpm_repo.pulp_href, repository_sync_data)
# monitor_task returns the task object whose start/finish timestamps are read.
task = monitor_task(sync_response.task)
time_diff = task.finished_at - task.started_at
# total_seconds() reports the whole interval; timedelta.seconds is only the
# seconds field, which drops the days component and the sub-second part.
print("Sync time: {} seconds".format(time_diff.total_seconds()))

# Point the existing remote at a different fixture URL before syncing again.
remote_details["url"] = "https://fixtures.pulpproject.org/rpm-alt-layout/"
update_response = rpm_remote_api.update(rpm_remote.pulp_href, remote_details)
monitor_task(update_response.task)

# Second sync of the same repository, now using the updated remote URL.
resync_response = rpm_repo_api.sync(rpm_repo.pulp_href, repository_sync_data)
task = monitor_task(resync_response.task)
time_diff = task.finished_at - task.started_at
print("Re-sync time: {} seconds".format(time_diff.total_seconds()))
|