#!/usr/bin/env python3
"""Machine registration client.

Collects information about the local machine (GPUs, CPU, RAM, firmware,
device-tree compatibles, network addresses, serial consoles) and exposes
sub-commands to register it with, or check it against, the machine
registration service (MaRS) reachable through the executor's REST API.
"""

from collections import namedtuple
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from typing import Self

from valve_gfx_ci.gfxinfo import find_gpus, cache_db

import multiprocessing
import subprocess
import traceback
import argparse
import platform
import requests
import serial
import socket
import struct
import fcntl
import glob
import math
import time
import sys
import re
import os


# MAC and IPv4 addresses of a network interface, both as strings
NetworkConf = namedtuple("NetworkConf", ['mac', 'ipv4'])


def next_power_of_2(x):
    """Return the smallest power of two that is greater than or equal to `x`.

    `0` maps to `1`. Values strictly between 0 and 1 yield fractional powers
    of two (e.g. 0.3 -> 0.5), and negative values raise `ValueError` (from
    `math.log2`).
    """
    return 1 if x == 0 else 2**math.ceil(math.log2(x))


def readfile(*args):
    """Read the file at `os.path.join(*args)` as text.

    Trailing NUL, CR and LF characters are stripped from the returned string.
    """
    with open(os.path.join(*args)) as f:
        return f.read().rstrip("\0\r\n")


@dataclass
class Compatible:
    """A device-tree `compatible` entry, split as `vendor,model`."""

    # Part before the comma of the compatible string
    vendor: str
    # Part after the comma of the compatible string
    model: str

    def __str__(self) -> str:
        return f"{self.vendor}:{self.model}"

    @classmethod
    def from_path(cls, path: Path | str) -> list[Self]:
        """Parse the NUL-separated compatible strings found in the file `path`.

        Returns an empty list if the file does not exist or cannot be read.
        Entries that are not made of exactly two comma-separated fields are
        ignored with a warning.
        """
        compatibles = []

        path = Path(path)
        if not path.exists():
            return compatibles

        try:
            compatibles_str = readfile(path)
        except Exception as e:
            print(f"ERROR: Failed to read the path {path}: {e}")
            # Nothing was read, so there is nothing to parse
            return compatibles

        for compatible in compatibles_str.split("\0"):
            fields = compatible.split(',')
            if len(fields) == 2:
                vendor, model = fields
                compatibles.append(cls(vendor=vendor, model=model))
            else:
                print(f"WARNING: Ignoring {path}'s invalid value '{compatible}'")

        return compatibles

    @classmethod
    def from_device_glob(cls, device_glob: str) -> list[Self]:
        """Collect the compatibles of every device-tree node matching `device_glob`.

        The pattern is expanded as `/proc/device-tree/**/<device_glob>/compatible`.
        """
        compatibles = []
        for path in glob.glob(f"/proc/device-tree/**/{device_glob}/compatible"):
            compatibles.extend(cls.from_path(path))
        return compatibles


