Big Data
MapReduce: парадигма
Цели урока
- Понимать роль каждой фазы Map, Shuffle, Reduce в pipeline
- Объяснять, почему Shuffle - бутылочное горлышко и как его оптимизировать
- Применять Combiner только для ассоциативных и коммутативных операций
- Знать ограничения MapReduce и когда использовать Spark вместо него
Предварительные знания
2008 год. New York Times хочет оцифровать 11 миллионов газетных статей за 150 лет. На одном сервере - месяцы работы. Инженер Дерек Готфрид запустил MapReduce-задачу на 100 виртуальных машинах Amazon EC2. Результат: 4 TB обработано за 24 часа, потрачено 240 долларов. Принцип «раздели и властвуй», масштабированный на сотни машин.
- **Google** изначально использовал MapReduce для построения поискового индекса всего веба - триллионы страниц
- **Spotify** обрабатывает миллиарды событий прослушивания через MapReduce-подобные pipeline для Wrapped (итоги года)
- **NASA** использует MapReduce для анализа терабайтов спутниковых снимков и телеметрии
Джефф Дин, Санджай Гхемават и бумага, которая изменила Big Data
В 2004 году инженеры Google Джефф Дин и Санджай Гхемават опубликовали статью «MapReduce: Simplified Data Processing on Large Clusters». Они не изобрели map и reduce - эти операции пришли из функционального программирования (Lisp, 1960-е). Но Google взял абстракцию и сделал её промышленной системой, автоматически обрабатывающей отказы узлов и балансирующей нагрузку. Статья вдохновила создание Apache Hadoop в 2006 году - открытой реализации, которая демократизировала Big Data.
Map - трансформация данных
Фаза **Map** - первый этап обработки. Каждый узел кластера получает свой блок данных из HDFS и применяет к нему функцию-mapper. Mapper читает входные пары `(key, value)` и генерирует промежуточные пары `(key', value')`. Главное: **mappers работают параллельно и независимо** - каждый обрабатывает только свой блок, не зная о других.
В реальном Hadoop MapReduce mapper читает данные построчно. Входной ключ - смещение строки в файле (offset), значение - текст строки. Mapper выдаёт новые пары ключ-значение.
**Hadoop Streaming** позволяет писать mapper и reducer на любом языке (Python, Ruby, bash). Данные передаются через stdin/stdout. Для production обычно используют Java (нативный API), но для прототипов Streaming идеален.
Один mapper обрабатывает ровно один HDFS-блок (128 MB по умолчанию). Если файл занимает 10 блоков - запустятся 10 mappers. Если 100 файлов по 1 блоку - 100 mappers. Количество mappers определяется **количеством входных блоков**, а не количеством узлов.
Файл в HDFS занимает 5 блоков по 128 MB. Сколько mapper-задач запустится?
Shuffle - группировка и сортировка
После Map-фазы данные разбросаны: пары `(hello, 1)` могут быть на разных узлах. Перед Reduce нужно **собрать все значения одного ключа на один reducer**. Это делает Shuffle - самая затратная фаза MapReduce, потому что требует **передачи данных по сети** между узлами. Именно поэтому Apache Spark вытеснил Hadoop MapReduce для итеративных задач.
Shuffle состоит из нескольких подэтапов. Каждый mapper сортирует выходные данные по ключу и записывает на локальный диск. Затем **Partitioner** определяет, какому reducer отправить каждый ключ (обычно `hash(key) % num_reducers`). Данные передаются по сети и **merge-сортируются** на стороне reducer.
**Shuffle - главный враг производительности MapReduce.** Все промежуточные данные пишутся на диск (mapper side), передаются по сети и снова пишутся на диск (reducer side). Для 1 TB промежуточных данных это означает ~3 TB disk I/O + 1 TB network I/O. Apache Spark решает эту проблему, храня данные в памяти.
**Custom Partitioner** позволяет контролировать, какие ключи попадут на какой reducer. По умолчанию используется HashPartitioner. Но если данные skewed (один ключ встречается в 90% записей), этот ключ создаст bottleneck на одном reducer, пока остальные простаивают.
**Data Skew** - одна из главных проблем MapReduce. Если ключ "null" встречается в 50% записей, один reducer получит 50% всей работы. Решение: salting (добавить случайный суффикс к ключу), pre-aggregation, или custom partitioner.
Почему Shuffle - самая медленная фаза MapReduce?
Reduce - агрегация результатов
После Shuffle каждый reducer получает **все значения для своего набора ключей**, уже отсортированные. Функция-reducer агрегирует значения: суммирует, считает среднее, находит максимум - зависит от задачи. Результат записывается обратно в HDFS.
Для корректной работы reducer функция агрегации должна обладать определёнными свойствами. **Ассоциативность** - порядок группировки не важен: `(a + b) + c = a + (b + c)`. **Коммутативность** - порядок операндов не важен: `a + b = b + a`. Это гарантирует правильный результат при параллельном выполнении.
| Операция | Ассоциативная | Коммутативная | Подходит для Reduce |
|---|---|---|---|
| SUM (сумма) | Да | Да | Идеально |
| COUNT (подсчёт) | Да | Да | Идеально |
| MAX / MIN | Да | Да | Идеально |
| AVERAGE (среднее) | Нет! | Да | Нужно хранить sum + count, а не avg |
| CONCAT (строки) | Да | Нет! | Результат зависит от порядка |
| MEDIAN | Нет! | Нет! | Требует все данные сразу |
Всегда тестировать mapper и reducer **локально** через pipe (`mapper | sort | reducer`) перед запуском на кластере. Отладка на кластере - это минуты на запуск, логи разбросаны по узлам. Локальный тест - секунды.
Вычисление медианы зарплат по отделам через MapReduce. Mapper выдаёт (отдел, зарплата). Какая проблема возникнет в Reduce?
Combiners - локальная оптимизация
Shuffle - бутылочное горлышко из-за network I/O. Что если **уменьшить объём данных ДО передачи по сети**? Для этого существует **Combiner** - мини-reducer, который запускается **на стороне mapper**, сразу после Map-фазы.
Combiner - это тот же reducer, но выполняется **локально** на узле mapper. Не все операции поддерживают combiner. Правило: combiner можно использовать, если **функция агрегации ассоциативна и коммутативна**.
| Операция | Combiner возможен? | Почему |
|---|---|---|
| SUM | Да | sum(1,1) + sum(1,1) = sum(1,1,1,1) = 4 |
| COUNT | Да | count→sum: (word,1)+(word,1) → (word,2) |
| MAX / MIN | Да | max(max(a,b), max(c,d)) = max(a,b,c,d) |
| AVERAGE | Нет напрямую | avg(avg(1,3), avg(5,7)) = avg(2,6) = 4 ≠ avg(1,3,5,7) = 4. Передавай (sum, count) |
| MEDIAN | Нет | Медиана подмножеств ≠ общая медиана |
| DISTINCT | Частично | Локальный distinct уменьшит данные, но финальный нужен в reducer |
На практике combiner может сократить network traffic в **десятки раз**. Для Word Count на естественном языке (где слова часто повторяются) экономия достигает 80-90%. Для данных с уникальными ключами (UUID) - combiner бесполезен.
**Hadoop не гарантирует вызов Combiner.** Framework может вызвать его 0, 1 или несколько раз. Поэтому combiner - это **оптимизация**, а не часть логики. Результат должен быть корректным и без combiner.
MapReduce подходит для любых распределённых вычислений - это универсальная парадигма.
MapReduce плохо подходит для итеративных алгоритмов (ML, PageRank), real-time обработки и графовых задач. Для них существуют Spark (in-memory), Flink (streaming), GraphX (графы).
Каждая итерация MapReduce записывает промежуточные данные на диск. Алгоритм PageRank требует 20-50 итераций - это 20-50 циклов disk I/O. Spark хранит данные в RAM между итерациями, что в 10-100 раз быстрее. Для каждого класса задач есть оптимальный инструмент.
Вычисление медианы зарплат через MapReduce. Можно ли использовать Combiner?
Ключевые идеи
- **Map** - параллельная трансформация: каждый mapper обрабатывает один HDFS-блок, выдавая пары (key, value). Помните 11 миллионов статей NYT? Каждая статья обрабатывалась отдельным mapper на своём узле
- **Shuffle** - самая дорогая фаза: группировка по ключу + сортировка + передача данных по сети между узлами
- **Reduce** - агрегация: функция должна быть ассоциативной и коммутативной для корректной параллельной работы
- **Combiner** - оптимизация: локальный reduce на mapper-узле, сокращает network traffic в разы. Работает только для ассоциативных операций
- MapReduce - не серебряная пуля: для итеративных, real-time и графовых задач используйте Spark, Flink, GraphX
Связанные темы
Разобран MapReduce в деталях. Следующий шаг - Apache Spark, который решает главную проблему MapReduce (disk I/O):
- Hadoop Ecosystem — MapReduce - одна из трёх основ Hadoop (HDFS + YARN + MR)
- Что такое Big Data: 5V — MapReduce решает проблему Volume - обработка петабайтов данных
Вопросы для размышления
- Почему в Word Count combiner идентичен reducer, а для вычисления среднего - нет? Какое математическое свойство здесь играет роль?
- Представьте: 90% данных имеют ключ 'null'. Как это повлияет на Shuffle и Reduce? Как решить проблему?
- Если бы MapReduce хранил промежуточные данные в памяти, а не на диске - какие новые проблемы возникли бы? (Подсказка: что если узел упадёт?)
Связанные уроки
- bd-02 — HDFS и блочная структура данных - фундамент Map-фазы
- bd-01 — Концепция 5V и Volume как причина существования MapReduce
- bd-04 — Apache Spark решает главный минус MapReduce - disk I/O между итерациями
- alg-01 — Ассоциативность и коммутативность определяют применимость Combiner
- alg-04 — Divide and conquer - базовая идея за Map/Reduce разбиением задачи
- alg-19-divide-conquer