#!/usr/bin/env python3
########################################################################
# This file is part of devuan-releasebot.
#
# Copyright: Franco (nextime) Lanza (c) 2015
#            Mark (LeePen) Hindley (c) 2020
#
# devuan-releasebot is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# devuan-releasebot is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with devuan-releasebot. If not, see <https://www.gnu.org/licenses/>.
##########################################################################

import abc
import argparse
import configparser
import functools
import itertools
import os
import sys
import typing

import attr

# We use type annotations, and attrs implements the helper `type` keyword
# for compatibility with python 3.5 in version 17.3.0; the `factory` keyword
# is newer still (18.1.0). On older attrs we need to monkey patch attr.ib.
# Only the first three components are parsed so that a trailing marker
# (e.g. ".dev0") does not break the import.
_ATTRS_VERSION = tuple(int(i) for i in attr.__version__.split(".")[:3])

if _ATTRS_VERSION < (18, 1, 0):
    # Remember the real attr.ib exactly once, even if this module is re-imported.
    if getattr(attr, "original_ib", None) is None:
        attr.original_ib = attr.ib

    def our_attrib(*args, **kwargs):
        """
        Backport shim for attr.ib on old attrs releases.

        Drops the `type` keyword where attrs does not know it and translates
        `factory=X` into `default=attr.Factory(X)`. Everything else,
        including positional arguments, is passed through untouched.
        """
        if _ATTRS_VERSION < (17, 3, 0):
            kwargs.pop("type", None)
        factory = kwargs.pop("factory", None)
        if factory is not None:
            kwargs["default"] = attr.Factory(factory)
        return attr.original_ib(*args, **kwargs)

    attr.ib = our_attrib


def _list_helper(
    config, section: str, item: str, separator: str = ","
) -> typing.List[str]:
    """
    Helper for comma-separated lists in .ini configurations.

    Returns the value of `item` in `section`, split on `separator`, with
    surrounding whitespace removed from every element.
    """
    return [s.strip() for s in config.get(section, item).split(separator)]


@attr.s
class ReleaseBotCommand:
    """
    Data container for a ReleaseBot command.
    Which commands are supported is determined by the cmd_COMMAND methods
    in the ReleaseBot class.
    """

    name = attr.ib(type=str)
    package = attr.ib(type=str)
    labels = attr.ib(type=typing.Iterable[str])


@attr.s
class ReleaseBotRequester:
    """
    Data container for the requester and their permissions.

    is_admin provides access to building anything.
    permissions can be one of: 'member' or 'collaborator/PACKAGE'
    'member' provides access to building anything but restricted suites
    and prefixes
    'collaborator/PACKAGE' provides access to building PACKAGE

    This list of permissions is not guaranteed to be exhaustive, but should be
    enough to make decisions about the build.
    """

    name = attr.ib(type=str)
    is_admin = attr.ib(type=bool, default=False)
    # A list (not just an iterable): repo stores append to it.
    permissions = attr.ib(factory=list, type=typing.List[str])


@attr.s
class ReleaseBotTask:
    """
    Data container for all necessary data to carry out a ReleaseBot task.
    """

    id = attr.ib(type=str)
    repo_store = attr.ib(repr=False, type="GenericRepoStore")
    command = attr.ib(type=ReleaseBotCommand)
    requester = attr.ib(type=ReleaseBotRequester)

    def notify(self, message: str, resolve: bool = False) -> None:
        """
        Provide some progress notification regarding this task.
        Tasks will only be resolved if this is not a dryrun.
        """
        self.repo_store.notify_task(self, message, resolve)

    def resolve_notify(self, message: str) -> None:
        """
        Resolve a task with a notification.
        Tasks will only be resolved if this is not a dryrun.
        """
        self.repo_store.notify_task(self, message, resolve=True)


class GenericRepoStore(abc.ABC):
    """
    Basic interface to abstract away the repository store (Gitea, GitLab, ...).
    If we ever need a different subsystem, this would hopefully stay the same.
    """

    def __init__(self, config, args):
        self.config = config
        self.args = args
        self._host = self.config.get("git", "host")
        self._token = self.config.get("git", "token")
        self.namespace = self.config.get("git", "namespace", fallback="devuan").lower()
        self.username = self.config.get(
            "git", "username", fallback="releasebot"
        ).lower()

    @abc.abstractmethod
    def get_pending_tasks(self) -> typing.Iterable[ReleaseBotTask]:
        """
        Get assigned issues/jobs/builds to the configured user in the namespace.
        Underlying implementations should provide all information needed
        to make a decision about whether or not and how such a request is
        to be processed.
        """
        pass

    def notify_task(self, task: ReleaseBotTask, message: str, resolve: bool):
        """
        Provide some progress notification regarding a task and optionally
        resolve it.
        Tasks will only be resolved if this is not a dryrun.
        """
        if self.args.debug:
            print("[DEBUG] ", message, task)
        if not self.args.dryrun:
            self.do_notify_task(task, message, resolve)

    @abc.abstractmethod
    def do_notify_task(self, task: ReleaseBotTask, message: str, resolve: bool):
        """
        Actually perform the notification.
        To be overridden in each implementation.
        This is never called if this is a dryrun.
        """
        pass


