API Reference
EnforceCore exposes 30 Tier 1 public symbols from the top-level enforcecore package — the stable, frozen contract. An additional 80 Tier 2 symbols remain importable for backwards compatibility but are not covered by the stability guarantee.
API Stability: The 30 Tier 1 symbols are frozen as of v1.0.0b1. No additions or removals until v2.0.0. Tier 2 imports emit deprecation warnings and may be reorganized in v2.0.
Tier 1 Symbols (Stable — 30)
| Category | Symbols |
|---|---|
| Enforcement | enforce, Enforcer, Decision, EnforcementResult |
| Policy | Policy, load_policy |
| Redaction | Redactor, RedactionResult, RedactionStrategy, SecretScanner |
| Audit | Auditor, AuditEntry, VerificationResult, verify_trail, load_trail |
| Resource Management | ResourceGuard, CostTracker, KillSwitch, RateLimiter |
| Exceptions | EnforceCoreError, EnforcementViolation, ToolDeniedError, ContentViolationError, PolicyError, PolicyLoadError, CostLimitError, ResourceLimitError |
| Configuration | Settings, settings, __version__ |
Tip: If you only import Tier 1 symbols, your code is guaranteed forward-compatible through the entire v1.x series.
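Since Tier 2 imports emit deprecation warnings, a test suite can catch accidental Tier 2 usage by escalating `DeprecationWarning` to an error. The sketch below uses only the standard `warnings` module. `legacy_helper` and `uses_deprecated_api` are hypothetical names for illustration, not EnforceCore symbols.

```python
import warnings


def legacy_helper() -> str:
    """Hypothetical stand-in for a Tier 2 symbol that warns when used."""
    warnings.warn("legacy_helper is Tier 2", DeprecationWarning, stacklevel=2)
    return "ok"


def uses_deprecated_api(fn) -> bool:
    """Return True if calling fn() triggers a DeprecationWarning."""
    with warnings.catch_warnings():
        # Inside this block, DeprecationWarning is raised as an exception.
        warnings.simplefilter("error", DeprecationWarning)
        try:
            fn()
        except DeprecationWarning:
            return True
    return False


print(uses_deprecated_api(legacy_helper))          # True
print(uses_deprecated_api(lambda: "tier 1 only"))  # False
```

Running the interpreter with `-W error::DeprecationWarning` applies the same filter to a whole process.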
Primary API
@enforce() Decorator
The main entry point. Wraps any callable with policy enforcement.
from enforcecore import enforce

# From YAML file
@enforce(policy="policies/agent.yaml")
async def search_web(query: str) -> str:
    return await api.search(query)

# Sync functions also work
@enforce(policy="policies/agent.yaml")
def read_file(path: str) -> str:
    with open(path) as f:  # close the file handle deterministically
        return f.read()

# Inline policy (quick prototyping)
@enforce(
    allowed_tools=["search_web", "calculator"],
    pii_redaction=True,
    max_cost_usd=5.0,
)
async def my_tool(args: dict) -> str: ...

# Explicit tool name
@enforce(policy="policy.yaml", tool_name="web_search")
async def _internal_search(query: str) -> str: ...
Behavior:
- Before call: evaluate pre-call rules, redact inputs
- During call: enforce resource limits
- After call: evaluate post-call rules, redact outputs, record audit entry
- On violation: raise EnforcementViolation (call never executes)
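The before/after ordering in the list above can be sketched with a plain decorator. This is a toy model written against the standard library only, not EnforceCore's implementation: `guarded`, `ToolBlocked`, and the list-based audit log are hypothetical, and redaction and resource limits are left out.

```python
import asyncio
import functools
import inspect


class ToolBlocked(Exception):
    """Hypothetical stand-in for an enforcement violation."""


def guarded(allowed_tools, audit_log):
    """Toy decorator: pre-call check, call, post-call audit record."""
    def decorator(fn):
        name = fn.__name__

        def pre_call():
            if name not in allowed_tools:
                audit_log.append((name, "blocked"))
                raise ToolBlocked(name)  # the wrapped call never runs

        if inspect.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                pre_call()
                result = await fn(*args, **kwargs)
                audit_log.append((name, "allowed"))
                return result
            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args, **kwargs):
            pre_call()
            result = fn(*args, **kwargs)
            audit_log.append((name, "allowed"))
            return result
        return sync_wrapper
    return decorator


log = []


@guarded({"add"}, log)
def add(a, b):
    return a + b


@guarded({"add"}, log)
async def fetch(url):
    return url


print(add(1, 2))  # 3
try:
    asyncio.run(fetch("https://example.com"))
except ToolBlocked as exc:
    print("blocked:", exc)  # blocked: fetch
print(log)  # [('add', 'allowed'), ('fetch', 'blocked')]
```

Branching on `inspect.iscoroutinefunction` is one way a single decorator can serve both sync and async callables, as the examples above require.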
Enforcer Class
from enforcecore import Enforcer, Policy
enforcer = Enforcer(Policy.from_file("policy.yaml"))
# Sync
result = enforcer.enforce_sync(search_fn, "query", tool_name="search_web")
# Async
result = await enforcer.enforce_async(search_fn, "query", tool_name="search_web")
# From file shortcut
enforcer = Enforcer.from_file("policy.yaml")
# Properties
print(enforcer.policy_name) # "my-policy"
print(enforcer.policy) # Policy instance
Note: guard_sync() and guard_async() were removed in v1.0.16a1. Use enforce_sync() / enforce_async() instead; they run the full enforcement pipeline (policy + PII redaction + audit + resource guard).
Policy API
Loading Policies
from enforcecore import Policy, load_policy
# From YAML
policy = load_policy("policies/strict.yaml")
# Programmatic
policy = Policy(
    name="my-policy",
    rules={
        "allowed_tools": ["search", "calculator"],
        "pii_redaction": {"enabled": True, "categories": ["email", "phone"]},
        "resource_limits": {"max_call_duration_seconds": 30},
    },
    on_violation="block",
)
# Validate without loading
errors = Policy.validate_file("policies/strict.yaml")
Policy Model
class Policy(BaseModel):
    name: str
    version: str = "1.0"
    rules: PolicyRules
    on_violation: Literal["block", "log", "redact"] = "block"

class PolicyRules(BaseModel):
    allowed_tools: list[str] | None = None  # None = all allowed
    denied_tools: list[str] = []
    pii_redaction: PIIRedactionConfig = PIIRedactionConfig()
    resource_limits: ResourceLimits = ResourceLimits()
    network: NetworkPolicy = NetworkPolicy()
    max_output_size_bytes: int | None = None
    redact_output: bool = True
Redactor API
Regex-based PII detection — no heavy NLP dependencies. ~0.1–0.5ms per call.
PII Categories
| Category | Example | Placeholder |
|---|---|---|
| email | john@example.com | <EMAIL> |
| phone | (555) 123-4567 | <PHONE> |
| ssn | 123-45-6789 | <SSN> |
| credit_card | 4111-1111-1111-1111 | <CREDIT_CARD> |
| ip_address | 192.168.1.100 | <IP_ADDRESS> |
| passport | AB1234567 | <PASSPORT> |
Redaction Strategies
| Strategy | Result for john@example.com |
|---|---|
| placeholder | <EMAIL> |
| mask | ****@****.*** |
| hash | [SHA256:6b0b4806b1e57501] |
| remove | (empty string) |
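To make the four strategies concrete, here is a minimal standard-library sketch for the email category. It is not the library's code: the regex is deliberately simple, the fixed-width mask and the 16-hex-character truncation are inferred from the table above, and whether EnforceCore hashes the raw value or a salted one is not stated, so the digest printed here need not match the table's example.

```python
import hashlib
import re

# Deliberately simple email pattern, for illustration only.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def redact_email(text: str, strategy: str = "placeholder") -> str:
    """Replace every email in text according to the chosen strategy."""
    def replace(match: re.Match) -> str:
        value = match.group(0)
        if strategy == "placeholder":
            return "<EMAIL>"
        if strategy == "mask":
            return "****@****.***"  # fixed width, hides the original length
        if strategy == "hash":
            digest = hashlib.sha256(value.encode("utf-8")).hexdigest()
            return f"[SHA256:{digest[:16]}]"  # assumed: unsalted, truncated
        if strategy == "remove":
            return ""
        raise ValueError(f"unknown strategy: {strategy}")

    return EMAIL_RE.sub(replace, text)


for name in ("placeholder", "mask", "hash", "remove"):
    print(name, "->", repr(redact_email("Email john@example.com", name)))
```

The hash strategy keeps equal inputs correlatable across log lines without storing the value itself, which is the usual reason to prefer it over a plain placeholder.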
Usage
from enforcecore.redactor import Redactor, RedactionResult
redactor = Redactor(categories=["email", "phone"])
# Detection only
entities = redactor.detect("Email john@example.com")
for e in entities:
    print(f"  {e.category}: {e.text!r} at [{e.start}:{e.end}]")
# Full redaction
result = redactor.redact("Call 555-123-4567 or john@example.com")
print(result.text) # "Call <PHONE> or <EMAIL>"
print(result.count) # 2
print(result.was_redacted) # True
Auditor API
Merkle-chained, tamper-evident audit trails.
AuditEntry Fields
| Field | Type | Description |
|---|---|---|
| entry_id | str | UUID v4 |
| timestamp | str | ISO 8601 UTC |
| tool_name | str | Tool called |
| policy_name | str | Policy applied |
| decision | str | "allowed" or "blocked" |
| violation_type | str \| None | Error class (if blocked) |
| overhead_ms | float | Enforcement overhead |
| input_redactions | int | PII redacted from inputs |
| output_redactions | int | PII redacted from outputs |
| previous_hash | str | SHA-256 of preceding entry |
| entry_hash | str | SHA-256 of this entry |
Usage
from enforcecore import Auditor, verify_trail, load_trail
# Create auditor
auditor = Auditor(output_path="audit.jsonl")
# Record entries (Merkle-chained automatically)
e1 = auditor.record(tool_name="search_web", policy_name="default", decision="allowed")
e2 = auditor.record(tool_name="delete_file", policy_name="default", decision="blocked",
                    violation_type="ToolDeniedError")
print(e2.previous_hash == e1.entry_hash) # True — chain linked
# Verify trail
result = verify_trail("audit.jsonl")
print(result.is_valid) # True
print(result.chain_intact) # True
# Load entries
trail = load_trail("audit.jsonl")
recent = load_trail("audit.jsonl", max_entries=100) # Last 100 only
Cross-Session Continuity
The auditor resumes the Merkle chain from an existing trail file:
# Session 1
a1 = Auditor(output_path="trail.jsonl")
a1.record(tool_name="tool_a", policy_name="p")
# Session 2 — chain resumes automatically
a2 = Auditor(output_path="trail.jsonl")
a2.record(tool_name="tool_b", policy_name="p")
assert verify_trail("trail.jsonl").is_valid # True — chain intact
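The linking that the previous_hash and entry_hash fields describe can be illustrated with a small hash chain. This is a sketch under stated assumptions, not the library's on-disk format: the canonical JSON encoding, the all-zero genesis value, and the helper names are all hypothetical.

```python
import hashlib
import json

GENESIS = "0" * 64  # assumed sentinel for the first entry's previous_hash


def entry_hash(body: dict) -> str:
    """SHA-256 over a canonical JSON encoding of the entry body."""
    payload = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def append_entry(trail: list, tool_name: str, decision: str) -> dict:
    """Append an entry linked to the hash of the current last entry."""
    previous = trail[-1]["entry_hash"] if trail else GENESIS
    body = {"tool_name": tool_name, "decision": decision, "previous_hash": previous}
    entry = {**body, "entry_hash": entry_hash(body)}
    trail.append(entry)
    return entry


def chain_is_valid(trail: list) -> bool:
    """Recompute every hash and check each link to its predecessor."""
    previous = GENESIS
    for entry in trail:
        body = {k: v for k, v in entry.items() if k != "entry_hash"}
        if entry["previous_hash"] != previous or entry_hash(body) != entry["entry_hash"]:
            return False
        previous = entry["entry_hash"]
    return True


trail = []
append_entry(trail, "tool_a", "allowed")
append_entry(trail, "tool_b", "blocked")  # a later session resumes from trail[-1]
print(chain_is_valid(trail))  # True

trail[0]["decision"] = "blocked"  # tamper with an earlier entry
print(chain_is_valid(trail))  # False
```

Resuming across sessions only requires reading the last stored entry_hash, which is why a new writer can continue an existing file without rehashing it.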
Pluggable Audit Backends
Swap the default JSONL backend for custom logging:
from enforcecore import (
    AuditBackend, JsonlBackend, NullBackend,
    CallbackBackend, MultiBackend,
)
# Send audit events to your observability stack
callback = CallbackBackend(on_event=lambda entry: send_to_datadog(entry))
# Write locally AND send remotely
multi = MultiBackend([JsonlBackend("audit.jsonl"), callback])
# Disable auditing in tests
null = NullBackend()
Audit Rotation
Manage audit file growth with automatic rotation:
from enforcecore import AuditRotator
rotator = AuditRotator(
    max_size_bytes=50 * 1024 * 1024,  # 50 MB per file
    retention_days=90,                # Keep 90 days
    compress=True,                    # gzip old files
)
Witness API
Hash-only remote witness backends for tamper detection beyond Merkle chain verification. Even if an attacker rebuilds the entire chain from scratch, witness records stored elsewhere will detect the forgery.
Built-in Witnesses
from enforcecore.auditor.witness import (
    CallbackWitness, FileWitness, LogWitness,
)
# Callback — send hashes to a queue, HTTP endpoint, or database
witness = CallbackWitness(callback=lambda entry_hash, meta: send_to_siem(entry_hash))
# ⚠️ Warning: slow callbacks block audit writes. Use a queue-based
# pattern for HTTP/database backends.
# File — write hashes to a separate JSONL file
witness = FileWitness(path="/var/log/enforcecore/witness.jsonl")
# Log — emit hashes via Python logging (syslog/journald)
witness = LogWitness()
Using Witnesses with the Auditor
from enforcecore import Auditor
from enforcecore.auditor.witness import FileWitness
auditor = Auditor(
    output_path="audit.jsonl",
    witness=FileWitness("/secure/witness.jsonl"),
)
Witness Verification
from enforcecore.auditor.witness import verify_with_witness
# Cross-check audit trail hashes against witness records
result = verify_with_witness(
    trail_path="audit.jsonl",
    witness_path="/secure/witness.jsonl",
)
print(result.is_valid) # True if all hashes match
print(result.mismatches) # List of mismatched entries (chain-rebuild detection)
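Why a witness catches a rebuilt chain can be shown with two lists of hashes. A rebuilt trail is internally consistent, so chain verification alone passes, but its hashes no longer equal the ones published at write time. The function below is a hypothetical sketch of that comparison, not the behavior of verify_with_witness itself.

```python
def find_mismatches(trail_hashes: list, witness_hashes: list) -> list:
    """Return indexes where the trail disagrees with the witness record."""
    mismatches = [
        i for i, (t, w) in enumerate(zip(trail_hashes, witness_hashes)) if t != w
    ]
    # Entries present on one side only also count as mismatches.
    shorter, longer = sorted((len(trail_hashes), len(witness_hashes)))
    mismatches.extend(range(shorter, longer))
    return mismatches


witnessed = ["aa11", "bb22", "cc33"]      # hashes published at write time
rebuilt_trail = ["aa11", "ff99", "ee88"]  # attacker rewrote entry 1 onward

print(find_mismatches(witnessed, witnessed))      # []
print(find_mismatches(rebuilt_trail, witnessed))  # [1, 2]
```

Because each entry hash covers the previous one, rewriting a single entry changes every later hash, so the first mismatch index marks where the rewrite began.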
Settings-Driven Witness (v1.0.0b5)
Enable witness via environment variable — no code changes needed:
ENFORCECORE_AUDIT_WITNESS_FILE=/var/log/enforcecore/witness.jsonl
All @enforce() calls will automatically publish hashes to the witness file.
Immutable Audit API
OS-enforced append-only protection for audit files. Prevents truncation or chain rebuild even by the file owner.
Enabling Immutable Mode
from enforcecore import Auditor
# OS sets append-only attribute on the audit file
# Linux: chattr +a | macOS: chflags uappend
auditor = Auditor(
    output_path="audit.jsonl",
    immutable=True,
)
Platform Support
from enforcecore.auditor.immutable import platform_support_info
info = platform_support_info()
print(info)
# {'supported': True, 'platform': 'linux', 'mechanism': 'chattr +a',
# 'requires_root': False, 'in_container': False,
# 'has_capability': True} # CAP_LINUX_IMMUTABLE
| Platform | Mechanism | Requires Root | Container Notes |
|---|---|---|---|
| Linux | chattr +a | No (with CAP_LINUX_IMMUTABLE) | Docker: --cap-add LINUX_IMMUTABLE |
| macOS | chflags uappend | No (user-level flag) | N/A |
| Windows | Not supported | — | Falls back safely |
Settings-Driven Immutable (v1.0.0b5)
Enable via environment variable — no code changes needed:
ENFORCECORE_AUDIT_IMMUTABLE=true
Combining Witness + Immutable
For maximum tamper-evidence, use both:
from enforcecore import Auditor
from enforcecore.auditor.witness import FileWitness
auditor = Auditor(
    output_path="audit.jsonl",
    immutable=True,
    witness=FileWitness("/secure/witness.jsonl"),
)
Or via environment variables:
ENFORCECORE_AUDIT_IMMUTABLE=true
ENFORCECORE_AUDIT_WITNESS_FILE=/secure/witness.jsonl
Guard API
Cross-platform resource limits and hard termination.
from enforcecore.guard import ResourceGuard, CostTracker, KillSwitch
# Cost tracking
tracker = CostTracker(budget_usd=10.0)
tracker.record(0.05)
tracker.check_budget() # Passes — under budget
# Kill switch
ks = KillSwitch()
ks.trip("memory exceeded 256MB")
ks.check("tool", "policy") # Raises ResourceLimitError
# Resource guard (shared thread pool, leak detection)
guard = ResourceGuard(cost_tracker=tracker, kill_switch=ks)
result = await guard.execute_async(
    my_func, (arg,), {},
    max_duration_seconds=30.0,
)
# Check for leaked threads
print(guard.leaked_thread_count) # 0 = healthy
Secret Scanner API
Detect leaked credentials in tool inputs and outputs:
from enforcecore import SecretScanner, DetectedSecret, PatternRegistry
scanner = SecretScanner()
# Scan text for secrets
secrets = scanner.scan("aws_key = AKIAIOSFODNN7EXAMPLE")
for s in secrets:
    print(f"  {s.category}: {s.text[:10]}...")
# Built-in categories: aws_key, github_token, generic_api_key,
# bearer_jwt, private_key, password_url, gcp_service_account,
# azure_connection_string, database_uri, ssh_private_key, slack_token
Custom Patterns
Register domain-specific secret patterns:
from enforcecore import CustomPattern, PatternRegistry
registry = PatternRegistry()
registry.register(CustomPattern(
    name="internal_api_key",
    pattern=r"myco-[a-f0-9]{32}",
    description="Internal API key format",
))
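Before registering a pattern, it helps to check what the expression actually matches. The snippet below tests the same regex as the CustomPattern above with the standard `re` module. The sample keys are made up, and it assumes the registry applies the pattern with ordinary Python regex semantics.

```python
import re

# Same expression as the custom pattern, compiled with the stdlib.
INTERNAL_KEY = re.compile(r"myco-[a-f0-9]{32}")

sample_key = "myco-" + "0123456789abcdef" * 2  # 32 lowercase hex chars
text = f"token={sample_key} other=myco-1234"

# Only the full-length key matches; "myco-1234" is too short.
matches = INTERNAL_KEY.findall(text)
print(matches == [sample_key])  # True

# Uppercase hex does not match, because the class is [a-f0-9] only.
print(INTERNAL_KEY.search("myco-" + "ABCDEF0123456789" * 2))  # None
```

If keys may appear in uppercase, widening the class to `[A-Fa-f0-9]` avoids silent misses.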
Content Rules API
Block dangerous content patterns in tool arguments and outputs:
from enforcecore import RuleEngine, ContentRule, get_builtin_rules
# Use built-in rules
engine = RuleEngine(rules=get_builtin_rules())
# Check content
violations = engine.check("os.system('rm -rf /')")
for v in violations:
    print(f"  {v.rule_name}: {v.description}")
# 4 built-in categories:
# - shell_injection: os.system, subprocess, exec
# - path_traversal: ../, /etc/passwd, ~root
# - sql_injection: SELECT, DROP, UNION
# - code_execution: eval(), exec(), __import__
Rate Limiter API
Prevent runaway agents from making too many calls:
from enforcecore import RateLimiter, RateLimit
limiter = RateLimiter(
    per_tool=RateLimit(max_calls=10, window_seconds=60),
    global_limit=RateLimit(max_calls=50, window_seconds=60),
)
# Check before calling (raises RateLimitError if exceeded)
limiter.check("search_web")
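A max_calls / window_seconds pair is commonly implemented as a sliding window over recent call timestamps. The sketch below shows that technique with the standard library. Whether EnforceCore uses a sliding window, a fixed window, or a token bucket is not stated here, and `SlidingWindowLimiter` is a hypothetical class.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most max_calls per window_seconds, tracked per key."""

    def __init__(self, max_calls: int, window_seconds: float, clock=time.monotonic):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.clock = clock
        self._calls = {}  # key -> deque of call timestamps

    def allow(self, key: str) -> bool:
        now = self.clock()
        calls = self._calls.setdefault(key, deque())
        # Drop timestamps that have aged out of the window.
        while calls and now - calls[0] >= self.window_seconds:
            calls.popleft()
        if len(calls) >= self.max_calls:
            return False
        calls.append(now)
        return True


fake_now = [0.0]  # injectable clock keeps the example deterministic
limiter = SlidingWindowLimiter(max_calls=2, window_seconds=60, clock=lambda: fake_now[0])

print(limiter.allow("search_web"))  # True  (first call)
print(limiter.allow("search_web"))  # True  (second call)
print(limiter.allow("search_web"))  # False (third call inside the window)
fake_now[0] = 61.0
print(limiter.allow("search_web"))  # True  (window has passed)
```

Using a monotonic clock keeps the window unaffected by wall-clock adjustments.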
Domain Checker API
Enforce network access policies:
from enforcecore import DomainChecker
checker = DomainChecker(
    allowed=["api.example.com", "*.trusted.io"],
    denied=["*.evil.com", "*.malware.net"],
)
checker.check("api.example.com") # OK
checker.check("hack.evil.com") # Raises DomainDeniedError
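Wildcard patterns such as `*.trusted.io` can be evaluated with shell-style matching. The sketch below uses `fnmatch` and assumes that deny patterns take precedence over allow patterns, which the text above does not confirm. `domain_allowed` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def domain_allowed(host: str, allowed: list, denied: list) -> bool:
    """Deny patterns win; otherwise the host must match an allow pattern."""
    host = host.lower().rstrip(".")  # normalize case and a trailing root dot
    if any(fnmatchcase(host, pattern) for pattern in denied):
        return False
    return any(fnmatchcase(host, pattern) for pattern in allowed)


allowed = ["api.example.com", "*.trusted.io"]
denied = ["*.evil.com", "*.malware.net"]

print(domain_allowed("api.example.com", allowed, denied))  # True
print(domain_allowed("hack.evil.com", allowed, denied))    # False
print(domain_allowed("cdn.trusted.io", allowed, denied))   # True
print(domain_allowed("trusted.io", allowed, denied))       # False
```

With this matching style, `*` also spans dots, so `a.b.trusted.io` matches `*.trusted.io`, while the bare apex `trusted.io` does not and would need its own entry.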
Hook System API
Build custom observability and enforcement extensions:
from enforcecore import (
    on_pre_call, on_post_call, on_violation, on_redaction,
    HookRegistry, HookContext, ViolationHookContext, RedactionHookContext,
)

@on_pre_call
def log_start(ctx: HookContext):
    print(f"→ Calling {ctx.tool_name}")

@on_post_call
def log_end(ctx: HookContext):
    print(f"← {ctx.tool_name} completed in {ctx.overhead_ms:.1f}ms")

@on_violation
def alert(ctx: ViolationHookContext):
    send_alert(f"⚠️ {ctx.tool_name} blocked: {ctx.violation_type}")

@on_redaction
def track_pii(ctx: RedactionHookContext):
    metrics.increment("pii_redacted", ctx.redaction_count)
# Programmatic hook management
registry = HookRegistry()
registry.register("post_call", my_hook_fn)
Observability API
OpenTelemetry integration for production monitoring:
from enforcecore import EnforceCoreMetrics, EnforceCoreInstrumentor
# Counters (Prometheus-compatible)
metrics = EnforceCoreMetrics()
# Tracks: calls_total, violations_total, redactions_total, cost_usd
# Spans (distributed tracing)
instrumentor = EnforceCoreInstrumentor()
instrumentor.instrument() # Auto-instruments all enforce() calls
Event Webhooks
Push enforcement events to external services:
from enforcecore import WebhookDispatcher, WebhookEvent
dispatcher = WebhookDispatcher(
    url="https://hooks.example.com/enforcecore",
    events=[WebhookEvent.VIOLATION, WebhookEvent.COST_LIMIT],
    retry_count=3,
)
Hardening API
Input validation, scope tracking, and unicode protection.
Input Validation
from enforcecore import validate_tool_name, check_input_size, deep_redact
# Validate tool names (rejects empty, overlength, invalid chars)
name = validate_tool_name("my_tool") # Returns stripped name
# Check input payload size (default: 10 MB)
check_input_size(args, kwargs) # Raises InputTooLargeError if exceeded
# Recursive PII redaction on nested data
redacted = deep_redact(nested_data, redactor.redact_string, max_depth=10)
Enforcement Scope Tracking
from enforcecore import (
    enter_enforcement, exit_enforcement,
    get_enforcement_depth, get_enforcement_chain,
)
enter_enforcement("tool_a")
print(get_enforcement_depth()) # 1
print(get_enforcement_chain()) # ["tool_a"]
# Max depth: 10 (raises EnforcementDepthError if exceeded)
Unicode Hardening
Protects against PII evasion via zero-width characters, homoglyphs, and URL/HTML encoding:
from enforcecore import normalize_unicode, normalize_homoglyphs, prepare_for_detection
# Full pipeline (called automatically by Redactor.detect())
clean = prepare_for_detection(text)
# 1. NFC normalization + strip zero-width chars
# 2. Replace ~40 confusable chars (Cyrillic, Greek, fullwidth)
# 3. Decode URL percent-encoding and HTML entities
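The three steps listed above map onto standard-library calls. The sketch below is an approximation, not the library's pipeline: `prepare` is a hypothetical function, and its homoglyph table holds four entries purely for illustration where the text above mentions about 40.

```python
import html
import unicodedata
from urllib.parse import unquote

# Zero-width and BOM code points commonly used to split up tokens.
ZERO_WIDTH = dict.fromkeys(map(ord, "\u200b\u200c\u200d\u2060\ufeff"))

# Tiny illustrative homoglyph table (Cyrillic a/e/o and fullwidth @).
HOMOGLYPHS = str.maketrans({"\u0430": "a", "\u0435": "e", "\u043e": "o", "\uff20": "@"})


def prepare(text: str) -> str:
    """Normalize text so regex-based detectors see plain ASCII forms."""
    # 1. NFC normalization, then delete zero-width characters.
    text = unicodedata.normalize("NFC", text).translate(ZERO_WIDTH)
    # 2. Replace confusable characters with their ASCII lookalikes.
    text = text.translate(HOMOGLYPHS)
    # 3. Decode URL percent-encoding, then HTML entities.
    return html.unescape(unquote(text))


print(prepare("jo\u200bhn%40ex\u0430mple.com"))  # john@example.com
print(prepare("john&#64;example.com"))           # john@example.com
```

Detection runs on the normalized copy, so an address split by a zero-width space or disguised with a Cyrillic letter is still recognized as an email.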
Exception Hierarchy
EnforceCoreError # Base exception
├── PolicyError
│ ├── PolicyLoadError # File not found / unparseable
│ ├── PolicyValidationError # Schema violation
│ └── PolicyEvaluationError # Error during rule evaluation
├── EnforcementViolation
│ ├── ToolDeniedError # Tool not allowed
│ ├── DomainDeniedError # Network domain blocked
│ ├── CostLimitError # Budget exceeded
│ ├── RateLimitError # Rate limit exceeded
│ ├── ContentViolationError # Content rule violated
│ └── ResourceLimitError # Resource limit breached
├── RedactionError # PII error (fails closed)
├── AuditError # Audit error (fails closed)
├── HardeningError
│ ├── InvalidToolNameError # Bad tool name
│ ├── InputTooLargeError # Payload too large
│ └── EnforcementDepthError # Recursive depth exceeded
└── GuardError # Resource guard failure
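The tree determines which `except` clauses catch which errors: a handler for a parent class also catches every subclass, so the more specific handlers must come first. The sketch below defines local stand-in classes that mirror part of the hierarchy so it runs without the package installed. In real code the classes would be imported from enforcecore.

```python
# Local stand-ins mirroring part of the documented tree (not the real classes).
class EnforceCoreError(Exception): ...
class EnforcementViolation(EnforceCoreError): ...
class ToolDeniedError(EnforcementViolation): ...
class CostLimitError(EnforcementViolation): ...
class AuditError(EnforceCoreError): ...


def handle(action) -> str:
    """Run action and report which handler caught the failure."""
    try:
        action()
    except ToolDeniedError:        # most specific first
        return "denied"
    except EnforcementViolation:   # any other policy violation
        return "violation"
    except EnforceCoreError:       # internal failures such as audit errors
        return "internal"
    return "ok"


def raiser(exc_type):
    """Build a zero-argument callable that raises exc_type."""
    def action():
        raise exc_type("demo")
    return action


print(handle(raiser(ToolDeniedError)))  # denied
print(handle(raiser(CostLimitError)))   # violation
print(handle(raiser(AuditError)))       # internal
print(handle(lambda: None))             # ok
```

Catching EnforcementViolation separately from EnforceCoreError lets callers treat a blocked call differently from an infrastructure failure.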
CLI
# Show version and platform info
enforcecore info
# Validate a policy file
enforcecore validate policies/strict.yaml
# Verify an audit trail
enforcecore verify audit.jsonl
# Run evaluation suite
enforcecore eval --scenarios all --output results/
# Dry-run a tool call against a policy (no execution)
enforcecore dry-run --policy policy.yaml --tool search_web
# Inspect a policy's rules and settings
enforcecore inspect policies/strict.yaml
Environment Variables
All settings can be configured via environment variables (loaded by pydantic-settings). The table below is the canonical reference:
| Variable | Default | Description |
|---|---|---|
| ENFORCECORE_DEFAULT_POLICY | None | Default policy file path |
| ENFORCECORE_AUDIT_PATH | ./audit_logs/ | Audit trail directory |
| ENFORCECORE_AUDIT_ENABLED | true | Enable/disable auditing |
| ENFORCECORE_AUDIT_BACKEND | jsonl | Audit backend (jsonl, null, callback, multi) |
| ENFORCECORE_AUDIT_MAX_BYTES | 10485760 | Max audit file size before rotation (10 MB) |
| ENFORCECORE_AUDIT_MAX_FILES | 5 | Max rotated audit files to keep |
| ENFORCECORE_AUDIT_IMMUTABLE | false | Enable OS-enforced append-only audit files |
| ENFORCECORE_AUDIT_WITNESS_FILE | None | Path for hash-only witness JSONL file |
| ENFORCECORE_REDACTION_ENABLED | true | Enable/disable PII redaction |
| ENFORCECORE_SECRET_SCANNING | true | Enable secret detection |
| ENFORCECORE_CONTENT_RULES | true | Enable content rule checking |
| ENFORCECORE_LOG_LEVEL | INFO | Structured log level |
| ENFORCECORE_COST_BUDGET_USD | 100.0 | Global cost budget |
| ENFORCECORE_RATE_LIMIT_RPM | 0 | Global rate limit (0 = disabled) |
| ENFORCECORE_FAIL_OPEN | false | Allow bypass on errors (⚠️ dev-only) |
| ENFORCECORE_DEV_MODE | false | Development mode flag |
| ENFORCECORE_OTEL_ENABLED | false | Enable OpenTelemetry metrics/tracing |
| ENFORCECORE_WEBHOOK_URL | None | Webhook endpoint for enforcement events |
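For scripts that need to read the same variables without importing the package, the lookup-with-default behavior can be approximated with the standard library. This is only a sketch: the library loads settings through pydantic-settings, the set of accepted truthy strings below is an assumption, and `env_bool` and `env_float` are hypothetical helpers.

```python
import os


def env_bool(name: str, default: bool, environ=os.environ) -> bool:
    """Parse a boolean flag; unset variables fall back to the default."""
    raw = environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}  # assumed spellings


def env_float(name: str, default: float, environ=os.environ) -> float:
    """Parse a float setting; unset variables fall back to the default."""
    raw = environ.get(name)
    return default if raw is None else float(raw)


# A plain dict stands in for os.environ so the example is deterministic.
fake_env = {"ENFORCECORE_AUDIT_IMMUTABLE": "true", "ENFORCECORE_COST_BUDGET_USD": "25"}

print(env_bool("ENFORCECORE_AUDIT_IMMUTABLE", False, fake_env))   # True
print(env_bool("ENFORCECORE_FAIL_OPEN", False, fake_env))         # False
print(env_float("ENFORCECORE_COST_BUDGET_USD", 100.0, fake_env))  # 25.0
```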