#!/usr/bin/env python3
# coding: utf-8

import re
import os
import hashlib
from tempfile import TemporaryDirectory
from os.path import isdir, join, isfile, abspath, dirname, \
    realpath, splitext, relpath
from subprocess import check_call, check_output, CalledProcessError, DEVNULL
from shutil import which, copytree, copyfile
from shlex import quote as shell_quote
from collections import OrderedDict
from string import Template
from datetime import datetime, timezone

import click

__version__ = "0.2.1"

# verbosity levels; VERBOSITY is overwritten by the '-v' option of cli()
NONE, LOW = 0, 1
VERBOSITY = NONE

# USEFUL ROUTINES

def fail(msg, *params):
    '''aborts by raising click.ClickException with msg.format(*params)'''
    raise click.ClickException(msg.format(*params))

def cached_constant(f):
    '''wraps a zero-argument function so that it is evaluated at most once

    The first successful call stores the result; later calls return the
    stored value. If f raises, nothing is stored and the next call retries.
    '''
    cache = []
    def _f():
        if not cache:
            cache.append(f())
        return cache[0]
    return _f

def cached_property(f):
    '''read-only property computed on first access

    The value is stored on the instance under '_cached_<function name>'.
    '''
    key = '_cached_' + f.__name__
    def _f(self):
        if not hasattr(self, key):
            setattr(self, key, f(self))
        return getattr(self, key)
    return property(_f)

def Counter(v = 0):
    '''returns a function that yields v + 1, v + 2, ... on successive calls'''
    v = [ v ]  # boxed in a list so that the closure can mutate it
    def _f():
        v[0] += 1
        return v[0]
    return _f

# the temporary directory is created lazily, on the first tmppath() call
tmpdir = cached_constant(lambda: TemporaryDirectory()) # pylint: disable=unnecessary-lambda
tmpunique = Counter()
# naive UTC timestamp in ISO format; same text as the deprecated
# datetime.utcnow().isoformat()
CURRENT_TIME = datetime.now(timezone.utc).replace(tzinfo = None).isoformat()

def tmppath(name = None, directory = False):
    '''get a temp. path

    Returns a fresh path '<tmpdir>/<NNN>-<name>' (name defaults to 'tmp'),
    where NNN is a running counter. The path is only created when
    directory is true, in which case it is created as a directory.
    '''
    count = tmpunique()
    tmp_directory = tmpdir().name
    if name is None:
        name = 'tmp'
    tmp_path = join(tmp_directory, '{:03d}-{}'.format(count, name))
    if directory:
        os.mkdir(tmp_path)
    return tmp_path

def log(msg, v = 0):
    '''prints a green log line, filtered by the global VERBOSITY

    With VERBOSITY == NONE only messages of level 0 are shown; otherwise
    every message with level v <= VERBOSITY is shown, tagged with v.
    '''
    if VERBOSITY == NONE:
        if v == 0:
            click.secho('LOG {}'.format(msg), fg = 'green')
    elif v <= VERBOSITY:
        click.secho('LOG[{}] {}'.format(v, msg), fg = 'green')

def log_check_call(cmd, **kwds):
    '''subprocess.check_call that logs the command at level LOW'''
    log('Run (check): {}'.format(cmd), LOW)
    return check_call(cmd, **kwds)

def log_check_output(cmd, **kwds):
    '''subprocess.check_output that logs the command at level LOW'''
    log('Run (output): {}'.format(cmd), LOW)
    return check_output(cmd, **kwds)

@cached_constant
def find_bundle_files():
    '''finds the location of bundle files

    Looks for a 'bundle-files' directory next to this script and then
    under /usr/share/debocker; fails if neither exists.
    '''
    debocker_dir = dirname(abspath(realpath(__file__)))
    locations = [ debocker_dir, '/usr/share/debocker' ]
    for loc in locations:
        bundle_files = join(loc, 'bundle-files')
        if isdir(bundle_files):
            log("Bundle files found in '{}'.".format(bundle_files), LOW)
            return bundle_files
    fail('could not find bundle files')

def assert_command(cmd):
    '''fails unless cmd can be found on PATH'''
    if which(cmd) is None:
        fail("command '{}' is not available", cmd)

