Big Data

Airflow и DAG-оркестрация

К 2014 году в Airbnb 200 cron-джоб питались данными из нескольких CRM, выгружали в Hive, считали витрины и отправляли отчёты. Каждое утро инженер вручную проверял, не сломалось ли что ночью, и перезапускал упавшие звенья - с риском задеть зависимости. Максим Бошмен написал Airflow, чтобы превратить это болото в управляемый граф. Через 10 лет Airflow - индустриальный стандарт оркестрации в Airbnb, Lyft, Snap, Robinhood, Adobe. DAG, операторы, сенсоры и бэкфилл - не аббревиатуры, а конкретные конструкции, которые превращают набор скриптов в производственную инфраструктуру данных.

  • **Airbnb / Lyft:** тысячи DAG-ов гонят данные через Hive, Snowflake и Druid; SLA-monitoring встроен в платформу
  • **Astronomer / MWAA / Cloud Composer:** managed Airflow от Astronomer, AWS и GCP - выбор для команд, не желающих держать инфру самостоятельно
  • **dbt + Airflow:** типовой стек analytics engineering - Airflow оркестрирует, dbt трансформирует, Snowflake/BigQuery хранит

DAG: направленный ациклический граф задач

В 2014 году в Airbnb инженер Максим Бошмен столкнулся с известной болью: cron-скрипты разрослись до 200 файлов, зависимости между ними жили только в головах команды, при падении одной джобы вручную перезапускали 15 следующих. Он написал Airflow - оркестратор, где каждый пайплайн описывается как DAG: направленный ациклический граф задач с явными зависимостями. Узел DAG - задача (task), ребро - зависимость ('B запускается только после успешного A'). DAG версионируется как Python-код, исполнение планируется по расписанию, и для каждого запуска фиксируется DAG Run со статусом каждой задачи.

DAG в Airflow - это Python-объект, который собирается при импорте модуля. Параметры: schedule (cron-выражение или @daily), start_date (точка отсчёта запусков), catchup (нужно ли догнать пропущенные запуски), max_active_runs (сколько DAG Run одновременно). Каждый DAG Run помечен logical_date - меткой бизнес-времени, к которому относятся данные. Задачи внутри DAG-а получают эту дату через {{ ds }} и используют для запросов вроде WHERE date = '{{ ds }}'.

Чем DAG как модель оркестрации принципиально лучше списка cron-задач?

Операторы: атомарные единицы работы

Узел DAG-а в Airflow - не произвольный Python-код, а оператор: класс, инкапсулирующий типовое действие. BashOperator выполняет команду shell, PythonOperator вызывает функцию, PostgresOperator посылает SQL, KubernetesPodOperator поднимает под на кластере. Это сильно отличается от 'каждая задача - произвольный скрипт': операторы дают единый интерфейс конфигурации, единое логирование, поведение при ретраях и стандарт для secrets. Каталог Airflow Providers содержит сотни операторов: Snowflake, BigQuery, dbt, Spark, S3, Slack, dbt Cloud.

TaskFlow API (Airflow 2.0+) - современный синтаксис: декоратор @task превращает обычную Python-функцию в задачу, а возвращаемые значения автоматически прокидываются между задачами через XCom (cross-communication storage). Под капотом @task - всё тот же PythonOperator, но без явного конструирования XCom-ключей. Для тяжёлых пайплайнов с межсистемной интеграцией классические операторы остаются предпочтительнее: они работают с провайдерскими hooks (PostgresHook, S3Hook), которые управляют пулом соединений и credential-ами через Connection.

Зачем использовать готовый оператор вместо BashOperator с произвольным скриптом?

Sensors: ожидание внешних событий

Сенсор - специальный оператор, который ждёт выполнения условия. S3KeySensor проверяет, появился ли файл в bucket; ExternalTaskSensor ждёт завершения задачи в другом DAG; SqlSensor опрашивает таблицу до появления записей. Это решает фундаментальную проблему ETL: расписание не совпадает с реальным появлением данных. Поставщик может выгрузить файл в 02:00, а может в 04:30 - cron бесполезен, а сенсор поллит до результата. Есть два режима: poke (worker держит слот и пингует каждые N секунд) и reschedule (worker освобождается между проверками - спасает от исчерпания пула при сотнях ожидающих задач).

