ADR-008: Real-Time Observability Architecture
Analytics, WebSocket events, and monitoring infrastructure
Status
Accepted
Context
Agent Studio requires comprehensive observability for voice call operations:
- Dashboard Analytics: Real-time metrics and historical data for monitoring call performance
- Live Call Monitoring: Ability to see active calls and receive instant updates
- Error Tracking: Centralized error logs from all call sessions
- Provider Performance: Latency tracking across STT, TTS, and LLM providers
The observability system must:
- Provide sub-second updates for active call counts
- Support historical analytics with configurable time ranges
- Scale to handle thousands of concurrent WebSocket connections
- Maintain tenant isolation for all metrics and events
Decision
We implemented a dual-layer observability architecture combining REST APIs for historical data and WebSockets for real-time events.
1. Analytics Repository Pattern
Created a dedicated AnalyticsRepository that computes aggregations directly in PostgreSQL:
```python
from datetime import datetime
from typing import Any
from uuid import UUID

from sqlalchemy import case, func, select
from sqlalchemy.ext.asyncio import AsyncSession

# Call and CallStatus are the project's ORM model and status enum.


class AnalyticsRepository:
    def __init__(self, session: AsyncSession, tenant_id: UUID):
        self.session = session
        self.tenant_id = tenant_id

    async def get_summary(
        self,
        started_after: datetime | None = None,
        started_before: datetime | None = None,
    ) -> dict[str, Any]:
        # Aggregate queries using SQLAlchemy
        stmt = select(
            func.count(Call.id).label("total_calls"),
            func.count(case((Call.status == CallStatus.COMPLETED, 1))).label("completed_calls"),
            func.avg(Call.duration_seconds).label("avg_duration"),
            # ... more aggregations
        ).where(Call.tenant_id == self.tenant_id, ...)
```
Key design decisions:
- Database-level aggregation: Push computation to PostgreSQL for efficiency
- JSONB extraction: Query latency metrics stored in the call's metrics JSONB field
- Time bucketing: Use date_trunc for call volume time series
- Tenant scoping: All queries filtered by tenant_id
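To make the time-bucketing decision concrete, here is a minimal pure-Python sketch of the result shape that grouping by an hourly `date_trunc` produces. The helper names `truncate_to_hour` and `bucket_call_volume` are hypothetical and not part of the repository; in the architecture described here the equivalent work runs inside PostgreSQL.

```python
from collections import Counter
from datetime import datetime, timezone


def truncate_to_hour(ts: datetime) -> datetime:
    """Mimic an hourly date_trunc: zero out minutes, seconds, and microseconds."""
    return ts.replace(minute=0, second=0, microsecond=0)


def bucket_call_volume(started_at: list[datetime]) -> list[tuple[datetime, int]]:
    """Count calls per hourly bucket, ordered by bucket start (hypothetical helper)."""
    counts = Counter(truncate_to_hour(ts) for ts in started_at)
    return sorted(counts.items())


if __name__ == "__main__":
    # Illustrative call start times, not real data.
    calls = [
        datetime(2026, 1, 17, 10, 5, tzinfo=timezone.utc),
        datetime(2026, 1, 17, 10, 45, tzinfo=timezone.utc),
        datetime(2026, 1, 17, 11, 2, tzinfo=timezone.utc),
    ]
    for bucket, count in bucket_call_volume(calls):
        print(bucket.isoformat(), count)
```

Each tuple corresponds to one point in the call volume time series: the bucket start and the number of calls that began in that hour.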
2. Analytics REST Endpoints
Six endpoints in /api/v1/analytics/:
| Endpoint | Purpose | Refresh Rate |
|---|---|---|
| /summary | Overall metrics (calls, duration, success rate, latencies) | 30s |
| /call-volume | Time series for charts | 60s |
| /workflow-stats | Top workflows by volume | 60s |
| /provider-latency | Latency percentiles by provider | 60s |
| /errors | Paginated error logs | 30s |
| /active-calls | Current active call count | 10s |
All endpoints support:
- Period parameter: 1h, 24h, 7d, 30d
- Custom date ranges via started_after/started_before
- Scope-based authorization (calls:read)
3. WebSocket Connection Manager
Implemented a centralized connection manager for real-time events:
```python
class ConnectionManager:
    def __init__(self):
        # tenant_id -> list of connections
        self.tenant_connections: dict[UUID, list[WebSocket]] = {}
        # call_id -> list of connections
        self.call_connections: dict[UUID, list[WebSocket]] = {}

    async def broadcast_to_tenant(self, tenant_id: UUID, message: dict) -> None:
        """Send to all connections for a tenant."""
        ...

    async def broadcast_to_call(self, call_id: UUID, message: dict) -> None:
        """Send to connections watching a specific call."""
        ...
```
Features:
- Tenant-scoped broadcasts: Events only reach connections for the same tenant
- Call-specific subscriptions: Clients can subscribe to individual call events
- Auto-reconnect support: 30-second keepalive pings
- Graceful cleanup: Disconnected sockets removed from all pools
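The tenant-scoped broadcast and cleanup behaviour listed above could look roughly like the following sketch. It is not the project's implementation: `InMemoryConnectionManager`, `JsonSocket`, `RecordingSocket`, the `connect`/`subscribe_to_call`/`disconnect` methods, and the choice to treat any send failure as a dead socket are assumptions for illustration. The only interface assumed of a socket is an async `send_json` method.

```python
import asyncio
from typing import Any, Protocol
from uuid import UUID, uuid4


class JsonSocket(Protocol):
    """Socket interface assumed by this sketch: one async send_json method."""

    async def send_json(self, data: dict[str, Any]) -> None: ...


class InMemoryConnectionManager:
    """Hypothetical in-memory manager; state is lost when the process exits."""

    def __init__(self) -> None:
        self.tenant_connections: dict[UUID, list[JsonSocket]] = {}
        self.call_connections: dict[UUID, list[JsonSocket]] = {}

    def connect(self, tenant_id: UUID, ws: JsonSocket) -> None:
        self.tenant_connections.setdefault(tenant_id, []).append(ws)

    def subscribe_to_call(self, call_id: UUID, ws: JsonSocket) -> None:
        self.call_connections.setdefault(call_id, []).append(ws)

    def disconnect(self, ws: JsonSocket) -> None:
        # Remove the socket from every tenant and call pool, dropping empty pools.
        for pools in (self.tenant_connections, self.call_connections):
            for key in list(pools):
                remaining = [conn for conn in pools[key] if conn is not ws]
                if remaining:
                    pools[key] = remaining
                else:
                    del pools[key]

    async def _send_all(self, sockets: list[JsonSocket], message: dict[str, Any]) -> None:
        for ws in list(sockets):  # copy: disconnect() may mutate the pools
            try:
                await ws.send_json(message)
            except Exception:
                # Assumption: any send failure means the socket is dead.
                self.disconnect(ws)

    async def broadcast_to_tenant(self, tenant_id: UUID, message: dict[str, Any]) -> None:
        await self._send_all(self.tenant_connections.get(tenant_id, []), message)

    async def broadcast_to_call(self, call_id: UUID, message: dict[str, Any]) -> None:
        await self._send_all(self.call_connections.get(call_id, []), message)


class RecordingSocket:
    """Test double that records messages or simulates a closed socket."""

    def __init__(self, fail: bool = False) -> None:
        self.sent: list[dict[str, Any]] = []
        self.fail = fail

    async def send_json(self, data: dict[str, Any]) -> None:
        if self.fail:
            raise ConnectionError("socket closed")
        self.sent.append(data)
```

Because the pools are keyed by tenant, a broadcast never iterates over another tenant's sockets, which is what keeps events tenant-scoped in this sketch.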
4. WebSocket Event Types
Standardized event format:
```json
{
  "type": "call.started",
  "call_id": "uuid",
  "timestamp": "2026-01-17T10:30:00Z",
  "data": { ... }
}
```
Event types:
- connected - Connection established
- call.started - New call initiated
- call.ended - Call completed successfully
- call.failed - Call failed
- call.agent_changed - Handoff between agents
- call.transcript - New transcript entry
- ping/pong - Keepalive messages
5. Dashboard Integration
React Query hooks with staggered refresh intervals:
```typescript
export function useAnalyticsSummary(params = {}) {
  return useQuery({
    queryKey: ["analytics", "summary", params],
    queryFn: () => getAnalyticsSummary(params),
    refetchInterval: 30000, // 30 seconds
  });
}

export function useActiveCalls() {
  return useQuery({
    queryKey: ["analytics", "active-calls"],
    queryFn: () => getActiveCalls(),
    refetchInterval: 10000, // 10 seconds for real-time feel
  });
}
```
Hybrid approach for live call indicator:
- WebSocket for instant updates when connected
- REST API fallback when WebSocket unavailable
6. Error Log Extraction
Errors stored in each call's errors JSONB array are flattened for querying:
```python
async def get_error_logs(self, ...) -> list[dict]:
    # Get calls with non-empty errors array
    stmt = select(Call).where(
        Call.tenant_id == self.tenant_id,
        func.jsonb_array_length(Call.errors) > 0,
    )
    result = await self.session.execute(stmt)
    calls = result.scalars().all()

    # Flatten errors from all matching calls
    error_logs = []
    for call in calls:
        for error in call.errors:
            error_logs.append({
                "call_id": str(call.id),
                "service": error.get("type"),
                "message": error.get("message"),
                ...
            })
    return error_logs
```
Consequences
Positive
- Real-time visibility: WebSocket events provide instant feedback on call activity
- Efficient aggregation: Database-level queries scale well with call volume
- Tenant isolation: All metrics properly scoped to prevent data leakage
- Graceful degradation: REST API works when WebSocket unavailable
- Flexible time ranges: Period parameter covers common dashboard use cases
Negative
- In-memory connection state: Connection manager state lost on server restart
- No persistence for events: WebSocket events are fire-and-forget
- Limited percentile accuracy: Approximate percentiles from JSONB averages
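The percentile limitation can be illustrated with a small worked example. It assumes, hypothetically, that each call stores only an average latency in its metrics field, so a percentile over calls is a percentile of averages. The sample values and the `percentile` helper (nearest-rank method) are invented for illustration and are not measurements from the system.

```python
import math


def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile of a non-empty list."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


# Hypothetical per-call latency samples in milliseconds.
samples_by_call = {
    "call-a": [100, 100, 100, 900],
    "call-b": [120, 110, 130, 120],
    "call-c": [100, 140, 120, 120],
}

# What would be available if only per-call averages are stored.
per_call_averages = [sum(s) / len(s) for s in samples_by_call.values()]
# What a percentile over raw samples would see.
all_samples = [ms for s in samples_by_call.values() for ms in s]

p95_of_averages = percentile(per_call_averages, 95)
p95_of_samples = percentile(all_samples, 95)
print(f"p95 of per-call averages: {p95_of_averages} ms")
print(f"p95 of raw samples: {p95_of_samples} ms")
```

In this example the single 900 ms outlier is diluted by averaging within its call, so the p95 of averages is well below the p95 of the raw samples. Tail latency reported from averages should therefore be read as a lower-fidelity estimate.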
Trade-offs
- Polling vs Push: REST polling for historical data, WebSocket push for live events
- Memory vs Redis: In-memory connection manager (simpler) vs Redis pub/sub (scalable)
- Query cost: Analytics queries may be expensive on large datasets
Future Improvements
- Redis pub/sub: Scale WebSocket events across multiple API instances
- Materialized views: Pre-compute common aggregations for faster queries
- ClickHouse integration: Move analytics to columnar database for scale
- Event streaming: Persist WebSocket events to Kafka for replay/debugging
- Custom dashboards: Allow tenants to create custom metric views
Related ADRs
- ADR-007: REST API Router Architecture - Router patterns used by analytics endpoints
- ADR-001: Multi-Tenant Isolation - Tenant scoping for all queries