class MachineInfo:
    """Gathers the properties of the local machine that get exposed to MaRS."""

    def __init__(self, executor_base_url: str):
        """Detect the GPUs and print a summary of the machine.

        `executor_base_url` is the base URL used for all the REST requests
        (e.g. `http://ci-gateway`).
        """
        self.executor_base_url = executor_base_url

        self.gpus = find_gpus(allow_db_updates=False)
        if self.gpus:
            print(f"Found {len(self.gpus)} GPU(s):")
            for gpu in self.gpus:
                print(f" * {gpu}")
            print()
        else:
            print("No GPU found")

        machine_tags = ", ".join(sorted(self.machine_tags))
        print(f"Machine tags: {machine_tags}\n")

    @property
    def gpu(self):
        """The first detected GPU, or `None` when no GPU was found."""
        if len(self.gpus) > 0:
            return self.gpus[0]
        return None

    @cached_property
    def root_compatibles(self) -> list[Compatible]:
        """The compatibles of the device tree's root node (empty if absent)."""
        return Compatible.from_path("/proc/device-tree/compatible")

    @property
    def machine_base_name(self) -> str:
        """A lower-case base name for the machine.

        Uses, in priority order: the GPUs' base names, the first root
        device-tree compatible, then the CPU architecture.
        """
        if self.gpus:
            return "+".join(sorted([g.base_name for g in self.gpus])).lower()
        elif compatibles := self.root_compatibles:
            # Use the most precise board name if available through the device tree
            compatible = compatibles[0]
            return f"{compatible.vendor}-{compatible.model}".lower()
        else:
            # Default to the cpu architecture
            return f"unk-{platform.machine()}"

    @property
    def cpu_tags(self) -> set[str]:
        """Tags describing the CPU: architecture, core count, and DT compatibles."""

        def get_cpu_count():
            # Count the unique (package, core) pairs so that SMT siblings are
            # only counted once. Always report at least one core.
            cpus = set()
            for cpu_topology_path in glob.iglob("/sys/devices/system/cpu/cpu*/topology/"):
                package_id = int(readfile(cpu_topology_path, 'physical_package_id'))
                core_id = int(readfile(cpu_topology_path, 'core_id'))
                cpus.add((package_id, core_id))
            return max(1, len(cpus))

        tags = set()

        cpu_count = get_cpu_count()

        # This value may change depending on the kernel (Linux vs Windows)
        tags.add(f"cpu:arch:{platform.machine()}")
        tags.add(f"cpu:cores:{cpu_count}")

        if cpu_count >= 4:
            tags.add("cpu:cores:4+")

        if cpu_count >= 16:
            tags.add("cpu:cores:16+")

        # Add the compatibles for the cpus
        for compatible in Compatible.from_device_glob("cpu@*"):
            tags.add(f"dt:cpu:compatible:{str(compatible)}".lower())

        return tags

    @property
    def ram_tags(self) -> set[str]:
        """Tags describing the amount of RAM, rounded up to a power of two GiB.

        Raises `RuntimeError` if `/proc/meminfo` has no `MemTotal` entry.
        """

        def ram_size():
            # Size in kB, as reported by the kernel
            with open("/proc/meminfo", "rt") as f:
                for line in f:
                    if m := re.match(r'MemTotal:[ \t]+(\d+) kB', line):
                        return int(m.groups()[0])
            raise RuntimeError("Could not find the MemTotal entry in /proc/meminfo")

        tags = set()

        mem_gib = next_power_of_2(ram_size() / 1024 / 1024)
        tags.add(f"mem:size:{mem_gib}GiB")

        if mem_gib >= 4:
            tags.add("mem:size:4+GiB")

        if mem_gib >= 16:
            tags.add("mem:size:16+GiB")

        if mem_gib >= 64:
            tags.add("mem:size:64+GiB")

        return tags

    @property
    def firmware_tags(self) -> set[str]:
        """Tags describing the firmware: EFI, GPU BAR0, and device-tree attributes."""
        tags = set()

        # Check if this is an EFI firmware
        if os.path.exists("/sys/firmware/efi"):
            tags.add("firmware:efi")
        else:
            tags.add("firmware:non-efi")

        # Check if the machine has resizeable bar enabled
        for gpu in self.gpus:
            try:
                gpu_path = gpu.pci_device.sysfs_path()
            except Exception:
                # Presumably not a PCI device; there is no BAR to inspect
                continue

            # Expose if resizable bar is available for the GPU
            if os.path.exists(gpu_path):
                if os.path.exists(f"{gpu_path}/resource0_resize"):
                    tags.add("firmware:gpu:bar0:resizeable")
                else:
                    tags.add("firmware:gpu:bar0:fixedsized")

                try:
                    bar0_mib = int(os.path.getsize(f"{gpu_path}/resource0") / 1024 / 1024)
                    tags.add(f"firmware:gpu:bar0:{bar0_mib}MiB")
                except Exception as e:
                    print(f"Can't check the size of BAR0: {e}")

        # TODO: Add DMI decoding to get the BIOS vendor, version, and release date
        # See https://wiki.osdev.org/System_Management_BIOS for more details

        # List the compatibles of blocks/devices that can be shared between
        # multiple SoCs or boards, ignoring the ones that are only found on
        # one board or SoC for which people can use the root compatible to
        # select.
        exposed_compatibles = {
            "top-level": self.root_compatibles,
            # "gpu": Compatible.from_device_glob("gpu@*"),  # Already provided by gpu_tags
            "display-subsystem": Compatible.from_device_glob("display-subsystem@*"),
            "video-codec": Compatible.from_device_glob("video-codec@*"),
        }
        for name, compatibles in exposed_compatibles.items():
            for compatible in compatibles:
                tags.add(f"dt:{name}:compatible:{str(compatible)}".lower())

        # Add extra attributes that could be beneficial as tags
        for attr in ["chassis-type"]:
            try:
                if value := readfile("/proc/device-tree/", attr):
                    # Replace white spaces by an underscore
                    value = value.replace(" ", "_")
                    tags.add(f"dt:{attr}:{value}")
            except OSError:
                # The attribute is not exposed on this machine
                pass
            except Exception:
                traceback.print_exc()

        return tags

    @property
    def gpu_tags(self) -> set[str]:
        """The union of the tags of all the detected GPUs."""
        tags = set()

        for gpu in self.gpus:
            tags = tags | gpu.tags

        return tags

    @cached_property
    def machine_tags(self) -> set[str]:
        """The union of the GPU, CPU, RAM and firmware tags (computed once)."""
        return set().union(self.gpu_tags, self.cpu_tags, self.ram_tags, self.firmware_tags)

    @property
    def default_network_interface(self) -> str | None:
        """Name of the interface holding the default route, or `None` if there is none.

        The interface is the first entry of `/proc/net/route` whose destination
        is `00000000`.
        """
        with open("/proc/net/route", "rt") as f:
            for line in f:
                if m := re.match(r'^(?P<nif>\w+)[ \t]+(?P<destination>[A-F0-9]+)', line):
                    fields = m.groupdict()
                    if fields['destination'] == '00000000':
                        return fields['nif']
        return None

    @classmethod
    def __iface_query_param(cls, iface, param) -> bytes:
        """Run the ioctl `param` on the interface `iface` and return the raw result.

        Raises `ValueError` if the ioctl fails.
        """
        # Implementation from:
        # https://code.activestate.com/recipes/439094-get-the-ip-address-associated-with-a-network-inter
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            try:
                return fcntl.ioctl(s.fileno(), param, struct.pack('256s', iface.encode('utf8')))
            except OSError:
                # Iface doesn't exist, or no IP assigned
                raise ValueError(f"The interface {iface} has no IP assigned") from None

    @property
    def default_gateway_nif_addrs(self) -> NetworkConf:
        """MAC and IPv4 addresses of the interface holding the default route.

        Raises `ValueError` if there is no default route, or if the addresses
        of the interface cannot be queried.
        """

        def get_addr_ipv4(nif):
            return socket.inet_ntop(socket.AF_INET, self.__iface_query_param(nif, 0x8915)[20:24])  # SIOCGIFADDR

        def get_mac_addr(nif):
            mac_bytes = self.__iface_query_param(nif, 0x8927)[18:24]  # SIOCGIFHWADDR
            return ":".join([f'{b:02x}' for b in mac_bytes])

        if nif := self.default_network_interface:
            ipv4 = get_addr_ipv4(nif)
            mac = get_mac_addr(nif)

            # NOTE: If IPv6 were to be needed in the future, it could be read from procfs:
            # $ cat /proc/net/if_inet6
            # 00000000000000000000000000000001 01 80 10 80       lo
            # fe80000000000000fec90893172808ea 03 40 20 80   enp4s0

            return NetworkConf(mac, ipv4)

        raise ValueError("Your computer does not seem connected to a network")

    @cached_property
    def kernel_cmdline(self) -> str:
        """The raw content of `/proc/cmdline` (read once)."""
        with open("/proc/cmdline") as f:
            return f.read()

    @classmethod
    def scsi_id(cls, device: str) -> dict[str, str]:
        """Return the `KEY=VALUE` pairs printed by udev's `scsi_id -g -x` for `device`.

        Returns an empty dictionary when the tool is missing, cannot be
        executed, or exits with an error. Lines without a `=` are ignored.
        """
        ret = {}

        try:
            output = subprocess.check_output(["/usr/lib/udev/scsi_id", "-g", "-x", device], text=True)
        except (subprocess.CalledProcessError, OSError):
            # Best effort: the tool failed, or is not installed on this machine
            return ret

        for line in output.splitlines():
            key, separator, value = line.partition("=")
            if separator:
                ret[key] = value

        return ret

    @property
    def machine_serial_number(self) -> str | None:
        """A serial number identifying the machine, or `None` if none was found.

        Sources are tried in order: the kernel command line, the device tree,
        the SCSI serial of /dev/sda, then the serial of the first MMC device.
        """
        # Use the android bootloaders' serial number as a machine ID if present
        if m := re.search(r'\bandroidboot.serialno=(\S+)\b', self.kernel_cmdline):
            return m.group(1)

        # Read the serial number from the device tree
        try:
            return readfile("/proc/device-tree/serial-number")
        except Exception:
            pass

        # Read the serial from /dev/sda
        if disk_serial := self.scsi_id("/dev/sda").get('ID_SCSI_SERIAL'):
            return disk_serial

        # Read the serial from MMC0
        try:
            return readfile("/sys/class/mmc_host/mmc0/mmc0:0001/serial")
        except Exception:
            pass

        return None

    @cached_property
    def existing_machine_ids(self) -> list[str]:
        """IDs of the machines listed by the executor (empty list on failure)."""
        try:
            r = requests.get(f"{self.executor_base_url}/api/v1/duts")
            return list(r.json().get("duts", {}).keys())
        except Exception:
            print(f"WARNING: Failed to list of existing machines in the executor:\n{traceback.format_exc()}")
            return []

    @cached_property
    def machine_id(self) -> str:
        """The identifier of this machine.

        The first candidate (MAC address, then serial number) that the
        executor already lists is used; otherwise the MAC address is used.
        """
        # Create the list of candidate machine IDs, in priority order
        candidates = [self.default_gateway_nif_addrs.mac]
        if serial_number := self.machine_serial_number:
            candidates.append(serial_number)

        # Check with the executor if any of the candidate is available
        candidate_ids = ", ".join(candidates)
        known_ids = ", ".join(self.existing_machine_ids)
        print(f"Candidate machine ids: {candidate_ids}")
        print(f"Known machine ids by MaRS DB: {known_ids}")
        for candidate in candidates:
            if candidate in self.existing_machine_ids:
                return candidate

        # Default to the highest-priority candidate
        return candidates[0]

    def send_through_local_tty_device(self, msg, tty_device=None):
        """Write `msg` to `tty_device`, defaulting to the discovered local TTY.

        Nothing is sent when no TTY device is available.
        """
        if tty_device is None:
            tty_device = self.local_tty_device

        if tty_device is not None:
            with serial.Serial(tty_device, baudrate=115200, timeout=1) as ser:
                ser.write(msg.encode())

    @classmethod
    def list_all_possible_serial_ports(cls) -> set[str]:
        """List the device nodes of every driver of type `serial` in /proc/tty/drivers."""
        # Until https://github.com/pyserial/pyserial/pull/709 lands, open code the method
        devices = set()
        with open('/proc/tty/drivers') as drivers_file:
            for driver in drivers_file:
                items = driver.split()
                # Ignore short/malformed lines rather than crashing on them
                if len(items) >= 5 and items[4] == 'serial':
                    devices.update(glob.glob(items[1] + '*'))

        return devices

    @classmethod
    def score_tty_device(cls, port_path: str) -> int:
        """Rate the TTY device at `port_path`: higher is better."""
        port_name = os.path.basename(port_path)

        # Prioritize consoles that are more likely to have early boot messages, and reduce the priority of gadget
        # consoles because of their lower reliability
        if port_name.startswith(("ttyS", "ttyMSM", "ttyAMA")):
            return 1
        elif port_name.startswith("ttyGS"):
            return -1

        return 0

    @cached_property
    def local_tty_device(self) -> str | None:
        """Path of the best local TTY device that answers SALAD pings, or `None`.

        Every candidate port is pinged from its own process, for up to 5
        seconds overall. When at least one port answered, the machine id is
        sent through the best-rated one.
        """

        def ping_serial_port(port):
            # Runs in a child process: the exit code tells the parent whether
            # the port answered (0) or not (42)
            try:
                ser = serial.Serial(port, baudrate=115200, timeout=1)
            except serial.serialutil.SerialException as e:
                print(f"Failed to open the tty {port}: {e}")
                sys.exit(42)

            with ser:
                # Make sure we start from a clean slate
                ser.reset_input_buffer()

                # Try pinging SALAD up to 3 times to work around early bytes being lost on some serial adapters
                for _ in range(3):
                    # Send a ping, and wait for the pong
                    ser.write(b"\nSALAD.ping\n")
                    line = ser.readline()
                    print(f"{port}: Received {line}")
                    if line == b"SALAD.pong\n":
                        sys.exit(0)

            sys.exit(42)

        # Get all available ports
        ports = self.list_all_possible_serial_ports()
        if len(ports) == 0:
            print("WARNING: No serial ports found!")
            return None

        # Find all the available ports
        pending_processes = {}
        for port in ports:
            p = multiprocessing.Process(target=ping_serial_port, args=(port,))
            p.start()
            pending_processes[p] = port

        # Find out which one is connected
        ports_connected = set()
        # Use a monotonic clock so that wall-clock adjustments can't affect the timeout
        start = time.monotonic()
        while len(pending_processes) > 0 and time.monotonic() - start < 5:
            time.sleep(0.01)

            # Check the state of all the pending processes
            for p in list(pending_processes.keys()):
                if p.exitcode is not None:
                    # Remove the process from the pending list
                    port = pending_processes.pop(p)
                    if p.exitcode == 0:
                        ports_connected.add(port)

        # Ensure any pending process that may have not completed within the above timeout get killed
        for p, port_name in pending_processes.items():
            print(f"WARNING: The port {port_name} did not finish in time")
            p.terminate()

        if len(ports_connected) > 0:
            # Use the best-rated TTY device. The ports are first sorted by name so that ties between equally-rated
            # ports are always resolved the same way (the sort by score is stable).
            ports_connected = sorted(sorted(ports_connected), key=self.score_tty_device, reverse=True)
            best_port = ports_connected[0]

            # Associate the tty device to the current machine in the SALAD service, then let the users know which ports
            # were connected
            connected_ports = ", ".join(ports_connected)
            msg = f"SALAD.machine_id={self.machine_id}\n"
            msg += f"Found the following local TTY devices connected to SALAD: {connected_ports}\n"
            print(msg)
            self.send_through_local_tty_device(msg, tty_device=best_port)

            return best_port
        else:
            print(f"WARNING: None of the following TTYs are connected to SALAD: {ports}!")
            return None

    def to_machine_registration_request(self, ignore_local_tty_device=False):
        """Build the dictionary of parameters describing this machine.

        The `local_tty_device` key is only included (possibly as `None`) when
        `ignore_local_tty_device` is false.
        """
        addrs = self.default_gateway_nif_addrs

        ret = {
            "base_name": self.machine_base_name,
            "tags": list(self.machine_tags),
            "mac_address": addrs.mac,
            "ip_address": addrs.ipv4,
        }

        # NOTE: Since the executor does not like to receive parameters it doesn't know about,
        # only include the 'id' parameter if absolutely necessary. Old executors will keep
        # on working as expected, and new ones will use the mac address as an id by default.
        # The only time we need to set the machine id is when it isn't the same as the mac
        # address, which will only be supported by newer executors... but only newer executors
        # can boot DUTs that are not identified by mac addresses :D
        if self.machine_id != addrs.mac:
            ret['id'] = self.machine_id

        if not ignore_local_tty_device:
            # Get the name of the local tty device (strip /dev/)
            tty_dev_name = self.local_tty_device
            if tty_dev_name is not None and tty_dev_name.startswith("/dev/"):
                tty_dev_name = tty_dev_name[5:]

            ret["local_tty_device"] = tty_dev_name

        return ret


