397777089e
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
839 lines
31 KiB
Python
839 lines
31 KiB
Python
#!/usr/bin/env python3
|
|
"""adr-judge: diff-vs-ADR engine for adr-kit (v0.12.0+).

Pairs with the /adr-kit:judge Claude Code skill. Two evaluation paths run
on every commit when invoked from the pre-commit hook:

  1. Declarative pass — fast, regex-only, no LLM round-trip. Reads the
     fenced JSON Enforcement block of each Accepted ADR and applies
     forbid_pattern / forbid_import / require_pattern rules to the staged
     git diff.

  2. LLM pass (v0.13.0+, opt-out via ADR_KIT_NO_LLM=1) — for ADRs with
     `llm_judge: true`, batches all of them into ONE Claude Sonnet call
     (default: claude-sonnet-4-6 via `claude -p`). Sonnet returns a JSON
     verdict object {ADR-NNN: {verdict: OK | VIOLATION, reason: ...}}.
     The LLM pass requires the `claude` CLI on PATH; if missing or auth
     fails, adr-judge prints a warning and falls back to declarative-only
     so a missing CLI never blocks legitimate commits.

ADRs without an Enforcement block are skipped silently regardless of mode.

Exit codes (mirror bin/adr-lint):
  0  no violations (advisory entries may exist)
  1  at least one violation (declarative or LLM)
  2  config or input error

Usage:
  adr-judge                            # diff from stdin, ADRs from docs/adr/
  adr-judge --diff <file>              # read diff from a file (use - for stdin)
  adr-judge --llm                      # also run the LLM pass for llm_judge:true ADRs
  adr-judge --llm-cmd "claude -p ..."  # override the LLM invocation (tests, custom models)
  adr-judge --json                     # machine-readable output
  adr-judge --config <path>            # override .adr-kit.json location
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import fnmatch
|
|
import json
|
|
import os
|
|
import re
|
|
import shlex
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
# Default LLM invocation. Overridable via --llm-cmd, the ADR_KIT_LLM_CMD
# environment variable, or judge.llm_cmd / judge.llm_model in .adr-kit.json.
# Tests inject a fake binary here.
DEFAULT_LLM_CMD = ["claude", "-p", "--model", "claude-sonnet-4-6"]
# Timeout for the single LLM call, in seconds. Overridable via --llm-timeout
# or judge.llm_timeout_seconds in .adr-kit.json.
DEFAULT_LLM_TIMEOUT_S = 120
|
|
|
|
# ---------- ADR / diff parsing ----------
|
|
|
|
# ADR files are named ADR-<number>-<slug>.md; the number is 1-4 digits and
# the match is case-insensitive.
ADR_FILENAME_RE = re.compile(r"(?i)^ADR-(\d{1,4})-.*\.md$")
# Plain single-line form: a line starting with "Status" (colon optional)
# followed by the status word.
STATUS_LINE_RE = re.compile(r"^Status\s*:?\s*(\w+)", re.IGNORECASE | re.MULTILINE)
# Heading form: a "## Status" heading; captures the first non-empty line
# that follows it.
STATUS_HEADING_RE = re.compile(
    r"^##\s+Status\s*$\n+([^\n]+)", re.IGNORECASE | re.MULTILINE
)
# Legacy bold-inline Status format used by many pre-canonical ADR sets:
#     **Status:** Accepted
#     **Status**: Proposed
#     **Status: Accepted**
# adr-lint flags these on Completeness (no '## Status' heading), which is
# correct — but adr-judge only needs the *value* to decide whether to enforce.
# Recognising the bold-inline form here means a project mid-migration still
# gets diff-vs-Enforcement coverage on its Accepted ADRs without first having
# to run /adr-kit:migrate. See v0.12.1 changelog.
# The status word lands in group 1 (closing ** before the value) or group 2
# (value inside the bold span).
STATUS_BOLD_INLINE_RE = re.compile(
    r"^\s*\*\*\s*Status\s*:?\s*\*\*\s*:?\s*([A-Za-z]+)|^\s*\*\*\s*Status\s*:?\s*([A-Za-z]+)\s*\*\*",
    re.IGNORECASE | re.MULTILINE,
)
# Section-bounded Enforcement parsing. The previous single-regex form
# (`^##\s+Enforcement\s*$\n+(?:.*?\n)*?` followed by a fenced ```json block,
# compiled with re.DOTALL) suffered catastrophic backtracking when an ADR
# had `## Enforcement` but no fenced JSON block in it (prose-only
# enforcement is valid — see ADR-011). The nested non-greedy quantifier
# `(?:.*?\n)*?` with DOTALL exhausted the regex engine searching for a
# non-existent closing fence through ~50+ lines, producing 60s+ hangs.
#
# Fix: decompose into three non-backtracking searches. Side benefit —
# the JSON fence is now correctly scoped to the Enforcement section, so
# a ```json block in a later section (e.g. References) is not picked up.
ENFORCEMENT_HEADING_RE = re.compile(
    r"^##\s+Enforcement\s*$", re.IGNORECASE | re.MULTILINE
)
# Start of any level-two heading; used to find where a section ends.
NEXT_SECTION_HEADING_RE = re.compile(r"^##\s+", re.MULTILINE)
# A fenced ```json block; group 1 is the text between the fences.
JSON_FENCE_RE = re.compile(r"```json\s*\n(.*?)\n```", re.DOTALL)
# Unified-diff hunk header; group 1 is the starting line number on the
# new-file ("+") side.
HUNK_HEADER_RE = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,\d+)? @@")
|
|
|
|
|
|
class JudgeError(Exception):
    """Configuration or input problem.

    ``main`` catches this, prints the message to stderr and exits with
    code 2.
    """
|
|
|
|
|
|
def adr_status(text: str) -> Optional[str]:
    """Return the ADR's status (Accepted/Proposed/Deprecated/Superseded) or None.

    Handles all of these (case-insensitive):
        Status: Accepted, 2026-04-25.            (single-line plain, anywhere)
        ## Status\\n\\nAccepted, 2026-04-25.       (heading + body, comma form)
        ## Status\\n\\nAccepted. Date: 2026-04-25. (heading + body, period form)
        ## Status\\n\\nSuperseded by ADR-099, 2026-05-01.
        ## Status\\n\\n**Accepted**                (heading + decorated body)
        **Status:** Accepted                     (bold-inline, since v0.12.1)
        **Status**: Proposed                     (bold-inline, alt punctuation)
        **Status: Accepted**                     (bold-inline, fully bracketed)

    The forms are tried in that priority order: heading, bold-inline, plain.
    Returns the first alphabetic word found in the status line, so trailing
    punctuation ('Accepted.') and leading markdown decoration ('**', '- ',
    '_') are ignored. Returns None when no status can be found.
    """
    heading = STATUS_HEADING_RE.search(text)
    if heading:
        # search(), not match(): the status line may start with markdown
        # decoration such as "**Accepted**" or "- Accepted", and the first
        # alphabetic word is still the status.
        word = re.search(r"[A-Za-z]+", heading.group(1))
        return word.group(0) if word else None

    bold = STATUS_BOLD_INLINE_RE.search(text)
    if bold:
        # Exactly one of the two alternatives captured the value.
        return bold.group(1) or bold.group(2)

    plain = STATUS_LINE_RE.search(text)
    return plain.group(1) if plain else None
|
|
|
|
|
|
def parse_enforcement(adr_text: str, adr_path: Path) -> Optional[Dict]:
    """Extract and parse the JSON inside an ADR's ## Enforcement section.

    Args:
        adr_text: Full text of the ADR.
        adr_path: Path of the ADR; used only to prefix error messages.

    Returns:
        The parsed Enforcement object, or None when there is no Enforcement
        section, OR the section has no fenced JSON block (prose-only
        enforcement is valid — see ADR-011).

    Raises:
        JudgeError: the JSON is malformed, is not an object, has a rule list
            that is not an array, has a rule that is not an object with a
            string ``pattern`` (and, if present, a string ``path_glob``), has
            a non-boolean ``llm_judge``, or fails the optional jsonschema
            validation.
    """
    heading = ENFORCEMENT_HEADING_RE.search(adr_text)
    if not heading:
        return None

    # Bound the fence search to the Enforcement section so a ```json block
    # in a later section is never picked up.
    section_start = heading.end()
    following = NEXT_SECTION_HEADING_RE.search(adr_text, section_start)
    section_end = following.start() if following else len(adr_text)
    fence = JSON_FENCE_RE.search(adr_text, section_start, section_end)
    if not fence:
        return None

    try:
        data = json.loads(fence.group(1))
    except json.JSONDecodeError as e:
        raise JudgeError(
            f"{adr_path}: malformed JSON in ## Enforcement block "
            f"({e.msg} at line {e.lineno})"
        ) from e
    if not isinstance(data, dict):
        raise JudgeError(
            f"{adr_path}: ## Enforcement JSON must be an object, got {type(data).__name__}"
        )

    # Basic shape validation. Optional jsonschema deeper check below.
    for key in ("forbid_pattern", "require_pattern", "forbid_import"):
        if key not in data:
            continue
        rules = data[key]
        if not isinstance(rules, list):
            raise JudgeError(f"{adr_path}: Enforcement.{key} must be an array")
        # Validate each rule here so a malformed rule is reported as a
        # config error rather than crashing later when it is applied.
        for index, rule in enumerate(rules):
            if not isinstance(rule, dict):
                raise JudgeError(
                    f"{adr_path}: Enforcement.{key}[{index}] must be an object"
                )
            if not isinstance(rule.get("pattern"), str):
                raise JudgeError(
                    f"{adr_path}: Enforcement.{key}[{index}].pattern must be a string"
                )
            path_glob = rule.get("path_glob")
            if path_glob is not None and not isinstance(path_glob, str):
                raise JudgeError(
                    f"{adr_path}: Enforcement.{key}[{index}].path_glob must be a string"
                )
    if "llm_judge" in data and not isinstance(data["llm_judge"], bool):
        raise JudgeError(f"{adr_path}: Enforcement.llm_judge must be a boolean")

    # jsonschema is an optional dependency: without it only the basic shape
    # checks above apply.
    try:
        import jsonschema  # type: ignore
    except ImportError:
        return data

    schema_path = (
        Path(__file__).resolve().parent.parent
        / "schemas"
        / "adr-enforcement.schema.json"
    )
    if schema_path.exists():
        try:
            schema = json.loads(schema_path.read_text(encoding="utf-8"))
            jsonschema.validate(data, schema)
        except Exception as e:
            # Converted (not swallowed): any failure here is surfaced as a
            # config error with the underlying cause chained.
            raise JudgeError(
                f"{adr_path}: Enforcement block fails schema validation: {e}"
            ) from e

    return data
|
|
|
|
|
|
# Hunk header with both line counts captured: group 1 = old-side count,
# group 2 = new-side start line, group 3 = new-side count. A missing count
# means 1, per the unified diff format.
_HUNK_RANGES_RE = re.compile(r"^@@ -\d+(?:,(\d+))? \+(\d+)(?:,(\d+))? @@")


def parse_diff(text: str) -> Dict[str, List[Tuple[int, str]]]:
    """Extract (lineno, content) tuples per added line, keyed by post-diff path.

    Skips deleted files (+++ /dev/null) and binary diffs. Tracks the new-file
    line counter via the @@ hunk header so reporting can cite file:line.

    While inside a hunk (as long as the counts from its @@ header are not
    used up) every line is classified purely by its first character. This
    matters for added lines whose content starts with "++" (e.g. "++i;"
    appears as "+++i;" in the diff): they are recorded as added lines rather
    than being dropped or mistaken for a "+++ " file header. Outside a
    counted hunk the older prefix heuristic is used, so hand-written diffs
    with understated counts still parse.

    Args:
        text: Unified diff text.

    Returns:
        Mapping of post-diff path to a list of (new-file line number, line
        content without the leading "+").
    """
    files: Dict[str, List[Tuple[int, str]]] = {}
    current: Optional[str] = None
    lineno = 0
    old_left = 0  # old-side lines still expected in the current hunk
    new_left = 0  # new-side lines still expected in the current hunk

    # Split on "\n" only: str.splitlines() would also break a diff line on
    # form feed, U+2028 and similar characters that occur in file content.
    for raw_line in text.split("\n"):
        line = raw_line[:-1] if raw_line.endswith("\r") else raw_line

        if old_left > 0 or new_left > 0:
            if line.startswith("\\"):
                # "\ No newline at end of file" marker; not a content line.
                continue
            if new_left > 0 and line.startswith("+"):
                if current:
                    files[current].append((lineno, line[1:]))
                lineno += 1
                new_left -= 1
                continue
            if old_left > 0 and line.startswith("-"):
                old_left -= 1
                continue
            if old_left > 0 and new_left > 0 and (line.startswith(" ") or line == ""):
                # Context line. An empty line is accepted as a blank context
                # line whose single leading space was stripped by a tool.
                lineno += 1
                old_left -= 1
                new_left -= 1
                continue
            # The hunk ended early (header counts overstated): drop the
            # counts and fall through to the header / legacy handling.
            old_left = new_left = 0

        if line.startswith("+++ "):
            target = line[4:].strip()
            if target.startswith("/dev/null"):
                current = None
            else:
                # Strip leading "b/" if present (git default)
                current = target[2:] if target.startswith("b/") else target
                files.setdefault(current, [])
        elif line.startswith("@@ "):
            header = _HUNK_RANGES_RE.match(line)
            if header:
                lineno = int(header.group(2))
                old_left = 1 if header.group(1) is None else int(header.group(1))
                new_left = 1 if header.group(3) is None else int(header.group(3))
        elif current and line.startswith("+") and not line.startswith("+++"):
            files[current].append((lineno, line[1:]))
            lineno += 1
        elif line.startswith(" "):
            lineno += 1
        # diff --git, index, ---, --- /dev/null, removed lines: ignored
    return files
|
|
|
|
|
|
# ---------- glob & rule application ----------
|
|
|
|
|
|
def _glob_body(glob: str) -> str:
    """Translate a glob into un-anchored regex source (no leading ^ or trailing $)."""
    out: List[str] = []
    i = 0
    n = len(glob)
    while i < n:
        c = glob[i]
        if c == "*":
            if glob.startswith("**", i):
                # consume **, then optional trailing slash
                if glob.startswith("**/", i):
                    out.append("(?:.*/)?")
                    i += 3
                else:
                    out.append(".*")
                    i += 2
            else:
                out.append("[^/]*")
                i += 1
        elif c == "?":
            out.append("[^/]")
            i += 1
        elif c == "{":
            # Brace expansion: {a,b,c} -> (?:a|b|c). Find matching closing brace.
            # Nested braces are not supported; if the user needs them they should
            # restructure the glob. An unclosed brace is treated literally.
            close = glob.find("}", i + 1)
            if close == -1 or "{" in glob[i + 1:close]:
                out.append(re.escape(c))
                i += 1
            else:
                # Each alternative is itself a sub-glob. Translating it
                # un-anchored avoids having to strip ^...$ afterwards, which
                # would corrupt an alternative ending in an escaped "$".
                alternatives = [_glob_body(a) for a in glob[i + 1:close].split(",")]
                out.append("(?:" + "|".join(alternatives) + ")")
                i = close + 1
        else:
            out.append(re.escape(c))
            i += 1
    return "".join(out)


def glob_to_regex(glob: str) -> re.Pattern:
    """Translate a shell-style glob (with ** for recursive descent) to a regex.

    Examples:
        *.py                  → only top-level .py
        **/*.py               → any .py at any depth (including top-level)
        src/**/*.py           → any .py under src/ at any depth
        src/**                → anything under src/
        src/**/*.{ino,cpp,h}  → .ino, .cpp, or .h files anywhere under src/ (v0.12.2+)
        src/{a,b,c}.ino       → exactly src/a.ino, src/b.ino, or src/c.ino (v0.12.2+)

    Brace expansion ({a,b,c}) was added in v0.12.2 — without it, real-world
    Enforcement-block path_globs that scope to a list of source files
    silently match nothing (regressed from common shell-glob expectations).

    ``*`` and ``?`` never match ``/``. Every other character, including
    ``[`` and ``]``, is matched literally.

    Args:
        glob: The glob pattern.

    Returns:
        A compiled regex anchored at both ends, to be used with ``match``.
    """
    return re.compile("^" + _glob_body(glob) + "$")
|
|
|
|
|
|
def path_matches(path: str, glob: Optional[str]) -> bool:
    """Report whether ``path`` satisfies ``glob``.

    A missing or empty glob places no restriction, so every path matches.
    """
    if glob:
        return glob_to_regex(glob).match(path) is not None
    return True
|
|
|
|
|
|
def any_skip_match(path: str, skip_globs: List[str]) -> bool:
    """Return True when ``path`` matches at least one glob in ``skip_globs``."""
    for skip_glob in skip_globs:
        if path_matches(path, skip_glob):
            return True
    return False
|
|
|
|
|
|
def _compile_rule(
    adr_id: str, kind: str, rule: Dict, flags: int = 0
) -> Tuple[re.Pattern, str, Optional[str], str]:
    """Validate one rule and return (compiled regex, pattern, path_glob, message)."""
    if not isinstance(rule, dict):
        raise JudgeError(
            f"{adr_id}: {kind} rule must be an object, got {type(rule).__name__}"
        )
    pattern = rule.get("pattern")
    if not isinstance(pattern, str):
        raise JudgeError(f"{adr_id}: {kind} rule is missing a string 'pattern'")
    path_glob = rule.get("path_glob")
    if path_glob is not None and not isinstance(path_glob, str):
        raise JudgeError(f"{adr_id}: {kind} rule has a non-string 'path_glob'")
    try:
        regex = re.compile(pattern, flags)
    except re.error as e:
        raise JudgeError(
            f"{adr_id}: invalid regex in {kind} rule ({pattern!r}): {e}"
        ) from e
    message = rule.get("message") or f"{kind}: {pattern}"
    return regex, pattern, path_glob, message


def _make_finding(
    adr_id: str,
    rule: str,
    pattern: Optional[str],
    path: Optional[str],
    line: Optional[int],
    snippet: Optional[str],
    message: str,
    severity: str,
) -> Dict:
    """Build one finding record with the key set every emitter expects."""
    return {
        "adr": adr_id,
        "rule": rule,
        "pattern": pattern,
        "path": path,
        "line": line,
        "snippet": snippet,
        "message": message,
        "severity": severity,
    }


def apply_rules_to_diff(
    adr_id: str,
    enforcement: Dict,
    diff_files: Dict[str, List[Tuple[int, str]]],
    repo_root: Path,
    skip_files: List[str],
    llm_mode_active: bool = False,
) -> List[Dict]:
    """Apply one ADR's Enforcement block to the parsed diff. Returns findings.

    forbid_pattern / forbid_import rules are searched against each added
    line of each diffed file. require_pattern rules are searched against the
    full content of each diffed file as read from ``repo_root`` (files that
    do not exist or cannot be read are skipped).

    When ``llm_mode_active`` is True (added in v0.13.0), pure-llm_judge ADRs
    (those with no declarative rules) are NOT emitted as advisories here —
    they are batched into the LLM pass instead. When False, the v0.12.x
    advisory behaviour is preserved so existing hooks that don't pass
    --llm continue working unchanged.

    Args:
        adr_id: Identifier reported in each finding.
        enforcement: Parsed Enforcement object of the ADR.
        diff_files: Mapping path -> [(lineno, added line)], as produced by
            ``parse_diff``.
        repo_root: Directory against which diff paths are resolved for
            require_pattern rules.
        skip_files: Globs; matching paths are exempt from every rule.
        llm_mode_active: See above.

    Returns:
        A list of finding dicts with keys adr, rule, pattern, path, line,
        snippet, message and severity ("violation" or "advisory").

    Raises:
        JudgeError: a rule is not an object, lacks a string pattern, or its
            pattern is not a valid regex.
    """
    findings: List[Dict] = []
    # The skip list does not depend on the rule, so filter paths once.
    candidate_paths = [p for p in diff_files if not any_skip_match(p, skip_files)]

    for kind in ("forbid_pattern", "forbid_import"):
        for rule in enforcement.get(kind) or []:
            regex, pattern, path_glob, message = _compile_rule(adr_id, kind, rule)
            for path in candidate_paths:
                if not path_matches(path, path_glob):
                    continue
                for lineno, content in diff_files[path]:
                    if regex.search(content):
                        findings.append(
                            _make_finding(
                                adr_id,
                                kind,
                                pattern,
                                path,
                                lineno,
                                content.rstrip("\n")[:200],
                                message,
                                "violation",
                            )
                        )

    for rule in enforcement.get("require_pattern") or []:
        regex, pattern, path_glob, message = _compile_rule(
            adr_id, "require_pattern", rule, re.MULTILINE
        )
        for path in candidate_paths:
            if not path_matches(path, path_glob):
                continue
            file_path = repo_root / path
            if not file_path.is_file():
                continue
            try:
                content = file_path.read_text(encoding="utf-8", errors="replace")
            except OSError:
                # Best effort: an unreadable file cannot be checked.
                continue
            if not regex.search(content):
                findings.append(
                    _make_finding(
                        adr_id,
                        "require_pattern",
                        pattern,
                        path,
                        None,
                        None,
                        message,
                        "violation",
                    )
                )

    has_declarative_rules = bool(
        enforcement.get("forbid_pattern")
        or enforcement.get("forbid_import")
        or enforcement.get("require_pattern")
    )
    if enforcement.get("llm_judge") and not has_declarative_rules:
        if not llm_mode_active:
            # v0.12.x behaviour: hook stays advisory; user runs /adr-kit:judge.
            findings.append(
                _make_finding(
                    adr_id,
                    "llm_judge",
                    None,
                    None,
                    None,
                    None,
                    "ADR has llm_judge:true and no declarative rules; "
                    "run /adr-kit:judge in your Claude Code session for full coverage.",
                    "advisory",
                )
            )
        # else: handled by run_llm_batch — produces "violation" or nothing per ADR.

    return findings
|
|
|
|
|
|
# ---------- LLM judge pass (v0.13.0+) ----------
|
|
|
|
# Group 1 is the body of the "## Decision" section: everything after the
# heading up to the next line starting with "##" plus whitespace, or the
# end of the text.
DECISION_SECTION_RE = re.compile(
    r"^##\s+Decision\s*$\n+(.*?)(?=^##\s|\Z)",
    re.IGNORECASE | re.MULTILINE | re.DOTALL,
)
# Group 1 is the text of a level-one "# ..." heading line, without
# surrounding whitespace.
TITLE_RE = re.compile(r"^#\s+(.+?)\s*$", re.MULTILINE)
|
|
|
|
|
|
def extract_title(body: str) -> str:
    """Return the text of the ADR's ``# ...`` title line, or '' when there is none."""
    found = TITLE_RE.search(body)
    if found is None:
        return ""
    return found.group(1).strip()
