Skip to content

Python API Reference

Use AKIOS programmatically from your Python code. This is useful for integrating AKIOS workflows into larger applications.

Installation

# Quote the requirement: an unquoted ">=" is parsed by the shell as output redirection
pip install "akios>=1.0.15"

Quick Example

from akios.core.runtime.engine import RuntimeEngine

# Create engine
engine = RuntimeEngine()

# Run workflow
result = engine.run(
    "workflows/process-document.yml",
    context={"user_id": 123, "priority": "high"}
)

# Check results
print(f"Status: {result['status']}")
print(f"Time: {result['execution_time']}s")
print(f"Output: {result['output_directory']}")

Core API

RuntimeEngine

The main engine for running workflows.

Import:

from akios.core.runtime.engine import RuntimeEngine

Create engine:

engine = RuntimeEngine()

# Or with custom config path
engine = RuntimeEngine(config_path="./custom-config.yaml")

Run workflow:

result = engine.run(
    workflow="workflows/my-workflow.yml",
    context={"key": "value"}  # Optional context variables
)

Return value:

{
    "status": "success",         # or "failed"
    "execution_time": 12.5,       # seconds
    "output_directory": "data/output/run_2024-01-15_10-30-45/",
    "error": None,                # or error message if failed
    # v1.0.12+ enriched fields:
    "tokens": {
        "input": 1250,            # total input tokens
        "output": 830,            # total output tokens
        "total": 2080
    },
    "cost_usd": 0.0042,          # total cost
    "pii_redactions": {
        "count": 3,               # total redactions applied
        "categories": ["email", "phone"],  # detected categories
    }
}

Agent APIs

Use individual agents directly in your code.

FilesystemAgent

Safe file operations with path restrictions.

from akios.core.runtime.agents.filesystem import FilesystemAgent

# Create agent with allowed paths
fs = FilesystemAgent({
    "allowed_paths": [
        "./data/input",
        "./data/output"
    ]
})

# Read file
content = fs.read("./data/input/document.txt")

# Write file
fs.write("./data/output/result.txt", "Processed content")

# List directory
files = fs.list("./data/input")

# Check if file exists
if fs.exists("./data/input/doc.txt"):
    print("File exists")

# Get file stats
stats = fs.stat("./data/input/doc.txt")
print(f"Size: {stats['size']} bytes")

Available methods:

  • read(path) → str
  • write(path, content) → None
  • list(path) → List[str]
  • exists(path) → bool
  • stat(path) → dict

HttpAgent

HTTP requests with rate limiting and redaction.

from akios.core.runtime.agents.http import HttpAgent

# Create agent
http = HttpAgent({
    "network_access_allowed": True,
    "rate_limit": 10  # requests per minute
})

# GET request
response = http.get("https://api.example.com/data")
print(response["status"], response["body"])

# POST request
response = http.post(
    "https://api.example.com/submit",
    json={"key": "value"},
    headers={"Authorization": "Bearer token"}
)

# Other methods (url and data are defined here so the snippet runs as written)
url = "https://api.example.com/items/1"
data = {"key": "value"}
response = http.put(url, json=data)
response = http.delete(url)

Available methods:

  • get(url, params=None, headers=None) → dict
  • post(url, json=None, data=None, headers=None) → dict
  • put(url, json=None, headers=None) → dict
  • delete(url, headers=None) → dict

LlmAgent

AI model calls with budget tracking.

from akios.core.runtime.agents.llm import LlmAgent

# Create agent
llm = LlmAgent({
    "provider": "grok",
    "model": "grok-3",
    "api_key": "your-key",
    "budget_limit": 1.0  # USD
})

# Generate completion
response = llm.complete(
    prompt="Summarize this document",
    max_tokens=1000
)
print(response["text"])
print(f"Cost: ${response['cost']}")

# Chat-style
response = llm.chat(
    messages=[
        {"role": "user", "content": "Hello!"}
    ]
)

Available methods:

  • complete(prompt, max_tokens=1000) → dict
  • chat(messages, max_tokens=1000) → dict

ToolExecutorAgent

Safe command execution.

from akios.core.runtime.agents.tool_executor import ToolExecutorAgent

# Create agent
tool = ToolExecutorAgent({
    "allowed_commands": ["echo", "date", "ls"]
})

