API Reference

EnforceCore exposes 30 Tier 1 public symbols from the top-level enforcecore package — the stable, frozen contract. An additional 80 Tier 2 symbols remain importable for backwards compatibility but are not covered by the stability guarantee.

API Stability: The 30 Tier 1 symbols are frozen as of v1.0.0b1. No additions or removals until v2.0.0. Tier 2 imports emit deprecation warnings and may be reorganized in v2.0.

Tier 1 Symbols (Stable — 30)

Category             Symbols
Enforcement          enforce, Enforcer, Decision, EnforcementResult
Policy               Policy, load_policy
Redaction            Redactor, RedactionResult, RedactionStrategy, SecretScanner
Audit                Auditor, AuditEntry, VerificationResult, verify_trail, load_trail
Resource Management  ResourceGuard, CostTracker, KillSwitch, RateLimiter
Exceptions           EnforceCoreError, EnforcementViolation, ToolDeniedError, ContentViolationError, PolicyError, PolicyLoadError, CostLimitError, ResourceLimitError
Configuration        Settings, settings, __version__

Tip: If you only import Tier 1 symbols, your code is guaranteed forward-compatible through the entire v1.x series.


Primary API

@enforce() Decorator

The main entry point. Wraps any callable with policy enforcement.

from enforcecore import enforce

# From YAML file
@enforce(policy="policies/agent.yaml")
async def search_web(query: str) -> str:
    return await api.search(query)

# Sync functions also work
@enforce(policy="policies/agent.yaml")
def read_file(path: str) -> str:
    # Use a context manager so the file handle is always closed
    with open(path) as f:
        return f.read()

# Inline policy (quick prototyping)
@enforce(
    allowed_tools=["search_web", "calculator"],
    pii_redaction=True,
    max_cost_usd=5.0,
)
async def my_tool(args: dict) -> str: ...

# Explicit tool name
@enforce(policy="policy.yaml", tool_name="web_search")
async def _internal_search(query: str) -> str: ...

Behavior:

  • Before call: evaluate pre-call rules, redact inputs
  • During call: enforce resource limits
  • After call: evaluate post-call rules, redact outputs, record audit entry
  • On violation: raise EnforcementViolation (if a pre-call rule fails, the wrapped call never executes)
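
The ordering of these stages can be pictured as a plain Python decorator. The sketch below is not EnforceCore's implementation: `toy_enforce`, `ToyViolation`, and the in-memory `audit_log` are hypothetical names, the only pre-call rule is an allow-list, and resource limits are left out entirely.

```python
import functools
import re

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
audit_log = []  # hypothetical in-memory stand-in for an audit trail


class ToyViolation(Exception):
    """Hypothetical stand-in for an enforcement violation."""


def toy_enforce(allowed_tools):
    """Wrap a sync callable with a pre-call check, redaction, and audit."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(text):
            name = func.__name__
            # Before call: evaluate the pre-call rule.
            if name not in allowed_tools:
                audit_log.append((name, "blocked"))
                raise ToyViolation(f"{name} is not allowed")
            # Before call: redact inputs.
            clean_input = EMAIL_RE.sub("<EMAIL>", text)
            result = func(clean_input)
            # After call: redact outputs, then record the audit entry.
            clean_output = EMAIL_RE.sub("<EMAIL>", result)
            audit_log.append((name, "allowed"))
            return clean_output
        return wrapper
    return decorator


@toy_enforce(allowed_tools={"echo"})
def echo(text):
    return f"echo: {text}"


@toy_enforce(allowed_tools={"echo"})
def delete_all(text):
    return "deleted"


print(echo("mail john@example.com"))  # echo: mail <EMAIL>
```

Calling `delete_all("x")` raises `ToyViolation` before the function body runs, which mirrors the "call never executes" behavior for pre-call failures.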

Enforcer Class

from enforcecore import Enforcer, Policy

enforcer = Enforcer(Policy.from_file("policy.yaml"))

# Sync
result = enforcer.enforce_sync(search_fn, "query", tool_name="search_web")

# Async
result = await enforcer.enforce_async(search_fn, "query", tool_name="search_web")

# From file shortcut
enforcer = Enforcer.from_file("policy.yaml")

# Properties
print(enforcer.policy_name)  # "my-policy"
print(enforcer.policy)       # Policy instance

Note: guard_sync() and guard_async() were removed in v1.0.16a1. Use enforce_sync() / enforce_async() instead — they run the full enforcement pipeline (policy + PII redaction + audit + resource guard).


Policy API

Loading Policies

from enforcecore import Policy, load_policy

# From YAML
policy = load_policy("policies/strict.yaml")

# Programmatic
policy = Policy(
    name="my-policy",
    rules={
        "allowed_tools": ["search", "calculator"],
        "pii_redaction": {"enabled": True, "categories": ["email", "phone"]},
        "resource_limits": {"max_call_duration_seconds": 30},
    },
    on_violation="block",
)

# Validate without loading
errors = Policy.validate_file("policies/strict.yaml")

Policy Model

class Policy(BaseModel):
    name: str
    version: str = "1.0"
    rules: PolicyRules
    on_violation: Literal["block", "log", "redact"] = "block"

class PolicyRules(BaseModel):
    allowed_tools: list[str] | None = None    # None = all allowed
    denied_tools: list[str] = []
    pii_redaction: PIIRedactionConfig = PIIRedactionConfig()
    resource_limits: ResourceLimits = ResourceLimits()
    network: NetworkPolicy = NetworkPolicy()
    max_output_size_bytes: int | None = None
    redact_output: bool = True
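
The model does not spell out how `allowed_tools` and `denied_tools` interact. A minimal sketch of one plausible reading follows, assuming the deny list takes precedence and `None` means "no allow-list restriction". That precedence is an assumption, and `is_tool_allowed` is a hypothetical helper, not part of the package.

```python
from __future__ import annotations


def is_tool_allowed(name: str, allowed: list[str] | None, denied: list[str]) -> bool:
    """Assumed semantics: deny wins; allowed=None permits everything else."""
    if name in denied:
        return False
    if allowed is None:
        return True
    return name in allowed


print(is_tool_allowed("search", None, []))               # True
print(is_tool_allowed("search", ["search"], ["search"]))  # False
print(is_tool_allowed("shell", ["search"], []))           # False
```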

Redactor API

Regex-based PII detection — no heavy NLP dependencies. ~0.1–0.5ms per call.

PII Categories

Category     Example              Placeholder
email        john@example.com     <EMAIL>
phone        (555) 123-4567       <PHONE>
ssn          123-45-6789          <SSN>
credit_card  4111-1111-1111-1111  <CREDIT_CARD>
ip_address   192.168.1.100        <IP_ADDRESS>
passport     AB1234567            <PASSPORT>

Redaction Strategies

Strategy     Result for john@example.com
placeholder  <EMAIL>
mask         ****@****.***
hash         [SHA256:6b0b4806b1e57501]
remove       (empty string)
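
To make the four strategies concrete, here is a standard-library sketch for the email category only. It is an illustration, not the Redactor's code: the regex, the fixed mask string, and the choice to keep the first 16 hex characters of a SHA-256 over the raw match are all assumptions, so the digest it prints need not equal the one shown in the table.

```python
import hashlib
import re

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")


def redact_email(text: str, strategy: str = "placeholder") -> str:
    """Replace every email address in text according to the chosen strategy."""
    def replace(match):
        value = match.group(0)
        if strategy == "placeholder":
            return "<EMAIL>"
        if strategy == "mask":
            return "****@****.***"
        if strategy == "hash":
            digest = hashlib.sha256(value.encode("utf-8")).hexdigest()
            return f"[SHA256:{digest[:16]}]"  # truncated digest (assumed length)
        if strategy == "remove":
            return ""
        raise ValueError(f"unknown strategy: {strategy}")
    return EMAIL_RE.sub(replace, text)


for name in ("placeholder", "mask", "hash", "remove"):
    print(name, "->", repr(redact_email("Contact john@example.com", name)))
```

The hash strategy keeps redacted values comparable (the same address always yields the same token) without revealing the address itself.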

Usage

from enforcecore.redactor import Redactor, RedactionResult

redactor = Redactor(categories=["email", "phone"])

# Detection only
entities = redactor.detect("Email john@example.com")
for e in entities:
    print(f"  {e.category}: {e.text!r} at [{e.start}:{e.end}]")

# Full redaction
result = redactor.redact("Call 555-123-4567 or john@example.com")
print(result.text)           # "Call <PHONE> or <EMAIL>"
print(result.count)          # 2
print(result.was_redacted)   # True

Auditor API

Merkle-chained, tamper-evident audit trails.

AuditEntry Fields

Field              Type        Description
entry_id           str         UUID v4
timestamp          str         ISO 8601 UTC
tool_name          str         Tool called
policy_name        str         Policy applied
decision           str         "allowed" or "blocked"
violation_type     str | None  Error class (if blocked)
overhead_ms        float       Enforcement overhead
input_redactions   int         PII redacted from inputs
output_redactions  int         PII redacted from outputs
previous_hash      str         SHA-256 of preceding entry
entry_hash         str         SHA-256 of this entry

Usage

from enforcecore import Auditor, verify_trail, load_trail

# Create auditor
auditor = Auditor(output_path="audit.jsonl")

# Record entries (Merkle-chained automatically)
e1 = auditor.record(tool_name="search_web", policy_name="default", decision="allowed")
e2 = auditor.record(tool_name="delete_file", policy_name="default", decision="blocked",
                     violation_type="ToolDeniedError")
print(e2.previous_hash == e1.entry_hash)  # True — chain linked

# Verify trail
result = verify_trail("audit.jsonl")
print(result.is_valid)      # True
print(result.chain_intact)  # True

# Load entries
trail = load_trail("audit.jsonl")
recent = load_trail("audit.jsonl", max_entries=100)  # Last 100 only
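
The chaining behind `previous_hash` and `entry_hash` can be sketched with the standard library. This is a simplified model rather than the Auditor's on-disk format: the all-zero `GENESIS` value, the JSON serialization, and the helper names are assumptions made for illustration.

```python
import hashlib
import json

GENESIS = "0" * 64  # assumed placeholder for the first entry's previous_hash


def entry_digest(payload: dict, previous_hash: str) -> str:
    """Hash the payload together with the preceding entry's hash."""
    body = json.dumps(payload, sort_keys=True) + previous_hash
    return hashlib.sha256(body.encode("utf-8")).hexdigest()


def append_entry(trail: list, payload: dict) -> dict:
    previous_hash = trail[-1]["entry_hash"] if trail else GENESIS
    entry = {
        "payload": payload,
        "previous_hash": previous_hash,
        "entry_hash": entry_digest(payload, previous_hash),
    }
    trail.append(entry)
    return entry


def chain_is_valid(trail: list) -> bool:
    """Recompute every hash and check each link to its predecessor."""
    expected_previous = GENESIS
    for entry in trail:
        if entry["previous_hash"] != expected_previous:
            return False
        if entry["entry_hash"] != entry_digest(entry["payload"], entry["previous_hash"]):
            return False
        expected_previous = entry["entry_hash"]
    return True


trail = []
append_entry(trail, {"tool_name": "search_web", "decision": "allowed"})
append_entry(trail, {"tool_name": "delete_file", "decision": "blocked"})
print(chain_is_valid(trail))  # True

trail[0]["payload"]["decision"] = "blocked"  # tamper with the first entry
print(chain_is_valid(trail))  # False
```

Because each hash covers the previous one, editing any entry invalidates that entry and breaks the link from every entry after it.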

Cross-Session Continuity

The auditor resumes the Merkle chain from an existing trail file:

# Session 1
a1 = Auditor(output_path="trail.jsonl")
a1.record(tool_name="tool_a", policy_name="p")

# Session 2 — chain resumes automatically
a2 = Auditor(output_path="trail.jsonl")
a2.record(tool_name="tool_b", policy_name="p")

assert verify_trail("trail.jsonl").is_valid  # True — chain intact

Pluggable Audit Backends

Swap the default JSONL backend for custom logging:

from enforcecore import (
    AuditBackend, JsonlBackend, NullBackend,
    CallbackBackend, MultiBackend,
)

# Send audit events to your observability stack
callback = CallbackBackend(on_event=lambda entry: send_to_datadog(entry))

# Write locally AND send remotely
multi = MultiBackend([JsonlBackend("audit.jsonl"), callback])

# Disable auditing in tests
null = NullBackend()

Audit Rotation

Manage audit file growth with automatic rotation:

from enforcecore import AuditRotator

rotator = AuditRotator(
    max_size_bytes=50 * 1024 * 1024,  # 50 MB per file
    retention_days=90,                 # Keep 90 days
    compress=True,                     # gzip old files
)

Witness API

Hash-only remote witness backends for tamper detection beyond Merkle chain verification. Even if an attacker rebuilds the entire chain from scratch, the rebuilt hashes will not match the witness records stored elsewhere, which exposes the forgery.

Built-in Witnesses

from enforcecore.auditor.witness import (
    CallbackWitness, FileWitness, LogWitness,
)

# Callback — send hashes to a queue, HTTP endpoint, or database
witness = CallbackWitness(callback=lambda entry_hash, meta: send_to_siem(entry_hash))
# ⚠️ Warning: slow callbacks block audit writes. Use a queue-based
# pattern for HTTP/database backends.

# File — write hashes to a separate JSONL file
witness = FileWitness(path="/var/log/enforcecore/witness.jsonl")

# Log — emit hashes via Python logging (syslog/journald)
witness = LogWitness()

Using Witnesses with the Auditor

from enforcecore import Auditor
from enforcecore.auditor.witness import FileWitness

auditor = Auditor(
    output_path="audit.jsonl",
    witness=FileWitness("/secure/witness.jsonl"),
)

Witness Verification

from enforcecore.auditor.witness import verify_with_witness

# Cross-check audit trail hashes against witness records
result = verify_with_witness(
    trail_path="audit.jsonl",
    witness_path="/secure/witness.jsonl",
)
print(result.is_valid)       # True if all hashes match
print(result.mismatches)     # List of mismatched entries (chain-rebuild detection)
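
Why a witness catches a rebuilt chain can be shown with a toy model. The sketch below does not call `verify_with_witness`; `chain_hashes` and `find_mismatches` are hypothetical helpers, and the chain is reduced to hashing plain strings.

```python
import hashlib


def chain_hashes(messages):
    """Return a hash chain where each hash also covers the previous hash."""
    hashes, previous = [], ""
    for message in messages:
        previous = hashlib.sha256((previous + message).encode("utf-8")).hexdigest()
        hashes.append(previous)
    return hashes


def find_mismatches(trail_hashes, witness_hashes):
    """Indexes where trail and witness disagree, including missing entries."""
    length = max(len(trail_hashes), len(witness_hashes))
    return [
        i for i in range(length)
        if i >= len(trail_hashes)
        or i >= len(witness_hashes)
        or trail_hashes[i] != witness_hashes[i]
    ]


original = ["search_web allowed", "delete_file blocked", "read_file allowed"]
witness = chain_hashes(original)  # recorded elsewhere at write time

# An attacker rewrites entry 1 and rebuilds every later hash, so the forged
# chain is internally consistent but no longer matches the witness.
forged = chain_hashes(["search_web allowed", "delete_file allowed", "read_file allowed"])

print(find_mismatches(chain_hashes(original), witness))  # []
print(find_mismatches(forged, witness))                  # [1, 2]
```

The forged chain would pass a chain-only check, since all of its links are consistent; only the comparison against independently stored hashes reveals the change.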

Settings-Driven Witness (v1.0.0b5)

Enable witness via environment variable — no code changes needed:

ENFORCECORE_AUDIT_WITNESS_FILE=/var/log/enforcecore/witness.jsonl

All @enforce() calls will automatically publish hashes to the witness file.


Immutable Audit API

OS-enforced append-only protection for audit files. While the append-only attribute is set, it prevents truncation or chain rebuild even by the file owner.

Enabling Immutable Mode

from enforcecore import Auditor

# OS sets append-only attribute on the audit file
# Linux: chattr +a  |  macOS: chflags uappend
auditor = Auditor(
    output_path="audit.jsonl",
    immutable=True,
)

Platform Support

from enforcecore.auditor.immutable import platform_support_info

info = platform_support_info()
print(info)
# {'supported': True, 'platform': 'linux', 'mechanism': 'chattr +a',
#  'requires_root': False, 'in_container': False,
#  'has_capability': True}  # CAP_LINUX_IMMUTABLE

Platform  Mechanism        Requires Root                  Container Notes
Linux     chattr +a        No (with CAP_LINUX_IMMUTABLE)  Docker: --cap-add LINUX_IMMUTABLE
macOS     chflags uappend  No (user-level flag)           N/A
Windows   Not supported    -                              Falls back safely

Settings-Driven Immutable (v1.0.0b5)

Enable via environment variable — no code changes needed:

ENFORCECORE_AUDIT_IMMUTABLE=true

Combining Witness + Immutable

For maximum tamper-evidence, use both:

from enforcecore import Auditor
from enforcecore.auditor.witness import FileWitness

auditor = Auditor(
    output_path="audit.jsonl",
    immutable=True,
    witness=FileWitness("/secure/witness.jsonl"),
)

Or via environment variables:

ENFORCECORE_AUDIT_IMMUTABLE=true
ENFORCECORE_AUDIT_WITNESS_FILE=/secure/witness.jsonl

Guard API

Cross-platform resource limits and hard termination.

from enforcecore import ResourceLimitError
from enforcecore.guard import ResourceGuard, CostTracker, KillSwitch

# Cost tracking
tracker = CostTracker(budget_usd=10.0)
tracker.record(0.05)
tracker.check_budget()          # Passes: under budget

# Kill switch
ks = KillSwitch()
ks.trip("memory exceeded 256MB")
try:
    ks.check("tool", "policy")  # Raises ResourceLimitError
except ResourceLimitError as exc:
    print(f"Blocked: {exc}")

# Resource guard (shared thread pool, leak detection)
# A separate, untripped KillSwitch is passed so the call below can run.
guard = ResourceGuard(cost_tracker=tracker, kill_switch=KillSwitch())
result = await guard.execute_async(
    my_func, (arg,), {},
    max_duration_seconds=30.0,
)

# Check for leaked threads
print(guard.leaked_thread_count)  # 0 = healthy
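
The two ideas in this section, a running cost budget and a hard time limit, can be sketched with the standard library alone. `ToyCostTracker`, `BudgetExceeded`, and `run_with_timeout` are hypothetical names, and treating "over budget" as strictly greater than the limit is an assumption; the real ResourceGuard may behave differently.

```python
import asyncio


class BudgetExceeded(Exception):
    """Hypothetical stand-in for a cost-limit error."""


class ToyCostTracker:
    def __init__(self, budget_usd: float):
        self.budget_usd = budget_usd
        self.spent_usd = 0.0

    def record(self, cost_usd: float) -> None:
        self.spent_usd += cost_usd

    def check_budget(self) -> None:
        if self.spent_usd > self.budget_usd:
            raise BudgetExceeded(
                f"spent {self.spent_usd:.2f} of {self.budget_usd:.2f} USD"
            )


async def run_with_timeout(func, *args, max_duration_seconds: float):
    """Run an async callable; asyncio.TimeoutError is raised if it overruns."""
    return await asyncio.wait_for(func(*args), timeout=max_duration_seconds)


async def slow_tool(delay: float) -> str:
    await asyncio.sleep(delay)
    return "done"


tracker = ToyCostTracker(budget_usd=10.0)
tracker.record(0.05)
tracker.check_budget()  # passes: 0.05 is under the 10.0 budget

print(asyncio.run(run_with_timeout(slow_tool, 0.01, max_duration_seconds=1.0)))  # done
```

`asyncio.wait_for` cancels the awaited coroutine on timeout, which works for cooperative async code; blocking synchronous functions need a different mechanism such as a worker thread or process.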

Secret Scanner API

Detect leaked credentials in tool inputs and outputs:

from enforcecore import SecretScanner, DetectedSecret, PatternRegistry

scanner = SecretScanner()

# Scan text for secrets
secrets = scanner.scan("aws_key = AKIAIOSFODNN7EXAMPLE")
for s in secrets:
    print(f"  {s.category}: {s.text[:10]}...")

# Built-in categories: aws_key, github_token, generic_api_key,
# bearer_jwt, private_key, password_url, gcp_service_account,
# azure_connection_string, database_uri, ssh_private_key, slack_token

Custom Patterns

Register domain-specific secret patterns:

from enforcecore import CustomPattern, PatternRegistry

registry = PatternRegistry()
registry.register(CustomPattern(
    name="internal_api_key",
    pattern=r"myco-[a-f0-9]{32}",
    description="Internal API key format",
))
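
Pattern-based secret detection boils down to running named regular expressions over text. The sketch below uses only `re` and is not the SecretScanner's code: the `PATTERNS` table and `scan` function are hypothetical, and the AWS regex reflects the commonly seen `AKIA` plus 16 uppercase alphanumerics shape rather than the library's built-in pattern.

```python
import re

PATTERNS = {
    # Commonly seen shape of AWS access key IDs (an assumption for this sketch).
    "aws_key": re.compile(r"\bAKIA[0-9A-Z]{16}\b"),
    # The custom pattern from the registration example above.
    "internal_api_key": re.compile(r"myco-[a-f0-9]{32}"),
}


def scan(text: str):
    """Return (category, matched_text) pairs for every pattern hit."""
    return [
        (category, match.group(0))
        for category, pattern in PATTERNS.items()
        for match in pattern.finditer(text)
    ]


for category, value in scan("aws_key = AKIAIOSFODNN7EXAMPLE"):
    # Print only a prefix so the secret itself is not echoed in full.
    print(f"{category}: {value[:10]}...")
```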

Content Rules API

Block dangerous content patterns in tool arguments and outputs:

from enforcecore import RuleEngine, ContentRule, get_builtin_rules

# Use built-in rules
engine = RuleEngine(rules=get_builtin_rules())

# Check content
violations = engine.check("os.system('rm -rf /')")
for v in violations:
    print(f"  {v.rule_name}: {v.description}")

# 4 built-in categories:
# - shell_injection: os.system, subprocess, exec
# - path_traversal: ../, /etc/passwd, ~root
# - sql_injection: SELECT, DROP, UNION
# - code_execution: eval(), exec(), __import__

Rate Limiter API

Prevent runaway agents from making too many calls:

from enforcecore import RateLimiter, RateLimit

limiter = RateLimiter(
    per_tool=RateLimit(max_calls=10, window_seconds=60),
    global_limit=RateLimit(max_calls=50, window_seconds=60),
)

# Check before calling (raises RateLimitError if exceeded)
limiter.check("search_web")
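
One common way to implement a `max_calls` per `window_seconds` limit is a sliding window of timestamps. Whether EnforceCore uses this algorithm is not stated here, so treat the following as an illustration; `SlidingWindowLimiter` and `ToyRateLimitError` are hypothetical names, and the injectable `clock` exists only to make the example deterministic.

```python
import time
from collections import deque


class ToyRateLimitError(Exception):
    """Hypothetical stand-in for a rate-limit error."""


class SlidingWindowLimiter:
    def __init__(self, max_calls: int, window_seconds: float, clock=time.monotonic):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.clock = clock
        self.calls = {}  # tool name -> deque of call timestamps

    def check(self, tool_name: str) -> None:
        """Record a call, or raise if the tool already used up its window."""
        now = self.clock()
        stamps = self.calls.setdefault(tool_name, deque())
        # Drop timestamps that have left the window.
        while stamps and now - stamps[0] >= self.window_seconds:
            stamps.popleft()
        if len(stamps) >= self.max_calls:
            raise ToyRateLimitError(
                f"{tool_name}: limit of {self.max_calls} calls per "
                f"{self.window_seconds}s reached"
            )
        stamps.append(now)


fake_now = [0.0]  # mutable fake clock for a deterministic demo
limiter = SlidingWindowLimiter(max_calls=2, window_seconds=60, clock=lambda: fake_now[0])
limiter.check("search_web")
limiter.check("search_web")
try:
    limiter.check("search_web")
except ToyRateLimitError as exc:
    print("blocked:", exc)

fake_now[0] = 61.0           # the first two calls have aged out
limiter.check("search_web")  # allowed again
```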

Domain Checker API

Enforce network access policies:

from enforcecore import DomainChecker

checker = DomainChecker(
    allowed=["api.example.com", "*.trusted.io"],
    denied=["*.evil.com", "*.malware.net"],
)

checker.check("api.example.com")    # OK
checker.check("hack.evil.com")      # Raises DomainDeniedError
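
Wildcard domain rules like the ones above can be approximated with `fnmatch`. This is a sketch under stated assumptions, not the DomainChecker's logic: deny rules are checked first, anything not explicitly allowed is rejected, and `domain_allowed` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def domain_allowed(domain: str, allowed: list, denied: list) -> bool:
    """Assumed semantics: deny patterns win, then the allow-list must match."""
    domain = domain.lower().rstrip(".")
    if any(fnmatchcase(domain, pattern) for pattern in denied):
        return False
    return any(fnmatchcase(domain, pattern) for pattern in allowed)


ALLOWED = ["api.example.com", "*.trusted.io"]
DENIED = ["*.evil.com", "*.malware.net"]

print(domain_allowed("api.example.com", ALLOWED, DENIED))  # True
print(domain_allowed("cdn.trusted.io", ALLOWED, DENIED))   # True
print(domain_allowed("hack.evil.com", ALLOWED, DENIED))    # False
```

Note that with plain glob matching a pattern such as `*.evil.com` does not match the bare domain `evil.com`; in this sketch that domain is still rejected only because it is absent from the allow-list.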

Hook System API

Build custom observability and enforcement extensions:

from enforcecore import (
    on_pre_call, on_post_call, on_violation, on_redaction,
    HookRegistry, HookContext, ViolationHookContext, RedactionHookContext,
)

@on_pre_call
def log_start(ctx: HookContext):
    print(f"→ Calling {ctx.tool_name}")

@on_post_call
def log_end(ctx: HookContext):
    print(f"← {ctx.tool_name} completed in {ctx.overhead_ms:.1f}ms")

@on_violation
def alert(ctx: ViolationHookContext):
    send_alert(f"⚠️ {ctx.tool_name} blocked: {ctx.violation_type}")

@on_redaction
def track_pii(ctx: RedactionHookContext):
    metrics.increment("pii_redacted", ctx.redaction_count)

# Programmatic hook management
registry = HookRegistry()
registry.register("post_call", my_hook_fn)

Observability API

OpenTelemetry integration for production monitoring:

from enforcecore import EnforceCoreMetrics, EnforceCoreInstrumentor

# Counters (Prometheus-compatible)
metrics = EnforceCoreMetrics()
# Tracks: calls_total, violations_total, redactions_total, cost_usd

# Spans (distributed tracing)
instrumentor = EnforceCoreInstrumentor()
instrumentor.instrument()  # Auto-instruments all enforce() calls

Event Webhooks

Push enforcement events to external services:

from enforcecore import WebhookDispatcher, WebhookEvent

dispatcher = WebhookDispatcher(
    url="https://hooks.example.com/enforcecore",
    events=[WebhookEvent.VIOLATION, WebhookEvent.COST_LIMIT],
    retry_count=3,
)

Hardening API

Input validation, scope tracking, and unicode protection.

Input Validation

from enforcecore import validate_tool_name, check_input_size, deep_redact

# Validate tool names (rejects empty, overlength, invalid chars)
name = validate_tool_name("my_tool")  # Returns stripped name

# Check input payload size (default: 10 MB)
check_input_size(args, kwargs)  # Raises InputTooLargeError if exceeded

# Recursive PII redaction on nested data
redacted = deep_redact(nested_data, redactor.redact_string, max_depth=10)

Enforcement Scope Tracking

from enforcecore import (
    enter_enforcement, exit_enforcement,
    get_enforcement_depth, get_enforcement_chain,
)

enter_enforcement("tool_a")
print(get_enforcement_depth())  # 1
print(get_enforcement_chain())  # ["tool_a"]

# Max depth: 10 (raises EnforcementDepthError if exceeded)
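
Scope tracking of this kind is often built on `contextvars`, so that each thread or async task sees its own call chain. The sketch below is a guess at the general shape, not the library's code; `enter_scope`, `exit_scope`, and `ToyDepthError` are hypothetical names, and only the depth limit of 10 comes from the text above.

```python
import contextvars

MAX_DEPTH = 10
_chain = contextvars.ContextVar("enforcement_chain", default=())


class ToyDepthError(Exception):
    """Hypothetical stand-in for a depth-exceeded error."""


def enter_scope(tool_name: str) -> None:
    chain = _chain.get()
    if len(chain) >= MAX_DEPTH:
        raise ToyDepthError(f"nesting deeper than {MAX_DEPTH}: {list(chain)}")
    _chain.set(chain + (tool_name,))


def exit_scope() -> None:
    _chain.set(_chain.get()[:-1])


def current_depth() -> int:
    return len(_chain.get())


def current_chain() -> list:
    return list(_chain.get())


enter_scope("tool_a")
print(current_depth())  # 1
print(current_chain())  # ['tool_a']
exit_scope()
```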

Unicode Hardening

Protects against PII evasion via zero-width characters, homoglyphs, and URL/HTML encoding:

from enforcecore import normalize_unicode, normalize_homoglyphs, prepare_for_detection

# Full pipeline (called automatically by Redactor.detect())
clean = prepare_for_detection(text)
# 1. NFC normalization + strip zero-width chars
# 2. Replace ~40 confusable chars (Cyrillic, Greek, fullwidth)
# 3. Decode URL percent-encoding and HTML entities
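
The three steps can be reproduced in miniature with the standard library. This is an illustrative sketch, not `prepare_for_detection` itself: the `prepare` function, the zero-width character set, and the four-entry homoglyph table (the real one is described as having about 40 entries) are assumptions.

```python
import html
import unicodedata
from urllib.parse import unquote

# Zero-width characters mapped to None so str.translate deletes them.
ZERO_WIDTH = dict.fromkeys(map(ord, "\u200b\u200c\u200d\u2060\ufeff"))
# A tiny illustrative confusables table: Cyrillic a/e/o and fullwidth @.
HOMOGLYPHS = str.maketrans({"\u0430": "a", "\u0435": "e", "\u043e": "o", "\uff20": "@"})


def prepare(text: str) -> str:
    # 1. NFC normalization, then strip zero-width characters.
    text = unicodedata.normalize("NFC", text).translate(ZERO_WIDTH)
    # 2. Replace confusable characters with their ASCII look-alikes.
    text = text.translate(HOMOGLYPHS)
    # 3. Decode URL percent-encoding, then HTML entities.
    return html.unescape(unquote(text))


print(prepare("jo\u200bhn%40ex\u0430mple.com"))  # john@example.com
print(prepare("john&#64;example.com"))           # john@example.com
```

After normalization, an ordinary email regex can match addresses that were disguised with invisible characters, look-alike letters, or encoded `@` signs.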

Exception Hierarchy

EnforceCoreError              # Base exception
├── PolicyError
│   ├── PolicyLoadError       # File not found / unparseable
│   ├── PolicyValidationError # Schema violation
│   └── PolicyEvaluationError # Error during rule evaluation
├── EnforcementViolation
│   ├── ToolDeniedError       # Tool not allowed
│   ├── DomainDeniedError     # Network domain blocked
│   ├── CostLimitError        # Budget exceeded
│   ├── RateLimitError        # Rate limit exceeded
│   ├── ContentViolationError # Content rule violated
│   └── ResourceLimitError    # Resource limit breached
├── RedactionError            # PII error (fails closed)
├── AuditError                # Audit error (fails closed)
├── HardeningError
│   ├── InvalidToolNameError  # Bad tool name
│   ├── InputTooLargeError    # Payload too large
│   └── EnforcementDepthError # Recursive depth exceeded
└── GuardError                # Resource guard failure
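
In practice the tree determines which `except` clause catches what. The sketch below defines local stand-in classes that mirror part of the hierarchy so it runs without the package installed; in real code these names would be imported from `enforcecore`. The `classify` helper is hypothetical.

```python
# Local stand-ins mirroring part of the documented hierarchy.
class EnforceCoreError(Exception): ...
class PolicyError(EnforceCoreError): ...
class PolicyLoadError(PolicyError): ...
class EnforcementViolation(EnforceCoreError): ...
class ToolDeniedError(EnforcementViolation): ...
class CostLimitError(EnforcementViolation): ...
class AuditError(EnforceCoreError): ...


def classify(exc: Exception) -> str:
    """Show which handler a given exception reaches, most specific first."""
    try:
        raise exc
    except EnforcementViolation:
        return "violation"          # any blocked call
    except PolicyError:
        return "policy"             # bad or missing policy
    except EnforceCoreError:
        return "other-enforcecore"  # everything else from the library
    except Exception:
        return "unrelated"


print(classify(ToolDeniedError("delete_file")))  # violation
print(classify(PolicyLoadError("missing.yaml")))  # policy
print(classify(AuditError("disk full")))          # other-enforcecore
print(classify(ValueError("oops")))               # unrelated
```

Ordering handlers from most specific to most general lets an application treat policy violations as expected outcomes while still surfacing configuration and infrastructure errors separately.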

CLI

# Show version and platform info
enforcecore info

# Validate a policy file
enforcecore validate policies/strict.yaml

# Verify an audit trail
enforcecore verify audit.jsonl

# Run evaluation suite
enforcecore eval --scenarios all --output results/

# Dry-run a tool call against a policy (no execution)
enforcecore dry-run --policy policy.yaml --tool search_web

# Inspect a policy's rules and settings
enforcecore inspect policies/strict.yaml

Environment Variables

All settings can be configured via environment variables (via pydantic-settings). The table below is the canonical reference:

Variable                        Default        Description
ENFORCECORE_DEFAULT_POLICY      None           Default policy file path
ENFORCECORE_AUDIT_PATH          ./audit_logs/  Audit trail directory
ENFORCECORE_AUDIT_ENABLED       true           Enable/disable auditing
ENFORCECORE_AUDIT_BACKEND       jsonl          Audit backend (jsonl, null, callback, multi)
ENFORCECORE_AUDIT_MAX_BYTES     10485760       Max audit file size before rotation (10 MB)
ENFORCECORE_AUDIT_MAX_FILES     5              Max rotated audit files to keep
ENFORCECORE_AUDIT_IMMUTABLE     false          Enable OS-enforced append-only audit files
ENFORCECORE_AUDIT_WITNESS_FILE  None           Path for hash-only witness JSONL file
ENFORCECORE_REDACTION_ENABLED   true           Enable/disable PII redaction
ENFORCECORE_SECRET_SCANNING     true           Enable secret detection
ENFORCECORE_CONTENT_RULES       true           Enable content rule checking
ENFORCECORE_LOG_LEVEL           INFO           Structured log level
ENFORCECORE_COST_BUDGET_USD     100.0          Global cost budget
ENFORCECORE_RATE_LIMIT_RPM      0              Global rate limit (0 = disabled)
ENFORCECORE_FAIL_OPEN           false          Allow bypass on errors (⚠️ dev-only)
ENFORCECORE_DEV_MODE            false          Development mode flag
ENFORCECORE_OTEL_ENABLED        false          Enable OpenTelemetry metrics/tracing
ENFORCECORE_WEBHOOK_URL         None           Webhook endpoint for enforcement events
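
For readers who want to see how such variables map to typed values, here is a hand-rolled sketch using only `os.environ`. EnforceCore itself relies on pydantic-settings, so this is not how the package parses them: `load_settings`, `env_bool`, and the accepted boolean spellings are assumptions, while the variable names and defaults are taken from the table above.

```python
import os

TRUE_VALUES = {"1", "true", "yes", "on"}  # assumed accepted spellings


def env_bool(env, name: str, default: bool) -> bool:
    raw = env.get(name)
    return default if raw is None else raw.strip().lower() in TRUE_VALUES


def load_settings(env=os.environ) -> dict:
    """Read a few of the variables above, falling back to the documented defaults."""
    return {
        "audit_enabled": env_bool(env, "ENFORCECORE_AUDIT_ENABLED", True),
        "fail_open": env_bool(env, "ENFORCECORE_FAIL_OPEN", False),
        "cost_budget_usd": float(env.get("ENFORCECORE_COST_BUDGET_USD", "100.0")),
        "rate_limit_rpm": int(env.get("ENFORCECORE_RATE_LIMIT_RPM", "0")),
        "default_policy": env.get("ENFORCECORE_DEFAULT_POLICY"),  # None when unset
    }


# Pass a plain dict instead of os.environ to try out overrides.
print(load_settings({"ENFORCECORE_FAIL_OPEN": "true", "ENFORCECORE_RATE_LIMIT_RPM": "120"}))
```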