Big Data

MapReduce: парадигма

Цели урока

  • Понимать роль каждой фазы Map, Shuffle, Reduce в pipeline
  • Объяснять, почему Shuffle - бутылочное горлышко и как его оптимизировать
  • Применять Combiner только для ассоциативных и коммутативных операций
  • Знать ограничения MapReduce и когда использовать Spark вместо него

Предварительные знания

  • Hadoop Ecosystem

2008 год. New York Times хочет оцифровать 11 миллионов газетных статей за 150 лет. На одном сервере - месяцы работы. Инженер Дерек Готфрид запустил MapReduce-задачу на 100 виртуальных машинах Amazon EC2. Результат: 4 TB обработано за 24 часа, потрачено 240 долларов. Принцип «раздели и властвуй», масштабированный на сотни машин.

  • **Google** изначально использовал MapReduce для построения поискового индекса всего веба - триллионы страниц
  • **Spotify** обрабатывает миллиарды событий прослушивания через MapReduce-подобные pipeline для Wrapped (итоги года)
  • **NASA** использует MapReduce для анализа терабайтов спутниковых снимков и телеметрии

Джефф Дин, Санджай Гхемават и бумага, которая изменила Big Data

В 2004 году инженеры Google Джефф Дин и Санджай Гхемават опубликовали статью «MapReduce: Simplified Data Processing on Large Clusters». Они не изобрели map и reduce - эти операции пришли из функционального программирования (Lisp, 1960-е). Но Google взял абстракцию и сделал её промышленной системой, автоматически обрабатывающей отказы узлов и балансирующей нагрузку. Статья вдохновила создание Apache Hadoop в 2006 году - открытой реализации, которая демократизировала Big Data.

Map - трансформация данных

Фаза **Map** - первый этап обработки. Каждый узел кластера получает свой блок данных из HDFS и применяет к нему функцию-mapper. Mapper читает входные пары `(key, value)` и генерирует промежуточные пары `(key', value')`. Главное: **mappers работают параллельно и независимо** - каждый обрабатывает только свой блок, не зная о других.

В реальном Hadoop MapReduce mapper читает данные построчно. Входной ключ - смещение строки в файле (offset), значение - текст строки. Mapper выдаёт новые пары ключ-значение.

**Hadoop Streaming** позволяет писать mapper и reducer на любом языке (Python, Ruby, bash). Данные передаются через stdin/stdout. Для production обычно используют Java (нативный API), но для прототипов Streaming идеален.

Один mapper обрабатывает ровно один HDFS-блок (128 MB по умолчанию). Если файл занимает 10 блоков - запустятся 10 mappers. Если 100 файлов по 1 блоку - 100 mappers. Количество mappers определяется **количеством входных блоков**, а не количеством узлов.

Файл в HDFS занимает 5 блоков по 128 MB. Сколько mapper-задач запустится?

Shuffle - группировка и сортировка

После Map-фазы данные разбросаны: пары `(hello, 1)` могут быть на разных узлах. Перед Reduce нужно **собрать все значения одного ключа на один reducer**. Это делает Shuffle - самая затратная фаза MapReduce, потому что требует **передачи данных по сети** между узлами. Именно поэтому Apache Spark вытеснил Hadoop MapReduce для итеративных задач.

Shuffle состоит из нескольких подэтапов. Каждый mapper сортирует выходные данные по ключу и записывает на локальный диск. Затем **Partitioner** определяет, какому reducer отправить каждый ключ (обычно `hash(key) % num_reducers`). Данные передаются по сети и **merge-сортируются** на стороне reducer.

**Shuffle - главный враг производительности MapReduce.** Все промежуточные данные пишутся на диск (mapper side), передаются по сети и снова пишутся на диск (reducer side). Для 1 TB промежуточных данных это означает ~3 TB disk I/O + 1 TB network I/O. Apache Spark решает эту проблему, храня данные в памяти.

**Custom Partitioner** позволяет контролировать, какие ключи попадут на какой reducer. По умолчанию используется HashPartitioner. Но если данные skewed (один ключ встречается в 90% записей), этот ключ создаст bottleneck на одном reducer, пока остальные простаивают.

**Data Skew** - одна из главных проблем MapReduce. Если ключ "null" встречается в 50% записей, один reducer получит 50% всей работы. Решение: salting (добавить случайный суффикс к ключу), pre-aggregation, или custom partitioner.

Почему Shuffle - самая медленная фаза MapReduce?

Reduce - агрегация результатов

После Shuffle каждый reducer получает **все значения для своего набора ключей**, уже отсортированные. Функция-reducer агрегирует значения: суммирует, считает среднее, находит максимум - зависит от задачи. Результат записывается обратно в HDFS.

Для корректной работы reducer функция агрегации должна обладать определёнными свойствами. **Ассоциативность** - порядок группировки не важен: `(a + b) + c = a + (b + c)`. **Коммутативность** - порядок операндов не важен: `a + b = b + a`. Это гарантирует правильный результат при параллельном выполнении.

