1. Map only job
Настав час описати різні прийоми, які дозволяють ефективно використовувати MapReduce для вирішення практичних завдань, а також показати деякі особливості Hadoop, які дозволяють спростити розробку або суттєво прискорити виконання MapReduce-завдання на кластері.
Як ми пам'ятаємо, MapReduce складається зі стадій Map, Shuffle та Reduce. Як правило, у практичних завданнях найважчою виявляється стадія Shuffle, тому що на цій стадії відбувається сортування даних. Насправді існує низка завдань, у яких можна обійтися лише стадією Map. Ось приклади таких завдань:
- Фільтрування даних (наприклад, «Знайти всі записи з IP-адреси 123.123.123.123» у логах вебсервера);
- Перетворення даних («Видалити колонку в csv-логах»);
- Завантаження та вивантаження даних із зовнішнього джерела («Вставити всі записи з лога до бази даних»).
Такі завдання вирішуються за допомогою Map-Only. При створенні Map-Only завдання в Hadoop потрібно вказати нульову кількість reducer'ів:
Приклад конфігурації map-only завдання на hadoop:
job.setNumReduceTasks(0);
Не вказуємо редьюсер і вказуємо нульову кількість ред'юсерів. Приклад:
hadoop jar hadoop-streaming.jar \
-D mapred.reduce.tasks=0\
-input input_dir\
-output output_dir\
-mapper "python mapper.py"\
-file "mapper.py"
Native interface | Hadoop Streaming Interface |
---|---|
Map Only jobs насправді можуть бути дуже корисними. Наприклад, у платформі Facetz.DCA виявлення характеристик користувачів з їхньої поведінки використовується саме один великий map-only, кожен маппер якого приймає на вхід користувача і вихід віддає його характеристики.
2. Combine
Як я вже писав, зазвичай найважча стадія під час виконання Map-Reduce завдання — це стадія shuffle. Відбувається це тому, що проміжні результати (вихід mapper-a) записуються на диск, сортуються та передаються по мережі. Проте існують завдання, у яких така поведінка здається не дуже розумною. Наприклад, у тій же задачі підрахунку слів у документах можна попередньо передагрегувати результати виходів кількох mapper-ів на одному вузлі map-reduce завдання, та передавати на reducer вже підсумовані значення кожної машини.
У hadoop для цього можна визначити комбінуючу функцію, яка оброблятиме вихід частини mapper-ів. Комбінуюча функція дуже схожа на reduce: вона приймає на вхід вихід частини mapper-ів та видає агрегований результат для цих mapper-ів, тому дуже часто reducer використовують і як combiner. Важлива відмінність від reduce — на функцію, що комбінує, потрапляють не всі значення, що відповідають одному ключу.
Більш того, hadoop не гарантує, що комбінуюча функція взагалі буде виконана для виходу mapper-a. Тому комбінуюча функція не завжди застосовується. Наприклад у разі пошуку медіанного значення по ключу. Тим не менш, у тих завданнях, де комбінуючу функцію можна застосувати, її використання дозволяє досягти суттєвого приросту до швидкості виконання MapReduce-завдання.
Використання Combiner-a на hadoop:
Під час конфігурації job-и вказати клас-Combiner. Зазвичай він збігається з Reducer:
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
hadoop jar hadoop-streaming.jar \
-input input_dir\
-output output_dir\
-mapper "python mapper.py"\
-reducer "python reducer.py"\
-combiner "python reducer.py"\
-file "mapper.py"\
-file "reducer.py"\
Native Interface | Hadoop streaming |
---|---|
3. Ланцюжки MapReduce-завдань
Бують ситуації, коли для вирішення завдання одним MapReduce не обійтися. Наприклад, розглянемо трохи видозмінене завдання WordCount: є набір текстових документів, необхідно порахувати, скільки слів зустрілося від 1 до 1000 разів у наборі, скільки слів від 1001 до 2000, скільки від 2001 до 3000 і так далі. Для вирішення нам знадобиться 2 MapReduce job'а:
- Відозмінений wordcount, який для кожного слова розрахує, до якого з інтервалів воно потрапило;
- MapReduce, який підраховує, скільки разів у виході першого MapReduce зустрівся кожен з інтервалів.
Рішення на псевдокоді:
|
|
|
|
Для того, щоб виконати послідовність MapReduce-задач на hadoop, достатньо просто як вхідні дані для другого завдання вказати папку, яка була вказана як output для першої, і запустити їх по черзі.
На практиці ланцюжки MapReduce-завдань можуть бути досить складними послідовностями, в яких MapReduce-завдання можуть підключатися як послідовно, так і паралельно один одному. Для спрощення управління такими планами виконання завдань існують окремі інструменти типу oozie та luigi, яким буде присвячена окрема стаття цього циклу.
4. Distributed cache
Важливим механізмом у Hadoop є Distributed Cache. Distributed Cache дозволяє додавати файли (наприклад, текстові файли, архіви, jar-файли) до оточення, в якому виконується MapReduce-завдання.
Можно додавати файли, що зберігаються на HDFS, локальні файли (локальні для тієї машини, з якої виконується запуск завдання). Я вже неявно показував, як використовувати Distributed Cache разом із hadoop streaming: додаючи через опцію -file файли mapper.py та reducer.py. Насправді можна додавати не лише mapper.py та reducer.py, а взагалі довільні файли, і потім користуватися ними начебто вони знаходяться у локальній папці.
Використання Distributed Cache:
Native API |
---|
|
Hadoop Streaming |
---|
#перераховуємо файли, які потрібно додати до distributed cache у параметрі –files. Параметр –files має йти перед другими параметрами.
приклад використання: |
5.5 Reduce Join
Ті, хто звик працювати з реляційними базами, часто користуються дуже зручною операцією Join, що дозволяє спільно обробити зміст деяких таблиць, об'єднавши їх за деяким ключем. Працюючи з великими даними таке завдання теж іноді виникає. Розглянемо наступний приклад:
Є логи двох web-серверів, кожен лог має такий вигляд:
t\t
Приклад шматочка лога:
1446792139
178.78.82.1
/sphingosine/unhurrying.css
1446792139
126.31.163.222
/accentually.js
1446792139
154.164.149.83
/pyroacid/unkemptly.jpg
1446792139
202.27.13.181
/Chawia.js
1446792139
67.123.248.174
/morphographical/dismain.css
1446792139
226.74.123.135
/phanerite.php
1446792139
157.109.106.104
/bisonant.css
Необхідно порахувати для кожної IP-адреси, на який із 2-х серверів він частіше заходив. Результат має бути поданий у вигляді:
\t
Приклад частини результату:
178.78.82.1
first
126.31.163.222
second
154.164.149.83
second
226.74.123.135
first
На жаль, на відміну від реляційних баз даних, в загальному уявленні об'єднання двох логів за ключем (у цьому випадку — за IP-адресою) є досить важкою операцією, і вирішується за допомогою 3-х MapReduce та патерну Reduce Join:
ReduceJoin працює таким чином:
1) На кожен із вхідних логів запускається окремий MapReduce (Map only), що перетворює вхідні дані до такого вигляду:
key -> (type, value
Де key — це ключ, яким потрібно об'єднувати таблиці, Type — тип таблиці (first чи second у разі), а Value — це будь-які додаткові дані, прив'язані до ключа.
2) Виходи обох MapReduce подаються на вхід 3-му MapReduce, який, власне, і виконує об'єднання. Цей MapReduce містить порожній Mapper, який просто копіює вхідні дані. Далі shuffle розкладає дані за ключами та подає на вхід редьюсеру у вигляді:
key -> [(type, value)]
Важливо, що в цей момент на редьюсер потрапляють записи з обох логів і водночас по полю типу можна ідентифікувати, з якого з двох логів потрапило конкретне значення. Значить даних достатньо, щоб вирішити вихідне завдання. У нашому випадку reducer просто повинен порахувати для кожного ключа записів, з яким тип зустрілося більше і вивести цей тип.
6. MapJoin
Паттерн ReduceJoin описує загальний випадок об'єднання двох логів за ключем. Однак є окремий випадок, при якому завдання можна суттєво спростити та прискорити. Це випадок, у якому одне із логів має розмір значно меншого розміру, ніж інший. Розглянемо таке завдання:
Є 2 логи. Перший лог містить лог вебсервера (такий як у попередній задачі), другий файл (розміром в 100кб) містить відповідність URL-> Тематика. Приклад 2-го файлу:
/toyota.php
auto
/football/spartak.html
sport
/cars
auto
/finances/money
business
Для кожної IP-адреси необхідно розрахувати сторінки якої категорії з даної IP-адреси завантажувалися найчастіше.
У цьому випадку нам теж необхідно виконати Join 2-х логів за URL. Однак у цьому випадку нам не обов'язково запускати 3 MapReduce, тому що другий лог повністю влізе в пам'ять. Для того, щоб вирішити задачу за допомогою 1-го MapReduce, ми можемо завантажити другий лог в Distributed Cache, а під час ініціалізації Mapper-a просто рахувати його на згадку, поклавши його в словник -> topic.
Далі завдання вирішується таким чином:
Map:
# знаходимо тематику кожної зі сторінок першого лога
input_line -> [ip, topic]
Reduce:
Ip -> [topics] -> [ip, most_popular_topic]
Reduce отримує на вхід ip та список усіх тематик, просто обчислює, яка з тематик зустрілася найчастіше. Таким чином завдання вирішене за допомогою 1-го MapReduce, а власне Join взагалі відбувається всередині map (тому якби не потрібна була додаткова агрегація за ключем — можна було б обійтися MapOnly job-ом):
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