Параллельные вычисления

MapReduce и Spark

2004. Один paper Джеффа Дина переворачивает индустрию. Через 20 лет тот же фундамент крутит данные в Anthropic, OpenAI, Tesla Autopilot, Netflix recommendations и Spotify Discover Weekly. MapReduce не умер - он стал воздухом, которым дышит big data.

  • **Google index 2004:** первый production MapReduce обработал петабайты веб-страниц для построения индекса
  • **Spark в Databricks:** обрабатывает 9 экзабайт данных в день в облачных кластерах
  • **ML обучение:** distributed dataloader в PyTorch - тот же MapReduce под капотом
  • **Spotify Discover Weekly:** аггрегации listening events на Spark - десятки миллиардов событий в неделю

MapReduce: бумага, изменившая обработку данных

2004. Jeff Dean и Sanjay Ghemawat публикуют в OSDI пейпер 'MapReduce: Simplified Data Processing on Large Clusters'. 13 страниц. В индустрии бомба. Google уже год индексирует веб через эту систему. Через пять лет Yahoo пишет Hadoop, и весь мир получает open-source. Без MapReduce не было бы big data в современном виде.

Идея до примитивности проста. Программист пишет две функции: $map(k_1, v_1) \to list(k_2, v_2)$ и $reduce(k_2, list(v_2)) \to list(v_3)$. Map применяется к каждому входному элементу. Reduce агрегирует значения с одинаковым ключом. Всё остальное - распределение, fault tolerance, retry - делает фреймворк. Программист думает в терминах functional combinators, а не параллельных потоков.

Та же абстракция, что в forward pass нейросети: матричное умножение - это и есть map+reduce. Каждый нейрон собирает сумму взвешенных входов (reduce по строке). Каждый weight применяется к своему input'у (map). PyTorch внутри делает MapReduce на GPU без слова MapReduce. Идея слишком фундаментальна, чтобы быть только про big data.

**Word count - канонический пример:** map(doc) → list of (word, 1). Reduce(word, [1,1,1,...]) → (word, sum). На 100 нодах считает частотность слов в 10 TB текста за минуты. Тот же алгоритм считает n-grams для тренировки LLM, BM25-индексов в поисковиках, embedding-статистик в Sentence-Transformers.

В чём ключевая абстракция MapReduce?

Spark RDD: lineage побеждает чекпоинты

2009. UC Berkeley. Матей Захария замечает: классический MapReduce пишет промежуточные результаты на диск между job'ами. Для итеративных алгоритмов (PageRank, k-means, ML обучение) это смерть - десять итераций = десять чтений и записей на HDFS. Он придумывает Spark с RDD - Resilient Distributed Dataset, абстракцией, которая держит данные в RAM, а отказоустойчивость даёт через lineage.

RDD - неизменяемый партиционированный набор данных. На него можно применить transformation: `map`, `filter`, `groupByKey`. Каждая transformation возвращает новый RDD, не вычисляя его сразу - lazy evaluation. Spark строит DAG зависимостей. Когда программист зовёт `action` (`collect`, `count`, `save`), весь DAG исполняется.

Lineage - запись о том, *как* RDD был получен из других RDD. Если партиция теряется при падении ноды, Spark пересчитывает её по lineage, а не восстанавливает с диска. Это та же идея, что в gradient checkpointing для нейросетей: не хранить, а пересчитывать - дешевле памяти, дороже вычислений, но при rare failure это выигрыш.

**Cache vs persist:** `.cache()` держит RDD в RAM, `.persist(MEMORY_AND_DISK)` спиллит на диск при OOM. Для PageRank на 30 итераций - принципиальная разница: с cache на хорошем кластере 100x speedup против Hadoop. Та же логика, что KV-cache в трансформере: не пересчитывать то, что уже посчитано.

Что даёт Spark преимущество в скорости над Hadoop MapReduce для итеративных задач?

Shuffle: дорогой час пик в распределённой обработке

После map каждый воркер держит свои локальные пары (key, value). Чтобы reduce увидел все значения одного ключа в одном месте, эти пары надо физически переместить по сети. Это и есть shuffle - этап, где данные летят через кластер. На большом джобе shuffle - 80% времени и 99% боли.

Алгоритм простой: hash(key) mod num_partitions определяет, какой reducer получит пару. Map-сторона пишет пары в локальные shuffle-файлы, разбитые по партициям. Reduce-сторона тянет свои партиции от всех маперов. $M \times R$ сетевых соединений, $O(N)$ записей на диск. Бенчмарки показывают: shuffle 1 TB между 100 нодами - 5-15 минут на 10 Gbit сети.

