Skip to main content

TealCircuit

TealCircuit implements the circuit breaker pattern to prevent cascading failures in distributed systems.

Overview

TealCircuit provides:
  • Three-state circuit breaker (CLOSED, OPEN, HALF_OPEN)
  • Automatic failure detection and recovery
  • Decision contract for consistency: evaluate() returns a Decision carrying an action, reason codes, a risk score, and metadata
  • Correlation IDs for traceability

Class

from tealtiger.core.circuit import TealCircuit, CircuitState
from tealtiger.core.context import ExecutionContext
from typing import Callable, Optional, TypeVar

T = TypeVar('T')

class TealCircuit:
    """Circuit breaker with three states: CLOSED, OPEN and HALF_OPEN.

    This is an API summary; method bodies are elided.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60000,
        half_open_requests: int = 3,
        on_state_change: Optional[Callable[[CircuitState, CircuitState], None]] = None
    ):
        """Configure the circuit breaker.

        Args:
            failure_threshold: Number of consecutive failures after which
                the circuit trips.
            timeout: Time to wait before attempting recovery, in
                milliseconds (60000 is 60 seconds).
            half_open_requests: Number of successes required to close the
                circuit again.
            on_state_change: Optional callback receiving two circuit states.
                The usage examples declare it as ``(new_state, old_state)``
                -- NOTE(review): confirm the argument order.
        """
        ...

    async def execute(self, fn: Callable[[], T]) -> T:
        """Run ``fn`` through the circuit and return its result.

        The usage examples catch ``CircuitOpenError`` from this call when
        the circuit is open.

        NOTE(review): the "Type Hints" example annotates ``fn`` as
        ``Callable[[], Awaitable[T]]`` -- confirm which annotation is intended.
        """
        ...

    def evaluate(self, context: Optional[ExecutionContext] = None) -> Decision:
        """Return a Decision for the circuit.

        The usage examples read ``action``, ``reason_codes``, ``risk_score``
        and ``metadata`` from the returned Decision.

        NOTE(review): ``Decision`` is not imported in this snippet.
        """
        ...

    def get_state(self) -> CircuitState:
        """Return the circuit's current state."""
        ...

    def reset(self) -> None:
        """Reset the circuit to the CLOSED state."""
        ...

Configuration

from enum import Enum

class CircuitState(str, Enum):
    """The three states a circuit can be in."""

    # Normal operation, requests pass through
    CLOSED = 'closed'
    # Circuit is tripped, requests fail immediately
    OPEN = 'open'
    # Testing if service has recovered
    HALF_OPEN = 'half-open'

Creating a Circuit

Basic Configuration

from tealtiger.core.circuit import TealCircuit

circuit = TealCircuit(
    failure_threshold=5,  # trip after 5 consecutive failures
    timeout=60000,  # milliseconds: wait 60 seconds before attempting recovery
    half_open_requests=3  # require 3 successes to close
)

With State Change Callback

def on_state_change(new_state, old_state):
    """Print a circuit state transition as ``old -> new``.

    Args:
        new_state: State the circuit has just entered.
        old_state: State the circuit has just left.
    """
    # The two states used to be printed back to back (e.g. "closedopen");
    # put an arrow between them so the transition is readable.
    print(f"Circuit state changed: {old_state} -> {new_state}")

# Pass the callback at construction time.
circuit = TealCircuit(
    failure_threshold=5,
    timeout=60000,
    on_state_change=on_state_change  # half_open_requests keeps its default of 3
)

Executing Operations

Basic Execution

from tealtiger.core.circuit import CircuitOpenError

# NOTE: `await` must run inside an `async def` (or an asyncio-aware REPL).
try:
    result = await circuit.execute(lambda: external_service.call())

    print(f"Operation succeeded: {result}")
except CircuitOpenError as error:
    # The circuit is open: requests fail immediately
    print('Circuit is open, service unavailable')
except Exception as error:
    # Any other error from the operation
    print(f"Operation failed: {error}")

With Retry Logic

