forked from infra/keys
141 lines
5 KiB
Python
Executable file
141 lines
5 KiB
Python
Executable file
#!/usr/bin/env python
|
||
# Verify all commits in a Git repository.
|
||
# This is a script for GitHub/Forgejo Actions and does not work properly without the necessary environment variables set.
|
||
# Required dependencies: GitPython, requests
|
||
|
||
import logging
|
||
import git
|
||
from os import environ as env, chdir
|
||
from pathlib import Path
|
||
from tempfile import NamedTemporaryFile
|
||
import requests
|
||
|
||
# Module-wide logger; its handlers and level are configured in main().
log = logging.getLogger(__name__)

# True when the CI variable is present in the environment (its value is not inspected).
is_ci = env.get("CI") is not None

# Mandatory Actions variables. A missing one raises KeyError as soon as the
# module is loaded.
repo_name, server, action_ref = (
    env[variable]
    for variable in ("GITHUB_REPOSITORY", "GITHUB_SERVER_URL", "GITHUB_REF")
)
|
||
|
||
|
||
def collect_user_dirs():
    """Return the user directories found in the current working directory.

    A user directory is any direct child of the current working directory
    that is a directory and whose name does not start with a dot.

    Returns:
        A list of relative ``Path`` objects, sorted so that the order is
        the same on every run regardless of the filesystem's listing order.
    """
    return sorted(
        entry
        for entry in Path(".").glob("*")
        if entry.is_dir() and not entry.name.startswith(".")
    )
|
||
|
||
|
||
def last_commit_for(dir: Path, ref: git.Reference):
    """Return the most recent commit reachable from ``ref`` that touches ``dir``.

    Args:
        dir: Path to look up, relative to the repository working tree.
        ref: Reference whose history is searched; its repository is used
            to run git.

    Returns:
        The commit object for the newest matching commit.

    Raises:
        Exception: if no commit reachable from ``ref`` touches ``dir``.
    """
    # "--" separates revisions from paths, so a directory that happens to be
    # named like a branch, tag or option is never misread by git.
    last_commit_hash = str(
        ref.repo.git.rev_list("--max-count=1", ref.path, "--", str(dir))
    ).strip()
    if not last_commit_hash:
        # rev-list prints nothing when the path has no history on this ref.
        raise Exception(f"No commit found for directory {dir}.")
    return ref.repo.commit(last_commit_hash)
|
||
|
||
|
||
def keylist_to_principals(keyfile_text: str, email: str) -> str:
    """Convert a list of public keys into allowed-signers lines for ``email``.

    Each usable line of ``keyfile_text`` is treated as one public key and
    emitted as ``"<email> <public key>"``. Blank lines and ``#`` comment
    lines are skipped, because prefixing them with the email would not
    yield a principal followed by a key. Surrounding whitespace is stripped.

    Args:
        keyfile_text: Public keys, one per line.
        email: Principal to associate with every key.

    Returns:
        The generated lines, each terminated by a newline so the result can
        be written to a file as-is; an empty string if there are no keys.
    """
    stripped_lines = (line.strip() for line in keyfile_text.splitlines())
    return "".join(
        f"{email} {public_key}\n"
        for public_key in stripped_lines
        if public_key and not public_key.startswith("#")
    )
|
||
|
||
|
||
def get_forgejo_keys(username: str) -> str:
    """Download the public keys the server publishes for ``username``.

    The keys are fetched from ``{server}/{username}.keys``.

    Args:
        username: Account name on the server.

    Returns:
        The response body as text.

    Raises:
        Exception: if the server answers with an error status. Connection
            problems and timeouts propagate as raised by ``requests``.
    """
    # requests waits forever by default; a timeout keeps an unresponsive
    # server from hanging the whole CI job.
    response = requests.get(f"{server}/{username}.keys", timeout=30)
    if not response.ok:
        log.error(f"Server error: {response.status_code} {response.reason}")
        raise Exception(f"Couldn’t retrieve keylist for user {username}.")
    return response.text
