Параллельные вычисления
MapReduce и Spark
2004. Один paper Джеффа Дина переворачивает индустрию. Через 20 лет тот же фундамент крутит данные в Anthropic, OpenAI, Tesla Autopilot, Netflix recommendations и Spotify Discover Weekly. MapReduce не умер - он стал воздухом, которым дышит big data.
- **Google index 2004:** первый production MapReduce обработал петабайты веб-страниц для построения индекса
- **Spark в Databricks:** обрабатывает 9 экзабайт данных в день в облачных кластерах
- **ML обучение:** distributed dataloader в PyTorch - тот же MapReduce под капотом
- **Spotify Discover Weekly:** аггрегации listening events на Spark - десятки миллиардов событий в неделю
MapReduce: бумага, изменившая обработку данных
2004. Jeff Dean и Sanjay Ghemawat публикуют в OSDI пейпер 'MapReduce: Simplified Data Processing on Large Clusters'. 13 страниц. В индустрии бомба. Google уже год индексирует веб через эту систему. Через пять лет Yahoo пишет Hadoop, и весь мир получает open-source. Без MapReduce не было бы big data в современном виде.
Идея до примитивности проста. Программист пишет две функции: $map(k_1, v_1) \to list(k_2, v_2)$ и $reduce(k_2, list(v_2)) \to list(v_3)$. Map применяется к каждому входному элементу. Reduce агрегирует значения с одинаковым ключом. Всё остальное - распределение, fault tolerance, retry - делает фреймворк. Программист думает в терминах functional combinators, а не параллельных потоков.
Та же абстракция, что в forward pass нейросети: матричное умножение - это и есть map+reduce. Каждый нейрон собирает сумму взвешенных входов (reduce по строке). Каждый weight применяется к своему input'у (map). PyTorch внутри делает MapReduce на GPU без слова MapReduce. Идея слишком фундаментальна, чтобы быть только про big data.
**Word count - канонический пример:** map(doc) → list of (word, 1). Reduce(word, [1,1,1,...]) → (word, sum). На 100 нодах считает частотность слов в 10 TB текста за минуты. Тот же алгоритм считает n-grams для тренировки LLM, BM25-индексов в поисковиках, embedding-статистик в Sentence-Transformers.
В чём ключевая абстракция MapReduce?
Spark RDD: lineage побеждает чекпоинты
2009. UC Berkeley. Матей Захария замечает: классический MapReduce пишет промежуточные результаты на диск между job'ами. Для итеративных алгоритмов (PageRank, k-means, ML обучение) это смерть - десять итераций = десять чтений и записей на HDFS. Он придумывает Spark с RDD - Resilient Distributed Dataset, абстракцией, которая держит данные в RAM, а отказоустойчивость даёт через lineage.
RDD - неизменяемый партиционированный набор данных. На него можно применить transformation: `map`, `filter`, `groupByKey`. Каждая transformation возвращает новый RDD, не вычисляя его сразу - lazy evaluation. Spark строит DAG зависимостей. Когда программист зовёт `action` (`collect`, `count`, `save`), весь DAG исполняется.
Lineage - запись о том, *как* RDD был получен из других RDD. Если партиция теряется при падении ноды, Spark пересчитывает её по lineage, а не восстанавливает с диска. Это та же идея, что в gradient checkpointing для нейросетей: не хранить, а пересчитывать - дешевле памяти, дороже вычислений, но при rare failure это выигрыш.
**Cache vs persist:** `.cache()` держит RDD в RAM, `.persist(MEMORY_AND_DISK)` спиллит на диск при OOM. Для PageRank на 30 итераций - принципиальная разница: с cache на хорошем кластере 100x speedup против Hadoop. Та же логика, что KV-cache в трансформере: не пересчитывать то, что уже посчитано.
Что даёт Spark преимущество в скорости над Hadoop MapReduce для итеративных задач?
Shuffle: дорогой час пик в распределённой обработке
После map каждый воркер держит свои локальные пары (key, value). Чтобы reduce увидел все значения одного ключа в одном месте, эти пары надо физически переместить по сети. Это и есть shuffle - этап, где данные летят через кластер. На большом джобе shuffle - 80% времени и 99% боли.
Алгоритм простой: hash(key) mod num_partitions определяет, какой reducer получит пару. Map-сторона пишет пары в локальные shuffle-файлы, разбитые по партициям. Reduce-сторона тянет свои партиции от всех маперов. $M \times R$ сетевых соединений, $O(N)$ записей на диск. Бенчмарки показывают: shuffle 1 TB между 100 нодами - 5-15 минут на 10 Gbit сети.
Та же боль есть в распределённом обучении нейросетей. All-reduce у NCCL для усреднения градиентов - архитектурный родственник shuffle. ZeRO-3 в DeepSpeed раскидывает параметры и градиенты так, чтобы минимизировать all-to-all коммуникацию. Кто понимает shuffle в Spark, легче понимает gradient sharding на GPU-кластере.
**Skew - убийца производительности:** если один ключ имеет 90% значений (например, NULL или 'user_id=unknown'), один reducer получает почти всю работу, остальные простаивают. Решения: salt'ить ключ ($key + random()$), брать broadcast join вместо shuffle join, агрегировать локально перед shuffle (combiner). Та же проблема, что long-tail distribution в RL: один outlier-эпизод тянет gradient за собой.
Почему shuffle - самая дорогая операция в Spark?
Partitioning: чем хорошее разбиение отличается от плохого
Партиция - физический кусок RDD, обрабатываемый одним task'ом на одном executor'е. Если RDD имеет 200 партиций и кластер 50 executor'ов - 50 task'ов идут параллельно, остальные ждут в очереди. Слишком мало партиций - underutilization, слишком много - overhead планировщика. Правило большого пальца: 2-4 партиции на ядро.
Spark выбирает partitioner автоматически, но программист может задать свой. Hash partitioner - стандарт, делит по `hash(key) mod n`. Range partitioner - для сортировки, режет по диапазонам ключей. Custom partitioner - если знаешь данные лучше Spark'а. Та же логика, что в consistent hashing для распределённых KV-store: правильный hash определяет эффективность всей системы.
Co-located partitioning - читерский трюк. Если два RDD партиционированы одним и тем же partitioner'ом по одному и тому же ключу, join между ними не требует shuffle. Spark знает, что нужные пары уже на одной ноде. Это секретный соус Spark SQL: query planner смотрит на partitioning info и устраняет shuffle, где может.
**Coalesce vs repartition:** `coalesce(n)` уменьшает партиции без shuffle (через локальное слияние), `repartition(n)` всегда делает shuffle для равномерности. Coalesce дешевле, но может оставить дисбаланс. Repartition дороже, но даёт ровную нагрузку. Та же диалектика, что в L1 vs L2 регуляризации: один тяжелее, второй точнее.
Чем больше партиций, тем лучше параллелизм
Партиции - инструмент с двумя противоположными ограничениями: слишком мало = underutilization, слишком много = scheduler overhead
При 10000 партиций на 16-ядерном кластере scheduler тратит больше времени на их раздачу, чем worker'ы на обработку. Правило 2-4 партиции на ядро - эмпирически найденная середина. Та же логика, что в batch size для нейросетей: маленький batch = мало parallelism на GPU, огромный batch = OOM и хуже обобщение. Оптимум посередине
Как сократить количество партиций без shuffle?
Связанные темы
Куда MapReduce и Spark ведут дальше:
- Распределённые системы — Spark - distributed system с fault tolerance через lineage
- Шардирование БД — Partitioning в Spark - та же идея, что шардирование в Postgres/Cassandra
- Distributed Training в DL — All-reduce градиентов - современный наследник MapReduce reduce-стадии
- Streaming systems — Spark Streaming и Kafka Streams - тот же подход на бесконечном потоке
Ключевые идеи
- MapReduce - две функции от программиста, всё остальное фреймворк делает сам
- Spark RDD держит данные в RAM, отказоустойчивость через lineage вместо чекпоинтов
- Shuffle - 80% времени job'а, оптимизация сводится к минимизации сетевого трафика
- Partitioning определяет параллелизм - правило 2-4 партиции на ядро как стартовая точка
- Skew (неравномерное распределение ключей) - частая причина простоев executor'ов
Вопросы для размышления
- Когда задача структурно ложится на MapReduce, а когда требует чего-то другого (streaming, graph processing)?
- Что лучше - reduceByKey или groupByKey, и почему ответ зависит от размера данных?
- Как связаны идеи Spark partitioning и distributed gradient training в современном AI?
Связанные уроки
- par-01 — Базовая мотивация параллелизма - почему вообще распределять вычисления
- par-05 — Message passing - модель коммуникации, на которой строится MapReduce shuffle
- ds-04-consistent-hashing — Партиционирование данных по hash - то же consistent hashing
- db-23-sharding — Шардирование БД и партиционирование RDD - одна идея в разных контекстах
- bd-02 — Big Data контекст - где MapReduce и Spark естественно появляются
- dl-17 — Распределённое обучение нейросетей - наследник тех же идей о map+reduce градиентов
- dist-03-fallacies
- alg-01-big-o