@cached_constant
def assert_docker():
    '''fails unless docker can be found on PATH (checked once on success)'''
    if which('docker') is None:
        fail('docker is not available')

def extract_pristine_tar(path, candidates):
    '''tries to recover an upstream tarball with pristine-tar

    path is the directory pristine-tar is run in, candidates the accepted
    tarball names in order of preference. Returns the path of a temporary
    copy of the tarball, or None when pristine-tar is missing, fails to
    list, or stores none of the candidates.
    '''
    if which('pristine-tar') is None:
        log('No pristine-tar available.', LOW)
        return None
    try:
        tars = log_check_output([ 'pristine-tar', 'list' ], cwd = path)
    except CalledProcessError:
        log('Error in pristine-tar - giving up.', LOW)
        return None
    # let's hope that no non-utf8 files are stored
    thelist = tars.decode('utf-8').split()
    log('Pristine tarballs: {}'.format(thelist), LOW)
    # keep the caller's preference order, so that the pick is deterministic
    # when several compressions of the same tarball are stored
    available = set(thelist)
    matches = [ c for c in candidates if c in available ]
    if not matches:
        log('No matching pristine tar found.', LOW)
        return None
    m = matches[0]
    log("Found a match: '{}'.".format(m), LOW)
    log_check_call([ 'pristine-tar', 'checkout', m ],
                   cwd = path, stderr = DEVNULL)
    # the checkout lands in the package directory; move it out of the way
    src = join(path, m)
    try:
        dst = tmppath(m)
        copyfile(src, dst)
        log("Tarball extracted to '{}'.".format(dst), LOW)
    finally:
        os.unlink(src)
    return dst

