1. Загальна інформація про Hadoop
Парадигму MapReduce запропонувала компанія Google у 2004 році у своїй статті MapReduce: Simplified Data Processing on Large Clusters. Оскільки запропонована стаття містила опис парадигми, але реалізація була відсутня, кілька програмістів із Yahoo запропонували свою реалізацію в межах робіт над web-краулером nutch. Докладніше історію Hadoop можна почитати у статті History of Hadoop: З 4-х днів до майбутнього data.
Спочатку Hadoop був, в першу чергу, інструментом для зберігання даних і запуску MapReduce-задач. Зараз Hadoop являє собою великий стек технологій, так чи інакше пов'язаних з обробкою великих даних (не тільки за допомогою MapReduce).
Основними (core) компонентами Hadoop є:
- Hadoop Distributed File System (HDFS) — розподілена файлова система, що дозволяє зберігати інформацію практично необмеженого обсягу.
- Hadoop YARN — фреймворк для управління ресурсами кластера та менеджменту завдань, у тому числі включає фреймворк MapReduce.
- Hadoop common
Також існує велика кількість проєктів, безпосередньо пов'язаних з Hadoop, але які не входять до Hadoop core:
- Hive — інструмент для SQL-like запитів над великими даними (перетворює SQL-запити в серію MapReduce-завдань);
- Pig — мова програмування для аналізу даних на високому рівні. Один рядок коду цією мовою може перетворитися на послідовність MapReduce-завдань;
- Hbase — колонкова база даних, що реалізує парадигму BigTable;
- Cassandra — високопродуктивна розподілена key-value база даних;
- ZooKeeper — сервіс для розподіленого зберігання конфігурації та синхронізації змін цієї конфігурації;
- Mahout — бібліотека та движок машинного навчання на великих даних.
Особливо хотілося б відзначити проєкт Apache Spark, який є рушієм для розподіленої обробки даних. Apache Spark зазвичай використовує компоненти Hadoop, такі як HDFS та YARN для своєї роботи, при цьому сам останнім часом став популярнішим, ніж Hadoop:
Деяким з переразованих компонент будуть присвячені окремі статті цього циклу матеріалів, а поки що розберемо, яким чином можна почати працювати з Hadoop і застосовувати його на практиці.
2. Запуск MapReduce програм на Hadoop
Тепер розглянемо, як запустити MapReduce-завдання на Hadoop. Як завдання скористаємося класичним прикладом WordCount, який був розібраний у попередній лекції.
Нагадаю формулювання завдання: є набір документів. Необхідно для кожного слова, що зустрічається в наборі документів, порахувати скільки разів зустрічається слово в наборі.
Рішення:
Map розбиває документ на слова і повертає безліч пар (word, 1).
Reduce підсумовує входження кожного слова:
|
|
Тепер завдання — запрограмувати це рішення у вигляді коду, який можна буде виконати на Hadoop та запустити.
3. Спосіб №1. Hadoop Streaming
Найпростіший спосіб запустити MapReduce-програму на Hadoop — скористатися streaming-інтерфейсом Hadoop. Streaming-інтерфейс передбачає, що map та reduce реалізовані у вигляді програм, які приймають дані зі stdin та видають результат на stdout.
Програма, яка виконує функцію map називається mapper. Програма, яка виконує reduce, називається відповідно reducer.
Streaming інтерфейс за замовчуванням передбачає, що один вхідний рядок до mapper або reducer відповідає одному вхідному запису для map.
Виведення mapper-a потрапляє на вхід reducer-у у вигляді пар (ключ, значення), водночас всі пари, що відповідають одному ключу:
- Гарантовано будуть оброблені одним запуском reducer-a;
- Будуть подані на вхід підряд (тобто якщо один reducer обробляє кілька різних ключів, вхід буде згрупований за ключем).
Отже, реалізуємо mapper і reducer на python:
#mapper.py
import sys
def do_map(doc):
for word in doc.split():
yield word.lower(), 1
for line in sys.stdin:
for key, value in do_map(line):
print(key + "\t" + str(value))
#reducer.py
import sys
def do_reduce(word, values):
return word, sum(values)
prev_key = None
values = []
for line in sys.stdin:
key, value = line.split("\t")
if key != prev_key and prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
values = []
prev_key = key
values.append(int(value))
if prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
Дані, які буде обробляти Hadoop, повинні зберігатися на HDFS. Завантажимо наші статті та покладемо на HDFS. Для цього потрібно скористатися командою hadoop fs:
wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz
tar xzvf lenta_articles.tar.gz
hadoop fs -put lenta_articles
Утиліта hadoop fs підтримує велику кількість методів для маніпуляцій з файловою системою, багато з яких один в один повторюють стандартні утиліти linux.
Тепер запустимо streaming-завдання:
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\
-input lenta_articles\
-output lenta_wordcount\
-file mapper.py\
-file reducer.py\
-mapper "python mapper.py"\
-reducer "python reducer.py"
Утиліта yarn служить для запуску та керування різними програмами (у тому числі map-reduce based) на кластері. Hadoop-streaming.jar — це саме один із прикладів такого yarn-додатку.
Далі йдуть параметри запуску:
- input — папка з вихідними даними на hdfs;
- output — папка на hdfs, куди потрібно покласти результат;
- file — файли, які потрібні у процесі роботи map-reduce завдання;
- mapper — консольна команда, яка використовуватиметься для map-стадії;
- reduce — консольна команда, яка буде використовуватися для reduce-стадії.
Після запуску в консолі можна буде побачити прогрес виконання завдання та URL для перегляду більш детальної інформації про завдання.
В інтерфейсі доступному за цією URL можна дізнатися більш детальний статус виконання завдання, подивитися логи кожного мапера і редьюсера (що дуже корисно у випадку із завданнями, що впали).
Результат роботи після успішного виконання складається на HDFS в папку, яку ми вказали в полі output. Переглянути її зміст можна за допомогою команди hadoop fs -ls lenta_wordcount.
Сам результат можна отримати таким чином:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
с
41
що
43
на
82
і
111
в
194
Команда hadoop fs -text видає вміст папки в текстовому вигляді. Я відсортував результат за кількістю слів. Як і очікувалося, найчастіші слова в мові — прийменники.
4.4 Спосіб №2: використовуємо Java
Сам по собі Hadoop написаний на java, і нативний інтерфейс у Hadoop-і теж Java-based. Покажемо, як виглядає нативний java-додаток для wordcount:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Цей клас робить абсолютно те саме, що наш приклад на Python. Ми створюємо класи TokenizerMapper та IntSumReducer, успадковуючи їх від класів Mapper та Reducer відповідно. Класи, що передаються як параметри шаблону, вказують типи вхідних та вихідних значень. Нативний API передбачає, що функції map на вхід подається пара ключ-значення. Оскільки в нашому випадку ключ порожній, як тип ключа ми визначаємо просто Object.
У методі Main ми заводимо mapreduce-завдання та визначаємо її параметри: ім'я, mapper та reducer, шлях у HDFS, де знаходяться вхідні дані та куди покласти результат. Для компіляції нам потрібні hadoop-івські бібліотеки. Я використовую для складання Maven, для якого у cloudera є репозиторій. Інструкції щодо його налаштування можна знайти за посиланням. У результаті файл pom.xmp (який використовується maven-ом для опису складання проєкту) у мене вийшов такий:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
</dependencies>
<groupId>org.dca.examples</groupId>
<artifactId>wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
</project>
Зберемо проект у jar-пакет:
mvn clean package
Після складання проєкту до jar-файлу запуск відбувається схожим чином, як і у разі streaming-інтерфейсу:
yarn jar wordcount-1.0-SNAPSHOT.jar WordCount
Чекаємо виконання та перевіряємо результат:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
з
41
що
43
на
82
і
111
в
194
Як можна здогадатися, результат виконання нашої нативної програми збігається з результатом streaming-додатку, який ми запустили попереднім способом.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