Skip to content
Snippets Groups Projects
Select Git revision
  • 4fbc4112a9670b1eb365bc73882a272d3ec6e9f8
  • master default
  • 20220525.0
  • 20170913.1
  • 20170913.0
5 results

main.py

Blame
  • main.py 13.45 KiB
    # main.py
    #
    # Copyright 2017 Collabora Limited
    # Copyright 2017 Andrej Shadura <andrew.shadura@collabora.co.uk>
    #
    # This program is free software; you can redistribute it and/or modify it
    # under the terms of the GNU General Public License as published by the
    # Free Software Foundation; either version 2 of the License, or (at your
    # option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
    # Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with this program; if not, write to the Free Software Foundation, Inc.,
    # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
    #
    # SPDX-License-Identifier: GPL-2.0+
    
    from __future__ import print_function
    
    import os
    import re
    
    from future.builtins import zip
    
    try:
        from urllib.error import HTTPError
        from urllib.parse import quote_plus
    except ImportError:
        from urllib import quote_plus
    
        from urllib2 import HTTPError
    
    try:
        from M2Crypto.SSL import SSLError
    except Exception:
        # M2Crypto is optional: any failure to load it (missing package,
        # broken native extension, ...) leaves SSLError set to None, but
        # KeyboardInterrupt/SystemExit are no longer swallowed here.
        # NOTE: code that lists SSLError in an ``except`` clause has to cope
        # with the None value.
        SSLError = None
    
    try:
        from xml.etree import cElementTree as ET
    except ImportError:
        try:
            import cElementTree as ET
        except ImportError:
            # xml.etree.cElementTree was removed in Python 3.9 and the
            # standalone cElementTree package only exists for very old
            # Python 2 releases; the plain ElementTree module offers the
            # same API, so use it as the last resort.
            from xml.etree import ElementTree as ET
    
    from .progress import print_progress, print_status, progress, wrap_output
    
    try:
        # python 2.6 and python 2.7: the ``unicode`` builtin exists, so merely
        # referencing it succeeds
        unicode
        ET_ENCODING = "utf-8"
        # python 2.6 does not have bytes and python 2.7 reimplements it as alias to
        # str, but in incompatible way as it does not accept the same arguments
        bytes = lambda x, *args: x
    except NameError:
        # python3 does not have unicode (the bare reference above raises
        # NameError), so lets reimplement it as void function as it already
        # gets unicode strings
        unicode = lambda x, *args: x
        ET_ENCODING = "unicode"
    
    from osc import cmdln, conf, core, oscerr
    from osc.util.helper import decode_list
    
    
    def create_project(apiurl, project, repos=None):
        """
        Create a new project with sane minimal defaults.

        The generated metadata has an empty title and description, lists the
        user returned by conf.get_apiurl_usr(apiurl) as both maintainer and
        bugowner, and disables publishing and building.

        apiurl is the API URL the metadata is uploaded to, project is the
        name of the project to create.

        repos is an optional iterable of objects with ``name`` and ``arch``
        attributes: one <repository> element is emitted per distinct name,
        with one <arch> child for every entry carrying that name.  When
        omitted, the project is created without repositories.
        """
        # None instead of a mutable [] default, which would be shared
        # between all calls.
        if repos is None:
            repos = ()

        user = conf.get_apiurl_usr(apiurl)

        tree = ET.Element('project', name=project)
        ET.SubElement(tree, 'title')
        ET.SubElement(tree, 'description')
        ET.SubElement(tree, 'person', role='maintainer', userid=user)
        ET.SubElement(tree, 'person', role='bugowner', userid=user)
        ET.SubElement(ET.SubElement(tree, 'publish'), 'disable')
        ET.SubElement(ET.SubElement(tree, 'build'), 'disable')

        # Several entries may share a repository name (one per architecture),
        # so remember the <repository> element created for each name.
        repo_tags = {}
        for r in repos:
            if r.name not in repo_tags:
                repo_tags[r.name] = ET.SubElement(tree, 'repository', name=r.name)
            ET.SubElement(repo_tags[r.name], 'arch').text = r.arch

        url = core.make_meta_url('prj',
                                 path_args = quote_plus(project),
                                 apiurl = apiurl,
                                 msg = "Create %s" % project)
        mf = core.metafile(url, ET.tostring(tree, encoding=ET_ENCODING))
        with wrap_output():
            mf.sync()
    
    def mangle_metadata(metadata, src_project, dst_project, keep_metadata=False):
        """
        Turn the metadata XML of src_project into metadata for dst_project.

        metadata is either a single string-like object or an iterable of
        byte chunks, which get concatenated first.  The project name is
        always replaced by dst_project.

        Unless keep_metadata is set, references to the source release (the
        second to last ':'-separated component of src_project) are renamed
        to the destination release as well: repository names, repository
        paths pointing at sibling projects of src_project, and the
        'repository' attribute of the build and publish flags.

        Returns the serialised XML as produced by ET.tostring().
        """
        if not hasattr(metadata, "splitlines"):
            metadata = b"".join(metadata)

        root = ET.fromstring(metadata)
        root.set('name', dst_project)

        if keep_metadata:
            return ET.tostring(root)

        src_parts = src_project.split(':')
        dst_parts = dst_project.split(':')
        # Both names are expected to end in release:component.
        src_release, _src_component = src_parts[-2:]
        dst_release, _dst_component = dst_parts[-2:]
        src_base = src_parts[:-1]
        dst_base = dst_parts[:-1]

        for repository in root.findall('repository'):
            if repository.get('name') == src_release:
                repository.set('name', dst_release)
            for path in repository.findall('path'):
                referenced = path.get('project').split(':')
                # mangle only distro:release:* repositories
                # FIXME: rewrite to not require confusing slices (rsplit?)
                if referenced[:-1] != src_base:
                    continue
                # replace distro:release: with path:to:the:new_distro:new_release:
                path.set('project', ':'.join(dst_base + referenced[-1:]))
                if path.get('repository') == src_release:
                    path.set('repository', dst_release)

        for section in ('build/*', 'publish/*'):
            for flag in root.findall(section):
                if flag.get('repository') == src_release:
                    flag.set('repository', dst_release)

        return ET.tostring(root)
    
    def increment_build_suffix(build_suffix):
        """
        Find a suitable build suffix for the next release.

        If there's a build counter, increment the letter right before it.
        If the character before it is not a letter, or if it is a 'z' already,
        insert one more letter before the build counter.  This also applies
        when nothing (or just a single letter) precedes the build counter.

        A suffix without a build counter is returned unchanged.

        >>> increment_build_suffix('bu')
        'bu'
        >>> increment_build_suffix('bu<B_CNT>')
        'bv<B_CNT>'
        >>> increment_build_suffix('by<B_CNT>')
        'bz<B_CNT>'
        >>> increment_build_suffix('bz<B_CNT>')
        'bza<B_CNT>'
        >>> increment_build_suffix('a<B_CNT>')
        'b<B_CNT>'
        >>> increment_build_suffix('<B_CNT>')
        'a<B_CNT>'
        >>> increment_build_suffix('%%{?release_prefix}.<CI_CNT>.<B_CNT>')
        '%%{?release_prefix}.<CI_CNT>.a<B_CNT>'
        >>> increment_build_suffix('<CI_CNT>.<B_CNT>')
        '<CI_CNT>.a<B_CNT>'
        """

        # The prefix is matched lazily and may be empty, so that a suffix
        # starting right with the letter or with the build counter itself is
        # handled like any other.  Only a-y can be incremented in place; a 'z'
        # ends up in the prefix and gets a fresh 'a' appended after it.
        m = re.match(r"(?P<prefix>[a-zA-Z0-9~%{_}<>?.+-]*?)(?P<var>[a-y]?)(?P<b_cnt><B_CNT>)(?P<rest>.*)", build_suffix)
        if not m:
            return build_suffix
        kw = m.groupdict()
        if kw['var'] == '':
            kw['var'] = 'a'
        else:
            kw['var'] = chr(ord(kw['var']) + 1)
        return "{prefix}{var}{b_cnt}{rest}".format(**kw)
    
    def mangle_prjconf(prjconf, new_build_suffix=None):
        r"""
        Rewrite the "release:" line(s) of a project configuration.

        prjconf may be one string-like object, which is split into lines
        keeping the line ends, or an iterable of lines; the lines are passed
        through decode_list() before being examined.  Every line that, once
        stripped, starts with "release:" (in any letter case) followed by a
        value is replaced by "<key>: <value>\n", where the value is
        new_build_suffix if one is given and the result of
        increment_build_suffix() on the old value otherwise.  All other lines
        are passed through as they are.

        Returns the list of resulting lines.

        >>> mangle_prjconf([b'Repotype: debian\n', b'type: dsc\n', b'release: bv<B_CNT>\n', b'\n'])
        ['Repotype: debian\n', 'type: dsc\n', 'release: bw<B_CNT>\n', '\n']
        >>> mangle_prjconf(['Repotype: debian\n', 'type: dsc\n', 'release: bv<B_CNT>\n', '\n'])
        ['Repotype: debian\n', 'type: dsc\n', 'release: bw<B_CNT>\n', '\n']
        >>> mangle_prjconf('Repotype: debian\ntype: dsc\nrelease: bv<B_CNT>\n\n')
        ['Repotype: debian\n', 'type: dsc\n', 'release: bw<B_CNT>\n', '\n']
        """
        release_line = re.compile(r"(?P<key>release):\s*(?P<value>\S+)", re.IGNORECASE)

        if hasattr(prjconf, "splitlines"):
            raw_lines = prjconf.splitlines(True)
        else:
            raw_lines = prjconf

        mangled = []
        for text in decode_list(raw_lines):
            found = release_line.match(text.strip())
            if found is None:
                mangled.append(text)
                continue
            # An explicitly requested suffix wins over the computed one.
            suffix = new_build_suffix or increment_build_suffix(found.group('value'))
            mangled.append("{key}: {value}\n".format(key=found.group('key'), value=suffix))
        return mangled
    
    def copy_project_meta(apiurl, src_project, dst_project, keep_metadata=False, build_suffix=None):
        """
        Replicate project configuration and metadata from src_project to
        dst_project.

        The project configuration is uploaded first, as that step is the one
        less likely to fail; build_suffix, when given, is handed to
        mangle_prjconf() as the new build suffix.  The metadata follows
        (keep_metadata is handed to mangle_metadata()); this upload can fail
        if projects referenced by the metadata don't exist yet, in which case
        the HTTPError is re-raised with an explanatory osc_msg attached.
        """
        quoted_dst = quote_plus(dst_project)

        # Step 1: project configuration
        print_status("Updating the project configuration...")
        new_prjconf = mangle_prjconf(core.show_project_conf(apiurl, src_project),
                                     new_build_suffix=build_suffix)
        prjconf_url = core.make_meta_url('prjconf',
                                         path_args=quoted_dst,
                                         apiurl=apiurl,
                                         msg="Update prjconf for %s" % dst_project)
        prjconf_file = core.metafile(prjconf_url, ''.join(new_prjconf))
        with wrap_output():
            prjconf_file.sync()

        # Step 2: project metadata
        print_status("Updating the project metadata...")
        new_metadata = mangle_metadata(core.show_project_meta(apiurl, src_project),
                                       src_project,
                                       dst_project,
                                       keep_metadata=keep_metadata)
        metadata_url = core.make_meta_url('prj',
                                          path_args=quoted_dst,
                                          apiurl=apiurl,
                                          msg="Update metadata for %s" % dst_project)
        metadata_file = core.metafile(metadata_url, new_metadata)
        try:
            with wrap_output():
                metadata_file.sync()
        except HTTPError as err:
            unknown_project = err.hdrs.get('X-Opensuse-Errorcode') == 'unknown_project'
            if unknown_project:
                err.osc_msg = "Most probably, this means some of the projects this project refers to, do not yet exist."
            raise
    
    def _package_files(apiurl, project, package):
        """
        Return the expanded file list of a package, ordered by file name and
        without the '_link' entry.
        """
        listing = core.meta_get_filelist(apiurl, project, package,
                                         expand=True, verbose=True)
        return [f for f in sorted(listing, key=lambda f: f.name)
                if f.name != '_link']

    # Copy all packages of src_project into dst_project, creating the
    # destination project first if the server reports it as missing (404).
    #
    # Packages missing from the destination are copied; packages present in
    # the destination (including the ones just copied) are verified by
    # comparing names and md5 sums of their files against the source, unless
    # opts.no_verify is set.  Anything that fails to copy or to verify is
    # copied again in the next round, until every source package is verified
    # (or, with opts.no_verify, until a round finishes without failures).
    # Unless opts.copy_only is set, the project configuration and metadata
    # are copied afterwards.
    #
    # Raises oscerr.WrongOptions if the source project does not exist.
    #
    # NOTE(review): documented with comments rather than a docstring because
    # the (self, subcmd, opts, ...) signature suggests a command handler whose
    # docstring may be shown as help text -- confirm before converting.
    def do_clone(self, subcmd, opts, src_project, dst_project):
        apiurl = conf.config['apiurl']
        src_release, src_component = src_project.split(':')[-2:]
        dst_release, _dst_component = dst_project.split(':')[-2:]

        # SSLError is None when M2Crypto could not be imported, and on Python 3
        # an except clause listing a non-exception raises TypeError as soon as
        # any error reaches it, so only real exception classes may be listed.
        retriable_errors = tuple(exc for exc in (oscerr.APIError, HTTPError, SSLError)
                                 if exc is not None)

        try:
            src_package_list = core.meta_get_packagelist(apiurl, src_project)
        except HTTPError as e:
            if e.code == 404:
                raise oscerr.WrongOptions("Source project %s not found" % src_project)
            raise
        try:
            dst_package_list = core.meta_get_packagelist(apiurl, dst_project)
        except HTTPError as e:
            if e.code != 404:
                raise
            print_status("Destination project does not yet exist, creating...")
            create_project(apiurl, dst_project)
            dst_package_list = []

        src_packages = set(src_package_list)
        # Packages known to exist in the destination; grows as copies succeed.
        dst_packages = set(dst_package_list)

        failed_packages = set()
        succeeded_packages = set()
        todo = src_packages - dst_packages
        while src_packages - succeeded_packages:
            for package in progress(todo, desc="Copying packages..."):
                print_progress("Copying %s", "%s/%s" % (src_component, package))
                try:
                    with wrap_output():
                        core.copy_pac(apiurl, src_project, package,
                                      apiurl, dst_project, package,
                                      keep_maintainers = True,
                                      expand = True,
                                      comment = "Branch from %s to %s" % (src_release, dst_release))
                except retriable_errors:
                    failed_packages.add(package)
                    print_status("Failed to copy %s, will retry later" % package)
                else:
                    # Freshly copied packages have to be verified too;
                    # otherwise they never reach succeeded_packages and the
                    # loop cannot finish.
                    dst_packages.add(package)

            if opts.no_verify:
                verification_list = []
            else:
                # Packages that exist only in the destination have no source
                # counterpart to be compared against, so leave them alone.
                verification_list = sorted((dst_packages & src_packages)
                                           - succeeded_packages - failed_packages)
            for package in progress(verification_list, desc="Checking already copied packages..."):
                print_progress("Verifying %s", package)
                src_package_files = _package_files(apiurl, src_project, package)
                dst_package_files = _package_files(apiurl, dst_project, package)

                identical = (len(src_package_files) == len(dst_package_files) and
                             all(src_file.name == dst_file.name and src_file.md5 == dst_file.md5
                                 for src_file, dst_file in zip(src_package_files, dst_package_files)))
                if identical:
                    succeeded_packages.add(package)
                else:
                    print_status("Package %s not copied fully, will retry later" % package)
                    failed_packages.add(package)

            todo = failed_packages
            failed_packages = set()
            if todo:
                print_status("Retrying previously failed packages...")
            elif opts.no_verify:
                # Nothing gets marked as succeeded without verification, so a
                # round without failures is the end of the work.
                break

        if not opts.copy_only:
            copy_project_meta(apiurl, src_project, dst_project, keep_metadata=opts.keep_metadata)
    
    # Fork a whole distribution: every project whose name starts with
    # "<src_distro>:" gets a counterpart named "<dst_distro>:<last component>".
    #
    # The work is done in three passes over the project pairs: create the
    # destination projects with the repositories of their sources (the
    # repository named after the source release is renamed to the destination
    # release unless opts.keep_metadata is set), copy the packages with
    # do_clone() while opts.copy_only is forced on, and finally copy project
    # configuration and metadata.
    #
    # Raises oscerr.WrongArgs if src_distro itself is the name of a project.
    #
    # NOTE(review): documented with comments rather than a docstring because
    # the (self, subcmd, opts, ...) signature suggests a command handler whose
    # docstring may be shown as help text -- confirm before converting.
    def do_fork(self, subcmd, opts, src_distro, dst_distro):
        apiurl = conf.config['apiurl']
        all_projects = core.meta_get_project_list(apiurl)
        src_prefix = src_distro + ':'
        src_distro_projects = [name for name in all_projects if name.startswith(src_prefix)]
        dst_distro_projects = [dst_distro + ':' + name.split(':')[-1] for name in src_distro_projects]

        if src_distro in all_projects:
            msg = "%s is a project" % src_distro
            if not src_distro_projects:
                msg += ", but no other related projects found"
            msg += "\n"
            msg += "Abort."
            raise oscerr.WrongArgs(msg)

        src_release = src_distro.split(':')[-1]
        dst_release = dst_distro.split(':')[-1]

        print_status("Pre-creating projects and repositories")
        tracked_sources = progress(src_distro_projects, desc="Creating projects...", unit="project")
        for src_prj, dst_prj in zip(tracked_sources, dst_distro_projects):
            print_progress("Creating %s", dst_prj)
            repos = list(core.get_repos_of_project(apiurl, src_prj))
            if not opts.keep_metadata:
                for repo in repos:
                    if repo.name == src_release:
                        repo.name = dst_release
            create_project(apiurl, dst_prj, repos)

        # Metadata is copied in a pass of its own below, so keep do_clone()
        # from doing it per project.
        opts.copy_only = True

        print_status("Copying projects")
        for src_prj, dst_prj in zip(src_distro_projects, dst_distro_projects):
            do_clone(self, subcmd, opts, src_prj, dst_prj)

        print_status("Copying metadata")
        tracked_sources = progress(src_distro_projects, desc="Copying metadata...", unit="project")
        for src_prj, dst_prj in zip(tracked_sources, dst_distro_projects):
            copy_project_meta(apiurl, src_prj, dst_prj, keep_metadata=opts.keep_metadata)