class Package:
    '''an unpacked debian source package rooted at a given path'''

    def __init__(self, path):
        self.path = path
        self.debian = join(self.path, 'debian')
        self.control = join(self.debian, 'control')
        self.changelog = join(self.debian, 'changelog')
        self.source_format = join(self.debian, 'source', 'format')

    def is_valid(self):
        '''verifies that the current directory is a debian package

        Note that an existing but unsupported debian/source/format makes
        the 'format' property fail instead of returning False here.
        '''
        return isdir(self.debian) and isfile(self.control) and \
            isfile(self.source_format) and self.format in ['native', 'quilt']

    def assert_is_valid(self):
        '''fails unless is_valid()'''
        if not self.is_valid():
            fail('not in debian package directory')

    @cached_property
    def format(self):
        '''source format, 'native' or 'quilt' (from debian/source/format)'''
        with open(self.source_format) as f:
            line = f.readline().strip()
        m = re.match(r'^3\.0 \((native|quilt)\)', line)
        if not m:
            fail('unsupported format ({})', line)
        fmt = m.group(1)
        log("Detected format '{}'.".format(fmt), LOW)
        return fmt

    @cached_property
    def native(self):
        '''True for a '3.0 (native)' package'''
        return self.format == 'native'

    @cached_property
    def name(self):
        '''source package name (first 'Source:' line of debian/control)'''
        with open(self.control) as f:
            for line in f:
                m = re.match(r'^Source: (\S+)$', line)
                if m:
                    return m.group(1)
        fail('could not find the name of the package')

    @cached_property
    def long_version(self):
        '''long version (as given in the first line of debian/changelog)'''
        with open(self.changelog) as f:
            line = f.readline()
        m = re.match(r'^(\S+) \((\S+)\)', line)
        if not m:
            fail("could not parse package version (from '{}')", line.strip())
        return m.group(2)

    @cached_property
    def version(self):
        '''upstream version

        For native packages this is the long version. Otherwise it is the
        long version without the debian revision, i.e. everything before
        the last hyphen; the revision itself may be any non-empty string
        without hyphens (e.g. '1', '1.1', '1ubuntu1', '1~bpo12+1').
        '''
        if self.native:
            return self.long_version
        m = re.match(r'^(.+)-([^-]+)$', self.long_version)
        if not m:
            fail('could not parse version ({})', self.long_version)
        return m.group(1)

    @cached_property
    def orig_tarball_candidates(self):
        '''possible original upstream tarballs (in order of preference)'''
        formats = [ 'xz', 'gz', 'bz2' ]
        names = [ '{}_{}.orig.tar.{}'.format(self.name, self.version, fmt)
                  for fmt in formats ]
        return names

    @cached_property
    def orig_tarball(self):
        '''finds the original tarball

        Checks the parent directory of the package for each candidate
        first and falls back to pristine-tar; fails if nothing is found.
        '''
        for name in self.orig_tarball_candidates:
            tarball = join(self.path, '..', name)
            log("Trying tarball candidate '{}'.".format(tarball), LOW)
            if isfile(tarball):
                log("Original tarball found at '{}'.".format(tarball), LOW)
                return tarball
        result = extract_pristine_tar(self.path, self.orig_tarball_candidates)
        if result is not None:
            return result
        fail('could not find original tarball')

    def assert_orig_tarball(self):
        '''returns the path of the source tarball, failing if there is none

        For native packages a fresh xz tarball of the package directory
        (without .git) is written to a temporary path on every call.
        '''
        if self.native:
            # for now, we just tar the current directory
            path = tmppath('{}_{}.tar.xz'.format(
                self.name, self.version))
            with open(path, 'wb') as output:
                tar = [ 'tar', 'c', '--xz',
                        '--exclude=.git',
                        '-C', self.path, '.' ]
                log_check_call(tar, stdout = output)
            return path
        return self.orig_tarball # simple alias

    def tar_package_debian(self, output, comp = None):
        '''writes a tarball of the debian/ directory to the file output

        comp selects the compression; only 'xz' (or None) is known.
        '''
        # TODO: make it reproducible # pylint: disable=fixme
        compressions = { 'xz': '--xz' }
        tar = [ 'tar', 'c' ]
        if comp:
            tar += [ compressions[comp] ]
        debian = join(self.path, 'debian')
        # explicit sorted list + --no-recursion gives a stable member order
        filelist = get_reproducible_filelist(debian, self.path)
        tar += [ '--no-recursion', '-C', self.path ]
        tar += filelist
        log_check_call(tar, stdout = output)

    def build_docker_tarball_to_fd(self, output, buildinfo):
        '''builds the docker tarball that builds the package

        output is a binary file object; buildinfo is a dict with the keys
        'flags', 'step' and 'image' (see Bundler).
        '''
        controlfile = join(self.path, 'debian', 'control')
        controlfile = abspath(controlfile)
        debianfile = tmppath('debian.tar.xz')
        with open(debianfile, 'wb') as debian:
            self.tar_package_debian(debian, comp = 'xz')
        originalfile = self.assert_orig_tarball()
        # the bundler symlinks to these files, hence absolute paths
        originalfile = abspath(originalfile)
        if self.native:
            make_native_bundle(self.name, self.version,
                               controlfile, originalfile,
                               buildinfo, output)
        else:
            make_quilt_bundle(self.name, self.long_version,
                              controlfile, originalfile, debianfile,
                              buildinfo, output)

    def build_docker_tarball(self, filename, buildinfo):
        '''like build_docker_tarball_to_fd, but writes to the file filename'''
        with open(filename, 'wb') as output:
            self.build_docker_tarball_to_fd(output, buildinfo)

def calculate_md5_and_size(path):
    '''returns (md5 hex digest, size in bytes) of the file at path'''
    md5 = hashlib.md5()
    count = 0
    with open(path, 'rb') as f:
        # read in chunks, so that large tarballs are not loaded at once
        for buff in iter(lambda: f.read(8192), b''):
            count += len(buff)
            md5.update(buff)
    return md5.hexdigest(), count

def make_native_bundle(name, version, control,
                       source, buildinfo, output):
    '''writes the bundle of a native package to the file object output

    source is the path of the source tarball; its extension is reused in
    the name under which the tarball is stored in the bundle.
    '''
    dsc_name = '{}_{}.dsc'.format(name, version)
    bundler = Bundler(name, version, control, dsc_name,
                      buildinfo, native = True)
    _, ext = splitext(source)
    source_name = '{}_{}.tar{}'.format(name, version, ext)
    bundler.add_source_file(source_name, source, 'source_tarball')
    bundler.write_bundle(output = output)