|
|
|
|
|
|
def extract_decision(body: str) -> str:
    """Return the stripped text of the ## Decision section, or '' when absent.

    The LLM judge evaluates the diff against this section. Only this section
    is sent (not the whole ADR) to keep the prompt compact and the judge
    focused on the rule itself.
    """
    found = DECISION_SECTION_RE.search(body)
    if found is None:
        return ""
    return found.group(1).strip()
|
|
|
|
|
|
def collect_llm_targets(adrs: List[Tuple[str, Path, str]]) -> List[Dict]:
    """Select the ADRs the LLM pass should judge.

    An ADR qualifies when its status is Accepted, its Enforcement block sets
    ``llm_judge`` to a true value, and it has a non-empty Decision section.
    Each qualifying ADR yields ``{"adr_id", "title", "decision"}``.
    """
    targets: List[Dict] = []
    for adr_id, adr_path, body in adrs:
        status = adr_status(body)
        accepted = status is not None and status.lower() == "accepted"
        if not accepted:
            continue

        rules = parse_enforcement(body, adr_path)
        if rules is None or not rules.get("llm_judge"):
            continue

        title = extract_title(body)
        decision = extract_decision(body)
        # Without a Decision section the LLM has nothing to reason about.
        # The Completeness gate would already have flagged that, so such
        # ADRs are skipped here without a message.
        if decision:
            targets.append({"adr_id": adr_id, "title": title, "decision": decision})
    return targets
