Advanced Topics: Use Cases and Implementation Tips
This guide covers advanced patterns, best practices, and use cases for capability-based security, including applications beyond AI agents.
Implementation Best Practices
Capability Lifetime Management
Balance security with performance by choosing appropriate TTLs and usage limits:
| Level | TTL | Max Uses | Use Case |
|---|---|---|---|
| Root | 8-24 hours | Unlimited | Long-running orchestrators |
| Agent | 1-4 hours | 100-1000 | Main agents with multiple tasks |
| Sub-agent | 5-30 minutes | 10-100 | Specialized workers |
| Worker | 1-5 minutes | 1-10 | Single-purpose operations |
Example:
# Root orchestrator - long-lived
root = client.create_root_capability(
    interfaces=["database:*", "api:*"],
    resources=["*"],
    ttl_seconds=28800,  # 8 hours
    max_uses=None       # Unlimited
)
# Main agent - medium-lived
agent = root.attenuate(
    interfaces=["database:read", "api:execute"],
    resources=["documents", "nlp_service"],
    ttl_seconds=3600,  # 1 hour
    max_uses=1000
)
# Worker - short-lived
worker = agent.attenuate(
    interfaces=["database:read"],
    resources=["documents"],
    ttl_seconds=300,  # 5 minutes
    max_uses=10
)
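The narrowing from root to agent to worker can be checked mechanically. The sketch below is not part of the SDK: `narrow_limits` is a hypothetical helper, and it assumes that a child may never receive a longer TTL or a larger usage budget than its parent, with `None` meaning "unlimited" as in the root example.

```python
from typing import Optional, Tuple


def narrow_limits(
    parent_ttl: int,
    parent_max_uses: Optional[int],
    child_ttl: int,
    child_max_uses: Optional[int],
) -> Tuple[int, Optional[int]]:
    """Validate that a child's limits never exceed its parent's.

    None means "unlimited", mirroring max_uses=None on the root capability.
    """
    if child_ttl <= 0:
        raise ValueError("child TTL must be positive")
    if child_ttl > parent_ttl:
        raise ValueError("child TTL exceeds parent TTL")
    if parent_max_uses is not None:
        # A limited parent cannot hand out an unlimited or larger budget.
        if child_max_uses is None or child_max_uses > parent_max_uses:
            raise ValueError("child max_uses exceeds parent max_uses")
    return child_ttl, child_max_uses


# Root (8 h, unlimited) -> agent (1 h, 1000 uses) -> worker (5 min, 10 uses)
agent_limits = narrow_limits(28800, None, 3600, 1000)
worker_limits = narrow_limits(3600, 1000, 300, 10)
```

A real implementation would likely compare against the parent's remaining lifetime rather than its original TTL; this sketch compares the configured values only.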
Delegation Patterns
Use consistent resource hierarchies to make attenuation clearer:
# Good: Hierarchical resource naming
resources = [
    "customers",
    "customers.profiles",
    "customers.orders",
    "transactions",
    "transactions.recent"
]
# This makes attenuation obvious:
parent_resources = ["customers", "transactions"]
child_resources = ["customers.profiles"]  # Clear subset
# ✅ Valid delegation
child = parent.attenuate(resources=["customers.profiles"])
# ❌ Invalid delegation
child = parent.attenuate(resources=["inventory"])  # Not in parent
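To make the subset relationship concrete, here is a minimal sketch of how a dotted hierarchy could be checked. The helpers `covers` and `is_valid_attenuation` are hypothetical, and the rule that a parent name covers everything nested under it by dotted prefix is an assumption about the naming scheme, not documented SDK behavior.

```python
def covers(parent: str, child: str) -> bool:
    """True if `child` equals `parent`, is nested under it, or parent is "*"."""
    if parent == "*":
        return True
    # Require the dot so "customers" does not cover "customers_archive".
    return child == parent or child.startswith(parent + ".")


def is_valid_attenuation(parent_resources, child_resources) -> bool:
    """Every child resource must be covered by at least one parent resource."""
    return all(
        any(covers(p, c) for p in parent_resources) for c in child_resources
    )


parent_resources = ["customers", "transactions"]
ok = is_valid_attenuation(parent_resources, ["customers.profiles"])
bad = is_valid_attenuation(parent_resources, ["inventory"])
```

Matching on the `.` separator rather than on a raw string prefix is the design choice that keeps similarly named resources from leaking into a grant.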
Interface Design
Design interfaces to match your system’s operations:
# Database interfaces
interfaces = [
    "database:read",
    "database:write",
    "database:delete",
    "database:admin"
]
# API interfaces
interfaces = [
    "api:execute",
    "api:configure",
    "api:monitor"
]
# Service-specific interfaces
interfaces = [
    "nlp:analyze",
    "storage:upload",
    "storage:download",
    "compute:execute"
]
# Combine as needed
agent_interfaces = [
    "database:read",
    "api:execute",
    "nlp:analyze"
]
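The `category:operation` naming lends itself to a simple matching rule. The following sketch assumes that `*` in the operation position (as in `database:*`) grants every operation in that category; `interface_allows` and `is_permitted` are hypothetical names used for illustration only.

```python
def interface_allows(granted: str, requested: str) -> bool:
    """Check one granted "category:operation" entry against a request."""
    g_cat, _, g_op = granted.partition(":")
    r_cat, _, r_op = requested.partition(":")
    # Categories must match exactly; the granted operation may be a wildcard.
    return g_cat == r_cat and g_op in ("*", r_op)


def is_permitted(granted_interfaces, requested: str) -> bool:
    """True if any granted interface allows the requested one."""
    return any(interface_allows(g, requested) for g in granted_interfaces)


agent_interfaces = ["database:read", "api:execute", "nlp:analyze"]
can_read = is_permitted(agent_interfaces, "database:read")
can_write = is_permitted(agent_interfaces, "database:write")
```

Note that a holder of `database:read` cannot request `database:*` under this rule, since the wildcard is only honored on the granted side.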
Error Handling
Implement comprehensive error handling for capability operations:
import logging

from amla_sdk import (
    AttenuationViolationError,
    CapabilityExpiredError,
    UsageLimitExceededError,
    InvalidSignatureError
)

logger = logging.getLogger(__name__)

try:
    # Verify locally without network call
    is_valid = capability.verify_signature(public_key)
    if not is_valid:
        raise InvalidSignatureError("Token tampered")
    if capability.is_expired():
        raise CapabilityExpiredError("Capability expired")
    if capability.is_exhausted():
        raise UsageLimitExceededError("Usage limit exceeded")
    # Proceed with operation
    result = capability.authorize(
        operation="read",
        resource="documents",
        category="database"
    )
except AttenuationViolationError as e:
    # Privilege escalation attempt
    logger.critical(f"Security violation: {e}")
    alert_security_team(capability.id, str(e))
except CapabilityExpiredError as e:
    # Normal expiration - request new capability
    logger.info(f"Capability expired: {capability.id}")
    capability = request_new_capability(parent)
except UsageLimitExceededError as e:
    # Hit usage limit - request new capability or back off
    logger.warning(f"Usage limit exceeded: {capability.id}")
    capability = request_new_capability(parent)
except InvalidSignatureError as e:
    # Tampering detected - security incident
    logger.critical(f"Signature verification failed: {e}")
    alert_security_team(capability.id, str(e))
Beyond AI Agents: Other Use Cases
AI agents are a compelling use case, but capabilities also solve broader credential-delegation problems:
CI/CD Pipelines
Delegate different permissions to build, test, and deploy jobs:
# Build job reads source and writes build artifacts
build_capability = root.attenuate(
    interfaces=["repository:read", "artifacts:write"],
    resources=["src/*", "artifacts/builds/*"],
    ttl_seconds=1800,  # 30 minutes
    max_uses=50
)
# Test job CANNOT access production
test_capability = root.attenuate(
    interfaces=["repository:read", "database:read"],
    resources=["src/*", "test_db/*"],  # No production DB!
    ttl_seconds=3600,  # 1 hour
    max_uses=100
)
# Deploy job gets limited write access to production
deploy_capability = root.attenuate(
    interfaces=["repository:read", "deployment:write"],
    resources=["artifacts/builds/*", "production/*"],
    ttl_seconds=600,  # 10 minutes only
    max_uses=10       # Limited deploys
)
# Security properties:
# - Test job cannot access production (its capability carries no production interface or resource)
# - Deploy job expires quickly (10 minutes)
# - Build job cannot deploy (no deployment:write interface)
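The resource patterns above look like path globs. As a rough illustration, and assuming glob semantics (which the guide does not spell out), Python's standard `fnmatch` module can show why the test job's grant never matches a production path. `resource_allowed` is a hypothetical helper, not an SDK call.

```python
from fnmatch import fnmatchcase


def resource_allowed(patterns, resource: str) -> bool:
    """True if `resource` matches any granted glob pattern (case-sensitive)."""
    return any(fnmatchcase(resource, pattern) for pattern in patterns)


# Resource lists copied from the pipeline example above
test_job = ["src/*", "test_db/*"]
deploy_job = ["artifacts/builds/*", "production/*"]

test_reads_source = resource_allowed(test_job, "src/main.py")
test_touches_prod = resource_allowed(test_job, "production/api")
deploy_touches_prod = resource_allowed(deploy_job, "production/api")
```

One caveat: in `fnmatch`, `*` also matches `/`, so `src/*` covers nested paths such as `src/pkg/mod.py`. A system that wants single-segment wildcards would need a stricter matcher.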
API Key Replacement
Replace static API keys with time-bound, usage-limited capabilities:
# Traditional API key (problems):
# - Never expires
# - Unlimited uses
# - Cannot be delegated
# - Revocation requires database update
api_key = "sk_live_abc123..."
# Capability-based API credential (better):
customer_capability = gateway.create_customer_capability(
    customer_id="acme-corp",
    interfaces=["api:execute"],
    resources=["analytics", "reports"],
    ttl_seconds=86400,  # 24 hours, auto-expires
    max_uses=1000,      # Built-in rate limiting
    metadata={
        "customer": "acme-corp",
        "tier": "premium"
    }
)
# Customer can self-delegate to teams
team_capability = customer_capability.attenuate(
    resources=["reports"],  # Just reports, not analytics
    max_uses=100,           # Lower rate limit
    ttl_seconds=3600        # 1 hour
)
# Benefits:
# - Automatic expiration (no forgotten keys)
# - Built-in rate limiting (max_uses)
# - Self-service delegation (teams can create sub-keys)
# - Automatic revocation (when parent expires)
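To illustrate what "time-bound and usage-limited" means in practice, the sketch below models a credential that refuses work once its TTL elapses or its use budget is spent. `LimitedCredential` is a hypothetical stand-in written for this guide, not the SDK's capability class; the `clock` parameter is an assumption added so the behavior can be tested without waiting.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class LimitedCredential:
    """Hypothetical stand-in for a time-bound, usage-limited credential."""
    ttl_seconds: float
    max_uses: Optional[int]                     # None means unlimited
    clock: Callable[[], float] = time.monotonic
    uses: int = 0
    issued_at: float = field(init=False, default=0.0)

    def __post_init__(self):
        # Record the issue time using the injected clock.
        self.issued_at = self.clock()

    def is_expired(self) -> bool:
        return self.clock() - self.issued_at >= self.ttl_seconds

    def is_exhausted(self) -> bool:
        return self.max_uses is not None and self.uses >= self.max_uses

    def use(self) -> bool:
        """Consume one use; return False once expired or exhausted."""
        if self.is_expired() or self.is_exhausted():
            return False
        self.uses += 1
        return True


# Usage: a credential that allows two calls within a 24-hour window
cred = LimitedCredential(ttl_seconds=86400, max_uses=2)
results = [cred.use(), cred.use(), cred.use()]
```

Strictly speaking, `max_uses` caps the total number of calls over the credential's lifetime. It bounds the average rate over the TTL but does not smooth bursts the way a token-bucket limiter would.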
Temporary Access / Contractor Access
Grant time-limited access without database user management:
# Contractor gets limited, temporary access
contractor_capability = root.attenuate(
    interfaces=["repository:read"],
    resources=["docs/*"],  # Only docs, not code
    ttl_seconds=604800,    # 1 week
    max_uses=500,
    metadata={
        "contractor": "[email protected]",
        "project": "documentation-review"
    }
)
# Automatically expires - no cleanup needed
# No database user to delete
# No SSH keys to rotate
# Cannot be extended without root permission
Multi-Tenant SaaS
Isolate tenant access with capabilities:
# Each tenant gets isolated capability
tenant_a_capability = root.attenuate(
    interfaces=["database:read", "database:write"],
    resources=[f"tenant_{tenant_a_id}/*"],  # Scoped to tenant
    ttl_seconds=86400,
    max_uses=10000
)
tenant_b_capability = root.attenuate(
    interfaces=["database:read", "database:write"],
    resources=[f"tenant_{tenant_b_id}/*"],  # Different tenant
    ttl_seconds=86400,
    max_uses=10000
)
# Tenant A CANNOT access Tenant B's resources
# Enforced cryptographically, not just application logic
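What "enforced cryptographically" can look like is easiest to see with a toy token. The sketch below uses a chained HMAC construction (in the style of macaroons) from the Python standard library. This is a deliberate substitution: the guide's `verify_signature(public_key)` implies public-key signatures, whereas HMAC is symmetric and requires the verifier to hold the root key. All function names here are hypothetical.

```python
import hashlib
import hmac


def mint(root_key: bytes, identifier: str):
    """Create a root token: (identifier, caveats, signature)."""
    sig = hmac.new(root_key, identifier.encode(), hashlib.sha256).digest()
    return identifier, [], sig


def attenuate(token, caveat: str):
    """Append a restriction; the new signature is keyed by the old one."""
    identifier, caveats, sig = token
    new_sig = hmac.new(sig, caveat.encode(), hashlib.sha256).digest()
    return identifier, caveats + [caveat], new_sig


def verify(root_key: bytes, token) -> bool:
    """Recompute the signature chain from the root key and compare."""
    identifier, caveats, sig = token
    expected = hmac.new(root_key, identifier.encode(), hashlib.sha256).digest()
    for caveat in caveats:
        expected = hmac.new(expected, caveat.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(expected, sig)


key = b"demo-root-key"  # illustrative only; never hard-code real keys
tenant_a = attenuate(mint(key, "root"), "resource=tenant_a/*")

# Swapping the restriction for another tenant's breaks the signature.
forged = (tenant_a[0], ["resource=tenant_b/*"], tenant_a[2])
valid, forged_valid = verify(key, tenant_a), verify(key, forged)
```

Because each signature is derived from the previous one, a holder can add restrictions but cannot remove or rewrite them without the root key.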
Microservices Authentication
Replace service-to-service tokens with capabilities:
# API Gateway creates capability for each request
def handle_request(request):
    user = authenticate(request)
    # Create request-scoped capability
    request_capability = gateway_capability.attenuate(
        interfaces=user.allowed_interfaces,
        resources=user.allowed_resources,
        ttl_seconds=30,  # Request timeout
        max_uses=10,     # Reasonable for one request
        metadata={
            "user_id": user.id,
            "request_id": request.id
        }
    )
    # Pass to downstream services
    response = call_service(
        "user-service",
        capability=request_capability
    )
    return response

# Benefits:
# - Each request gets isolated capability
# - Services verify without calling auth server
# - Automatic cleanup (30 second TTL)
# - Built-in rate limiting
Serverless Functions
Delegate specific permissions to Lambda/Cloud Functions:
import json

# Each function invocation gets unique capability
def invoke_function(function_name, event):
    # Create function-specific capability
    function_cap = root.attenuate(
        interfaces=["database:read", "storage:read"],
        resources=["users/*", "uploads/*"],
        ttl_seconds=300,  # 5 minute timeout
        max_uses=50,      # Prevent runaway loops
        metadata={
            "function": function_name,
            "invocation_id": generate_id()
        }
    )
    # Invoke function with capability.
    # Assuming lambda_client is a boto3 Lambda client, Payload must be
    # serialized JSON (bytes), not a dict.
    result = lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({
            "event": event,
            "capability": function_cap.token
        }).encode("utf-8")
    )
    return result

# Function code:
def lambda_handler(event, context):
    capability = Capability.from_token(event['capability'])
    # Use capability for all operations
    data = capability.authorize_and_execute(
        operation="read",
        resource="users/profile",
        category="database",
        action=lambda: db.query(...)
    )
    return data
Advanced Patterns
Capability Refresh
Implement automatic capability refresh for long-running agents:
class AutoRefreshAgent:
    def __init__(self, parent_capability):
        self.parent = parent_capability
        self.current_capability = None
        self.refresh_capability()

    def refresh_capability(self):
        """Create a new capability when the current one expires or is exhausted"""
        self.current_capability = self.parent.attenuate(
            interfaces=["database:read"],
            resources=["documents"],
            ttl_seconds=3600,  # 1 hour
            max_uses=100
        )

    def execute(self, operation):
        # Check if capability needs refresh
        if (self.current_capability.is_expired() or
                self.current_capability.is_exhausted()):
            self.refresh_capability()
        return self.current_capability.authorize_and_execute(
            operation=operation,
            resource="documents",
            category="database",
            action=lambda: self._perform_operation(operation)
        )

    def _perform_operation(self, operation):
        # Application-specific work; left to the implementer
        raise NotImplementedError
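Checking `is_expired()` immediately before use still leaves a small window in which the capability can lapse mid-operation. One way to narrow that window is to refresh slightly early. The sketch below shows the decision rule only; `needs_refresh`, its parameters, and the idea that remaining seconds and remaining uses are available to the caller are all assumptions, not documented SDK features.

```python
from typing import Optional


def needs_refresh(
    seconds_left: float,
    uses_left: Optional[int],
    margin_seconds: float = 30.0,
    min_uses_left: int = 1,
) -> bool:
    """Return True when a capability should be replaced before its next use.

    uses_left=None means the capability has no usage limit.
    """
    if seconds_left <= margin_seconds:
        return True  # Too close to expiry to start new work safely
    return uses_left is not None and uses_left < min_uses_left


fresh = needs_refresh(seconds_left=3600, uses_left=100)
nearly_expired = needs_refresh(seconds_left=20, uses_left=100)
out_of_uses = needs_refresh(seconds_left=3600, uses_left=0)
```

The margin should be at least as long as the slowest operation the agent performs, so that work started under a capability also finishes under it.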
Capability Pooling
For high-throughput systems, maintain a pool of capabilities:
from queue import Queue

class CapabilityPool:
    def __init__(self, root_capability, pool_size=10):
        self.root = root_capability
        self.pool = Queue(maxsize=pool_size)
        # Pre-create capabilities
        for _ in range(pool_size):
            cap = self._create_capability()
            self.pool.put(cap)

    def _create_capability(self):
        return self.root.attenuate(
            interfaces=["database:read"],
            resources=["documents"],
            ttl_seconds=3600,
            max_uses=100
        )

    def acquire(self):
        """Get capability from pool (blocks until one is available)"""
        cap = self.pool.get()
        # Refresh if it expired or was exhausted while idle
        if cap.is_expired() or cap.is_exhausted():
            cap = self._create_capability()
        return cap

    def release(self, capability):
        """Return capability to pool, replacing it if it is spent"""
        # Always put something back; otherwise the pool shrinks and
        # acquire() eventually blocks forever.
        if capability.is_expired() or capability.is_exhausted():
            capability = self._create_capability()
        self.pool.put(capability)

# Usage:
pool = CapabilityPool(root_capability, pool_size=20)

def process_request(request):
    cap = pool.acquire()
    try:
        result = cap.authorize_and_execute(...)
        return result
    finally:
        pool.release(cap)
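The acquire / try / finally / release pattern is easy to get wrong when repeated across many call sites. A context manager can wrap it once. The sketch below works with any object that exposes `acquire()` and `release(cap)` as the pool above does; `borrowed` is a hypothetical helper name.

```python
from contextlib import contextmanager


@contextmanager
def borrowed(pool):
    """Acquire a capability from `pool` and always release it afterwards."""
    cap = pool.acquire()
    try:
        yield cap
    finally:
        # Runs on normal exit and when the body raises.
        pool.release(cap)
```

With this in place, a request handler becomes `with borrowed(pool) as cap: ...`, and the capability goes back to the pool even if the body raises.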
Delegation Chain Limits
Prevent excessively long delegation chains:
MAX_DELEGATION_DEPTH = 5

# Defined locally for this example
class DelegationDepthExceededError(Exception):
    """Raised when a delegation chain would exceed MAX_DELEGATION_DEPTH"""

def attenuate_with_depth_check(parent_capability, **kwargs):
    # Get delegation depth from metadata
    current_depth = parent_capability.metadata.get("delegation_depth", 0)
    if current_depth >= MAX_DELEGATION_DEPTH:
        raise DelegationDepthExceededError(
            f"Maximum delegation depth ({MAX_DELEGATION_DEPTH}) exceeded"
        )
    # Pop caller-supplied metadata so it is not passed to attenuate() twice
    metadata = {
        **kwargs.pop("metadata", {}),
        "delegation_depth": current_depth + 1
    }
    # Create child with incremented depth
    child = parent_capability.attenuate(**kwargs, metadata=metadata)
    return child
Monitoring and Observability
Audit Log Analysis
Query audit logs to detect anomalies:
from datetime import datetime, timedelta

# Find capabilities with high failure rates
audit_logs = client.get_audit_trail(
    start_time=datetime.now() - timedelta(hours=1)
)
failure_rates = {}
for log in audit_logs:
    cap_id = log.capability_id
    if cap_id not in failure_rates:
        failure_rates[cap_id] = {"success": 0, "failure": 0}
    if log.success:
        failure_rates[cap_id]["success"] += 1
    else:
        failure_rates[cap_id]["failure"] += 1
# Alert on capabilities with >50% failure rate
for cap_id, stats in failure_rates.items():
    total = stats["success"] + stats["failure"]
    failure_rate = stats["failure"] / total
    if failure_rate > 0.5:
        alert_security_team(
            f"High failure rate for capability {cap_id}: {failure_rate:.1%}"
        )
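A plain percentage threshold can fire on a capability that has failed once out of one attempt. A minimal variation, sketched below, adds a minimum sample size before a capability is flagged. The function name, the `(capability_id, success)` input shape, and the default of 10 events are illustrative assumptions.

```python
from collections import Counter


def high_failure_capabilities(events, threshold=0.5, min_events=10):
    """Return {capability_id: failure_rate} for capabilities above threshold.

    `events` is an iterable of (capability_id, success) pairs. Capabilities
    with fewer than `min_events` events are ignored to reduce noise.
    """
    totals, failures = Counter(), Counter()
    for cap_id, success in events:
        totals[cap_id] += 1
        if not success:
            failures[cap_id] += 1
    return {
        cap_id: failures[cap_id] / total
        for cap_id, total in totals.items()
        if total >= min_events and failures[cap_id] / total > threshold
    }


# "a" fails 8 of 10 calls; "b" fails its only call and is ignored
events = [("a", False)] * 8 + [("a", True)] * 2 + [("b", False)]
flagged = high_failure_capabilities(events)
```

Audit records from the loop above could be adapted with `((log.capability_id, log.success) for log in audit_logs)`.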
Delegation Chain Visualization
Visualize delegation chains for debugging:
def visualize_delegation_chain(capability_id):
    """Print delegation chain for a capability"""
    chain = client.get_delegation_chain(capability_id)
    print("Delegation Chain:")
    for i, cap in enumerate(chain):
        indent = "  " * i  # Indent one level per delegation step
        print(f"{indent}↓ {cap.id}")
        print(f"{indent}  Interfaces: {cap.interfaces}")
        print(f"{indent}  Resources: {cap.resources}")
        print(f"{indent}  TTL: {cap.ttl_seconds}s")
        print(f"{indent}  Max uses: {cap.max_uses}")
Next Steps
You now have a comprehensive understanding of capability-based security. To continue:
- API Documentation - Complete API reference
- GitHub Examples - Sample implementations
- Request Beta Access - Start using capabilities in production