# Run command
result = tool.run(
    command=["echo", "Hello"],
    timeout=10
)
print(result["stdout"])

Available methods:

  • run(command, timeout=30) → dict

Error Handling

AKIOS raises clear exceptions when something goes wrong.

from akios.core.runtime.engine import RuntimeEngine
from akios.exceptions import (
    WorkflowValidationError,
    AgentError,
    SecurityError,
    BudgetExceededError
)

engine = RuntimeEngine()

try:
    result = engine.run("workflows/my-workflow.yml")
except WorkflowValidationError as e:
    print(f"Invalid workflow: {e}")
except SecurityError as e:
    print(f"Security violation: {e}")
except BudgetExceededError as e:
    print(f"Budget exceeded: {e}")
except AgentError as e:
    print(f"Agent error: {e}")

Common errors:

  • WorkflowValidationError - Invalid workflow structure
  • AgentError - Agent action failed
  • SecurityError - Disallowed path, network blocked
  • BudgetExceededError - Cost limit reached
  • ConfigurationError - Invalid configuration

Validation

Validate workflow before running:

from akios.core.validation import validate_workflow
from akios.exceptions import WorkflowValidationError  # needed by the except clause below

try:
    validate_workflow("workflows/my-workflow.yml")
    print("Workflow is valid")
except WorkflowValidationError as e:
    print(f"Validation failed: {e}")

Audit Access

Access audit logs programmatically:

from akios.core.audit import AuditLog

audit = AuditLog("./audit/")

# Get recent events
events = audit.get_events(limit=10)
for event in events:
    print(f"{event['timestamp']}: {event['action']}")

# Verify integrity
if audit.verify():
    print("Audit log integrity verified")
else:
    print("Audit log tampered!")

# Export events
audit.export("audit-report.json", format="json")

Configuration Override

Override config programmatically:

from akios.core.runtime.engine import RuntimeEngine

engine = RuntimeEngine()

result = engine.run(
    "workflows/my-workflow.yml",
    config_overrides={
        "budget_limit_per_run": 5.0,
        "network_access_allowed": True,
        "log_level": "DEBUG"
    }
)

Settings Management

Access and manage runtime settings programmatically:

from akios.config import get_settings, Settings

# Get current settings
settings = get_settings()
print(f"Audit path: {settings.audit_storage_path}")
print(f"Sandbox enabled: {settings.sandbox_enabled}")
print(f"Max tokens: {settings.max_tokens_per_call}")
print(f"PII redaction: {settings.pii_redaction_enabled}")

# Create custom settings (for testing)
custom_settings = Settings(
    mock_llm=True,
    audit_storage_path="./custom_audit"
)

Error Classification

Classify errors programmatically for better handling:

from akios.core.error.classifier import classify_error
from akios.core.runtime.engine import RuntimeEngine

# The engine must exist before the try block; otherwise the handler below
# would only ever classify a NameError instead of a workflow failure.
engine = RuntimeEngine()

try:
    result = engine.run("invalid-workflow.yml")
except Exception as e:
    # Classification takes the error message and the exception class name.
    fingerprint = classify_error(str(e), type(e).__name__)
    print(f"Category: {fingerprint.category}")
    print(f"Severity: {fingerprint.severity}")
    print(f"Message: {fingerprint.get_user_friendly_message()}")
    for suggestion in fingerprint.recovery_suggestions:
        print(f"  - {suggestion}")

Performance Monitoring

Track and measure workflow performance:

from akios.core.performance.monitor import (
    get_performance_monitor,
    measure_performance
)

monitor = get_performance_monitor()

# Manual timing
with monitor.measure_time("custom_operation"):
    # Your code here
    pass

# Get performance report
report = monitor.get_performance_report()
print(f"Memory usage: {report['memory_usage']['rss_mb']:.1f} MB")
print(f"Recommendations: {report['recommendations']}")

# Decorator for automatic timing
@measure_performance("my_function")
def my_function():
    pass

Security Validation

Validate security posture at runtime:

from akios.security.validation import (
    validate_startup_security,
    validate_all_security
)

try:
    validate_startup_security()
    print("Security validation passed")
except Exception as e:
    print(f"Security validation failed: {e}")

Engine Sub-Modules (v1.0.10+)

As of v1.0.10, the engine monolith has been split into focused modules:

| Module | Responsibility |
|---|---|
| StepExecutor | Executes individual workflow steps |
| TemplateRenderer | Handles template variable interpolation |
| OutputExtractor | Extracts and probes output values from step results |
| ConditionEvaluator | AST-based safe expression evaluator for condition fields |

# All modules are accessible via the engine
from akios.core.runtime.engine import RuntimeEngine

engine = RuntimeEngine()
# Sub-modules are used internally — the RuntimeEngine API is unchanged

PIIDetectorProtocol

Custom PII detection backends implement the PIIDetectorProtocol interface:

from akios.security.pii import PIIDetectorProtocol

class CustomDetector(PIIDetectorProtocol):
    """Skeleton of a custom PII detection backend.

    Subclasses ``PIIDetectorProtocol`` and leaves both methods as
    placeholders (``...``) to be filled in with real logic.
    """

    def detect(self, text: str) -> list:
        """Detect PII in ``text``.

        NOTE(review): the element type of the returned list is not shown
        here — confirm against ``PIIDetectorProtocol``.
        """
        # Your custom detection logic
        ...
    
    def redact(self, text: str) -> str:
        """Return a redacted version of ``text``.

        NOTE(review): the redaction format (masking, placeholders, ...) is
        presumably up to the implementation — confirm against the protocol.
        """
        # Your custom redaction logic
        ...

The default regex-based detector covers 53 patterns with >95% accuracy.

Legacy Compatibility

For backward compatibility:

# Old name (still works as alias)
from akios.core.runtime.engine import WorkflowEngine

# New name (preferred in v1.0+)
from akios.core.runtime.engine import RuntimeEngine

# Both work the same way
engine = WorkflowEngine()  # Works (alias)
engine = RuntimeEngine()   # Preferred

Security Notes

✓ Always:

  • Keep audit logging enabled
  • Use path restrictions for filesystem access
  • Set budget limits for LLM calls
  • Enable PII redaction
  • Validate workflows before running

× Never:

  • Disable sandbox in production
  • Allow unrestricted filesystem access
  • Use unlimited budgets
  • Disable audit logging

Example: Building a Document Processor

Complete example integrating AKIOS into an application:

import os
from akios.core.runtime.engine import RuntimeEngine
from akios.exceptions import WorkflowValidationError, BudgetExceededError

class DocumentProcessor:
    """Application-level wrapper around a single ``RuntimeEngine``.

    Runs the document-processing workflow and converts both failed run
    results and raised exceptions into a plain result dictionary, so
    callers never have to handle AKIOS exceptions themselves.
    """

    def __init__(self):
        """Create the engine that is reused for every processed document."""
        self.engine = RuntimeEngine()

    def process_document(self, input_file, user_id):
        """Process a document using AKIOS workflow.

        Args:
            input_file: Passed to the workflow as the ``input_file``
                context variable.
            user_id: Passed to the workflow as the ``user_id`` context
                variable.

        Returns:
            dict: ``{"success": True, "output_dir": ..., "time": ...}`` when
            the run reports status ``"success"``; otherwise
            ``{"success": False, "error": <message>}``.
        """
        try:
            # Run workflow with a per-run budget cap
            result = self.engine.run(
                "workflows/process-document.yml",
                context={
                    "input_file": input_file,
                    "user_id": user_id
                },
                config_overrides={
                    "budget_limit_per_run": 2.0
                }
            )

            # Check success
            if result["status"] == "success":
                return {
                    "success": True,
                    "output_dir": result["output_directory"],
                    "time": result["execution_time"]
                }

            # The result may carry "error": None; `or` applies the fallback
            # for a missing key and for an empty/None value alike.
            return {
                "success": False,
                "error": result.get("error") or "Unknown error"
            }

        except BudgetExceededError:
            return {"success": False, "error": "Budget exceeded"}
        except WorkflowValidationError as e:
            return {"success": False, "error": f"Invalid workflow: {e}"}
        except Exception as e:
            # Top-level boundary: report any other failure as a result dict
            return {"success": False, "error": str(e)}

# Usage
processor = DocumentProcessor()
result = processor.process_document(
    "data/input/document.pdf",
    user_id="user123"
)

if result["success"]:
    print(f"Success! Output in {result['output_dir']}")
else:
    print(f"Failed: {result['error']}")

Related Docs

ESC