|
|
|
|
|
|
def build_llm_prompt(targets: List[Dict], diff_text: str) -> str:
    """Assemble the one batch prompt sent to `claude -p`.

    Layout: fixed instructions, then one entry per ADR (id, title, decision),
    then the staged diff. The ADR set comes BEFORE the diff so that prompt
    caching can hit across commits while the ADR set is stable; the diff is
    the only part that varies per commit. A blank diff is rendered as
    "(empty diff)".
    """
    lines: List[str] = [
        "You are evaluating whether a staged git diff violates documented "
        "Architecture Decision Records (ADRs).",
        "",
        "For each ADR below, decide whether the diff introduces something "
        "that conflicts with the ADR's stated decision. Be conservative: "
        "only flag CLEAR violations. If the diff is unrelated to the ADR's "
        "subject area, the verdict is OK. Do not flag stylistic issues, "
        "minor refactors, or anything ambiguous.",
        "",
        "Return ONLY a single JSON object — no preamble, no commentary, no "
        "code fences. The object maps each ADR id to a verdict:",
        "",
        ' {"ADR-NNN": {"verdict": "OK"}}',
        ' {"ADR-NNN": {"verdict": "VIOLATION", "reason": "<one sentence '
        'citing the file and what conflicts>"}}',
        "",
        "=== ADRS TO EVALUATE ===",
        "",
    ]

    for target in targets:
        lines.extend(
            (
                f"{target['adr_id']} — {target['title']}",
                "Decision:",
                target["decision"],
                "",
            )
        )

    if diff_text.strip():
        diff_section = diff_text
    else:
        diff_section = "(empty diff)"
    lines.extend(("=== STAGED DIFF ===", "", diff_section, ""))

    return "\n".join(lines)