def make_quilt_bundle(name, version, control,
                      original, debian, buildinfo, output):
    '''writes the bundle of a quilt package to the file object output

    version is the long version (with the debian revision); original and
    debian are the paths of the upstream and the debian tarball.
    '''
    dsc_name = '{}_{}.dsc'.format(name, version)
    bundler = Bundler(name, version, control, dsc_name,
                      buildinfo, native = False)
    _, oext = splitext(original)
    _, dext = splitext(debian)
    # the debian revision follows the LAST hyphen; the upstream version
    # itself may contain hyphens (this matches Package.version)
    uversion = version.rsplit('-', 1)[0]
    original_name = '{}_{}.orig.tar{}'.format(name, uversion, oext)
    debian_name = '{}_{}.debian.tar{}'.format(name, version, dext)
    bundler.add_source_file(original_name, original, 'original_tarball')
    bundler.add_source_file(debian_name, debian, 'debian_tarball')
    bundler.write_bundle(output = output)

def get_reproducible_filelist(path, base = None):
    '''sorted list of all directories and files below path

    Entries are relative to base (default: path) and start with './'.
    The directory path itself is not part of the list.
    '''
    if base is None:
        base = path
    elements = []
    for p, ds, fs in os.walk(path):
        p = relpath(p, base)
        if p != '.':
            p = join('.', p)
        elements += [ join(p, x) for x in ds ]
        elements += [ join(p, x) for x in fs ]
    return sorted(elements)

# STEPS

# step name -> name of the Dockerfile template variable that is set to the
# current time when the build is requested to start from that step
# (presumably to invalidate docker's cache from that step on - confirm
# against bundle-files/Dockerfile); 'end' sets no variable
STEPS = OrderedDict([
    ('end', None),
    ('build', 'args_05'),
    ('extract-source', 'args_04'),
    ('install-deps', 'args_03'),
    ('install-utils', 'args_02'),
    ('upgrade', 'args_01')
])

class Bundler:
    '''assembles a bundle (a tarball with a Dockerfile and the sources)

    buildinfo is a dict with the keys 'step' (a key of STEPS), 'image'
    (base docker image) and 'flags' (build flags).
    '''

    def __init__(self, name, version, control, dsc_name, buildinfo, native):
        self.name = name
        self.version = version
        self.native = native
        self.control = control
        self.sources = []
        self.dsc_name = dsc_name
        self.step_name = buildinfo['step']
        self.step = STEPS[self.step_name]
        self.image = buildinfo['image']
        self.buildinfo = buildinfo
        # working directory in which the bundle tree is laid out
        self.wdir = tmppath('bundle', directory = True)

    @property
    def format_string(self):
        '''value of the 'Format:' field of the .dsc file'''
        return ('3.0 (native)' if self.native else '3.0 (quilt)')

    def add_source_file(self, name, path, tag):
        '''registers a source file

        name is the file name inside the bundle, path its current
        location and tag the key under which name is listed in 'info'.
        '''
        md5, size = calculate_md5_and_size(path)
        self.sources.append({
            'name': name,
            'path': path,
            'md5': md5,
            'size': size,
            'tag': tag
        })

    def write_dsc_file(self):
        '''makes minimal, yet functional .dsc file'''
        path = join(self.wdir, 'source', self.dsc_name)
        with open(path, 'w') as f:
            f.write('Format: {}\n'.format(self.format_string))
            f.write('Source: {}\n'.format(self.name))
            f.write('Version: {}\n'.format(self.version))
            f.write('Files:\n')
            for s in self.sources:
                f.write(' {} {} {}\n'.format(s['md5'], s['size'], s['name']))

    def write_info_file(self, info):
        '''writes the mapping info as 'key=value' lines to <wdir>/info'''
        path = join(self.wdir, 'info')
        with open(path, 'w') as f:
            for k, v in info.items():
                f.write("{}={}\n".format(k, v))

    def write_buildinfo_file(self):
        '''writes the build flags to <wdir>/buildinfo'''
        path = join(self.wdir, 'buildinfo')
        with open(path, 'w') as f:
            f.write("flags='{}'\n".format(self.buildinfo['flags']))

    def write_bundle(self, output):
        '''writes bundle to a given file

        output is a binary file object that receives the tar stream.
        '''
        os.makedirs(join(self.wdir, 'source'))

        info = OrderedDict()
        info['bundle_version'] = __version__
        info['name'] = self.name
        info['version'] = self.version
        info['format'] = ('native' if self.native else 'quilt')

        def make_link(target, parts):
            return os.symlink(target, join(self.wdir, *parts))

        # control file
        make_link(self.control, [ 'control' ])

        # dsc file
        self.write_dsc_file()
        info['dsc_name'] = self.dsc_name

        # sources
        for s in self.sources:
            name = s['name']
            tag = s['tag']
            make_link(s['path'], [ 'source', name ])
            info[tag] = name

        # info & buildinfo
        self.write_info_file(info)
        self.write_buildinfo_file()

        # bundle files
        bundle_files = find_bundle_files()
        copytree(join(bundle_files, 'steps'),
                 join(self.wdir, 'steps'))

        dockertmpl = join(bundle_files, 'Dockerfile')
        with open(dockertmpl, 'r') as df:
            t = Template(df.read())
        ctx = {
            'image': self.image,
            'args_01': 'none',
            'args_02': 'none',
            'args_03': 'none',
            'args_04': 'none',
            'args_05': 'none'
        }
        if self.step:
            log("Replacing '{}' with '{}'.".format(
                self.step, CURRENT_TIME))
            if self.step not in ctx:
                fail('internal error in dockerfile template')
            ctx[self.step] = CURRENT_TIME
        rendered = t.substitute(ctx)
        dockerfile = join(self.wdir, 'Dockerfile')
        with open(dockerfile, 'w') as f:
            f.write(rendered)

        file_list = get_reproducible_filelist(self.wdir)
        # tar everything; '-h' stores the files behind the symlinks made
        # above, the remaining options normalize owner and mtime
        tar = [ 'tar', 'c', '-h', '--numeric-owner' ]
        tar += [ '--no-recursion' ]
        tar += [ '--owner=0', '--group=0' ]
        tar += [ '--mtime=1970-01-01' ]
        tar += [ '-C', self.wdir ]
        tar += file_list
        log_check_call(tar, stdout = output)