Та же боль есть в распределённом обучении нейросетей. All-reduce у NCCL для усреднения градиентов - архитектурный родственник shuffle. ZeRO-3 в DeepSpeed раскидывает параметры и градиенты так, чтобы минимизировать all-to-all коммуникацию. Кто понимает shuffle в Spark, легче понимает gradient sharding на GPU-кластере.

**Skew - убийца производительности:** если один ключ имеет 90% значений (например, NULL или 'user_id=unknown'), один reducer получает почти всю работу, остальные простаивают. Решения: salt'ить ключ ($key + random()$), брать broadcast join вместо shuffle join, агрегировать локально перед shuffle (combiner). Та же проблема, что long-tail distribution в RL: один outlier-эпизод тянет gradient за собой.

Почему shuffle - самая дорогая операция в Spark?

Partitioning: чем хорошее разбиение отличается от плохого

Партиция - физический кусок RDD, обрабатываемый одним task'ом на одном executor'е. Если RDD имеет 200 партиций и кластер 50 executor'ов - 50 task'ов идут параллельно, остальные ждут в очереди. Слишком мало партиций - underutilization, слишком много - overhead планировщика. Правило большого пальца: 2-4 партиции на ядро.

Spark выбирает partitioner автоматически, но программист может задать свой. Hash partitioner - стандарт, делит по `hash(key) mod n`. Range partitioner - для сортировки, режет по диапазонам ключей. Custom partitioner - если знаешь данные лучше Spark'а. Та же логика, что в consistent hashing для распределённых KV-store: правильный hash определяет эффективность всей системы.

Co-located partitioning - читерский трюк. Если два RDD партиционированы одним и тем же partitioner'ом по одному и тому же ключу, join между ними не требует shuffle. Spark знает, что нужные пары уже на одной ноде. Это секретный соус Spark SQL: query planner смотрит на partitioning info и устраняет shuffle, где может.

**Coalesce vs repartition:** `coalesce(n)` уменьшает партиции без shuffle (через локальное слияние), `repartition(n)` всегда делает shuffle для равномерности. Coalesce дешевле, но может оставить дисбаланс. Repartition дороже, но даёт ровную нагрузку. Та же диалектика, что в L1 vs L2 регуляризации: один тяжелее, второй точнее.

Чем больше партиций, тем лучше параллелизм

Партиции - инструмент с двумя противоположными ограничениями: слишком мало = underutilization, слишком много = scheduler overhead

При 10000 партиций на 16-ядерном кластере scheduler тратит больше времени на их раздачу, чем worker'ы на обработку. Правило 2-4 партиции на ядро - эмпирически найденная середина. Та же логика, что в batch size для нейросетей: маленький batch = мало parallelism на GPU, огромный batch = OOM и хуже обобщение. Оптимум посередине

Как сократить количество партиций без shuffle?

Связанные темы

Куда MapReduce и Spark ведут дальше:

  • Распределённые системы — Spark - distributed system с fault tolerance через lineage
  • Шардирование БД — Partitioning в Spark - та же идея, что шардирование в Postgres/Cassandra
  • Distributed Training в DL — All-reduce градиентов - современный наследник MapReduce reduce-стадии
  • Streaming systems — Spark Streaming и Kafka Streams - тот же подход на бесконечном потоке

Ключевые идеи

  • MapReduce - две функции от программиста, всё остальное фреймворк делает сам
  • Spark RDD держит данные в RAM, отказоустойчивость через lineage вместо чекпоинтов
  • Shuffle - 80% времени job'а, оптимизация сводится к минимизации сетевого трафика
  • Partitioning определяет параллелизм - правило 2-4 партиции на ядро как стартовая точка
  • Skew (неравномерное распределение ключей) - частая причина простоев executor'ов

Вопросы для размышления

  • Когда задача структурно ложится на MapReduce, а когда требует чего-то другого (streaming, graph processing)?
  • Что лучше - reduceByKey или groupByKey, и почему ответ зависит от размера данных?
  • Как связаны идеи Spark partitioning и distributed gradient training в современном AI?

Связанные уроки

  • par-01 — Базовая мотивация параллелизма - почему вообще распределять вычисления
  • par-05 — Message passing - модель коммуникации, на которой строится MapReduce shuffle
  • ds-04-consistent-hashing — Партиционирование данных по hash - то же consistent hashing
  • db-23-sharding — Шардирование БД и партиционирование RDD - одна идея в разных контекстах
  • bd-02 — Big Data контекст - где MapReduce и Spark естественно появляются
  • dl-17 — Распределённое обучение нейросетей - наследник тех же идей о map+reduce градиентов
  • dist-03-fallacies
  • alg-01-big-o
MapReduce и Spark

0

1

Войти