class GiteaRepoStore(GenericRepoStore):
    """
    Gitea specific bits.
    """

    def __init__(self, config, args):
        super().__init__(config, args)
        # These could be overridden in the config, but we have sane defaults
        self.gitea_org = self.config.get("git", "gitea_org", fallback=self.namespace)
        self.gitea_team = self.config.get("git", "gitea_team", fallback="packages")

        # Imported lazily so the rest of the module is usable without requests.
        import requests

        try:
            # python-requests 3
            self._http = requests.HTTPSession()
        except AttributeError:
            # python-requests 2
            self._http = requests.Session()

        # This takes care of authentication against gitea
        self._http.headers.update(
            {
                "Authorization": "token {}".format(self._token),
                "Accept": "application/json",
                "Content-Type": "application/json",
            }
        )

    def get_pending_tasks(self) -> typing.Iterable[ReleaseBotTask]:
        """
        Yield one ReleaseBotTask per open issue that lives in our namespace
        and is assigned to our user. Everything else is silently skipped.
        """
        for issue in self._get_build_issues():
            # Basic data
            command = issue["title"].strip().lower()
            package = issue["repository"]["name"]
            labels = {label["name"] for label in issue.get("labels") or []}
            repo_owner = issue["repository"]["owner"].lower()
            # An issue may have no assignee at all (null in the payload);
            # treat that as "not assigned to us" instead of crashing the run.
            assignee = issue.get("assignee") or {}
            issue_assignee = (assignee.get("username") or "").lower()
            task_id = "{}/{}/issues/{}".format(self.namespace, package, issue["number"])

            if self.args.debug:
                print("[DEBUG] Considering {}: {}.".format(task_id, command))

            if repo_owner != self.namespace or issue_assignee != self.username:
                # Silently skip if outside of the given namespace / user
                if self.args.debug:
                    print(
                        "[DEBUG] Skipping {}/{}/{}".format(
                            repo_owner, package, issue["number"]
                        )
                    )
                continue

            # TODO: refuse reopened issues?
            # does gitea provide this information in the API?

            # Setup the ReleaseBotCommand
            releasebot_command = ReleaseBotCommand(
                name=command, package=package, labels=labels
            )

            # Setup the ReleaseBotRequester with their permissions
            requester = ReleaseBotRequester(
                name=issue["user"]["username"],
                is_admin=issue["user"].get("is_admin", False),
            )
            if self.is_packages_member(requester.name):
                requester.permissions.append("member")
            if self.is_package_collaborator(requester.name, package):
                requester.permissions.append("collaborator/{}".format(package))

            yield ReleaseBotTask(
                id=task_id,
                repo_store=self,
                command=releasebot_command,
                requester=requester,
            )

    def do_notify_task(self, task: ReleaseBotTask, message: str, resolve: bool):
        """
        Post `message` as a comment on the task's issue and, if `resolve`
        is set, close the issue afterwards.
        """
        url = "/repos/{}/comments".format(task.id)
        self._request("post", url, json=dict(body=message))
        if resolve:
            url = "/repos/{}".format(task.id)
            self._request("patch", url, json=dict(state="closed"))

    def _get_build_issues(self):
        """
        Overridable to facilitate testing.
        """
        # NOTE(review): the filter is sent as a JSON body on a GET request.
        # Confirm the server honours it there rather than expecting query
        # parameters; left unchanged to keep the current result set.
        return self._request(
            "get", "/repos/issues/search", json={"state": "open", "q": "build"}
        ).json()

    @property
    @functools.lru_cache()
    def packages_team_id(self) -> int:
        """
        We need this id internally for Gitea's API, it's cached per run.

        Raises RuntimeError unless the search returns exactly one team.
        """
        teams = self._request(
            "get",
            "/orgs/{}/teams/search".format(self.gitea_org),
            params={"q": self.gitea_team, "include_desc": False},
        ).json()["data"]
        if len(teams) != 1:
            raise RuntimeError("Failed to identify packages team id")
        return teams[0]["id"]

    @functools.lru_cache()
    def is_packages_member(self, username: str) -> bool:
        """
        This is critical for permission management.
        It is cached for each user for this run for consistency.
        """
        response = self._request(
            "get",
            "/teams/{id}/members/{username}".format(
                id=self.packages_team_id, username=username
            ),
        )
        return response.ok

    @functools.lru_cache()
    def is_package_collaborator(self, username, package) -> bool:
        """
        This is critical for permission management.
        It is cached for each user/package combination for this run.
        """
        response = self._request(
            "get",
            "/repos/{owner}/{repo}/collaborators/{username}".format(
                owner=self.gitea_org, repo=package, username=username
            ),
        )
        return response.ok

    def _request(
        self,
        method: str,
        url: str,
        params: typing.Optional[dict] = None,
        json: typing.Optional[dict] = None,
    ):
        """
        Perform an API request against `{host}/api/v1{url}`.

        A 404 is returned to the caller (the membership checks rely on
        `response.ok`); any other status >= 400 raises RuntimeError.
        """
        # None stands in for "empty dict" so the defaults are not shared
        # mutable objects; what is sent on the wire is unchanged.
        response = self._http.request(
            method,
            "{}/api/v1{}".format(self._host, url),
            params={} if params is None else params,
            json={} if json is None else json,
        )
        if response.status_code >= 400 and response.status_code != 404:
            raise RuntimeError(
                "Gitea API request {} failed with {}".format(url, response.status_code)
            )
        return response