def docker_build_bundle(bundle_name, no_cache, pull):
    '''builds the given image and returns the final image

    bundle_name is the path of the bundle tarball, which is fed to
    'docker build -' on stdin. The image id is parsed from the build
    output; fails unless exactly one 'Successfully built' line is found.
    '''
    # TODO: quite ugly, cannot be done cleaner? # pylint: disable=fixme
    build_log = tmppath()
    bundle_esc = shell_quote(bundle_name)
    build_log_esc = shell_quote(build_log)
    docker_opts = []
    if no_cache:
        docker_opts.append('--no-cache')
    if pull:
        docker_opts.append('--pull')
    docker_opts = ' '.join(docker_opts)
    # NOTE: the exit status of the pipeline is the one of tee, so a failed
    # docker build is only detected by the log parsing below
    log_check_call('docker build {} - < {} | tee {}'.format(
        docker_opts, bundle_esc, build_log_esc), shell = True)
    with open(build_log) as f:
        s = f.read().strip()
    ms = re.findall(r'Successfully built (\S+)', s)
    if len(ms) != 1:
        fail('cannot parse logs (build failed?)')
    image = ms[0]
    return image

# CLI INTERFACE

@click.group()
@click.option('-v', '--verbose', count=True,
              help = 'be verbose, repeat for more effect')
def cli(verbose):
    # (no docstring: click would show it as the help text of the group)
    global VERBOSITY # pylint: disable=global-statement
    VERBOSITY = verbose

@cli.command(help = 'Write tar bundle')
@click.argument('path', default = '.')
@click.option('-o', '--output', default = None, metavar = 'FILE',
              help = 'output file')
@click.option('-f', '--flags', default = '', metavar = 'FLAGS',
              help = 'build flags')
@click.option('step', '--from', default = 'end',
              help = 'start from the given step',
              type = click.Choice(STEPS.keys()))
@click.option('--image', default = 'debian:unstable', metavar = 'IMAGE',
              help = 'base docker image')
