4.1 Общая информация о Hadoop

Парадигму MapReduce предложила компания Google в 2004 году в своей статье MapReduce: Simplified Data Processing on Large Clusters. Поскольку предложенная статья содержала описание парадигмы, но реализация отсутствовала – несколько программистов из Yahoo предложили свою реализацию в рамках работ над web-краулером nutch. Более подробно историю Hadoop можно почитать в статье Hadoop | History or Evolution.

Изначально Hadoop был, в первую очередь, инструментом для хранения данных и запуска MapReduce-задач, сейчас же Hadoop представляет собой большой стек технологий, так или иначе связанных с обработкой больших данных (не только при помощи MapReduce).

Основными (core) компонентами Hadoop являются:

  • Hadoop Distributed File System (HDFS) – распределённая файловая система, позволяющая хранить информацию практически неограниченного объёма.
  • Hadoop YARN – фреймворк для управления ресурсами кластера и менеджмента задач, в том числе включает фреймворк MapReduce.
  • Hadoop common

Также существует большое количество проектов непосредственно связанных с Hadoop, но не входящих в Hadoop core:

  • Hive – инструмент для SQL-like запросов над большими данными (превращает SQL-запросы в серию MapReduce–задач);
  • Pig – язык программирования для анализа данных на высоком уровне. Одна строчка кода на этом языке может превратиться в последовательность MapReduce-задач;
  • Hbase – колоночная база данных, реализующая парадигму BigTable;
  • Cassandra – высокопроизводительная распределенная key-value база данных;
  • ZooKeeper – сервис для распределённого хранения конфигурации и синхронизации изменений этой конфигурации;
  • Mahout – библиотека и движок машинного обучения на больших данных.

Отдельно хотелось бы отметить проект Apache Spark, который представляет собой движок для распределённой обработки данных. Apache Spark обычно использует компоненты Hadoop, такие как HDFS и YARN для своей работы, при этом сам в последнее время стал популярнее, чем Hadoop:

Некоторым из перечисленных компонент будут посвящены отдельные статьи этого цикла материалов, а пока разберём, каким образом можно начать работать с Hadoop и применять его на практике.

4.2 Запуск MapReduce программ на Hadoop

Теперь рассмотрим, как запустить MapReduce-задачу на Hadoop. В качестве задачи воспользуемся классическим примером WordCount, который был разобран в предыдущей лекции.

Напомню формулировку задачи: имеется набор документов. Необходимо для каждого слова, встречающегося в наборе документов, посчитать, сколько раз встречается слово в наборе.

Решение:

Map разбивает документ на слова и возвращает множество пар (word, 1).

Reduce суммирует вхождения каждого слова:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Теперь задача запрограммировать это решение в виде кода, который можно будет исполнить на Hadoop и запустить.

4.3 Способ №1. Hadoop Streaming

Самый простой способ запустить MapReduce-программу на Hadoop – воспользоваться streaming-интерфейсом Hadoop. Streaming-интерфейс предполагает, что map и reduce реализованы в виде программ, которые принимают данные с stdin и выдают результат на stdout.

Программа, которая исполняет функцию map называется mapper. Программа, которая выполняет reduce, называется, соответственно, reducer.

Streaming интерфейс предполагает по умолчанию, что одна входящая строка в mapper или reducer соответствует одной входящей записи для map.

Вывод mapper’a попадает на вход reducer’у в виде пар (ключ, значение), при этом все пары, соответствующие одному ключу:

  • Гарантированно будут обработаны одним запуском reducer’a;
  • Будут поданы на вход подряд (то есть если один reducer обрабатывает несколько разных ключей – вход будет сгруппирован по ключу).

Итак, реализуем mapper и reducer на python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Данные, которые будет обрабатывать Hadoop должны храниться на HDFS. Загрузим наши статьи и положим на HDFS. Для этого нужно воспользоваться командой hadoop fs:

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Утилита hadoop fs поддерживает большое количество методов для манипуляций с файловой системой, многие из которых один в один повторяют стандартные утилиты linux.

Теперь запустим streaming-задачу:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Утилита yarn служит для запуска и управления различными приложениями (в том числе map-reduce based) на кластере. Hadoop-streaming.jar – это как раз один из примеров такого yarn-приложения.

