#!/usr/bin/python3

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

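# Overview (informational note): this script is a client for a repo manager in
# the flat-manager style (the api/v1/build, build_ref, missing_objects and
# token_subset endpoints used below follow that layout).  It can create a build
# on the server, push a local OSTree repo into it, commit/publish/purge the
# build, and follow job logs.  A token is taken from --token, --token-file or
# $REPO_TOKEN.  Hypothetical invocations, for illustration only (script name,
# URLs and IDs are placeholders):
#
#   REPO_TOKEN=... ./flat-manager-client create https://hub.example.org/ stable org.example.App
#   REPO_TOKEN=... ./flat-manager-client push --commit --publish --wait \
#       https://hub.example.org/api/v1/build/1234 ./local-repo
#
# See the argument parser at the bottom of this file for all subcommands.
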
import asyncio
import base64
import binascii
import errno
import fnmatch
import gzip
import json
import logging
import os
import sys
import time
import traceback
from argparse import ArgumentParser
from functools import reduce
from urllib.parse import urljoin, urlparse, urlunparse
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_delay,
    wait_random_exponential,
)

import aiohttp
import aiohttp.client_exceptions
import gi

gi.require_version("OSTree", "1.0")
from gi.repository import Gio, GLib, OSTree  # noqa: E402

UPLOAD_CHUNK_LIMIT = 4 * 1024 * 1024
DEFAULT_LIMIT = 2**16


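# Note on the constants above: UPLOAD_CHUNK_LIMIT (4 MiB) caps how many bytes
# worth of objects upload_objects() batches into a single /upload request,
# while DEFAULT_LIMIT (64 KiB) is the read size AsyncNamedFilePart uses when
# streaming a file into the request body.
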
def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


class UsageException(Exception):
    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        return self.msg


class ApiError(Exception):
    def __init__(self, response, body):
        self.url = str(response.url)
        self.status = response.status

        try:
            self.body = json.loads(body)
        except (json.JSONDecodeError, TypeError):
            self.body = {
                "status": self.status,
                "error-type": "no-error",
                "message": f"Non-json error from server: {body}",
            }

    def repr(self):
        return {
            "type": "api",
            "url": self.url,
            "status_code": self.status,
            "details": self.body,
        }

    def __str__(self):
        return "Api call to %s failed with status %d, details: %s" % (
            self.url,
            self.status,
            self.body,
        )


class ServerApiError(ApiError):
    def __init__(self, response, body):
        super().__init__(response, body)


class FailedJobError(Exception):
    def __init__(self, job):
        self.job = job

    def repr(self):
        return {"type": "job", "job": self.job}

    def __str__(self):
        return "Job failed: %s" % (self.job)


TENACITY_RETRY_EXCEPTIONS = retry_if_exception_type(
    (
        aiohttp.client_exceptions.ServerDisconnectedError,
        ServerApiError,
        aiohttp.client_exceptions.ServerConnectionError,
        aiohttp.client_exceptions.ClientOSError,
    )
)
TENACITY_STOP_AFTER = stop_after_delay(300)
TENACITY_WAIT_BETWEEN = wait_random_exponential(multiplier=1, max=60)


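# Retry policy used by the @retry-decorated calls below: transient failures
# (server disconnects/connection errors, client OS errors, and 5xx responses
# surfaced as ServerApiError) are retried with randomized exponential backoff
# capped at 60 seconds per wait, giving up after 300 seconds, at which point
# the last exception is re-raised.
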
# This is similar to the regular payload, but opens the file lazily
class AsyncNamedFilePart(aiohttp.payload.Payload):
    def __init__(self, value, disposition="attachment", *args, **kwargs):
        self._file = None
        if "filename" not in kwargs:
            kwargs["filename"] = os.path.basename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            self.set_content_disposition(
                disposition, filename=self._filename, quote_fields=False
            )

        self._size = os.stat(value).st_size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        raise NotImplementedError

    async def write(self, writer):
        if self._file is None or self._file.closed:
            self._file = open(self._value, "rb")
        try:
            chunk = self._file.read(DEFAULT_LIMIT)
            while chunk:
                await writer.write(chunk)
                chunk = self._file.read(DEFAULT_LIMIT)
        finally:
            self._file.close()

    @property
    def size(self):
        return self._size


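# Note on AsyncNamedFilePart above: instances are appended to an
# aiohttp.MultipartWriter (see upload_files/upload_deltas below), but the
# underlying file is only opened inside write(), i.e. when the request body is
# actually sent, and it is re-opened if the request is retried.  The payload
# size is taken from os.stat() up front so total upload sizes can be reported
# without reading the files.
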
def ostree_object_path(repo, obj):
    repodir = repo.get_path().get_path()
    return os.path.join(repodir, "objects", obj[0:2], obj[2:])


def ostree_get_dir_files(repo, objects, dirtree):
    if dirtree.endswith(".dirtree"):
        dirtree = dirtree[:-8]
    dirtreev = repo.load_variant(OSTree.ObjectType.DIR_TREE, dirtree)[1]
    iter = OSTree.RepoCommitTraverseIter()
    iter.init_dirtree(repo, dirtreev, 0)
    while True:
        type = iter.next()
        if type == OSTree.RepoCommitIterResult.END:
            break
        if type == OSTree.RepoCommitIterResult.ERROR:
            break
        if type == OSTree.RepoCommitIterResult.FILE:
            d = iter.get_file()
            objects.add(d.out_checksum + ".filez")
        if type == OSTree.RepoCommitIterResult.DIR:
            pass


def local_needed_files(repo, metadata_objects):
    objects = set()
    for c in metadata_objects:
        if c.endswith(".dirtree"):
            ostree_get_dir_files(repo, objects, c)
    return objects


def local_needed_metadata_dirtree(repo, objects, dirtree_content, dirtree_meta):
    objects.add(dirtree_meta + ".dirmeta")
    dirtree_content_name = dirtree_content + ".dirtree"
    if dirtree_content_name in objects:
        return
    objects.add(dirtree_content_name)

    dirtreev = repo.load_variant(OSTree.ObjectType.DIR_TREE, dirtree_content)[1]
    iter = OSTree.RepoCommitTraverseIter()
    iter.init_dirtree(repo, dirtreev, 0)
    while True:
        type = iter.next()
        if type == OSTree.RepoCommitIterResult.END:
            break
        if type == OSTree.RepoCommitIterResult.ERROR:
            break
        if type == OSTree.RepoCommitIterResult.FILE:
            pass
        if type == OSTree.RepoCommitIterResult.DIR:
            d = iter.get_dir()
            local_needed_metadata_dirtree(
                repo, objects, d.out_content_checksum, d.out_meta_checksum
            )


def local_needed_metadata(repo, commits):
    objects = set()
    for rev in commits:
        objects.add(rev + ".commit")
        commitv = repo.load_variant(OSTree.ObjectType.COMMIT, rev)[1]
        iter = OSTree.RepoCommitTraverseIter()
        iter.init_commit(repo, commitv, 0)
        while True:
            type = iter.next()
            if type == OSTree.RepoCommitIterResult.END:
                break
            if type == OSTree.RepoCommitIterResult.ERROR:
                break
            if type == OSTree.RepoCommitIterResult.FILE:
                pass
            if type == OSTree.RepoCommitIterResult.DIR:
                d = iter.get_dir()
                local_needed_metadata_dirtree(
                    repo, objects, d.out_content_checksum, d.out_meta_checksum
                )
    return objects


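# How the helpers above fit together: local_needed_metadata() walks each commit
# and collects the .commit, .dirtree and .dirmeta metadata objects it can
# reach, while local_needed_files() walks those .dirtree objects again and
# collects the .filez content objects they reference.  Together they describe
# everything the remote side could possibly need for the refs being pushed.
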
def chunks(iterable, n):
    """Yield successive n-sized chunks from iterable."""
    for i in range(0, len(iterable), n):
        yield iterable[i : i + n]


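# For example (plain Python, safe to check in a REPL):
#   list(chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]
# missing_objects() below uses this to query the server in batches of 2000
# object names.
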
@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def _missing_chunk(session, build_url, chunk, headers):
    wanted_json = json.dumps({"wanted": chunk}).encode("utf-8")
    data = gzip.compress(wanted_json)

    resp = await session.get(build_url + "/missing_objects", data=data, headers=headers)
    async with resp:
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        if resp.status != 200:
            raise ApiError(resp, await resp.text())

        return await resp.json()


async def missing_objects(session, build_url, token, wanted):
    missing = []
    for chunk in chunks(wanted, 2000):
        headers = {
            "Authorization": "Bearer " + token,
            "Content-Encoding": "gzip",
            "Content-Type": "application/json",
        }
        data = await _missing_chunk(session, build_url, chunk, headers)
        missing.extend(data["missing"])
    return missing


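# Note on missing_objects(): the list of wanted object names is sent to
# <build_url>/missing_objects as a gzip-compressed JSON body in batches of
# 2000, and the server's "missing" lists are concatenated into one result.
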
@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def upload_files(session, build_url, token, files):
    if len(files) == 0:
        return
    print(
        "Uploading %d files (%d bytes)"
        % (len(files), reduce(lambda x, y: x + y, map(lambda f: f.size, files)))
    )
    with aiohttp.MultipartWriter() as writer:
        for f in files:
            writer.append(f)
        writer.headers["Authorization"] = "Bearer " + token
        resp = await session.request(
            "post", build_url + "/upload", data=writer, headers=writer.headers
        )
        async with resp:
            if resp.status >= 500:
                raise ServerApiError(resp, await resp.text())
            elif resp.status != 200:
                raise ApiError(resp, await resp.text())


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def upload_deltas(
    session, repo_path, build_url, token, deltas, refs, ignore_delta
):
    if not len(deltas):
        return

    req = []
    for ref, commit in refs.items():
        # Skip screenshots here
        parts = ref.split("/")
        if (
            len(parts) == 4
            and (parts[0] == "app" or parts[0] == "runtime")
            and not should_skip_delta(parts[1], ignore_delta)
        ):
            for delta in deltas:
                # Only upload from-scratch deltas, as these are the only reused ones
                if delta == commit:
                    print(" %s: %s" % (ref, delta))
                    delta_name = delta_name_encode(delta)
                    delta_dir = (
                        repo_path + "/deltas/" + delta_name[:2] + "/" + delta_name[2:]
                    )
                    parts = os.listdir(delta_dir)
                    for part in parts:
                        req.append(
                            AsyncNamedFilePart(
                                delta_dir + "/" + part,
                                filename=delta_name + "." + part + ".delta",
                            )
                        )

    if len(req):
        await upload_files(session, build_url, token, req)


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def upload_objects(session, repo_path, build_url, token, objects):
    req = []
    total_size = 0
    for file_obj in objects:
        named = get_object_multipart(repo_path, file_obj)
        file_size = named.size
        if (
            total_size + file_size > UPLOAD_CHUNK_LIMIT
        ):  # The new object would bring us over the chunk limit
            if len(req) > 0:  # We already have some objects, upload those first
                next_req = [named]
                total_size = file_size
            else:
                next_req = []
                req.append(named)
                total_size = 0
            await upload_files(session, build_url, token, req)
            req = next_req
        else:
            total_size = total_size + file_size
            req.append(named)

    # Upload any remainder
    await upload_files(session, build_url, token, req)


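# Note on upload_objects(): objects are accumulated into one multipart request
# until adding the next object would exceed UPLOAD_CHUNK_LIMIT; at that point
# the accumulated batch is uploaded first and the new object starts the next
# batch.  A single object that is larger than the limit on its own is uploaded
# in a request of its own, and whatever is left over is uploaded at the end.
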
@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def create_ref(session, build_url, token, ref, commit, build_log_url=None):
    print("Creating ref %s with commit %s" % (ref, commit))
    resp = await session.post(
        build_url + "/build_ref",
        headers={"Authorization": "Bearer " + token},
        json={"ref": ref, "commit": commit, "build-log-url": build_log_url},
    )
    async with resp:
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        elif resp.status != 200:
            raise ApiError(resp, await resp.text())

        data = await resp.json()
        return data


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def add_extra_ids(session, build_url, token, extra_ids):
    print("Adding extra ids %s" % (extra_ids))
    resp = await session.post(
        build_url + "/add_extra_ids",
        headers={"Authorization": "Bearer " + token},
        json={"ids": extra_ids},
    )
    async with resp:
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        elif resp.status != 200:
            raise ApiError(resp, await resp.text())

        data = await resp.json()
        return data


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def get_build(session, build_url, token):
    resp = await session.get(build_url, headers={"Authorization": "Bearer " + token})
    async with resp:
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        elif resp.status != 200:
            raise ApiError(resp, await resp.text())
        data = await resp.json()
        return data


# For stupid reasons this is a string with json, let's expand it
def reparse_job_results(job):
    job["results"] = json.loads(job.get("results", "{}"))
    return job


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def get_job(session, job_url, token):
    resp = await session.get(
        job_url, headers={"Authorization": "Bearer " + token}, json={}
    )
    async with resp:
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        elif resp.status != 200:
            raise ApiError(resp, await resp.text())
        data = await resp.json()
        return data


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def wait_for_job(session, job_url, token):
    reported_delay = False
    old_job_status = 0
    printed_len = 0
    iterations_since_change = 0
    error_iterations = 0
    while True:
        try:
            resp = await session.get(
                job_url,
                headers={"Authorization": "Bearer " + token},
                json={"log-offset": printed_len},
            )
            async with resp:
                if resp.status == 200:
                    error_iterations = 0
                    job = await resp.json()
                    job_status = job["status"]
                    if job_status == 0 and not reported_delay:
                        reported_delay = True
                        start_after_struct = job.get("start_after", None)
                        if start_after_struct:
                            start_after = start_after_struct.get(
                                "secs_since_epoch", None
                            )
                            now = time.time()
                            if start_after and start_after > now:
                                print(
                                    "Waiting %d seconds before starting job"
                                    % (int(start_after - now))
                                )
                    if job_status > 0 and old_job_status == 0:
                        print("/ Job was started")
                    old_job_status = job_status
                    log = job["log"]
                    if len(log) > 0:
                        iterations_since_change = 0
                        for line in log.splitlines(True):
                            print("| %s" % line, end="")
                        printed_len = printed_len + len(log)
                    else:
                        iterations_since_change = iterations_since_change + 1
                    if job_status > 1:
                        if job_status == 2:
                            print("\\ Job completed successfully")
                        else:
                            print("\\ Job failed")
                            raise FailedJobError(job)
                        return job
                else:
                    iterations_since_change = (
                        4  # Start at 4 so we ramp up the delay faster
                    )
                    error_iterations = error_iterations + 1
                    if error_iterations <= 5:
                        print(
                            "Unexpected response %s getting job log, ignoring"
                            % resp.status
                        )
                    else:
                        raise ApiError(resp, await resp.text())
        except OSError as e:
            if e.args[0] == errno.ECONNRESET:
                # Client disconnected, retry
                # Not sure exactly why, but I got a lot of ConnectionResetErrors here
                # in tests. I guess the server stops reusing a http2 session after a bit.
                # Should be fine to retry with the backoff.
                pass
            else:
                raise
        # Some polling backoff to avoid loading the server
        if iterations_since_change <= 1:
            sleep_time = 1
        elif iterations_since_change < 5:
            sleep_time = 3
        elif iterations_since_change < 15:
            sleep_time = 5
        elif iterations_since_change < 30:
            sleep_time = 10
        else:
            sleep_time = 60
        time.sleep(sleep_time)


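# Note on wait_for_job(): it polls the job endpoint with the current log
# offset, prints any new log lines prefixed with "| ", and backs off from 1s up
# to 60s between polls while nothing changes.  Up to five unexpected (non-200)
# responses in a row are tolerated before an ApiError is raised, and a failed
# job raises FailedJobError.
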
@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def wait_for_checks(session, build_url, token):
    print("Waiting for checks, if any...")
    while True:
        resp = await session.get(
            build_url + "/extended", headers={"Authorization": "Bearer " + token}
        )
        async with resp:
            if resp.status == 404:
                return
            if resp.status >= 500:
                raise ServerApiError(resp, await resp.text())
            elif resp.status != 200:
                raise ApiError(resp, await resp.text())

            build = await resp.json()

            # wait for the repo to be validated
            if build["build"]["repo_state"] == 1:
                time.sleep(2)
            else:
                break

    for check in build["checks"]:
        print("Waiting for check: %s" % (check["check_name"]))
        job_url = build_url + "/check/" + check["check_name"] + "/job"
        await wait_for_job(session, job_url, token)

    resp = await session.get(
        build_url + "/extended", headers={"Authorization": "Bearer " + token}
    )
    async with resp:
        if resp.status == 404:
            return
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        elif resp.status != 200:
            raise ApiError(resp, await resp.text())

        build = await resp.json()
        for check in build["checks"]:
            if check["status"] == 3:
                print("\\ Check {} has failed".format(check["check_name"]))
                raise FailedJobError(check)


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def commit_build(session, build_url, eol, eol_rebase, token_type, wait, token):
    print("Committing build %s" % (build_url))
    json = {"endoflife": eol, "endoflife_rebase": eol_rebase}
    if token_type is not None:
        json["token_type"] = token_type
    resp = await session.post(
        build_url + "/commit", headers={"Authorization": "Bearer " + token}, json=json
    )
    async with resp:
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        elif resp.status != 200:
            raise ApiError(resp, await resp.text())

        job = await resp.json()
        job_url = resp.headers["location"]

    if wait:
        print("Waiting for commit job")
        await wait_for_checks(session, build_url, token)
        job = await wait_for_job(session, job_url, token)

    reparse_job_results(job)
    job["location"] = job_url
    return job


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def publish_build(session, build_url, wait, token):
    print("Publishing build %s" % (build_url))
    resp = await session.post(
        build_url + "/publish", headers={"Authorization": "Bearer " + token}, json={}
    )
    async with resp:
        if resp.status == 400:
            body = await resp.json()
            try:
                if isinstance(body, str):
                    body = json.loads(body)

                current_state = body.get("current-state", None)

                if current_state == "published":
                    print("the build has already been published")
                    return {}
                if current_state == "publishing":
                    print("the build is currently being published")
                    return {}
                elif current_state == "failed":
                    print("the build has failed")
                    raise ApiError(resp, await resp.text())
                elif current_state == "validating":
                    print("the build is still being validated or held for review")
                    return {}
            except json.JSONDecodeError:
                pass

        if resp.status != 200:
            raise ApiError(resp, await resp.text())

        job = await resp.json()
        job_url = resp.headers["location"]

    if wait:
        print("Waiting for publish job")
        job = await wait_for_job(session, job_url, token)

    reparse_job_results(job)
    job["location"] = job_url
    return job


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def purge_build(session, build_url, token):
    print("Purging build %s" % (build_url))
    resp = await session.post(
        build_url + "/purge", headers={"Authorization": "Bearer " + token}, json={}
    )
    async with resp:
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        elif resp.status == 400:
            response = await resp.json()
            if response.get("message") == "Can't prune build while in use":
                return '{"OK"}'
            else:
                raise ApiError(resp, await resp.text())
        elif resp.status != 200:
            raise ApiError(resp, await resp.text())
        return await resp.json()


@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def create_token(session, manager_url, token, name, subject, scope, duration):
    token_url = urljoin(manager_url, "api/v1/token_subset")
    resp = await session.post(
        token_url,
        headers={"Authorization": "Bearer " + token},
        json={
            "name": name,
            "sub": subject,
            "scope": scope,
            "duration": duration,
        },
    )
    async with resp:
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        elif resp.status != 200:
            raise ApiError(resp, await resp.text())
        return await resp.json()


def get_object_multipart(repo_path, object):
    return AsyncNamedFilePart(
        repo_path + "/objects/" + object[:2] + "/" + object[2:], filename=object
    )


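# Note on the object layout: like ostree_object_path() above, this relies on
# the standard OSTree layout where an object named <checksum>.<type> is stored
# as objects/<first two hex chars>/<remaining checksum>.<type> inside the repo.
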
@retry(
    stop=TENACITY_STOP_AFTER,
    wait=TENACITY_WAIT_BETWEEN,
    retry=TENACITY_RETRY_EXCEPTIONS,
    reraise=True,
)
async def create_command(session, args):
    build_url = urljoin(args.manager_url, "api/v1/build")
    json = {"repo": args.repo}
    if args.app_id is not None:
        json["app-id"] = args.app_id
    if args.public_download is not None:
        json["public-download"] = args.public_download
    if args.build_log_url is not None:
        json["build-log-url"] = args.build_log_url
    resp = await session.post(
        build_url, headers={"Authorization": "Bearer " + args.token}, json=json
    )
    async with resp:
        if resp.status >= 500:
            raise ServerApiError(resp, await resp.text())
        elif resp.status != 200:
            raise ApiError(resp, await resp.text())
        data = await resp.json()
        data["location"] = resp.headers["location"]
        if not args.print_output:
            print(resp.headers["location"])
        return data


def delta_name_part_encode(commit):
    return base64.b64encode(binascii.unhexlify(commit), b"+_")[:-1].decode("utf-8")


def delta_name_encode(delta):
    return "-".join(map(delta_name_part_encode, delta.split("-")))


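# The delta name is the (possibly "-"-joined from/to) commit checksum with each
# part unhexlified and re-encoded as base64 using "+" and "_" as the extra
# characters, minus the trailing "=" padding.  Toy example (not a real commit,
# shown only to illustrate the transformation):
#   delta_name_part_encode("00ff") == "AP8"
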
def should_skip_delta(id, globs):
    if globs:
        for glob in globs:
            if fnmatch.fnmatch(id, glob):
                return True
    return False


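# For example, with --ignore-delta 'org.example.*' (hypothetical ID and glob):
#   should_skip_delta("org.example.App", ["org.example.*"]) is True
# so no static deltas would be uploaded for refs of that application.
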
def build_url_to_api(build_url):
    parts = urlparse(build_url)
    path = os.path.dirname(os.path.dirname(parts.path))
    return urlunparse((parts.scheme, parts.netloc, path, None, None, None))


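# build_url_to_api() strips the last two path components off a build URL to get
# back to the API root, e.g. (hypothetical URL):
#   build_url_to_api("https://hub.example.org/api/v1/build/1234")
#       == "https://hub.example.org/api/v1"
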
async def push_command(session, args):
    local_repo = OSTree.Repo.new(Gio.File.new_for_path(args.repo_path))
    try:
        local_repo.open(None)
    except GLib.Error as err:
        raise UsageException(
            "Can't open repo %s: %s" % (args.repo_path, err.message)
        ) from err

    refs = {}
    if len(args.branches) == 0:
        _, all_refs = local_repo.list_refs(None, None)
        for ref in all_refs:
            if (
                ref.startswith("app/")
                or ref.startswith("runtime/")
                or ref.startswith("screenshots/")
            ):
                refs[ref] = all_refs[ref]
    else:
        for branch in args.branches:
            _, rev = local_repo.resolve_rev(branch, False)
            refs[branch] = rev

    if args.minimal_token:
        id = os.path.basename(urlparse(args.build_url).path)
        token = (
            await create_token(
                session,
                args.build_url,
                args.token,
                "minimal-upload",
                "build/%s" % (id),
                ["upload"],
                60 * 60,
            )
        )["token"]
    else:
        token = args.token

print("Uploading refs to %s: %s" % (args.build_url, list(refs)))
|
|
|
|
metadata_objects = local_needed_metadata(local_repo, refs.values())
|
|
|
|
print("Refs contain %d metadata objects" % (len(metadata_objects)))
|
|
|
|
missing_metadata_objects = await missing_objects(
|
|
session, args.build_url, token, list(metadata_objects)
|
|
)
|
|
|
|
print("Remote missing %d of those" % (len(missing_metadata_objects)))
|
|
|
|
file_objects = local_needed_files(local_repo, metadata_objects)
|
|
print("Has %d file objects for those" % (len(file_objects)))
|
|
|
|
missing_file_objects = await missing_objects(
|
|
session, args.build_url, token, list(file_objects)
|
|
)
|
|
print("Remote missing %d of those" % (len(missing_file_objects)))
|
|
|
|
# First upload all missing file objects
|
|
print("Uploading file objects")
|
|
await upload_objects(
|
|
session, args.repo_path, args.build_url, token, missing_file_objects
|
|
)
|
|
|
|
# Then all the metadata
|
|
print("Uploading metadata objects")
|
|
await upload_objects(
|
|
session, args.repo_path, args.build_url, token, missing_metadata_objects
|
|
)
|
|
|
|
_, deltas = local_repo.list_static_delta_names()
|
|
print("Uploading deltas")
|
|
await upload_deltas(
|
|
session, args.repo_path, args.build_url, token, deltas, refs, args.ignore_delta
|
|
)
|
|
|
|
# Then the refs
|
|
for ref, commit in refs.items():
|
|
await create_ref(
|
|
session,
|
|
args.build_url,
|
|
token,
|
|
ref,
|
|
commit,
|
|
build_log_url=args.build_log_url,
|
|
)
|
|
|
|
# Then any extra ids
|
|
if args.extra_id:
|
|
await add_extra_ids(session, args.build_url, token, args.extra_id)
|
|
|
|
commit_job = None
|
|
publish_job = None
|
|
update_job = None
|
|
|
|
# Note, this always uses the full token, as the minimal one only has upload permissions
|
|
if args.commit or args.publish:
|
|
commit_job = await commit_build(
|
|
session,
|
|
args.build_url,
|
|
args.end_of_life,
|
|
args.end_of_life_rebase,
|
|
args.token_type,
|
|
args.publish or args.wait,
|
|
args.token,
|
|
)
|
|
|
|
if args.publish:
|
|
publish_job = await publish_build(
|
|
session, args.build_url, args.wait or args.wait_update, args.token
|
|
)
|
|
update_job_id = publish_job.get("results", {}).get("update-repo-job", None)
|
|
if update_job_id:
|
|
print("Queued repo update job %d" % (update_job_id))
|
|
update_job_url = (
|
|
build_url_to_api(args.build_url) + "/job/" + str(update_job_id)
|
|
)
|
|
if args.wait_update:
|
|
print("Waiting for repo update job")
|
|
update_job = await wait_for_job(session, update_job_url, token)
|
|
else:
|
|
update_job = await get_job(session, update_job_url, token)
|
|
reparse_job_results(update_job)
|
|
update_job["location"] = update_job_url
|
|
|
|
data = await get_build(session, args.build_url, args.token)
|
|
if commit_job:
|
|
data["commit_job"] = commit_job
|
|
if publish_job:
|
|
data["publish_job"] = publish_job
|
|
if update_job:
|
|
data["update_job"] = update_job
|
|
return data
|
|
|
|
|
|
async def commit_command(session, args):
    job = await commit_build(
        session,
        args.build_url,
        args.end_of_life,
        args.end_of_life_rebase,
        args.token_type,
        args.wait,
        args.token,
    )
    return job


async def publish_command(session, args):
    job = await publish_build(
        session, args.build_url, args.wait or args.wait_update, args.token
    )
    update_job_id = job.get("results", {}).get("update-repo-job", None)
    if update_job_id:
        print("Queued repo update job %d" % (update_job_id))
        update_job_url = build_url_to_api(args.build_url) + "/job/" + str(update_job_id)
        if args.wait_update:
            print("Waiting for repo update job")
            update_job = await wait_for_job(session, update_job_url, args.token)
        else:
            update_job = await get_job(session, update_job_url, args.token)
        reparse_job_results(update_job)
        update_job["location"] = update_job_url
    return job


async def purge_command(session, args):
    job = await purge_build(session, args.build_url, args.token)
    return job


async def create_token_command(session, args):
    data = await create_token(
        session,
        args.manager_url,
        args.token,
        args.name,
        args.subject,
        args.scope,
        args.duration,
    )
    if not args.print_output:
        print(data["token"])
    return data


async def follow_job_command(session, args):
    job = await wait_for_job(session, args.job_url, args.token)
    return job


async def run_with_session(args):
    timeout = aiohttp.ClientTimeout(total=90 * 60)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        result = await args.func(session, args)
    return result


if __name__ == "__main__":
    progname = os.path.basename(sys.argv[0])

    parser = ArgumentParser(prog=progname)
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="enable verbose output"
    )
    parser.add_argument("--debug", action="store_true", help="enable debugging output")
    parser.add_argument("--output", help="Write output json to file")
    parser.add_argument("--print-output", action="store_true", help="Print output json")
    parser.add_argument("--token", help="use this token")
    parser.add_argument("--token-file", help="use token from file")
    subparsers = parser.add_subparsers(
        title="subcommands",
        dest="subparser_name",
        description="valid subcommands",
        help="additional help",
    )

    create_parser = subparsers.add_parser("create", help="Create new build")
    create_parser.add_argument("manager_url", help="remote repo manager url")
    create_parser.add_argument("repo", help="repo name")
    create_parser.add_argument("app_id", nargs="?", help="app ID")
    create_parser.add_argument(
        "--public_download",
        action="store_true",
        default=None,
        help="allow public read access to the build repo",
    )
    create_parser.add_argument(
        "--no_public_download",
        action="store_false",
        dest="public_download",
        default=None,
        help="don't allow public read access to the build repo",
    )
    create_parser.add_argument(
        "--build-log-url", help="Set URL of the build log for the whole build"
    )
    create_parser.set_defaults(func=create_command)

    push_parser = subparsers.add_parser("push", help="Push to repo manager")
    push_parser.add_argument("build_url", help="remote build url")
    push_parser.add_argument("repo_path", help="local repository")
    push_parser.add_argument("branches", nargs="*", help="branches to push")
    push_parser.add_argument(
        "--commit", action="store_true", help="commit build after pushing"
    )
    push_parser.add_argument(
        "--publish", action="store_true", help="publish build after committing"
    )
    push_parser.add_argument(
        "--extra-id", action="append", help="add extra collection-id"
    )
    push_parser.add_argument(
        "--ignore-delta", action="append", help="don't upload deltas matching this glob"
    )
    push_parser.add_argument(
        "--wait", action="store_true", help="wait for commit/publish to finish"
    )
    push_parser.add_argument(
        "--wait-update", action="store_true", help="wait for update-repo to finish"
    )
    push_parser.add_argument(
        "--minimal-token",
        action="store_true",
        help="Create minimal token for the upload",
    )
    push_parser.add_argument("--end-of-life", help="Set end of life")
    push_parser.add_argument(
        "--end-of-life-rebase", help="Set new ID which will supersede the current one"
    )
push_parser.add_argument("--token-type", help="Set token type", type=int)
|
|
push_parser.add_argument(
|
|
"--build-log-url", help="Set URL of the build log for each uploaded ref"
|
|
)
|
|
push_parser.set_defaults(func=push_command)
|
|
|
|
commit_parser = subparsers.add_parser("commit", help="Commit build")
|
|
commit_parser.add_argument(
|
|
"--wait", action="store_true", help="wait for commit to finish"
|
|
)
|
|
commit_parser.add_argument("--end-of-life", help="Set end of life")
|
|
    commit_parser.add_argument(
        "--end-of-life-rebase", help="Set new ID which will supersede the current one"
    )
commit_parser.add_argument("--token-type", help="Set token type", type=int)
|
|
commit_parser.add_argument("build_url", help="remote build url")
|
|
commit_parser.set_defaults(func=commit_command)
|
|
|
|
publish_parser = subparsers.add_parser("publish", help="Publish build")
|
|
publish_parser.add_argument(
|
|
"--wait", action="store_true", help="wait for publish to finish"
|
|
)
|
|
publish_parser.add_argument(
|
|
"--wait-update", action="store_true", help="wait for update-repo to finish"
|
|
)
|
|
publish_parser.add_argument("build_url", help="remote build url")
|
|
publish_parser.set_defaults(func=publish_command)
|
|
|
|
purge_parser = subparsers.add_parser("purge", help="Purge build")
|
|
purge_parser.add_argument("build_url", help="remote build url")
|
|
purge_parser.set_defaults(func=purge_command)
|
|
|
|
create_token_parser = subparsers.add_parser(
|
|
"create-token", help="Create subset token"
|
|
)
|
|
create_token_parser.add_argument("manager_url", help="remote repo manager url")
|
|
create_token_parser.add_argument("name", help="Name")
|
|
create_token_parser.add_argument("subject", help="Subject")
|
|
create_token_parser.add_argument("scope", nargs="*", help="Scope")
|
|
create_token_parser.add_argument(
|
|
"--duration",
|
|
help="Duration until expires, in seconds",
|
|
default=60 * 60 * 24, # Default duration is one day
|
|
type=int,
|
|
)
|
|
create_token_parser.set_defaults(func=create_token_command)
|
|
|
|
follow_job_parser = subparsers.add_parser(
|
|
"follow-job", help="Follow existing job log"
|
|
)
|
|
follow_job_parser.add_argument("job_url", help="url of job")
|
|
follow_job_parser.set_defaults(func=follow_job_command)
|
|
|
|
args = parser.parse_args()
|
|
|
|
loglevel = logging.WARNING
|
|
if args.verbose:
|
|
loglevel = logging.INFO
|
|
if args.debug:
|
|
loglevel = logging.DEBUG
|
|
|
|
logging.basicConfig(
|
|
format="%(module)s: %(levelname)s: %(message)s",
|
|
level=loglevel,
|
|
stream=sys.stderr,
|
|
)
|
|
|
|
if not args.subparser_name:
|
|
print("No subcommand specified, see --help for usage")
|
|
exit(1)
|
|
|
|
    if not args.token:
        if args.token_file:
            with open(args.token_file, "rb") as token_file:
                args.token = token_file.read().splitlines()[0].decode("utf-8").strip()
        elif "REPO_TOKEN" in os.environ:
            args.token = os.environ["REPO_TOKEN"]
        else:
            print("No token available, pass with --token, --token-file or $REPO_TOKEN")
            exit(1)

    res = 1
    output = None
    try:
        result = asyncio.run(run_with_session(args))

        output = {
            "command": args.subparser_name,
            "result": result,
        }
        res = 0
    except (ApiError, FailedJobError) as e:
        eprint(str(e))
        output = {
            "command": args.subparser_name,
            "error": e.repr(),
        }
    except UsageException as e:
        eprint(str(e))
        output = {
            "error": {
                "type": "usage",
                "details": {
                    "message": str(e),
                },
            }
        }
    except Exception:
        ei = sys.exc_info()
        eprint(
            "Unexpected %s exception in %s: %s"
            % (ei[0].__name__, args.subparser_name, ei[1])
        )
        eprint(traceback.format_exc())
        output = {
            "command": args.subparser_name,
            "error": {
                "type": "exception",
                "details": {
                    "error-type": ei[0].__name__,
                    "message": str(ei[1]),
                },
            },
        }
        res = 1

    if output:
        if args.print_output:
            print(json.dumps(output, indent=4))
        if args.output:
            with open(args.output, "w+") as f:
                f.write(json.dumps(output, indent=4))
                f.write("\n")

    exit(res)