Amla Labs

Advanced Topics: Use Cases and Implementation Tips

This guide covers advanced patterns, best practices, and use cases for capability-based security—including applications beyond AI agents.

Implementation Best Practices

Capability Lifetime Management

Balance security with performance by choosing appropriate TTLs and usage limits:

Level       TTL            Max Uses    Use Case
Root        8-24 hours     Unlimited   Long-running orchestrators
Agent       1-4 hours      100-1000    Main agents with multiple tasks
Sub-agent   5-30 minutes   10-100      Specialized workers
Worker      1-5 minutes    1-10        Single-purpose operations

Example:

# Root orchestrator - long-lived
root = client.create_root_capability(
    interfaces=["database:*", "api:*"],
    resources=["*"],
    ttl_seconds=28800,  # 8 hours
    max_uses=None
)

# Main agent - medium-lived
agent = root.attenuate(
    interfaces=["database:read", "api:execute"],
    resources=["documents", "nlp_service"],
    ttl_seconds=3600,  # 1 hour
    max_uses=1000
)

# Worker - short-lived
worker = agent.attenuate(
    interfaces=["database:read"],
    resources=["documents"],
    ttl_seconds=300,  # 5 minutes
    max_uses=10
)
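To see how TTL and usage limits interact, here is an SDK-free toy model (not the Amla SDK's actual implementation) of how `is_expired` and `is_exhausted` checks typically combine:

```python
import time

class TokenState:
    """Toy model of TTL + usage-limit enforcement (illustrative only)."""

    def __init__(self, ttl_seconds, max_uses):
        self.expires_at = time.time() + ttl_seconds
        self.remaining = max_uses  # None means unlimited

    def is_expired(self):
        return time.time() >= self.expires_at

    def is_exhausted(self):
        return self.remaining is not None and self.remaining <= 0

    def use(self):
        # A capability is usable only while BOTH limits hold
        if self.is_expired() or self.is_exhausted():
            raise PermissionError("capability no longer valid")
        if self.remaining is not None:
            self.remaining -= 1

worker = TokenState(ttl_seconds=300, max_uses=2)
worker.use()
worker.use()
print(worker.is_exhausted())  # True: both uses consumed before the TTL ran out
```

Whichever limit trips first wins, which is why short-lived workers pair a small TTL with a small `max_uses`.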

Delegation Patterns

Use consistent resource hierarchies to make attenuation clearer:

# Good: Hierarchical resource naming
resources = [
    "customers",
    "customers.profiles",
    "customers.orders",
    "transactions",
    "transactions.recent"
]

# This makes attenuation obvious:
parent_resources = ["customers", "transactions"]
child_resources = ["customers.profiles"]  # Clear subset

# ✅ Valid delegation
child = parent.attenuate(resources=["customers.profiles"])

# ❌ Invalid delegation
child = parent.attenuate(resources=["inventory"])  # Not in parent
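The subset rule that attenuation enforces can be sketched in plain Python. This is a simplified model, assuming dot-separated hierarchical names as above, not the SDK's actual checker:

```python
def is_resource_subset(child_resources, parent_resources):
    """True if every child resource falls under some parent resource."""
    def covered(child, parent):
        # "customers.profiles" is covered by "customers" or by itself
        return child == parent or child.startswith(parent + ".")
    return all(
        any(covered(child, parent) for parent in parent_resources)
        for child in child_resources
    )

parent = ["customers", "transactions"]
print(is_resource_subset(["customers.profiles"], parent))  # True: valid delegation
print(is_resource_subset(["inventory"], parent))           # False: not in parent
```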

Interface Design

Design interfaces to match your system’s operations:

# Database interfaces
interfaces = [
    "database:read",
    "database:write",
    "database:delete",
    "database:admin"
]

# API interfaces
interfaces = [
    "api:execute",
    "api:configure",
    "api:monitor"
]

# Service-specific interfaces
interfaces = [
    "nlp:analyze",
    "storage:upload",
    "storage:download",
    "compute:execute"
]

# Combine as needed
agent_interfaces = [
    "database:read",
    "api:execute",
    "nlp:analyze"
]
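A simplified model of how a `category:operation` request might be checked against granted interfaces, assuming `category:*` acts as a wildcard (as in the root capability's `database:*` above); the SDK's matcher may differ:

```python
def interface_allows(granted, category, operation):
    """Check one category:operation pair against granted interfaces."""
    return f"{category}:{operation}" in granted or f"{category}:*" in granted

granted = ["database:read", "api:*", "nlp:analyze"]
print(interface_allows(granted, "database", "read"))   # True: exact match
print(interface_allows(granted, "api", "configure"))   # True: via api:* wildcard
print(interface_allows(granted, "database", "write"))  # False: not granted
```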

Error Handling

Implement comprehensive error handling for capability operations:

from amla_sdk import (
    AttenuationViolationError,
    CapabilityExpiredError,
    UsageLimitExceededError,
    InvalidSignatureError
)

try:
    # Verify locally without network call
    is_valid = capability.verify_signature(public_key)

    if not is_valid:
        raise InvalidSignatureError("Token tampered")

    if capability.is_expired():
        raise CapabilityExpiredError("Capability expired")

    if capability.is_exhausted():
        raise UsageLimitExceededError("Usage limit exceeded")

    # Proceed with operation
    result = capability.authorize(
        operation="read",
        resource="documents",
        category="database"
    )

except AttenuationViolationError as e:
    # Privilege escalation attempt
    logger.critical(f"Security violation: {e}")
    alert_security_team(capability.id, str(e))

except CapabilityExpiredError as e:
    # Normal expiration - request new capability
    logger.info(f"Capability expired: {capability.id}")
    capability = request_new_capability(parent)

except UsageLimitExceededError as e:
    # Hit usage limit - request new capability or backoff
    logger.warning(f"Usage limit exceeded: {capability.id}")
    capability = request_new_capability(parent)

except InvalidSignatureError as e:
    # Tampering detected - security incident
    logger.critical(f"Signature verification failed: {e}")
    alert_security_team(capability.id, str(e))

Beyond AI Agents: Other Use Cases

While AI agents are a compelling use case, capabilities solve broader credential delegation problems:

CI/CD Pipelines

Delegate different permissions to build, test, and deploy jobs:

# Build job gets read access
build_capability = root.attenuate(
    interfaces=["repository:read", "artifacts:write"],
    resources=["src/*", "artifacts/builds/*"],
    ttl_seconds=1800,  # 30 minutes
    max_uses=50
)

# Test job CANNOT access production
test_capability = root.attenuate(
    interfaces=["repository:read", "database:read"],
    resources=["src/*", "test_db/*"],  # No production DB!
    ttl_seconds=3600,
    max_uses=100
)

# Deploy job gets limited write access to production
deploy_capability = root.attenuate(
    interfaces=["repository:read", "deployment:write"],
    resources=["artifacts/builds/*", "production/*"],
    ttl_seconds=600,  # 10 minutes only
    max_uses=10  # Limited deploys
)

# Security properties:
# - Test job physically CANNOT access production (no interface/resource)
# - Deploy job expires quickly (10 minutes)
# - Build job cannot deploy (no deployment:write interface)

API Key Replacement

Replace static API keys with time-bound, usage-limited capabilities:

# Traditional API key (problems):
# - Never expires
# - Unlimited uses
# - Cannot be delegated
# - Revocation requires database update
api_key = "sk_live_abc123..."

# Capability-based API credential (better):
customer_capability = gateway.create_customer_capability(
    customer_id="acme-corp",
    interfaces=["api:execute"],
    resources=["analytics", "reports"],
    ttl_seconds=86400,  # 24 hours, auto-expires
    max_uses=1000,  # Built-in rate limiting
    metadata={
        "customer": "acme-corp",
        "tier": "premium"
    }
)

# Customer can self-delegate to teams
team_capability = customer_capability.attenuate(
    resources=["reports"],  # Just reports, not analytics
    max_uses=100,  # Lower rate limit
    ttl_seconds=3600  # 1 hour
)

# Benefits:
# - Automatic expiration (no forgotten keys)
# - Built-in rate limiting (max_uses)
# - Self-service delegation (teams can create sub-keys)
# - Automatic revocation (when parent expires)

Temporary Access / Contractor Access

Grant time-limited access without database user management:

# Contractor gets limited, temporary access
contractor_capability = root.attenuate(
    interfaces=["repository:read"],
    resources=["docs/*"],  # Only docs, not code
    ttl_seconds=604800,  # 1 week
    max_uses=500,
    metadata={
        "contractor": "[email protected]",
        "project": "documentation-review"
    }
)

# Automatically expires - no cleanup needed
# No database user to delete
# No SSH keys to rotate
# Cannot be extended without root permission

Multi-Tenant SaaS

Isolate tenant access with capabilities:

# Each tenant gets isolated capability
tenant_a_capability = root.attenuate(
    interfaces=["database:read", "database:write"],
    resources=[f"tenant_{tenant_a_id}/*"],  # Scoped to tenant
    ttl_seconds=86400,
    max_uses=10000
)

tenant_b_capability = root.attenuate(
    interfaces=["database:read", "database:write"],
    resources=[f"tenant_{tenant_b_id}/*"],  # Different tenant
    ttl_seconds=86400,
    max_uses=10000
)

# Tenant A CANNOT access Tenant B's resources
# Enforced cryptographically, not just application logic
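The tenant-scoping check can be illustrated with glob-style matching. This is an assumption about how `*` resource patterns behave, using Python's `fnmatch` as a stand-in for the SDK's matcher:

```python
from fnmatch import fnmatch

def can_access(granted_patterns, resource):
    """True if the resource matches any granted glob pattern."""
    return any(fnmatch(resource, pattern) for pattern in granted_patterns)

tenant_a_resources = ["tenant_a/*"]
print(can_access(tenant_a_resources, "tenant_a/orders/42"))  # True: in scope
print(can_access(tenant_a_resources, "tenant_b/orders/42"))  # False: other tenant
```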

Microservices Authentication

Replace service-to-service tokens with capabilities:

# API Gateway creates capability for each request
def handle_request(request):
    user = authenticate(request)

    # Create request-scoped capability
    request_capability = gateway_capability.attenuate(
        interfaces=user.allowed_interfaces,
        resources=user.allowed_resources,
        ttl_seconds=30,  # Request timeout
        max_uses=10,  # Reasonable for one request
        metadata={
            "user_id": user.id,
            "request_id": request.id
        }
    )

    # Pass to downstream services
    response = call_service(
        "user-service",
        capability=request_capability
    )

    return response

# Benefits:
# - Each request gets isolated capability
# - Services verify without calling auth server
# - Automatic cleanup (30 second TTL)
# - Built-in rate limiting

Serverless Functions

Delegate specific permissions to Lambda/Cloud Functions:

# Each function invocation gets unique capability
def invoke_function(function_name, event):
    # Create function-specific capability
    function_cap = root.attenuate(
        interfaces=["database:read", "storage:read"],
        resources=["users/*", "uploads/*"],
        ttl_seconds=300,  # 5 minute timeout
        max_uses=50,  # Prevent runaway loops
        metadata={
            "function": function_name,
            "invocation_id": generate_id()
        }
    )

    # Invoke function with capability
    result = lambda_client.invoke(
        FunctionName=function_name,
        Payload={
            "event": event,
            "capability": function_cap.token
        }
    )

    return result

# Function code:
def lambda_handler(event, context):
    capability = Capability.from_token(event['capability'])

    # Use capability for all operations
    data = capability.authorize_and_execute(
        operation="read",
        resource="users/profile",
        category="database",
        action=lambda: db.query(...)
    )

    return data

Advanced Patterns

Capability Refresh

Implement automatic capability refresh for long-running agents:

class AutoRefreshAgent:
    def __init__(self, parent_capability):
        self.parent = parent_capability
        self.current_capability = None
        self.refresh_capability()

    def refresh_capability(self):
        """Create new capability when current expires or exhausts"""
        self.current_capability = self.parent.attenuate(
            interfaces=["database:read"],
            resources=["documents"],
            ttl_seconds=3600,  # 1 hour
            max_uses=100
        )

    def execute(self, operation):
        # Check if capability needs refresh
        if (self.current_capability.is_expired() or
            self.current_capability.is_exhausted()):
            self.refresh_capability()

        return self.current_capability.authorize_and_execute(
            operation=operation,
            resource="documents",
            category="database",
            action=lambda: self._perform_operation()
        )

Capability Pooling

For high-throughput systems, maintain a pool of capabilities:

from queue import Queue

class CapabilityPool:
    def __init__(self, root_capability, pool_size=10):
        self.root = root_capability
        self.pool = Queue(maxsize=pool_size)

        # Pre-create capabilities
        for _ in range(pool_size):
            cap = self._create_capability()
            self.pool.put(cap)

    def _create_capability(self):
        return self.root.attenuate(
            interfaces=["database:read"],
            resources=["documents"],
            ttl_seconds=3600,
            max_uses=100
        )

    def acquire(self):
        """Get capability from pool"""
        cap = self.pool.get()

        # Refresh if needed
        if cap.is_expired() or cap.is_exhausted():
            cap = self._create_capability()

        return cap

    def release(self, capability):
        """Return capability to pool"""
        if not capability.is_expired() and not capability.is_exhausted():
            self.pool.put(capability)

# Usage:
pool = CapabilityPool(root_capability, pool_size=20)

def process_request(request):
    cap = pool.acquire()
    try:
        result = cap.authorize_and_execute(...)
        return result
    finally:
        pool.release(cap)
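The acquire/try/finally pairing can be wrapped in a context manager so callers cannot forget to release. This is a sketch that works with any object exposing `acquire()` and `release()`:

```python
from contextlib import contextmanager

@contextmanager
def pooled_capability(pool):
    """Borrow a capability for the duration of a with-block."""
    cap = pool.acquire()
    try:
        yield cap
    finally:
        # Runs even if the block raises, mirroring the try/finally above
        pool.release(cap)

# Equivalent to process_request above:
# with pooled_capability(pool) as cap:
#     result = cap.authorize_and_execute(...)
```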

Delegation Chain Limits

Prevent excessively long delegation chains:

MAX_DELEGATION_DEPTH = 5

class DelegationDepthExceededError(Exception):
    """Raised when a delegation chain grows too deep."""

def attenuate_with_depth_check(parent_capability, **kwargs):
    # Get delegation depth from the parent's metadata
    current_depth = parent_capability.metadata.get("delegation_depth", 0)

    if current_depth >= MAX_DELEGATION_DEPTH:
        raise DelegationDepthExceededError(
            f"Maximum delegation depth ({MAX_DELEGATION_DEPTH}) exceeded"
        )

    # Pop caller-supplied metadata so it is not passed twice,
    # then record the incremented depth on the child
    metadata = kwargs.pop("metadata", {})
    metadata["delegation_depth"] = current_depth + 1

    return parent_capability.attenuate(**kwargs, metadata=metadata)

Monitoring and Observability

Audit Log Analysis

Query audit logs to detect anomalies:

from datetime import datetime, timedelta

# Find capabilities with high failure rates
audit_logs = client.get_audit_trail(
    start_time=datetime.now() - timedelta(hours=1)
)

failure_rates = {}
for log in audit_logs:
    cap_id = log.capability_id
    if cap_id not in failure_rates:
        failure_rates[cap_id] = {"success": 0, "failure": 0}

    if log.success:
        failure_rates[cap_id]["success"] += 1
    else:
        failure_rates[cap_id]["failure"] += 1

# Alert on capabilities with >50% failure rate
for cap_id, stats in failure_rates.items():
    total = stats["success"] + stats["failure"]
    failure_rate = stats["failure"] / total

    if failure_rate > 0.5:
        alert_security_team(
            f"High failure rate for capability {cap_id}: {failure_rate:.1%}"
        )

Delegation Chain Visualization

Visualize delegation chains for debugging:

def visualize_delegation_chain(capability_id):
    """Print delegation chain for a capability"""
    chain = client.get_delegation_chain(capability_id)

    print("Delegation Chain:")
    for i, cap in enumerate(chain):
        indent = "  " * i
        print(f"{indent}{cap.id}")
        print(f"{indent}  Interfaces: {cap.interfaces}")
        print(f"{indent}  Resources: {cap.resources}")
        print(f"{indent}  TTL: {cap.ttl_seconds}s")
        print(f"{indent}  Max uses: {cap.max_uses}")

Next Steps

You now have a comprehensive understanding of capability-based security: lifetime management, delegation patterns, error handling, and use cases spanning CI/CD pipelines, multi-tenant SaaS, microservices, and serverless functions.

Interested in Amla Labs?

We're building the future of AI agent security with capability-based credentials. Join our design partner program or star us on GitHub.