|
|
|
|
|
|
def parse_llm_response(raw: str) -> Dict[str, Dict]:
    """Extract the JSON verdict object from Claude's response.

    Robust to: direct JSON, fenced code block, leading/trailing prose
    (including prose that itself contains braces).

    Strategies, in order; the first that yields a JSON object wins:
      1. the whole response is JSON;
      2. the content of the first fenced code block is JSON;
      3. a JSON object can be decoded starting at some "{" in the text
         (tried left to right).

    Args:
        raw: The LLM's raw stdout.

    Returns:
        The decoded JSON object.

    Raises:
        JudgeError: the response is empty or no JSON object is recoverable.
    """
    raw = raw.strip()
    if not raw:
        raise JudgeError("empty LLM response")

    # Try direct
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        pass
    else:
        if isinstance(data, dict):
            return data

    # Try fenced code block
    fenced = re.search(r"```(?:json)?\s*\n(.*?)\n```", raw, re.DOTALL)
    if fenced:
        try:
            data = json.loads(fenced.group(1))
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(data, dict):
                return data

    # Try decoding an object at each "{". Unlike slicing from the first "{"
    # to the last "}", this tolerates braces in surrounding prose because
    # raw_decode stops at the end of the first complete JSON value.
    decoder = json.JSONDecoder()
    start = raw.find("{")
    while start != -1:
        try:
            data, _end = decoder.raw_decode(raw, start)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(data, dict):
                return data
        start = raw.find("{", start + 1)

    raise JudgeError(
        f"could not extract JSON from LLM response (first 200 chars): {raw[:200]!r}"
    )
