Project

Profile

Help

Test #5181 » add_remove_utils.py

amacdona@redhat.com, 07/25/2019 02:48 PM

 
import json
import sys
import time
from subprocess import check_output

import requests
from requests.auth import HTTPBasicAuth

# Repository that the helpers below associate units into, unassociate units
# from, and inspect for leftover content.
DESTINATION_REPO = 'testrepo'
# Repository passed as ``source_repo_id`` when units are copied.
SOURCE_REPO = 'busybox'

# Ten unit ids; presumably the ``_id`` values of docker_tag units that exist in
# SOURCE_REPO on the author's Pulp instance -- TODO confirm, they are not
# referenced elsewhere in this part of the file.
TAGS = [
u'54ba1853-5e31-4b9d-b162-71ce4830d60d', u'bb84916b-b152-4ecb-950d-3974ef88154a',
u'2e83a76e-7511-4a28-8237-d6ac342ff5ba', u'eba2af8f-7ca3-4147-b07f-72b0d3b4c113',
u'958be6c8-cc96-4c1e-9e58-5e912044e68e', u'b9024e42-035d-4454-b418-954e94dda345',
u'ef82acad-afa8-4424-ada9-3e0e05d863e1', u'ec6e6830-1201-451a-a1e8-52e7dadbaf9b',
u'48b53d66-2446-4f9f-8cb4-40f6ea54b16d', u'c4048925-b632-4edb-9bf2-c62d1d8d096f'
]

# Disabled: the full set of ten manifest-list ids. The two active lists below
# split exactly these ten ids into a group of eight and a group of two.
# TEN_MANIFEST_LIST_IDS = [
# u'7a163c20-400c-49a2-bd10-53fd87005920', u'376ec577-96fc-4ae2-9977-3469fc6145f3',
# u'da70ae27-bca4-4602-ae21-eb5dbcdcaa84', u'd284f52a-16d5-47e9-bfb2-7f62c1cb45a1',
# u'581ecfeb-f5c5-43b7-8086-20c6251baa1a', u'5ee10453-92f1-4367-ba82-9e4c2b5b1136',
# u'c4a2c81e-53e1-4195-9c9d-19f7109e241c', u'8393b0dc-de8b-45bb-879a-8b8bb689d569',
# u'bcfd8d6d-09cc-4744-923c-385da00896e8', u'6ef97386-c7c3-49eb-8604-67c73b31ac01'
# ]

# Eight manifest-list ids; going by the name, none of these lists has a
# manifest in common with another -- TODO confirm against the repo content.
MANIFEST_LISTS_NO_SHARED_MANIFESTS = [
u'7a163c20-400c-49a2-bd10-53fd87005920', u'376ec577-96fc-4ae2-9977-3469fc6145f3',
u'da70ae27-bca4-4602-ae21-eb5dbcdcaa84', u'581ecfeb-f5c5-43b7-8086-20c6251baa1a',
u'c4a2c81e-53e1-4195-9c9d-19f7109e241c', u'8393b0dc-de8b-45bb-879a-8b8bb689d569',
u'bcfd8d6d-09cc-4744-923c-385da00896e8', u'6ef97386-c7c3-49eb-8604-67c73b31ac01'
]

# The two manifest lists below have 2 manifests in common with each other
# (author's note).
MANIFEST_LISTS_SHARING_MANIFESTS = [
u'd284f52a-16d5-47e9-bfb2-7f62c1cb45a1',
u'5ee10453-92f1-4367-ba82-9e4c2b5b1136',
]

# Seven manifest ids; going by the name, these manifests do not reference any
# blob in common -- TODO confirm against the repo content.
MANIFESTS_NO_SHARED_BLOBS = [
u'0023cd1e-b3f1-4044-9421-16b06f0189fc',
u'03ccd921-ae5c-48a3-a7ca-867896a8c00a',
u'05976e8f-fd28-450c-860c-d356409b8171',
u'063fa040-251f-42a0-a1d4-6df3081aa6b3',
u'0702f826-8cbb-4481-969f-58456d28fc04',
u'08648b19-9fa8-4793-a613-97d3aa2138ac',
u'087e9d85-56ef-45f0-991a-3e33b319e27c',
]

