From 6529240be8d241d680ebaa1ee849ecb15b441a4e Mon Sep 17 00:00:00 2001 From: Nicolas Werner Date: Tue, 27 Dec 2022 04:30:32 +0100 Subject: [PATCH] Update flat-manager-client --- scripts/flat-manager-client | 123 ++++++++++++++++++++++++++++++++---- 1 file changed, 111 insertions(+), 12 deletions(-) diff --git a/scripts/flat-manager-client b/scripts/flat-manager-client index 99f23b70..d50b5ec1 100755 --- a/scripts/flat-manager-client +++ b/scripts/flat-manager-client @@ -31,7 +31,8 @@ from functools import reduce from urllib.parse import urljoin, urlparse, urlsplit, urlunparse, urlunsplit import aiohttp -from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed +import aiohttp.client_exceptions +from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_random_exponential import gi gi.require_version('OSTree', '1.0') @@ -55,7 +56,7 @@ class ApiError(Exception): self.status = response.status try: - self.body = json.loads(response); + self.body = json.loads(body) except: self.body = {"status": self.status, "error-type": "no-error", "message": "No json error details from server"} @@ -71,6 +72,10 @@ class ApiError(Exception): return "Api call to %s failed with status %d, details: %s" % (self.url, self.status, self.body) +TENACITY_RETRY_EXCEPTIONS = (retry_if_exception_type(aiohttp.client_exceptions.ServerDisconnectedError) | retry_if_exception_type(ApiError) | retry_if_exception_type(aiohttp.client_exceptions.ServerConnectionError)) +TENACITY_STOP_AFTER = stop_after_delay(300) +TENACITY_WAIT_BETWEEN = wait_random_exponential(multiplier=1, max=60) + # This is similar to the regular payload, but opens the file lazily class AsyncNamedFilePart(aiohttp.payload.Payload): def __init__(self, @@ -181,6 +186,13 @@ def chunks(l, n): for i in range(0, len(l), n): yield l[i:i + n] + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def missing_objects(session, build_url, token, wanted): missing=[] for chunk in chunks(wanted, 2000): @@ -199,6 +211,12 @@ async def missing_objects(session, build_url, token, wanted): missing.extend(data["missing"]) return missing +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=(retry_if_exception_type(ApiError) | retry_if_exception_type(aiohttp.client_exceptions.ServerDisconnectedError)), + reraise=True, +) async def upload_files(session, build_url, token, files): if len(files) == 0: return @@ -212,6 +230,7 @@ async def upload_files(session, build_url, token, files): if resp.status != 200: raise ApiError(resp, await resp.text()) + async def upload_deltas(session, repo_path, build_url, token, deltas, refs, ignore_delta): if not len(deltas): return @@ -258,6 +277,13 @@ async def upload_objects(session, repo_path, build_url, token, objects): # Upload any remainder await upload_files(session, build_url, token, req) + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def create_ref(session, build_url, token, ref, commit): print("Creating ref %s with commit %s" % (ref, commit)) resp = await session.post(build_url + "/build_ref", headers={'Authorization': 'Bearer ' + token}, json= { "ref": ref, "commit": commit} ) @@ -268,6 +294,13 @@ async def create_ref(session, build_url, token, ref, commit): data = await resp.json() return data + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def add_extra_ids(session, build_url, token, extra_ids): print("Adding extra ids %s" % (extra_ids)) resp = await session.post(build_url + "/add_extra_ids", headers={'Authorization': 'Bearer ' + token}, json= { "ids": extra_ids} ) @@ -278,6 +311,13 @@ async def add_extra_ids(session, build_url, token, extra_ids): data = await resp.json() return data + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def get_build(session, build_url, token): resp = await session.get(build_url, headers={'Authorization': 'Bearer ' + token}) if resp.status != 200: @@ -290,6 +330,13 @@ def reparse_job_results(job): job["results"] = json.loads(job.get("results", "{}")) return job + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def get_job(session, job_url, token): resp = await session.get(job_url, headers={'Authorization': 'Bearer ' + token}, json={}) async with resp: @@ -298,6 +345,13 @@ async def get_job(session, job_url, token): data = await resp.json() return data + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def wait_for_job(session, job_url, token): reported_delay = False old_job_status = 0 @@ -366,6 +420,13 @@ async def wait_for_job(session, job_url, token): sleep_time=60 time.sleep(sleep_time) + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def commit_build(session, build_url, eol, eol_rebase, token_type, wait, token): print("Committing build %s" % (build_url)) json = { @@ -390,15 +451,24 @@ async def commit_build(session, build_url, eol, eol_rebase, token_type, wait, to job["location"] = job_url return job + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def publish_build(session, build_url, wait, token): print("Publishing build %s" % (build_url)) resp = await session.post(build_url + "/publish", headers={'Authorization': 'Bearer ' + token}, json= { } ) async with resp: if resp.status == 400: - body = await resp.text() + body = await resp.json() try: - msg = json.loads(body) - if msg.get("current-state", "") == "published": + if isinstance(body, str): + body = json.loads(body) + + if body.get("current-state") == "published": print("the build has been already published") return {} except: @@ -418,6 +488,13 @@ async def publish_build(session, build_url, wait, token): job["location"] = job_url return job + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def purge_build(session, build_url, token): print("Purging build %s" % (build_url)) resp = await session.post(build_url + "/purge", headers={'Authorization': 'Bearer ' + token}, json= {} ) @@ -426,6 +503,13 @@ async def purge_build(session, build_url, token): raise ApiError(resp, await resp.text()) return await resp.json() + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def create_token(session, manager_url, token, name, subject, scope, duration): token_url = urljoin(manager_url, "api/v1/token_subset") resp = await session.post(token_url, headers={'Authorization': 'Bearer ' + token}, json = { @@ -442,11 +526,23 @@ async def create_token(session, manager_url, token, name, subject, scope, durati def get_object_multipart(repo_path, object): return AsyncNamedFilePart(repo_path + "/objects/" + object[:2] + "/" + object[2:], filename=object) + +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) async def create_command(session, args): build_url = urljoin(args.manager_url, "api/v1/build") - resp = await session.post(build_url, headers={'Authorization': 'Bearer ' + args.token}, json={ + json = { "repo": args.repo - }) + } + if args.app_id is not None: + json["app-id"] = args.app_id + if args.public_download is not None: + json["public-download"] = args.public_download + resp = await session.post(build_url, headers={'Authorization': 'Bearer ' + args.token}, json=json) async with resp: if resp.status != 200: raise ApiError(resp, await resp.text()) @@ -475,9 +571,9 @@ def build_url_to_api(build_url): return urlunparse((parts.scheme, parts.netloc, path, None, None, None)) @retry( - stop=stop_after_attempt(6), - wait=wait_fixed(10), - retry=retry_if_exception_type(ApiError), + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, reraise=True, ) async def push_command(session, args): @@ -630,6 +726,9 @@ if __name__ == '__main__': create_parser = subparsers.add_parser('create', help='Create new build') create_parser.add_argument('manager_url', help='remote repo manager url') create_parser.add_argument('repo', help='repo name') + create_parser.add_argument('app_id', nargs='?', help='app ID') + create_parser.add_argument('--public_download', action='store_true', default=None, help='allow public read access to the build repo') + create_parser.add_argument('--no_public_download', action='store_false', dest='public_download', default=None, help='allow public read access to the build repo') create_parser.set_defaults(func=create_command) push_parser = subparsers.add_parser('push', help='Push to repo manager') @@ -717,8 +816,7 @@ if __name__ == '__main__': res = 1 output = None try: - loop = asyncio.get_event_loop() - result = loop.run_until_complete(run_with_session(args)) + result = asyncio.run(run_with_session(args)) output = { "command": args.subparser_name, @@ -770,3 +868,4 @@ if __name__ == '__main__': f.write("\n") f.close() exit(res) +