|
|
|
|
|
|
def run_llm_batch(
    targets: List[Dict],
    diff_text: str,
    llm_cmd: List[str],
    timeout_s: int,
) -> Optional[List[Dict]]:
    """Run the LLM judge over all `llm_judge: true` targets in one call.

    The prompt is written to the command's stdin and the verdict JSON is
    read from its stdout, both as UTF-8.

    Args:
        targets: Entries with keys adr_id, title and decision.
        diff_text: The staged diff, passed verbatim into the prompt.
        llm_cmd: Command and arguments to execute (no shell involved).
        timeout_s: Timeout for the command, in seconds.

    Returns:
        A list of findings (only VIOLATION entries; OK is the silent
        default). Returns None when the LLM command is empty, missing,
        cannot be started, times out, exits non-zero, or returns unparseable
        output — caller should fall back to declarative-only without
        blocking the commit. A warning is printed to stderr in each of
        those cases.
    """
    if not targets:
        return []
    if not llm_cmd:
        print(
            "[adr-judge] WARN: LLM judge requested but the LLM command is empty; "
            "skipping LLM pass (declarative checks unaffected).",
            file=sys.stderr,
        )
        return None
    binary = llm_cmd[0]
    if shutil.which(binary) is None:
        print(
            f"[adr-judge] WARN: LLM judge requested but {binary!r} not on PATH; "
            f"skipping LLM pass (declarative checks unaffected). "
            f"To enable, install Claude Code or set --llm-cmd.",
            file=sys.stderr,
        )
        return None

    prompt = build_llm_prompt(targets, diff_text)
    try:
        result = subprocess.run(
            llm_cmd,
            input=prompt,
            capture_output=True,
            text=True,
            # Explicit UTF-8 so the prompt (which contains non-ASCII text and
            # arbitrary diff content) does not depend on the locale encoding.
            encoding="utf-8",
            errors="replace",
            timeout=timeout_s,
        )
    except subprocess.TimeoutExpired:
        print(
            f"[adr-judge] WARN: LLM judge timed out after {timeout_s}s; "
            f"skipping LLM pass. Increase judge.llm_timeout_seconds in "
            f".adr-kit.json if commits routinely exceed this.",
            file=sys.stderr,
        )
        return None
    except OSError as e:
        # The binary was found but could not be executed (permissions, bad
        # format, removed in the meantime). Same policy: warn, do not block.
        print(
            f"[adr-judge] WARN: could not run LLM judge command {binary!r}: {e}; "
            f"skipping LLM pass.",
            file=sys.stderr,
        )
        return None

    if result.returncode != 0:
        print(
            f"[adr-judge] WARN: LLM judge command exited {result.returncode}: "
            f"{result.stderr.strip()[:200]!r}; skipping LLM pass.",
            file=sys.stderr,
        )
        return None

    try:
        verdicts = parse_llm_response(result.stdout)
    except JudgeError as e:
        print(f"[adr-judge] WARN: {e}; skipping LLM pass.", file=sys.stderr)
        return None

    findings: List[Dict] = []
    for target in targets:
        adr_id = target["adr_id"]
        verdict = verdicts.get(adr_id)
        # A missing or malformed verdict counts as OK (no finding).
        if not isinstance(verdict, dict):
            continue
        if str(verdict.get("verdict", "")).upper() != "VIOLATION":
            continue
        reason = str(verdict.get("reason") or "LLM judge flagged a violation.")
        findings.append(
            {
                "adr": adr_id,
                "rule": "llm_judge",
                "pattern": None,
                "path": None,
                "line": None,
                "snippet": None,
                "message": reason[:500],
                "severity": "violation",
            }
        )
    return findings
