A blog is not just a document. It is a surface on which attention plays out in time.
When a reader arrives, scrolls, pauses, copies a snippet, follows a link - each of these acts carries information. Not just about what was read, but how it was read: how long the eye rested on a diagram, which code block was copied and from which article, whether the article was abandoned at the first section or read to completion.
This post describes how to capture that signal as a typed, schema-validated event stream - and route it through a pipeline that preserves it for later analysis.
The Event Schema
The foundation is a discriminated union. Every event shares a common base, and the type field determines its shape:
interface TelemetryEventBase {
  timestamp: string // ISO 8601
  sessionId: string // uuid, persisted in sessionStorage
  articleSlug?: string
  referrer?: string
  semanticTopicTags?: string[]
}
type TelemetryEvent =
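  | PageViewEvent
  | ArticleCompleteEvent
  | ScrollDepthEvent
  | SnippetCopyEvent
  | SearchQueryEvent
  // ...

This shape is validated at the API boundary using Zod's discriminatedUnion, which reads the type field to pick the one schema to check. Error messages therefore point at the failing field of that event type, and valid events are checked against a single schema rather than against every member of the union.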
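To make the role of the type field concrete, here is a minimal hand-written sketch of the same idea, with no library involved. It is not the Zod schema used by the blog: the event names (`page_view`, `scroll_depth`) and fields (`path`, `depthPercent`) are assumptions chosen for illustration, and the `Sketch*` names are hypothetical.

```typescript
// Illustrative only: the event names and fields below are assumptions,
// not the schema from @bigpines/telemetry.
interface SketchBase {
  timestamp: string
  sessionId: string
}
interface SketchPageView extends SketchBase {
  type: 'page_view'
  path: string
}
interface SketchScrollDepth extends SketchBase {
  type: 'scroll_depth'
  depthPercent: number
}
type SketchEvent = SketchPageView | SketchScrollDepth

// Validate untrusted input by dispatching on the `type` discriminator.
function parseSketchEvent(input: unknown): SketchEvent {
  if (typeof input !== 'object' || input === null) {
    throw new Error('event must be an object')
  }
  const raw = input as Record<string, unknown>
  const timestamp = raw['timestamp']
  const sessionId = raw['sessionId']
  if (typeof timestamp !== 'string' || typeof sessionId !== 'string') {
    throw new Error('timestamp and sessionId must be strings')
  }
  switch (raw['type']) {
    case 'page_view': {
      const path = raw['path']
      if (typeof path !== 'string') throw new Error('page_view.path must be a string')
      return { type: 'page_view', timestamp, sessionId, path }
    }
    case 'scroll_depth': {
      const depthPercent = raw['depthPercent']
      if (typeof depthPercent !== 'number') {
        throw new Error('scroll_depth.depthPercent must be a number')
      }
      return { type: 'scroll_depth', timestamp, sessionId, depthPercent }
    }
    default:
      throw new Error(`unknown event type: ${String(raw['type'])}`)
  }
}

// Once parsed, the compiler narrows each branch to the matching shape.
function summarize(event: SketchEvent): string {
  switch (event.type) {
    case 'page_view':
      return `viewed ${event.path}`
    case 'scroll_depth':
      return `scrolled to ${event.depthPercent}%`
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a variant is missed.
      const unreachable: never = event
      throw new Error(`unhandled event: ${JSON.stringify(unreachable)}`)
    }
  }
}
```

A schema library replaces the hand-written checks in `parseSketchEvent` with declarations, but the dispatch on `type` and the narrowing in `summarize` work the same way.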
The Pipeline Architecture
Events flow from the browser through a batch queue to a Kafka topic, then fan out to ClickHouse (columnar analytics) and OpenSearch (semantic indexing):
Browser
  → BatchQueue (client)
  → POST /api/telemetry
  → KafkaTransport
      → ClickHouse (time-series queries)
      → OpenSearch (semantic retrieval)

The pipeline implementation wires these together:
import {
  BatchQueue,
  RetryQueue,
  KafkaTransport,
  ClickHouseTransport,
  OpenSearchTransport,
  type Transport,
  type TelemetryEvent,
} from '@bigpines/telemetry'
// Fan-out transport: sends to multiple sinks in parallel
class MultiTransport implements Transport {
  readonly name = 'multi'
  constructor(private readonly transports: Transport[]) {}

  async send(events: TelemetryEvent[]): Promise<void> {
    // allSettled never rejects, so one failing sink does not block the others.
    const results = await Promise.allSettled(
      this.transports.map((t) => t.send(events))
    )
    const failures = results.filter((r) => r.status === 'rejected')
    if (failures.length > 0) {
      // Failures are only logged, not rethrown: a wrapper that retries on
      // rejection (such as RetryQueue) will not see them.
      console.warn(`[MultiTransport] ${failures.length}/${results.length} transports failed`)
    }
  }
}
function buildPipeline(): BatchQueue {
  const kafka = new KafkaTransport({
    brokers: (process.env['KAFKA_BROKERS'] ?? 'localhost:9092').split(','),
    topic: process.env['KAFKA_TOPIC'] ?? 'blog-events',
    clientId: 'blog-pipeline',
  })
  const clickhouse = new ClickHouseTransport({
    url: process.env['CLICKHOUSE_URL'] ?? 'http://localhost:8123',
    database: process.env['CLICKHOUSE_DATABASE'] ?? 'blog',
  })
  const opensearch = new OpenSearchTransport({
    url: process.env['OPENSEARCH_URL'] ?? 'http://localhost:9200',
    index: process.env['OPENSEARCH_INDEX'] ?? 'blog-events',
  })
  const multi = new MultiTransport([kafka, clickhouse, opensearch])
  const resilient = new RetryQueue(multi, { maxRetries: 3, baseDelayMs: 1000 })
  return new BatchQueue(resilient, {
    batchSize: 50,
    flushInterval: 10_000,
  })
}
// Singleton pipeline for server-side use
let pipeline: BatchQueue | null = null
export function getPipeline(): BatchQueue {
  if (!pipeline) {
    pipeline = buildPipeline()
    pipeline.start()
  }
  return pipeline
}
export async function shutdownPipeline(): Promise<void> {
  if (pipeline) {
    await pipeline.stop()
    pipeline = null
  }
}
Batching and Reliability
Events are never sent individually. The BatchQueue accumulates events in memory and flushes on a timer or when it reaches a size threshold. The RetryQueue wraps any transport with exponential backoff:
const transport = new RetryQueue(new KafkaTransport({ brokers, topic, clientId }), {
  maxRetries: 3,
  baseDelayMs: 1000,
})
const queue = new BatchQueue(transport, {
  batchSize: 50,
  flushInterval: 10_000,
})

The keepalive: true flag on the HTTP transport lets the final POST outlive the page, so focus_loss events - fired when the page unloads - are not dropped along with it. Browsers cap keepalive request bodies at 64 KiB, so that last batch has to stay small.
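The batching and retry behavior described in this section can be sketched in a few lines without the library. This is not the `@bigpines/telemetry` implementation: `backoffDelayMs`, `withRetry`, and `MiniBatchQueue` are hypothetical names, and the sketch assumes the delay doubles on each attempt, which the post does not spell out.

```typescript
// Hypothetical sketch; not the @bigpines/telemetry implementation.

// Delay before retry number `attempt` (0-based), doubling each time (assumed policy).
function backoffDelayMs(attempt: number, baseDelayMs: number): number {
  return baseDelayMs * 2 ** attempt
}

type SendFn<T> = (batch: T[]) => Promise<void>

// Wrap a send function so a rejected send is retried after a growing delay.
// `sleep` is injectable so callers can avoid real timers.
function withRetry<T>(
  send: SendFn<T>,
  opts: { maxRetries: number; baseDelayMs: number },
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => {
      setTimeout(() => resolve(), ms)
    }),
): SendFn<T> {
  return async (batch) => {
    for (let attempt = 0; ; attempt++) {
      try {
        await send(batch)
        return
      } catch (err) {
        // Give up once the retry budget is spent and surface the last error.
        if (attempt >= opts.maxRetries) throw err
        await sleep(backoffDelayMs(attempt, opts.baseDelayMs))
      }
    }
  }
}

// Buffer items in memory and hand them off in batches.
class MiniBatchQueue<T> {
  private buffer: T[] = []
  private readonly onFlush: (batch: T[]) => void
  private readonly batchSize: number

  constructor(onFlush: (batch: T[]) => void, batchSize: number) {
    this.onFlush = onFlush
    this.batchSize = batchSize
  }

  push(item: T): void {
    this.buffer.push(item)
    // Size threshold: flush as soon as the batch is full.
    if (this.buffer.length >= this.batchSize) this.flush()
  }

  // A real queue would also call this from a timer (the flushInterval option).
  flush(): void {
    if (this.buffer.length === 0) return
    const batch = this.buffer
    this.buffer = []
    this.onFlush(batch)
  }
}
```

Under the doubling assumption, `maxRetries: 3` with `baseDelayMs: 1000` waits 1 s, 2 s, and 4 s before giving up, so a failing batch is held for about seven seconds in the worst case.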
The Kafka Producer
The Kafka producer wraps KafkaJS with a simple batch-send interface:
import { Kafka, type Producer } from 'kafkajs'
import type { TelemetryEvent } from '@bigpines/telemetry'

const kafka = new Kafka({
  clientId: 'blog-telemetry-producer',
  brokers: (process.env['KAFKA_BROKERS'] ?? 'localhost:9092').split(','),
})

// Cache the connection promise rather than the producer, so concurrent
// callers all wait for the same connect() before sending.
let producerReady: Promise<Producer> | null = null

function getProducer(): Promise<Producer> {
  if (!producerReady) {
    const producer = kafka.producer({
      allowAutoTopicCreation: false,
      transactionTimeout: 30_000,
    })
    producerReady = producer.connect().then(() => producer)
    // Clear the cache on a failed connect so the next call can try again.
    producerReady.catch(() => {
      producerReady = null
    })
  }
  return producerReady
}

// Shared message shape, so single and batch sends carry the same headers.
function toMessage(event: TelemetryEvent) {
  return {
    key: event.sessionId,
    value: JSON.stringify(event),
    headers: {
      type: event.type,
      version: '1',
      timestamp: event.timestamp,
    },
  }
}

export async function publishEvent(event: TelemetryEvent): Promise<void> {
  await publishBatch([event])
}

export async function publishBatch(events: TelemetryEvent[]): Promise<void> {
  if (events.length === 0) return
  const p = await getProducer()
  await p.send({
    topic: process.env['KAFKA_TOPIC'] ?? 'blog-events',
    messages: events.map((event) => toMessage(event)),
  })
}

export async function disconnectProducer(): Promise<void> {
  if (producerReady) {
    const p = await producerReady
    producerReady = null
    await p.disconnect()
  }
}
What Comes Next
This pipeline is the instrumentation layer. The next article covers what happens on the receiving end: consuming these events from Kafka, materializing them into ClickHouse, and making them retrievable via OpenSearch's k-NN index.
💡 Running locally
Set TELEMETRY_TRANSPORT=console to log events to the terminal without needing Kafka or ClickHouse running. The blog works fully offline with this setting.
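How the blog reads this setting is not shown here. One plausible arrangement, sketched below, is a transport that prints events and a small selector that checks the variable. `SketchTransport`, `ConsoleTransport`, and `selectTransport` are hypothetical names, and the interface only mirrors the `name` and `send` members that MultiTransport uses above.

```typescript
// Hypothetical sketch of a console fallback; names are assumptions.
interface SketchTransport {
  readonly name: string
  send(events: unknown[]): Promise<void>
}

// Prints each event as one JSON line instead of sending it anywhere.
class ConsoleTransport implements SketchTransport {
  readonly name = 'console'

  async send(events: unknown[]): Promise<void> {
    for (const event of events) {
      console.log('[telemetry]', JSON.stringify(event))
    }
  }
}

// Pick the console transport when TELEMETRY_TRANSPORT=console, otherwise
// build the remote one lazily so no broker connection is attempted offline.
function selectTransport(
  env: Record<string, string | undefined>,
  makeRemote: () => SketchTransport,
): SketchTransport {
  return env['TELEMETRY_TRANSPORT'] === 'console' ? new ConsoleTransport() : makeRemote()
}
```

On the server this would be called as `selectTransport(process.env, () => remoteTransport)`, where `remoteTransport` stands for whatever the Kafka-backed pipeline provides. Taking the environment as a parameter keeps the selector easy to exercise without touching real configuration.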