Files
brain/tools/mcp-tool-classifier.mjs
T
2026-06-15 17:09:14 +03:00

271 lines
13 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env node
/**
* MCP tool classifier (router-gate v4 Stream C, spec §5.3 + v4.1 G1/G12).
*
* Classifies an MCP / built-in tool call against a path-deny / URL-whitelist /
* SQL-statement overlay. Pure — path normalization & protected-path check are
* injected (Stream A); LLM-judge for WebSearch query is flagged for the consumer
* (Stream D). Unknown tools -> default 'block' (fail-CLOSE).
*/
import {
DEFAULT_PROJECT_URL_WHITELIST,
buildNavigateWhitelistPatterns,
buildWebFetchWhitelistPatterns,
WEBFETCH_SCHEME_BLOCK_PATTERNS,
} from './url-whitelist-rules.mjs';
// §5.3 + v4.1 G1/G12 classification map.
//
// Keys are tool names. A key may contain `*` wildcards (glob); an exact key always
// beats a glob key, and among glob keys the one with the most literal characters
// wins (see matchClassificationKey). Each value is an entry object whose `category`
// is one of 'read_only' | 'hard_blacklist' | 'conditional'. The special key
// `default` holds the fallback decision for tools that match nothing.
export const DEFAULT_MCP_CLASSIFICATION = Object.freeze({
  'mcp__redis__get': { category: 'read_only' },
  'mcp__redis__list': { category: 'read_only' },
  'mcp__redis__set': { category: 'hard_blacklist' },
  'mcp__redis__delete': { category: 'hard_blacklist' },
  'mcp__github__get_me': { category: 'read_only' },
  'mcp__github__list_*': { category: 'read_only' },
  'mcp__github__search_*': { category: 'read_only' },
  'mcp__github__pull_request_read': { category: 'read_only' },
  'mcp__github__issue_read': { category: 'read_only' },
  'mcp__laravel-boost__database-query': {
    category: 'conditional',
    args_key_to_scan: 'query',
    // v4.1 G12 — full-statement scan (mutating verb anywhere, not just prefix).
    query_full_statement_scan: {
      read_only_only_patterns: [
        '^\\s*(?:SELECT|EXPLAIN|SHOW|DESCRIBE|DESC|WITH\\s+\\w+\\s+AS\\s*\\(\\s*SELECT)\\b',
      ],
      blocked_anywhere_patterns: [
        '\\b(?:UPDATE|INSERT|DELETE|DROP|TRUNCATE|ALTER|CREATE|GRANT|REVOKE|COMMIT|ROLLBACK|MERGE|REPLACE|LOAD)\\b',
        ';\\s*(?:UPDATE|INSERT|DELETE|DROP|TRUNCATE|ALTER|CREATE|GRANT|REVOKE)\\b',
        // `SELECT ... INTO OUTFILE|DUMPFILE` starts with SELECT but writes a file on
        // the database host (MySQL/MariaDB), so it must not pass as read-only.
        '\\bINTO\\s+(?:OUTFILE|DUMPFILE)\\b',
      ],
      comment_strip: true,
    },
  },
  // Exact key so it wins over the read_only glob below: per the Laravel Boost
  // documentation the Tinker tool executes arbitrary code inside the application,
  // which is never a read-only operation.
  'mcp__laravel-boost__tinker': { category: 'hard_blacklist' },
  'mcp__laravel-boost__*': { category: 'read_only', exception: 'database-query handled above' },
  'mcp__github__create_*': { category: 'hard_blacklist' },
  'mcp__github__update_*': { category: 'hard_blacklist' },
  'mcp__github__merge_*': { category: 'hard_blacklist' },
  'mcp__github__delete_*': { category: 'hard_blacklist' },
  'mcp__github__push_files': { category: 'hard_blacklist' },
  'mcp__github__create_or_update_file': { category: 'hard_blacklist', path_args: ['path'] },
  'mcp__github__add_*comment*': { category: 'hard_blacklist' },
  'mcp__github__add_reply*': { category: 'hard_blacklist' },
  'mcp__github__star_repository': { category: 'hard_blacklist' },
  'mcp__github__unstar_repository': { category: 'hard_blacklist' },
  'mcp__github__manage_*subscription': { category: 'hard_blacklist' },
  'mcp__github__mark_*read': { category: 'hard_blacklist' },
  'mcp__github__dismiss_*': { category: 'hard_blacklist' },
  'mcp__github__discussion_comment_write': { category: 'hard_blacklist' },
  'mcp__github__sub_issue_write': { category: 'hard_blacklist' },
  'mcp__github__actions_run_trigger': { category: 'hard_blacklist' },
  'mcp__playwright__browser_snapshot': { category: 'read_only' },
  'mcp__playwright__browser_take_screenshot': { category: 'read_only' },
  'mcp__playwright__browser_network_requests': { category: 'read_only' },
  'mcp__playwright__browser_console_messages': { category: 'read_only' },
  'mcp__playwright__browser_navigate': {
    category: 'conditional',
    args_key_to_scan: 'url',
    url_whitelist_kind: 'navigate',
    // Host token MUST be followed by a port/path/query/fragment delimiter or end —
    // otherwise a subdomain-suffix spoof (liderra.ru.evil.com / localhost.evil.com)
    // slips past. Whitelist built from base hosts project_url_whitelist; the domain
    // block-list is dropped (redundant with default-block on non-whitelist, fail-CLOSE).
    url_whitelist_patterns: buildNavigateWhitelistPatterns(DEFAULT_PROJECT_URL_WHITELIST),
  },
  'mcp__playwright__browser_click': { category: 'hard_blacklist' },
  'mcp__playwright__browser_fill_form': { category: 'hard_blacklist' },
  'mcp__playwright__browser_type': { category: 'hard_blacklist' },
  'mcp__playwright__browser_press_key': { category: 'hard_blacklist' },
  'mcp__playwright__browser_drag': { category: 'hard_blacklist' },
  'mcp__playwright__browser_drop': { category: 'hard_blacklist' },
  'mcp__playwright__browser_evaluate': { category: 'hard_blacklist' },
  'mcp__playwright__browser_file_upload': { category: 'hard_blacklist' },
  'mcp__playwright__browser_handle_dialog': { category: 'hard_blacklist' },
  'mcp__playwright__browser_hover': { category: 'hard_blacklist' },
  'mcp__playwright__browser_resize': { category: 'hard_blacklist' },
  'mcp__playwright__browser_run_code_unsafe': { category: 'hard_blacklist' },
  'mcp__playwright__browser_select_option': { category: 'hard_blacklist' },
  'mcp__plugin_brand-voice_*__authenticate': { category: 'hard_blacklist' },
  'mcp__plugin_brand-voice_*__complete_authentication': { category: 'hard_blacklist' },
  'mcp__plugin_*_*__authenticate': { category: 'hard_blacklist' },
  'mcp__plugin_*_*__complete_authentication': { category: 'hard_blacklist' },
  'mcp__openapi__deals-store': { category: 'hard_blacklist' },
  'mcp__openapi__deals-update': { category: 'hard_blacklist' },
  'mcp__openapi__deals-bulk-*': { category: 'hard_blacklist' },
  'mcp__openapi__deals-export': { category: 'hard_blacklist' },
  'mcp__plugin_context7_context7__*': { category: 'read_only' },
  'mcp__universal-icons__*': { category: 'read_only' },
  // Off-phase research-tooling (Perplexity Pack #87/#88/#89): read_only posture per
  // ADR-019 (owner decision 2026-06-14). Web research reads external sources and does
  // not mutate project state; egress arg scan (enforce-mcp-classification) still runs.
  'mcp__perplexity__*': { category: 'read_only' },
  'mcp__exa__*': { category: 'read_only' },
  'mcp__firecrawl__*': { category: 'read_only' },
  // v4.1 G1 — WebSearch / WebFetch.
  'WebSearch': {
    category: 'conditional',
    args_key_to_scan: 'query',
    llm_judge_required: true,
    rationale: 'search query observable in engine logs; potential exfil channel',
  },
  'WebFetch': {
    category: 'conditional',
    args_key_to_scan: 'url',
    url_whitelist_kind: 'webfetch',
    // Whitelist built from base (anthropic / github-anthropics+deck / npmjs / stackoverflow)
    // project_url_whitelist. Scheme blocks (data:/javascript:) kept; the domain
    // negative-lookahead block is dropped (redundant with default-block, fail-CLOSE).
    url_whitelist_patterns: buildWebFetchWhitelistPatterns(DEFAULT_PROJECT_URL_WHITELIST),
    url_blocked_patterns: WEBFETCH_SCHEME_BLOCK_PATTERNS,
    fetched_content_scan: true,
  },
  'default': 'block',
});
/**
 * Convert a glob key to an anchored regular expression.
 *
 * `*` is the only wildcard and expands to `.*`. Every other regex metacharacter
 * in the key — including `?`, which would otherwise turn the preceding character
 * into an optional one and let a shorter tool name match — is escaped and matched
 * literally. The result is anchored with `^`/`$`, so the whole tool name must match.
 *
 * @param {string} key glob key from the classification map.
 * @returns {RegExp} anchored, case-sensitive matcher for the key.
 */
