Потоковая обработка
Kafka Streams
LinkedIn создал Kafka в 2011 году. Задача: обработать activity feed 300 миллионов пользователей без задержки. Для stream processing сначала использовали отдельные кластеры - Storm, затем Spark Streaming. Это лишние компоненты: отдельный деплой, отдельный мониторинг, отдельные зависимости. В 2016 году Confluent выпустил Kafka Streams - библиотеку, которая добавляет stream processing прямо в Java-приложение. Никакого отдельного кластера. Один `pom.xml`, один JAR, одно приложение. Uber обрабатывает через Kafka 1 триллион сообщений в день со средней латентностью 2 мс. Kafka Streams micro-batch: 100 мс окна. Это не хард real-time, но достаточно для surge pricing.
- LinkedIn: activity feed ranking через KStream + KTable join в реальном времени - новые посты джойнятся с профилем пользователя для персонализации
- Uber: surge pricing через windowed aggregation событий поездок - количество запросов за 5-минутное окно делится на количество доступных водителей
- Netflix: рекомендации через Kafka Streams feature pipeline - embedding пользователя обновляется каждые 5 мин на основе просмотров
- Fraud detection: KStream транзакций + KTable профиля пользователя - join без внешних запросов в базу, ML-скоринг в одном топологии
KTable: Changelog как State Store
2016 год. Confluent выпускает Kafka Streams. Первый вопрос разработчиков: как хранить состояние? Ответ неожиданный: **KTable** - это не таблица в привычном смысле. Это changelog stream, материализованный как in-memory state store. Каждый UPDATE - это новое сообщение в Kafka topic с тем же ключом. Сообщение с null value - это DELETE (tombstone). State store - это просто последнее значение каждого ключа в этом changelog. Log compaction в Kafka оставляет только последнее сообщение для каждого ключа, освобождая место.
**ML Feature Serving через KTable:** вместо запросов в Redis или PostgreSQL при каждом ML inference - хранить последний embedding пользователя прямо в Kafka Streams state store. Kafka Streams обновляет KTable при каждом новом событии. Interactive Queries позволяют читать state store из HTTP endpoint без дополнительной БД.
**RocksDB vs in-memory:** по умолчанию Kafka Streams использует RocksDB как state store - данные персистируются на диск и не теряются при перезапуске. Для тестов и небольших данных можно переключиться на in-memory: `Materialized.as(...).withLoggingDisabled()`.
KTable 'user-subscriptions' получает сообщение с ключом 'user-42' и value=null. Что произойдёт?
KStream: Unbounded Record Sequence
**KStream** - это unbounded последовательность key-value записей. Каждое сообщение обрабатывается независимо: нет понятия 'последнее значение', каждый record - отдельное событие. Kafka Streams DSL предоставляет map, filter, flatMap, peek, groupBy операции. Обработка at-least-once по умолчанию - при перезапуске после сбоя некоторые сообщения обработаются повторно. Exactly-once гарантии требуют включения Kafka транзакций: `processing.guarantee=exactly_once_v2`.
**Exactly-once через Kafka транзакции:** при `processing.guarantee=exactly_once_v2` Kafka Streams оборачивает чтение + обработку + запись + commit offset в одну атомарную транзакцию. Если приложение падает посередине - транзакция откатывается. Downstream consumer-ы видят либо полный результат, либо ничего. Цена: ~20% latency overhead.
KStream pipeline обрабатывает события покупок. Одно событие: userId='u1', amount=500. После `.mapValues(e -> toFeatureVector(e))` что окажется в выходном топике?
Joins: KStream-KTable и Co-partitioning
Join в Kafka Streams - это не SQL JOIN. **KStream-KTable join** работает как lookup: для каждой записи в KStream ищется соответствующее значение в KTable по ключу. Non-windowed - без временных границ. **KStream-KStream join** - windowed: оба потока буферизуются в окне времени. **Требование co-partitioning**: оба топика обязаны иметь одинаковое количество партиций и использовать один и тот же partitioner. Иначе Kafka Streams выбросит TopologyException при старте.
**Co-partitioning нарушение:** если топики имеют разное число партиций - `streams.start()` упадёт с `TopologyException: Topics не co-partitioned`. Решение: при создании топиков фиксировать одинаковое число партиций. Или использовать repartition перед join: `stream.repartition(Repartitioned.numberOfPartitions(4))`.
KStream 'orders' (6 партиций) делает join с KTable 'products' (4 партиции). Что произойдёт при запуске приложения?
Windowing: Tumbling, Hopping, Session
Windowing группирует записи KStream по времени для агрегации. **Tumbling window** - неперекрывающиеся окна фиксированного размера: [0-5мин], [5-10мин]. Каждая запись входит ровно в одно окно. **Hopping window** - перекрывающиеся окна: размер 10 мин, шаг 5 мин. Одна запись в нескольких окнах. **Session window** - переменная длина: окно открывается при первом событии, закрывается после inactivity gap. Для аномалий поведения - сколько кликов за одну сессию. **Grace period** - сколько ждать опоздавших записей после закрытия окна.
**Event time vs Processing time:** по умолчанию Kafka Streams использует processing time (когда запись обработана). Для корректной агрегации по времени события нужен TimestampExtractor: извлекать timestamp из payload. При event time возможны late arrivals - grace period определяет сколько ждать.
Окна буферизуют все сырые записи - большой window = большой memory footprint
Windows хранят только агрегаты (count, sum) per window per key, не сырые записи
При groupByKey().windowedBy().count() Kafka Streams хранит в state store только итоговый счётчик для каждой пары (key, window). Сырые события не сохраняются в state. Memory footprint = O(unique_keys * windows_in_flight), не O(total_records).
Hopping window размером 10 минут с шагом 5 минут. В 07:30 приходит событие. В скольких окнах оно окажется?
Ключевые идеи
- **KTable** - это changelog stream, материализованный как state store. Каждый UPDATE - это новое сообщение в topic. Tombstone (null value) = DELETE
- **KStream** - unbounded последовательность key-value записей. Каждое сообщение обрабатывается независимо, поддерживает map/filter/flatMap
- **Joins** требуют co-partitioning: оба topica должны иметь одинаковое количество партиций и один partition key. KStream-KTable join - non-windowed lookup
- **Windowing** группирует записи по времени: tumbling (non-overlapping), hopping (overlapping), session (activity-based). Grace period обрабатывает late arrivals
Связанные темы
Kafka Streams пересекается с несколькими областями:
- Apache Kafka: Архитектура — Фундамент - Kafka Streams работает поверх Kafka topics, partitions и consumer groups
- Distributed Replication — KTable state store использует те же принципы репликации через changelog topics
- ML Model Evaluation — Kafka Streams - основной инструмент real-time ML feature pipeline для online serving
- API Integration Patterns — Kafka Streams заменяет синхронные API-вызовы на stream processing для低задержки обработки
Вопросы для размышления
- KTable хранит только последнее значение по ключу. Как реализовать историю изменений - например, все предыдущие адреса доставки пользователя - используя Kafka Streams?
- Co-partitioning требует одинакового числа партиций для join. Что произойдёт, если топик user-profiles имеет 4 партиции, а топик user-events - 8? Как решить эту проблему без остановки системы?
- Session window имеет переменную длину, зависящую от активности пользователя. Каким образом это усложняет агрегацию для ML-фичей типа 'среднее время сессии за последние 30 дней'?