Telemetry Audit Forwarding (Asynchronous Enrichment)
Overview
After the Core API writes an audit event to the local audit_log table (synchronous, HIPAA guarantee), it forwards the event to the Telemetry service asynchronously for enrichment and analytics. This decouples the enrichment pipeline from the critical path of mutation requests.
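The ordering described above can be sketched with hypothetical stand-ins (memStore and noopForwarder are illustrative stubs, not real types from the codebase; only the control flow mirrors the real middleware):

```go
package main

import "fmt"

// Hypothetical interfaces for illustration only.
type auditStore interface{ Insert(event string) error }
type forwarder interface{ Forward(event string) }

type memStore struct{ rows []string }

func (s *memStore) Insert(e string) error { s.rows = append(s.rows, e); return nil }

type noopForwarder struct{ sent []string }

// In the real forwarder this is non-blocking; errors never surface here.
func (f *noopForwarder) Forward(e string) { f.sent = append(f.sent, e) }

// audit is the critical-path step: the request only depends on the local insert.
func audit(store auditStore, fw forwarder, event string) error {
	if err := store.Insert(event); err != nil {
		return err // HIPAA guarantee: fail the request if the local write fails
	}
	fw.Forward(event) // best-effort enrichment; never fails the request
	return nil
}

func main() {
	s := &memStore{}
	f := &noopForwarder{}
	if err := audit(s, f, "CREATE appointment"); err != nil {
		panic(err)
	}
	fmt.Println(len(s.rows), len(f.sent)) // 1 1
}
```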
Why Telemetry?
Enrichment capabilities:
- Geo enrichment (IP → country, city, timezone)
- Actor hashing (SHA-256 patient IDs for privacy-safe analytics)
- Security threat detection (brute force, unusual access patterns)
- ClickHouse aggregation (compliance dashboards, activity heatmaps)
Why not in the platform?
- Geo enrichment requires external API calls (MaxMind GeoIP) — too slow for the Core API's request path
- ClickHouse is optimized for analytics, not transactional workloads
- Threat detection requires ML models and historical pattern analysis
- Separation of concerns: The Core API handles business logic, Telemetry handles observability
Architecture
Core API Audit Middleware
↓
Synchronous INSERT into local audit_log (~1ms)
↓
Async: Forwarder.Forward(event) → bounded channel (500 capacity)
↓
Background worker batches events (50 events or 2 seconds)
↓
HTTP POST to Telemetry /v1/audit/ingest (private network via AWS VPC)
↓
Telemetry enriches and stores in PostgreSQL + ClickHouse
Best-Effort Delivery
The forwarder is best-effort:
- If the channel is full (500 events), new events are dropped
- If Telemetry is unreachable, events are logged and discarded (no retry)
- If Telemetry returns an error, events are logged and discarded (no retry)
Why no retries?
- The audit event is already safe in the platform's audit_log table
- Telemetry can backfill from the Core API's table if needed (via a batch job)
- Retries add complexity and memory pressure (unbounded queue growth)
Implementation
Forwarder
The forwarder lives in the shared pkg/auditevt/ package so it can be used by both the Core API and other services.
// pkg/auditevt/forward.go
package auditevt
import (
"bytes"
"context"
"encoding/json"
"net/http"
"time"
"golang.org/x/exp/slog"
)
type Event struct {
Action string `json:"action"`
EntityType string `json:"entity_type"`
EntityID *int64 `json:"entity_id"`
UserID int64 `json:"user_id"`
OrganizationID int64 `json:"organization_id"`
IPAddress string `json:"ip_address"`
UserAgent string `json:"user_agent"`
RequestPath string `json:"request_path"`
StatusCode int `json:"status_code"`
Timestamp time.Time `json:"timestamp"`
}
type Forwarder struct {
queue chan Event
client *http.Client
baseURL string // e.g., the TELEMETRY_INTERNAL_URL env var (Telemetry App Runner service URL)
}
func NewForwarder(telemetryURL string) *Forwarder {
f := &Forwarder{
queue: make(chan Event, 500), // Bounded channel
client: &http.Client{Timeout: 2 * time.Second},
baseURL: telemetryURL,
}
go f.processQueue()
return f
}
// Forward enqueues an event for async delivery to Telemetry.
// Non-blocking: if queue is full, event is dropped (already in local audit_log).
func (f *Forwarder) Forward(event Event) {
select {
case f.queue <- event:
// Event enqueued successfully
default:
// Queue full — drop event (already safe in local audit_log)
slog.Warn("audit: telemetry forward queue full, event already in local audit_log",
"action", event.Action, "entity", event.EntityType)
}
}
// processQueue runs in a background goroutine, batching events and sending to Telemetry.
func (f *Forwarder) processQueue() {
ticker := time.NewTicker(2 * time.Second)
var batch []Event
for {
select {
case event := <-f.queue:
batch = append(batch, event)
if len(batch) >= 50 {
f.flush(batch)
batch = nil
}
case <-ticker.C:
if len(batch) > 0 {
f.flush(batch)
batch = nil
}
}
}
}
// flush sends a batch of events to Telemetry.
func (f *Forwarder) flush(events []Event) {
body, err := json.Marshal(map[string]any{"events": events})
if err != nil {
slog.Warn("audit: failed to marshal telemetry batch", "error", err, "count", len(events))
return
}
req, err := http.NewRequest(http.MethodPost, f.baseURL+"/v1/audit/ingest", bytes.NewReader(body))
if err != nil {
slog.Warn("audit: failed to build telemetry request", "error", err)
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := f.client.Do(req)
if err != nil {
slog.Warn("audit: telemetry forward failed (events safe in local audit_log)", "error", err, "count", len(events))
return
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
slog.Warn("audit: telemetry returned error", "status", resp.StatusCode, "count", len(events))
} else {
slog.Debug("audit: forwarded to telemetry", "count", len(events))
}
}
Initialization in the platform
// internal/core/server/server.go
func NewServer(config *config.Config) *Server {
// ...
// Create Telemetry forwarder
auditForwarder := auditevt.NewForwarder(config.TelemetryInternalURL)
return &Server{
db: db,
auditForwarder: auditForwarder,
// ...
}
}
Environment Variables
# Telemetry service URL (configured in AWS Secrets Manager, pointing to the Telemetry App Runner service URL)
TELEMETRY_INTERNAL_URL=https://<telemetry-app-runner-service-url>
Why AWS VPC private networking?
- No public internet exposure (lower latency, higher security)
- The Core API and Telemetry are separate App Runner services connected via VPC Connector
- Communication is over HTTPS within AWS
- See AWS Infrastructure for the full networking setup
Telemetry Service Endpoint
Telemetry receives batched audit events and enriches them.
// telemetry/internal/audit/handler.go
func (h *Handler) LogBatch(w http.ResponseWriter, r *http.Request) {
var req struct {
Events []auditevt.Event `json:"events"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
httputil.BadRequest(w, "invalid request body")
return
}
for _, event := range req.Events {
// 1. Geo enrichment (IP → country, city, timezone)
geo := h.geoIP.Lookup(event.IPAddress)
// 2. Actor hashing (SHA-256 user IDs for privacy-safe analytics)
actorHash := sha256.Sum256([]byte(fmt.Sprintf("%d:%d", event.UserID, event.OrganizationID)))
// 3. Threat detection (brute force, unusual access patterns)
threatLevel := h.threatDetector.Analyze(event)
// 4. Store in PostgreSQL (enriched audit events)
h.auditRepo.Create(r.Context(), &AuditEntry{
Action: event.Action,
EntityType: event.EntityType,
EntityID: event.EntityID,
UserID: event.UserID,
OrganizationID: event.OrganizationID,
IPAddress: event.IPAddress,
UserAgent: event.UserAgent,
RequestPath: event.RequestPath,
StatusCode: event.StatusCode,
Timestamp: event.Timestamp,
// Enriched fields
Country: geo.Country,
City: geo.City,
Timezone: geo.Timezone,
ActorHash: hex.EncodeToString(actorHash[:]),
ThreatLevel: threatLevel,
})
// 5. Store in ClickHouse (analytics events)
h.clickhouse.Insert(r.Context(), "analytics_events", map[string]any{
"timestamp": event.Timestamp,
"organization_id": event.OrganizationID,
"actor_hash": hex.EncodeToString(actorHash[:]),
"action": event.Action,
"entity_type": event.EntityType,
"country": geo.Country,
"threat_level": threatLevel,
})
}
httputil.NoContent(w)
}
Performance Characteristics
Batching
Why batch events?
- Reduces HTTP request overhead (50 events in one request vs 50 separate requests)
- Allows Telemetry to bulk-insert into PostgreSQL and ClickHouse
- Reduces network traffic on the private network
Batch size:
- Max 50 events per batch (tuned for Telemetry's bulk insert performance)
- Max 2 seconds between batches (ensures timely delivery)
Trade-off:
- Higher batch size → more efficient, but longer delay
- Lower batch size → faster delivery, but more HTTP overhead
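The trade-off can be sanity-checked with back-of-the-envelope arithmetic (not production code; batchMax and flushInterval mirror the 50-event/2-second settings above):

```go
package main

import "fmt"

func main() {
	const batchMax = 50.0
	const flushInterval = 2.0 // seconds, the ticker period

	for _, rate := range []float64{1, 10, 100} { // events/sec
		// Time to fill a full batch at this rate, capped by the ticker interval.
		fill := batchMax / rate
		if fill > flushInterval {
			fill = flushInterval
		}
		// Effective batch size: whatever accumulates before a flush fires.
		batch := min(rate*fill, batchMax)
		fmt.Printf("rate=%3.0f ev/s: flush every %.1fs, batch of about %.0f events\n",
			rate, fill, batch)
	}
}
```

At low rates the 2-second ticker dominates (small batches, bounded delay); at 100 events/sec and above the 50-event cap dominates, so flushes happen every ~0.5s with full batches.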
Memory Footprint
Channel capacity: 500 events
Event size: ~200 bytes per event (JSON-serialized)
Total memory: ~100 KB (negligible)
What happens if the queue fills up?
- New events are dropped (logged at WARN level)
- Expected frequency: 0 (queue should never fill in normal operation)
- If it happens: Telemetry is down or slow, or the Core API is under extreme load (>500 mutations/sec)
Failure Modes
| Failure | Behavior | Impact |
|---|---|---|
| Telemetry is down | Events are logged at WARN level and dropped | No enrichment, but events are safe in local audit_log |
| Telemetry is slow (>2s timeout) | HTTP client times out, events are dropped | Same as above |
| Telemetry returns 4xx/5xx | Events are logged at WARN level and dropped | Same as above |
| Queue is full | Events are logged at WARN level and dropped | Same as above |
| The Core API crashes | In-flight events in the channel are lost | Only events still in the channel (max 500) are lost; the database copy is intact |
In all cases: Events are already safe in the platform's audit_log table (synchronous write). Telemetry forwarding is best-effort enrichment.
Backfill from the Core API
If Telemetry misses events (due to downtime, queue overflow, or network issues), it can backfill from the Core API's audit_log table.
// telemetry/internal/jobs/audit_backfill.go
func (j *AuditBackfillJob) Run(ctx context.Context) error {
// 1. Query the Core API's audit_log table for events after the last processed timestamp
lastProcessed := j.getLastProcessedTimestamp(ctx)
// 2. Fetch events from the Core API via direct database query (same VPC)
rows, err := j.deskDB.Query(ctx,
`SELECT * FROM audit_log WHERE created_at > $1 ORDER BY created_at LIMIT 1000`,
lastProcessed,
)
if err != nil {
return err
}
defer rows.Close()
// 3. Enrich and store in Telemetry (same logic as the LogBatch handler)
for rows.Next() {
var event auditevt.Event
if err := rows.Scan(&event.Action, &event.EntityType /* ... */); err != nil {
return err
}
// ... enrich and store ...
}
// 4. Update last processed timestamp
j.setLastProcessedTimestamp(ctx, time.Now())
return nil
}
When to run backfill:
- After Telemetry downtime (manual trigger)
- Periodic check (daily cron job) to catch any missed events
- On-demand via admin API endpoint
Monitoring & Alerting
Metrics to track:
- Queue depth (should be near 0 in steady state)
- Events forwarded per second
- HTTP errors from Telemetry
- Events dropped due to full queue
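Queue depth is cheap to expose, since len() on a buffered channel is safe for concurrent use. A possible accessor (QueueDepth is a suggested addition, not existing code; the struct here is simplified to just the channel):

```go
package main

import "fmt"

// Forwarder mirrors the forwarder in pkg/auditevt, reduced to the queue.
type Forwarder struct {
	queue chan struct{}
}

// QueueDepth reports how many events are currently buffered.
// len() on a channel is safe to call from any goroutine.
func (f *Forwarder) QueueDepth() int { return len(f.queue) }

func main() {
	f := &Forwarder{queue: make(chan struct{}, 500)}
	for i := 0; i < 42; i++ {
		f.queue <- struct{}{}
	}
	// A metrics loop could report this as the audit.forwarder.queue_depth gauge.
	fmt.Println(f.QueueDepth()) // 42
}
```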
Datadog monitors:
Alert: Telemetry forwarding errors
Query: logs("audit: telemetry forward failed").rollup(count).last(5m) > 10
Severity: P2 (warning — events safe in local audit_log, but enrichment failing)
Alert: Queue depth high
Query: avg:audit.forwarder.queue_depth{env:production} > 100
Severity: P2 (warning — backlog building up)
Alert: Events dropped
Query: logs("audit: telemetry forward queue full").rollup(count).last(5m) > 0
Severity: P1 (critical — queue overflow, need to investigate)
Testing
Unit Tests
// pkg/auditevt/forward_test.go
func TestForwarderBatching(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var req struct {
Events []Event `json:"events"`
}
json.NewDecoder(r.Body).Decode(&req)
assert.Equal(t, 50, len(req.Events))
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
forwarder := NewForwarder(server.URL)
// Send 50 events
for i := 0; i < 50; i++ {
forwarder.Forward(Event{Action: "CREATE", EntityType: "appointment"})
}
time.Sleep(100 * time.Millisecond) // Wait for batch to flush
// Assert server received exactly one batch of 50 events
}
func TestForwarderQueueOverflow(t *testing.T) {
// Slow server that never responds
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(10 * time.Second)
}))
defer server.Close()
forwarder := NewForwarder(server.URL)
// Send 600 events (exceeds channel capacity of 500)
for i := 0; i < 600; i++ {
forwarder.Forward(Event{Action: "CREATE", EntityType: "appointment"})
}
// Assert: first 500 events are queued, next 100 are dropped (logged at WARN level)
}
Integration Tests
# Run integration test with a real Telemetry instance
go test ./pkg/auditevt -v -run TestForwarderIntegration -telemetry-url=http://localhost:4000
# Expected: events are forwarded and appear in Telemetry's audit.audit_logs table
Security Considerations
No Authentication on Internal Network
The Telemetry endpoint /v1/audit/ingest has no authentication because:
- It's only accessible within the AWS VPC (not exposed to the public internet)
- the Core API and Telemetry communicate via VPC Connector through private subnets
- See AWS Infrastructure for the full networking setup
If Telemetry is ever exposed publicly:
- Add API key authentication (e.g., Authorization: Bearer <shared_secret>)
- Add request signing (HMAC-SHA256) to prevent replay attacks
Event Tampering
Risk: A compromised Core API instance could forge audit events.
Mitigation:
- The Core API's audit_log table is the source of truth (write-only, no UPDATE/DELETE)
- Telemetry enriches but does not replace the Core API's audit log
- Backfill jobs use the Core API's table as the authoritative source
- Any discrepancies trigger alerts (Telemetry event count vs Core API event count)
Privacy
Actor hashing: Telemetry hashes user IDs (SHA-256) before storing in ClickHouse for privacy-safe analytics. The hash includes the organization ID, so the same user in different orgs has different hashes.
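The org-scoped hashing can be verified in a few lines (actorHash mirrors the fmt.Sprintf("%d:%d", userID, orgID) input used in the LogBatch handler):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// actorHash hashes the user ID together with the organization ID, so the
// same user ID produces different hashes in different organizations.
func actorHash(userID, orgID int64) string {
	sum := sha256.Sum256([]byte(fmt.Sprintf("%d:%d", userID, orgID)))
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(actorHash(7, 1) == actorHash(7, 2)) // false: same user, different orgs
	fmt.Println(actorHash(7, 1) == actorHash(7, 1)) // true: deterministic
}
```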
Geo enrichment: IP addresses are resolved to country/city but not stored in ClickHouse (only stored in Telemetry PostgreSQL for 7 years per HIPAA).
Compliance Notes
HIPAA:
- Telemetry enrichment is NOT required for HIPAA compliance (the Core API's local audit_log is sufficient)
- Telemetry provides enhanced security monitoring and compliance reporting
- 7-year retention in Telemetry PostgreSQL (exceeds HIPAA's 6-year requirement)
GDPR:
- Actor hashing ensures privacy-safe analytics (no PII in ClickHouse)
- Geo enrichment uses IP address but does not store it in analytics (only in audit.audit_logs)
- Right to erasure: Telemetry deletes all events for a user when the Core API triggers GDPR erasure