function globKeyToRegex(key) {
  const escaped = key.replace(/[.+?^${}()|[\]\\]/g, '\\$&').replace(/\*/g, '.*');
  return new RegExp('^' + escaped + '$');
}
/**
 * Look up the classification entry that applies to a tool name.
 *
 * Resolution order:
 *   1. An own property whose key equals the tool name, if its value is an object.
 *   2. Otherwise, every key containing `*` is tried as a glob; among the matching
 *      keys the one with the most non-`*` characters wins (first one on a tie).
 * The reserved key "default" is never returned, neither for a tool literally
 * named "default" nor as a glob candidate.
 *
 * @param {string} toolName tool name to resolve.
 * @param {object} [classification] classification map; defaults to the built-in one.
 * @returns {object|null} the matching entry object, or null when nothing (or only a
 *   non-object value) matches.
 */
export function matchClassificationKey(toolName, classification = DEFAULT_MCP_CLASSIFICATION) {
  const asEntry = (value) => (value && typeof value === 'object' ? value : null);

  if (typeof toolName !== 'string' || !classification || toolName === 'default') return null;

  // Exact key first; a non-object value there falls through to the glob search.
  const hasExactKey = Object.prototype.hasOwnProperty.call(classification, toolName);
  const exact = hasExactKey ? asEntry(classification[toolName]) : null;
  if (exact) return exact;

  // Specificity of a glob key = number of literal (non-wildcard) characters.
  const specificity = (globKey) => globKey.split('*').join('').length;

  let winningKey = null;
  let winningScore = -1;
  Object.keys(classification)
    .filter((key) => key !== 'default' && key !== toolName && key.includes('*'))
    .filter((key) => globKeyToRegex(key).test(toolName))
    .forEach((key) => {
      const score = specificity(key);
      if (score > winningScore) {
        winningScore = score;
        winningKey = key;
      }
    });

  return winningKey === null ? null : asEntry(classification[winningKey]);
}
/**
 * Fallback path normalizer used when the caller injects none: converts every
 * backslash to a forward slash and lower-cases the result.
 *
 * @param {*} target candidate path.
 * @returns {string} normalized path, or '' when `target` is not a string.
 */
