Транспорт бэкенда

Kafka Internals: ISR, компактификация, Streams

LinkedIn запускает 4 000-брокерный кластер Kafka, обрабатывающий 7 триллионов сообщений в день. Брокер падает каждую неделю, схемы данных меняются ежемесячно, а consumer-приложения работают без перерыва. Как это держится?

  • **Uber** использует Kafka Streams для расчёта ETA в реальном времени: миллионы поездок, локальное состояние водителей в RocksDB, exactly-once для биллинга
  • **Netflix** через Debezium тащит CDC из 1000+ MySQL-инстансов в Kafka, оттуда - в data lake и Elastic для аналитики
  • **Confluent Schema Registry** в проде Robinhood обслуживает 12 000 продюсеров и 3 000 консьюмеров; одна несовместимая схема могла бы положить торги

Репликация и ISR

У LinkedIn кластер Kafka из 4 000 брокеров обрабатывает 7 триллионов сообщений в день. Один брокер падает каждую неделю - и трафик идёт дальше без потерь. Секрет в **репликации**: каждая партиция хранится на нескольких брокерах, один из них - **leader**, остальные - **followers**, которые подтягивают данные.

Но не все followers полезны: тот, кто отстал на минуту, не годится в кандидаты на лидерство. Поэтому Kafka вводит понятие **ISR (In-Sync Replicas)** - набор followers, которые синхронизированы с leader не более чем на `replica.lag.time.max.ms` (по умолчанию 30 секунд). Запись считается **committed** только когда её получили все ISR-реплики.

**unclean.leader.election.enable=false** - запрещает выбирать лидером реплику не из ISR. Это даёт consistency: данные не теряются, но если все ISR упали - партиция недоступна (CP в CAP). При `true` доступность сохраняется, но возможна потеря записей.

Топик имеет replication.factor=3 и min.insync.replicas=2. Один follower выпадает из ISR из-за сетевой задержки. Что произойдёт с продюсером, использующим acks=all?

Log Compaction

Стандартный retention в Kafka - удаление по времени или размеру (например, 7 дней). Но что если топик хранит **состояние**, а не события? Например, профили пользователей: `user-42 -> {name: Ann, city: SPB}`. Через год городов будет много, но интересен только последний. Удалять по времени нельзя - потеряем актуальное состояние.

**Log compaction** - стратегия retention, при которой Kafka хранит **минимум одну запись на ключ**. Старые значения для того же ключа удаляются фоновым процессом cleaner. Если послать запись с `value=null`, ключ помечается как удалённый (**tombstone**) и через delete.retention.ms физически удаляется.

Compacted-топик с большим числом уникальных ключей раздувается так же, как обычный. Compaction решает проблему **частых апдейтов одного ключа**, а не общего объёма.

Топик `__consumer_offsets` в Kafka использует cleanup.policy=compact. Почему именно этот тип retention выбран для него?

Kafka Streams

Допустим, нужно посчитать количество заказов на пользователя за последний час. Можно поднять Flink или Spark Streaming - но это отдельный кластер с zookeeper, ресурсами, командой DevOps. **Kafka Streams** - это библиотека (JAR), которая встраивается в обычное JVM-приложение и читает/пишет Kafka, держа состояние локально.

Streams API оперирует двумя абстракциями: **KStream** (поток событий, append-only) и **KTable** (изменяемая таблица состояния, как compacted-топик). Локальное состояние хранится в RocksDB на диске и реплицируется в Kafka через changelog-топик - так состояние переживёт перезапуск инстанса.

**Exactly-once semantics** в Streams включается через `processing.guarantee=exactly_once_v2` и работает поверх Kafka transactions: чтение, обновление state и запись в output - всё или ничего. Это сложно в Flink и почти невозможно вручную.

Инстанс приложения Kafka Streams упал. На другой машине запускается новый инстанс той же группы. Что произойдёт с локальным состоянием (RocksDB)?

Kafka Connect

В компании 20 источников данных: Postgres, MySQL, MongoDB, S3, REST API. Каждый нужно лить в Kafka, а оттуда - в ClickHouse и Elastic. Писать 40 продюсеров и консьюмеров на Java? Это путь к зоопарку. **Kafka Connect** - фреймворк для интеграций без кода: декларативно описывается коннектор (source или sink), а Connect worker занимается параллелизмом, retries и offsets.

