
Resilience & Distributed Design Patterns

Resilience patterns are essential for building fault-tolerant distributed systems that gracefully handle failures, network issues, and service degradation. These patterns help prevent cascading failures and keep the overall system available even when individual components fail. designgurus codecentric circuit-breaker-bulkhead-retries

Circuit Breaker

The Circuit Breaker pattern prevents cascading failures by failing fast when a service is unavailable, protecting system resources from exhaustion.

How It Works

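The circuit breaker tracks failed calls to a dependency (the implementation below counts consecutive failures) and moves between three states: Closed (normal operation; calls pass through), Open (calls are rejected immediately once the failure threshold is reached), and Half-Open (after a recovery timeout, a trial call tests whether the service has recovered; success closes the circuit, failure reopens it). ijirmps.org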

Python Implementation

```python
import random
import threading
import time
from enum import Enum
from functools import wraps
from typing import Any, Callable, Optional


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised when the breaker rejects a call without invoking the protected function."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: type = Exception,
        name: str = "Circuit",
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds to stay OPEN before a trial call
        self.expected_exception = expected_exception
        self.name = name

        self.failure_count = 0  # consecutive failures
        self.last_failure_time: Optional[float] = None  # time.monotonic() timestamp
        self.state = CircuitState.CLOSED
        self._trial_in_flight = False  # HALF_OPEN admits a single trial call
        self._lock = threading.Lock()

    def call(self, func: Callable, *args, **kwargs) -> Any:
        is_trial = False
        with self._lock:
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    raise CircuitOpenError(f"[{self.name}] Circuit breaker is OPEN. Service unavailable.")
                self.state = CircuitState.HALF_OPEN
                print(f"[{self.name}] Circuit moving to HALF_OPEN state")
            if self.state == CircuitState.HALF_OPEN:
                # Let exactly one request probe the service; reject the rest until it finishes
                if self._trial_in_flight:
                    raise CircuitOpenError(f"[{self.name}] Trial call in progress. Service unavailable.")
                self._trial_in_flight = True
                is_trial = True

        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise  # re-raise with the original traceback
        else:
            self._on_success()
            return result
        finally:
            if is_trial:
                with self._lock:
                    self._trial_in_flight = False

    def _should_attempt_reset(self) -> bool:
        """True once recovery_timeout has elapsed since the last failure (caller holds the lock)."""
        return (
            self.last_failure_time is not None
            and time.monotonic() - self.last_failure_time >= self.recovery_timeout
        )

    def _on_success(self):
        with self._lock:
            self.failure_count = 0
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                print(f"[{self.name}] Circuit CLOSED - service recovered")

    def _on_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            # A failed trial call reopens the circuit; otherwise open at the threshold
            if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
                if self.state != CircuitState.OPEN:
                    print(f"[{self.name}] Circuit OPEN - failure threshold reached")
                self.state = CircuitState.OPEN


def circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 60.0,
    expected_exception: type = Exception,
    name: str = "Circuit",
):
    """Decorator for the circuit breaker pattern (one breaker shared by all calls to the function)"""
    breaker = CircuitBreaker(failure_threshold, recovery_timeout, expected_exception, name)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return breaker.call(func, *args, **kwargs)
        return wrapper
    return decorator


# Example usage
@circuit_breaker(
    failure_threshold=3,
    recovery_timeout=5,
    expected_exception=ConnectionError,
    name="PaymentService",
)
def call_payment_service(amount: float):
    """Simulated payment service call"""
    if random.random() < 0.7:  # 70% failure rate for the demo
        raise ConnectionError("Payment service unavailable")
    return {"status": "success", "amount": amount}


# Test the circuit breaker
if __name__ == "__main__":
    for i in range(10):
        try:
            result = call_payment_service(100.0)
            print(f"Attempt {i + 1}: {result}")
        except (CircuitOpenError, ConnectionError) as e:
            print(f"Attempt {i + 1}: Failed - {e}")
        time.sleep(1)
```

Use Cases

  • E-commerce Payment Gateway: Prevent overwhelming a failing payment processor by opening the circuit after multiple failures, allowing the service to recover.
  • Microservices Communication: Service A stops calling Service B when B is down, preventing thread exhaustion and resource depletion.

Best Practices

  • Set appropriate failure thresholds based on service SLAs and expected failure rates
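  • Consider lengthening the recovery timeout exponentially after repeated failed trial calls, and admit only a limited number of trial requests in Half-Open, so a recovering service is not hit by a thundering herd
  • Implement fallback mechanisms to provide degraded functionality when the circuit is open.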
  • Monitor circuit breaker state changes and alert on frequent state transitions
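The fallback practice above can be sketched as a small wrapper: when the protected call fails with a dependency error or is rejected by an open circuit, return a degraded result instead of propagating the error. This is a minimal, self-contained sketch; `with_fallback`, the stand-in `CircuitOpenError` class, and the cached exchange-rate example are hypothetical and not part of the implementation above.

```python
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Hypothetical stand-in for the error a breaker raises while OPEN."""


def with_fallback(
    primary: Callable[[], T],
    fallback: Callable[[BaseException], T],
    handled: Tuple[Type[BaseException], ...] = (CircuitOpenError, ConnectionError),
) -> T:
    """Return primary(), or fallback(error) if primary raises one of the handled errors.

    Errors outside `handled` (for example a ValueError caused by a bug) still propagate.
    """
    try:
        return primary()
    except handled as exc:
        return fallback(exc)


# Hypothetical example: serve the last known exchange rate while the rate service is down
last_known_rates = {"EUR/USD": 1.08}


def live_rate() -> float:
    raise CircuitOpenError("RateService circuit is OPEN")


def cached_rate(exc: BaseException) -> float:
    print(f"Serving cached rate because: {exc}")
    return last_known_rates["EUR/USD"]


rate = with_fallback(live_rate, cached_rate)
print(rate)  # 1.08
```

Restricting `handled` to dependency-related errors is deliberate: a fallback that also swallowed programming errors would hide bugs behind stale data.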

Retry with Exponential Backoff

Retry with Exponential Backoff handles transient failures by retrying operations with increasing delays, so that repeated attempts put progressively less pressure on a struggling service. circuit-breaker-bulkhead-retries codecentric

How It Works

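Each retry waits longer than the previous one: the delay grows exponentially (base_delay × exponential_base^attempt, capped at max_delay), and random jitter spreads the retries of many clients apart so they do not hit the service in lockstep (the thundering herd effect). temporal.io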
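The delay schedule described above can be computed on its own. The sketch below separates the deterministic capped exponential from the randomness, and contrasts the ±25% jitter used in the implementation that follows with "full jitter", a common alternative that draws each delay uniformly between 0 and the capped value. The function names are hypothetical.

```python
import random
from typing import List


def capped_exponential(attempt: int, base_delay: float, max_delay: float, factor: float = 2.0) -> float:
    """Deterministic part of the schedule: base_delay * factor**attempt, capped at max_delay."""
    return min(base_delay * factor ** attempt, max_delay)


def proportional_jitter(attempt: int, base_delay: float, max_delay: float) -> float:
    """±25% jitter around the capped delay (the scheme used by ExponentialBackoff)."""
    return capped_exponential(attempt, base_delay, max_delay) * random.uniform(0.75, 1.25)


def full_jitter(attempt: int, base_delay: float, max_delay: float) -> float:
    """'Full jitter': a uniform draw from [0, capped delay], which spreads clients out more widely."""
    return random.uniform(0.0, capped_exponential(attempt, base_delay, max_delay))


if __name__ == "__main__":
    base, cap = 0.5, 10.0
    schedule: List[float] = [capped_exponential(a, base, cap) for a in range(6)]
    print(schedule)  # [0.5, 1.0, 2.0, 4.0, 8.0, 10.0]
    print([round(full_jitter(a, base, cap), 2) for a in range(6)])
```

With `base_delay=0.5` and `max_delay=10.0`, the cap takes over at the sixth attempt (0.5 × 2^5 = 16 is clipped to 10), after which only the jitter varies the delay.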

Python Implementation

```python
import random
import time
from functools import wraps
from typing import Any, Callable, Tuple, Type


class ExponentialBackoff:
    def __init__(
        self,
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        retry_exceptions: Tuple[Type[Exception], ...] = (Exception,),
    ):
        if max_retries < 0:
            raise ValueError("max_retries must be >= 0")
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.retry_exceptions = retry_exceptions

    def calculate_delay(self, attempt: int) -> float:
        """Delay before retry attempt + 1: base_delay * exponential_base**attempt, capped at max_delay"""
        delay = min(self.base_delay * (self.exponential_base ** attempt), self.max_delay)

        if self.jitter:
            # Random jitter of ±25%, re-capped so max_delay stays a hard upper bound
            delay = min(delay * (0.75 + random.random() * 0.5), self.max_delay)

        return delay

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Execute func, retrying retryable exceptions with exponential backoff.

        Exceptions not listed in retry_exceptions propagate immediately.
        """
        last_exception = None

        for attempt in range(self.max_retries + 1):  # 1 initial attempt + max_retries retries
            try:
                result = func(*args, **kwargs)
                if attempt > 0:
                    print(f"Success after {attempt} retries")
                return result

            except self.retry_exceptions as e:
                last_exception = e

                if attempt < self.max_retries:
                    delay = self.calculate_delay(attempt)
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Retrying in {delay:.2f} seconds...")
                    time.sleep(delay)
                else:
                    print(f"All {self.max_retries} retries exhausted")

        raise last_exception


def retry_with_backoff(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    retry_exceptions: Tuple[Type[Exception], ...] = (Exception,),
):
    """Decorator for retry with exponential backoff"""
    backoff = ExponentialBackoff(
        max_retries, base_delay, max_delay,
        exponential_base, jitter, retry_exceptions,
    )

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return backoff.execute(func, *args, **kwargs)
        return wrapper
    return decorator


# Example usage
class ServiceUnavailableError(Exception):
    pass


@retry_with_backoff(
    max_retries=4,
    base_delay=0.5,
    max_delay=10.0,
    exponential_base=2.0,
    jitter=True,
    retry_exceptions=(ServiceUnavailableError, ConnectionError),
)
def fetch_user_data(user_id: str):
    """Simulated API call with transient failures"""
    # Simulate a 60% failure rate
    if random.random() < 0.6:
        raise ServiceUnavailableError(f"Service temporarily unavailable for user {user_id}")

    return {"user_id": user_id, "name": "John Doe", "email": "john@example.com"}


# Test the retry mechanism
if __name__ == "__main__":
    try:
        print("Fetching user data with retry...")
        user = fetch_user_data("user_123")
        print(f"Success: {user}")
    except ServiceUnavailableError as e:
        print(f"Failed permanently: {e}")
```

Use Cases

  • Database Connection Failures: Retry transient connection failures with exponential backoff to handle temporary network issues.
  • API Rate Limiting: Automatically retry requests that fail due to rate limits with increasing delays.
  • Message Processing: Retry failed message processing operations in event-driven architectures.

Best Practices

  • Only retry idempotent operations to avoid duplicate side effects
  • Add random jitter to prevent synchronized retries (thundering herd) linkedin
  • Set maximum retry limits to prevent infinite retry loops
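  • Distinguish retryable errors (timeouts, temporary unavailability) from non-retryable ones (validation or authorization failures) with separate exception types, and retry only the former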
  • Combine with circuit breaker to stop retrying when service is consistently failing
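Combining the two patterns mostly comes down to the order of exception handling: transient dependency errors are retried with backoff, but a rejection from an open circuit ends the retry loop at once. The sketch below assumes a deliberately minimal, single-threaded `MiniBreaker` and a hypothetical `retry_through_breaker` helper rather than the full classes shown earlier.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(Exception):
    pass


class MiniBreaker:
    """Hypothetical, minimal, not thread-safe breaker: opens after `threshold` consecutive
    ConnectionErrors and lets one trial call through after `recovery_timeout` seconds."""

    def __init__(self, threshold: int = 3, recovery_timeout: float = 30.0):
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, func: Callable[[], Any]) -> Any:
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit is OPEN")
            self.opened_at = None  # timeout elapsed: allow a trial call
        try:
            result = func()
        except ConnectionError:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result


def retry_through_breaker(
    breaker: MiniBreaker, func: Callable[[], Any], max_retries: int = 5, base_delay: float = 0.01
) -> Any:
    """Retry transient ConnectionErrors with backoff, but give up as soon as the circuit opens."""
    for attempt in range(max_retries + 1):
        try:
            return breaker.call(func)
        except CircuitOpenError:
            raise  # the dependency is known to be down; more retries only add load
        except ConnectionError:
            if attempt == max_retries:
                raise
            time.sleep(base_delay * 2 ** attempt)


calls = {"n": 0}


def always_down() -> None:
    calls["n"] += 1
    raise ConnectionError("service down")


breaker = MiniBreaker(threshold=3)
try:
    retry_through_breaker(breaker, always_down)
except CircuitOpenError as exc:
    print(f"Gave up: {exc} after {calls['n']} real calls")  # Gave up: circuit is OPEN after 3 real calls
```

Without the breaker, the same loop would have made six calls to the failing service; with it, the retry budget stops being spent once the failure threshold is reached.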

Bulkhead & Rate Limiting

The Bulkhead pattern isolates resources to prevent cascading failures, while rate limiting controls request throughput to protect services from overload. moldstud.com

How It Works

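A bulkhead partitions resources (threads, connections, memory) into isolated pools, so one failing or slow operation cannot exhaust the resources other operations depend on. Rate limiting, implemented below as a token bucket, refills tokens at a fixed rate up to a maximum capacity; each request consumes one token and is rejected when none are left.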

Python Implementation

```python
import concurrent.futures
import threading
import time
from dataclasses import dataclass
from typing import Any, Callable


class BulkheadFullError(Exception):
    """Raised when a bulkhead has no free slot."""


class RateLimitExceededError(Exception):
    """Raised when the rate limiter has no token available."""


@dataclass
class BulkheadStats:
    total_requests: int = 0
    accepted_requests: int = 0
    rejected_requests: int = 0
    active_requests: int = 0


class Bulkhead:
    """Semaphore-based bulkhead: caps the number of concurrent calls into one resource pool"""

    def __init__(self, name: str, max_concurrent: int = 10):
        self.name = name
        self.max_concurrent = max_concurrent
        self.semaphore = threading.Semaphore(max_concurrent)
        self.stats = BulkheadStats()
        self._lock = threading.Lock()

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Execute func if a slot is free; otherwise reject immediately"""
        with self._lock:
            self.stats.total_requests += 1

        acquired = self.semaphore.acquire(blocking=False)

        if not acquired:
            with self._lock:
                self.stats.rejected_requests += 1
            raise BulkheadFullError(f"Bulkhead {self.name} is full. Request rejected.")

        try:
            with self._lock:
                self.stats.accepted_requests += 1
                self.stats.active_requests += 1

            return func(*args, **kwargs)

        finally:
            with self._lock:
                self.stats.active_requests -= 1
            self.semaphore.release()

    def get_stats(self) -> dict:
        """Get current bulkhead statistics"""
        with self._lock:
            return {
                "name": self.name,
                "total_requests": self.stats.total_requests,
                "accepted": self.stats.accepted_requests,
                "rejected": self.stats.rejected_requests,
                "active": self.stats.active_requests,
                "available": self.max_concurrent - self.stats.active_requests,
            }


class RateLimiter:
    """Token bucket rate limiter: bursts of up to `rate` requests, refilled at rate/per tokens per second"""

    def __init__(self, rate: int, per: float = 1.0):
        """
        Args:
            rate: Number of requests allowed per period (also the bucket capacity)
            per: Length of the period in seconds
        """
        self.rate = rate
        self.per = per
        self.allowance = float(rate)  # tokens currently in the bucket
        self.last_check = time.monotonic()  # monotonic clock is immune to wall-clock jumps
        self._lock = threading.Lock()

    def allow_request(self) -> bool:
        """Check if a request is allowed under the rate limit"""
        with self._lock:
            current = time.monotonic()
            time_passed = current - self.last_check
            self.last_check = current

            # Refill tokens for the elapsed time, capped at the bucket capacity
            self.allowance = min(self.rate, self.allowance + time_passed * (self.rate / self.per))

            # Reject if less than one whole token is available
            if self.allowance < 1.0:
                return False

            # Consume one token
            self.allowance -= 1.0
            return True

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with rate limiting"""
        if not self.allow_request():
            raise RateLimitExceededError(f"Rate limit exceeded: {self.rate} requests per {self.per}s")

        return func(*args, **kwargs)


# Example usage
critical_bulkhead = Bulkhead("CriticalOps", max_concurrent=5)
non_critical_bulkhead = Bulkhead("NonCriticalOps", max_concurrent=2)
api_rate_limiter = RateLimiter(rate=10, per=1.0)  # 10 requests per second


def process_payment(amount: float):
    """Critical operation"""
    time.sleep(0.5)  # Simulate processing
    return f"Payment of ${amount} processed"


def generate_report(report_id: str):
    """Non-critical operation"""
    time.sleep(1.0)  # Simulate processing
    return f"Report {report_id} generated"


# Test the bulkhead pattern
if __name__ == "__main__":
    def make_payment(i):
        try:
            result = critical_bulkhead.execute(process_payment, 100.0 * i)
            return f"Payment {i}: {result}"
        except BulkheadFullError as e:
            return f"Payment {i}: REJECTED - {e}"

    def make_report(i):
        try:
            result = non_critical_bulkhead.execute(generate_report, f"RPT{i}")
            return f"Report {i}: {result}"
        except BulkheadFullError as e:
            return f"Report {i}: REJECTED - {e}"

    # Submit 10 payment tasks and 10 report tasks concurrently
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        payment_futures = [executor.submit(make_payment, i) for i in range(10)]
        report_futures = [executor.submit(make_report, i) for i in range(10)]

        for future in concurrent.futures.as_completed(payment_futures + report_futures):
            print(future.result())

    # Print statistics
    print("\n=== Bulkhead Statistics ===")
    print(f"Critical Operations: {critical_bulkhead.get_stats()}")
    print(f"Non-Critical Operations: {non_critical_bulkhead.get_stats()}")

    # Test the rate limiter with a burst of 15 calls against a 10-per-second limit
    allowed = sum(api_rate_limiter.allow_request() for _ in range(15))
    print(f"\nRate limiter allowed {allowed} of 15 burst requests")
```

Use Cases

  • API Gateway: Limit concurrent requests per tenant to prevent noisy neighbor problems.
  • Database Connection Pools: Isolate connection pools per service to prevent one service exhausting all connections.
  • Microservices: Dedicate thread pools per downstream dependency to contain failures.
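Dedicating a thread pool to each downstream dependency can be sketched with the standard library's `ThreadPoolExecutor`: a hanging dependency can occupy only the workers of its own pool. The dependency names, pool sizes, and the `submit_to` helper below are hypothetical. Note that `ThreadPoolExecutor` queues submitted work without limit, so a production bulkhead would also bound admission, for example with a non-blocking semaphore check like the one in the `Bulkhead` class.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict

# Hypothetical dependencies and pool sizes; size each pool from what that dependency can absorb
POOLS: Dict[str, ThreadPoolExecutor] = {
    "inventory": ThreadPoolExecutor(max_workers=4, thread_name_prefix="inventory"),
    "recommendations": ThreadPoolExecutor(max_workers=2, thread_name_prefix="recommendations"),
}


def submit_to(dependency: str, func: Callable[..., Any], *args: Any) -> Future:
    """Run func on the pool reserved for `dependency`.

    A slow dependency can tie up only its own workers; calls to other
    dependencies keep running on their own threads.
    """
    return POOLS[dependency].submit(func, *args)


def slow_recommendations(user_id: str) -> list:
    time.sleep(0.5)  # simulate a dependency that has started to hang
    return []


def check_stock(sku: str) -> int:
    return 7  # simulate a fast, healthy dependency


if __name__ == "__main__":
    # Saturate the recommendations pool (2 workers, 5 submitted calls)...
    stalled = [submit_to("recommendations", slow_recommendations, f"user-{i}") for i in range(5)]
    # ...while inventory lookups still complete immediately on their own pool
    start = time.monotonic()
    stock = submit_to("inventory", check_stock, "SKU-1").result()
    print(f"stock={stock} in {time.monotonic() - start:.3f}s")
    print([f.result() for f in stalled])
```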

Best Practices

  • Size bulkheads based on resource capacity and expected load
  • Use token bucket or leaky bucket algorithms for smooth rate limiting
  • Implement per-user and per-service rate limits for fair resource allocation
  • Monitor resource utilization and adjust limits dynamically
  • Combine with circuit breaker for comprehensive protection
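Per-user or per-tenant limits can be built by keeping one token bucket per key, so a heavy caller spends only its own budget. This sketch is an assumption, not the `RateLimiter` class above: it parametrizes the bucket by `capacity` and `refill_rate` (tokens per second), takes an injectable `clock` so the behavior can be demonstrated deterministically, and never evicts idle buckets, which a long-running service would need to do.

```python
import threading
import time
from collections import defaultdict
from typing import Callable, Dict


class TokenBucket:
    """Holds up to `capacity` tokens, refilled continuously at `refill_rate` tokens per second."""

    def __init__(self, capacity: float, refill_rate: float, clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.clock = clock
        self.tokens = capacity
        self.updated = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.refill_rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


class KeyedRateLimiter:
    """One bucket per key (user ID, tenant, API key), created lazily on first use."""

    def __init__(self, capacity: float, refill_rate: float, clock: Callable[[], float] = time.monotonic):
        self._lock = threading.Lock()
        self._buckets: Dict[str, TokenBucket] = defaultdict(
            lambda: TokenBucket(capacity, refill_rate, clock)
        )

    def allow(self, key: str) -> bool:
        with self._lock:
            return self._buckets[key].try_acquire()


if __name__ == "__main__":
    fake_now = [0.0]  # controllable clock for the demo
    limiter = KeyedRateLimiter(capacity=3, refill_rate=1.0, clock=lambda: fake_now[0])
    print([limiter.allow("alice") for _ in range(4)])  # [True, True, True, False]
    print(limiter.allow("bob"))  # True: bob has his own bucket
    fake_now[0] = 1.0  # one second later alice has regained one token
    print(limiter.allow("alice"), limiter.allow("alice"))  # True False
```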

Saga Pattern

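The Saga pattern manages a distributed transaction across microservices as a sequence of local transactions; if a step fails, the steps already completed are undone by compensating transactions instead of holding distributed locks across services. microservices.io infoq.com learn.microsoft.com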

How It Works

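A saga can be coordinated in two ways: orchestration, where a central coordinator tells each service which step to run and triggers compensations on failure, and choreography, where each service reacts to events published by the others without central control. docs.aws.amazon.com viblo.asia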

Python Implementation

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class SagaStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"


@dataclass
class SagaStep:
    """A single saga step: a local action plus the compensation that undoes it"""
    name: str
    action: Callable[[Dict[str, Any]], Any]
    compensation: Callable[[Dict[str, Any]], None]
    executed: bool = False
    compensated: bool = False


@dataclass
class SagaExecution:
    """Tracks saga execution state"""
    saga_id: str
    status: SagaStatus = SagaStatus.PENDING
    steps_completed: List[str] = field(default_factory=list)
    error: Optional[str] = None
    started_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None


class SagaOrchestrator:
    """Orchestrator-based saga implementation"""

    def __init__(self, saga_id: str):
        self.saga_id = saga_id
        self.steps: List[SagaStep] = []
        self.execution = SagaExecution(saga_id=saga_id)

    def add_step(self, name: str, action: Callable, compensation: Callable):
        """Add a step to the saga"""
        self.steps.append(SagaStep(name, action, compensation))

    def execute(self, context: Dict[str, Any]) -> SagaExecution:
        """Run the steps in order; on the first failure, compensate completed steps in reverse"""
        self.execution.status = SagaStatus.IN_PROGRESS
        print(f"\n[SAGA {self.saga_id}] Starting execution...")

        for step in self.steps:
            print(f"[SAGA {self.saga_id}] Executing: {step.name}")

            try:
                result = step.action(context)
            except Exception as e:
                print(f"[SAGA {self.saga_id}] ✗ {step.name} failed: {e}")
                self.execution.error = str(e)
                self._compensate(context)
                self.execution.status = SagaStatus.FAILED
                return self.execution

            step.executed = True
            self.execution.steps_completed.append(step.name)

            # Make the result available to later steps and to compensations
            context[f"{step.name}_result"] = result
            print(f"[SAGA {self.saga_id}] ✓ {step.name} completed")

        # All steps successful
        self.execution.status = SagaStatus.COMPLETED
        self.execution.completed_at = datetime.now()
        print(f"[SAGA {self.saga_id}] All steps completed successfully!")
        return self.execution

    def _compensate(self, context: Dict[str, Any]):
        """Execute compensation for completed steps"""
        self.execution.status = SagaStatus.COMPENSATING
        print(f"[SAGA {self.saga_id}] Starting compensation...")

        # Compensate in reverse order of execution
        for step in reversed(self.steps):
            if step.executed and not step.compensated:
                print(f"[SAGA {self.saga_id}] Compensating: {step.name}")

                try:
                    step.compensation(context)
                    step.compensated = True
                    print(f"[SAGA {self.saga_id}] ✓ {step.name} compensated")
                except Exception as e:
                    # The saga is now only partially undone; production code should persist this and retry or alert
                    print(f"[SAGA {self.saga_id}] ✗ Compensation failed for {step.name}: {e}")


# Example: e-commerce order saga
class OrderService:
    @staticmethod
    def create_order(context: Dict) -> str:
        order_id = f"ORD-{context['user_id']}-001"
        print(f"  → Order created: {order_id}")
        return order_id

    @staticmethod
    def cancel_order(context: Dict):
        order_id = context.get("create_order_result")
        print(f"  → Order cancelled: {order_id}")


class PaymentService:
    @staticmethod
    def process_payment(context: Dict) -> str:
        amount = context["amount"]
        if amount > 10000:  # Simulate payment failure
            raise Exception("Payment amount exceeds limit")
        payment_id = f"PAY-{context['user_id']}-001"
        print(f"  → Payment processed: {payment_id} (${amount})")
        return payment_id

    @staticmethod
    def refund_payment(context: Dict):
        payment_id = context.get("process_payment_result")
        print(f"  → Payment refunded: {payment_id}")


class InventoryService:
    @staticmethod
    def reserve_inventory(context: Dict) -> bool:
        items = context["items"]
        print(f"  → Inventory reserved: {items}")
        return True

    @staticmethod
    def release_inventory(context: Dict):
        items = context["items"]
        print(f"  → Inventory released: {items}")


class ShippingService:
    @staticmethod
    def create_shipment(context: Dict) -> str:
        order_id = context["create_order_result"]
        shipment_id = f"SHIP-{order_id}"
        print(f"  → Shipment created: {shipment_id}")
        return shipment_id

    @staticmethod
    def cancel_shipment(context: Dict):
        shipment_id = context.get("create_shipment_result")
        if shipment_id:
            print(f"  → Shipment cancelled: {shipment_id}")


def build_order_saga(saga_id: str) -> SagaOrchestrator:
    """Wire the four order steps together with their compensations"""
    saga = SagaOrchestrator(saga_id)
    saga.add_step("create_order", OrderService.create_order, OrderService.cancel_order)
    saga.add_step("process_payment", PaymentService.process_payment, PaymentService.refund_payment)
    saga.add_step("reserve_inventory", InventoryService.reserve_inventory, InventoryService.release_inventory)
    saga.add_step("create_shipment", ShippingService.create_shipment, ShippingService.cancel_shipment)
    return saga


# Test the saga
if __name__ == "__main__":
    # Successful saga
    print("=== Test 1: Successful Order ===")
    context1 = {
        "user_id": "USER123",
        "amount": 500.00,
        "items": ["ITEM1", "ITEM2"],
    }
    result1 = build_order_saga("saga-001").execute(context1)
    print(f"\nFinal Status: {result1.status.value}")

    # Failed saga with compensation
    print("\n\n=== Test 2: Failed Order (Payment Exceeds Limit) ===")
    context2 = {
        "user_id": "USER456",
        "amount": 15000.00,  # Will fail!
        "items": ["ITEM3"],
    }
    result2 = build_order_saga("saga-002").execute(context2)
    print(f"\nFinal Status: {result2.status.value}")
    print(f"Error: {result2.error}")
```
```python
# Choreography-based saga: each service reacts to an event and publishes the next one,
# with no central coordinator
import threading
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List


@dataclass
class Event:
    """Domain event"""
    event_type: str
    payload: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)


class EventBus:
    """Simple synchronous, in-process event bus for choreography"""

    def __init__(self):
        self.subscribers: Dict[str, List[Callable[[Event], None]]] = {}
        self._lock = threading.Lock()

    def subscribe(self, event_type: str, handler: Callable[[Event], None]):
        """Subscribe a handler to an event type"""
        with self._lock:
            self.subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event: Event):
        """Deliver the event to every subscriber, isolating handler errors"""
        print(f"\n[EventBus] Publishing: {event.event_type}")

        with self._lock:
            # Copy the list so handlers can subscribe or publish without holding the lock
            handlers = list(self.subscribers.get(event.event_type, []))
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"[EventBus] Handler error: {e}")


# Event-driven services
event_bus = EventBus()


class ChoreographyOrderService:
    @staticmethod
    def handle_create_order(event: Event):
        print(f"  [OrderService] Creating order for user: {event.payload['user_id']}")

        # Publish the next event
        event_bus.publish(Event(
            event_type="OrderCreated",
            payload={
                "order_id": "ORD-001",
                "user_id": event.payload["user_id"],
                "amount": event.payload["amount"],
            },
        ))


class ChoreographyPaymentService:
    @staticmethod
    def handle_order_created(event: Event):
        print(f"  [PaymentService] Processing payment for order: {event.payload['order_id']}")

        # Publish the next event
        event_bus.publish(Event(
            event_type="PaymentProcessed",
            payload={
                "order_id": event.payload["order_id"],
                "payment_id": "PAY-001",
            },
        ))


class ChoreographyInventoryService:
    @staticmethod
    def handle_payment_processed(event: Event):
        print(f"  [InventoryService] Reserving inventory for order: {event.payload['order_id']}")

        event_bus.publish(Event(
            event_type="InventoryReserved",
            payload={"order_id": event.payload["order_id"]},
        ))


# Set up the choreography
event_bus.subscribe("CreateOrder", ChoreographyOrderService.handle_create_order)
event_bus.subscribe("OrderCreated", ChoreographyPaymentService.handle_order_created)
event_bus.subscribe("PaymentProcessed", ChoreographyInventoryService.handle_payment_processed)

# Trigger the saga
if __name__ == "__main__":
    event_bus.publish(Event("CreateOrder", {"user_id": "USER789", "amount": 250.0}))
```
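The choreography example covers only the happy path. In choreography, compensation is also event-driven: the service whose step fails publishes a failure event, and services that already committed their local transactions react by running their own compensations. A minimal sketch, using a hypothetical `PaymentFailed`/`OrderCancelled` event pair and a plain dictionary of subscribers instead of the `EventBus` class:

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

# Hypothetical minimal in-process bus; a real system would publish through a message broker
subscribers: Dict[str, List[Callable[[Dict[str, Any]], None]]] = defaultdict(list)
log: List[str] = []  # order in which events were published


def publish(event_type: str, payload: Dict[str, Any]) -> None:
    log.append(event_type)
    for handler in subscribers[event_type]:
        handler(payload)


def on_create_order(p: Dict[str, Any]) -> None:
    # Order service: local transaction, then announce it
    publish("OrderCreated", {"order_id": "ORD-001", "amount": p["amount"]})


def on_order_created(p: Dict[str, Any]) -> None:
    # Payment service: simulated limit check, announcing success or failure
    if p["amount"] > 10_000:
        publish("PaymentFailed", {"order_id": p["order_id"], "reason": "limit exceeded"})
    else:
        publish("PaymentProcessed", {"order_id": p["order_id"]})


def on_payment_failed(p: Dict[str, Any]) -> None:
    # Order service compensates its own step by cancelling the order
    publish("OrderCancelled", {"order_id": p["order_id"], "reason": p["reason"]})


subscribers["CreateOrder"].append(on_create_order)
subscribers["OrderCreated"].append(on_order_created)
subscribers["PaymentFailed"].append(on_payment_failed)

publish("CreateOrder", {"user_id": "USER789", "amount": 15_000})
print(log)  # ['CreateOrder', 'OrderCreated', 'PaymentFailed', 'OrderCancelled']
```

Each service compensates only its own work, which keeps services autonomous but means the overall rollback path is spread across several subscribers rather than visible in one place.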

Use Cases

  • E-commerce Order Processing: Coordinate order creation, payment, inventory reservation, and shipping across multiple services.
  • Travel Booking: Book flights, hotels, and car rentals as a single transaction with rollback capability.
  • Financial Transactions: Manage multi-step financial operations with compensation for failed steps.

Best Practices

  • Use orchestration for complex workflows with many steps requiring central control
  • Use choreography for loose coupling and autonomous services
  • Design idempotent compensating transactions to handle duplicate compensation requests
  • Store saga state persistently to handle coordinator failures
  • Implement timeout handling for long-running sagas
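Idempotent compensation and persisted saga state can be combined in a small, durable log keyed by saga and step: a duplicated or redelivered compensation request finds its marker row and becomes a no-op. The sketch below is an assumption built on Python's `sqlite3`; the `CompensationLog` class and table are hypothetical, and an in-memory database is used only for the demo. Because the external call (for example a refund) could succeed just before a commit fails, the downstream operation should itself tolerate duplicates, for example by being keyed on the payment ID.

```python
import sqlite3
from typing import Callable


class CompensationLog:
    """Durable record of compensations already applied, keyed by (saga_id, step)."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        with self.conn:
            self.conn.execute(
                "CREATE TABLE IF NOT EXISTS compensations ("
                " saga_id TEXT NOT NULL,"
                " step TEXT NOT NULL,"
                " PRIMARY KEY (saga_id, step))"
            )

    def run_once(self, saga_id: str, step: str, compensate: Callable[[], None]) -> bool:
        """Run compensate() unless it already succeeded for this saga step.

        Returns True if it ran now, False for a duplicate request. The marker row and
        the call share one transaction: if compensate() raises, the row is rolled back
        so a later retry runs it again.
        """
        with self.conn:  # commit on success, roll back on exception
            cur = self.conn.execute(
                "INSERT OR IGNORE INTO compensations (saga_id, step) VALUES (?, ?)",
                (saga_id, step),
            )
            if cur.rowcount == 0:
                return False  # already compensated: treat the duplicate as a no-op
            compensate()
            return True


refunds = []
comp_log = CompensationLog(sqlite3.connect(":memory:"))  # use a durable database in practice
for _ in range(2):  # the second iteration simulates a redelivered compensation request
    print(comp_log.run_once("saga-002", "process_payment", lambda: refunds.append("PAY-USER456-001")))
print(refunds)  # ['PAY-USER456-001']
```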

Transactional Outbox

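The Outbox pattern avoids unsafe dual writes (updating the database and publishing to a message broker as two separate operations) by committing the state change and a record of the event atomically, which enables reliable event-driven architectures. infoq.com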

How It Works

Write the business data and the outbox event in a single database transaction, then publish events from the outbox table asynchronously, either with a polling relay or with change data capture (CDC) on the outbox table.
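The publishing half can be sketched as a polling relay, the simpler alternative to CDC: a loop reads pending rows in insertion order, publishes each one, and marks it processed. The `relay_pending` function, the trimmed-down schema, and the `publish` callback are assumptions for illustration; a real relay would publish to a broker and run on a schedule. Delivery is at-least-once, so consumers should deduplicate, for example by event ID.

```python
import json
import sqlite3
from typing import Any, Callable, Dict


def relay_pending(
    conn: sqlite3.Connection,
    publish: Callable[[str, Dict[str, Any]], None],
    batch_size: int = 100,
) -> int:
    """One polling pass: publish pending outbox rows in insertion order, then mark each processed.

    If the relay crashes after publish() but before the status update commits,
    that row is published again on the next pass (at-least-once delivery).
    """
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox"
        " WHERE status = 'pending' ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    for row_id, event_type, payload in rows:
        publish(event_type, json.loads(payload))
        with conn:  # commit each status change on its own
            conn.execute("UPDATE outbox SET status = 'processed' WHERE id = ?", (row_id,))
    return len(rows)


# Demo with a trimmed-down outbox table (hypothetical subset of the schema used below)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, event_type TEXT NOT NULL,"
    " payload TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending')"
)
with conn:  # in a real service the business row would be written in this same transaction
    conn.execute(
        "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
        ("OrderCreated", json.dumps({"order_id": "ORD-1"})),
    )

sent = []
count = relay_pending(conn, lambda event_type, payload: sent.append((event_type, payload)))
print(count, sent)  # 1 [('OrderCreated', {'order_id': 'ORD-1'})]
```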

Python Implementation

import sqlite3
import json
import time
import threading
from datetime import datetime
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum

class OutboxStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"

@dataclass
class OutboxMessage:
    id: Optional[int] = None
    aggregate_type: str = ""
    aggregate_id: str = ""
    event_type: str = ""
    payload: str = ""
    status: str = OutboxStatus.PENDING.value
    created_at: Optional[str] = None
    processed_at: Optional[str] = None
    retry_count: int = 0

class TransactionalOutbox:
    """Transaction Outbox Pattern implementation"""
    
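    # Every method opens its own connection, and each sqlite3.connect(":memory:") call
    # creates a separate, empty database, so the default must be an on-disk file.
    def __init__(self, db_path: str = "outbox_demo.db"):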
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        """Initialize database tables"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Business data table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS orders (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                amount REAL NOT NULL,
                status TEXT NOT NULL,
                created_at TEXT NOT NULL
            )
        ''')
        
        # Outbox table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS outbox (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                aggregate_type TEXT NOT NULL,
                aggregate_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                payload TEXT NOT NULL,
                status TEXT NOT NULL DEFAULT 'pending',
                created_at TEXT NOT NULL,
                processed_at TEXT,
                retry_count INTEGER DEFAULT 0
            )
        ''')
        
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_outbox_status 
            ON outbox(status, created_at)
        ''')
        
        conn.commit()
        conn.close()
    
    def create_order(self, order_id: str, user_id: str, amount: float):
        """Create order with transactional outbox"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            # Start transaction
            cursor.execute("BEGIN TRANSACTION")
            
            # Insert business data
            cursor.execute('''
                INSERT INTO orders (id, user_id, amount, status, created_at)
                VALUES (?, ?, ?, ?, ?)
            ''', (order_id, user_id, amount, "pending", datetime.now().isoformat()))
            
            # Insert event into outbox (same transaction)
            event = {
                "order_id": order_id,
                "user_id": user_id,
                "amount": amount,
                "timestamp": datetime.now().isoformat()
            }
            
            cursor.execute('''
                INSERT INTO outbox (
                    aggregate_type, aggregate_id, event_type, 
                    payload, status, created_at
                )
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                "Order",
                order_id,
                "OrderCreated",
                json.dumps(event),
                OutboxStatus.PENDING.value,
                datetime.now().isoformat()
            ))
            
            # Commit both operations atomically
            conn.commit()
            print(f"✓ Order {order_id} created and event written to outbox")
            
        except Exception as e:
            conn.rollback()
            print(f"✗ Transaction failed: {e}")
            raise
        finally:
            conn.close()
    
    def get_pending_messages(self, limit: int = 10) -> List[OutboxMessage]:
        """Retrieve pending messages from outbox"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT id, aggregate_type, aggregate_id, event_type, 
                   payload, status, created_at, processed_at, retry_count
            FROM outbox
            WHERE status = ?
            ORDER BY created_at ASC
            LIMIT ?
        ''', (OutboxStatus.PENDING.value, limit))
        
        messages = []
        for row in cursor.fetchall():
            messages.append(OutboxMessage(
                id=row[0],
                aggregate_type=row[1],
                aggregate_id=row[2],
                event_type=row[3],
                payload=row[4],
                status=row[5],
                created_at=row[6],
                processed_at=row[7],
                retry_count=row[8]
            ))
        
        conn.close()
        return messages
    
    def mark_as_processed(self, message_id: int):
        """Mark message as processed"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            UPDATE outbox
            SET status = ?, processed_at = ?
            WHERE id = ?
        ''', (OutboxStatus.PROCESSED.value, datetime.now().isoformat(), message_id))
        
        conn.commit()
        conn.close()
    
    def increment_retry(self, message_id: int):
        """Increment retry count"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            UPDATE outbox
            SET retry_count = retry_count + 1
            WHERE id = ?
        ''', (message_id,))
        
        conn.commit()
        conn.close()

class MessageRelay:
    """Polls outbox and publishes messages"""
    
    def __init__(self, outbox: TransactionalOutbox, poll_interval: float = 2.0):
        self.outbox = outbox
        self.poll_interval = poll_interval
        self.running = False
        self.thread = None
    
    def start(self):
        """Start the message relay"""
        self.running = True
        self.thread = threading.Thread(target=self._poll_loop, daemon=True)
        self.thread.start()
        print("Message Relay started")
    
    def stop(self):
        """Stop the message relay"""
        self.running = False
        if self.thread:
            self.thread.join()
        print("Message Relay stopped")
    
    def _poll_loop(self):
        """Continuously poll for pending messages"""
        while self.running:
            try:
                messages = self.outbox.get_pending_messages()
                
                for msg in messages:
                    self._publish_message(msg)
                
                time.sleep(self.poll_interval)
                
            except Exception as e:
                print(f"Relay error: {e}")
                time.sleep(self.poll_interval)
    
    def _publish_message(self, msg: OutboxMessage):
        """Publish message to message broker"""
        try:
            # Simulate publishing to Kafka/RabbitMQ
            print(f"📤 Publishing event: {msg.event_type} (ID: {msg.id})")
            print(f"   Payload: {msg.payload}")
            
            # Simulate potential failure
            import random
            if random.random() < 0.1:  # 10% failure rate
                raise Exception("Simulated publish failure")
            
            # Mark as processed
            self.outbox.mark_as_processed(msg.id)
            print(f"✓ Message {msg.id} published and marked as processed")
            
        except Exception as e:
            print(f"✗ Failed to publish message {msg.id}: {e}")
            self.outbox.increment_retry(msg.id)

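# Example Usage
if __name__ == "__main__":
    import os
    import tempfile
    
    # Fresh file-backed database per run (":memory:" would not be shared
    # across the separate connections each method opens)
    outbox = TransactionalOutbox(os.path.join(tempfile.mkdtemp(), "outbox.db"))
    relay = MessageRelay(outbox, poll_interval=1.0)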
    
    # Start message relay
    relay.start()
    
    # Create some orders
    print("\n=== Creating Orders ===\n")
    for i in range(5):
        outbox.create_order(
            order_id=f"ORD-{i+1:03d}",
            user_id=f"USER-{i+1}",
            amount=100.0 * (i + 1)
        )
        time.sleep(0.5)
    
    # Wait for messages to be processed
    print("\n=== Waiting for message processing ===\n")
    time.sleep(5)
    
    # Stop relay
    relay.stop()

Use Cases

  • Order Processing: Atomically save order and publish OrderCreated event without risking inconsistency.
  • Saga Orchestration: Combine with the saga pattern to ensure reliable saga step execution and event publishing.
  • Event Sourcing: Persist events reliably before publishing to event stream.

Best Practices

  • Use change data capture (CDC) tools like Debezium for production reliability
  • Implement idempotent consumers to handle duplicate event delivery
  • Add event versioning to support schema evolution
  • Use SELECT ... FOR UPDATE SKIP LOCKED (supported by PostgreSQL and MySQL 8.0+, not by SQLite) so that concurrent publishers do not claim the same rows
  • Regularly archive published events to prevent table growth
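Where SKIP LOCKED is not available, as with the SQLite-based example above, a relay can instead claim rows with a conditional UPDATE that acts as a compare-and-set. The sketch below is a minimal illustration under that assumption: `claim_batch` and the trimmed `outbox` schema are hypothetical, and a production relay would also need a way to reclaim rows left in `processing` by a crashed worker.

```python
import sqlite3
from datetime import datetime
from typing import List

# Simplified outbox schema for this sketch (subset of the columns used above)
SCHEMA = """
CREATE TABLE IF NOT EXISTS outbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    event_type TEXT NOT NULL,
    payload TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    created_at TEXT NOT NULL
)
"""

def claim_batch(conn: sqlite3.Connection, limit: int = 10) -> List[int]:
    """Claim up to `limit` pending rows and return the ids this caller now owns."""
    candidates = [
        row[0]
        for row in conn.execute(
            "SELECT id FROM outbox WHERE status = 'pending' "
            "ORDER BY created_at, id LIMIT ?",
            (limit,),
        )
    ]
    claimed = []
    for message_id in candidates:
        # Compare-and-set: the UPDATE only matches if no other relay
        # has already moved this row out of 'pending'
        cur = conn.execute(
            "UPDATE outbox SET status = 'processing' "
            "WHERE id = ? AND status = 'pending'",
            (message_id,),
        )
        if cur.rowcount == 1:
            claimed.append(message_id)
    conn.commit()
    return claimed

# Demo: three pending events, claimed in two batches
conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
now = datetime.now().isoformat()
for _ in range(3):
    conn.execute(
        "INSERT INTO outbox (event_type, payload, created_at) VALUES (?, ?, ?)",
        ("OrderCreated", "{}", now),
    )
conn.commit()

first_batch = claim_batch(conn, limit=2)
second_batch = claim_batch(conn, limit=2)
print(first_batch, second_batch)  # [1, 2] [3]
```

Because the claim is a single UPDATE guarded by `status = 'pending'`, two relays that read the same candidate ids can both attempt the update, but only one sees `rowcount == 1`.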

Idempotency Keys & Deduplication

Idempotency ensures that an operation can be retried safely without duplicating its side effects, which is critical for reliable distributed systems. dev.to

How It Works

Clients provide unique idempotency keys with requests; servers store processed keys and return cached responses for duplicate requests.

Python Implementation

import uuid
import hashlib
import time
from typing import Any, Dict, Optional, Callable
from datetime import datetime, timedelta
from functools import wraps
import json

class IdempotencyCache:
    """In-memory cache for idempotency keys"""
    
    def __init__(self, ttl_seconds: int = 86400):  # 24 hours default
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.ttl_seconds = ttl_seconds
    
    def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Get cached result by idempotency key"""
        if key not in self.cache:
            return None
        
        entry = self.cache[key]
        
        # Check if expired
        if datetime.now() > entry['expires_at']:
            del self.cache[key]
            return None
        
        return entry['result']
    
    def set(self, key: str, result: Any):
        """Cache result with idempotency key"""
        self.cache[key] = {
            'result': result,
            'expires_at': datetime.now() + timedelta(seconds=self.ttl_seconds),
            'created_at': datetime.now()
        }
    
    def cleanup_expired(self):
        """Remove expired entries"""
        now = datetime.now()
        expired_keys = [
            k for k, v in self.cache.items()
            if now > v['expires_at']
        ]
        for key in expired_keys:
            del self.cache[key]

class IdempotencyKeyGenerator:
    """Generate idempotency keys"""
    
    @staticmethod
    def generate_uuid() -> str:
        """Generate UUID-based idempotency key"""
        return str(uuid.uuid4())
    
    @staticmethod
    def generate_from_payload(payload: Dict) -> str:
        """Generate deterministic key from payload"""
        # Sort keys for consistent hashing
        sorted_payload = json.dumps(payload, sort_keys=True)
        hash_object = hashlib.sha256(sorted_payload.encode())
        return hash_object.hexdigest()

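def idempotent(cache: Any, key_param: str = 'idempotency_key'):
    """Decorator for idempotent operations.

    `cache` is either an IdempotencyCache instance or a callable that receives
    the bound instance (`self`) and returns the cache to use. The idempotency
    key must be passed as a keyword argument named `key_param`.
    """
    
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Resolve the cache: a factory such as `lambda self: self.cache`
            # is called with `self` (args[0]) for instance methods
            resolved_cache = cache(args[0]) if callable(cache) else cache
            
            # Extract idempotency key
            idempotency_key = kwargs.get(key_param)
            
            if not idempotency_key:
                raise ValueError(f"Missing required parameter: {key_param}")
            
            # Check cache
            cached_result = resolved_cache.get(idempotency_key)
            if cached_result is not None:
                print(f"[IDEMPOTENT] Returning cached result for key: {idempotency_key[:16]}...")
                return cached_result
            
            # Execute function
            print(f"[IDEMPOTENT] Executing for key: {idempotency_key[:16]}...")
            result = func(*args, **kwargs)
            
            # Cache result
            resolved_cache.set(idempotency_key, result)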
            
            return result
        
        return wrapper
    return decorator

# Example: Payment Service with Idempotency
class PaymentService:
    def __init__(self):
        self.idempotency_cache = IdempotencyCache(ttl_seconds=3600)
        self.processed_payments = []
    
    @idempotent(cache=lambda self: self.idempotency_cache)
    def process_payment(
        self,
        user_id: str,
        amount: float,
        idempotency_key: str
    ) -> Dict[str, Any]:
        """Process payment with idempotency"""
        
        # Simulate payment processing
        print(f"  → Processing payment: ${amount} for user {user_id}")
        time.sleep(0.5)
        
        payment_id = f"PAY-{len(self.processed_payments) + 1:05d}"
        timestamp = datetime.now().isoformat()
        
        payment = {
            "payment_id": payment_id,
            "user_id": user_id,
            "amount": amount,
            "status": "completed",
            "timestamp": timestamp,
            "idempotency_key": idempotency_key
        }
        
        self.processed_payments.append(payment)
        print(f"  ✓ Payment processed: {payment_id}")
        
        return payment

# Message Deduplication
class MessageDeduplicator:
    """Deduplicate messages based on message ID"""
    
    def __init__(self, window_size: int = 1000):
        self.seen_messages = set()
        self.window_size = window_size
        self.message_queue = []
    
    def is_duplicate(self, message_id: str) -> bool:
        """Check if message has been seen before"""
        if message_id in self.seen_messages:
            return True
        
        # Add to seen set
        self.seen_messages.add(message_id)
        self.message_queue.append(message_id)
        
        # Maintain sliding window
        if len(self.message_queue) > self.window_size:
            old_message = self.message_queue.pop(0)
            self.seen_messages.discard(old_message)
        
        return False
    
    def process_message(self, message_id: str, payload: Dict) -> bool:
        """Process message if not duplicate"""
        if self.is_duplicate(message_id):
            print(f"[DEDUP] Skipping duplicate message: {message_id}")
            return False
        
        print(f"[DEDUP] Processing new message: {message_id}")
        # Process message logic here
        return True

# Example Usage
if __name__ == "__main__":
    payment_service = PaymentService()
    key_gen = IdempotencyKeyGenerator()
    
    print("=== Test 1: Normal Payment ===\n")
    key1 = key_gen.generate_uuid()
    result1 = payment_service.process_payment(
        user_id="USER123",
        amount=250.00,
        idempotency_key=key1
    )
    print(f"Result: {result1}\n")
    
    print("=== Test 2: Duplicate Payment (Same Key) ===\n")
    result2 = payment_service.process_payment(
        user_id="USER123",
        amount=250.00,
        idempotency_key=key1  # Same key
    )
    print(f"Result: {result2}\n")
    
    print("=== Test 3: New Payment (Different Key) ===\n")
    key2 = key_gen.generate_uuid()
    result3 = payment_service.process_payment(
        user_id="USER456",
        amount=500.00,
        idempotency_key=key2
    )
    print(f"Result: {result3}\n")
    
    print("\n=== Message Deduplication Example ===\n")
    deduplicator = MessageDeduplicator(window_size=3)
    
    messages = [
        {"id": "MSG-001", "data": "First message"},
        {"id": "MSG-002", "data": "Second message"},
        {"id": "MSG-001", "data": "Duplicate"},  # Duplicate!
        {"id": "MSG-003", "data": "Third message"},
    ]
    
    for msg in messages:
        deduplicator.process_message(msg["id"], msg)

Use Cases

  • Payment Processing: Prevent double-charging users when they retry failed payment requests.
  • Order Submission: Ensure users can safely retry order submission without creating duplicate orders.
  • Message Processing: Deduplicate messages in event-driven systems to handle at-least-once delivery. deduplication-in-distributed-systems

Best Practices

  • Generate idempotency keys on the client side using UUIDs or request content hashes
  • Set appropriate TTL for idempotency records based on retry window requirements
  • Store idempotency keys before processing to handle concurrent duplicate requests
  • Use database constraints or Redis SETNX for atomic deduplication
  • Include request fingerprint in idempotency key (method, path, key parameters)
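Storing the key before processing and relying on a database constraint can be sketched with SQLite: inserting the key either succeeds (first request) or raises IntegrityError (duplicate), and a stored request fingerprint catches a key being reused for a different operation. `SqliteIdempotencyStore`, `IdempotencyConflict` and the table layout are hypothetical names for this illustration, not a specific library's API.

```python
import hashlib
import json
import sqlite3
from typing import Any, Callable, Dict

class IdempotencyConflict(Exception):
    """Key reused with a different request, or original request still in flight."""

class SqliteIdempotencyStore:
    """Hypothetical store that reserves a key via a PRIMARY KEY constraint before work runs."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        with self.conn:
            self.conn.execute(
                "CREATE TABLE IF NOT EXISTS idempotency ("
                " key TEXT PRIMARY KEY,"
                " fingerprint TEXT NOT NULL,"
                " response TEXT)"  # NULL while the original request is in flight
            )

    @staticmethod
    def fingerprint(method: str, path: str, body: Dict[str, Any]) -> str:
        # Hash the parts of the request that define "the same operation"
        raw = json.dumps({"method": method, "path": path, "body": body}, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()

    def execute(self, key: str, fingerprint: str,
                handler: Callable[[], Dict[str, Any]]) -> Dict[str, Any]:
        try:
            # 1. Reserve the key BEFORE doing any work; the constraint makes this
            #    atomic across connections to the same database
            with self.conn:
                self.conn.execute(
                    "INSERT INTO idempotency (key, fingerprint) VALUES (?, ?)",
                    (key, fingerprint),
                )
        except sqlite3.IntegrityError:
            stored_fp, response = self.conn.execute(
                "SELECT fingerprint, response FROM idempotency WHERE key = ?",
                (key,),
            ).fetchone()
            if stored_fp != fingerprint:
                raise IdempotencyConflict("key reused with a different request")
            if response is None:
                raise IdempotencyConflict("original request still in progress")
            return json.loads(response)  # 2a. Duplicate: replay stored response

        try:
            result = handler()  # 2b. First time: do the real work
        except Exception:
            # Release the reservation so the client can retry after a failure
            with self.conn:
                self.conn.execute("DELETE FROM idempotency WHERE key = ?", (key,))
            raise
        # 3. Persist the response for future duplicates
        with self.conn:
            self.conn.execute(
                "UPDATE idempotency SET response = ? WHERE key = ?",
                (json.dumps(result), key),
            )
        return result

# Demo with a hypothetical payment handler
conn = sqlite3.connect(":memory:")
store = SqliteIdempotencyStore(conn)
charges = []

def charge() -> Dict[str, Any]:
    charges.append(1)
    return {"payment_id": f"PAY-{len(charges):05d}", "amount": 250.0}

body = {"user_id": "USER123", "amount": 250.0}
fp = store.fingerprint("POST", "/payments", body)
first = store.execute("key-1", fp, charge)
retry = store.execute("key-1", fp, charge)  # replayed; charge() is not called again
print(first == retry, len(charges))  # True 1
```

Releasing the reservation on failure lets the client retry; an alternative is to store the error response and replay it, depending on whether a failed attempt may already have had side effects.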

Dead Letter Queue

A Dead Letter Queue (DLQ) captures messages that still cannot be processed after multiple retry attempts, enabling manual investigation and reprocessing. temporal.io

How It Works

Messages that fail processing after maximum retries are moved to a separate queue for analysis and manual intervention.

Python Implementation

import json
import time
from typing import Any, Dict, Callable, Optional
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from queue import Queue, Empty
import threading

class MessageStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    DEAD_LETTER = "dead_letter"

@dataclass
class Message:
    id: str
    payload: Dict[str, Any]
    status: MessageStatus = MessageStatus.PENDING
    retry_count: int = 0
    max_retries: int = 3
    created_at: datetime = field(default_factory=datetime.now)
    last_error: Optional[str] = None
    error_history: list = field(default_factory=list)

class DeadLetterQueue:
    """Dead Letter Queue implementation"""
    
    def __init__(self, name: str = "DLQ"):
        self.name = name
        self.messages = []
        self._lock = threading.Lock()
    
    def add(self, message: Message):
        """Add message to DLQ"""
        with self._lock:
            message.status = MessageStatus.DEAD_LETTER
            self.messages.append(message)
            print(f"[{self.name}] Message {message.id} moved to dead-letter queue")
            print(f"[{self.name}]   Failed attempts: {message.retry_count} (max retries: {message.max_retries})")
            print(f"[{self.name}]   Last error: {message.last_error}")
    
    def get_all(self):
        """Get all messages in DLQ"""
        with self._lock:
            return self.messages.copy()
    
    def get_stats(self) -> Dict:
        """Get DLQ statistics"""
        with self._lock:
            return {
                "total_messages": len(self.messages),
                "oldest_message": min([m.created_at for m in self.messages]) if self.messages else None,
                "error_types": {}  # Could categorize by error type
            }
    
    def replay_message(self, message_id: str) -> Optional[Message]:
        """Remove message from DLQ for reprocessing"""
        with self._lock:
            for i, msg in enumerate(self.messages):
                if msg.id == message_id:
                    msg.status = MessageStatus.PENDING
                    msg.retry_count = 0
                    msg.error_history.clear()
                    return self.messages.pop(i)
        return None

class MessageProcessor:
    """Message processor with DLQ support"""
    
    def __init__(
        self,
        process_func: Callable,
        max_retries: int = 3,
        retry_delay: float = 1.0
    ):
        self.process_func = process_func
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.dlq = DeadLetterQueue()
        self.processed_messages = []
    
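    def process_message(self, message: Message) -> bool:
        """Process message with retry and DLQ logic"""
        message.status = MessageStatus.PROCESSING
        # Apply this processor's retry policy; otherwise the constructor's
        # max_retries argument would be ignored in favour of Message's default
        message.max_retries = self.max_retries
        
        while message.retry_count <= message.max_retries: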
            try:
                print(f"\n[Processor] Processing message {message.id} (attempt {message.retry_count + 1})")
                
                # Process the message
                result = self.process_func(message.payload)
                
                # Success
                message.status = MessageStatus.COMPLETED
                self.processed_messages.append(message)
                print(f"[Processor] ✓ Message {message.id} processed successfully")
                return True
                
            except Exception as e:
                message.retry_count += 1
                message.last_error = str(e)
                message.error_history.append({
                    "attempt": message.retry_count,
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                })
                
                print(f"[Processor] ✗ Attempt {message.retry_count} failed: {e}")
                
                if message.retry_count <= message.max_retries:
                    print(f"[Processor] Retrying in {self.retry_delay}s...")
                    time.sleep(self.retry_delay)
                else:
                    # Move to DLQ
                    print(f"[Processor] Max retries exceeded")
                    message.status = MessageStatus.FAILED
                    self.dlq.add(message)
                    return False
        
        return False
    
    def get_stats(self) -> Dict:
        """Get processing statistics"""
        return {
            "processed": len(self.processed_messages),
            "dlq_size": len(self.dlq.get_all()),
            "dlq_stats": self.dlq.get_stats()
        }

# Example: Order Processing with DLQ
class OrderProcessor:
    def __init__(self):
        self.valid_user_ids = ["USER001", "USER002", "USER003"]
    
    def process_order(self, payload: Dict) -> Dict:
        """Process order - may fail for various reasons"""
        
        # Validation errors (permanent failures)
        if "user_id" not in payload:
            raise ValueError("Missing user_id field")
        
        if payload["user_id"] not in self.valid_user_ids:
            raise ValueError(f"Invalid user_id: {payload['user_id']}")
        
        if payload.get("amount", 0) <= 0:
            raise ValueError("Amount must be positive")
        
        # Simulate transient failures
        import random
        if random.random() < 0.3:  # 30% transient failure rate
            raise ConnectionError("Temporary database connection issue")
        
        # Success
        return {
            "order_id": payload.get("order_id"),
            "status": "processed",
            "timestamp": datetime.now().isoformat()
        }

# Example Usage
if __name__ == "__main__":
    order_processor = OrderProcessor()
    processor = MessageProcessor(
        process_func=order_processor.process_order,
        max_retries=3,
        retry_delay=0.5
    )
    
    # Test messages
    test_messages = [
        Message(id="MSG-001", payload={"order_id": "ORD001", "user_id": "USER001", "amount": 100.0}),
        Message(id="MSG-002", payload={"order_id": "ORD002", "user_id": "INVALID", "amount": 50.0}),  # Will fail
        Message(id="MSG-003", payload={"order_id": "ORD003", "user_id": "USER002", "amount": -10.0}),  # Will fail
        Message(id="MSG-004", payload={"order_id": "ORD004", "user_id": "USER003", "amount": 200.0}),
    ]
    
    print("=== Processing Messages ===")
    for msg in test_messages:
        processor.process_message(msg)
        time.sleep(0.5)
    
    # Print statistics
    print("\n\n=== Processing Statistics ===")
    stats = processor.get_stats()
    print(f"Successfully Processed: {stats['processed']}")
    print(f"Dead Letter Queue Size: {stats['dlq_size']}")
    
    # Show DLQ messages
    print("\n=== Dead Letter Queue Messages ===")
    for dlq_msg in processor.dlq.get_all():
        print(f"\nMessage ID: {dlq_msg.id}")
        print(f"  Payload: {dlq_msg.payload}")
        print(f"  Retries: {dlq_msg.retry_count}")
        print(f"  Last Error: {dlq_msg.last_error}")
        print(f"  Error History:")
        for error in dlq_msg.error_history:
            print(f"    - Attempt {error['attempt']}: {error['error']}")

Use Cases

  • Order Processing: Capture orders that fail validation or payment processing for manual investigation
  • Event Processing: Store events that cannot be processed due to schema changes or missing dependencies.
  • Integration Failures: Handle third-party API failures that require manual intervention.

Best Practices

  • Set appropriate max retry counts based on failure type (transient vs permanent)
  • Include rich error context in DLQ messages for debugging
  • Implement DLQ monitoring and alerting to detect systemic issues
  • Create replay mechanisms to reprocess messages after fixes
  • Use separate DLQs per error type for better organization
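The transient-versus-permanent distinction and per-error-type DLQs can be combined in a few lines. The sketch below assumes that ValueError and KeyError indicate bad input that no retry can fix; `RoutingDLQ`, `process_with_dlq` and that classification are hypothetical and would need tuning to the real failure modes of a given handler.

```python
import time
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple

# Assumption: these exception types signal bad input that no retry can fix
PERMANENT_ERRORS: Tuple[type, ...] = (ValueError, KeyError)

class RoutingDLQ:
    """Hypothetical DLQ keeping one queue per error type."""

    def __init__(self) -> None:
        self.queues: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    def add(self, payload: Dict[str, Any], error: Exception, attempts: int) -> None:
        # Route by exception class name, keeping context for debugging
        self.queues[type(error).__name__].append(
            {"payload": payload, "error": str(error), "attempts": attempts}
        )

    def sizes(self) -> Dict[str, int]:
        # Per-type depth, suitable for feeding a monitoring/alerting check
        return {name: len(msgs) for name, msgs in self.queues.items()}

def process_with_dlq(
    payload: Dict[str, Any],
    handler: Callable[[Dict[str, Any]], Any],
    dlq: RoutingDLQ,
    max_retries: int = 3,
    retry_delay: float = 0.1,
) -> bool:
    attempts = 0
    while True:
        attempts += 1
        try:
            handler(payload)
            return True
        except PERMANENT_ERRORS as e:
            dlq.add(payload, e, attempts)  # permanent: dead-letter immediately
            return False
        except Exception as e:
            if attempts > max_retries:  # transient, but retries are exhausted
                dlq.add(payload, e, attempts)
                return False
            time.sleep(retry_delay)  # transient: wait, then try again

# Demo with two hypothetical handlers
def validate_order(payload: Dict[str, Any]) -> None:
    if payload["amount"] <= 0:
        raise ValueError("Amount must be positive")

def flaky_write(payload: Dict[str, Any]) -> None:
    raise ConnectionError("Temporary database connection issue")

dlq = RoutingDLQ()
ok_invalid = process_with_dlq({"amount": -10.0}, validate_order, dlq)
ok_flaky = process_with_dlq({"amount": 5.0}, flaky_write, dlq, max_retries=2, retry_delay=0.0)
print(dlq.sizes())  # {'ValueError': 1, 'ConnectionError': 1}
```

The invalid order reaches its DLQ after a single attempt, while the connection error is retried until the budget of one initial attempt plus `max_retries` retries is spent.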

Timeouts & Fallback

Timeouts prevent indefinite waiting, while fallbacks provide graceful degradation when services are unavailable. temporal.io

How It Works

Set maximum wait times for operations and define alternative responses or behaviors when timeouts occur or services fail.

Python Implementation

import time
import threading
from typing import Callable, Any, Optional
from functools import wraps
import signal
from contextlib import contextmanager

class TimeoutError(Exception):
    """Raised when operation times out"""
    pass

class TimeoutManager:
    """Manage timeouts for operations"""
    
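    @staticmethod
    @contextmanager
    def timeout(seconds: float):
        """Context manager for timeout (Unix only, main thread only)"""
        def timeout_handler(signum, frame):
            raise TimeoutError(f"Operation timed out after {seconds}s")
        
        # Set the signal handler
        old_handler = signal.signal(signal.SIGALRM, timeout_handler)
        # setitimer accepts fractional seconds; signal.alarm(int(seconds))
        # would truncate e.g. 0.5 to 0 and disable the alarm entirely
        signal.setitimer(signal.ITIMER_REAL, seconds)
        
        try:
            yield
        finally:
            signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending timer
            signal.signal(signal.SIGALRM, old_handler)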

class ThreadTimeout:
    """Thread-based timeout for cross-platform support"""
    
    def __init__(self, seconds: float):
        self.seconds = seconds
        self.result = None
        self.exception = None
    
    def run(self, func: Callable, *args, **kwargs) -> Any:
        """Run function with timeout"""
        
        def target():
            try:
                self.result = func(*args, **kwargs)
            except Exception as e:
                self.exception = e
        
        thread = threading.Thread(target=target)
        thread.daemon = True
        thread.start()
        thread.join(timeout=self.seconds)
        
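        if thread.is_alive():
            # Python threads cannot be killed: the worker keeps running in the
            # background (as a daemon thread) and its eventual result is discarded
            raise TimeoutError(f"Operation timed out after {self.seconds}s")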
        
        if self.exception:
            raise self.exception
        
        return self.result

class Fallback:
    """Fallback mechanism for failed operations"""
    
    def __init__(
        self,
        primary_func: Callable,
        fallback_func: Callable,
        timeout_seconds: Optional[float] = None,
        fallback_exceptions: tuple = (Exception,)
    ):
        self.primary_func = primary_func
        self.fallback_func = fallback_func
        self.timeout_seconds = timeout_seconds
        self.fallback_exceptions = fallback_exceptions
    
    def execute(self, *args, **kwargs) -> Any:
        """Execute with fallback"""
        try:
            if self.timeout_seconds:
                # Execute with timeout
                timeout_runner = ThreadTimeout(self.timeout_seconds)
                return timeout_runner.run(self.primary_func, *args, **kwargs)
            else:
                # Execute without timeout
                return self.primary_func(*args, **kwargs)
                
        except self.fallback_exceptions as e:
            print(f"[Fallback] Primary function failed: {e}")
            print(f"[Fallback] Executing fallback function...")
            return self.fallback_func(*args, **kwargs)

def with_timeout_and_fallback(
    timeout_seconds: float,
    fallback_func: Callable,
    fallback_exceptions: tuple = (Exception,)
):
    """Decorator for timeout and fallback"""
    
    def decorator(func):
        fallback = Fallback(
            primary_func=func,
            fallback_func=fallback_func,
            timeout_seconds=timeout_seconds,
            fallback_exceptions=fallback_exceptions
        )
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            return fallback.execute(*args, **kwargs)
        
        return wrapper
    return decorator

# Example: Service with Timeout and Fallback
class RecommendationService:
    """Service that provides product recommendations"""
    
    def __init__(self):
        self.cache = {
            "USER001": ["Product A", "Product B", "Product C"],
            "USER002": ["Product D", "Product E"],
        }
    
    def get_personalized_recommendations(self, user_id: str) -> list:
        """Get AI-powered recommendations (slow, may fail)"""
        print(f"[Primary] Fetching personalized recommendations for {user_id}...")
        
        # Simulate slow AI service
        time.sleep(3)
        
        # Simulate occasional failures
        import random
        if random.random() < 0.3:
            raise ConnectionError("AI service unavailable")
        
        return [f"AI-Recommended-{i}" for i in range(1, 6)]
    
    def get_fallback_recommendations(self, user_id: str) -> list:
        """Fallback to cached/popular recommendations"""
        print(f"[Fallback] Using cached recommendations for {user_id}...")
        
        # Return cached recommendations or default popular items
        if user_id in self.cache:
            return self.cache[user_id]
        
        return ["Popular Item 1", "Popular Item 2", "Popular Item 3"]

class WeatherService:
    """Weather service with timeout and fallback"""
    
    def fetch_live_weather(self, city: str) -> dict:
        """Fetch live weather data (may timeout)"""
        print(f"[Primary] Fetching live weather for {city}...")
        
        # Simulate network delay
        time.sleep(2.5)
        
        return {
            "city": city,
            "temperature": 25.5,
            "condition": "Sunny",
            "source": "live"
        }
    
    @staticmethod
    def get_cached_weather(city: str) -> dict:
        """Fallback to cached weather data"""
        print(f"[Fallback] Using cached weather for {city}...")
        
        return {
            "city": city,
            "temperature": 20.0,
            "condition": "Partly Cloudy",
            "source": "cached",
            "warning": "Data may be outdated"
        }

# Apply decorators
recommendation_service = RecommendationService()

@with_timeout_and_fallback(
    timeout_seconds=2.0,
    fallback_func=recommendation_service.get_fallback_recommendations,
    fallback_exceptions=(TimeoutError, ConnectionError)
)
def get_recommendations(user_id: str) -> list:
    return recommendation_service.get_personalized_recommendations(user_id)

weather_service = WeatherService()

@with_timeout_and_fallback(
    timeout_seconds=2.0,
    fallback_func=weather_service.get_cached_weather,
    fallback_exceptions=(TimeoutError,)
)
def get_weather(city: str) -> dict:
    return weather_service.fetch_live_weather(city)

# Example Usage
if __name__ == "__main__":
    print("=== Test 1: Recommendations (Will Timeout) ===\n")
    try:
        recommendations = get_recommendations("USER001")
        print(f"Result: {recommendations}\n")
    except Exception as e:
        print(f"Error: {e}\n")
    
    print("\n=== Test 2: Weather (Will Timeout) ===\n")
    try:
        weather = get_weather("San Francisco")
        print(f"Result: {weather}\n")
    except Exception as e:
        print(f"Error: {e}\n")
    
    print("\n=== Test 3: Multiple Requests ===\n")
    cities = ["New York", "London", "Tokyo"]
    for city in cities:
        try:
            result = get_weather(city)
            print(f"{city}: {result['condition']} ({result['source']})\n")
        except Exception as e:
            print(f"{city}: Error - {e}\n")
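Because the 3 s and 2.5 s delays exceed the 2 s limit, the tests above expect every call to be answered by its fallback. One caveat is worth checking. The body of `with_timeout_and_fallback` is defined earlier, but if it is built on a thread pool, a timeout only stops the caller from waiting; it does not stop the slow call. The sketch below makes that visible with a hypothetical `call_with_timeout` helper built on `concurrent.futures`. It is an illustration under that assumption, not the decorator's actual code.

```python
import concurrent.futures
import threading
import time

# Hypothetical helper for illustration; not the decorator defined earlier.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

def call_with_timeout(func, timeout_seconds, fallback, *args):
    """Run func in a worker thread; return fallback(*args) if it exceeds the timeout."""
    future = _executor.submit(func, *args)
    try:
        return future.result(timeout=timeout_seconds)
    except concurrent.futures.TimeoutError:
        # Nothing stops the running call: the worker thread keeps executing func.
        return fallback(*args)

finished = threading.Event()

def slow_lookup(key):
    time.sleep(0.5)   # longer than the 0.1 s timeout used below
    finished.set()    # still runs after the caller has already returned
    return f"live:{key}"

def cached_lookup(key):
    return f"cached:{key}"

result = call_with_timeout(slow_lookup, 0.1, cached_lookup, "k1")
print(result)              # cached:k1
print(finished.is_set())   # False: slow_lookup is still sleeping
finished.wait(timeout=2)
print(finished.is_set())   # True: slow_lookup ran to completion anyway
```

In practice, a timed-out call can keep holding a worker thread and a downstream connection until it finishes. For that reason, it helps to also set a timeout on the underlying I/O, such as a socket or HTTP client timeout.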

Use Cases

  • External API Calls: Set timeouts for third-party APIs and return cached data as a fallback.
  • Database Queries: Time out long-running queries and return default values for non-critical data.
  • Microservices: Provide degraded functionality when downstream services are slow or unavailable.
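For external API calls, a common refinement of the static cache in `RecommendationService` is a last-known-good cache. Every successful response is stored, and on a timeout or connection error the most recent stored value is served instead. The sketch below assumes a hypothetical `LastKnownGoodClient` class and uses a `flaky_fetch` stand-in for a third-party API. It also counts primary versus fallback responses so the fallback rate can be monitored.

```python
import concurrent.futures
from collections import Counter

class LastKnownGoodClient:
    """Hypothetical wrapper: serve the last successful response when a call times out or fails."""

    def __init__(self, fetch, timeout_seconds=1.0, default=None):
        self.fetch = fetch                    # primary call, e.g. a third-party API
        self.timeout_seconds = timeout_seconds
        self.default = default                # used when nothing has been cached yet
        self.cache = {}                       # key -> last successful value
        self.stats = Counter()                # counts of "primary" vs "fallback" responses
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def get(self, key):
        future = self._executor.submit(self.fetch, key)
        try:
            # result() re-raises any exception raised inside fetch
            value = future.result(timeout=self.timeout_seconds)
        except (concurrent.futures.TimeoutError, ConnectionError):
            self.stats["fallback"] += 1
            return self.cache.get(key, self.default)
        self.cache[key] = value               # refresh the last-known-good copy
        self.stats["primary"] += 1
        return value

    def fallback_rate(self):
        total = sum(self.stats.values())
        return self.stats["fallback"] / total if total else 0.0

# Stand-in for a third-party API that succeeds once, then fails
calls = {"n": 0}

def flaky_fetch(key):
    calls["n"] += 1
    if calls["n"] > 1:
        raise ConnectionError("upstream unavailable")
    return {"key": key, "price": 42}

client = LastKnownGoodClient(flaky_fetch, timeout_seconds=0.5, default={})
print(client.get("SKU1"))                # {'key': 'SKU1', 'price': 42} (live)
print(client.get("SKU1"))                # {'key': 'SKU1', 'price': 42} (last known good)
print(client.get("SKU2"))                # {} (nothing cached, default)
print(round(client.fallback_rate(), 2))  # 0.67
```

How long a cached value may be served is a product decision. The `"warning"` field in the weather fallback is one way to tell callers that the data may be stale.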

Best Practices

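  • Set realistic timeouts based on service SLAs and observed p99 latency.
  • Budget cascading timeouts so that each caller's timeout exceeds its callee's timeout plus overhead; otherwise the caller gives up while the callee is still working.
  • Design meaningful fallbacks that preserve core functionality.
  • Prefer cached data as the primary fallback where slightly stale results are acceptable.
  • Monitor fallback usage rates; a rising rate signals a degraded dependency.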
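The cascading-timeout rule can be enforced by passing a deadline down the call chain instead of hard-coding each hop's timeout. In this sketch, the `Deadline` class, the `api_gateway` and `order_service` functions, and the 0.2 s and 0.3 s overhead reserves are all illustrative assumptions. Each hop subtracts its own overhead from the remaining budget before calling downstream, so every caller's timeout exceeds its callee's timeout plus overhead.

```python
import time

class Deadline:
    """Hypothetical helper: a fixed expiry time shared down a call chain."""

    def __init__(self, timeout_seconds):
        self.expires_at = time.monotonic() + timeout_seconds

    def remaining(self):
        return max(0.0, self.expires_at - time.monotonic())

    def callee_timeout(self, overhead_seconds):
        """Budget for a downstream call, reserving overhead_seconds for this hop's own work."""
        budget = self.remaining() - overhead_seconds
        if budget <= 0:
            # Fail fast: the caller could never use a result that arrives this late
            raise TimeoutError("deadline exhausted before calling downstream")
        return budget

def api_gateway(request_timeout=2.0):
    deadline = Deadline(request_timeout)
    # Gateway reserves 0.2 s for its own work; the service gets the rest
    return order_service(deadline.callee_timeout(0.2))

def order_service(timeout_seconds):
    deadline = Deadline(timeout_seconds)
    # Service reserves 0.3 s for its own logic; the database gets the rest
    db_timeout = deadline.callee_timeout(0.3)
    return {"service_timeout": timeout_seconds, "db_timeout": db_timeout}

budgets = api_gateway()
print(round(budgets["service_timeout"], 1))  # 1.8
print(round(budgets["db_timeout"], 1))       # 1.5
```

Raising as soon as the budget is exhausted avoids starting downstream work whose result the caller has already stopped waiting for.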

Combining Patterns

Real-world resilient systems combine several of these patterns, because each one covers a failure mode the others do not.