function defaultNormalize(target) {
  const isText = typeof target === 'string';
  return isText ? target.split('\\').join('/').toLowerCase() : '';
}
/**
 * Replace SQL comments with a single space, conservatively.
 *
 * The text is scanned once, left to right, so whichever construct starts first
 * wins (a `/*` inside a `--` line comment or inside a quoted literal does not open
 * a block comment). Only comments that common SQL dialects agree on are removed:
 *   - block comments from `/*` to the first following `*​/` (non-nested);
 *   - `--` line comments where `--` is followed by whitespace or end of input,
 *     running up to (not including) the next `\r` or `\n`.
 * Quoted regions ('...', "...", `...`) are copied through unchanged.
 *
 * Stripping STOPS, and the rest of the input is returned verbatim, at the first
 * construct that is lexed differently by different dialects, because guessing
 * wrong could hide live SQL from the caller's keyword scan:
 *   - a backslash inside a quoted region (escape in some dialects, literal in others)
 *     or an unterminated quote / block comment;
 *   - `/*!` or `/*M!` (MySQL/MariaDB executable comments, whose body is run);
 *   - `--` immediately followed by a non-whitespace character (MySQL treats this as
 *     two minus operators, standard SQL as a comment);
 *   - `#` (MySQL line comment), `$` (PostgreSQL dollar-quoting), `[` (bracket-quoted
 *     identifiers in SQL Server / SQLite).
 * Keeping text is always the safe direction for a deny-list scan.
 *
 * @param {*} sql statement text; coerced with String().
 * @returns {string} the text with unambiguous comments replaced by ' '.
 */
