4.1 Загальна інформація про Hadoop

Парадигму MapReduce запропонувала компанія Google у 2004 році у своїй статті MapReduce: Simplified Data Processing on Large Clusters . Оскільки запропонована стаття містила опис парадигми, але реалізація була відсутня – кілька програмістів із Yahoo запропонували свою реалізацію в рамках робіт над web-краулером nutch. Докладніше історію Hadoop можна почитати у статті Hadoop | History or Evolution .
Спочатку Hadoop був, в першу чергу, інструментом для зберігання даних і запуску MapReduce-завдань, зараз Hadoop являє собою великий стек технологій, так чи інакше пов'язаних з обробкою великих даних (не тільки за допомогою MapReduce).
Основними (core) компонентами Hadoop є:
- Hadoop Distributed File System (HDFS) – розподілена файлова система, що дозволяє зберігати інформацію практично необмеженого обсягу.
- Hadoop YARN – фреймворк для управління ресурсами кластера та менеджменту завдань, у тому числі включає фреймворк MapReduce.
- Hadoop common
Також існує велика кількість проектів, що безпосередньо пов'язані з Hadoop, але не входять до Hadoop core:
- Hive – інструмент для SQL-like запитів над великими даними (перетворює SQL-запити на серію MapReduce–задач);
- Pig – мова програмування для аналізу даних високому рівні. Один рядок коду цією мовою може перетворитися на послідовність MapReduce-завдань;
- Hbase - колонкова база даних, що реалізує парадигму BigTable;
- Cassandra – високопродуктивна розподілена key-value база даних;
- ZooKeeper – сервіс для розподіленого зберігання конфігурації та синхронізації змін цієї конфігурації;
- Mahout - бібліотека і двигун машинного навчання на високих даних.
Окремо хотілося б відзначити проект Apache Spark , який є двигуном для розподіленої обробки даних. Apache Spark зазвичай використовує компоненти Hadoop, такі як HDFS та YARN для своєї роботи, при цьому сам останнім часом став популярнішим, ніж Hadoop:

Деяким з перерахованих компонентів будуть присвячені окремі статті цього циклу матеріалів, а поки що розберемо, яким чином можна почати працювати з Hadoop і застосовувати його на практиці.
4.2 Запуск програм MapReduce на Hadoop
Тепер розглянемо, як запустити MapReduce-завдання на Hadoop. Як завдання скористаємося класичним прикладом WordCount , який був розібраний у попередній лекції.
Нагадаю формулювання завдання: є набір документів. Необхідно для кожного слова, що зустрічається в наборі документів, порахувати скільки разів зустрічається слово в наборі.
Рішення:
Map розбиває документ на слова та повертає безліч пар (word, 1).
Reduce підсумовує входження кожного слова:
|
|
Тепер завдання запрограмувати це рішення у вигляді коду, який можна буде виконати на Hadoop та запустити.
4.3 Спосіб №1. Hadoop Streaming
Найпростіший спосіб запустити MapReduce-програму на Hadoop – скористатися streaming-інтерфейсом Hadoop. Streaming-інтерфейс передбачає, що map і reduce реалізовані як програм, які приймають дані з stdin і видають результат на stdout .
Програма, яка виконує функцію map, називається mapper. Програма, яка виконує reduce , називається відповідно reducer .
Streaming інтерфейс передбачає за замовчуванням, що один вхідний рядок в mapper або reducer відповідає одному вхідному запису для map .
Висновок mapper'a потрапляє на вхід reducer'у як пар (ключ, значення), у своїй всі пари, відповідні одному ключу:
- Гарантовано буде оброблено одним запуском reducer'a;
- Буде подано на вхід поспіль (тобто якщо один reducer обробляє кілька різних ключів – вхід буде згруповано за ключом).
Отже, реалізуємо mapper та reducer на python:
#mapper.py
import sys
def do_map(doc):
for word in doc.split():
yield word.lower(), 1
for line in sys.stdin:
for key, value in do_map(line):
print(key + "\t" + str(value))
#reducer.py
import sys
def do_reduce(word, values):
return word, sum(values)
prev_key = None
values = []
for line in sys.stdin:
key, value = line.split("\t")
if key != prev_key and prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
values = []
prev_key = key
values.append(int(value))
if prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
Дані, які буде обробляти Hadoop, повинні зберігатися на HDFS. Завантажимо наші статті та покладемо на HDFS. Для цього потрібно скористатися командою hadoop fs :
wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz
tar xzvf lenta_articles.tar.gz
hadoop fs -put lenta_articles
Утиліта hadoop fs підтримує велику кількість методів для маніпуляцій з файловою системою, багато з яких один в один повторюють стандартні утиліти linux.
Тепер запустимо streaming-завдання:
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\
-input lenta_articles\
-output lenta_wordcount\
-file mapper.py\
-file reducer.py\
-mapper "python mapper.py"\
-reducer "python reducer.py"
Утиліта yarn служить для запуску та управління різними програмами (у тому числі map-reduce based) на кластері. Hadoop-streaming.jar – це якраз один із прикладів такого yarn-додатку.
Далі йдуть параметри запуску:
- input - папка з вихідними даними на hdfs;
- output – папка на hdfs, куди слід покласти результат;
- file – файли, які необхідні у процесі роботи map-reduce завдання;
- mapper – консольна команда, яка використовуватиметься для map-стадії;
- reduce – консольна команда, яка буде використовуватися для reduce-стадії.
Після запуску консолі можна буде побачити прогрес виконання завдання та URL для перегляду більш детальної інформації про завдання.