class GenericJobStore(abc.ABC):
    """
    Basic interface to abstract away the job store (Jenkins, buildbot, ...).
    If we ever need a different subsystem, this would hopefully stay the same.
    """

    def __init__(self, config, args):
        self.config = config
        self.args = args

    def queue_build(self, suite: str, package: str) -> str:
        """
        Queue a build for 'package' for 'suite' if not in dryrun mode.
        Return the URL for the build or, if in dryrun mode, an empty string.
        """
        url = ""
        if self.args.debug:
            print('[DEBUG] Queueing "{}" build for "{}"'.format(package, suite))
        if not self.args.dryrun:
            url = self.do_queue_build(suite, package)
        return url

    @abc.abstractmethod
    def do_queue_build(self, suite: str, package: str) -> str:
        """
        Actually queue the build.
        To be overridden in each implementation.
        This is never called if this is a dryrun.
        """
        pass


class JenkinsJobStore(GenericJobStore):
    """
    Jenkins specific bits.
    """

    def __init__(self, config, args):
        super().__init__(config, args)
        self.jkjob_name = self.config.get("jenkins", "build_job")
        self._connect_jenkins()

    def _connect_jenkins(self):
        """
        Connect to jenkins and perform some sanity checks.
        This is a separate method to facilitate testing of the rest.

        Raises RuntimeError if the configured build job does not exist.
        """
        # Imported lazily so the rest of the module is usable without it.
        import jenkins

        self.jk = jenkins.Jenkins(
            self.config.get("jenkins", "host"),
            self.config.get("jenkins", "username"),
            self.config.get("jenkins", "password"),
        )
        # Check job (sanity)
        if not self.jk.job_exists(self.jkjob_name):
            raise RuntimeError("Build job not found on jenkins")
        if self.args.debug:
            print(
                "[DEBUG] Connected to Jenkins and checked build_job ", self.jkjob_name
            )

    def do_queue_build(self, suite: str, package: str) -> str:
        """
        Queue the build on Jenkins and return the URL we expect it to get.

        The URL is built from the job's `nextBuildNumber` as read just
        before the build is triggered.
        """
        # NOTE(review): if another build is queued between these two calls
        # the returned URL presumably points at the wrong build; confirm
        # whether that matters for the notification.
        job_info = self.jk.get_job_info(self.jkjob_name)
        self.jk.build_job(self.jkjob_name, dict(codename=suite, srcpackage=package))
        return "{}{}".format(job_info["url"], job_info["nextBuildNumber"])