|
|
|
|
|
|
# ---------- config & top-level orchestration ----------
|
|
|
|
|
|
def load_config(path: Optional[Path]) -> Dict:
    """Read .adr-kit.json (if present). Returns {} when missing.

    Args:
        path: Location of the config file, or None.

    Returns:
        The parsed config object, or {} when ``path`` is None or does not
        exist.

    Raises:
        JudgeError: the file cannot be read or decoded, is not valid JSON,
            or its top-level value is not a JSON object.
    """
    if path is None or not path.exists():
        return {}
    try:
        cfg = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise JudgeError(f"{path}: invalid JSON ({e.msg} at line {e.lineno})") from e
    except (OSError, UnicodeDecodeError) as e:
        raise JudgeError(f"{path}: cannot read config ({e})") from e
    # Callers use dict methods on the result, so reject arrays/scalars here.
    if not isinstance(cfg, dict):
        raise JudgeError(
            f"{path}: config must be a JSON object, got {type(cfg).__name__}"
        )
    return cfg
|
|
|
|
|
|
def adr_id_from_filename(name: str) -> Optional[str]:
    """Derive the canonical ``ADR-NNN`` id from an ADR file name.

    The number is zero-padded to at least three digits. Returns None when
    the name does not look like an ADR file.
    """
    parsed = ADR_FILENAME_RE.match(name)
    if parsed is None:
        return None
    number = int(parsed.group(1))
    return f"ADR-{number:03d}"
|
|
|
|
|
|
def collect_adrs(adr_dir: Path) -> List[Tuple[str, Path, str]]:
    """Return [(adr_id, path, body)] for every ADR-*.md file in adr_dir.

    Files are visited in sorted order. Files whose name does not yield an
    ADR id, or that cannot be read, are skipped. Bytes that are not valid
    UTF-8 are replaced rather than aborting the run, so one badly encoded
    ADR neither crashes the judge nor escapes enforcement.

    Returns an empty list when ``adr_dir`` is not a directory.
    """
    if not adr_dir.is_dir():
        return []

    collected: List[Tuple[str, Path, str]] = []
    for adr_file in sorted(adr_dir.glob("ADR-*.md")):
        adr_id = adr_id_from_filename(adr_file.name)
        if not adr_id:
            continue
        try:
            body = adr_file.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Best effort: an unreadable ADR is skipped.
            continue
        collected.append((adr_id, adr_file, body))
    return collected
