checkdeps

A dependency checker in Async Python


When a project uses many components, there is often no easy way to systematically check whether its dependencies are installed. If you stick to a single programming language, a package manager handles this problem well. Once you step outside the bounds of a single language, however, there is no easy-to-use tool. Maybe you need some UNIX tools to be present, along with utilities from Python and some scripts that run on top of Node.

The goal of this script is to check software dependencies for you. If you have some complicated setup which requires a combination of executables, libraries for different languages etc., this script can check if those are in order.

You specify the dependencies in a dependencies.ini file, then this script checks them. The script needs only a Python installation to work, so you can simply ship it with your distribution.

Tutorial

Suppose your project needs a specific version of GNU Awk. According to the GNU guidelines for writing command-line applications, every program should support the --version flag. If we run awk --version, what do we get?

awk --version

output:

GNU Awk 5.1.0, API: 3.0 (GNU MPFR 4.1.0, GNU MP 6.2.1)
Copyright (C) 1989, 1991-2020 Free Software Foundation.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see http://www.gnu.org/licenses/.

That's a lot of information, but all we need is the version number, which is best extracted with a regular expression. Let's ask for an impossible version:

file:example/first/dependencies.ini
[awk]
require = >=6
get_version = awk --version
pattern = GNU Awk (.*), API: .*
suggestion_text = This should be available from your package manager.
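
To see what the pattern does in isolation, here is a minimal sketch using Python's `re.match`. The sample text is the first line of the `awk --version` output shown above; the variable names are only for illustration.

```python
import re

# First line of the `awk --version` output shown above.
first_line = "GNU Awk 5.1.0, API: 3.0 (GNU MPFR 4.1.0, GNU MP 6.2.1)"

# Same pattern as in dependencies.ini; group 1 captures the version text.
pattern = r"GNU Awk (.*), API: .*"

match = re.match(pattern, first_line)
version_text = match.group(1) if match else None
print(version_text)
```

Only the first capture group is used, so the pattern should be written such that group 1 contains nothing but the version.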

Now run check-deps:

cd example/first; ../../check-deps

output:

awk >=6                  : 5.1.0      Fail
Failure
  |  awk: Too old.
  |      found version 5.1.0

The output of check-deps is, out of necessity, most spectacular when a problem is detected. For a second example, let's try one that succeeds. We add GNU Make to our dependencies.

file:example/second/dependencies.ini
[awk]
require = >=5
get_version = awk --version
pattern = GNU Awk (.*), API: .*
suggestion_text = This should be available from your package manager.

[make]
require = >=4
get_version = make --version
pattern = GNU Make (.*)
suggestion_text = This should be available from your package manager.

cd example/second; ../../check-deps

output:

make >=4                 : 4.3        Ok
awk >=5                  : 5.1.0      Ok
Success

Dependencies

Now for some Python packages. First we need to ensure that the correct version of Python is installed. This follows the pattern that we saw before.

⪡example-depends⪢≣
file:example/depends/dependencies.ini
[python3]
require = >=3.12
get_version = python3 --version
pattern = Python (.*)
suggestion_text = This is a problem. The easiest is probably to install Anaconda from https://www.anaconda.com/.

To check the version of an installed package we may use pip.

⪡example-depends⪢⊞
[numpy]
require = >=1.0
get_version = pip show numpy | grep "Version:"
pattern = Version: (.*)
suggestion_text = This is a Python package that can be installed through pip.
suggestion = pip install numpy
depends = python3

Now check-deps knows to check for Python before checking for numpy.

cd example/depends; ../../check-deps

output:

python3 >=3.12           : 3.10.9     Fail
Failure
  |  python3: Too old.
  |      found version 3.10.9
  |  numpy: Failed dependency: python3

Once we ask for one Python package, it is not so strange to ask for more. In that case it can be advantageous to use a template.

Templates

Because we may need many Python packages, it is possible to define a template. The template defines all the fields that we would expect from a normal entry, but uses Python formatting syntax to define some wildcards. These wildcards are interpolated using values given at instantiation of a template. In this case we only ask for name, but this key is not fixed. The output of the template is then merged with the specifics. If keys clash, the instance overrides the template's defaults.

file:example/template/dependencies.ini
[template:pip]
get_version = pip show {name} | grep "Version:"
pattern = Version: (.*)
suggestion_text = This is a Python package that can be installed through pip.
suggestion = pip install {name}
depends = python3

[python3]
require = >=3.8
get_version = python3 --version
pattern = Python (.*)
suggestion_text = This is a problem. The easiest is probably to install Anaconda from https://www.anaconda.com/.

[numpy]
require = >=1.0
template = pip

cd example/template; ../../check-deps

output:

python3 >=3.8            : 3.10.9     Ok
numpy >=1.0              : 1.23.5     Ok
Success
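
The template mechanism can be sketched with plain dictionaries. This is a minimal illustration, assuming the two dictionaries below stand in for the parsed `[template:pip]` and `[numpy]` sections; the helper name `instantiate` is hypothetical, and only the `name` wildcard is filled in here.

```python
# Hypothetical stand-ins for the parsed [template:pip] and [numpy] sections.
template = {
    "get_version": 'pip show {name} | grep "Version:"',
    "pattern": "Version: (.*)",
    "suggestion": "pip install {name}",
    "depends": "python3",
}
instance = {"require": ">=1.0", "template": "pip"}


def instantiate(name, instance, template):
    """Fill in the {name} wildcard, then let the instance keys override."""
    merged = {key: value.format(name=name) for key, value in template.items()}
    merged.update(instance)
    return merged


numpy_entry = instantiate("numpy", instance, template)
print(numpy_entry["get_version"])
print(numpy_entry["suggestion"])
```

Because the instance is merged last, a section can still set its own `pattern` or `suggestion_text` when the template default does not fit.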

That covers all the features of this script. The rest of this README is the actual implementation of check-deps.

Implementation

Because this script needs to work stand-alone, I'm limited to what a standard Python install has to offer, even though some of the functionality here would be much easier to implement using other packages.

I'll start with a bit of version logic, and then show how this script runs the checks in parallel using asyncio.

⪡boilerplate⪢≣
assert sys.version_info[0] == 3, "This script only works with Python 3."

class ConfigError(Exception):
    pass

T = TypeVar("T")

file:check-deps
#!/usr/bin/env python3
from __future__ import annotations

<<imports>>
<<boilerplate>>

<<relation>>
<<version>>
<<version-constraint>>
<<parsing>>
<<running>>

Imports

We use several modules from the standard library, chiefly typing and dataclasses, plus textwrap and redirect_stdout for fancy output.

⪡imports⪢≣
import sys
import configparser
from dataclasses import dataclass, field
from typing import Optional, List, Mapping, Tuple, Callable, TypeVar
from enum import Enum
import asyncio
import re
from contextlib import contextmanager, redirect_stdout
import textwrap
import io

Versions

It is common to label software versions with a series of ascending numbers. An often recommended pattern is major, minor, patch semantic versioning, where a version contains three numbers separated by dots, sometimes suffixed with addenda for alpha, beta or release candidates. There are notable exceptions, chief among them $\TeX$, which has version numbers converging to the digits of $\pi$.

We would like to express version constraints using comparison operators along with the murky semantics of version numbers found in the wild. So we have the following relations in an enum:

⪡relation⪢≣
class Relation(Enum):
    """Encodes ordinal relations among versions. Currently six operators are
    supported: `>=`, `<=`, `<`, `>`, `==`, `!=`.""" 
    GE = 1
    LE = 2
    LT = 3
    GT = 4
    EQ = 5
    NE = 6

    def __str__(self):
        return {
            Relation.GE: ">=",
            Relation.LE: "<=",
            Relation.LT: "<",
            Relation.GT: ">",
            Relation.EQ: "==",
            Relation.NE: "!="}[self]

I'm going out on a limb and saying that a version consists of a tuple of int, optionally followed by a suffix that can be stored in a str. We need to define an ordering on this system:

⪡version⪢≣
@dataclass
class Version:
    """Stores a version in the form of a tuple of ints and an optional string extension.
    This class supports (in)equality operators listed in `Relation`."""
    number: tuple[int, ...]
    extra: Optional[str]

    <<version-methods>>

Implementation of Version operators

⪡version-methods⪢≣
def __lt__(self, other):
    for n, m in zip(self.number, other.number):
        if n < m:
            return True
        elif n > m:
            return False
    return False

def __gt__(self, other):
    return other < self

def __le__(self, other):
    for n, m in zip(self.number, other.number):
        if n < m:
            return True
        elif n > m:
            return False
    return True

def __ge__(self, other):
    return other <= self

def __eq__(self, other):
    for n, m in zip(self.number, other.number):
        if n != m:
            return False
    # Only report equality after all shared positions have been compared.
    return True

def __ne__(self, other):
    return not self == other

def __str__(self):
    return ".".join(map(str, self.number)) + (self.extra or "")
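
One consequence of comparing with `zip` is that only the positions both versions share are taken into account. The sketch below illustrates this with plain tuples standing in for `Version.number`; the function name `less_or_equal` is hypothetical and mirrors the logic of `__le__` above.

```python
def less_or_equal(a, b):
    """Compare two version tuples position by position, like `__le__` above."""
    for n, m in zip(a, b):
        if n < m:
            return True
        if n > m:
            return False
    # All shared positions are equal.
    return True


print(less_or_equal((5,), (5, 1, 0)))
print(less_or_equal((5, 1, 0), (5,)))
print(less_or_equal((6,), (5, 1, 0)))
```

The first two calls both return `True`: on the shared prefix, `5` and `5.1.0` are indistinguishable, so a constraint such as `>=5` accepts any `5.x.y`. The third call returns `False`.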

A combination of a Version with a Relation forms a VersionConstraint. Such a constraint can be called with another Version, which should give a bool.

⪡version-constraint⪢≣
@dataclass
class VersionConstraint:
    """A VersionConstraint is a product of a `Version` and a `Relation`."""
    version: Version
    relation: Relation

    def __call__(self, other: Version) -> bool:
        method = f"__{self.relation.name}__".lower()
        return getattr(other, method)(self.version)

    def __str__(self):
        return f"{self.relation}{self.version}"
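
The `__call__` method relies on the enum member names matching Python's comparison method names. Here is a minimal sketch of that dispatch, assuming plain tuples as a stand-in for `Version` and a reduced two-member enum; the function `check` is hypothetical.

```python
from enum import Enum


class Relation(Enum):
    # Reduced copy of the enum above, just enough for this sketch.
    GE = 1
    LT = 3


def check(found, relation, required):
    """Evaluate `found <relation> required` by dispatching on the member name."""
    method = f"__{relation.name}__".lower()  # for example "__ge__"
    return getattr(found, method)(required)


print(check((5, 1, 0), Relation.GE, (5,)))
print(check((5, 1, 0), Relation.GE, (6,)))
print(check((5, 1, 0), Relation.LT, (6,)))
```

Note the argument order: the version that was found is the receiver, and the required version is the argument, so `>=5` reads as "found >= 5".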

Now, we also need to be able to read a version constraint from input. Each parser takes a str and returns a tuple of (value, str), where the second part of the tuple is the text that is not yet parsed.

Parsing version constraints

⪡parsing⪢≣
def split_at(split_chars: str, x: str) -> Tuple[str, str]:
    """Tries to split `x` at the separator `split_chars`. Returns a 2-tuple of
    the string before and after the separator; the second part is empty if the
    separator does not occur."""
    a = x.split(split_chars, maxsplit=1)
    if len(a) == 2:
        return a[0], a[1]
    else:
        return a[0], ""


def parse_split_f(split_chars: str, f: Callable[[str], T], x: str) \
        -> Tuple[T, str]:
    """Given a string `x`, splits at the separator `split_chars` and passes the
    left value through a function `f` (probably a parser). The second half of the
    return tuple is the remainder of the string."""
    item, x = split_at(split_chars, x)
    val = f(item)
    return val, x


def parse_version(x: str) -> Tuple[Version, str]:
    """Parse a given string to a `Version`. A sequence of dot `.` separated integers
    is put into the numerical version component, while the remaining text ends up in
    the `extra` component."""
    _x = x
    number = []
    extra = None

    while True:
        try:
            n, _x = parse_split_f(".", int, _x)
            number.append(n)
        except ValueError:
            if len(x) > 0:
                m = re.match("([0-9]*)(.*)", _x)
                if lastn := m and m.group(1):
                    number.append(int(lastn))
                if suff := m and m.group(2):
                    extra = suff or None
                else:
                    extra = _x
            break

    if not number:
        raise ConfigError(f"A version needs a numeric component, got: {x}")

    return Version(tuple(number), extra), _x


def parse_relation(x: str) -> Tuple[Relation, str]:
    """Parses the operator of the version constraint."""
    op_map = {
        "<=": Relation.LE,
        ">=": Relation.GE,
        "<": Relation.LT,
        ">": Relation.GT,
        "==": Relation.EQ,
        "!=": Relation.NE}
    for sym, op in op_map.items():
        if x.startswith(sym):
            return (op, x[len(sym):])
    raise ConfigError(f"Not a comparison operator: {x}")


def parse_version_constraint(x: str) -> Tuple[VersionConstraint, str]:
    relation, x = parse_relation(x)
    version, x = parse_version(x)
    return VersionConstraint(version, relation), x
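
The convention that every parser returns a `(value, remainder)` pair makes parsers easy to chain. The following is a simplified sketch of that idea, not the parsers above: `parse_operator` and `parse_numbers` are hypothetical helpers, and the number parser uses a regular expression instead of repeated splitting.

```python
import re
from typing import Tuple


def parse_operator(text: str) -> Tuple[str, str]:
    """Hypothetical helper: read a comparison operator from the front of `text`."""
    # Two-character operators come first so that "<=" is not read as "<".
    for symbol in ("<=", ">=", "==", "!=", "<", ">"):
        if text.startswith(symbol):
            return symbol, text[len(symbol):]
    raise ValueError(f"Not a comparison operator: {text}")


def parse_numbers(text: str) -> Tuple[Tuple[int, ...], str]:
    """Hypothetical helper: read dot-separated integers, return the rest unparsed."""
    match = re.match(r"\d+(\.\d+)*", text)
    if match is None:
        raise ValueError(f"No version number in: {text}")
    numbers = tuple(int(part) for part in match.group(0).split("."))
    return numbers, text[match.end():]


# Each step consumes a prefix and hands the remainder to the next parser.
operator, rest = parse_operator(">=3.12rc1")
numbers, rest = parse_numbers(rest)
print(operator, numbers, repr(rest))
```

Here the unparsed suffix `rc1` is what ends up in the `extra` field in the real `parse_version`.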

Running

Some checks may need to be preceded by another check. Say we want to see if some Python module is installed: first we need to see if the correct Python version is here, then if pip is actually installed, then if we can see the module. If we have many such modules, how do we make sure that we check for Python and pip only once? One way is to plan everything in advance, then run the workflow. That's nice, but adds a lot of complication on top of what we can get out of the box with asyncio. Another way is to cache results, so that when we need a result a second time, we use the cached value.

⪡running⪢≣
def async_cache(f):
    """Caches results from the `async` function `f`. This assumes `f` is a
    member of a class, where we have `_lock`, `_result` and `_done` members
    available."""
    async def g(self, *args, **kwargs):
        async with self._lock:
            if self._done:
                return self._result
            self._result = await f(self, *args, **kwargs)
            self._done = True
            return self._result
    return g
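
To illustrate the effect of the cache, here is a small self-contained sketch. The decorator is repeated from above so the block runs on its own; the class `SlowCheck` and its `calls` counter are hypothetical and exist only to count how often the body runs.

```python
import asyncio


def async_cache(f):
    # Same decorator as above, repeated so the sketch runs on its own.
    async def g(self, *args, **kwargs):
        async with self._lock:
            if self._done:
                return self._result
            self._result = await f(self, *args, **kwargs)
            self._done = True
            return self._result
    return g


class SlowCheck:
    """Hypothetical job that counts how often its body actually runs."""
    def __init__(self):
        self._lock = asyncio.Lock()
        self._done = False
        self.calls = 0

    @async_cache
    async def run(self):
        self.calls += 1
        await asyncio.sleep(0.01)
        return "ok"


async def demo():
    check = SlowCheck()  # created inside the running event loop
    results = await asyncio.gather(check.run(), check.run(), check.run())
    return results, check.calls


results, calls = asyncio.run(demo())
print(results, calls)
```

All three callers receive the result, but the body runs once. The lock makes late callers wait for the first run instead of starting their own. Since `asyncio.Lock` is not reentrant, a job that depends on itself, directly or through a cycle, would wait forever.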

Result

The result of a version check is stored in Result.

⪡running⪢⊞
@dataclass
class Result:
    test: VersionTest
    success: bool
    failure_text: Optional[str] = None
    found_version: Optional[Version] = None

    def __bool__(self):
        return self.success

Job

The logistics for each job checking a version are stored in VersionTest. This is basically a giant closure wrapped in async_cache.

The run method takes an argument recurse. This is used to call dependencies of the current version test.

⪡running⪢⊞
@dataclass
class VersionTest:
    name: str
    require: VersionConstraint
    get_version: str
    platform: Optional[str] = None
    pattern: Optional[str] = None
    suggestion_text: Optional[str] = None
    suggestion: Optional[str] = None
    depends: List[str] = field(default_factory=list)
    template: Optional[str] = None

    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    _done: bool = False

    def print_formatted(self, msg):
        prefix = f"{self.name} {self.require}"
        print(f"{prefix:25}: {msg}")

    def print_not_found(self):
        self.print_formatted("not found")

    @async_cache
    async def run(self, recurse):
        for dep in self.depends:
            if not await recurse(dep):
                return Result(self, False,
                              failure_text=f"Failed dependency: {dep}")

        proc = await asyncio.create_subprocess_shell(
            self.get_version,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        (stdout, stderr) = await proc.communicate()
        if proc.returncode != 0:
            self.print_not_found()
            return Result(
                self,
                success=False,
                failure_text=f"{stderr.decode().strip()}")
        try:
            if self.pattern is not None:
                m = re.match(self.pattern, stdout.decode())
                if m is not None:
                    out, _ = parse_version(m.group(1).strip())
                else:
                    self.print_not_found()
                    msg = f"No regex match on pattern '{self.pattern}'"
                    return Result(self, False, failure_text=msg)
            else:
                out, _ = parse_version(stdout.decode().strip())
        except ConfigError as e:
            return Result(self, False, failure_text=str(e))

        if self.require(out):
            self.print_formatted(f"{str(out):10} Ok")
            return Result(self, True)
        else:
            self.print_formatted(f"{str(out):10} Fail")
            return Result(self, False, failure_text="Too old.",
                          found_version=out)

Parsing input

⪡running⪢⊞
def parse_config(name: str, config: Mapping[str, str], templates):
    if "template" in config:
        _config = {}
        for k, v in templates[config["template"]].items():
            if isinstance(v, str):
                _config[k] = v.format(name=name)
            else:
                _config[k] = v
        _config.update(config)
    else:
        _config = dict(config)

    _deps = map(str.strip, _config.get("depends", "").split(","))
    deps = list(filter(lambda x: x != "", _deps))

    assert "require" in _config, "Every item needs a `require` field"
    assert "get_version" in _config, "Every item needs a `get_version` field"

    require, _ = parse_version_constraint(_config["require"])

    return VersionTest(
        name=name,
        require=require,
        get_version=_config["get_version"],
        platform=_config.get("platform", None),
        pattern=_config.get("pattern", None),
        suggestion_text=_config.get("suggestion_text", None),
        suggestion=_config.get("suggestion", None),
        depends=deps,
        template=_config.get("template", None))

Indentation

It looks nice to indent some output. This captures stdout and forwards it by printing each line with a given prefix.

⪡running⪢⊞
@contextmanager
def indent(prefix: str):
    f = io.StringIO()
    with redirect_stdout(f):
        yield
    output = f.getvalue()
    print(textwrap.indent(output, prefix), end="")
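
A short usage sketch of this helper. The definition is repeated so the block is self-contained, and the outer `redirect_stdout` is only there to capture the final output for inspection.

```python
import io
import textwrap
from contextlib import contextmanager, redirect_stdout


@contextmanager
def indent(prefix: str):
    # Same helper as above, repeated so the sketch runs on its own.
    f = io.StringIO()
    with redirect_stdout(f):
        yield
    print(textwrap.indent(f.getvalue(), prefix), end="")


# Capture what `indent` finally prints, so the result can be inspected.
captured = io.StringIO()
with redirect_stdout(captured):
    with indent("  |  "):
        print("awk: Too old.")
        print("    found version 5.1.0")

print(captured.getvalue(), end="")
```

Everything printed inside the `with indent(...)` block is buffered and only written, with the prefix added to each line, once the block ends.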

Main

⪡running⪢⊞
async def main():
    config = configparser.ConfigParser()
    config.read("dependencies.ini")

    templates = {
        name[9:]: config[name]
        for name in config if name.startswith("template:")
    }

    try:
        tests = {
            name: parse_config(name, config[name], templates)
            for name in config if ":" not in name and name != "DEFAULT"
        }
    except (AssertionError, ConfigError) as e:
        print("Configuration error:", e)
        sys.exit(1)

    async def test_version(name: str):
        assert name in tests, f"unknown dependency {name}"
        x = await tests[name].run(test_version)
        return x

    result = await asyncio.gather(*(test_version(k) for k in tests))
    if all(r.success for r in result):
        print("Success")
        sys.exit(0)
    else:
        print("Failure")
        with indent("  |  "):
            for r in (r for r in result if not r.success):
                if r.failure_text:
                    print(f"{r.test.name}: {r.failure_text}")
                if r.found_version:
                    print(f"    found version {r.found_version}")
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())
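
The way `main` separates templates from ordinary entries can be tried out on an in-memory configuration. This is a minimal sketch: the INI text is a shortened, hypothetical variant of the template example, read with `ConfigParser.read_string` instead of from a file.

```python
import configparser

# Shortened, hypothetical configuration for illustration.
INI_TEXT = """
[template:pip]
get_version = pip show {name} | grep "Version:"

[python3]
require = >=3.8
get_version = python3 --version

[numpy]
require = >=1.0
template = pip
"""

config = configparser.ConfigParser()
config.read_string(INI_TEXT)

# Sections named "template:<name>" are templates; the rest are checks.
template_names = [name[len("template:"):] for name in config
                  if name.startswith("template:")]
# Iterating a ConfigParser also yields the implicit DEFAULT section.
test_names = [name for name in config
              if ":" not in name and name != "DEFAULT"]
print(template_names, test_names)
```

A side effect of this scheme is that a dependency name cannot contain a colon, because any section with a colon in its name is skipped when collecting the checks.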

Literate Programming

This script is composed from the code blocks in this README using Entangled. To generate the HTML-rendered documentation, I used Pdoc in conjunction with some Awk scripts.

Note that all output from the shell scripts in the tutorial is expanded by Awk in CI. That means the tutorial doubles as an integration test.

file:checkdeps/__init__.py
from __future__ import annotations

import subprocess
proc_eval = subprocess.run(
    ["awk", "-f", "eval_shell_pass.awk"],
    input=open("README.md", "rb").read(), capture_output=True)
proc_label = subprocess.run(
    ["awk", "-f", "noweb_label_pass.awk"],
    input=proc_eval.stdout, capture_output=True)
__doc__ = proc_label.stdout.decode()

<<imports>>
<<boilerplate>>

<<relation>>
<<version>>
<<version-constraint>>
<<parsing>>
<<running>>

API Documentation

Strictly speaking, this script has no need for API docs, but here they are anyway.

class ConfigError(builtins.Exception):

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
class Relation(enum.Enum):

Encodes ordinal relations among versions. Currently six operators are supported: >=, <=, <, >, ==, !=.

GE = <Relation.GE: 1>
LE = <Relation.LE: 2>
LT = <Relation.LT: 3>
GT = <Relation.GT: 4>
EQ = <Relation.EQ: 5>
NE = <Relation.NE: 6>
Inherited Members
enum.Enum: name, value
@dataclass
class Version:
@dataclass
class Version:
    """Stores a version in the form of a tuple of ints and an optional string extension.
    This class supports (in)equality operators listed in `Relation`."""
    number: tuple[int, ...]
    extra: Optional[str]

    # ~\~ begin <<README.md|version-methods>>[init]
    def __lt__(self, other):
        for n, m in zip(self.number, other.number):
            if n < m:
                return True
            elif n > m:
                return False
        return False

    def __gt__(self, other):
        return other < self

    def __le__(self, other):
        for n, m in zip(self.number, other.number):
            if n < m:
                return True
            elif n > m:
                return False
        return True

    def __ge__(self, other):
        return other <= self

    def __eq__(self, other):
        for n, m in zip(self.number, other.number):
            if n != m:
                return False
        # Only report equality after every shared component has matched.
        return True

    def __ne__(self, other):
        return not self == other

    def __str__(self):
        return ".".join(map(str, self.number)) + (self.extra or "")
    # ~\~ end

Stores a version in the form of a tuple of ints and an optional string extension. This class supports (in)equality operators listed in Relation.

Version(number: tuple[int, ...], extra: Optional[str])
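
The comparison methods walk the two `number` tuples in parallel with `zip`, so only the components that both versions share take part in the comparison. The following is a minimal sketch of that behaviour on bare tuples. The helper names `version_lt` and `version_le` are illustrative and not part of the module.

```python
def version_lt(a: tuple[int, ...], b: tuple[int, ...]) -> bool:
    """Mirror of the `__lt__` logic, on bare tuples (illustrative helper)."""
    for n, m in zip(a, b):
        if n < m:
            return True
        if n > m:
            return False
    # All shared components are equal: not strictly less.
    return False


def version_le(a: tuple[int, ...], b: tuple[int, ...]) -> bool:
    """Mirror of the `__le__` logic, on bare tuples (illustrative helper)."""
    for n, m in zip(a, b):
        if n < m:
            return True
        if n > m:
            return False
    # All shared components are equal: less-or-equal holds.
    return True


print(version_lt((5, 1, 0), (5, 2)))   # True: 1 < 2 decides it
print(version_lt((5, 1), (5, 1, 3)))   # False: the third component is never compared
print(version_le((5, 1), (5, 1, 3)))   # True: the shared prefix is equal
```

A consequence worth keeping in mind when writing constraints: a requirement such as `>=5.1` and a found version `5.1.3` are compared on the first two components only.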
@dataclass
class VersionConstraint:
@dataclass
class VersionConstraint:
    """A VersionConstraint is a product of a `Version` and a `Relation`."""
    version: Version
    relation: Relation

    def __call__(self, other: Version) -> bool:
        method = f"__{self.relation.name}__".lower()
        return getattr(other, method)(self.version)

    def __str__(self):
        return f"{self.relation}{self.version}"

A VersionConstraint is a product of a Version and a Relation.

VersionConstraint(version: checkdeps.Version, relation: checkdeps.Relation)
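
Calling a constraint turns the name of the `Relation` member into the matching comparison method (`GE` becomes `__ge__`) and invokes it on the found version. A small sketch of that dispatch, assuming plain tuples in place of `Version` objects and a cut-down, illustrative `Relation` enum:

```python
from enum import Enum


class Relation(Enum):
    # Reduced copy of the enum, for illustration only.
    GE = 1
    LT = 3


def check(found: tuple, relation: Relation, required: tuple) -> bool:
    """Apply `found <relation> required` via the dunder-method name."""
    method = f"__{relation.name}__".lower()   # "GE" -> "__ge__"
    return getattr(found, method)(required)


print(check((5, 1, 0), Relation.GE, (4, 0)))  # True
print(check((5, 1, 0), Relation.LT, (4, 0)))  # False
```

Note the argument order: the found version is the left-hand operand and the required version the right-hand one.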
def split_at(split_chars: str, x: str) -> Tuple[str, str]:
def split_at(split_chars: str, x: str) -> Tuple[str, str]:
    """Tries to split `x` at the first occurrence of `split_chars`. Returns a
    2-tuple of the string before and after the separator; the second element
    is empty if the separator is absent."""
    a = x.split(split_chars, maxsplit=1)
    if len(a) == 2:
        return a[0], a[1]
    else:
        return a[0], ""

Tries to split x at the first occurrence of split_chars. Returns a 2-tuple of the string before and after the separator; the second element is empty if the separator is absent.

def parse_split_f(split_chars: str, f: Callable[[str], ~T], x: str) -> Tuple[~T, str]:
def parse_split_f(split_chars: str, f: Callable[[str], T], x: str) \
        -> Tuple[T, str]:
    """Given a string `x`, splits at the separator `split_chars` and passes the
    left value through a function (probably a parser). The second half of the
    return tuple is the remainder of the string."""
    item, x = split_at(split_chars, x)
    val = f(item)
    return val, x

Given a string x, splits at the separator split_chars and passes the left value through a function (probably a parser). The second half of the return tuple is the remainder of the string.
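
To make the behaviour of the two helpers concrete, here is a short stand-alone sketch that repeats their bodies (without type annotations) and applies them to a version string.

```python
def split_at(split_chars, x):
    # Split once, at the first occurrence of the separator.
    a = x.split(split_chars, maxsplit=1)
    if len(a) == 2:
        return a[0], a[1]
    return a[0], ""


def parse_split_f(split_chars, f, x):
    # Parse the part before the separator, hand back the rest untouched.
    item, rest = split_at(split_chars, x)
    return f(item), rest


print(split_at(".", "5.1.0"))            # ('5', '1.0')
print(split_at(".", "0"))                # ('0', '')
print(parse_split_f(".", int, "5.1.0"))  # (5, '1.0')
```

If the left part cannot be parsed, the exception from `f` propagates; for `int` that is a `ValueError`, which is what `parse_version` relies on to detect the end of the numeric part.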

def parse_version(x: str) -> Tuple[checkdeps.Version, str]:
def parse_version(x: str) -> Tuple[Version, str]:
    """Parse a given string to a `Version`. A sequence of dot `.` separated integers
    is put into the numerical version component, while the remaining text ends up in
    the `extra` component."""
    _x = x
    number = []
    extra = None

    while True:
        try:
            # Peel off one dot-separated integer at a time.
            n, _x = parse_split_f(".", int, _x)
            number.append(n)
        except ValueError:
            # The next chunk is not a plain integer: take any leading digits
            # as the last numeric component and keep the rest as `extra`.
            if len(x) > 0:
                m = re.match("([0-9]*)(.*)", _x)
                if lastn := m and m.group(1):
                    number.append(int(lastn))
                if suff := m and m.group(2):
                    extra = suff or None
                else:
                    extra = _x
            break

    if not number:
        raise ConfigError(f"A version needs a numeric component, got: {x}")

    return Version(tuple(number), extra), _x

Parse a given string to a Version. A sequence of dot . separated integers is put into the numerical version component, while the remaining text ends up in the extra component.
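
The effect of this parser on typical `--version` output can be approximated with a single regular expression. The sketch below is a simplified stand-in, not the module's implementation: the name `sketch_parse_version` is hypothetical, it returns a bare tuple instead of a `Version`, and it raises `ValueError` where the module raises `ConfigError`.

```python
import re
from typing import Optional


def sketch_parse_version(text: str) -> tuple[tuple[int, ...], Optional[str]]:
    """Split leading dot-separated integers from the trailing text."""
    m = re.match(r"(\d+(?:\.\d+)*)(.*)", text)
    if m is None:
        raise ValueError(f"A version needs a numeric component, got: {text}")
    number = tuple(int(part) for part in m.group(1).split("."))
    extra = m.group(2) or None
    return number, extra


print(sketch_parse_version("5.1.0, API: 3.0"))  # ((5, 1, 0), ', API: 3.0')
print(sketch_parse_version("1.2rc1"))           # ((1, 2), 'rc1')
```

Edge cases such as a trailing dot may be handled differently by the real function.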

def parse_relation(x: str) -> Tuple[checkdeps.Relation, str]:
def parse_relation(x: str) -> Tuple[Relation, str]:
    """Parses the operator of the version constraint."""
    op_map = {
        "<=": Relation.LE,
        ">=": Relation.GE,
        "<": Relation.LT,
        ">": Relation.GT,
        "==": Relation.EQ,
        "!=": Relation.NE}
    for sym, op in op_map.items():
        if x.startswith(sym):
            return (op, x[len(sym):])
    raise ConfigError(f"Not a comparison operator: {x}")

Parses the operator of the version constraint.
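
The operator table is scanned in insertion order, which matters because `<` is a prefix of `<=` and `>` a prefix of `>=`: the two-character operators have to be tried first. A minimal sketch of the same prefix matching, using plain strings instead of `Relation` members and `ValueError` instead of `ConfigError` (both substitutions are assumptions made for self-containment):

```python
# Two-character operators come before their one-character prefixes.
OPERATORS = ["<=", ">=", "<", ">", "==", "!="]


def split_operator(text: str) -> tuple[str, str]:
    """Return the leading comparison operator and the remaining text."""
    for sym in OPERATORS:
        if text.startswith(sym):
            return sym, text[len(sym):]
    raise ValueError(f"Not a comparison operator: {text}")


print(split_operator(">=5.1"))  # ('>=', '5.1')
print(split_operator("<6"))     # ('<', '6')
```

Had `<` been listed before `<=`, the constraint `<=6` would be read as `<` followed by the unparsable version `=6`.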

def parse_version_constraint(x: str) -> Tuple[checkdeps.VersionConstraint, str]:
def parse_version_constraint(x: str) -> Tuple[VersionConstraint, str]:
    relation, x = parse_relation(x)
    version, x = parse_version(x)
    return VersionConstraint(version, relation), x
def async_cache(f):
def async_cache(f):
    """Caches results from the `async` function `f`. This assumes `f` is a
    member of a class, where we have `_lock`, `_result` and `_done` members
    available."""
    async def g(self, *args, **kwargs):
        async with self._lock:
            if self._done:
                return self._result
            self._result = await f(self, *args, **kwargs)
            self._done = True
            return self._result
    return g

Caches results from the async function f. This assumes f is a member of a class, where we have _lock, _result and _done members available.
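
The lock is what makes the cache safe under concurrency: if several tasks await the same method at once, the first one computes the result while the others wait, then read the stored value. The sketch below repeats the decorator and applies it to a hypothetical `Probe` class (not part of the module) to show that the wrapped coroutine body runs only once.

```python
import asyncio


def async_cache(f):
    async def g(self, *args, **kwargs):
        async with self._lock:
            if self._done:
                return self._result
            self._result = await f(self, *args, **kwargs)
            self._done = True
            return self._result
    return g


class Probe:
    """Hypothetical class used only for this illustration."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._done = False
        self.calls = 0

    @async_cache
    async def run(self):
        self.calls += 1            # count how often the body executes
        await asyncio.sleep(0.01)  # stand-in for running a subprocess
        return "result"


async def demo():
    probe = Probe()
    results = await asyncio.gather(probe.run(), probe.run(), probe.run())
    return results, probe.calls


results, calls = asyncio.run(demo())
print(results, calls)  # ['result', 'result', 'result'] 1
```

This is the property `VersionTest.run` relies on when several dependencies share a common prerequisite: the prerequisite's check is executed once and its result is reused.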

@dataclass
class Result:
@dataclass
class Result:
    test: VersionTest
    success: bool
    failure_text: Optional[str] = None
    found_version: Optional[Version] = None

    def __bool__(self):
        return self.success
Result(test: checkdeps.VersionTest, success: bool, failure_text: Optional[str] = None, found_version: Optional[checkdeps.Version] = None)
@dataclass
class VersionTest:
@dataclass
class VersionTest:
    name: str
    require: VersionConstraint
    get_version: str
    platform: Optional[str] = None
    pattern: Optional[str] = None
    suggestion_text: Optional[str] = None
    suggestion: Optional[str] = None
    depends: List[str] = field(default_factory=list)
    template: Optional[str] = None

    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    _done: bool = False

    def print_formatted(self, msg):
        prefix = f"{self.name} {self.require}"
        print(f"{prefix:25}: {msg}")

    def print_not_found(self):
        self.print_formatted("not found")

    @async_cache
    async def run(self, recurse):
        for dep in self.depends:
            if not await recurse(dep):
                return Result(self, False,
                              failure_text=f"Failed dependency: {dep}")

        proc = await asyncio.create_subprocess_shell(
            self.get_version,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        (stdout, stderr) = await proc.communicate()
        if proc.returncode != 0:
            self.print_not_found()
            return Result(
                self,
                success=False,
                failure_text=f"{stderr.decode().strip()}")
        try:
            if self.pattern is not None:
                m = re.match(self.pattern, stdout.decode())
                if m is not None:
                    out, _ = parse_version(m.group(1).strip())
                else:
                    self.print_not_found()
                    msg = f"No regex match on pattern '{self.pattern}'"
                    return Result(self, False, failure_text=msg)
            else:
                out, _ = parse_version(stdout.decode().strip())
        except ConfigError as e:
            return Result(self, False, failure_text=str(e))

        if self.require(out):
            self.print_formatted(f"{str(out):10} Ok")
            return Result(self, True)
        else:
            self.print_formatted(f"{str(out):10} Fail")
            return Result(self, False, failure_text="Too old.",
                          found_version=out)
VersionTest(name: str, require: checkdeps.VersionConstraint, get_version: str, platform: Optional[str] = None, pattern: Optional[str] = None, suggestion_text: Optional[str] = None, suggestion: Optional[str] = None, depends: List[str] = <factory>, template: Optional[str] = None, _lock: asyncio.locks.Lock = <factory>, _done: bool = False)
def print_formatted(self, msg):
    def print_formatted(self, msg):
        prefix = f"{self.name} {self.require}"
        print(f"{prefix:25}: {msg}")
def print_not_found(self):
    def print_not_found(self):
        self.print_formatted("not found")
async def run(self, *args, **kwargs):
    async def g(self, *args, **kwargs):
        async with self._lock:
            if self._done:
                return self._result
            self._result = await f(self, *args, **kwargs)
            self._done = True
            return self._result
def parse_config(name: str, config: Mapping[str, str], templates):
def parse_config(name: str, config: Mapping[str, str], templates):
    if "template" in config:
        _config = {}
        for k, v in templates[config["template"]].items():
            if isinstance(v, str):
                _config[k] = v.format(name=name)
            else:
                _config[k] = v
        _config.update(config)
    else:
        _config = dict(config)

    _deps = map(str.strip, _config.get("depends", "").split(","))
    deps = list(filter(lambda x: x != "", _deps))

    assert "require" in _config, "Every item needs a `require` field"
    assert "get_version" in _config, "Every item needs a `get_version` field"

    require, _ = parse_version_constraint(_config["require"])

    return VersionTest(
        name=name,
        require=require,
        get_version=_config["get_version"],
        platform=_config.get("platform", None),
        pattern=_config.get("pattern", None),
        suggestion_text=_config.get("suggestion_text", None),
        suggestion=_config.get("suggestion", None),
        depends=deps,
        template=_config.get("template", None))
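
The template handling can be illustrated in isolation: values from the `template:` section are formatted with the dependency's own name, after which the dependency's own keys override them. The configuration below is a hypothetical example written for this sketch (the section names `template:cli` and `awk` and the pattern are assumptions, not taken from the project), and `merged_settings` is an illustrative helper that covers only the merge step of `parse_config`.

```python
import configparser

# Hypothetical configuration, for illustration only.
INI = """
[template:cli]
get_version = {name} --version

[awk]
template = cli
require = >=5.0
pattern = GNU Awk (.*)
"""

config = configparser.ConfigParser()
config.read_string(INI)

# Same selection rule as in `main`: strip the "template:" prefix.
templates = {
    name[len("template:"):]: config[name]
    for name in config if name.startswith("template:")
}


def merged_settings(name: str) -> dict[str, str]:
    """Template values first (with {name} filled in), then the item's own keys."""
    section = config[name]
    settings = {}
    if "template" in section:
        for key, value in templates[section["template"]].items():
            settings[key] = value.format(name=name)
    settings.update(section)
    return settings


settings = merged_settings("awk")
print(settings["get_version"])  # awk --version
print(settings["require"])      # >=5.0
```

Because template values pass through `str.format`, a literal brace in a template value has to be doubled (`{{` or `}}`); values in the dependency's own section are not formatted and can contain braces freely.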
@contextmanager
def indent(prefix: str):
@contextmanager
def indent(prefix: str):
    f = io.StringIO()
    with redirect_stdout(f):
        yield
    output = f.getvalue()
    print(textwrap.indent(output, prefix), end="")
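
A short usage sketch of this context manager: everything printed inside the `with` block is captured and then re-emitted with the prefix in front of each line. The function body is repeated here so the example runs on its own; the printed messages are made up for the demonstration.

```python
import io
import textwrap
from contextlib import contextmanager, redirect_stdout


@contextmanager
def indent(prefix: str):
    f = io.StringIO()
    with redirect_stdout(f):
        yield
    output = f.getvalue()
    print(textwrap.indent(output, prefix), end="")


with indent("  |  "):
    print("awk: Too old.")
    print("    found version 5.1.0")
# Prints:
#   |  awk: Too old.
#   |      found version 5.1.0
```

Since the output is only written after the block finishes, nothing appears while the block is running, and if the block raises an exception the captured text is discarded.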
async def main():
async def main():
    config = configparser.ConfigParser()
    config.read("dependencies.ini")

    templates = {
        name[9:]: config[name]
        for name in config if name.startswith("template:")
    }

    try:
        tests = {
            name: parse_config(name, config[name], templates)
            for name in config if ":" not in name and name != "DEFAULT"
        }
    except (AssertionError, ConfigError) as e:
        print("Configuration error:", e)
        sys.exit(1)

    async def test_version(name: str):
        assert name in tests, f"unknown dependency {name}"
        x = await tests[name].run(test_version)
        return x

    result = await asyncio.gather(*(test_version(k) for k in tests))
    if all(r.success for r in result):
        print("Success")
        sys.exit(0)
    else:
        print("Failure")
        with indent("  |  "):
            for r in (r for r in result if not r.success):
                if r.failure_text:
                    print(f"{r.test.name}: {r.failure_text}")
                if r.found_version:
                    print(f"    found version {r.found_version}")
        sys.exit(1)