# Three manifest ids; going by the name, these manifests reference at least
# one common blob -- TODO confirm against the repo content.
MANIFESTS_WITH_SHARED_BLOBS = [
u'03769303-0fec-479b-9b98-69d13da6ebb4',
u'076f45c1-a066-41e5-a45a-45a65cf379fc',
u'0b11afd6-6d6a-4ac7-801d-cd87c62c7729'
]


# HTTP basic-auth credentials sent with every request made in this file.
AUTH = HTTPBasicAuth('admin', 'admin')
# Root of the Pulp v2 REST API; endpoint URLs are built by appending to it.
# NOTE: wait_until_complete() hard-codes the same host separately, because
# task hrefs are appended directly to the host rather than to this prefix.
BASE_URL = 'https://pulp2.dev/pulp/api/v2'


def git_info():
    """Print a banner naming the current git branch, then empty the repo.

    Runs ``git branch`` in the current working directory, picks the entry
    marked with ``*`` and prints it between two separator lines. Afterwards
    ``cleanup()`` is called so the destination repository starts out empty.

    :raises StopIteration: if no line of the ``git branch`` output starts
        with ``*``.
    """
    branch_listing = check_output(["git", "branch"]).decode("utf8")
    starred_entry = next(
        entry for entry in branch_listing.split("\n") if entry.startswith("*")
    )
    separator = "\n======================================================="
    print(separator)
    print(" BRANCH " + starred_entry.strip("*").strip())
    print(separator)
    cleanup()


def ensure_empty():
    """Exit the process with status 1 if DESTINATION_REPO still has content.

    The repository's content counts are fetched; when they are truthy a
    notice and the counts themselves are printed before exiting.
    """
    unit_counts = get_repo_content_count(DESTINATION_REPO)
    if not unit_counts:
        return
    print("NON EMPTY REPO")
    print(unit_counts)
    sys.exit(1)


def cleanup():
    """Unassociate all docker units from DESTINATION_REPO.

    Unit ids are collected per type (tags, manifest lists, manifests, blobs),
    de-duplicated within each type, and removed in a single unassociate call.

    :raises Exception: if the repository still reports content afterwards.
    """
    unit_types = (
        'docker_tag',
        'docker_manifest_list',
        'docker_manifest',
        'docker_blob',
    )
    doomed_ids = []
    for unit_type in unit_types:
        # set() drops duplicate ids returned for a single type.
        doomed_ids.extend(set(get_units_by_type(DESTINATION_REPO, unit_type)))
    rm_by_content_pk(doomed_ids)
    if get_repo_content_count(DESTINATION_REPO):
        raise Exception("cleanup")


def wait_until_complete(task, poll_interval=0.5):
    """Poll a Pulp task until it finishes and return its final report.

    The previous implementation only stopped on the ``"finished"`` state, so a
    task that ended in any other terminal state made it spin forever, and it
    polled without any delay between requests.

    :param task: task ``_href`` path; it is appended to ``https://pulp2.dev``.
    :param poll_interval: seconds to sleep between two polls (default 0.5).
    :return: the decoded JSON task report whose ``state`` is ``"finished"``.
    :raises RuntimeError: if the task reports the state ``"error"``,
        ``"canceled"`` or ``"skipped"``, none of which can ever become
        ``"finished"``.
    """
    # Terminal task states other than success, per the Pulp 2 task API.
    dead_states = ("error", "canceled", "skipped")
    task_url = 'https://pulp2.dev{task}'.format(task=task)
    while True:
        req = requests.get(
            url=task_url,
            auth=AUTH,
        )
        # Parse once and reuse the report for both the check and the return.
        report = json.loads(req.content)
        state = report['state']
        if state == "finished":
            return report
        if state in dead_states:
            raise RuntimeError(
                "task {task} ended in state {state!r}".format(task=task, state=state)
            )
        # Avoid hammering the server while the task is waiting or running.
        time.sleep(poll_interval)