|
||
|
||
|
||
def verify_dir(dir: Path, ref: git.Reference):
    """Verify the last commit that touched a user's directory.

    The directory name is taken as the username. Verification succeeds only
    if the last commit on ``dir`` was authored by that user, is signed, has
    an author email, and its signature verifies against the keys the server
    publishes for the user and, separately, against the keys stored in
    ``dir/keys``.

    Args:
        dir: The user's directory; must contain a ``keys`` file.
        ref: Reference whose history is inspected.

    Raises:
        Exception: if the keyfile is missing, the commit author is not the
            directory owner, the commit is unsigned or has no email, or the
            remote keylist cannot be retrieved. A failing
            ``git verify-commit`` propagates the error raised by GitPython.
    """
    username = dir.name
    keyfile = dir / "keys"
    if not keyfile.exists():
        raise Exception("Missing keyfile")

    commit = last_commit_for(dir, ref)
    log.debug(f"Found last commit: {commit.name_rev}")
    if commit.author.name != username:
        raise Exception(
            f"Commit author {commit.author.name} is not the owner of this directory."
        )

    signature = commit.gpgsig
    if not signature:
        raise Exception(
            f"Last commit {commit.name_rev} was not signed (committer {commit.author.name})."
        )

    email = commit.author.email
    if email is None:
        raise Exception(
            f"Commit {commit.name_rev} does not have an email address attached (committer {commit.author.name})."
        )
    log.info(
        f"Last commit {commit.name_rev} was signed by the correct user {commit.author}"
    )

    local_keys = keyfile.read_text()
    remote_keys = get_forgejo_keys(username)
    log.debug(f"Got remote keys:\n{remote_keys}")

    with NamedTemporaryFile(mode="+w") as temp_keyfile:
        # First, configure git to use our custom-made principals file
        with ref.repo.config_writer("repository") as config:
            config.set_value("gpg.ssh", "allowedSignersFile", temp_keyfile.name)

        temp_keyfile_contents = keylist_to_principals(remote_keys, email)
        log.debug(f"temp keyfile:\n{temp_keyfile_contents}")
        temp_keyfile.write(temp_keyfile_contents)
        temp_keyfile.flush()
        # Check whether one of the user keys signed this commit.
        # throws an exception automatically if verification fails, nothing else to do
        ref.repo.git.verify_commit("--raw", commit.hexsha)
        # BTW: For ssh-keygen -Y verify, the namespace is really just “git”. No, this is not documented anywhere.
        # Oh I absolutely went digging for this in the git source code, though it wasn’t hard to find.
        # See https://git.kernel.org/pub/scm/git/git.git/tree/gpg-interface.c?id=5f8f7081f7761acdf83d0a4c6819fe3d724f01d7#n559

        # Verify that the key is also in the local keylist.
        # The remote keys must be removed first: the file position is at the
        # end after the write above, so writing again would only append and
        # the remote keys would keep satisfying the check on their own.
        temp_keyfile.seek(0)
        temp_keyfile.truncate()
        temp_keyfile_contents = keylist_to_principals(local_keys, email)
        temp_keyfile.write(temp_keyfile_contents)
        temp_keyfile.flush()
        ref.repo.git.verify_commit("--raw", commit.hexsha)
|
||
|
||
|
||
def current_ref(repo: git.Repo) -> git.Reference:
    """Look up the reference the workflow was triggered for.

    Scans the repository's references and returns the first one whose name
    or path equals ``action_ref``.

    Raises:
        Exception: if no reference matches.
    """
    matches = (
        candidate
        for candidate in repo.references
        if candidate.name == action_ref or candidate.path == action_ref
    )
    found = next(matches, None)
    if found is None:
        raise Exception(f"No ref named {action_ref} found")
    return found
|
||
|
||
|
||
def main():
    """Verify every user directory and exit non-zero if any of them fails.

    Configures logging (INFO in CI, DEBUG otherwise), changes into
    ``GITHUB_WORKSPACE`` (or stays in the current directory when unset),
    resolves the triggering ref and runs ``verify_dir`` on each directory
    returned by ``collect_user_dirs``. Failures are logged and counted
    rather than aborting, so one run reports every failing user.

    Raises:
        SystemExit: with the number of failed directories (capped at 255)
            when at least one verification failed.
    """
    logging.basicConfig(
        style="{",
        format="[{levelname:8}] {message}",
        level=logging.INFO if is_ci else logging.DEBUG,
    )
    chdir(env.get("GITHUB_WORKSPACE", default="."))
    if is_ci:
        log.info("Verification script running in CI.")
    log.debug(f"{server} / {repo_name}")
    log.debug(f"Repo dir: {Path.cwd().resolve()}")
    repo = git.Repo(Path.cwd())
    ref = current_ref(repo)

    errors = 0
    successes = 0
    for user_dir in collect_user_dirs():
        log.info(f"------ Verifying {user_dir.name} ...")
        try:
            verify_dir(user_dir, ref)
        except Exception as e:
            # Top-level boundary: record the failure and keep checking the
            # remaining users.
            log.error(f"Verification failure for user {user_dir.name}: {e}")
            errors += 1
        else:
            successes += 1

    if errors > 0:
        log.error("At least one user failed verification. See log for details.")
        # Exit statuses wrap modulo 256 on POSIX, so an uncapped count of
        # exactly 256 failures would be reported as success.
        raise SystemExit(min(errors, 255))
    log.info(f"All users passed verification. Checked {successes} directories.")


if __name__ == "__main__":
    main()
|