Big Data

Spark SQL и оптимизация

2014 год. Databricks выходит из академии. Spark SQL добавляет оптимизатор запросов к RDD API. Одна строка на Scala vs 50 строк Java MapReduce. Query plan оптимизируется автоматически. Catalyst видит весь план целиком и переписывает его: фильтры опускаются к источнику, ненужные колонки не читаются, JVM GC обходится через Unsafe API, join стратегия выбирается по статистике. Разница между наивным и оптимизированным Spark кодом - это разница между 5 часами и 5 минутами.

  • **Databricks**: Spark SQL для feature engineering в ML pipelines - Delta Lake + MLflow + Catalyst = единый оптимизированный path от сырых данных до модели
  • **Netflix**: Spark для A/B test analysis - 1 TB/day событий обрабатываются Catalyst с predicate pushdown по Parquet partition columns (date, country)
  • **LinkedIn**: Spark для training data preparation - 100 ТБ датасеты для ML моделей feed ranking, bucket join устраняет shuffle при повторяющихся join'ах
  • **Uber**: Spark SQL для fraud detection feature computation - 10 млрд строк в день, broadcast join для lookup таблиц правил мошенничества

Catalyst Optimizer: от SQL до байткода

2014 год. Databricks выходит из академии. Spark SQL добавляет оптимизатор запросов к RDD API. Одна строка на Scala vs 50 строк Java MapReduce. Query plan оптимизируется автоматически. Catalyst - компилятор запросов внутри Spark. Путь от SQL-строки до байткода: **unresolved logical plan** (AST без типов) -> **resolved logical plan** (таблицы и колонки привязаны) -> **optimized logical plan** (применены правила оптимизации) -> **physical plans** (несколько вариантов выполнения) -> **selected physical plan** (выбран по cost model). На каждом шаге Catalyst применяет десятки правил: predicate pushdown (фильтры опускаются к источнику данных), constant folding (выражение `1 + 2` заменяется на `3` во время компиляции), column pruning (читаются только нужные колонки из Parquet).

**EXPLAIN output**: строка `PushedFilters` в физическом плане подтверждает что predicate pushdown сработал. Строка `ReadSchema` показывает только нужные колонки - column pruning активен. Если `FileScan` читает все колонки - значит column pruning не применился (возможно, используются nested fields или `select('*')`).

В Spark MLlib все трансформации фичей (`VectorAssembler`, `StringIndexer`, `StandardScaler`) проходят через Catalyst. Pipeline из 5 трансформаций компилируется в один оптимизированный проход по данным - нет пяти отдельных сканирований датасета.

Catalyst применяет 'predicate pushdown' к запросу. Что это означает на практике при чтении Parquet?

Tungsten: обход JVM для производительности

Tungsten отказался от JVM garbage collector. Spark управляет памятью вручную через Unsafe API. JVM - это overhead. Для big data JVM abstractions стали узким местом и их обошли. Tungsten - три взаимосвязанные оптимизации. **Off-heap memory management**: данные хранятся вне JVM heap в нативной памяти через `sun.misc.Unsafe`. GC не сканирует эти объекты - паузы GC исчезают даже при 100 GB рабочего датасета. **Binary encoding**: строки DataFrame хранятся в компактном row format, не как Java объекты с header'ами и указателями. `String` в JVM - 40+ байт overhead; в Tungsten бинарном формате - только данные. **Whole-stage code generation**: Catalyst генерирует специализированный JVM байткод для каждого запроса через компилятор Janino. Вместо интерпретируемых виртуальных вызовов - inlined статический код, дружественный к CPU branch predictor и кешам L1/L2.

В физическом плане (`explain()`) операции с префиксом `*` охвачены whole-stage codegen. Операции без `*` интерпретируются обычным образом. Цель - минимизировать количество 'переходов' между скомпилированными блоками. Partial aggregation (`HashAggregate`) обычно скомпилирован; exchange (shuffle) - нет.

Почему off-heap memory management в Tungsten улучшает производительность Spark на больших датасетах?

Партиционирование и data skew

Партиция - единица параллелизма Spark. Каждая партиция обрабатывается одним task'ом на одном core. Правило: **2-4 партиции на каждый core кластера**. При 100 core'ах оптимально 200-400 партиций. Слишком мало партиций - core'а простаивают. Слишком много - overhead планировщика превышает пользу. **Hash partitioning** (default для join и groupBy): строки с одинаковым ключом попадают в одну партицию через `hash(key) % numPartitions`. **Range partitioning**: строки распределяются по диапазонам значений - для sorted output. `repartition(N)`: полный shuffle, создаёт ровно N партиций через hash partitioning. `coalesce(N)`: объединяет существующие партиции без shuffle - только для уменьшения числа партиций. **Data skew** - когда одна партиция содержит 100x больше данных, чем остальные. Один task выполняется 5 минут, остальные 99 - 5 секунд. Job ждёт один task. Решение для skew: salting - к skewed ключу добавляется случайный суффикс `key_0`, `key_1`, ..., `key_N`, распределяя данные по N партициям.