def get_units_by_type(repo_id, content_type):
    """Return the ``unit_id`` of every unit of one type in a repository.

    :param repo_id: id of the repository to search.
    :param content_type: unit type id used as the only entry of ``type_ids``
        (e.g. ``'docker_tag'``).
    :return: list of ``unit_id`` values, in the order the server returned them.
    """
    search_url = '{base}/repositories/{repo}/search/units/'.format(
        base=BASE_URL, repo=repo_id
    )
    # An empty unit filter matches every unit of the requested type.
    payload = json.dumps({
        "criteria": {
            "type_ids": [content_type],
            "filters": {"unit": {}},
        },
    })
    response = requests.post(url=search_url, data=payload, auth=AUTH)
    return [match['unit_id'] for match in json.loads(response.content)]


def get_repo_content_count(repo_id):
    """Return the ``content_unit_counts`` field of a repository.

    :param repo_id: id of the repository to look up.
    :return: the ``content_unit_counts`` value from the repository's JSON
        document. Callers in this file only test its truthiness; presumably
        it maps unit type ids to counts -- confirm against the API.
    """
    repo_url = '{base}/repositories/{repo}/'.format(base=BASE_URL, repo=repo_id)
    response = requests.get(url=repo_url, auth=AUTH)
    repo_document = json.loads(response.content)
    return repo_document['content_unit_counts']


def get_repo_tag_names(repo_id):
    """Return the name of every ``docker_tag`` unit in a repository.

    :param repo_id: id of the repository to search.
    :return: list of each matching unit's ``metadata.name``, in the order the
        server returned them.
    """
    search_url = '{base}/repositories/{repo}/search/units/'.format(
        base=BASE_URL, repo=repo_id
    )
    # An empty unit filter matches every tag in the repository.
    payload = json.dumps({
        "criteria": {
            "type_ids": ['docker_tag'],
            "filters": {"unit": {}},
        },
    })
    response = requests.post(url=search_url, data=payload, auth=AUTH)
    return [match['metadata']['name'] for match in json.loads(response.content)]


def copy_by_content_pk(content_type, content_pks):
    """Copy units from SOURCE_REPO into DESTINATION_REPO by unit id.

    Posts an associate request restricted to one unit type and the given
    ``_id`` values, then blocks until the first spawned task completes.

    :param content_type: unit type id placed in ``type_ids``.
    :param content_pks: list of unit ``_id`` values to copy.
    :return: None.
    """
    associate_url = '{base}/repositories/{repo}/actions/associate/'.format(
        base=BASE_URL, repo=DESTINATION_REPO
    )
    body = {
        "criteria": {
            "type_ids": [content_type],
            "filters": {"unit": {"_id": {"$in": content_pks}}},
        },
        "source_repo_id": SOURCE_REPO,
    }
    response = requests.post(url=associate_url, data=json.dumps(body), auth=AUTH)
    # Only the first spawned task is awaited; its report is not used.
    task_href = json.loads(response.content)['spawned_tasks'][0]['_href']
    wait_until_complete(task_href)


def rm_by_content_pk(all_units):
    """Unassociate the given units from DESTINATION_REPO.

    Posts one unassociate request covering the tag, manifest-list, manifest
    and blob types, filtered to the given ``_id`` values, then blocks until
    the first spawned task completes.

    :param all_units: list of unit ``_id`` values to remove, of any of the
        four docker unit types.
    :return: None.
    """
    unassociate_url = '{base}/repositories/{repo}/actions/unassociate/'.format(
        base=BASE_URL, repo=DESTINATION_REPO
    )
    body = {
        "criteria": {
            "type_ids": [
                'docker_tag',
                'docker_manifest_list',
                'docker_manifest',
                'docker_blob',
            ],
            "filters": {"unit": {"_id": {"$in": all_units}}},
        },
    }
    response = requests.post(url=unassociate_url, data=json.dumps(body), auth=AUTH)
    # Only the first spawned task is awaited; its report is not used.
    task_href = json.loads(response.content)['spawned_tasks'][0]['_href']
    wait_until_complete(task_href)
(2-2/3)