Потоковая обработка

Kafka Connect и Schema Registry

LinkedIn: 200 инженеров и 50 баз данных

2016 год. LinkedIn. Инженерная команда из 200 человек поддерживает 50 разных источников данных: MySQL, Oracle, HDFS, Voldemort (собственный KV-store), Espresso, десятки внешних API. Каждая интеграция с Kafka - отдельный скрипт. Нет единого мониторинга, нет стандартного формата, нет отказоустойчивости. Когда что-то падало - никто не знал какой из 200 скриптов сломался. Kafka Connect был создан внутри LinkedIn как ответ на этот хаос, открыт в 2015-м и вошёл в Apache Kafka 0.9. Сегодня Confluent Hub предлагает 200+ готовых коннекторов - MySQL, PostgreSQL, MongoDB, Salesforce, Snowflake, S3, каждый с поддержкой SMT и Schema Registry.

Kafka Connect: конец интеграционного кошмара

2016 год. LinkedIn. 200 инженеров. 50 разных баз данных, систем хранения, внешних сервисов. Каждая интеграция - отдельный скрипт на Python или Java, написанный когда-то кем-то, поддерживаемый никем. Когда MySQL переезжал на новый хост, падало 12 таких скриптов одновременно. Когда добавлялось новое поле в схему - нужно было найти все эти скрипты и поправить вручную. Kafka Connect появился как ответ на этот хаос: одна конфигурация в JSON вместо тысячи строк кода.

Kafka Connect - это фреймворк для передачи данных между Kafka и внешними системами. Не библиотека, не паттерн - именно фреймворк с собственным runtime, API и экосистемой готовых коннекторов. Source connector читает из внешнего источника (PostgreSQL, MongoDB, S3, HTTP API) и пишет в Kafka. Sink connector читает из Kafka и пишет во внешнюю систему (Elasticsearch, Snowflake, BigQuery, S3). Один worker может запускать десятки коннекторов параллельно.

Workers бывают двух видов. Standalone - один процесс, конфигурация в файле, без отказоустойчивости. Distributed - кластер workers, конфигурация через REST API, автоматическое перераспределение задач при падении узла. В продакшне всегда distributed: если worker умирает, его tasks уходят к живым коллегам без потери данных. Это тот же механизм consumer groups из Kafka - только для коннекторов.

Single Message Transforms (SMT) - это pipeline трансформаций, которые применяются к каждому сообщению до записи в Kafka или после чтения. Переименовать поле, добавить timestamp, замаскировать PII-данные, отфильтровать null-значения - всё это без единой строки кода. Цепочка SMT: `ReplaceField -> MaskField -> InsertField`. Ключевое ограничение: SMT stateless и работает с одним сообщением. Если нужен join двух потоков или агрегация - это уже Kafka Streams или Flink.

Debezium - самый популярный source connector для CDC (Change Data Capture). Вместо polling по timestamp он читает WAL (Write-Ahead Log) PostgreSQL или binlog MySQL. Задержка - миллисекунды вместо секунд. Но требует настройки репликации на уровне БД и реплика-слота. Подробнее - в уроке про CDC (stream-13).

У коннектора в distributed-режиме упал один из worker'ов. Что произойдёт с его tasks?

Schema Registry: контракт, который не сломается

Kafka хранит байты. Просто байты. Producer кладёт их, consumer читает. Это даёт гибкость - но создаёт катастрофу, когда producer меняет формат сообщений, а consumer об этом не знает. Именно это и произошло в Uber в 2017-м: команда сервиса геолокации добавила поле в Protobuf-схему, не предупредив downstream-потребителей. Три сервиса упали с `UnmarshallingException`. Schema Registry - это центральный реестр схем, который делает такие аварии невозможными на уровне инфраструктуры.

Confluent Schema Registry - HTTP-сервис, хранящий версии схем для каждого Kafka-топика. Когда producer публикует сообщение с Avro-сериализацией, клиент автоматически регистрирует схему в Registry и добавляет к каждому сообщению 5-байтовый заголовок: magic byte (0x00) + schema ID (4 байта, int32). Consumer читает этот заголовок, по ID получает схему из Registry и десериализует. Схема передаётся один раз при регистрации - не в каждом сообщении. При 1 млн сообщений в секунду это экономит гигабайты трафика.

Avro - не единственный формат, но самый популярный в Kafka-экосистеме. Схема определяется в JSON, данные хранятся в компактном бинарном формате без имён полей - только значения в порядке схемы. 100 байт JSON становятся 20-30 байтами Avro. Protobuf популярен в gRPC-стеках и имеет лучшую поддержку in cross-language окружениях. JSON Schema - для случаев, когда читаемость важнее размера или уже есть JSON-продюсеры. Schema Registry поддерживает все три.

Subject naming strategy определяет, как Schema Registry сопоставляет схемы с топиками. TopicNameStrategy (по умолчанию): один ключ-схема и одна value-схема на топик. RecordNameStrategy: одна схема на тип записи независимо от топика - удобно при множестве event-типов в одном топике. SchemaNameStrategy: явное имя subject. Выбор стратегии влияет на то, насколько строго соблюдается совместимость.