В інтерфейсі доступному по цьому URL можна дізнатися більш детальний статус виконання завдання, подивитися логи кожного мапера і редьюсера (що дуже корисно у разі завдань, що впали).

Результат роботи після успішного виконання складається на HDFS до папки, яку ми вказали в поле output. Переглянути її зміст можна за допомогою команди hadoop fs -ls lenta_wordcount.
Сам результат можна отримати так:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
с
41
что
43
на
82
и
111
в
194
Команда hadoop fs -text видає вміст папки в текстовому вигляді. Я відсортував результат за кількістю слів. Як і очікувалося, найчастіші слова у мові – прийменники.
4.4 Спосіб №2: використовуємо Java
Сам собою Hadoop написаний на java, і нативний інтерфейс у Hadoop-a теж java-based. Покажемо, як виглядає нативний java-додаток для wordcount:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Цей клас робить те ж саме, що наш приклад на Python. Ми створюємо класи TokenizerMapper та IntSumReducer, успадковуючи їх від класів Mapper та Reducer відповідно. Класи, що передаються як параметри шаблону, вказують типи вхідних та вихідних значень. Нативний API передбачає, що функції map на вхід подається пара ключ-значення. Оскільки в нашому випадку ключ порожній – як тип ключа ми визначаємо просто Object.
У методі Main ми заводимо mapreduce-завдання та визначаємо її параметри – ім'я, mapper та reducer, шлях у HDFS, де знаходяться вхідні дані та куди покласти результат. Для компіляції нам потрібні hadoop-івські бібліотеки. Я використовую для складання Maven, для якого у cloudera є репозиторій. Інструкції щодо його налаштування можна знайти за посиланням. У результаті файл pom.xmp (який використовується maven'ом для опису складання проекту) у мене вийшов наступний):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
</dependencies>
<groupId>org.dca.examples</groupId>
<artifactId>wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
</project>
Зберемо проект у jar-пакет:
mvn clean package
Після складання проекту в jar-файл запуск відбувається так само, як і у випадку streaming-інтерфейсу:
yarn jar wordcount-1.0-SNAPSHOT.jar WordCount
Очікуємо виконання та перевіряємо результат:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
с
41
что
43
на
82
и
111
в
194
Як неважко здогадатися, результат виконання нашої нативної програми збігається з результатом streaming-додатку, який ми запустабо попереднім способом.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