1. Map only job

Настав час описати різні прийоми, які дозволяють ефективно використовувати MapReduce для вирішення практичних завдань, а також показати деякі особливості Hadoop, які дозволяють спростити розробку або суттєво прискорити виконання MapReduce-завдання на кластері.

Як ми пам'ятаємо, MapReduce складається зі стадій Map, Shuffle та Reduce. Як правило, у практичних завданнях найважчою виявляється стадія Shuffle, тому що на цій стадії відбувається сортування даних. Насправді існує низка завдань, у яких можна обійтися лише стадією Map. Ось приклади таких завдань:

  • Фільтрування даних (наприклад, «Знайти всі записи з IP-адреси 123.123.123.123» у логах вебсервера);
  • Перетворення даних («Видалити колонку в csv-логах»);
  • Завантаження та вивантаження даних із зовнішнього джерела («Вставити всі записи з лога до бази даних»).

Такі завдання вирішуються за допомогою Map-Only. При створенні Map-Only завдання в Hadoop потрібно вказати нульову кількість reducer'ів:

Map-Only

Приклад конфігурації map-only завдання на hadoop:

job.setNumReduceTasks(0);

Не вказуємо редьюсер і вказуємо нульову кількість ред'юсерів. Приклад:

hadoop jar hadoop-streaming.jar \
 -D mapred.reduce.tasks=0\
-input input_dir\
-output output_dir\
-mapper "python mapper.py"\
-file "mapper.py"
Вказати нульову кількість ред'юсерів при конфігурації job-и:
Native interface Hadoop Streaming Interface

Map Only jobs насправді можуть бути дуже корисними. Наприклад, у платформі Facetz.DCA виявлення характеристик користувачів з їхньої поведінки використовується саме один великий map-only, кожен маппер якого приймає на вхід користувача і вихід віддає його характеристики.

2. Combine

Як я вже писав, зазвичай найважча стадія під час виконання Map-Reduce завдання — це стадія shuffle. Відбувається це тому, що проміжні результати (вихід mapper-a) записуються на диск, сортуються та передаються по мережі. Проте існують завдання, у яких така поведінка здається не дуже розумною. Наприклад, у тій же задачі підрахунку слів у документах можна попередньо передагрегувати результати виходів кількох mapper-ів на одному вузлі map-reduce завдання, та передавати на reducer вже підсумовані значення кожної машини.

Combine Map-Reduce

У hadoop для цього можна визначити комбінуючу функцію, яка оброблятиме вихід частини mapper-ів. Комбінуюча функція дуже схожа на reduce: вона приймає на вхід вихід частини mapper-ів та видає агрегований результат для цих mapper-ів, тому дуже часто reducer використовують і як combiner. Важлива відмінність від reduce — на функцію, що комбінує, потрапляють не всі значення, що відповідають одному ключу.

Більш того, hadoop не гарантує, що комбінуюча функція взагалі буде виконана для виходу mapper-a. Тому комбінуюча функція не завжди застосовується. Наприклад у разі пошуку медіанного значення по ключу. Тим не менш, у тих завданнях, де комбінуючу функцію можна застосувати, її використання дозволяє досягти суттєвого приросту до швидкості виконання MapReduce-завдання.

Використання Combiner-a на hadoop:

Під час конфігурації job-и вказати клас-Combiner. Зазвичай він збігається з Reducer:

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
hadoop jar hadoop-streaming.jar \
-input input_dir\
-output output_dir\
-mapper "python mapper.py"\
-reducer "python reducer.py"\
-combiner "python reducer.py"\
-file "mapper.py"\
-file "reducer.py"\
У параметрах командного рядка вказати команду -combiner. Зазвичай ця команда збігається з командою reducer-a. Приклад:
Native Interface Hadoop streaming

3. Ланцюжки MapReduce-завдань

Бують ситуації, коли для вирішення завдання одним MapReduce не обійтися. Наприклад, розглянемо трохи видозмінене завдання WordCount: є набір текстових документів, необхідно порахувати, скільки слів зустрілося від 1 до 1000 разів у наборі, скільки слів від 1001 до 2000, скільки від 2001 до 3000 і так далі. Для вирішення нам знадобиться 2 MapReduce job'а:

  • Відозмінений wordcount, який для кожного слова розрахує, до якого з інтервалів воно потрапило;
  • MapReduce, який підраховує, скільки разів у виході першого MapReduce зустрівся кожен з інтервалів.

Рішення на псевдокоді:

#map1
def map(doc):
for word in doc:
yield word, 1
#reduce1
def reduce(word, values):
yield int(sum(values)/1000), 1 
#map2
def map(doc):
interval, cnt = doc.split()
yield interval, cnt 
#reduce2
def reduce(interval, values):
yield interval*1000, sum(values) 

Для того, щоб виконати послідовність MapReduce-задач на hadoop, достатньо просто як вхідні дані для другого завдання вказати папку, яка була вказана як output для першої, і запустити їх по черзі.

На практиці ланцюжки MapReduce-завдань можуть бути досить складними послідовностями, в яких MapReduce-завдання можуть підключатися як послідовно, так і паралельно один одному. Для спрощення управління такими планами виконання завдань існують окремі інструменти типу oozie та luigi, яким буде присвячена окрема стаття цього циклу.

4. Distributed cache