**Source connector** читает из внешней системы и пишет в Kafka (Debezium для CDC из Postgres, JDBC для опроса таблиц). **Sink connector** делает обратное - выгружает топик в ClickHouse, Elastic, S3. Каждый коннектор делится на **tasks**, которые распределяются между worker-нодами для параллелизма.

**Debezium CDC** читает WAL Postgres и публикует каждое изменение строки как событие в Kafka. Это даёт near-real-time реплику БД в топике - основа event-driven архитектуры и outbox pattern.

Чем Debezium-коннектор (CDC через WAL) принципиально лучше JDBC source connector (опрос таблиц по timestamp)?

Schema Registry

Продюсер пишет JSON `{userId: 42, name: 'Ann'}`. Через полгода кто-то меняет поле на `user_id` (snake_case). Все консьюмеры падают, потому что про новое поле они не знают. Без контракта между продюсером и консьюмерами эволюция данных превращается в ад. **Schema Registry** хранит схемы (Avro, Protobuf, JSON Schema), а клиенты при записи/чтении проверяют совместимость.

Сообщение в Kafka сериализуется как `[magic byte][schema id (4 bytes)][payload]`. Consumer достаёт schema id, идёт в Registry за схемой и десериализует. Schema Registry ведёт версии и проверяет правила совместимости при регистрации новой версии: **BACKWARD** (новые консьюмеры читают старые данные), **FORWARD** (старые консьюмеры читают новые данные), **FULL** (оба направления).

JSON Schema позволяет нестрогую типизацию (свободные поля), но в продакшене это часто стреляет в ногу. **Avro** или **Protobuf** дают строгий контракт, бинарную сериализацию и встроенную проверку совместимости.

Schema Registry - просто словарь схем; можно жить и без него, договорившись командно.

Schema Registry централизованно валидирует совместимость и блокирует deploy несовместимых изменений; устные договорённости рассыпаются на 5+ командах.

Контракт на данные - это не документация, а runtime-проверка. Без Registry один продюсер ломает всех консьюмеров одной выкаткой.

В compacted-топике с Avro-схемой нужно добавить новое обязательное поле без default. Registry настроен на BACKWARD compatibility. Что произойдёт при попытке зарегистрировать v2?

Ключевые идеи

  • **ISR + acks=all** дают консистентность: запись считается committed только когда её подтвердили все синхронные реплики
  • **Log compaction** хранит последнее значение на ключ - идеально для compacted-топиков состояния (offsets, профили, KTable changelogs)
  • **Kafka Streams** - библиотека, а не кластер: state в RocksDB + changelog-топик дают надёжное локальное состояние с exactly-once семантикой
  • **Kafka Connect** - декларативные интеграции без кода: Debezium для CDC, JDBC/S3/Elastic коннекторы покрывают 90% потребностей
  • **Schema Registry** превращает schema-эволюцию из ада в формальный контракт с правилами совместимости BACKWARD/FORWARD/FULL

Связанные темы

Возвращаясь к LinkedIn-сценарию: репликация спасает от падения брокеров, schema registry - от поломок интеграций, Streams - от отдельного потокового кластера. Эта инфраструктура - фундамент для:

  • Apache Kafka: основы — Топики, партиции, продюсеры, консьюмеры - база, на которой строится ISR, compaction и всё остальное
  • Распределённый консенсус — ISR + leader election - частный случай consensus; Kafka раньше использовал ZooKeeper, теперь KRaft (свой Raft)

Вопросы для размышления

  • Если min.insync.replicas=1 и replication.factor=3, чем отличается этот конфиг от обычного фуллстек-кластера без Kafka? Где в нём остаётся ценность Kafka?
  • Compacted-топик с миллиардом уникальных ключей и редкими апдейтами раздувается так же, как обычный. В каком сценарии compaction вообще не имеет смысла?
  • Schema Registry централизует контракт, но становится single point of failure. Как организовать его HA, и нужно ли всегда жертвовать децентрализацией ради консистентности?

Связанные уроки

  • bt-13-kafka — Базовый Kafka обязателен перед internals
  • bt-17-event-driven — Kafka Streams - event processing в реальном времени
  • bt-19-outbox — Outbox pattern использует Kafka как транзакционный log
  • bt-15-nats — NATS JetStream - альтернативный persistent log
  • ds-05-replication — ISR replication - конкретная реализация distributed replication
  • prob-08-variance — Log compaction - минимизация дисперсии хранения
  • db-04-cap
Kafka Internals: ISR, компактификация, Streams

0

1

Войти