def serial_console_works() -> bool:
    """Check that a SALAD ping written to stdout gets a pong back on stdin.

    The check runs in a separate process which is given 1 second to complete.
    """

    def check_serial_console():
        import termios

        # stdin is closed by multiprocessing, re-open it!
        sys.stdin = os.fdopen(0)

        # Remove any input we might have received thus far
        termios.tcflush(sys.stdin, termios.TCIFLUSH)

        # Send the ping
        sys.stdout.write("\nSALAD.ping\n")
        sys.stdout.flush()

        # Wait for the pong!
        is_answer_pong = re.match(r"^SALAD.pong\r?\n$", sys.stdin.readline())
        sys.exit(0 if is_answer_pong else 42)

    # Start a process that will try to print and read
    p = multiprocessing.Process(target=check_serial_console)
    p.start()
    p.join(1)

    if p.exitcode == 0:
        return True
    elif p.exitcode is None:
        # The process is still waiting for an answer: give up on it
        p.terminate()

    return False


def action_register(args):
    """Register (or update) the local machine in MaRS, then exit.

    The exit code is 0 when the final request returned 200, 1 otherwise.
    """
    info = MachineInfo(executor_base_url=f"http://{args.mars_host}")
    params = info.to_machine_registration_request(ignore_local_tty_device=args.no_tty)

    r = requests.post(f"{info.executor_base_url}/api/v1/dut/", json=params)
    if r.status_code == 400:
        print(f"WARNING: Posting the machine failed:\n{r.text}\n")

        # NOTE: Use the machine id when available, otherwise default to using the mac address as this is the only way
        # older versions of the executor could identify DUTs
        mid = params.get('id') or params.get('mac_address')

        r = requests.patch(f"{info.executor_base_url}/api/v1/dut/{mid}/", json=params)
        if r.status_code != 200:
            print(f"ERROR: Patching the machine failed:\n{r.text}\n")

    status = "complete" if r.status_code == 200 else "failed"
    print(f"MaRS: Registration {status}\n")
    info.send_through_local_tty_device(f"MaRS: Registration {status}\n")

    sys.exit(0 if r.status_code == 200 else 1)