|
|
|
|
|
|
def read_diff(diff_arg: str) -> str:
    """Return the diff text from a file, or from stdin for '-' / ''.

    The input is decoded as UTF-8 with undecodable bytes replaced, so a diff
    touching a non-UTF-8 file cannot abort the run.

    Raises:
        JudgeError: the diff file cannot be read.
    """
    if diff_arg in ("-", ""):
        # Real stdin is a TextIOWrapper and can be switched to UTF-8 with
        # replacement; stand-ins (e.g. StringIO) have no reconfigure().
        reconfigure = getattr(sys.stdin, "reconfigure", None)
        if callable(reconfigure):
            try:
                reconfigure(encoding="utf-8", errors="replace")
            except (OSError, ValueError):
                # Best effort: keep the stream's current settings.
                pass
        return sys.stdin.read()
    try:
        return Path(diff_arg).read_text(encoding="utf-8", errors="replace")
    except OSError as e:
        raise JudgeError(f"cannot read diff {diff_arg!r}: {e}") from e
|
|
|
|
|
|
def emit_text(findings: List[Dict], adr_count: int, advisory_only: bool) -> None:
    """Print a human-readable report of the findings to stderr.

    Violations are listed first, then advisories, then a one-line summary.
    A violation's location is ``path:line``, just ``path`` when it has no
    line, and is omitted when it has no path (LLM verdicts).

    Args:
        findings: Finding dicts with keys adr, rule, path, line, snippet,
            message and severity.
        adr_count: Number of ADRs that had an Enforcement block.
        advisory_only: When True the summary notes that violations do not
            change the exit code.
    """
    violations = [f for f in findings if f["severity"] == "violation"]
    advisories = [f for f in findings if f["severity"] == "advisory"]
    print(f"[adr-judge] checked {adr_count} ADR(s) with Enforcement blocks", file=sys.stderr)

    for f in violations:
        header = f" VIOLATION {f['adr']} {f['rule']}"
        if f["path"] is not None:
            loc = f["path"] if f["line"] is None else f"{f['path']}:{f['line']}"
            header = f"{header} {loc}"
        # No path (LLM verdict): print no location instead of "None".
        print(header, file=sys.stderr)
        print(f" {f['message']}", file=sys.stderr)
        if f["snippet"]:
            print(f" > {f['snippet']}", file=sys.stderr)

    for f in advisories:
        print(f" ADVISORY {f['adr']} {f['rule']}", file=sys.stderr)
        print(f" {f['message']}", file=sys.stderr)

    if violations and advisory_only:
        print(
            f"[adr-judge] {len(violations)} violation(s), {len(advisories)} advisory; "
            f"advisory_only=true → exiting 0",
            file=sys.stderr,
        )
    elif violations:
        print(
            f"[adr-judge] {len(violations)} violation(s), {len(advisories)} advisory",
            file=sys.stderr,
        )
    else:
        print(
            f"[adr-judge] OK — 0 violations, {len(advisories)} advisory",
            file=sys.stderr,
        )
|
|
|
|
|
|
def emit_json(findings: List[Dict], adr_count: int) -> None:
    """Write the findings to stdout as indented JSON followed by a newline.

    The document has a ``summary`` (adrs_checked, violations, advisories)
    and the unmodified ``findings`` list.
    """
    tally = {"violation": 0, "advisory": 0}
    for finding in findings:
        severity = finding["severity"]
        if severity in tally:
            tally[severity] += 1

    summary = {
        "adrs_checked": adr_count,
        "violations": tally["violation"],
        "advisories": tally["advisory"],
    }
    report = {"summary": summary, "findings": findings}
    json.dump(report, sys.stdout, indent=2)
    sys.stdout.write("\n")
|
|
|
|
|
|
def _config_int(value: object, name: str) -> int:
    """Convert a config/CLI value to int, reporting failures as a config error."""
    try:
        return int(value)  # type: ignore[call-overload]
    except (TypeError, ValueError) as e:
        raise JudgeError(f"{name} must be an integer, got {value!r}") from e


