From fe984c696c69e41c37ac1806f60e4a8c6b248296 Mon Sep 17 00:00:00 2001
From: Denys Fedoryshchenko
Date: Mon, 3 Nov 2025 21:23:17 +0200
Subject: [PATCH] runtime: Add Pull-Only labs runtime implementation

Add a new PULL_LABS runtime for KernelCI that implements job definition
creation according to the PULL_LABS protocol specification.

New files:
  kernelci/runtime/pull_labs.py - Main runtime implementation
  config/runtime/base/pull_labs.jinja2 - JSON job definition template

Modified files:
  kernelci/config/runtime.py - Added RuntimePullLabs configuration class
    Properties: poll_interval, timeout, storage_name
    Registered in RuntimeFactory._lab_types as 'pull_labs'

Signed-off-by: Denys Fedoryshchenko
---
 config/runtime/base/pull_labs.jinja2 |  90 ++++++
 kernelci/config/runtime.py           | 134 +++++---
 kernelci/runtime/pull_labs.py        | 440 +++++++++++++++++++++++++++
 3 files changed, 619 insertions(+), 45 deletions(-)
 create mode 100644 config/runtime/base/pull_labs.jinja2
 create mode 100644 kernelci/runtime/pull_labs.py

diff --git a/config/runtime/base/pull_labs.jinja2 b/config/runtime/base/pull_labs.jinja2
new file mode 100644
index 0000000000..b33b24f7fd
--- /dev/null
+++ b/config/runtime/base/pull_labs.jinja2
@@ -0,0 +1,90 @@
+{# SPDX-License-Identifier: LGPL-2.1-or-later -#}
+{# PULL_LABS job definition template - generates JSON format #}
+{# This template creates a job definition according to PULL_LABS protocol Section 3 #}
+{
+  "artifacts": {
+{%- if node.artifacts.kernel %}
+    "kernel": "{{ node.artifacts.kernel }}"
+{%- endif %}
+{%- if node.artifacts.modules %},
+    "modules": "{{ node.artifacts.modules }}"
+{%- endif %}
+{%- if node.artifacts.kselftest %},
+    "kselftest": "{{ node.artifacts.kselftest }}"
+{%- endif %}
+{%- if node.artifacts.rootfs %},
+    "rootfs_type": "{{ node.artifacts.rootfs }}"
+{%- endif %}
+{%- if node.artifacts.initrd %},
+    "initrd": "{{ node.artifacts.initrd }}"
+{%- endif %}
+{%- if node.artifacts.dtb %},
+    "dtb": "{{ node.artifacts.dtb }}"
+{%- endif %}
+  },
+  "tests": [
+{%- if test_definitions %}
+{%- for test in test_definitions %}
+    {
+      "id": "{{ test.id }}",
+      "type": "{{ test.type }}",
+      "depends": {{ test.depends | tojson }},
+      "timeout_s": {{ test.timeout_s | default(600) }}
+{%- if test.subset %},
+      "subset": {{ test.subset | tojson }}
+{%- endif %}
+{%- if test.parameters %},
+      "parameters": "{{ test.parameters }}"
+{%- endif %}
+{%- if test.pre_commands %},
+      "pre-commands": {{ test.pre_commands | tojson }}
+{%- else %},
+      "pre-commands": []
+{%- endif %}
+{%- if test.post_commands %},
+      "post-commands": {{ test.post_commands | tojson }}
+{%- else %},
+      "post-commands": []
+{%- endif %}
+    }{{ "," if not loop.last else "" }}
+{%- endfor %}
+{%- else %}
+    {
+      "id": "boot-test-{{ node.id[:8] }}",
+      "type": "boot",
+      "depends": [],
+      "timeout_s": 600,
+      "pre-commands": [],
+      "post-commands": []
+    }
+{%- endif %}
+  ],
+  "environment": {
+    "platform": "{{ platform_config.name }}",
+    "arch": "{{ platform_config.arch }}",
+    "console": {
+      "method": "{{ platform_config.console_method | default('serial') }}",
+      "baud": {{ platform_config.console_baud | default(115200) }}
+    }
+{%- if platform_config.params and platform_config.params.requirements %}
+    ,"requirements": {{ platform_config.params.requirements | tojson }}
+{%- endif %}
+  },
+  "callback": {
+    "url": "{{ instance_callback }}/node/{{ node.id }}",
+    "token_name": "{{ callback_token_name | default('kernelci-pipeline-callback') }}"
+  }
+{%- if node.artifacts.metadata %}
+  ,"integrity": {
+    "sha256": {
+{%- if checksums %}
+{%- for artifact_name, checksum in checksums.items() %}
+      "{{ artifact_name }}": "{{ checksum }}"{{ "," if not loop.last else "" }}
+{%- endfor %}
+{%- else %}
+      "_comment": "Checksums would be populated from metadata.json if available"
+{%- endif %}
+    }
+  }
+{%- endif %}
+}
diff --git a/kernelci/config/runtime.py b/kernelci/config/runtime.py
index d59dd3115b..0c2650cf0e 100644
--- a/kernelci/config/runtime.py
+++ b/kernelci/config/runtime.py
@@ -2,6 +2,9 @@
 #
 # Copyright (C) 2019, 2021-2023 Collabora Limited
 # Author: Guillaume Tucker
+#
+# Copyright (C) 2025 Collabora Limited
+# Author: Denys Fedoryshchenko
 
 """KernelCI Runtime environment configuration"""
 
@@ -11,7 +14,7 @@
 class Runtime(YAMLConfigObject):
     """Runtime environment configuration"""
 
-    yaml_tag = '!Runtime'
+    yaml_tag = "!Runtime"
 
     def __init__(self, name, lab_type, filters=None, rules=None):
         """A runtime environment configuration object
@@ -44,25 +47,24 @@ def rules(self):
     @classmethod
     def _get_yaml_attributes(cls):
         attrs = super()._get_yaml_attributes()
-        attrs.update({'lab_type', 'rules'})
+        attrs.update({"lab_type", "rules"})
         return attrs
 
     @classmethod
     def _to_yaml_dict(cls, data):
-        yaml_dict = {
-            key: getattr(data, key)
-            for key in cls._get_yaml_attributes()
-        }
-        yaml_dict.update({
-            # pylint: disable=protected-access
-            'filters': [{fil.name: fil} for fil in data._filters],
-        })
+        yaml_dict = {key: getattr(data, key) for key in cls._get_yaml_attributes()}
+        yaml_dict.update(
+            {
+                # pylint: disable=protected-access
+                "filters": [{fil.name: fil} for fil in data._filters],
+            }
+        )
         return yaml_dict
 
     @classmethod
     def to_yaml(cls, dumper, data):
         return dumper.represent_mapping(
-            'tag:yaml.org,2002:map', cls._to_yaml_dict(data)
+            "tag:yaml.org,2002:map", cls._to_yaml_dict(data)
         )
 
     def match(self, data):
@@ -73,19 +75,26 @@ def match(self, data):
 class RuntimeLAVA(Runtime):
     """Configuration for LAVA runtime environments"""
 
-    yaml_tag = '!RuntimeLAVA'
+    yaml_tag = "!RuntimeLAVA"
 
     PRIORITIES = {
-        'low': 0,
-        'medium': 50,
-        'high': 100,
+        "low": 0,
+        "medium": 50,
+        "high": 100,
     }
 
     # This should be solved by dropping the "priority" attribute
     # pylint: disable=too-many-arguments
-    def __init__(self, url=None, priority=None, priority_min=None,
-                 priority_max=None, queue_timeout=None, notify=None,
-                 **kwargs):
+    def __init__(
+        self,
+        url=None,
+        priority=None,
+        priority_min=None,
+        priority_max=None,
+        queue_timeout=None,
+        notify=None,
+        **kwargs,
+    ):
         super().__init__(**kwargs)
 
         def _set_priority_value(value, default):
@@ -135,24 +144,25 @@ def notify(self):
     @classmethod
     def _get_yaml_attributes(cls):
         attrs = super()._get_yaml_attributes()
-        attrs.update({
-            'priority',
-            'priority_min',
-            'priority_max',
-            'queue_timeout',
-            'url',
-            'notify',
-        })
+        attrs.update(
+            {
+                "priority",
+                "priority_min",
+                "priority_max",
+                "queue_timeout",
+                "url",
+                "notify",
+            }
+        )
         return attrs
 
 
 class RuntimeDocker(Runtime):
     """Configuration for Docker runtime environments"""
 
-    yaml_tag = '!RuntimeDocker'
+    yaml_tag = "!RuntimeDocker"
 
-    def __init__(self, env_file=None, volumes=None, user=None, timeout=None,
-                 **kwargs):
+    def __init__(self, env_file=None, volumes=None, user=None, timeout=None, **kwargs):
         super().__init__(**kwargs)
         self._env_file = env_file
         self._volumes = volumes or []
@@ -182,14 +192,14 @@ def timeout(self):
     @classmethod
     def _get_yaml_attributes(cls):
         attrs = super()._get_yaml_attributes()
-        attrs.update({'env_file', 'volumes', 'user', 'timeout'})
+        attrs.update({"env_file", "volumes", "user", "timeout"})
         return attrs
 
 
 class RuntimeKubernetes(Runtime):
     """Configuration for Kubernetes runtime environments"""
 
-    yaml_tag = '!RuntimeKubernetes'
+    yaml_tag = "!RuntimeKubernetes"
 
     def __init__(self, context=None, **kwargs):
         super().__init__(**kwargs)
@@ -203,7 +213,40 @@ def context(self):
     @classmethod
     def _get_yaml_attributes(cls):
         attrs = super()._get_yaml_attributes()
-        attrs.update({'context'})
+        attrs.update({"context"})
         return attrs
+
+
+class RuntimePullLabs(Runtime):
+    """Configuration for PULL_LABS runtime environments"""
+
+    yaml_tag = "!RuntimePullLabs"
+
+    def __init__(self, poll_interval=None, timeout=None, storage_name=None, **kwargs):
+        super().__init__(**kwargs)
+        self._poll_interval = poll_interval or 30
+        self._timeout = timeout or 3600
+        self._storage_name = storage_name
+
+    @property
+    def poll_interval(self):
+        """Polling interval for events in seconds"""
+        return self._poll_interval
+
+    @property
+    def timeout(self):
+        """Job timeout in seconds"""
+        return self._timeout
+
+    @property
+    def storage_name(self):
+        """Storage configuration name for job definitions"""
+        return self._storage_name
+
+    @classmethod
+    def _get_yaml_attributes(cls):
+        attrs = super()._get_yaml_attributes()
+        attrs.update({"poll_interval", "timeout", "storage_name"})
+        return attrs
 
 
@@ -211,22 +254,23 @@ class RuntimeFactory:  # pylint: disable=too-few-public-methods
     """Factory to create lab objects from YAML data."""
 
     _lab_types = {
-        'docker': RuntimeDocker,
-        'kubernetes': RuntimeKubernetes,
-        'lava': RuntimeLAVA,
-        'legacy.lava_xmlrpc': RuntimeLAVA,
-        'legacy.lava_rest': RuntimeLAVA,
-        'shell': Runtime,
+        "docker": RuntimeDocker,
+        "kubernetes": RuntimeKubernetes,
+        "lava": RuntimeLAVA,
+        "legacy.lava_xmlrpc": RuntimeLAVA,
+        "legacy.lava_rest": RuntimeLAVA,
+        "pull_labs": RuntimePullLabs,
+        "shell": Runtime,
     }
 
     @classmethod
     def from_yaml(cls, name, config, default_filters):
         """Load the configuration from YAML data"""
-        lab_type = config.get('lab_type')
+        lab_type = config.get("lab_type")
         kwargs = {
-            'name': name,
-            'lab_type': lab_type,
-            'filters': FilterFactory.from_data(config, default_filters),
+            "name": name,
+            "lab_type": lab_type,
+            "filters": FilterFactory.from_data(config, default_filters),
         }
         lab_cls = cls._lab_types[lab_type] if lab_type else Runtime
         return lab_cls.load_from_yaml(config, **kwargs)
@@ -234,12 +278,12 @@ def from_yaml(data, filters):
     """Load the runtime environment from YAML based on its type"""
-    runtimes_filters = filters.get('runtimes')
+    runtimes_filters = filters.get("runtimes")
     runtimes = {
         name: RuntimeFactory.from_yaml(name, runtime, runtimes_filters)
-        for name, runtime in data.get('runtimes', {}).items()
+        for name, runtime in data.get("runtimes", {}).items()
     }
     return {
-        'runtimes': runtimes,
+        "runtimes": runtimes,
     }
diff --git a/kernelci/runtime/pull_labs.py b/kernelci/runtime/pull_labs.py
new file mode 100644
index 0000000000..e1b1148fb6
--- /dev/null
+++ b/kernelci/runtime/pull_labs.py
@@ -0,0 +1,440 @@
+# SPDX-License-Identifier: LGPL-2.1-or-later
+#
+# Copyright (C) 2025 Collabora Limited
+# Author: Denys Fedoryshchenko
+
+"""PULL_LABS runtime implementation"""
+
+import base64
+import gzip
+import json
+import tempfile
+import time
+import uuid
+
+from kernelci.runtime import Runtime, evaluate_test_suite_result
+
+
+class LogParser:
+    """PULL_LABS log parser
+
+    This class can be used to parse PULL_LABS logs as received in a callback.
+    It can handle both base64-encoded (optionally gzipped) logs and plain text.
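+
+    A minimal usage sketch (the payload below is constructed inline
+    purely for illustration):
+
+        encoded = base64.b64encode(gzip.compress(b"boot log")).decode()
+        parser = LogParser("base64:" + encoded)
+        parser.get_text()  # -> "boot log"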
+    """
+
+    def __init__(self, log_data):
+        """Initialize log parser with log data
+
+        *log_data* can be:
+        - base64-encoded string (with optional "base64:" prefix)
+        - plain text string
+        - gzipped and base64-encoded (prefix "base64:" expected)
+        """
+        self._raw_log = self._decode_log(log_data)
+
+    @classmethod
+    def _decode_log(cls, log_data):
+        """Decode log data from various formats"""
+        if not log_data:
+            return ""
+
+        # Handle base64 encoding
+        if isinstance(log_data, str) and log_data.startswith("base64:"):
+            try:
+                encoded_data = log_data[7:]  # Remove 'base64:' prefix
+                decoded_bytes = base64.b64decode(encoded_data)
+                # Try to decompress if it's gzipped
+                try:
+                    decompressed = gzip.decompress(decoded_bytes)
+                    return decompressed.decode("utf-8", errors="replace")
+                except gzip.BadGzipFile:
+                    # Not gzipped, just decode as UTF-8
+                    return decoded_bytes.decode("utf-8", errors="replace")
+            except Exception as exc:  # pylint: disable=broad-except
+                print(f"Warning: Failed to decode base64 log: {exc}")
+                return log_data
+
+        # Plain text
+        return log_data
+
+    def get_text(self):
+        """Get the plain text log as a string"""
+        return self._raw_log
+
+    def get_text_log(self, output):
+        """Write the plain text log to output"""
+        output.write(self._raw_log)
+
+
+class Callback:
+    """PULL_LABS callback handler
+
+    Parses callback data according to PULL_LABS protocol Section 6 & 7.
+    """
+
+    # Test result status mapping from PULL_LABS to KernelCI
+    STATUS_MAP = {
+        "ok": "pass",
+        "pass": "pass",
+        "fail": "fail",
+        "skip": "skip",
+        "error": "incomplete",
+    }
+
+    def __init__(self, data):
+        """Initialize callback handler with callback data
+
+        *data* is the parsed JSON callback data from the lab
+        """
+        self._data = data
+
+    def get_data(self):
+        """Get the raw callback data"""
+        return self._data
+
+    def get_device_id(self):
+        """Get the ID of the tested device from metadata"""
+        metadata = self._data.get("metadata", {})
+        return metadata.get("system")
+
+    def get_job_status(self):
+        """Get overall job status from summary
+
+        Returns 'pass', 'fail', or 'incomplete' based on test results
+        """
+        summary = self._data.get("summary", {})
+        total = summary.get("total", 0)
+        failed = summary.get("failed", 0)
+
+        # If no tests ran, it's incomplete
+        if total == 0:
+            return "incomplete"
+
+        # If any test failed, overall status is fail
+        if failed > 0:
+            return "fail"
+
+        return "pass"
+
+    def is_infra_error(self):
+        """Determine whether the job has hit an infrastructure error
+
+        Infrastructure errors are indicated by error_code in metadata
+        or certain test statuses
+        """
+        # Check for error_code in top-level data or metadata
+        if "error_code" in self._data:
+            return True
+
+        metadata = self._data.get("metadata", {})
+        if "error_code" in metadata:
+            return True
+
+        # Check if all tests have 'error' status
+        tests = self._data.get("tests", {})
+        if tests:
+            all_errors = all(
+                self._get_test_status(test) == "error"
+                for test in tests.values()
+                if isinstance(test, dict)
+            )
+            return all_errors
+
+        return False
+
+    @classmethod
+    def _get_test_status(cls, test_data):
+        """Extract status from test data"""
+        if isinstance(test_data, dict):
+            return test_data.get("status", "error")
+        return "error"
+
+    @classmethod
+    def _convert_status(cls, status):
+        """Convert PULL_LABS status to KernelCI status"""
+        return cls.STATUS_MAP.get(status, "incomplete")
+
+    def _parse_test_node(self, name, test_data):
+        """Parse a single test node into KernelCI format
+
+        Returns a dict with 'node' and 'child_nodes' keys
+        """
+        node = {
+            "name": name,
+            "state": "done",
+        }
+
+        # Extract status
+        if isinstance(test_data, dict):
+            status = test_data.get("status", "error")
+            node["result"] = self._convert_status(status)
+
+            # Extract duration if present
+            duration_ms = test_data.get("duration_ms")
+            if duration_ms is not None:
+                node["data"] = node.get("data", {})
+                node["data"]["duration_ms"] = duration_ms
+
+            # Extract metrics if present
+            metrics = test_data.get("metrics")
+            if metrics:
+                node["data"] = node.get("data", {})
+                node["data"]["metrics"] = metrics
+
+            # Check for subtests
+            subtests = test_data.get("subtests", {})
+            if subtests:
+                node["kind"] = "job"
+                child_nodes = self._parse_tests_hierarchy(subtests)
+
+                # Re-evaluate result based on children if needed
+                if node["result"] == "pass":
+                    child_result = evaluate_test_suite_result(child_nodes)
+                    if child_result != node["result"]:
+                        node["result"] = child_result
+
+                return {"node": node, "child_nodes": child_nodes}
+
+            # Leaf test node
+            node["kind"] = "test"
+            return {"node": node, "child_nodes": []}
+
+        # Simple status string
+        node["result"] = "incomplete"
+        node["kind"] = "test"
+        return {"node": node, "child_nodes": []}
+
+    def _parse_tests_hierarchy(self, tests):
+        """Parse tests dictionary into hierarchy format"""
+        hierarchy = []
+        for name, test_data in tests.items():
+            item = self._parse_test_node(name, test_data)
+            hierarchy.append(item)
+        return hierarchy
+
+    def get_results(self):
+        """Parse the results and return them as a plain dictionary"""
+        return self._data.get("tests", {})
+
+    def get_hierarchy(self, results, job_node):  # pylint: disable=unused-argument
+        """Convert the plain results dictionary to a hierarchy for the API
+
+        *results* is the tests dictionary from the callback
+        *job_node* is the job node data with initial status
+        """
+        job_result = job_node.get("result", self.get_job_status())
+
+        # Handle infrastructure errors
+        if self.is_infra_error():
+            job_result = "incomplete"
+            if "data" not in job_node:
+                job_node["data"] = {}
+            job_node["data"]["error_code"] = self._data.get(
+                "error_code", "Infrastructure"
+            )
+            job_node["data"]["error_msg"] = self._data.get(
+                "error_msg", "Infrastructure error"
+            )
+
+        # Parse test hierarchy
+        tests = self._data.get("tests", {})
+        child_nodes = self._parse_tests_hierarchy(tests)
+
+        # Re-evaluate job result based on children
+        if child_nodes and job_result == "pass":
+            child_result = evaluate_test_suite_result(child_nodes)
+            if child_result != job_result:
+                print(
+                    f"DEBUG: {job_node.get('id')} Transitioning job node "
+                    f"result: {job_result} -> {child_result}"
+                )
+                job_result = child_result
+
+        return {
+            "node": {
+                "name": job_node["name"],
+                "result": job_result,
+                "artifacts": {},
+                "data": job_node.get("data", {}),
+            },
+            "child_nodes": child_nodes,
+        }
+
+    def get_log_parser(self):
+        """Get a LogParser object from the callback data"""
+        artifacts = self._data.get("artifacts", {})
+        log = artifacts.get("log")
+        if not log:
+            return None
+        return LogParser(log)
+
+
+class PullLabs(Runtime):
+    """Runtime implementation for PULL_LABS protocol
+
+    PULL_LABS is a pull-based protocol where external hardware labs poll for
+    events, download job definitions, run tests, and post results via callback.
+
+    This runtime generates JSON job definitions according to the PULL_LABS
+    protocol specification (Section 3) and handles result callbacks.
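+
+    A typical call sequence, shown as a sketch (the ``job`` object and
+    the surrounding orchestration are assumed to be provided by the
+    KernelCI pipeline):
+
+        runtime = get_runtime(config, kcictx=context)
+        params = runtime.get_params(job)
+        definition = runtime.generate(job, params)
+        # write ``definition`` to ``job_path``, then:
+        job_id = runtime.submit(job_path)
+        url = runtime.get_job_definition_url()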
+    """
+
+    def __init__(self, config, kcictx=None, **kwargs):
+        super().__init__(config, **kwargs)
+        self._context = kcictx
+        self._stored_url = None
+
+    def get_params(self, job, api_config=None):
+        """Get job template parameters with PULL_LABS-specific additions"""
+        params = super().get_params(job, api_config)
+        if params:
+            params["timeout"] = self.config.timeout
+            params["poll_interval"] = self.config.poll_interval
+        return params
+
+    def get_job_definition_url(self):
+        """Get the URL where the job definition was stored if any"""
+        return self._stored_url
+
+    def generate(self, job, params):
+        """Generate PULL_LABS job definition in JSON format
+
+        *job* is a Job object
+        *params* is the template parameters dictionary
+
+        Returns JSON string of the job definition
+        """
+        template = self._get_template(job.config)
+        try:
+            rendered = template.render(params)
+            # Validate JSON
+            json.loads(rendered)
+            return rendered
+        except json.JSONDecodeError as exc:
+            print(f"Error: Generated template is not valid JSON: {exc}")
+            return None
+        except Exception as exc:  # pylint: disable=broad-except
+            platform_params = (
+                params["platform_config"].params
+                if params.get("platform_config")
+                else {}
+            )
+            print(
+                f"Error rendering job template: {exc}, params: {params}, "
+                f"platform_params: {platform_params}"
+            )
+            return None
+
+    def submit(self, job_path):
+        """Store job definition in external storage for pull-based labs
+
+        For PULL_LABS, "submit" means:
+        1. Store job definition JSON in external storage
+        2. Let the scheduler update the job node to make it available for labs
+
+        *job_path* is the path to the generated job definition file
+
+        Returns the unique job definition ID; pull-based labs pick the
+        job up asynchronously.
+        """
+        with open(job_path, "r", encoding="utf-8") as job_file:
+            job_definition = job_file.read()
+        return self._store_job_definition(job_definition)
+
+    def get_job_id(self, job_object):
+        """Extract job ID from the job object returned by submit()"""
+        # For PULL_LABS, job_object is already the job ID string
+        return str(job_object)
+
+    def wait(self, job_object):
+        """Wait for job completion via callback
+
+        For the PULL_LABS protocol, jobs complete when the lab posts results
+        to the callback endpoint. This method would typically poll the API
+        for job state changes.
+
+        *job_object* is the job ID
+
+        Returns 0 for success, 1 for failure
+
+        Note: In practice, this is handled by the pipeline/callback system,
+        so this method may not be actively used.
+        """
+        # In PULL_LABS, waiting is typically handled by the callback system
+        # This is a placeholder for compatibility with the Runtime interface
+        print(
+            f"PULL_LABS: Job {job_object} submitted. "
+            "Waiting for lab to accept and complete via callback."
+        )
+
+        # For now, return success as the submission itself succeeded
+        # Actual job tracking happens via the callback system
+        return 0
+
+    def _store_job_definition(self, job_definition):
+        """Store job definition in external storage
+
+        *job_definition* is the JSON string of the job definition
+
+        Returns a unique job ID
+        """
+        if not self._context:
+            raise ValueError(
+                "Context is required for external storage but was not provided"
+            )
+
+        # Get storage configuration
+        storage_name = self.config.storage_name
+        if not storage_name:
+            # Get default storage configuration name
+            storage_name = self._context.get_default_storage_config()
+
+        if not storage_name:
+            # Fallback to first available storage
+            storage_names = self._context.get_storage_names()
+            if not storage_names:
+                raise ValueError("No storage configurations found in context")
+            storage_name = storage_names[0]
+
+        storage = self._context.init_storage(storage_name)
+        if not storage:
+            raise ValueError(f"Failed to initialize storage '{storage_name}'")
+
+        # Generate unique ID and path
+        date_str = time.strftime("%Y%m%d")
+        unique_id = uuid.uuid4().hex
+        upload_dir = f"pull_labs_jobs/{date_str}"
+
+        print(
+            f"Storing PULL_LABS job definition to '{storage_name}' "
+            f"path: {upload_dir} name: {unique_id}.json"
+        )
+
+        try:
+            job_bytes = job_definition.encode("utf-8")
+            with tempfile.NamedTemporaryFile(delete=True) as tmp_file:
+                tmp_file.write(job_bytes)
+                tmp_file.flush()
+                local_file = tmp_file.name
+                artifact_name = f"{unique_id}.json"
+                stored_url = storage.upload_single(
+                    (local_file, artifact_name),
+                    upload_dir,
+                )
+                self._stored_url = stored_url
+
+            if not self._stored_url:
+                raise ValueError("Upload returned no URL")
+
+            print(f"Job definition stored at URL: {stored_url}")
+            return unique_id
+
+        except Exception as exc:  # pylint: disable=broad-except
+            raise ValueError(
+                f"Failed to store job definition in external storage: {exc}"
+            ) from exc
+
+
+def get_runtime(runtime_config, **kwargs):
+    """Get a PULL_LABS runtime object"""
+    return PullLabs(runtime_config, **kwargs)
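+
+
+# Illustrative callback payload consumed by the Callback class above.
+# This is a sketch based on the parsing code in this module, not an
+# authoritative copy of the PULL_LABS specification:
+#
+# {
+#     "metadata": {"system": "device-01"},
+#     "summary": {"total": 2, "failed": 0},
+#     "tests": {
+#         "boot": {"status": "ok", "duration_ms": 4200},
+#         "kselftest": {
+#             "status": "pass",
+#             "subtests": {"timers": {"status": "ok"}}
+#         }
+#     },
+#     "artifacts": {"log": "base64:..."}
+# }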