def action_check(args):
    """Compare the local machine with its configuration in MaRS, then exit.

    The exit code is 0 when no differences were found, 1 otherwise.
    """
    info = MachineInfo(executor_base_url=f"http://{args.mars_host}")

    # Get the expected configuration
    r = requests.get(f"{info.executor_base_url}/api/v1/dut/{info.machine_id}/")
    r.raise_for_status()
    expected_conf = r.json()

    # Generate the configuration
    local_config = info.to_machine_registration_request(ignore_local_tty_device=True)

    has_differences = False
    for key, value in local_config.items():
        expected_value = expected_conf.get(key)

        # Lists are compared as sets, so the order of their elements is ignored
        if (type(expected_value) is not type(value) or
                (type(value) is list and set(expected_value) != set(value)) or
                (type(value) is not list and expected_value != value)):
            # NOTE: older versions of the executor assumed that the mac address of the DUT was always the machine id...
            # and thus did not need to have a separate 'id' field. If we are in the situation where the 'id' field is
            # missing from the expected values but the local value for this field is the mac address in the expected
            # values, then ignore the difference :)
            if (key == 'id' and 'id' not in expected_conf
                    and local_config.get('id') == expected_conf.get('mac_address')):
                continue

            has_differences = True
            print(f"Mismatch for '{key}': {value} vs the expected {expected_value}")

    # Check that the serial console is working
    if not args.no_tty:
        if serial_console_works():
            print(f"SALAD.machine_id={info.machine_id}")
        else:
            has_differences = True
            print("The configured console is not connected to SALAD")

    if has_differences:
        print("FATAL ERROR: The local machine doesn't match its expected state from MaRS")
    else:
        print("Machine registration: SUCCESS - No differences found!")

    sys.exit(0 if not has_differences else 1)


