checkdeps
A dependency checker in Async Python
Sometimes, when you have a project that uses many components there are no easy ways to systematically check if dependencies are installed. If you stick to a single programming language, this problem is better handled by a package manager. When you step outside the bounds of a single language however, there is no easy to use tool. Maybe you need some UNIX tools to be present along with utilities from Python and some scripts that run on top of Node.
The goal of this script is to check software dependencies for you. If you have some complicated setup which requires a combination of executables, libraries for different languages etc., this script can check if those are in order.
You specify the dependencies in a dependencies.toml
file, then this script checks them. You only need Python installed, nothing else for this script to work. You simply ship this script with your distribution.
Tutorial
Suppose your project needs a specific version of GNU Awk. According to the GNU guidelines for writing command-line applications, every program should support the --version
flag. If we run awk --version
, what do we get?
awk --version
output:
GNU Awk 5.2.1, API 3.2, PMA Avon 8-g1, (GNU MPFR 4.2.1, GNU MP 6.3.0)
Copyright (C) 1989, 1991-2022 Free Software Foundation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see http://www.gnu.org/licenses/.
That's a lot of information, but all we need is a version number. From all that output we need to extract a version number, which is best done by regex. Let's ask for an impossible version:
[awk]
require = ">=6"
get_version = "awk --version"
pattern = "GNU Awk (.*), API .*"
suggestion_text = "This should be available from your package manager."
Now run check-deps
cd example/first; ../../check-deps
output:
awk >=6 : 5.2.1 Fail
Failure
| awk: Too old.
| found version 5.2.1
The output of check-deps
, out of necessity, is the most spectacular when a problem is detected. For a second example let's try one that succeeds. We add GNU Make to our dependencies.
[awk]
require = ">=5"
get_version = "awk --version"
pattern = "GNU Awk (.*), API .*"
suggestion_text = "This should be available from your package manager."
[make]
require = ">=4"
get_version = "make --version"
pattern = "GNU Make (.*)"
suggestion_text = "This should be available from your package manager."
cd example/second; ../../check-deps
output:
make >=4 : 4.3 Ok
awk >=5 : 5.2.1 Ok
Success
Dependencies
Now for some Python packages. First we need to ensure that the correct version of Python in installed. This follows the pattern that we saw before.
[python3]
require = ">=3.12"
get_version = "python3 --version"
pattern = "Python (.*)"
suggestion_text = "This is a problem. The easiest is probably to install Anaconda from https://www.anaconda.com/."
To check the version of an installed package we may use pip
.
[numpy]
require = ">=1.0"
get_version = "pip show numpy | grep 'Version:'"
pattern = "Version: (.*)"
suggestion_text = "This is a Python package that can be installed through pip."
suggestion = "pip install numpy"
depends = "python3"
Now check-deps
knows to check for Python before checking for numpy
.
cd example/depends; ../../check-deps
output:
python3 >=3.12 : 3.12.10 Ok
numpy >=1.0 : not found
Failure
| numpy: WARNING: Package(s) not found: numpy
Once we ask for one Python package, it is not so strange to ask for more. In that case it can be advantageous to use a template.
Templates
Because we may need many Python packages, it is possible to define a template. The template defines all the fields that we would expect from a normal entry, but uses Python formating syntax to define some wildcards. These wildcards are interpolated using values given at instantiation of a template. In this case we only ask for name
, but this key is not fixed. Then the output of the template is merged with the specifics. If keys clash, the instance overrules the template's defaults.
[template.pip]
get_version = "pip show {name} | grep 'Version:'"
pattern = "Version: (.*)"
suggestion_text = "This is a Python package that can be installed through pip."
suggestion = "pip install {name}"
depends = "python3"
[python3]
require = ">=3.8"
get_version = "python3 --version"
pattern = "Python (.*)"
suggestion_text = "This is a problem. The easiest is probably to install Anaconda from https://www.anaconda.com/."
[numpy]
require = ">=1.0"
template = "pip"
cd example/template; ../../check-deps
output:
python3 >=3.8 : 3.12.10 Ok
numpy >=1.0 : not found
Failure
| numpy: WARNING: Package(s) not found: numpy
That covers all the features of this script. The rest of this README is the actual implementation of check-deps
.
Implementation
Because this script needs to work stand-alone, that means that some of the functionality here would be much easier implemented using other packages, however I'm limited to what a standard Python install has to offer.
I'll start with a bit of version logic, and then show how this script runs the checks in parallel using asyncio
.
assert sys.version_info[0] == 3, "This script only works with Python 3."
class ConfigError(Exception):
pass
T = TypeVar("T")
#!/usr/bin/env python3
from __future__ import annotations
# If you make changes to this file, please consider contributing
# to: https://github.com/jhidding/check-deps
<<imports>>
<<boilerplate>>
<<relation>>
<<version>>
<<version-constraint>>
<<parsing>>
<<running>>
Imports
We use a lot of things that should be in the standard library, chiefly typing and dataclasses. Then, some textwrap
and redirect_stdout
for fancy output.
import sys
import tomllib
from dataclasses import dataclass, field
from typing import Optional, List, Mapping, Tuple, Callable, TypeVar
from enum import Enum
import asyncio
import re
from contextlib import contextmanager, redirect_stdout
import textwrap
import io
Versions
It is common to label software versions with series of ascending numbers. A often recommended pattern is that of major, minor, patch semantic versioning, where a version contains three numbers separated by dots, sometimes post-fixed with addenda for alpha, beta or release candidates. There are notable exceptions, chief among them $\TeX$, which has version numbers converging to the digits of $\pi$.
We would like to express version constraints using comparison operators along with the murky semantics of version numbers found in the wild. So we have the following relations in an enum:
class Relation(Enum):
"""Encodes ordinal relations among versions. Currently six operators are
supported: `>=`, `<=`, `<`, `>`, `==`, `!=`."""
GE = 1
LE = 2
LT = 3
GT = 4
EQ = 5
NE = 6
def __str__(self):
return {
Relation.GE: ">=",
Relation.LE: "<=",
Relation.LT: "<",
Relation.GT: ">",
Relation.EQ: "==",
Relation.NE: "!="}[self]
I'm going out on a limb and say that versions consist of a tuple
of int
, optionally with a suffix that can be stored in a str
. We need to define an ordering on this system:
@dataclass
class Version:
"""Stores a version in the form of a tuple of ints and an optional string extension.
This class supports (in)equality operators listed in `Relation`."""
number: tuple[int, ...]
extra: Optional[str]
<<version-methods>>
Implementation of
Version
operators
def __lt__(self, other):
for n, m in zip(self.number, other.number):
if n < m:
return True
elif n > m:
return False
return False
def __gt__(self, other):
return other < self
def __le__(self, other):
for n, m in zip(self.number, other.number):
if n < m:
return True
elif n > m:
return False
return True
def __ge__(self, other):
return other <= self
def __eq__(self, other):
for n, m in zip(self.number, other.number):
if n != m:
return False
return True
def __ne__(self, other):
return not self == other
def __str__(self):
return ".".join(map(str, self.number)) + (self.extra or "")
A combination of a Version
with a Relation
form a VersionConstraint
. Such a constraint can be called with another Version
which should give a bool
.
@dataclass
class VersionConstraint:
"""A VersionConstraint is a product of a `Version` and a `Relation`."""
version: Version
relation: Relation
def __call__(self, other: Version) -> bool:
method = f"__{self.relation.name}__".lower()
return getattr(other, method)(self.version)
def __str__(self):
return f"{self.relation}{self.version}"
Now, we also need to be able to read a version constraint from input.
Each parser takes a str
and returns a tuple of (value, str)
, where the second part of the tuple is the text that is not yet parsed.
Parsing version constraints
def split_at(split_chars: str, x: str) -> Tuple[str, str]:
"""Tries to split at character `x`. Returns a 2-tuple of the string
before and after the given separator."""
a = x.split(split_chars, maxsplit=1)
if len(a) == 2:
return a[0], a[1]
else:
return a[0], ""
def parse_split_f(split_chars: str, f: Callable[[str], T], x: str) \
-> Tuple[T, str]:
"""Given a string, splits at given character `x` and passes the left value
through a function (probably a parser). The second half of the return tuple is the
remainder of the string."""
item, x = split_at(split_chars, x)
val = f(item)
return val, x
def parse_version(x: str) -> Tuple[Version, str]:
"""Parse a given string to a `Version`. A sequence of dot `.` separated integers
is put into the numerical version component, while the remaining text ends up in
the `extra` component."""
_x = x
number = []
extra = None
while True:
try:
n, _x = parse_split_f(".", int, _x)
number.append(n)
except ValueError:
if len(x) > 0:
m = re.match("([0-9]*)(.*)", _x)
if lastn := m and m.group(1):
number.append(int(lastn))
if suff := m and m.group(2):
extra = suff or None
else:
extra = _x
break
if not number:
raise ConfigError(f"A version needs a numeric component, got: {x}")
return Version(tuple(number), extra), _x
def parse_relation(x: str) -> Tuple[Relation, str]:
"""Parses the operator of the version constraint."""
op_map = {
"<=": Relation.LE,
">=": Relation.GE,
"<": Relation.LT,
">": Relation.GT,
"==": Relation.EQ,
"!=": Relation.NE}
for sym, op in op_map.items():
if x.startswith(sym):
return (op, x[len(sym):])
raise ConfigError(f"Not a comparison operator: {x}")
def parse_version_constraint(x: str) -> Tuple[VersionConstraint, str]:
relation, x = parse_relation(x)
version, x = parse_version(x)
return VersionConstraint(version, relation), x
Running
Some check may need to be preceded by another check. Say if we want to see if we have some Python module installed, first we need to see if the correct Python version is here, then if pip
is actually installed, then if we can see the module. If we have many such modules, how do we make sure that we check for Python and pip
only once? One way is to plan everything in advance, then run the workflow. That's nice, but adds a lot of complication on top of what we can get out of the box with asyncio
. Another way is to cache results, and then when we need the result a second time, we used the cached value.
def async_cache(f):
"""Caches results from the `async` function `f`. This assumes `f` is a
member of a class, where we have `_lock`, `_result` and `_done` members
available."""
async def g(self, *args, **kwargs):
async with self._lock:
if self._done:
return self._result
self._result = await f(self, *args, **kwargs)
self._done = True
return self._result
return g
Result
The result of a version check is stored in Result
.
@dataclass
class Result:
test: VersionTest
success: bool
failure_text: Optional[str] = None
found_version: Optional[Version] = None
def __bool__(self):
return self.success
Job
The logistics for each job checking a version are stored in VersionTest
. This is basically a giant closure wrapped in async_cache
.
The run
method takes an argument recurse
. This is used to call dependencies of the current version test.
@dataclass
class VersionTest:
name: str
require: VersionConstraint
get_version: str
platform: Optional[str] = None
pattern: Optional[str] = None
suggestion_text: Optional[str] = None
suggestion: Optional[str] = None
depends: List[str] = field(default_factory=list)
template: Optional[str] = None
_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
_done: bool = False
def print_formatted(self, msg):
prefix = f"{self.name} {self.require}"
print(f"{prefix:25}: {msg}")
def print_not_found(self):
self.print_formatted("not found")
@async_cache
async def run(self, recurse):
for dep in self.depends:
if not await recurse(dep):
return Result(self, False,
failure_text=f"Failed dependency: {dep}")
proc = await asyncio.create_subprocess_shell(
self.get_version,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
(stdout, stderr) = await proc.communicate()
if proc.returncode != 0:
self.print_not_found()
return Result(
self,
success=False,
failure_text=f"{stderr.decode().strip()}")
try:
if self.pattern is not None:
m = re.match(self.pattern, stdout.decode())
if m is not None:
out, _ = parse_version(m.group(1).strip())
else:
self.print_not_found()
msg = f"No regex match on pattern '{self.pattern}'"
return Result(self, False, failure_text=msg)
else:
out, _ = parse_version(stdout.decode().strip())
except ConfigError as e:
return Result(self, False, failure_text=str(e))
if self.require(out):
self.print_formatted(f"{str(out):10} Ok")
return Result(self, True)
else:
self.print_formatted(f"{str(out):10} Fail")
return Result(self, False, failure_text="Too old.",
found_version=out)
Parsing input
def parse_config(name: str, config: Mapping[str, str], templates):
if "template" in config.keys():
_config = {}
if config["template"] not in templates.keys():
raise ConfigError(f"Template {config['template']} not found. Templates: {list(templates.keys())}")
for k, v in templates[config["template"]].items():
if isinstance(v, str):
_config[k] = v.format(name=name)
else:
_config[k] = v
_config.update(config)
else:
_config = dict(config)
_deps = map(str.strip, _config.get("depends", "").split(","))
deps = list(filter(lambda x: x != "", _deps))
assert "require" in _config, "Every item needs a `require` field"
assert "get_version" in _config, "Every item needs a `get_version` field"
require, _ = parse_version_constraint(_config["require"])
return VersionTest(
name=name,
require=require,
get_version=_config["get_version"],
platform=_config.get("platform", None),
pattern=_config.get("pattern", None),
suggestion_text=_config.get("suggestion_text", None),
suggestion=_config.get("suggestion", None),
depends=deps,
template=_config.get("template", None))
Indentation
It looks nice to indent some output. This captures stdout
and forwards it by printing each line with a given prefix.
@contextmanager
def indent(prefix: str):
f = io.StringIO()
with redirect_stdout(f):
yield
output = f.getvalue()
print(textwrap.indent(output, prefix), end="")
Main
async def main():
config = tomllib.load(open("dependencies.toml", "rb"))
if "template" in config.keys():
templates = config["template"]
del config["template"]
else:
templates = dict()
try:
tests = {
name: parse_config(name, config[name], templates)
for name in config if ":" not in name and name != "DEFAULT"
}
except (AssertionError, ConfigError) as e:
print("Configuration error:", e)
sys.exit(1)
async def test_version(name: str):
assert name in tests, f"unknown dependency {name}"
x = await tests[name].run(test_version)
return x
result = await asyncio.gather(*(test_version(k) for k in tests))
if all(r.success for r in result):
print("Success")
sys.exit(0)
else:
print("Failure")
with indent(" | "):
for r in (r for r in result if not r.success):
if r.failure_text:
print(f"{r.test.name}: {r.failure_text}")
if r.found_version:
print(f" found version {r.found_version}")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())
Literate Programming
This script is composed from the code blocks in this README using Entangled. To generate the HTML renedered documentation, I used Pdoc in conjunction with some Awk scripts.
Note, all output from the shell scripts in the tutorial are expanded by Awk in CI. That means that the tutorial doubles up for integration test as well.
from __future__ import annotations
import subprocess
proc_eval = subprocess.run(
["awk", "-f", "eval_shell_pass.awk"],
input=open("README.md", "rb").read(), capture_output=True)
proc_label = subprocess.run(
["awk", "-f", "noweb_label_pass.awk"],
input=proc_eval.stdout, capture_output=True)
__doc__ = proc_label.stdout.decode()
<<imports>>
<<boilerplate>>
<<relation>>
<<version>>
<<version-constraint>>
<<parsing>>
<<running>>
API Documentation
While this script strictly speaking is in no need for API docs, here they are anyway.
1# ~/~ begin <<README.md#checkdeps/__init__.py>>[init] 2from __future__ import annotations 3 4import subprocess 5proc_eval = subprocess.run( 6 ["awk", "-f", "eval_shell_pass.awk"], 7 input=open("README.md", "rb").read(), capture_output=True) 8proc_label = subprocess.run( 9 ["awk", "-f", "noweb_label_pass.awk"], 10 input=proc_eval.stdout, capture_output=True) 11__doc__ = proc_label.stdout.decode() 12 13# ~/~ begin <<README.md#imports>>[init] 14import sys 15import tomllib 16from dataclasses import dataclass, field 17from typing import Optional, List, Mapping, Tuple, Callable, TypeVar 18from enum import Enum 19import asyncio 20import re 21from contextlib import contextmanager, redirect_stdout 22import textwrap 23import io 24# ~/~ end 25# ~/~ begin <<README.md#boilerplate>>[init] 26assert sys.version_info[0] == 3, "This script only works with Python 3." 27 28class ConfigError(Exception): 29 pass 30 31T = TypeVar("T") 32# ~/~ end 33 34# ~/~ begin <<README.md#relation>>[init] 35class Relation(Enum): 36 """Encodes ordinal relations among versions. Currently six operators are 37 supported: `>=`, `<=`, `<`, `>`, `==`, `!=`.""" 38 GE = 1 39 LE = 2 40 LT = 3 41 GT = 4 42 EQ = 5 43 NE = 6 44 45 def __str__(self): 46 return { 47 Relation.GE: ">=", 48 Relation.LE: "<=", 49 Relation.LT: "<", 50 Relation.GT: ">", 51 Relation.EQ: "==", 52 Relation.NE: "!="}[self] 53# ~/~ end 54# ~/~ begin <<README.md#version>>[init] 55@dataclass 56class Version: 57 """Stores a version in the form of a tuple of ints and an optional string extension. 58 This class supports (in)equality operators listed in `Relation`.""" 59 number: tuple[int, ...] 60 extra: Optional[str] 61 62 # ~/~ begin <<README.md#version-methods>>[init] 63 def __lt__(self, other): 64 for n, m in zip(self.number, other.number): 65 if n < m: 66 return True 67 elif n > m: 68 return False 69 return False 70 71 def __gt__(self, other): 72 return other < self 73 74 def __le__(self, other): 75 for n, m in zip(self.number, other.number): 76 if n < m: 77 return True 78 elif n > m: 79 return False 80 return True 81 82 def __ge__(self, other): 83 return other <= self 84 85 def __eq__(self, other): 86 for n, m in zip(self.number, other.number): 87 if n != m: 88 return False 89 return True 90 91 def __ne__(self, other): 92 return not self == other 93 94 def __str__(self): 95 return ".".join(map(str, self.number)) + (self.extra or "") 96 # ~/~ end 97# ~/~ end 98# ~/~ begin <<README.md#version-constraint>>[init] 99@dataclass 100class VersionConstraint: 101 """A VersionConstraint is a product of a `Version` and a `Relation`.""" 102 version: Version 103 relation: Relation 104 105 def __call__(self, other: Version) -> bool: 106 method = f"__{self.relation.name}__".lower() 107 return getattr(other, method)(self.version) 108 109 def __str__(self): 110 return f"{self.relation}{self.version}" 111# ~/~ end 112# ~/~ begin <<README.md#parsing>>[init] 113def split_at(split_chars: str, x: str) -> Tuple[str, str]: 114 """Tries to split at character `x`. Returns a 2-tuple of the string 115 before and after the given separator.""" 116 a = x.split(split_chars, maxsplit=1) 117 if len(a) == 2: 118 return a[0], a[1] 119 else: 120 return a[0], "" 121 122 123def parse_split_f(split_chars: str, f: Callable[[str], T], x: str) \ 124 -> Tuple[T, str]: 125 """Given a string, splits at given character `x` and passes the left value 126 through a function (probably a parser). The second half of the return tuple is the 127 remainder of the string.""" 128 item, x = split_at(split_chars, x) 129 val = f(item) 130 return val, x 131 132 133def parse_version(x: str) -> Tuple[Version, str]: 134 """Parse a given string to a `Version`. A sequence of dot `.` separated integers 135 is put into the numerical version component, while the remaining text ends up in 136 the `extra` component.""" 137 _x = x 138 number = [] 139 extra = None 140 141 while True: 142 try: 143 n, _x = parse_split_f(".", int, _x) 144 number.append(n) 145 except ValueError: 146 if len(x) > 0: 147 m = re.match("([0-9]*)(.*)", _x) 148 if lastn := m and m.group(1): 149 number.append(int(lastn)) 150 if suff := m and m.group(2): 151 extra = suff or None 152 else: 153 extra = _x 154 break 155 156 if not number: 157 raise ConfigError(f"A version needs a numeric component, got: {x}") 158 159 return Version(tuple(number), extra), _x 160 161 162def parse_relation(x: str) -> Tuple[Relation, str]: 163 """Parses the operator of the version constraint.""" 164 op_map = { 165 "<=": Relation.LE, 166 ">=": Relation.GE, 167 "<": Relation.LT, 168 ">": Relation.GT, 169 "==": Relation.EQ, 170 "!=": Relation.NE} 171 for sym, op in op_map.items(): 172 if x.startswith(sym): 173 return (op, x[len(sym):]) 174 raise ConfigError(f"Not a comparison operator: {x}") 175 176 177def parse_version_constraint(x: str) -> Tuple[VersionConstraint, str]: 178 relation, x = parse_relation(x) 179 version, x = parse_version(x) 180 return VersionConstraint(version, relation), x 181# ~/~ end 182# ~/~ begin <<README.md#running>>[init] 183def async_cache(f): 184 """Caches results from the `async` function `f`. This assumes `f` is a 185 member of a class, where we have `_lock`, `_result` and `_done` members 186 available.""" 187 async def g(self, *args, **kwargs): 188 async with self._lock: 189 if self._done: 190 return self._result 191 self._result = await f(self, *args, **kwargs) 192 self._done = True 193 return self._result 194 return g 195# ~/~ end 196# ~/~ begin <<README.md#running>>[1] 197@dataclass 198class Result: 199 test: VersionTest 200 success: bool 201 failure_text: Optional[str] = None 202 found_version: Optional[Version] = None 203 204 def __bool__(self): 205 return self.success 206# ~/~ end 207# ~/~ begin <<README.md#running>>[2] 208@dataclass 209class VersionTest: 210 name: str 211 require: VersionConstraint 212 get_version: str 213 platform: Optional[str] = None 214 pattern: Optional[str] = None 215 suggestion_text: Optional[str] = None 216 suggestion: Optional[str] = None 217 depends: List[str] = field(default_factory=list) 218 template: Optional[str] = None 219 220 _lock: asyncio.Lock = field(default_factory=asyncio.Lock) 221 _done: bool = False 222 223 def print_formatted(self, msg): 224 prefix = f"{self.name} {self.require}" 225 print(f"{prefix:25}: {msg}") 226 227 def print_not_found(self): 228 self.print_formatted("not found") 229 230 @async_cache 231 async def run(self, recurse): 232 for dep in self.depends: 233 if not await recurse(dep): 234 return Result(self, False, 235 failure_text=f"Failed dependency: {dep}") 236 237 proc = await asyncio.create_subprocess_shell( 238 self.get_version, 239 stdout=asyncio.subprocess.PIPE, 240 stderr=asyncio.subprocess.PIPE) 241 (stdout, stderr) = await proc.communicate() 242 if proc.returncode != 0: 243 self.print_not_found() 244 return Result( 245 self, 246 success=False, 247 failure_text=f"{stderr.decode().strip()}") 248 try: 249 if self.pattern is not None: 250 m = re.match(self.pattern, stdout.decode()) 251 if m is not None: 252 out, _ = parse_version(m.group(1).strip()) 253 else: 254 self.print_not_found() 255 msg = f"No regex match on pattern '{self.pattern}'" 256 return Result(self, False, failure_text=msg) 257 else: 258 out, _ = parse_version(stdout.decode().strip()) 259 except ConfigError as e: 260 return Result(self, False, failure_text=str(e)) 261 262 if self.require(out): 263 self.print_formatted(f"{str(out):10} Ok") 264 return Result(self, True) 265 else: 266 self.print_formatted(f"{str(out):10} Fail") 267 return Result(self, False, failure_text="Too old.", 268 found_version=out) 269# ~/~ end 270# ~/~ begin <<README.md#running>>[3] 271def parse_config(name: str, config: Mapping[str, str], templates): 272 if "template" in config.keys(): 273 _config = {} 274 if config["template"] not in templates.keys(): 275 raise ConfigError(f"Template {config['template']} not found. Templates: {list(templates.keys())}") 276 277 for k, v in templates[config["template"]].items(): 278 if isinstance(v, str): 279 _config[k] = v.format(name=name) 280 else: 281 _config[k] = v 282 _config.update(config) 283 else: 284 _config = dict(config) 285 286 _deps = map(str.strip, _config.get("depends", "").split(",")) 287 deps = list(filter(lambda x: x != "", _deps)) 288 289 assert "require" in _config, "Every item needs a `require` field" 290 assert "get_version" in _config, "Every item needs a `get_version` field" 291 292 require, _ = parse_version_constraint(_config["require"]) 293 294 return VersionTest( 295 name=name, 296 require=require, 297 get_version=_config["get_version"], 298 platform=_config.get("platform", None), 299 pattern=_config.get("pattern", None), 300 suggestion_text=_config.get("suggestion_text", None), 301 suggestion=_config.get("suggestion", None), 302 depends=deps, 303 template=_config.get("template", None)) 304# ~/~ end 305# ~/~ begin <<README.md#running>>[4] 306@contextmanager 307def indent(prefix: str): 308 f = io.StringIO() 309 with redirect_stdout(f): 310 yield 311 output = f.getvalue() 312 print(textwrap.indent(output, prefix), end="") 313# ~/~ end 314# ~/~ begin <<README.md#running>>[5] 315async def main(): 316 config = tomllib.load(open("dependencies.toml", "rb")) 317 318 if "template" in config.keys(): 319 templates = config["template"] 320 del config["template"] 321 else: 322 templates = dict() 323 324 try: 325 tests = { 326 name: parse_config(name, config[name], templates) 327 for name in config if ":" not in name and name != "DEFAULT" 328 } 329 except (AssertionError, ConfigError) as e: 330 print("Configuration error:", e) 331 sys.exit(1) 332 333 async def test_version(name: str): 334 assert name in tests, f"unknown dependency {name}" 335 x = await tests[name].run(test_version) 336 return x 337 338 result = await asyncio.gather(*(test_version(k) for k in tests)) 339 if all(r.success for r in result): 340 print("Success") 341 sys.exit(0) 342 else: 343 print("Failure") 344 with indent(" | "): 345 for r in (r for r in result if not r.success): 346 if r.failure_text: 347 print(f"{r.test.name}: {r.failure_text}") 348 if r.found_version: 349 print(f" found version {r.found_version}") 350 sys.exit(1) 351 352 353if __name__ == "__main__": 354 asyncio.run(main()) 355# ~/~ end 356# ~/~ end
Common base class for all non-exit exceptions.
36class Relation(Enum): 37 """Encodes ordinal relations among versions. Currently six operators are 38 supported: `>=`, `<=`, `<`, `>`, `==`, `!=`.""" 39 GE = 1 40 LE = 2 41 LT = 3 42 GT = 4 43 EQ = 5 44 NE = 6 45 46 def __str__(self): 47 return { 48 Relation.GE: ">=", 49 Relation.LE: "<=", 50 Relation.LT: "<", 51 Relation.GT: ">", 52 Relation.EQ: "==", 53 Relation.NE: "!="}[self]
Encodes ordinal relations among versions. Currently six operators are
supported: >=
, <=
, <
, >
, ==
, !=
.
56@dataclass 57class Version: 58 """Stores a version in the form of a tuple of ints and an optional string extension. 59 This class supports (in)equality operators listed in `Relation`.""" 60 number: tuple[int, ...] 61 extra: Optional[str] 62 63 # ~/~ begin <<README.md#version-methods>>[init] 64 def __lt__(self, other): 65 for n, m in zip(self.number, other.number): 66 if n < m: 67 return True 68 elif n > m: 69 return False 70 return False 71 72 def __gt__(self, other): 73 return other < self 74 75 def __le__(self, other): 76 for n, m in zip(self.number, other.number): 77 if n < m: 78 return True 79 elif n > m: 80 return False 81 return True 82 83 def __ge__(self, other): 84 return other <= self 85 86 def __eq__(self, other): 87 for n, m in zip(self.number, other.number): 88 if n != m: 89 return False 90 return True 91 92 def __ne__(self, other): 93 return not self == other 94 95 def __str__(self): 96 return ".".join(map(str, self.number)) + (self.extra or "") 97 # ~/~ end
Stores a version in the form of a tuple of ints and an optional string extension.
This class supports (in)equality operators listed in Relation
.
100@dataclass 101class VersionConstraint: 102 """A VersionConstraint is a product of a `Version` and a `Relation`.""" 103 version: Version 104 relation: Relation 105 106 def __call__(self, other: Version) -> bool: 107 method = f"__{self.relation.name}__".lower() 108 return getattr(other, method)(self.version) 109 110 def __str__(self): 111 return f"{self.relation}{self.version}"
114def split_at(split_chars: str, x: str) -> Tuple[str, str]: 115 """Tries to split at character `x`. Returns a 2-tuple of the string 116 before and after the given separator.""" 117 a = x.split(split_chars, maxsplit=1) 118 if len(a) == 2: 119 return a[0], a[1] 120 else: 121 return a[0], ""
Tries to split at character x
. Returns a 2-tuple of the string
before and after the given separator.
124def parse_split_f(split_chars: str, f: Callable[[str], T], x: str) \ 125 -> Tuple[T, str]: 126 """Given a string, splits at given character `x` and passes the left value 127 through a function (probably a parser). The second half of the return tuple is the 128 remainder of the string.""" 129 item, x = split_at(split_chars, x) 130 val = f(item) 131 return val, x
Given a string, splits at given character x
and passes the left value
through a function (probably a parser). The second half of the return tuple is the
remainder of the string.
134def parse_version(x: str) -> Tuple[Version, str]: 135 """Parse a given string to a `Version`. A sequence of dot `.` separated integers 136 is put into the numerical version component, while the remaining text ends up in 137 the `extra` component.""" 138 _x = x 139 number = [] 140 extra = None 141 142 while True: 143 try: 144 n, _x = parse_split_f(".", int, _x) 145 number.append(n) 146 except ValueError: 147 if len(x) > 0: 148 m = re.match("([0-9]*)(.*)", _x) 149 if lastn := m and m.group(1): 150 number.append(int(lastn)) 151 if suff := m and m.group(2): 152 extra = suff or None 153 else: 154 extra = _x 155 break 156 157 if not number: 158 raise ConfigError(f"A version needs a numeric component, got: {x}") 159 160 return Version(tuple(number), extra), _x
Parse a given string to a Version
. A sequence of dot .
separated integers
is put into the numerical version component, while the remaining text ends up in
the extra
component.
163def parse_relation(x: str) -> Tuple[Relation, str]: 164 """Parses the operator of the version constraint.""" 165 op_map = { 166 "<=": Relation.LE, 167 ">=": Relation.GE, 168 "<": Relation.LT, 169 ">": Relation.GT, 170 "==": Relation.EQ, 171 "!=": Relation.NE} 172 for sym, op in op_map.items(): 173 if x.startswith(sym): 174 return (op, x[len(sym):]) 175 raise ConfigError(f"Not a comparison operator: {x}")
Parses the operator of the version constraint.
184def async_cache(f): 185 """Caches results from the `async` function `f`. This assumes `f` is a 186 member of a class, where we have `_lock`, `_result` and `_done` members 187 available.""" 188 async def g(self, *args, **kwargs): 189 async with self._lock: 190 if self._done: 191 return self._result 192 self._result = await f(self, *args, **kwargs) 193 self._done = True 194 return self._result 195 return g
Caches results from the async
function f
. This assumes f
is a
member of a class, where we have _lock
, _result
and _done
members
available.
198@dataclass 199class Result: 200 test: VersionTest 201 success: bool 202 failure_text: Optional[str] = None 203 found_version: Optional[Version] = None 204 205 def __bool__(self): 206 return self.success
209@dataclass 210class VersionTest: 211 name: str 212 require: VersionConstraint 213 get_version: str 214 platform: Optional[str] = None 215 pattern: Optional[str] = None 216 suggestion_text: Optional[str] = None 217 suggestion: Optional[str] = None 218 depends: List[str] = field(default_factory=list) 219 template: Optional[str] = None 220 221 _lock: asyncio.Lock = field(default_factory=asyncio.Lock) 222 _done: bool = False 223 224 def print_formatted(self, msg): 225 prefix = f"{self.name} {self.require}" 226 print(f"{prefix:25}: {msg}") 227 228 def print_not_found(self): 229 self.print_formatted("not found") 230 231 @async_cache 232 async def run(self, recurse): 233 for dep in self.depends: 234 if not await recurse(dep): 235 return Result(self, False, 236 failure_text=f"Failed dependency: {dep}") 237 238 proc = await asyncio.create_subprocess_shell( 239 self.get_version, 240 stdout=asyncio.subprocess.PIPE, 241 stderr=asyncio.subprocess.PIPE) 242 (stdout, stderr) = await proc.communicate() 243 if proc.returncode != 0: 244 self.print_not_found() 245 return Result( 246 self, 247 success=False, 248 failure_text=f"{stderr.decode().strip()}") 249 try: 250 if self.pattern is not None: 251 m = re.match(self.pattern, stdout.decode()) 252 if m is not None: 253 out, _ = parse_version(m.group(1).strip()) 254 else: 255 self.print_not_found() 256 msg = f"No regex match on pattern '{self.pattern}'" 257 return Result(self, False, failure_text=msg) 258 else: 259 out, _ = parse_version(stdout.decode().strip()) 260 except ConfigError as e: 261 return Result(self, False, failure_text=str(e)) 262 263 if self.require(out): 264 self.print_formatted(f"{str(out):10} Ok") 265 return Result(self, True) 266 else: 267 self.print_formatted(f"{str(out):10} Fail") 268 return Result(self, False, failure_text="Too old.", 269 found_version=out)
272def parse_config(name: str, config: Mapping[str, str], templates): 273 if "template" in config.keys(): 274 _config = {} 275 if config["template"] not in templates.keys(): 276 raise ConfigError(f"Template {config['template']} not found. Templates: {list(templates.keys())}") 277 278 for k, v in templates[config["template"]].items(): 279 if isinstance(v, str): 280 _config[k] = v.format(name=name) 281 else: 282 _config[k] = v 283 _config.update(config) 284 else: 285 _config = dict(config) 286 287 _deps = map(str.strip, _config.get("depends", "").split(",")) 288 deps = list(filter(lambda x: x != "", _deps)) 289 290 assert "require" in _config, "Every item needs a `require` field" 291 assert "get_version" in _config, "Every item needs a `get_version` field" 292 293 require, _ = parse_version_constraint(_config["require"]) 294 295 return VersionTest( 296 name=name, 297 require=require, 298 get_version=_config["get_version"], 299 platform=_config.get("platform", None), 300 pattern=_config.get("pattern", None), 301 suggestion_text=_config.get("suggestion_text", None), 302 suggestion=_config.get("suggestion", None), 303 depends=deps, 304 template=_config.get("template", None))
316async def main(): 317 config = tomllib.load(open("dependencies.toml", "rb")) 318 319 if "template" in config.keys(): 320 templates = config["template"] 321 del config["template"] 322 else: 323 templates = dict() 324 325 try: 326 tests = { 327 name: parse_config(name, config[name], templates) 328 for name in config if ":" not in name and name != "DEFAULT" 329 } 330 except (AssertionError, ConfigError) as e: 331 print("Configuration error:", e) 332 sys.exit(1) 333 334 async def test_version(name: str): 335 assert name in tests, f"unknown dependency {name}" 336 x = await tests[name].run(test_version) 337 return x 338 339 result = await asyncio.gather(*(test_version(k) for k in tests)) 340 if all(r.success for r in result): 341 print("Success") 342 sys.exit(0) 343 else: 344 print("Failure") 345 with indent(" | "): 346 for r in (r for r in result if not r.success): 347 if r.failure_text: 348 print(f"{r.test.name}: {r.failure_text}") 349 if r.found_version: 350 print(f" found version {r.found_version}") 351 sys.exit(1)