5.1 Map only job

Пришла пора описать различные приёмы, которые позволяют эффективно использовать MapReduce для решения практических задач, а также показать некоторые особенности Hadoop, которые позволяют упростить разработку или существенно ускорить выполнение MapReduce-задачи на кластере.

Как мы помним, MapReduce состоит из стадий Map, Shuffle и Reduce. Как правило, в практических задачах самой тяжёлой оказывается стадия Shuffle, так как на этой стадии происходит сортировка данных. На самом деле существует ряд задач, в которых можно обойтись только стадией Map. Вот примеры таких задач:

  • Фильтрация данных (например, «Найти все записи с IP-адреса 123.123.123.123» в логах web-сервера);
  • Преобразование данных («Удалить колонку в csv-логах»);
  • Загрузка и выгрузка данных из внешнего источника («Вставить все записи из лога в базу данных»).

Такие задачи решаются при помощи Map-Only. При создании Map-Only задачи в Hadoop нужно указать нулевое количество reducer’ов:

Пример конфигурации map-only задачи на hadoop:

Native interface Hadoop Streaming Interface

Указать нулевое количество редьюсеров при конфигурации job’a:

job.setNumReduceTasks(0); 

Не указываем редьюсер и указываем нулевое количество редьюсеров. Пример:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Map Only jobs на самом деле могут быть очень полезными. Например, в платформе Facetz.DCA для выявления характеристик пользователей по их поведению используется именно один большой map-only, каждый маппер которого принимает на вход пользователя и на выход отдаёт его характеристики.

5.2 Combine

Как я уже писал, обычно самая тяжёлая стадия при выполнении Map-Reduce задачи – это стадия shuffle. Происходит это потому, что промежуточные результаты (выход mapper’a) записываются на диск, сортируются и передаются по сети. Однако существуют задачи, в которых такое поведение кажется не очень разумным. Например, в той же задаче подсчёта слов в документах можно предварительно предагрегировать результаты выходов нескольких mapper’ов на одном узле map-reduce задачи, и передавать на reducer уже просуммированные значения по каждой машине.

В hadoop для этого можно определить комбинирующую функцию, которая будет обрабатывать выход части mapper-ов. Комбинирующая функция очень похожа на reduce – она принимает на вход выход части mapper’ов и выдаёт агрегированный результат для этих mapper’ов, поэтому очень часто reducer используют и как combiner. Важное отличие от reduce – на комбинирующую функцию попадают не все значения, соответствующие одному ключу.

Более того, hadoop не гарантирует того, что комбинирующая функция вообще будет выполнена для выхода mapper’a. Поэтому комбинирующая функция не всегда применима, например, в случае поиска медианного значения по ключу. Тем не менее, в тех задачах, где комбинирующая функция применима, её использование позволяет добиться существенного прироста к скорости выполнения MapReduce-задачи.

Использование Combiner’a на hadoop:

Native Interface Hadoop streaming

При конфигурации job-a указать класс-Combiner. Как правило, он совпадает с Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

В параметрах командной строки указать команду -combiner. Как правило, эта команда совпадает с командой reducer’a. Пример:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Цепочки MapReduce-задач

Бывают ситуации, когда для решения задачи одним MapReduce не обойтись. Например, рассмотрим немного видоизмененную задачу WordCount: имеется набор текстовых документов, необходимо посчитать, сколько слов встретилось от 1 до 1000 раз в наборе, сколько слов от 1001 до 2000, сколько от 2001 до 3000 и так далее. Для решения нам потребуется 2 MapReduce job’а:

  • Видоизменённый wordcount, который для каждого слова рассчитает, в какой из интервалов оно попало;
  • MapReduce, подсчитывающий, сколько раз в выходе первого MapReduce встретился каждый из интервалов.

Решение на псевдокоде:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Для того, чтобы выполнить последовательность MapReduce-задач на hadoop, достаточно просто в качестве входных данных для второй задачи указать папку, которая была указана в качестве output для первой и запустить их по очереди.

На практике цепочки MapReduce-задач могут представлять собой достаточно сложные последовательности, в которых MapReduce-задачи могут быть подключены как последовательно, так и параллельно друг другу. Для упрощения управления такими планами выполнения задач существуют отдельные инструменты типа oozie и luigi.

5.4 Distributed cache

Важным механизмом в Hadoop является Distributed Cache. Distributed Cache позволяет добавлять файлы (например, текстовые файлы, архивы, jar-файлы) к окружению, в котором выполняется MapReduce-задача.

