5.1 Map only job

Настав час описати різні прийоми, які дозволяють ефективно використовувати MapReduce для вирішення практичних завдань, а також показати деякі особливості Hadoop, які дозволяють спростити розробку або суттєво прискорити виконання MapReduce-завдання на кластері.

Як ми пам'ятаємо, MapReduce складається із стадій Map, Shuffle та Reduce. Як правило, у практичних завданнях найважчою виявляється стадія Shuffle, тому що на цій стадії відбувається сортування даних. Насправді існує низка завдань, у яких можна обійтися лише стадією Map. Ось приклади таких завдань:

  • Фільтрування даних (наприклад, "Знайти всі записи з IP-адресаи 123.123.123.123" у логах web-сервера);
  • Перетворення даних («Видалити колонку в CSV-логах»);
  • Завантаження та вивантаження даних із зовнішнього джерела («Вставити всі записи з лога до бази даних»).

Такі завдання вирішуються за допомогою Map-Only. При створенні Map-Only завдання Hadoop потрібно вказати нульову кількість reducer'ів:

Приклад конфігурації map-only завдання на hadoop:

Native interface Hadoop Streaming Interface

Вказати нульову кількість ред'юсерів під час конфігурації job'a:

job.setNumReduceTasks(0); 

Не вказуємо ред'юсер та вказуємо нульову кількість ред'юсерів. Приклад:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Map Only jobs можуть бути дуже корисними. Наприклад, у платформі Facetz.DCA виявлення характеристик користувачів з їхньої поведінки використовується саме один великий map-only, кожен маппер якого приймає на вхід користувача і вихід віддає його характеристики.

5.2 Combine

Як я вже писав, зазвичай найважча стадія під час виконання Map-Reduce завдання – це стадія shuffle. Відбувається це тому, що проміжні результати (вихід mapper'a) записуються на диск, сортуються та передаються по мережі. Однак існують завдання, в яких така поведінка здається не дуже розумною. Наприклад, у тій же задачі підрахунку слів у документах можна попередньо передагрегувати результати виходів кількох mapper'ів на одному вузлі map-reduce завдання, і передавати на reducer вже підсумовані значення кожної машині.

У hadoop для цього можна визначити комбінуючу функцію, яка оброблятиме вихід частини mapper-ів. Комбінуюча функція дуже схожа на reduce – вона приймає на вхід вихід частини mapper'ів та видає агрегований результат для цих mapper'ів, тому дуже часто reducer використовують і як combiner. Важлива відмінність від reduce – на комбінуючу функцію потрапляють в повному обсязі значення, відповідні одному ключу .

Більше того, hadoop не гарантує того, що комбінуюча функція взагалі буде виконана для виходу mapper'a. Тому комбінуюча функція не завжди застосовна, наприклад у разі пошуку медіанного значення по ключу. Тим не менш, у тих завданнях, де комбінуюча функція застосовна, її використання дозволяє досягти суттєвого приросту до швидкості виконання MapReduce-завдання.

Використання Combiner'a на hadoop:

Native Interface Hadoop streaming

При конфігурації job-a вказати клас Combiner. Як правило, він збігається з Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

У параметрах командного рядка вказати команду Combiner. Як правило, ця команда збігається з командою reducer'a. Приклад:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Ланцюжки MapReduce-завдань

Бувають ситуації, коли для вирішення завдання одним MapReduce не обійтися. Наприклад, розглянемо трохи видозмінене завдання WordCount: є набір текстових документів, необхідно порахувати, скільки слів зустрілося від 1 до 1000 разів у наборі, скільки слів від 1001 до 2000, скільки від 2001 до 3000 тощо. Для вирішення нам знадобиться 2 MapReduce job'а:

  • Видозмінений wordcount, який для кожного слова розрахує, до якого з інтервалів воно потрапило;
  • MapReduce, який підраховує, скільки разів у виході першого MapReduce зустрівся кожен з інтервалів.

Рішення на псевдокоді:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Для того, щоб виконати послідовність MapReduce-завдань на hadoop, досить просто як вхідні дані для другого завдання вказати папку, яка була вказана як output для першої і запустити їх по черзі.

На практиці ланцюжка MapReduce-завдань можуть бути досить складними послідовностями, в яких MapReduce-завдання можуть бути підключені як послідовно, так і паралельно один одному. Для спрощення управління такими планами виконання завдань існують окремі інструменти типу oozie та luigi, яким буде присвячена окрема стаття цього циклу.

5.4 Distributed cache

Важливим механізмом Hadoop є Distributed Cache. Distributed Cache дозволяє додавати файли (наприклад, текстові файли, архіви, jar-файли) до оточення, в якому виконується MapReduce-завдання.

Можна додавати файли, що зберігаються на HDFS, локальні файли (локальні для машини, з якої виконується запуск завдання). Я вже неявно показував, як використовувати Distributed Cache разом із hadoop streaming: додаючи через опцію -file файли mapper.py та reducer.py. Насправді можна додавати не тільки mapper.py та reducer.py, а взагалі довільні файли, і потім користуватися ними начебто вони знаходяться у локальній папці.

Використання Distributed Cache:

Native API
//Конфігурація Job'a
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//Приклад використання в mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // Отримуємо кешовані дані з архівів
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // використовуємо дані тут
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop Streaming

#перераховуємо файли, які необхідно додати до distributed cache у параметрі –files. Параметр –files має йти перед іншими параметрами.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

приклад використання:

import sys 
#просто читаємо файл із локальної папки 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Reduce Join

Ті, хто звик працювати з реляційними базами, часто користуються дуже зручною операцією Join, що дозволяє спільно обробити зміст деяких таблиць, об'єднавши їх за деяким ключем. Працюючи з великими даними таке завдання теж іноді виникає. Розглянемо наступний приклад:

Є логи двох web-серверів, кожен лог має такий вигляд:

t\t

Приклад шматочка лога:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Необхідно порахувати для кожної IP-адресаи на якій із 2-х серверів він частіше заходив. Результат має бути представлений у вигляді:

\t

Приклад частини результату:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

На жаль, на відміну від реляційних баз даних, у загальному випадку об'єднання двох логів за ключом (в даному випадку – за IP-адресаою) є досить важкою операцією і вирішується за допомогою 3-х MapReduce і патерну Reduce Join:

ReduceJoin працює наступним чином:

1) На кожен із вхідних логів запускається окремий MapReduce (Map only), що перетворює вхідні дані до наступного виду:

key -> (type, value

Де key – це ключ, яким потрібно об'єднувати таблиці, Type – тип таблиці (first чи second у разі), а Value – це будь-які додаткові дані, прив'язані до ключу.

2) Виходи обох MapReduce подаються на вхід 3-го MapReduce, який, власне, і виконує об'єднання. Цей MapReduce містить порожній Mapper, який просто копіює вхідні дані. Далі shuffle розкладає дані за ключами та подає на вхід редьюсеру у вигляді:

key -> [(type, value)]

Важливо, що в цей момент на редьюсер потрапляють записи з обох логів і при цьому по полю типу можна ідентифікувати, з якого з двох логів потрапило конкретне значення. Значить даних достатньо, щоб вирішити вихідне завдання. У нашому випадку reducer просто повинен порахувати для кожного ключа записів, з яким тип зустрілося більше і вивести цей тип.

5.6 MapJoin

Паттерн ReduceJoin описує загальний випадок поєднання двох логів по ключу. Однак є окремий випадок, при якому завдання можна суттєво спростити та прискорити. Це випадок, у якому одне із логів має розмір значно меншого розміру, ніж інший. Розглянемо таке завдання:

Є 2 логи. Перший лог містить лог web-сервера (такий як у попередній задачі), другий файл (розміром в 100кб) містить відповідність URL-> Тематика. Приклад 2-го файлу:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Для кожної IP-адресаи необхідно розрахувати сторінки якої категорії з цієї IP-адресаи завантажувалися найчастіше.

У цьому випадку нам також потрібно виконати Join 2-х логів за URL. Однак у цьому випадку нам не обов'язково запускати 3 MapReduce, тому що другий лог повністю влізе в пам'ять. Для того, щоб вирішити задачу за допомогою 1-го MapReduce, ми можемо завантажити другий лог в Distributed Cache, а при ініціалізації Mapper'a просто вважати його на згадку, поклавши його в словник -> topic.

Далі завдання вирішується так:

Map:

# знаходимо тематику кожної зі сторінок першого лога 
input_line -> [ip,  topic] 

Reduce:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce отримує на вхід ip та список усіх тематик, просто обчислює, яка з тематик зустрілася найчастіше. Таким чином завдання вирішено за допомогою 1-го MapReduce, а власне Join взагалі відбувається всередині map (тому якби не потрібна була додаткова агрегація по ключу – можна було б обійтися MapOnly job-ом):