Deferrable Operators (Airflow 2.2+) - современная альтернатива сенсорам для длинных ожиданий. Оператор может вернуть trigger (event-loop корутину), и worker полностью освобождается. Триггеры выполняются в отдельном процессе Triggerer асинхронно: тысячи параллельных ожиданий потребляют минимум ресурсов. Сенсоры с reschedule-режимом - предшественник; deferrable - правильное решение для большинства новых пайплайнов.

S3KeySensor с mode='poke' и poke_interval=60 ждёт файла до 12 часов. В чём проблема при сотнях таких сенсоров одновременно?

Бэкфилл и идемпотентность: повторяемость пайплайна

Бэкфилл - запуск пайплайна для прошлых дат. Сценарии: новый ETL вышел в апреле, нужно посчитать историю за январь-март; нашли баг в трансформации - надо пересчитать неделю; добавили новую витрину - заполнить год назад. В Airflow бэкфилл реализуется автоматически через catchup или вручную через airflow dags backfill. Чтобы это работало корректно, каждая задача должна быть идемпотентна: повторный запуск с тем же logical_date даёт идентичный результат, не дублируя и не теряя данных. Без идемпотентности бэкфилл превращается в источник новых багов вместо исправления старых.

Шаблон идемпотентной загрузки: DELETE WHERE date='{{ ds }}'; INSERT ... WHERE date='{{ ds }}'. Альтернатива - MERGE / UPSERT по бизнес-ключу. В Spark/BigQuery: INSERT OVERWRITE PARTITION(date='{{ ds }}') - перезаписывает партицию целиком. Антипаттерн: SELECT NOW() в трансформации - результат зависит от реального времени запуска, и бэкфилл даст другой результат, чем оригинальный run. Правило: внутри пайплайна нет 'сейчас', есть только {{ ds }} - бизнес-дата.

Airflow и DAG - это просто красивый GUI для cron

Airflow моделирует пайплайн как граф зависимостей с явной logical_date, фиксирует состояние каждого запуска и обеспечивает повторяемость через бэкфилл. Cron знает только время старта

Cron не видит зависимостей и не знает разницы между 'сегодняшним' и 'историческим' запуском. Airflow же фиксирует logical_date, что даёт идемпотентность, бэкфилл, аудит и возможность переиграть прошлое. Это качественно другая модель - именно поэтому индустрия мигрировала от cron к оркестраторам

ETL посчитал ревенью за вчера и записал INSERT INTO mart.daily_revenue VALUES (..., NOW()). При бэкфилле что произойдёт?

Ключевые идеи

  • **DAG явно описывает граф зависимостей** - планировщик знает порядок, точку восстановления и критический путь.
  • **Оператор - типовое действие** со стандартным интерфейсом: connection, retry, logging, templating. Это сильно отличается от ad-hoc bash.
  • **Сенсор ждёт внешнего события** (файл в S3, успех другого DAG); reschedule-режим и deferrable operators масштабируют ожидание.
  • **Бэкфилл + идемпотентность - неразделимая пара:** только идемпотентный пайплайн можно безопасно перезапускать на прошлые даты без расхождений.

Связанные темы

Оркестрация связывает воедино реал-тайм события, хранилища и аналитические витрины.

  • Real-time Analytics — Airflow обычно оркестрирует batch-витрины, дополняющие streaming-пайплайны (lambda-архитектура)
  • Распределённые системы: отказоустойчивость — Идемпотентность задачи в Airflow - то же требование, что для retry-логики в распределённых протоколах

Вопросы для размышления

  • Airflow жив с 2014 года, но появились альтернативы: Dagster, Prefect, Temporal. По каким критериям команды выбирают между ними и когда осмысленно остаться на Airflow?
  • Идемпотентность пайплайна звучит просто, но требует дисциплины: ни одной NOW(), всегда DELETE+INSERT по бизнес-дате. Какие практики помогают команде удерживать эту дисциплину на сотнях DAG-ов?
  • Сенсоры с poke-режимом исчерпывают слоты воркеров; deferrable operators решают это асинхронно. В каких случаях команды осознанно остаются на классических сенсорах вместо миграции на deferrable?

Связанные уроки

  • bd-12 — Предыдущий урок big data - контекст для Airflow
  • bd-14 — dbt трансформации оркестрируются через Airflow DAG
  • bd-15 — Airflow запускает Spark MLlib задачи
  • alg-18-topological — DAG scheduling = топологическая сортировка зависимостей
  • ds-03-consensus — Airflow scheduler leader election - distributed consensus
  • st-02-stocks-flows — DAG pipeline - stocks-flows модель данных
  • devops-09
Airflow и DAG-оркестрация

0

1

Войти