ADR-008: Real-Time Observability Architecture

Analytics, WebSocket events, and monitoring infrastructure

Status

Accepted

Context

Agent Studio requires comprehensive observability for voice call operations:

  1. Dashboard Analytics: Real-time metrics and historical data for monitoring call performance
  2. Live Call Monitoring: Ability to see active calls and receive instant updates
  3. Error Tracking: Centralized error logs from all call sessions
  4. Provider Performance: Latency tracking across STT, TTS, and LLM providers

The observability system must:

  • Provide sub-second updates for active call counts
  • Support historical analytics with configurable time ranges
  • Scale to handle thousands of concurrent WebSocket connections
  • Maintain tenant isolation for all metrics and events

Decision

We implemented a dual-layer observability architecture combining REST APIs for historical data and WebSockets for real-time events.

1. Analytics Repository Pattern

Created a dedicated AnalyticsRepository that computes aggregations directly in PostgreSQL:

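from datetime import datetime
from typing import Any
from uuid import UUID

from sqlalchemy import case, func, select
from sqlalchemy.ext.asyncio import AsyncSession

# Call and CallStatus are the project's ORM model and status enum.


class AnalyticsRepository:
    def __init__(self, session: AsyncSession, tenant_id: UUID):
        self.session = session
        self.tenant_id = tenant_id  # every query is scoped to this tenant

    async def get_summary(
        self,
        started_after: datetime | None = None,
        started_before: datetime | None = None,
    ) -> dict[str, Any]:
        # Aggregate in PostgreSQL rather than loading rows into Python
        stmt = select(
            func.count(Call.id).label("total_calls"),
            # COUNT skips NULLs, so only rows matching the CASE are counted
            func.count(case((Call.status == CallStatus.COMPLETED, 1))).label("completed_calls"),
            func.avg(Call.duration_seconds).label("avg_duration"),
            # ... more aggregations
        ).where(Call.tenant_id == self.tenant_id, ...)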

Key design decisions:

  • Database-level aggregation: Push computation to PostgreSQL for efficiency
  • JSONB extraction: Query latency metrics stored in each call's metrics JSONB field
  • Time bucketing: Use date_trunc for call volume time series
  • Tenant scoping: All queries filtered by tenant_id
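
To make the time-bucketing decision concrete, here is a minimal sketch that mimics in plain Python what date_trunc('hour', ...) does to timestamps before they are counted. It is an illustration only: the real aggregation runs in PostgreSQL, and the function names and sample timestamps below are hypothetical.

```python
from collections import Counter
from datetime import datetime, timezone


def truncate_to_hour(ts: datetime) -> datetime:
    """Mimic PostgreSQL date_trunc('hour', ts): zero out minutes and below."""
    return ts.replace(minute=0, second=0, microsecond=0)


def call_volume_by_hour(started_at: list[datetime]) -> list[tuple[datetime, int]]:
    """Count calls per hourly bucket, ordered by bucket start."""
    counts = Counter(truncate_to_hour(ts) for ts in started_at)
    return sorted(counts.items())


# Hypothetical sample data: three calls, two of them in the same hour.
samples = [
    datetime(2026, 1, 17, 10, 5, tzinfo=timezone.utc),
    datetime(2026, 1, 17, 10, 45, tzinfo=timezone.utc),
    datetime(2026, 1, 17, 11, 30, tzinfo=timezone.utc),
]
volume = call_volume_by_hour(samples)
```

Each (bucket, count) pair corresponds to one point in the call volume time series.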

2. Analytics REST Endpoints

Six endpoints in /api/v1/analytics/:

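| Endpoint          | Purpose                                                    | Refresh Rate |
|-------------------|------------------------------------------------------------|--------------|
| /summary          | Overall metrics (calls, duration, success rate, latencies) | 30s          |
| /call-volume      | Time series for charts                                     | 60s          |
| /workflow-stats   | Top workflows by volume                                    | 60s          |
| /provider-latency | Latency percentiles by provider                            | 60s          |
| /errors           | Paginated error logs                                       | 30s          |
| /active-calls     | Current active call count                                  | 10s          |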

All endpoints support:

  • Period parameter: 1h, 24h, 7d, 30d
  • Custom date ranges via started_after/started_before
  • Scope-based authorization (calls:read)

3. WebSocket Connection Manager

Implemented a centralized connection manager for real-time events:

class ConnectionManager:
    def __init__(self):
        # tenant_id -> list of connections
        self.tenant_connections: dict[UUID, list[WebSocket]] = {}
        # call_id -> list of connections
        self.call_connections: dict[UUID, list[WebSocket]] = {}

    async def broadcast_to_tenant(self, tenant_id: UUID, message: dict) -> None:
        """Send to all connections for a tenant."""
        ...  # body omitted

    async def broadcast_to_call(self, call_id: UUID, message: dict) -> None:
        """Send to connections watching a specific call."""
        ...  # body omitted

Features:

  • Tenant-scoped broadcasts: Events only reach connections for the same tenant
  • Call-specific subscriptions: Clients can subscribe to individual call events
  • Auto-reconnect support: 30-second keepalive pings let clients detect a dropped connection and reconnect
  • Graceful cleanup: Disconnected sockets removed from all pools
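
As a rough sketch of how tenant-scoped broadcasts and graceful cleanup can fit together, the example below uses a stand-in socket class so it runs without a web framework. FakeSocket, SketchConnectionManager, the connect and disconnect methods, and the send_json method name are all hypothetical. The actual ConnectionManager may differ.

```python
import asyncio
from collections import defaultdict
from uuid import UUID, uuid4


class FakeSocket:
    """Hypothetical stand-in for a WebSocket: records sends or fails if closed."""

    def __init__(self, alive: bool = True) -> None:
        self.alive = alive
        self.sent: list[dict] = []

    async def send_json(self, message: dict) -> None:
        if not self.alive:
            raise ConnectionError("socket closed")
        self.sent.append(message)


class SketchConnectionManager:
    def __init__(self) -> None:
        # tenant_id -> list of connections
        self.tenant_connections: dict[UUID, list[FakeSocket]] = defaultdict(list)

    def connect(self, tenant_id: UUID, ws: FakeSocket) -> None:
        self.tenant_connections[tenant_id].append(ws)

    def disconnect(self, ws: FakeSocket) -> None:
        # Remove the socket from every pool it appears in.
        for pool in self.tenant_connections.values():
            if ws in pool:
                pool.remove(ws)

    async def broadcast_to_tenant(self, tenant_id: UUID, message: dict) -> None:
        # Iterate over a copy so failed sockets can be removed while looping.
        for ws in list(self.tenant_connections.get(tenant_id, [])):
            try:
                await ws.send_json(message)
            except ConnectionError:
                self.disconnect(ws)


tenant_a, tenant_b = uuid4(), uuid4()
manager = SketchConnectionManager()
live, dead, other = FakeSocket(), FakeSocket(alive=False), FakeSocket()
manager.connect(tenant_a, live)
manager.connect(tenant_a, dead)
manager.connect(tenant_b, other)
asyncio.run(manager.broadcast_to_tenant(tenant_a, {"type": "call.started"}))
```

After the broadcast, only the live socket for tenant_a has received the message, the dead socket has been dropped from the pool, and the tenant_b socket is untouched.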

4. WebSocket Event Types

Standardized event format:

{
  "type": "call.started",
  "call_id": "uuid",
  "timestamp": "2026-01-17T10:30:00Z",
  "data": { ... }
}

Event types:

  • connected - Connection established
  • call.started - New call initiated
  • call.ended - Call completed successfully
  • call.failed - Call failed
  • call.agent_changed - Handoff between agents
  • call.transcript - New transcript entry
  • ping/pong - Keepalive messages

5. Dashboard Integration

React Query hooks with staggered refresh intervals:

export function useAnalyticsSummary(params = {}) {
  return useQuery({
    queryKey: ["analytics", "summary", params],
    queryFn: () => getAnalyticsSummary(params),
    refetchInterval: 30000, // 30 seconds
  });
}

export function useActiveCalls() {
  return useQuery({
    queryKey: ["analytics", "active-calls"],
    queryFn: () => getActiveCalls(),
    refetchInterval: 10000, // 10 seconds for real-time feel
  });
}

Hybrid approach for live call indicator:

  • WebSocket events for instant updates while the connection is open
  • REST API fallback (the 10-second /active-calls poll) when the WebSocket is unavailable

6. Error Log Extraction

Errors stored in each call's errors JSONB array are flattened for querying:

async def get_error_logs(self, ...) -> list[dict]:
    # Get calls with non-empty errors array
    stmt = select(Call).where(
        Call.tenant_id == self.tenant_id,
        func.jsonb_array_length(Call.errors) > 0,
    )
    result = await self.session.execute(stmt)
    calls = result.scalars().all()

    # Flatten errors from all matching calls into one list
    error_logs: list[dict] = []
    for call in calls:
        for error in call.errors:
            error_logs.append({
                "call_id": str(call.id),
                "service": error.get("type"),
                "message": error.get("message"),
                ...
            })
    return error_logs
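
Because one call can contribute several errors, a page of error logs has to be cut from the flattened list rather than from the list of calls. The sketch below illustrates this with plain dictionaries in place of ORM rows. The function names, the limit and offset parameters, and the sample data are hypothetical.

```python
from typing import Any


def flatten_errors(calls: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Turn per-call error arrays into one flat list of log entries."""
    error_logs: list[dict[str, Any]] = []
    for call in calls:
        for error in call.get("errors") or []:
            error_logs.append({
                "call_id": str(call["id"]),
                "service": error.get("type"),
                "message": error.get("message"),
            })
    return error_logs


def paginate(items: list[dict[str, Any]], limit: int, offset: int = 0) -> list[dict[str, Any]]:
    """Slice the flattened list; applied after flattening, not before."""
    return items[offset:offset + limit]


# Hypothetical sample data: call-1 has two errors, call-2 none, call-3 one.
calls = [
    {"id": "call-1", "errors": [
        {"type": "stt", "message": "timeout"},
        {"type": "tts", "message": "rate limited"},
    ]},
    {"id": "call-2", "errors": []},
    {"id": "call-3", "errors": [{"type": "llm", "message": "bad response"}]},
]
logs = flatten_errors(calls)
second_page = paginate(logs, limit=2, offset=2)
```

With a page size of two, the first page is filled entirely by call-1 and the second page holds the single error from call-3.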

Consequences

Positive

  • Real-time visibility: WebSocket events provide instant feedback on call activity
  • Efficient aggregation: Database-level queries scale well with call volume
  • Tenant isolation: All metrics properly scoped to prevent data leakage
  • Graceful degradation: REST API works when WebSocket unavailable
  • Flexible time ranges: Period parameter covers common dashboard use cases

Negative

  • In-memory connection state: Connection manager state lost on server restart
  • No persistence for events: WebSocket events are fire-and-forget
  • Limited percentile accuracy: Approximate percentiles from JSONB averages
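
A small worked example shows why percentiles computed from per-call averages are only approximate. The latency values are invented for illustration, and the nearest-rank percentile helper is a simplification, not necessarily the method used by the analytics queries.

```python
import math
from statistics import mean


def percentile(values: list[float], p: float) -> float:
    """Nearest-rank percentile: smallest value with at least p% of data at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]


# Hypothetical per-request latencies (ms) for two calls.
call_a = [100, 100, 100, 900]  # one slow outlier
call_b = [200, 200, 200, 200]

# What gets stored per call if only the average is kept.
averages = [mean(call_a), mean(call_b)]

p95_from_raw = percentile(call_a + call_b, 95)
p95_from_averages = percentile(averages, 95)
```

Here the p95 over raw samples is 900 ms, while the p95 over the stored averages is 300 ms, because averaging hides the outlier before the percentile is taken.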

Trade-offs

  • Polling vs Push: REST polling for historical data, WebSocket push for live events
  • Memory vs Redis: In-memory connection manager (simpler) vs Redis pub/sub (scalable)
  • Query cost: Analytics queries may be expensive on large datasets

Future Improvements

  1. Redis pub/sub: Scale WebSocket events across multiple API instances
  2. Materialized views: Pre-compute common aggregations for faster queries
  3. ClickHouse integration: Move analytics to columnar database for scale
  4. Event streaming: Persist WebSocket events to Kafka for replay/debugging
  5. Custom dashboards: Allow tenants to create custom metric views