def action_cache(args):
    """Download and cache the GPU device databases."""
    cache_db()
    print("Downloaded the latest GPU device databases")


def action_setup(args):
    """Check that the machine has all the wanted tags and unbind unwanted GPUs, then exit.

    The exit code is 1 when a wanted tag is missing from the machine, or when
    a wanted tag could not be attributed to a GPU; 0 otherwise.
    """
    info = MachineInfo(executor_base_url=f"http://{args.mars_host}")

    wanted_tags = [t for t in args.tags.split(",") if t]

    if len(wanted_tags) > 0:
        if not set(wanted_tags).issubset(info.machine_tags):
            missing_tags = ",".join(set(wanted_tags) - info.machine_tags)
            print(f"ERROR: The following tags were wanted, but are missing: {missing_tags}\n")
            sys.exit(1)
        else:
            print("All the wanted tags have been found\n")
    else:
        print("Ignoring the wanted tags check: no tags requested\n")

    # Removing the GPUs that are not referenced in the list of tags
    if len(info.gpus) > 0:
        # Remove all non-gpu tags. The set is computed once, as every access to these properties re-probes the system
        non_gpu_tags = info.cpu_tags | info.ram_tags | info.firmware_tags
        wanted_tags = [t for t in wanted_tags if t not in non_gpu_tags]

        requested_tags = ", ".join(wanted_tags)
        print(f"Disabling all the GPUs that are not explicitly requested ({requested_tags}):")
        for gpu in info.gpus:
            # Remove the first instance of every tag provided by the GPU
            has_matched_a_tag = False
            for tag in gpu.tags:
                try:
                    wanted_tags.remove(tag)
                    has_matched_a_tag = True
                except ValueError:
                    # nothing to do
                    pass

            # Tell the user what is going on
            verdict = "keep" if has_matched_a_tag else "disable"
            print(f" * {gpu}: {verdict}")

            # Unbind/disable GPUs that were not asked by the user
            if not has_matched_a_tag:
                # Flush our buffers so that the child does not inherit, and later re-print, pending output
                sys.stdout.flush()
                sys.stderr.flush()

                # NOTE: Let's do the unbinding in a separate process, so that
                # if it crashes the driver it would not take down our process
                if os.fork() == 0:
                    # Child process: only perform the unbind, then exit right away so that we never fall through to
                    # the rest of the parent's logic
                    exit_code = 0
                    try:
                        gpu.unbind()
                    except Exception:
                        traceback.print_exc()
                        exit_code = 1
                    finally:
                        sys.stdout.flush()
                        sys.stderr.flush()
                        os._exit(exit_code)

        if len(wanted_tags) > 0:
            unmatched_tags = ",".join(sorted(wanted_tags))
            print(f"ERROR: The following wanted tags have not been matched by GPUs: {unmatched_tags}")
            sys.exit(1)
        else:
            print()

    sys.exit(0)