def bundle(path, output, flags, step, image):
    pkg = Package(path)
    pkg.assert_is_valid()
    if output is None:
        name = '{}_{}_bundle.tar'.format(pkg.name, pkg.long_version)
        output = join('..', name)
    log('Preparing bundle for {} ({})...'.format(pkg.name, pkg.version))
    if not pkg.native:
        # fail early (before anything is written) if the tarball is missing
        pkg.assert_orig_tarball()
    pkg.build_docker_tarball(output, {
        'flags': flags, 'step': step, 'image': image })
    log("Bundle created in '{}'.".format(output))
    if VERBOSITY > NONE:
        md5, size = calculate_md5_and_size(output)
        log('Bundle hash and size: {}, {}.'.format(md5, size), LOW)

@cli.command('build-bundle', help = 'Build bundle')
@click.argument('bundle')
@click.option('-o', '--output', default = '.', metavar = 'DIRECTORY',
              help = 'output directory')
@click.option('--sign', '-s', default = False,
              is_flag = True, help = 'sign built package with debsign')
@click.option('no_cache', '--no-cache', default = False,
              is_flag = True, help = 'do not use docker image cache')
@click.option('--pull', default = False,
              is_flag = True, help = 'pull the newest base image')
def build_bundle(bundle, output, sign, no_cache, pull): # pylint: disable=redefined-outer-name
    # click passes parameters by keyword, so the first parameter has to be
    # named after the 'bundle' argument declared above (it shadows the
    # 'bundle' command, which is not used in this function)
    assert_docker()
    if sign:
        assert_command('debsign')
    image = docker_build_bundle(bundle,
                                no_cache = no_cache, pull = pull)
    log('Build successful (in {})'.format(image))
    # extract the build
    build_tar = tmppath('build.tar')
    with open(build_tar, 'wb') as f:
        log_check_call([ 'docker', 'run', '--rm=true',
                         image, '/root/steps/build-tar' ], stdout = f)
    log("Build tar stored in '{}'".format(build_tar))
    tar_list = log_check_output([ 'tar', 'tf', build_tar ])
    tar_files = tar_list.decode('utf-8').split()
    log("Build files: {}".format(' '.join(tar_files)), LOW)
    log_check_call([ 'tar', 'xf', build_tar, '-C', output ])
    log("Build files stored in '{}'.".format(output))
    if sign:
        # TODO: needs devscripts, and hence will not work outside Debian # pylint: disable=fixme
        # we probably have to copy/fork debsign, cause signing within
        # the container is not a good idea security-wise
        changes = [ fn for fn in tar_files if fn.endswith('.changes') ]
        if len(changes) != 1:
            fail('could not find the changes files')
        changes_path = join(output, changes[0])
        log("Trying to sign '{}'.".format(changes_path), LOW)
        log_check_call([ 'debsign', changes_path ])

@cli.command(help = 'Build package')
@click.argument('path', default = '.')
@click.option('-o', '--output', default = '..', metavar = 'DIRECTORY',
              help = 'output directory')
@click.option('--sign', '-s', default = False,
              is_flag = True, help = 'sign built package with debsign')
@click.option('-f', '--flags', default = '', metavar = 'FLAGS',
              help = 'build flags')
@click.option('no_cache', '--no-cache', default = False,
              is_flag = True, help = 'do not use docker image cache')
@click.option('step', '--from', default = 'end',
              help = 'start from the given step',
              type = click.Choice(STEPS.keys()))
@click.option('--image', default = 'debian:unstable', metavar = 'IMAGE',
              help = 'base docker image')
@click.option('--pull', default = False,
              is_flag = True, help = 'pull the newest base image')
@click.pass_context
def build(ctx, path, output, sign, flags, no_cache, step, image, pull):
    # 'bundle' followed by 'build-bundle', via a temporary bundle file
    assert_docker()
    if sign:
        assert_command('debsign')
    tarball_path = tmppath('bundle.tar')
    ctx.invoke(bundle, path = path, output = tarball_path,
               flags = flags, step = step, image = image)
    ctx.invoke(build_bundle,
               bundle = tarball_path, sign = sign, output = output,
               no_cache = no_cache, pull = pull)

if __name__ == '__main__':
    cli.main()