async def call_with_retry(max_retries: int = 3):
    """Call the external service through the circuit, retrying on failure.

    Makes up to ``max_retries`` attempts. After a failed attempt it sleeps
    ``attempt_number`` seconds (1s, then 2s, ...) before trying again.

    Args:
        max_retries: Maximum number of attempts.

    Returns:
        The result of the first successful ``circuit.execute`` call, or
        ``None`` if ``max_retries`` is less than 1 (no attempt is made).

    Raises:
        CircuitOpenError: Re-raised immediately, without retrying, when the
            circuit is open.
        Exception: The failure of the final attempt.
    """
    # Imported locally so this snippet works without an earlier `import asyncio`.
    import asyncio

    for i in range(max_retries):
        try:
            return await circuit.execute(lambda: external_service.call())
        except CircuitOpenError:
            # Circuit is open, don't retry
            raise
        except Exception:
            if i == max_retries - 1:
                # Out of attempts: surface the last failure to the caller
                raise

            # Wait before retry, a little longer after each failed attempt
            await asyncio.sleep(1 * (i + 1))

Evaluating Circuit State

Check State Before Operation

from tealtiger.core.context import create_execution_context
from tealtiger import DecisionAction

context = create_execution_context(
    application='service-a',
    user_id='service-a'
)

# Ask the circuit for a Decision before running the operation
decision = circuit.evaluate(context)

if decision.action == DecisionAction.DENY:
    print('Circuit is open, service unavailable')
    print(f"Risk score: {decision.risk_score}")
else:
    # Proceed with operation
    # NOTE: `await` must run inside an `async def` (or an asyncio-aware REPL).
    await circuit.execute(lambda: external_service.call())

Monitor Circuit State

decision = circuit.evaluate(context)

# Circuit details are read from the Decision's metadata mapping
print(f"Circuit state: {decision.metadata['circuit_state']}")
print(f"Failures: {decision.metadata['failures']}")
print(f"Last failure: {decision.metadata['last_failure_time']}")

Circuit States

CLOSED State

# Normal operation, requests pass through
decision = circuit.evaluate(context)

print(decision.action)  # ALLOW
print(decision.reason_codes)  # [ReasonCode.POLICY_COMPLIANT]
print(decision.risk_score)  # 0

OPEN State

# Circuit is tripped, requests fail immediately
decision = circuit.evaluate(context)

print(decision.action)  # DENY
print(decision.reason_codes)  # [ReasonCode.CIRCUIT_OPEN]
print(decision.risk_score)  # 100

HALF_OPEN State

# Testing if service has recovered
decision = circuit.evaluate(context)

print(decision.action)  # ALLOW
print(decision.reason_codes)  # [ReasonCode.CIRCUIT_HALF_OPEN]
print(decision.risk_score)  # 50

Manual Control

Reset Circuit

# Reset to CLOSED state
circuit.reset()

print(circuit.get_state())  # CircuitState.CLOSED

Force Open

# Force circuit to OPEN state
# NOTE(review): force_open() is not listed in the TealCircuit class summary
# at the top of this page -- confirm the method exists.
circuit.force_open()

print(circuit.get_state())  # CircuitState.OPEN

Force Close

# Force circuit to CLOSED state
# NOTE(review): force_close() is not listed in the TealCircuit class summary
# at the top of this page -- confirm the method exists.
circuit.force_close()

print(circuit.get_state())  # CircuitState.CLOSED

Integration with TealAudit

from tealtiger.core.circuit import TealCircuit
from tealtiger import TealAudit
from tealtiger.core.audit import ConsoleOutput

async def on_state_change(new_state, old_state):
    """Record a circuit state transition as an audit event.

    Args:
        new_state: State the circuit has just entered.
        old_state: State the circuit has just left.

    NOTE(review): ``audit`` is not created in this snippet; presumably it is
    a ``TealAudit`` instance set up by the caller -- confirm.
    """
    # Imported locally: the snippet's imports do not bring `datetime` into scope.
    from datetime import datetime, timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time, then drop the tzinfo so isoformat() yields the same text as
    # before and the explicit 'Z' suffix stays correct.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z'

    await audit.log({
        'schema_version': '1.0.0',
        'event_type': 'circuit_state_change',
        'timestamp': timestamp,
        'correlation_id': 'circuit-001',
        'metadata': {
            'old_state': old_state,
            'new_state': new_state
        }
    })