parser = argparse.ArgumentParser()
parser.add_argument('-m', '--mars_host', dest='mars_host', default="ci-gateway",
                    help='URL to the machine registration service MaRS')
parser.add_argument('--no-tty', dest="no_tty", action="store_true",
                    help="Do not discover/check the existence of a serial connection to SALAD")

subparsers = parser.add_subparsers()

register_parser = subparsers.add_parser('register',
                                        help='Register the current machine to the machine registration service MaRS')
register_parser.set_defaults(func=action_register)

check_parser = subparsers.add_parser('check',
                                     help="Check that the machine's configuration matches the one found in MaRS")
check_parser.set_defaults(func=action_check)

cache_parser = subparsers.add_parser('cache', help="Download and cache the latest gfxinfo databases")
cache_parser.set_defaults(func=action_cache)

setup_parser = subparsers.add_parser('setup',
                                     help="Ensure the machine exposes all the resources specified, and no more")
setup_parser.add_argument("-t", "--tags", default="",
                          help=("A comma-separated list of tags that the machine should match to succeed. "
                                "GPUs not referenced by these tags will be unbound, repeat tags if needed."))
setup_parser.set_defaults(func=action_setup)

# Parse the cmdline and execute the related action
args = parser.parse_args()
if func := getattr(args, "func", None):
    func(args)
else:
    parser.print_help()