1. Загальна інформація про Hadoop

Парадигму MapReduce запропонувала компанія Google у 2004 році у своїй статті MapReduce: Simplified Data Processing on Large Clusters. Оскільки запропонована стаття містила опис парадигми, але реалізація була відсутня, кілька програмістів із Yahoo запропонували свою реалізацію в межах робіт над web-краулером nutch. Докладніше історію Hadoop можна почитати у статті History of Hadoop: З 4-х днів до майбутнього data.

Спочатку Hadoop був, в першу чергу, інструментом для зберігання даних і запуску MapReduce-задач. Зараз Hadoop являє собою великий стек технологій, так чи інакше пов'язаних з обробкою великих даних (не тільки за допомогою MapReduce).

Основними (core) компонентами Hadoop є:

  • Hadoop Distributed File System (HDFS) — розподілена файлова система, що дозволяє зберігати інформацію практично необмеженого обсягу.
  • Hadoop YARN — фреймворк для управління ресурсами кластера та менеджменту завдань, у тому числі включає фреймворк MapReduce.
  • Hadoop common

Також існує велика кількість проєктів, безпосередньо пов'язаних з Hadoop, але які не входять до Hadoop core:

  • Hive — інструмент для SQL-like запитів над великими даними (перетворює SQL-запити в серію MapReduce-завдань);
  • Pig — мова програмування для аналізу даних на високому рівні. Один рядок коду цією мовою може перетворитися на послідовність MapReduce-завдань;
  • Hbase — колонкова база даних, що реалізує парадигму BigTable;
  • Cassandra — високопродуктивна розподілена key-value база даних;
  • ZooKeeper — сервіс для розподіленого зберігання конфігурації та синхронізації змін цієї конфігурації;
  • Mahout — бібліотека та движок машинного навчання на великих даних.

Особливо хотілося б відзначити проєкт Apache Spark, який є рушієм для розподіленої обробки даних. Apache Spark зазвичай використовує компоненти Hadoop, такі як HDFS та YARN для своєї роботи, при цьому сам останнім часом став популярнішим, ніж Hadoop:

Деяким з переразованих компонент будуть присвячені окремі статті цього циклу матеріалів, а поки що розберемо, яким чином можна почати працювати з Hadoop і застосовувати його на практиці.

2. Запуск MapReduce програм на Hadoop

Тепер розглянемо, як запустити MapReduce-завдання на Hadoop. Як завдання скористаємося класичним прикладом WordCount, який був розібраний у попередній лекції.

Нагадаю формулювання завдання: є набір документів. Необхідно для кожного слова, що зустрічається в наборі документів, порахувати скільки разів зустрічається слово в наборі.

Рішення:

Map розбиває документ на слова і повертає безліч пар (word, 1).

Reduce підсумовує входження кожного слова:

def map(doc):
for word in doc.split():
	yield word, 1
def reduce(word, values):
	yield word, sum(values)

Тепер завдання — запрограмувати це рішення у вигляді коду, який можна буде виконати на Hadoop та запустити.

3. Спосіб №1. Hadoop Streaming

Найпростіший спосіб запустити MapReduce-програму на Hadoop — скористатися streaming-інтерфейсом Hadoop. Streaming-інтерфейс передбачає, що map та reduce реалізовані у вигляді програм, які приймають дані зі stdin та видають результат на stdout.

Програма, яка виконує функцію map називається mapper. Програма, яка виконує reduce, називається відповідно reducer.

Streaming інтерфейс за замовчуванням передбачає, що один вхідний рядок до mapper або reducer відповідає одному вхідному запису для map.

Виведення mapper-a потрапляє на вхід reducer-у у вигляді пар (ключ, значення), водночас всі пари, що відповідають одному ключу:

  • Гарантовано будуть оброблені одним запуском reducer-a;
  • Будуть подані на вхід підряд (тобто якщо один reducer обробляє кілька різних ключів, вхід буде згрупований за ключем).

Отже, реалізуємо mapper і reducer на python:

#mapper.py
import sys

def do_map(doc):
for word in doc.split():
	yield word.lower(), 1

for line in sys.stdin:
	for key, value in do_map(line):
    	print(key + "\t" + str(value))

#reducer.py
import sys

def do_reduce(word, values):
	return word, sum(values)

prev_key = None
values = []

for line in sys.stdin:
	key, value = line.split("\t")
	if key != prev_key and prev_key is not None:
    	result_key, result_value = do_reduce(prev_key, values)
    	print(result_key + "\t" + str(result_value))
    	values = []
	prev_key = key
	values.append(int(value))

if prev_key is not None:
	result_key, result_value = do_reduce(prev_key, values)
	print(result_key + "\t" + str(result_value))