ОперацияАссоциативнаяКоммутативнаяПодходит для Reduce
SUM (сумма)ДаДаИдеально
COUNT (подсчёт)ДаДаИдеально
MAX / MINДаДаИдеально
AVERAGE (среднее)Нет!ДаНужно хранить sum + count, а не avg
CONCAT (строки)ДаНет!Результат зависит от порядка
MEDIANНет!Нет!Требует все данные сразу

Всегда тестировать mapper и reducer **локально** через pipe (`mapper | sort | reducer`) перед запуском на кластере. Отладка на кластере - это минуты на запуск, логи разбросаны по узлам. Локальный тест - секунды.

Вычисление медианы зарплат по отделам через MapReduce. Mapper выдаёт (отдел, зарплата). Какая проблема возникнет в Reduce?

Combiners - локальная оптимизация

Shuffle - бутылочное горлышко из-за network I/O. Что если **уменьшить объём данных ДО передачи по сети**? Для этого существует **Combiner** - мини-reducer, который запускается **на стороне mapper**, сразу после Map-фазы.

Combiner - это тот же reducer, но выполняется **локально** на узле mapper. Не все операции поддерживают combiner. Правило: combiner можно использовать, если **функция агрегации ассоциативна и коммутативна**.

ОперацияCombiner возможен?Почему
SUMДаsum(1,1) + sum(1,1) = sum(1,1,1,1) = 4
COUNTДаcount→sum: (word,1)+(word,1) → (word,2)
MAX / MINДаmax(max(a,b), max(c,d)) = max(a,b,c,d)
AVERAGEНет напрямуюavg(avg(1,3), avg(5,7)) = avg(2,6) = 4 ≠ avg(1,3,5,7) = 4. Передавай (sum, count)
MEDIANНетМедиана подмножеств ≠ общая медиана
DISTINCTЧастичноЛокальный distinct уменьшит данные, но финальный нужен в reducer

На практике combiner может сократить network traffic в **десятки раз**. Для Word Count на естественном языке (где слова часто повторяются) экономия достигает 80-90%. Для данных с уникальными ключами (UUID) - combiner бесполезен.

**Hadoop не гарантирует вызов Combiner.** Framework может вызвать его 0, 1 или несколько раз. Поэтому combiner - это **оптимизация**, а не часть логики. Результат должен быть корректным и без combiner.

MapReduce подходит для любых распределённых вычислений - это универсальная парадигма.

MapReduce плохо подходит для итеративных алгоритмов (ML, PageRank), real-time обработки и графовых задач. Для них существуют Spark (in-memory), Flink (streaming), GraphX (графы).

Каждая итерация MapReduce записывает промежуточные данные на диск. Алгоритм PageRank требует 20-50 итераций - это 20-50 циклов disk I/O. Spark хранит данные в RAM между итерациями, что в 10-100 раз быстрее. Для каждого класса задач есть оптимальный инструмент.

Вычисление медианы зарплат через MapReduce. Можно ли использовать Combiner?

Ключевые идеи

  • **Map** - параллельная трансформация: каждый mapper обрабатывает один HDFS-блок, выдавая пары (key, value). Помните 11 миллионов статей NYT? Каждая статья обрабатывалась отдельным mapper на своём узле
  • **Shuffle** - самая дорогая фаза: группировка по ключу + сортировка + передача данных по сети между узлами
  • **Reduce** - агрегация: функция должна быть ассоциативной и коммутативной для корректной параллельной работы
  • **Combiner** - оптимизация: локальный reduce на mapper-узле, сокращает network traffic в разы. Работает только для ассоциативных операций
  • MapReduce - не серебряная пуля: для итеративных, real-time и графовых задач используйте Spark, Flink, GraphX

Связанные темы

Разобран MapReduce в деталях. Следующий шаг - Apache Spark, который решает главную проблему MapReduce (disk I/O):

  • Hadoop Ecosystem — MapReduce - одна из трёх основ Hadoop (HDFS + YARN + MR)
  • Что такое Big Data: 5V — MapReduce решает проблему Volume - обработка петабайтов данных

Вопросы для размышления

  • Почему в Word Count combiner идентичен reducer, а для вычисления среднего - нет? Какое математическое свойство здесь играет роль?
  • Представьте: 90% данных имеют ключ 'null'. Как это повлияет на Shuffle и Reduce? Как решить проблему?
  • Если бы MapReduce хранил промежуточные данные в памяти, а не на диске - какие новые проблемы возникли бы? (Подсказка: что если узел упадёт?)

Связанные уроки

  • bd-02 — HDFS и блочная структура данных - фундамент Map-фазы
  • bd-01 — Концепция 5V и Volume как причина существования MapReduce
  • bd-04 — Apache Spark решает главный минус MapReduce - disk I/O между итерациями
  • alg-01 — Ассоциативность и коммутативность определяют применимость Combiner
  • alg-04 — Divide and conquer - базовая идея за Map/Reduce разбиением задачи
  • alg-19-divide-conquer
MapReduce: парадигма

0

1

Войти