Частая ошибка с `coalesce`: после `df.repartition(400)` вызов `df.coalesce(1)` перед записью создаёт один огромный файл. Для записи в S3/HDFS лучше использовать `df.write.option('maxRecordsPerFile', 1000000)` - Spark сам разобьёт партицию на файлы без лишнего shuffle.

В чём разница между `repartition(N)` и `coalesce(N)` в Spark?

Стратегии join: broadcast, shuffle, bucket

Broadcast join: таблица меньше 10 МБ рассылается всем worker'ам. Shuffle join: 100 ГБ данных перемещаются по сети. Разница в latency - 5 секунд vs 5 минут. Одна аннотация `broadcast()` - и сетевая нагрузка исчезает. Три стратегии join в Spark: **Broadcast Hash Join** - маленькая таблица копируется на каждый executor, большая сканируется локально без shuffle. Порог: `spark.sql.autoBroadcastJoinThreshold` = 10 МБ по умолчанию. **Sort-Merge Join** (default для больших таблиц) - обе таблицы сортируются по join ключу, shuffle перемещает совпадающие ключи на одну ноду, merge читает отсортированные потоки. Сложность: $O(N \log N)$ network. **Bucket Join** - таблицы заранее партиционированы и отсортированы по join ключу при записи. Shuffle при join полностью отсутствует. Требует одинакового числа bucket'ов в обеих таблицах.

**Bucket join** - самая производительная стратегия для повторяющихся join'ов. Создаётся один раз при записи таблицы, используется многократно без shuffle. Ограничение: обе таблицы должны иметь одинаковое число bucket'ов и одинаковый ключ. В Delta Lake bucket join работает через `OPTIMIZE ZORDER BY`.

Broadcast join всегда быстрее sort-merge join - нужно всегда использовать broadcast()

Broadcast join быстрее только когда таблица действительно маленькая (умещается в памяти каждого executor'а). Если broadcast таблица большая (> памяти executor'а), broadcast join вызывает OOM и медленнее sort-merge join - поэтому порог 10 МБ существует

При broadcast каждый executor хранит полную копию таблицы в памяти. 200 executor'ов * 500 МБ таблица = 100 ГБ суммарно. Если executor имеет 4 ГБ памяти - OOM и падение job'а. Sort-merge join перемещает только нужные партиции.

Таблица lookup содержит 15 МБ данных. `spark.sql.autoBroadcastJoinThreshold` = 10 МБ. Что произойдёт при join с большой таблицей (500 GB) без явных hints?

Ключевые идеи

  • **Catalyst** компилирует запрос в 4 этапа: parse -> analyze -> optimize (predicate pushdown, column pruning, constant folding) -> physical plan
  • **Tungsten** обходит JVM GC через off-heap Unsafe API и генерирует специализированный байткод - исчезают GC паузы на больших данных
  • **Партиционирование**: `repartition` = shuffle + равномерность, `coalesce` = без shuffle + возможный skew; salting устраняет data skew через распределение hot key'ей
  • **Join стратегии**: broadcast (< 10 МБ, нет shuffle), sort-merge (default, $O(N \log N)$ shuffle), bucket (нет shuffle, требует предварительной записи)

Связанные темы

Spark SQL и оптимизация пересекаются с несколькими областями computer science:

  • Apache Spark: основы — Catalyst оптимизирует DataFrame операции, изученные в базовом уроке Spark
  • SQL и реляционные базы данных — Spark SQL синтаксис совместим с SQL; query planner Catalyst реализует те же концепции что и PostgreSQL planner
  • Репликация в распределённых системах — Партиционирование Spark - аналог шардинга; data skew в Spark - аналог hot spot в распределённых БД
  • Оценка ML моделей — Feature engineering для ML через Spark SQL - Catalyst оптимизирует MLlib Pipeline трансформации

Вопросы для размышления

  • Когда явный `broadcast()` hint нужен несмотря на то что Catalyst должен выбрать стратегию автоматически - какие сценарии делают автоматический выбор ненадёжным?
  • Data skew устраняется salting'ом - но salting требует дублирования lookup таблицы в N раз. Как выбрать оптимальное NUM_SALTS и при каком размере lookup таблицы salting становится неэффективным?
  • Whole-stage code generation генерирует JVM байткод для каждого нового запроса. Как это влияет на latency первого выполнения запроса vs последующих - и важно ли это для streaming vs batch сценариев?

Связанные уроки

  • bd-04
  • bd-06
  • ds-05-replication
  • db-05-sql-basics
  • ml-05-evaluation
Spark SQL и оптимизация

0

1

Войти