def _resolve_llm_cmd(cli_value: Optional[str], judge_cfg: Dict) -> List[str]:
    """Pick the LLM command: --llm-cmd, ADR_KIT_LLM_CMD, judge.llm_cmd, judge.llm_model, default."""
    env_value = os.environ.get("ADR_KIT_LLM_CMD")
    cfg_cmd = judge_cfg.get("llm_cmd")
    try:
        if cli_value:
            cmd = shlex.split(cli_value)
        elif env_value:
            cmd = shlex.split(env_value)
        elif cfg_cmd:
            if isinstance(cfg_cmd, list):
                cmd = [str(part) for part in cfg_cmd]
            else:
                cmd = shlex.split(str(cfg_cmd))
        elif judge_cfg.get("llm_model"):
            # User specified just the model — keep the default `claude -p` shape.
            cmd = ["claude", "-p", "--model", str(judge_cfg["llm_model"])]
        else:
            cmd = list(DEFAULT_LLM_CMD)
    except ValueError as e:
        # shlex.split rejects e.g. unbalanced quotes.
        raise JudgeError(f"invalid LLM command: {e}") from e
    if not cmd:
        raise JudgeError("LLM command is empty")
    return cmd


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Loads the config, reads the diff, applies the declarative rules of every
    Accepted ADR that has an Enforcement block, optionally runs the LLM
    pass, and prints the report (text on stderr, or JSON on stdout with
    --json).

    Args:
        argv: Argument list to parse; None means ``sys.argv[1:]``.

    Returns:
        The process exit code: 0 when there are no violations (or
        advisory_only is set, or the diff exceeds max_diff_bytes and is
        skipped), 1 when there is at least one violation, 2 on a config or
        input error or on Ctrl-C.
    """
    p = argparse.ArgumentParser(
        prog="adr-judge",
        description="Apply ADR Enforcement blocks to a staged git diff.",
    )
    p.add_argument(
        "--diff",
        default="-",
        help="Path to a unified diff file (use '-' for stdin). Default: stdin.",
    )
    p.add_argument(
        "--adr-dir",
        default="docs/adr",
        help="Directory containing ADR-*.md files. Default: docs/adr.",
    )
    p.add_argument(
        "--config",
        default=None,
        help="Path to .adr-kit.json. Default: <adr-dir>/.adr-kit.json.",
    )
    p.add_argument("--json", action="store_true", help="Emit JSON to stdout.")
    p.add_argument(
        "--repo-root",
        default=None,
        help="Repo root for resolving file paths in require_pattern rules. "
        "Default: current working directory.",
    )
    p.add_argument(
        "--llm",
        action="store_true",
        help="Also run the LLM pass: batch all llm_judge:true ADRs into one "
        "Claude Sonnet call. Requires the `claude` CLI on PATH (or override "
        "via --llm-cmd). Falls back to declarative-only when the CLI is "
        "unavailable. Default off; the pre-commit hook template enables it.",
    )
    p.add_argument(
        "--llm-cmd",
        default=None,
        help="Override the LLM invocation. Default: 'claude -p --model "
        "claude-sonnet-4-6'. Tests inject a fake binary here; users may "
        "switch model via this flag or via .adr-kit.json's judge.llm_model.",
    )
    p.add_argument(
        "--llm-timeout",
        type=int,
        default=None,
        help="Per-call timeout for the LLM pass in seconds. Default 120 "
        "(or judge.llm_timeout_seconds in .adr-kit.json).",
    )
    args = p.parse_args(argv)

    try:
        adr_dir = Path(args.adr_dir).resolve()
        config_path = Path(args.config) if args.config else (adr_dir / ".adr-kit.json")
        cfg = load_config(config_path)
        if not isinstance(cfg, dict):
            raise JudgeError(f"{config_path}: config must be a JSON object")
        judge_cfg = cfg.get("judge") or {}
        if not isinstance(judge_cfg, dict):
            raise JudgeError(f"{config_path}: 'judge' must be an object")
        skip_files = judge_cfg.get("skip_files") or []
        # A bare string here would be iterated character by character.
        if not isinstance(skip_files, list) or not all(
            isinstance(g, str) for g in skip_files
        ):
            raise JudgeError(
                f"{config_path}: judge.skip_files must be an array of strings"
            )
        advisory_only = bool(judge_cfg.get("advisory_only", False))
        max_diff_bytes = _config_int(
            judge_cfg.get("max_diff_bytes", 1048576), "judge.max_diff_bytes"
        )

        # LLM mode resolution. Precedence: ADR_KIT_NO_LLM env (highest),
        # --llm flag, judge.llm_default in config (lowest).
        env_no_llm = os.environ.get("ADR_KIT_NO_LLM", "0") == "1"
        llm_mode_active = (
            args.llm or bool(judge_cfg.get("llm_default", False))
        ) and not env_no_llm

        llm_timeout_s = _config_int(
            args.llm_timeout
            if args.llm_timeout is not None
            else judge_cfg.get("llm_timeout_seconds", DEFAULT_LLM_TIMEOUT_S),
            "judge.llm_timeout_seconds",
        )

        repo_root = Path(args.repo_root).resolve() if args.repo_root else Path.cwd()

        try:
            diff_text = read_diff(args.diff)
        except (OSError, UnicodeDecodeError) as e:
            # Input error: report with exit code 2 instead of a traceback.
            raise JudgeError(f"cannot read diff {args.diff!r}: {e}") from e
        if max_diff_bytes and len(diff_text.encode("utf-8")) > max_diff_bytes:
            print(
                f"[adr-judge] diff exceeds max_diff_bytes={max_diff_bytes}; skipping",
                file=sys.stderr,
            )
            return 0
        diff_files = parse_diff(diff_text)

        adrs = collect_adrs(adr_dir)
        all_findings: List[Dict] = []
        adrs_with_enforcement = 0
        for adr_id, adr_path, body in adrs:
            status = adr_status(body)
            if status is None or status.lower() != "accepted":
                continue
            enforcement = parse_enforcement(body, adr_path)
            if enforcement is None:
                continue
            adrs_with_enforcement += 1
            all_findings.extend(
                apply_rules_to_diff(
                    adr_id, enforcement, diff_files, repo_root, skip_files,
                    llm_mode_active=llm_mode_active,
                )
            )

        # LLM pass — only when the user opted in via --llm (or judge.llm_default).
        # Failures of the LLM call itself log a warning and fall through; they
        # NEVER block the commit, because a missing CLI or transient API hiccup
        # must not break legitimate work.
        if llm_mode_active:
            targets = collect_llm_targets(adrs)
            if targets:
                # Resolved only when needed, so a bad LLM command cannot
                # affect runs that never invoke the LLM.
                llm_cmd = _resolve_llm_cmd(args.llm_cmd, judge_cfg)
                print(
                    f"[adr-judge] running LLM pass over {len(targets)} "
                    f"llm_judge ADR(s) with {llm_cmd[0]}...",
                    file=sys.stderr,
                )
                llm_findings = run_llm_batch(targets, diff_text, llm_cmd, llm_timeout_s)
                if llm_findings is not None:
                    all_findings.extend(llm_findings)

        if args.json:
            emit_json(all_findings, adrs_with_enforcement)
        else:
            emit_text(all_findings, adrs_with_enforcement, advisory_only)

        has_violation = any(f["severity"] == "violation" for f in all_findings)
        if has_violation and not advisory_only:
            return 1
        return 0

    except JudgeError as e:
        print(f"[adr-judge] ERROR: {e}", file=sys.stderr)
        return 2
    except KeyboardInterrupt:
        return 2
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the process
# exit code.
if __name__ == "__main__":
    sys.exit(main())
|