Дальше идут параметры запуска:

  • input – папка с исходными данными на hdfs;
  • output – папка на hdfs, куда нужно положить результат;
  • file – файлы, которые нужны в процессе работы map-reduce задачи;
  • mapper – консольная команда, которая будет использоваться для map-стадии;
  • reduce – консольная команда которая будет использоваться для reduce-стадии.

После запуска в консоли можно будет увидеть прогресс выполнения задачи и URL для просмотра более детальной информации о задаче.

В интерфейсе доступном по этому URL можно узнать более детальный статус выполнения задачи, посмотреть логи каждого маппера и редьюсера (что очень полезно в случае упавших задач).

Результат работы после успешного выполнения складывается на HDFS в папку, которую мы указали в поле output. Просмотреть её содержание можно при помощи команды «hadoop fs -ls lenta_wordcount».

Сам результат можно получить следующим образом:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
с	
41  
что	
43  
на	
82  
и	
111  
в	
194 

Команда «hadoop fs -text» выдаёт содержимое папки в текстовом виде. Я отсортировал результат по количеству вхождений слов. Как и ожидалось, самые частые слова в языке – предлоги.

4.4 Способ №2: используем Java

Сам по себе Hadoop написан на java, и нативный интерфейс у Hadoop-a тоже java-based. Покажем, как выглядит нативное java-приложение для wordcount:


import java.io.IOException;  
import java.util.StringTokenizer;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  
public class WordCount {  
  
	public static class TokenizerMapper  
        	extends Mapper<Object, Text, Text, IntWritable>{  
  
    	private final static IntWritable one = new IntWritable(1);  
    	private Text word = new Text();  
  
    	public void map(Object key, Text value, Context context  
    	) throws IOException, InterruptedException {  
        	StringTokenizer itr = new StringTokenizer(value.toString());  
        	while (itr.hasMoreTokens()) {  
            	word.set(itr.nextToken());  
            	context.write(word, one);  
        	}  
    	}  
	}  
  
	public static class IntSumReducer  
        	extends Reducer<Text,IntWritable,Text,IntWritable> {  
    	private IntWritable result = new IntWritable();  
  
    	public void reduce(Text key, Iterable values,  
                       	Context context  
    	) throws IOException, InterruptedException {  
        	int sum = 0;  
        	for (IntWritable val : values) {  
            	sum += val.get();  
        	}  
        	result.set(sum);  
        	context.write(key, result);  
    	}  
	}  
  
	public static void main(String[] args) throws Exception {  
    	Configuration conf = new Configuration();  
    	Job job = Job.getInstance(conf, "word count");  
    	job.setJarByClass(WordCount.class);  
    	job.setMapperClass(TokenizerMapper.class);  
    	job.setReducerClass(IntSumReducer.class);  
    	job.setOutputKeyClass(Text.class);  
    	job.setOutputValueClass(IntWritable.class);  
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));  
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));  
    	System.exit(job.waitForCompletion(true) ? 0 : 1);  
	}  
} 

Этот класс делает абсолютно то же самое, что наш пример на Python. Мы создаём классы TokenizerMapper и IntSumReducer, наследуя их от классов Mapper и Reducer соответственно. Классы, передаваемые в качестве параметров шаблона, указывают типы входных и выходных значений. Нативный API подразумевает, что функции map на вход подаётся пара ключ-значение. Поскольку в нашем случае ключ пустой – в качестве типа ключа мы определяем просто Object.

В методе Main мы заводим mapreduce-задачу и определяем её параметры – имя, mapper и reducer, путь в HDFS, где находятся входные данные и куда положить результат. Для компиляции нам потребуются hadoop-овские библиотеки. Я использую для сборки Maven, для которого у cloudera есть репозиторий. Инструкции по его настройке можно найти по ссылке. В итоге файл pom.xmp (который используется maven’ом для описания сборки проекта) у меня получился следующий):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Соберём проект в jar-пакет:

mvn clean package

После сборки проекта в jar-файл запуск происходит похожим образом, как и в случае streaming-интерфейса:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Дожидаемся выполнения и проверяем результат:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
с	
41  
что	
43  
на	
82  
и	
111  
в	
194 

Как нетрудно догадаться, результат выполнения нашего нативного приложения совпадает с результатом streaming-приложения, которое мы запустили предыдущим способом.