5.1 Map only job
Настав час описати різні прийоми, які дозволяють ефективно використовувати MapReduce для вирішення практичних завдань, а також показати деякі особливості Hadoop, які дозволяють спростити розробку або суттєво прискорити виконання MapReduce-завдання на кластері.
Як ми пам'ятаємо, MapReduce складається із стадій Map, Shuffle та Reduce. Як правило, у практичних завданнях найважчою виявляється стадія Shuffle, тому що на цій стадії відбувається сортування даних. Насправді існує низка завдань, у яких можна обійтися лише стадією Map. Ось приклади таких завдань:
- Фільтрування даних (наприклад, "Знайти всі записи з IP-адресаи 123.123.123.123" у логах web-сервера);
- Перетворення даних («Видалити колонку в CSV-логах»);
- Завантаження та вивантаження даних із зовнішнього джерела («Вставити всі записи з лога до бази даних»).
Такі завдання вирішуються за допомогою Map-Only. При створенні Map-Only завдання Hadoop потрібно вказати нульову кількість reducer'ів:

Приклад конфігурації map-only завдання на hadoop:
Native interface | Hadoop Streaming Interface |
---|---|
Вказати нульову кількість ред'юсерів під час конфігурації job'a:
|
Не вказуємо ред'юсер та вказуємо нульову кількість ред'юсерів. Приклад:
|
Map Only jobs можуть бути дуже корисними. Наприклад, у платформі Facetz.DCA виявлення характеристик користувачів з їхньої поведінки використовується саме один великий map-only, кожен маппер якого приймає на вхід користувача і вихід віддає його характеристики.
5.2 Combine
Як я вже писав, зазвичай найважча стадія під час виконання Map-Reduce завдання – це стадія shuffle. Відбувається це тому, що проміжні результати (вихід mapper'a) записуються на диск, сортуються та передаються по мережі. Однак існують завдання, в яких така поведінка здається не дуже розумною. Наприклад, у тій же задачі підрахунку слів у документах можна попередньо передагрегувати результати виходів кількох mapper'ів на одному вузлі map-reduce завдання, і передавати на reducer вже підсумовані значення кожної машині.

У hadoop для цього можна визначити комбінуючу функцію, яка оброблятиме вихід частини mapper-ів. Комбінуюча функція дуже схожа на reduce – вона приймає на вхід вихід частини mapper'ів та видає агрегований результат для цих mapper'ів, тому дуже часто reducer використовують і як combiner. Важлива відмінність від reduce – на комбінуючу функцію потрапляють в повному обсязі значення, відповідні одному ключу .
Більше того, hadoop не гарантує того, що комбінуюча функція взагалі буде виконана для виходу mapper'a. Тому комбінуюча функція не завжди застосовна, наприклад у разі пошуку медіанного значення по ключу. Тим не менш, у тих завданнях, де комбінуюча функція застосовна, її використання дозволяє досягти суттєвого приросту до швидкості виконання MapReduce-завдання.
Використання Combiner'a на hadoop:
Native Interface | Hadoop streaming |
---|---|
При конфігурації job-a вказати клас Combiner. Як правило, він збігається з Reducer:
|
У параметрах командного рядка вказати команду Combiner. Як правило, ця команда збігається з командою reducer'a. Приклад:
|
5.3 Ланцюжки MapReduce-завдань
Бувають ситуації, коли для вирішення завдання одним MapReduce не обійтися. Наприклад, розглянемо трохи видозмінене завдання WordCount: є набір текстових документів, необхідно порахувати, скільки слів зустрілося від 1 до 1000 разів у наборі, скільки слів від 1001 до 2000, скільки від 2001 до 3000 тощо. Для вирішення нам знадобиться 2 MapReduce job'а:
- Видозмінений wordcount, який для кожного слова розрахує, до якого з інтервалів воно потрапило;
- MapReduce, який підраховує, скільки разів у виході першого MapReduce зустрівся кожен з інтервалів.
Рішення на псевдокоді:
|
|
|
|
Для того, щоб виконати послідовність MapReduce-завдань на hadoop, досить просто як вхідні дані для другого завдання вказати папку, яка була вказана як output для першої і запустити їх по черзі.
На практиці ланцюжка MapReduce-завдань можуть бути досить складними послідовностями, в яких MapReduce-завдання можуть бути підключені як послідовно, так і паралельно один одному. Для спрощення управління такими планами виконання завдань існують окремі інструменти типу oozie та luigi, яким буде присвячена окрема стаття цього циклу.

5.4 Distributed cache
Важливим механізмом Hadoop є Distributed Cache. Distributed Cache дозволяє додавати файли (наприклад, текстові файли, архіви, jar-файли) до оточення, в якому виконується MapReduce-завдання.
Можна додавати файли, що зберігаються на HDFS, локальні файли (локальні для машини, з якої виконується запуск завдання). Я вже неявно показував, як використовувати Distributed Cache разом із hadoop streaming: додаючи через опцію -file файли mapper.py та reducer.py. Насправді можна додавати не тільки mapper.py та reducer.py, а взагалі довільні файли, і потім користуватися ними начебто вони знаходяться у локальній папці.
Використання Distributed Cache:
Native API |
---|
|
Hadoop Streaming |
---|
#перераховуємо файли, які необхідно додати до distributed cache у параметрі –files. Параметр –files має йти перед іншими параметрами.
приклад використання:
|
5.5 Reduce Join
Ті, хто звик працювати з реляційними базами, часто користуються дуже зручною операцією Join, що дозволяє спільно обробити зміст деяких таблиць, об'єднавши їх за деяким ключем. Працюючи з великими даними таке завдання теж іноді виникає. Розглянемо наступний приклад:
Є логи двох web-серверів, кожен лог має такий вигляд:
t\t
Приклад шматочка лога:
1446792139
178.78.82.1
/sphingosine/unhurrying.css
1446792139
126.31.163.222
/accentually.js
1446792139
154.164.149.83
/pyroacid/unkemptly.jpg
1446792139
202.27.13.181
/Chawia.js
1446792139
67.123.248.174
/morphographical/dismain.css
1446792139
226.74.123.135
/phanerite.php
1446792139
157.109.106.104
/bisonant.css
Необхідно порахувати для кожної IP-адресаи на якій із 2-х серверів він частіше заходив. Результат має бути представлений у вигляді:
\t
Приклад частини результату:
178.78.82.1
first
126.31.163.222
second
154.164.149.83
second
226.74.123.135
first
На жаль, на відміну від реляційних баз даних, у загальному випадку об'єднання двох логів за ключом (в даному випадку – за IP-адресаою) є досить важкою операцією і вирішується за допомогою 3-х MapReduce і патерну Reduce Join:

ReduceJoin працює наступним чином:
1) На кожен із вхідних логів запускається окремий MapReduce (Map only), що перетворює вхідні дані до наступного виду:
key -> (type, value
Де key – це ключ, яким потрібно об'єднувати таблиці, Type – тип таблиці (first чи second у разі), а Value – це будь-які додаткові дані, прив'язані до ключу.
2) Виходи обох MapReduce подаються на вхід 3-го MapReduce, який, власне, і виконує об'єднання. Цей MapReduce містить порожній Mapper, який просто копіює вхідні дані. Далі shuffle розкладає дані за ключами та подає на вхід редьюсеру у вигляді:
key -> [(type, value)]
Важливо, що в цей момент на редьюсер потрапляють записи з обох логів і при цьому по полю типу можна ідентифікувати, з якого з двох логів потрапило конкретне значення. Значить даних достатньо, щоб вирішити вихідне завдання. У нашому випадку reducer просто повинен порахувати для кожного ключа записів, з яким тип зустрілося більше і вивести цей тип.
5.6 MapJoin
Паттерн ReduceJoin описує загальний випадок поєднання двох логів по ключу. Однак є окремий випадок, при якому завдання можна суттєво спростити та прискорити. Це випадок, у якому одне із логів має розмір значно меншого розміру, ніж інший. Розглянемо таке завдання:
Є 2 логи. Перший лог містить лог web-сервера (такий як у попередній задачі), другий файл (розміром в 100кб) містить відповідність URL-> Тематика. Приклад 2-го файлу:
/toyota.php
auto
/football/spartak.html
sport
/cars
auto
/finances/money
business
Для кожної IP-адресаи необхідно розрахувати сторінки якої категорії з цієї IP-адресаи завантажувалися найчастіше.
У цьому випадку нам також потрібно виконати Join 2-х логів за URL. Однак у цьому випадку нам не обов'язково запускати 3 MapReduce, тому що другий лог повністю влізе в пам'ять. Для того, щоб вирішити задачу за допомогою 1-го MapReduce, ми можемо завантажити другий лог в Distributed Cache, а при ініціалізації Mapper'a просто вважати його на згадку, поклавши його в словник -> topic.
Далі завдання вирішується так:
Map:
# знаходимо тематику кожної зі сторінок першого лога
input_line -> [ip, topic]
Reduce:
Ip -> [topics] -> [ip, most_popular_topic]
Reduce отримує на вхід ip та список усіх тематик, просто обчислює, яка з тематик зустрілася найчастіше. Таким чином завдання вирішено за допомогою 1-го MapReduce, а власне Join взагалі відбувається всередині map (тому якби не потрібна була додаткова агрегація по ключу – можна було б обійтися MapOnly job-ом):

ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