class ReleaseBot:
    """
    Glue between a repo store (where requests come from) and a job store
    (where builds are queued), applying the permission rules in between.
    """

    def __init__(
        self, config, args, job_store: GenericJobStore, repo_store: GenericRepoStore
    ):
        self.config = config
        self.args = args
        self.job_store = job_store
        self.repo_store = repo_store
        self.suites = _list_helper(config, "filters", "suites")

    def process(self):
        """
        Process every pending task of the repo store, yielding each one
        once it has been handled.
        """
        for task in self.repo_store.get_pending_tasks():
            yield self._process_task(task)

    def _process_task(self, task: ReleaseBotTask) -> ReleaseBotTask:
        """
        Validate, authorise and run a single task, notifying the requester
        about the outcome. Always returns the task it was given.
        """
        # 0. Check command is valid
        cmd = getattr(self, "cmd_{}".format(task.command.name), None)
        if cmd is None:
            task.resolve_notify(
                'Command not supported: "{}". '
                "Available commands are: {}.".format(
                    task.command.name, ", ".join(self.available_commands)
                )
            )
            return task

        # 1. check permissions
        if not self.may_run_task(task):
            task.resolve_notify("You don't have permissions to run this task")
            return task

        # 2. Process command; a falsy result means the command has already
        #    told the requester what went wrong.
        if cmd(task):
            # 3. notify as necessary
            task.resolve_notify("Finished queueing")
        return task

    @property
    @functools.lru_cache()
    def available_commands(self) -> typing.List[str]:
        """
        Return an exhaustive list of all supported commands in this class.
        A command X is supported if a method is defined as follows:
        cmd_X(self, task: ReleaseBotTask):
        """
        return [x[4:] for x in dir(self) if x.startswith("cmd_")]

    @property
    @functools.lru_cache()
    def restricted_suites(self) -> typing.List[str]:
        """
        Generate an exhaustive list of restricted suites.
        Use the cartesian product of restricted_suites and restricted_suffixes,
        joined as "SUITE-SUFFIX".
        """
        filters = _list_helper(self.config, "filters", "restricted_suites")
        suffixes = _list_helper(self.config, "filters", "restricted_suffixes")
        return ["-".join(i) for i in itertools.product(filters, suffixes)]

    def cmd_build(self, task: ReleaseBotTask):
        """
        Queue one build per recognised label (suite) of the task.

        Returns False (after resolving the task) if there are no labels at
        all, True otherwise. Unknown labels are reported and skipped.
        """
        if not task.command.labels:
            task.resolve_notify("You must assign some labels")
            return False

        # Built once; membership is all we need inside the loop.
        known_suites = set(self.suites) | set(self.restricted_suites)
        for suite in sorted(task.command.labels):
            if suite not in known_suites:
                task.notify(
                    'Ignoring unrecognised tag or disallowed suite "{}"'.format(suite)
                )
                continue
            url = self.job_store.queue_build(suite, task.command.package)
            task.notify(
                'Triggered "{}" build for "{}": {}'.format(
                    task.command.package, suite, url
                )
            )
        return True

    def may_run_task(self, task: ReleaseBotTask) -> bool:
        """
        Decide whether or not a task may be run.
        This returns False if any suite does not match the requester's
        permissions.
        See ReleaseBotRequester for the verbose description of the permissions.
        """
        # Admins may run tasks
        if task.requester.is_admin:
            return True

        # If there is at least one restricted suite, we refuse the whole task
        restricted_build = any(
            suite in self.restricted_suites for suite in task.command.labels
        )
        # Restricted tasks are only for admins
        if restricted_build:
            return False

        # Package members may run unrestricted tasks
        if "member" in task.requester.permissions:
            return True

        # Collaborators may run unrestricted tasks on this package
        if "collaborator/{}".format(task.command.package) in task.requester.permissions:
            return True

        # Default to deny
        return False


def get_argparser():
    """
    Get ReleaseBot's argument parser.
    Any new parameters that are implemented should be added here.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", type=argparse.FileType("r"), required=True)
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--dryrun", action="store_true")
    return parser


def main(config, args):
    """
    Perform a ReleaseBot run
    """
    repo_store = GiteaRepoStore(config, args)
    job_store = JenkinsJobStore(config, args)
    releasebot = ReleaseBot(
        config=config, args=args, repo_store=repo_store, job_store=job_store
    )
    print(list(releasebot.process()))


if __name__ == "__main__":
    args = get_argparser().parse_args()
    # Ensure the config file with secrets is mode 0600 or 0400.
    # This check is skipped if config is passed via stdin (argparse hands
    # back sys.stdin, whose name is "<stdin>") or via a file descriptor
    # (integer name), since there is no path to stat in those cases.
    if args.config.name != "<stdin>" and not isinstance(args.config.name, int):
        if os.stat(args.config.name).st_mode & 0o177 != 0:
            sys.stderr.write(
                "Permissions are too broad ({})\n".format(args.config.name)
            )
            sys.exit(1)
    config = configparser.ConfigParser()
    config.read_file(args.config)
    main(config, args)