# NOTE(review): on_state_change is a coroutine function here, while the
# constructor signature annotates the callback as returning None -- confirm
# that TealCircuit awaits (or schedules) coroutine callbacks.
circuit = TealCircuit(
    failure_threshold=5,
    timeout=60000,
    on_state_change=on_state_change
)

Error Handling

async def call_with_error_handling():
    """Call the external service through the circuit with error handling.

    The example is wrapped in a coroutine because `await` and `return` are
    not valid at module level.

    Returns:
        The operation's result, or ``fallback_response`` when the circuit
        is open.

    Raises:
        Exception: Any failure other than ``CircuitOpenError`` is logged
            and re-raised.
    """
    try:
        return await circuit.execute(lambda: external_service.call())
    except CircuitOpenError as error:
        # Circuit is open
        logger.error('Service unavailable (circuit open)', extra={
            'circuit_state': circuit.get_state(),
            'error': str(error)
        })

        # Return fallback response
        return fallback_response
    except Exception as error:
        # Other error
        logger.error('Operation failed', extra={'error': str(error)})
        raise

Async/Await Support

import asyncio

async def execute_with_circuit():
    """Run the external service call through the circuit and return its result."""
    return await circuit.execute(lambda: external_service.call())

# Run async
result = asyncio.run(execute_with_circuit())

Performance

TealCircuit targets:
  • < 1ms overhead per operation (p99)
  • < 0.5ms for evaluate() (p99)
  • Thread-safe for concurrent operations

Best Practices

Use Appropriate Thresholds

# ✅ Good: Reasonable thresholds
circuit = TealCircuit(
    failure_threshold=5,  # Trip after 5 consecutive failures
    timeout=60000,  # Wait 60s before attempting recovery
    half_open_requests=3  # Require 3 successes to close
)

Provide Fallback Responses

async def call_with_fallback():
    """Call the external service, falling back when the circuit is open.

    Returns:
        The operation's result, or the value of ``get_cached_response()``
        when ``CircuitOpenError`` is raised.
    """
    try:
        outcome = await circuit.execute(lambda: external_service.call())
    except CircuitOpenError:
        # Circuit is open: serve the cached or default response instead
        outcome = get_cached_response()
    return outcome

Monitor Circuit Health

import asyncio

async def monitor_circuit():
    """Poll the circuit forever, publishing metrics and alerting on DENY.

    Each pass evaluates the circuit, reports the risk score and failure
    count as gauges, triggers a ``circuit_open`` alert when the decision is
    DENY, then sleeps for ten seconds.
    """
    poll_interval_seconds = 10

    while True:
        verdict = circuit.evaluate(context)

        metrics.gauge('circuit.risk_score', verdict.risk_score)
        metrics.gauge('circuit.failures', verdict.metadata['failures'])

        is_denied = verdict.action == DecisionAction.DENY
        if is_denied:
            alert_payload = {'circuit_state': verdict.metadata['circuit_state']}
            alerts.trigger('circuit_open', alert_payload)

        # Pause between polls
        await asyncio.sleep(poll_interval_seconds)

# Run monitor
# asyncio.create_task() needs a running event loop, so call it from inside a
# coroutine. Keep a reference to the returned task: the event loop holds only
# a weak reference, and an unreferenced task can be garbage-collected before
# it finishes.
monitor_task = asyncio.create_task(monitor_circuit())

Type Hints

from typing import TypeVar, Callable, Awaitable
from tealtiger.core.circuit import TealCircuit

T = TypeVar('T')

async def execute_with_circuit(
    circuit: TealCircuit,
    fn: Callable[[], Awaitable[T]]
) -> T:
    """Run ``fn`` through ``circuit`` and return the awaited result.

    Args:
        circuit: Circuit breaker that guards the call.
        fn: Zero-argument callable returning an awaitable.

    Returns:
        Whatever ``circuit.execute(fn)`` resolves to.
    """
    outcome = await circuit.execute(fn)
    return outcome