Producer пишет 1 млн Avro-сообщений в секунду. Сколько раз за это время клиент обращается к Schema Registry за схемой?

Schema Evolution: как менять контракты без аварий

Схемы меняются. Продукт растёт, требования меняются, старые поля устаревают. Вопрос не в том, изменится ли схема - а в том, сломает ли это изменение consumer'ов, которые читают старые сообщения, или producer'ов, которые пишут с новой схемой. Schema Registry решает эту задачу через режимы совместимости, которые проверяются при каждой попытке зарегистрировать новую версию схемы.

BACKWARD - самый распространённый режим. Новая версия схемы должна уметь читать сообщения, записанные со старой схемой. Это значит: новые поля должны иметь default-значение, удалять можно только поля с default. Сценарий: сначала деплоится новый consumer с новой схемой - и может читать и старые, и новые сообщения. Потом деплоится новый producer. Порядок деплоя: consumers before producers.

FORWARD - обратная гарантия. Старый consumer должен уметь читать новые сообщения. Это значит: новые обязательные поля запрещены, удалять можно любые поля (consumer просто их проигнорирует). Сценарий: сначала деплоится новый producer, потом consumer. Порядок: producers before consumers. На практике FORWARD используется реже - обычно предпочтительнее контролировать deployment consumer'ов.

FULL требует одновременного выполнения обоих условий. Это самый строгий режим: добавлять можно только поля с default, удалять - только поля с default. Deployment order не имеет значения. Цена - меньше гибкости при эволюции. TRANSITIVE-варианты (`BACKWARD_TRANSITIVE`, `FORWARD_TRANSITIVE`, `FULL_TRANSITIVE`) проверяют совместимость не только с предыдущей версией, но со всеми историческими версиями схемы. Используются когда в топике есть сообщения с разными старыми версиями схем - например, retention 7 дней при редких изменениях схемы.

Правило одного направления: при BACKWARD всегда деплоить consumers раньше producers. Один из способов это обеспечить - feature flags на producer стороне. Producer пишет со старой схемой до тех пор, пока мониторинг не подтверждает, что все consumer'ы обновлены.

FULL-совместимость - это самый надёжный режим, его надо использовать везде

FULL ограничивает гибкость без реальной необходимости в большинстве случаев. BACKWARD покрывает 90% сценариев - управляемый деплой consumers before producers достаточен.

FULL запрещает удалять любые поля без default и добавлять обязательные поля. Реальная потребность в FULL возникает только при хаотичном деплое без порядка или при нескольких независимых producer-командах без координации. В остальных случаях FULL - это избыточное ограничение.

Схема в режиме BACKWARD_TRANSITIVE. В топике есть сообщения версий v1, v2, v3 (текущая). Команда хочет зарегистрировать v4. Что проверит Schema Registry?

Ключевые идеи

  • **Kafka Connect** - фреймворк для интеграций без кода: source читает из внешних систем в Kafka, sink - из Kafka во внешние. Distributed mode даёт отказоустойчивость через rebalancing tasks.
  • **SMT (Single Message Transforms)** - stateless pipeline трансформаций на уровне коннектора. Переименование, маскировка, фильтрация - без кода. Для stateful операций (join, aggregation) - Kafka Streams или Flink.
  • **Schema Registry** хранит версии Avro/Protobuf/JSON Schema. Каждое сообщение несёт 5-байтовый header с schema_id. Consumer получает схему по ID - один раз, потом кэш.
  • **BACKWARD** - новый consumer читает старые сообщения (deploy consumers first). **FORWARD** - старый consumer читает новые (deploy producers first). **FULL** - оба направления. **TRANSITIVE** - проверка совместимости со всеми историческими версиями.
  • **Debezium** + Kafka Connect = CDC без polling: читает WAL PostgreSQL или binlog MySQL, задержка в миллисекунды.

Связанные темы

Kafka Connect и Schema Registry - часть более широкой экосистемы потоковых систем:

  • Apache Kafka: Архитектура — Фундамент - topics, partitions, offsets, на которых работает Connect
  • Kafka Streams — Дополнение - где Connect перемещает данные, Streams их обрабатывает
  • Change Data Capture (CDC) — Debezium реализует CDC поверх Kafka Connect Source
  • Миграции и schema evolution — Та же проблема эволюции схемы, но на уровне реляционной БД

Вопросы для размышления

  • В каком сценарии FORWARD-совместимость предпочтительнее BACKWARD - когда контроль над порядком деплоя consumer'ов невозможен?
  • Почему Avro хранит только значения в бинарном формате (без имён полей), но это не проблема для десериализации?
  • Можно ли обойтись без Schema Registry при использовании Kafka Connect - и какой ценой?

Связанные уроки

  • stream-04 — Архитектура Kafka - основа понимания Connect
  • stream-05 — Kafka Streams дополняет Connect в экосистеме
  • stream-13 — CDC строится поверх Kafka Connect (Debezium)
  • db-31-migrations — Schema evolution - та же проблема, что и DB миграции
  • db-34-lsm — Понимание storage помогает настраивать sink connectors
  • ds-05-replication — Репликация данных - контекст для understanding offsets
  • db-02-relational-model
Kafka Connect и Schema Registry

0

1

Войти