function stripSqlComments(sql) {
  const text = String(sql);
  const AMBIGUOUS_STARTERS = '#$[';
  const isLineBreak = (c) => c === '\n' || c === '\r';
  // `undefined` = end of input right after `--`, which is still a plain comment.
  const endsCommentMarker = (c) => c === undefined || /[ \t\n\r\f\v]/.test(c);

  let out = '';
  let pos = 0;
  while (pos < text.length) {
    const ch = text[pos];
    const next = text[pos + 1];

    if (ch === "'" || ch === '"' || ch === '`') {
      // Copy the quoted region through. A doubled quote ('' inside '...') simply
      // reads as two adjacent regions, which yields the same output.
      const close = text.indexOf(ch, pos + 1);
      if (close === -1 || text.slice(pos, close).includes('\\')) break;
      out += text.slice(pos, close + 1);
      pos = close + 1;
    } else if (ch === '/' && next === '*') {
      const close = text.indexOf('*/', pos + 2);
      if (close === -1 || /^M?!/.test(text.slice(pos + 2, pos + 4))) break;
      out += ' ';
      pos = close + 2;
    } else if (ch === '-' && next === '-') {
      if (!endsCommentMarker(text[pos + 2])) break;
      let end = pos + 2;
      while (end < text.length && !isLineBreak(text[end])) end += 1;
      out += ' ';
      pos = end;
    } else if (AMBIGUOUS_STARTERS.includes(ch)) {
      break;
    } else {
      out += ch;
      pos += 1;
    }
  }
  // After a `break`, everything from `pos` on is kept untouched.
  return out + text.slice(pos);
}
/**
 * Report whether `text` matches at least one of the given regex sources.
 * Each source is compiled case-insensitively on every call.
 *
 * @param {Array<string>|null|undefined} patterns regex sources; a falsy value
 *   is treated as an empty list.
 * @param {string} text text to test.
 * @returns {boolean} true if any pattern matches.
 */
function testAny(patterns, text) {
  const sources = patterns || [];
  const matchesText = (source) => {
    const matcher = new RegExp(source, 'i');
    return matcher.test(text);
  };
  return sources.some(matchesText);
}
/**
 * Classify an MCP / built-in tool call into an actionable decision.
 *
 * The tool name is resolved to a classification entry; no entry means 'block'.
 * 'read_only' entries are allowed and 'hard_blacklist' entries are blocked without
 * looking at the arguments. 'conditional' entries are evaluated in this order, and
 * the first step that applies decides:
 *   1. path_args          — block if any listed string argument is a protected path;
 *   2. full-statement SQL — block / allow / ask based on the (optionally
 *                           comment-stripped) query text;
 *   2b. prefix SQL scan   — legacy variant of step 2 on the raw query text;
 *   3. URL lists          — block on a blocked pattern, allow on a whitelist
 *                           pattern, otherwise block;
 *   4. llm_judge_required — ask, flagged with needsLlmJudge;
 *   otherwise ask. Any other category value is blocked.
 *
 * `toolInput` and `deps` may be null or undefined; both are then treated as empty.
 *
 * @param {string} toolName
 * @param {object} [toolInput] arguments of the tool call.
 * @param {{classification?: object, normalize?: Function, isProtectedPath?: Function,
 *          urlWhitelist?: *}} [deps] injected collaborators. When `urlWhitelist` is
 *   provided (anything but undefined) it replaces the entry's whitelist patterns for
 *   entries that declare a `url_whitelist_kind`.
 * @returns {{decision: 'allow'|'block'|'ask', category?: string, reason?: string,
 *            needsLlmJudge?: boolean, needsContentScan?: boolean, scanArg?: string}}
 */