Дані, які буде обробляти Hadoop, повинні зберігатися на HDFS. Завантажимо наші статті та покладемо на HDFS. Для цього потрібно скористатися командою hadoop fs:

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz
tar xzvf lenta_articles.tar.gz
hadoop fs -put lenta_articles

Утиліта hadoop fs підтримує велику кількість методів для маніпуляцій з файловою системою, багато з яких один в один повторюють стандартні утиліти linux.

Тепер запустимо streaming-завдання:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\
 -input lenta_articles\
 -output lenta_wordcount\
 -file mapper.py\
 -file reducer.py\
 -mapper "python mapper.py"\
 -reducer "python reducer.py"

Утиліта yarn служить для запуску та керування різними програмами (у тому числі map-reduce based) на кластері. Hadoop-streaming.jar — це саме один із прикладів такого yarn-додатку.

Далі йдуть параметри запуску:

  • input — папка з вихідними даними на hdfs;
  • output — папка на hdfs, куди потрібно покласти результат;
  • file — файли, які потрібні у процесі роботи map-reduce завдання;
  • mapper — консольна команда, яка використовуватиметься для map-стадії;
  • reduce — консольна команда, яка буде використовуватися для reduce-стадії.

Після запуску в консолі можна буде побачити прогрес виконання завдання та URL для перегляду більш детальної інформації про завдання.

В інтерфейсі доступному за цією URL можна дізнатися більш детальний статус виконання завдання, подивитися логи кожного мапера і редьюсера (що дуже корисно у випадку із завданнями, що впали).

Результат роботи після успішного виконання складається на HDFS в папку, яку ми вказали в полі output. Переглянути її зміст можна за допомогою команди hadoop fs -ls lenta_wordcount.

Сам результат можна отримати таким чином:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
с
41
що
43
на
82
і
111
в
194

Команда hadoop fs -text видає вміст папки в текстовому вигляді. Я відсортував результат за кількістю слів. Як і очікувалося, найчастіші слова в мові — прийменники.

4.4 Спосіб №2: використовуємо Java

Сам по собі Hadoop написаний на java, і нативний інтерфейс у Hadoop-і теж Java-based. Покажемо, як виглядає нативний java-додаток для wordcount:


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable<IntWritable> values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Цей клас робить абсолютно те саме, що наш приклад на Python. Ми створюємо класи TokenizerMapper та IntSumReducer, успадковуючи їх від класів Mapper та Reducer відповідно. Класи, що передаються як параметри шаблону, вказують типи вхідних та вихідних значень. Нативний API передбачає, що функції map на вхід подається пара ключ-значення. Оскільки в нашому випадку ключ порожній, як тип ключа ми визначаємо просто Object.

У методі Main ми заводимо mapreduce-завдання та визначаємо її параметри: ім'я, mapper та reducer, шлях у HDFS, де знаходяться вхідні дані та куди покласти результат. Для компіляції нам потрібні hadoop-івські бібліотеки. Я використовую для складання Maven, для якого у cloudera є репозиторій. Інструкції щодо його налаштування можна знайти за посиланням. У результаті файл pom.xmp (який використовується maven-ом для опису складання проєкту) у мене вийшов такий:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<repositories>
    	<repository>
        	<id>cloudera</id>
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    	</repository>
	</repositories>

	<dependencies>
    	<dependency>
        	<groupId>org.apache.hadoop</groupId>
        	<artifactId>hadoop-common</artifactId>
        	<version>2.6.0-cdh5.4.2</version>
    	</dependency>

    	<dependency>
        	<groupId>org.apache.hadoop</groupId>
        	<artifactId>hadoop-auth</artifactId>
        	<version>2.6.0-cdh5.4.2</version>
    	</dependency>

    	<dependency>
        	<groupId>org.apache.hadoop</groupId>
        	<artifactId>hadoop-hdfs</artifactId>
        	<version>2.6.0-cdh5.4.2</version>
    	</dependency>

    	<dependency>
        	<groupId>org.apache.hadoop</groupId>
        	<artifactId>hadoop-mapreduce-client-app</artifactId>
        	<version>2.6.0-cdh5.4.2</version>
    	</dependency>

	</dependencies>

	<groupId>org.dca.examples</groupId>
	<artifactId>wordcount</artifactId>
	<version>1.0-SNAPSHOT</version>

</project>

Зберемо проект у jar-пакет:

mvn clean package

Після складання проєкту до jar-файлу запуск відбувається схожим чином, як і у разі streaming-інтерфейсу:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount

Чекаємо виконання та перевіряємо результат:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
з
41
що
43
на
82
і
111
в
194

Як можна здогадатися, результат виконання нашої нативної програми збігається з результатом streaming-додатку, який ми запустили попереднім способом.