Важливим механізмом у Hadoop є Distributed Cache. Distributed Cache дозволяє додавати файли (наприклад, текстові файли, архіви, jar-файли) до оточення, в якому виконується MapReduce-завдання.

Можно додавати файли, що зберігаються на HDFS, локальні файли (локальні для тієї машини, з якої виконується запуск завдання). Я вже неявно показував, як використовувати Distributed Cache разом із hadoop streaming: додаючи через опцію -file файли mapper.py та reducer.py. Насправді можна додавати не лише mapper.py та reducer.py, а взагалі довільні файли, і потім користуватися ними начебто вони знаходяться у локальній папці.

Використання Distributed Cache:

Native API

//конфігурація Job’a
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//приклад використання в mapper-і:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // отримуємо кешовані дані з архівів
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // використовуємо дані тут
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop Streaming

#перераховуємо файли, які потрібно додати до distributed cache у параметрі –files. Параметр –files має йти перед другими параметрами.

yarn  hadoop-streaming.jar\
-files mapper.py,reducer.py,some_cached_data.txt\
-input '/some/input/path' \
-output '/some/output/path' \
-mapper 'python mapper.py' \
-reducer 'python reducer.py' \

приклад використання:

import sys
#просто читаємо файл із локальної папки
data = open('some_cached_data.txt').read()

for line in sys.stdin()
#processing input
#use data here

5.5 Reduce Join

Ті, хто звик працювати з реляційними базами, часто користуються дуже зручною операцією Join, що дозволяє спільно обробити зміст деяких таблиць, об'єднавши їх за деяким ключем. Працюючи з великими даними таке завдання теж іноді виникає. Розглянемо наступний приклад:

Є логи двох web-серверів, кожен лог має такий вигляд:

t\t

Приклад шматочка лога:

1446792139
178.78.82.1
/sphingosine/unhurrying.css
1446792139
126.31.163.222
/accentually.js
1446792139
154.164.149.83
/pyroacid/unkemptly.jpg
1446792139
202.27.13.181
/Chawia.js
1446792139
67.123.248.174
/morphographical/dismain.css
1446792139
226.74.123.135
/phanerite.php
1446792139
157.109.106.104
/bisonant.css

Необхідно порахувати для кожної IP-адреси, на який із 2-х серверів він частіше заходив. Результат має бути поданий у вигляді:

\t

Приклад частини результату:

178.78.82.1
first
126.31.163.222
second
154.164.149.83
second
226.74.123.135
first

На жаль, на відміну від реляційних баз даних, в загальному уявленні об'єднання двох логів за ключем (у цьому випадку — за IP-адресою) є досить важкою операцією, і вирішується за допомогою 3-х MapReduce та патерну Reduce Join:

ReduceJoin працює таким чином:

1) На кожен із вхідних логів запускається окремий MapReduce (Map only), що перетворює вхідні дані до такого вигляду:

key -> (type, value

Де key — це ключ, яким потрібно об'єднувати таблиці, Type — тип таблиці (first чи second у разі), а Value — це будь-які додаткові дані, прив'язані до ключа.

2) Виходи обох MapReduce подаються на вхід 3-му MapReduce, який, власне, і виконує об'єднання. Цей MapReduce містить порожній Mapper, який просто копіює вхідні дані. Далі shuffle розкладає дані за ключами та подає на вхід редьюсеру у вигляді:

key -> [(type, value)]

Важливо, що в цей момент на редьюсер потрапляють записи з обох логів і водночас по полю типу можна ідентифікувати, з якого з двох логів потрапило конкретне значення. Значить даних достатньо, щоб вирішити вихідне завдання. У нашому випадку reducer просто повинен порахувати для кожного ключа записів, з яким тип зустрілося більше і вивести цей тип.

6. MapJoin

Паттерн ReduceJoin описує загальний випадок об'єднання двох логів за ключем. Однак є окремий випадок, при якому завдання можна суттєво спростити та прискорити. Це випадок, у якому одне із логів має розмір значно меншого розміру, ніж інший. Розглянемо таке завдання:

Є 2 логи. Перший лог містить лог вебсервера (такий як у попередній задачі), другий файл (розміром в 100кб) містить відповідність URL-> Тематика. Приклад 2-го файлу:

/toyota.php
auto
/football/spartak.html
sport
/cars
auto
/finances/money
business

Для кожної IP-адреси необхідно розрахувати сторінки якої категорії з даної IP-адреси завантажувалися найчастіше.

У цьому випадку нам теж необхідно виконати Join 2-х логів за URL. Однак у цьому випадку нам не обов'язково запускати 3 MapReduce, тому що другий лог повністю влізе в пам'ять. Для того, щоб вирішити задачу за допомогою 1-го MapReduce, ми можемо завантажити другий лог в Distributed Cache, а під час ініціалізації Mapper-a просто рахувати його на згадку, поклавши його в словник -> topic.

Далі завдання вирішується таким чином:

Map:

# знаходимо тематику кожної зі сторінок першого лога
      input_line -> [ip,  topic]

Reduce:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce отримує на вхід ip та список усіх тематик, просто обчислює, яка з тематик зустрілася найчастіше. Таким чином завдання вирішене за допомогою 1-го MapReduce, а власне Join взагалі відбувається всередині map (тому якби не потрібна була додаткова агрегація за ключем — можна було б обійтися MapOnly job-ом):