Можно добавлять файлы, хранящиеся на HDFS, локальные файлы (локальные для той машины, с которой выполняется запуск задачи). Я уже неявно показывал, как использовать Distributed Cache вместе с hadoop streaming: добавляя через опцию -file файлы mapper.py и reducer.py. На самом деле можно добавлять не только mapper.py и reducer.py, а вообще произвольные файлы, и потом пользоваться ими как будто они находятся в локальной папке.

Использование Distributed Cache:

Native API

//конфигурация Job’a 
JobConf job = new JobConf(); 
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job); 
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job); 
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job); 
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job); 
 
//пример использования в mapper-e: 
public static class MapClass extends MapReduceBase   
implements Mapper<K, V, K, V> { 
 
 private Path[] localArchives; 
 private Path[] localFiles; 
  
 public void configure(JobConf job) { 
   // получаем кэшированные данные из архивов 
   File f = new File("./map.zip/some/file/in/zip.txt"); 
 } 
  
 public void map(K key, V value,  
             	OutputCollector<K, V> output, Reporter reporter)  
 throws IOException { 
   // используем данные тут 
   // ... 
   // ... 
   output.collect(k, v); 
 } 
}
Hadoop Streaming

#перечисляем файлы, которые необходимо добавить в distributed cache в параметре –files. Параметр –files должен идти перед другими параметрами.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

пример использования:

import sys 
#просто читаем файл из локальной папки 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Reduce Join

Те, кто привык работать с реляционными базами, часто пользуются очень удобной операцией Join, позволяющей совместно обработать содержание некоторых таблиц, объединив их по некоторому ключу. При работе с большими данными такая задача тоже иногда возникает. Рассмотрим следующий пример:

Имеются логи двух web-серверов, каждый лог имеет следующий вид:

t\t

Пример кусочка лога:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Необходимо посчитать для каждого IP-адреса на какой из 2-х серверов он чаще заходил. Результат должен быть представлен в виде:

\t

Пример части результата:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

К сожалению, в отличие от реляционных баз данных, в общем случае объединение двух логов по ключу (в данном случае – по IP-адресу) представляет собой достаточно тяжёлую операцию и решается при помощи 3-х MapReduce и паттерна Reduce Join:

ReduceJoin работает следующим образом:

1) На каждый из входных логов запускается отдельный MapReduce (Map only), преобразующий входные данные к следующему виду:

key -> (type, value

Где key – это ключ, по которому нужно объединять таблицы, Type – тип таблицы (first или second в нашем случае), а Value – это любые дополнительные данные, привязанные к ключу.

2) Выходы обоих MapReduce подаются на вход 3-му MapReduce, который, собственно, и выполняет объединение. Этот MapReduce содержит пустой Mapper, который просто копирует входные данные. Дальше shuffle раскладывает данные по ключам и подаёт на вход редьюсеру в виде:

key -> [(type, value)]

Важно, что в этот момент на редьюсер попадают записи из обоих логов и при этом по полю type можно идентифицировать, из какого из двух логов попало конкретное значение. Значит данных достаточно, чтобы решить исходную задачу. В нашем случае reducer просто должен посчитать для каждого ключа записей, с каким type встретилось больше и вывести этот type.

5.6 MapJoin

Паттерн ReduceJoin описывает общий случай объединения двух логов по ключу. Однако есть частный случай, при котором задачу можно существенно упростить и ускорить. Это случай, при котором один из логов имеет размер существенно меньшего размера, чем другой. Рассмотрим следующую задачу:

Имеются 2 лога. Первый лог содержит лог web-cервера (такой же как в предыдущей задаче), второй файл (размером в 100кб) содержит соответствие URL-> Тематика. Пример 2-го файла:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Для каждого IP-адреса необходимо рассчитать страницы какой категории с данного IP-адреса загружались чаще всего.

В этом случае нам тоже необходимо выполнить Join 2-х логов по URL. Однако в этом случае нам не обязательно запускать 3 MapReduce, так как второй лог полностью влезет в память. Для того, чтобы решить задачу при помощи 1-го MapReduce, мы можем загрузить второй лог в Distributed Cache, а при инициализации Mapper’a просто считать его в память, положив его в словарь -> topic.

Далее задача решается следующим образом:

Map:

# находим тематику каждой из страниц первого лога 
input_line -> [ip,  topic] 

Reduce:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce получает на вход ip и список всех тематик, просто вычисляет, какая из тематик встретилась чаще всего. Таким образом задача решена при помощи 1-го MapReduce, а собственно Join вообще происходит внутри map (поэтому если бы не нужна была дополнительная агрегация по ключу – можно было бы обойтись MapOnly job-ом):