export function classifyMcpTool(toolName, toolInput = {}, deps = {}) {
  // Default parameters only replace `undefined`; an explicit null must not throw.
  const input = toolInput || {};
  const options = deps || {};
  const classification = options.classification || DEFAULT_MCP_CLASSIFICATION;
  const normalize = typeof options.normalize === 'function' ? options.normalize : defaultNormalize;
  const isProtectedPath = typeof options.isProtectedPath === 'function' ? options.isProtectedPath : () => false;

  let entry = matchClassificationKey(toolName, classification);
  if (!entry) {
    return { decision: 'block', category: 'default', reason: `MCP tool ${toolName} not in gate-config classification. Add to mcp_tool_classification.` };
  }

  // Config-injected project_url_whitelist: rebuild navigate/WebFetch whitelist from
  // deps.urlWhitelist (fail-CLOSED when empty). Spread → frozen default untouched.
  if (entry.url_whitelist_kind && options.urlWhitelist !== undefined) {
    const proj = options.urlWhitelist;
    if (entry.url_whitelist_kind === 'navigate') {
      entry = { ...entry, url_whitelist_patterns: buildNavigateWhitelistPatterns(proj) };
    } else if (entry.url_whitelist_kind === 'webfetch') {
      entry = { ...entry, url_whitelist_patterns: buildWebFetchWhitelistPatterns(proj) };
    }
  }

  const category = entry.category;
  if (category === 'read_only') return { decision: 'allow', category };
  if (category === 'hard_blacklist') {
    return { decision: 'block', category, reason: `MCP tool ${toolName} classified hard-blacklist.` };
  }

  if (category === 'conditional') {
    // 1. path_args — normalize + protected check.
    if (Array.isArray(entry.path_args)) {
      for (const key of entry.path_args) {
        const raw = input[key];
        if (typeof raw === 'string' && isProtectedPath(normalize(raw))) {
          return { decision: 'block', category, reason: `MCP tool ${toolName} targets protected path "${raw}".` };
        }
      }
    }

    const scanKey = entry.args_key_to_scan;
    const argVal = scanKey ? input[scanKey] : undefined;

    // 2. SQL full-statement scan (G12). Blocked verbs are checked before the
    //    read-only prefix so a read-only-looking statement cannot carry one.
    if (entry.query_full_statement_scan && typeof argVal === 'string') {
      const cfg = entry.query_full_statement_scan;
      const sql = cfg.comment_strip ? stripSqlComments(argVal) : argVal;
      if (testAny(cfg.blocked_anywhere_patterns, sql)) {
        return { decision: 'block', category, reason: `database-query contains a mutating verb (full-statement scan).` };
      }
      if (testAny(cfg.read_only_only_patterns, sql)) {
        return { decision: 'allow', category };
      }
      return { decision: 'ask', category, reason: `database-query did not match read-only nor blocked patterns — needs approval.`, scanArg: argVal };
    }

    // 2b. SQL prefix scan (legacy v4.0 style).
    if (entry.query_prefix_scan && typeof argVal === 'string') {
      const cfg = entry.query_prefix_scan;
      if (testAny(cfg.blocked_patterns, argVal)) return { decision: 'block', category };
      if (testAny(cfg.read_only_patterns, argVal)) return { decision: 'allow', category };
      return { decision: 'ask', category, scanArg: argVal };
    }

    // 3. URL whitelist / blocklist (WebFetch / browser_navigate). Anything that is
    //    neither blocked nor whitelisted is blocked.
    if (typeof argVal === 'string' && (entry.url_whitelist_patterns || entry.url_blocked_patterns)) {
      if (testAny(entry.url_blocked_patterns, argVal)) {
        return { decision: 'block', category, reason: `MCP tool ${toolName} URL "${argVal}" is blocked.` };
      }
      if (testAny(entry.url_whitelist_patterns, argVal)) {
        return { decision: 'allow', category, needsContentScan: !!entry.fetched_content_scan };
      }
      return { decision: 'block', category, reason: `MCP tool ${toolName} URL "${argVal}" not in whitelist.` };
    }

    // 4. LLM-judge required (WebSearch) — flag for the consumer (Stream D).
    if (entry.llm_judge_required) {
      return { decision: 'ask', category, needsLlmJudge: true, scanArg: typeof argVal === 'string' ? argVal : undefined };
    }

    // Conditional with no resolvable signal -> ask.
    return { decision: 'ask', category, reason: `MCP tool ${toolName} conditional — needs approval.` };
  }

  // Unknown category string -> fail-CLOSE.
  return { decision: 'block', category: category || 'unknown', reason: `MCP tool ${toolName} unknown category.` };
}