Zanim napiszemy pierwszego joba trzeba przygotować sobie środowisko. Obsługa wszystkiego z konsoli może z czasem być nużąca (chociaż ja tam lubię;).
Ja do dewelopmentu używam
NetBeansa i polecam wtyczkę
Karmaspehere Community Edition. Pozwala ona w łatwy sposób definiować i odpalać sobie zadania Hadoopa.
Aby zainstalować tę wtyczkę należy dodać do listy 'update sites' adres: http://hadoopstudio.org/updates/updates.xml. Ja zainstalowałem wszystkie 4 dostępne pluginy od Karmasphere. Polecam też się zarejestrować i aktywować plugin, bo będą się pojawiać komunikaty o aktywacji (ale chyba można pracować bez niej).
Ok, możemy zacząć pisać :-) Projekt oparłem o maven, pom.xml nie jest zbyt rozdbudowany:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>pl.lstachowiak</groupId>
<artifactId>HadoopSamples</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>HadoopSamples</name>
<url>http://blog.lstachowiak.pl</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mahout.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.1</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
</project>
Problem jaki wybrałem dla pierwszego przykładu to analiza statystyk serwera Apache. Bierzemy plik access.log i analizujemy typy żądań do serwera. Na końcu chcemy mieć liczbę żądań każdego typu.
Mój pomysł na Map i Reduce jest następujący:
Czyli:
- Hadoop wczytuje i dzieli plik na tzw '
input splits'
- przekazuje każdy split do funkcji map, gdzie kluczem jest numer linii w pliku a wartością sama linia
- implementowana przez nas funkcja
map sprawdza jaki tym żądania HTTP znajduje się w linii i zwraca jako klucz typ żądania a jako wartość liczbę 1 (umownie sobie tak robimy)
- Hadoop wykonuje tzw.
shuffle grupując nam wyniki z map w pary [TYP] -> [KOLEKCJA JEDYNEK]
- nasza funkcja
reduce jedyne co robi to zlicza liczbę elementów w kolekcji i zwraca jako klucz typ żądania a jako wartość liczbę elementów w kolekcji co jest zarazem wynikiem końcowym
- Hadoop zapisuje dane na dysk
(Bardzo) Możliwe, że nie jest to idealne rozwiązanie, ale nie mam jeszcze wielkiego doświadczenia z Hadoop ;)
Przejdźmy do implementacji. Funkcja map:
HttpMethodMapper.java:
package pl.lstachowiak.hadoop.sample1;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class HttpMethodMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // 1
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
HttpMethod method = getMethodFromLine(value); // 2
context.write(new Text(method.name()), new IntWritable(1)); // 3
}
HttpMethod getMethodFromLine(Text value) {
if (value.find(HttpMethod.CONNECT.name()) != -1) {
return HttpMethod.CONNECT;
}
if (value.find(HttpMethod.DELETE.name()) != -1) {
return HttpMethod.DELETE;
}
if (value.find(HttpMethod.GET.name()) != -1) {
return HttpMethod.GET;
}
if (value.find(HttpMethod.HEAD.name()) != -1) {
return HttpMethod.HEAD;
}
if (value.find(HttpMethod.OPTIONS.name()) != -1) {
return HttpMethod.OPTIONS;
}
if (value.find(HttpMethod.PATH.name()) != -1) {
return HttpMethod.PATH;
}
if (value.find(HttpMethod.POST.name()) != -1) {
return HttpMethod.POST;
}
if (value.find(HttpMethod.PUT.name()) != -1) {
return HttpMethod.PUT;
}
if (value.find(HttpMethod.TRACE.name()) != -1) {
return HttpMethod.TRACE;
}
return HttpMethod.UNKNOWN;
}
}
Krótki komentarz do tego co się dzieje w kodzie:
1 - klasa mappera musi (od wersji API 0.20) rozszerzać klasę Mapper. Parametrami są kolejno:
- typ klucza (numer linii)
- typ wartości (linia w pliku)
- typ klucza wynikowego (typ żądania http)
- typ wartości wynikowe (liczba 1)
2 - pobranie typu żądania z linii, wiem, że rozwiązanie jest dalekie od idealnego, ale nie o to chodzi w przykładzie ;)
3 - zapis wyniku
Jak widać Hadoop posiada własny zestaw typów danych. W większości są to obudowane typy podstawowe. Stworzone zostały one aby w troszkę lepszy sposób serializować dane i przesyłać przez sieć. Zaleca się ich używanie. Typ HttpRequest to zwykły enum z listą dostępnych typów żądań HTTP. Dostępny jest w źródłach.
Funkcja reduce:
HttpMethodReducer.java:
package pl.lstachowiak.hadoop.sample1;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class HttpMethodReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // 1
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 2
int counter = countValues(values); // 3
context.write(key, new IntWritable(counter)); // 4
}
<T> int countValues(Iterable<T> values) {
int counter = 0;
for (T value : values) {
counter++;
}
return counter;
}
}Ponownie krótki komentarz:
1 - musimy rozszerzać klasę Reducer, parametry kolejno to:
- typ klucza z Map
- typ wartości z Map
- typ klucza wynikowego
- typ wartości wynikowej
2 - tutaj widać efekt działania operacji shuffle. Zamiast otrzymywać tą jedynkę z funkcji Map otrzymujemy iterator po kolekcji jedynek (w naszym przykładzie).
3 - zliczamy elementy
4 - zapisujemy w wynikach :-)
No to jeszcze program do uruchomienia zadania:
HttpMethodJob.java:
package pl.lstachowiak.hadoop.sample1;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HttpMethodJob {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: HttpMethodJob <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(HttpMethodJob.class); // 1
FileInputFormat.addInputPath(job, new Path(args[0])); // 2
FileOutputFormat.setOutputPath(job, new Path(args[1])); // 3
job.setMapperClass(HttpMethodMapper.class); // 4
job.setReducerClass(HttpMethodReducer.class); // 5
job.setOutputKeyClass(Text.class); // 6
job.setOutputValueClass(IntWritable.class); // 7
System.exit(job.waitForCompletion(true) ? 0 : 1); // gogogo!
}
}
1 - tworzymy zadanie i podajemy klasę główna
2,3 - podajemy źródło danych oraz miejsce zapisu danych wynikowych (wczytywane z argumentów)
4,5 - podajemy klasę mappera i reducera
6,7 - konfiguracja typów danych wynikowych
I uruchomienie zadania :-)
Zanim jednak zaczniemy potrzebne są jeszcze jakieś dane do analizy. Ja wygooglałem coś
tutaj (4,3MB).
Uruchomienie
Tutaj obrazkowo:) Przechodzimy do zakładki Services (NetBeans) -> Servers -> Hadoop Jobs -> New Job. Wybieramy nazwę dla Joba i typ 'Hadoop Job from pre-existing JAR file'.
Następnie wpisujemy ścieżkę do JARa (ja podałem z targeta) oraz klasę z funkcją main.
W następnym oknie wybieramy klaster dla wersji 0.20 oraz wpisujemy argumenty dla zadania. Kolejno plik access.log oraz katalog w którym mają być składowane wyniki końcowe.
I klikamy 'finish' :-)
Teraz możemy odpalać!
Prawy guzik na nowego Joba -> Run Job -> Run.
Wszystko powinno pójść dobrze, stan konsoli na końcu (statystyki):
11/06/19 11:53:26 INFO mapred.JobClient: Job complete: job_local_0001
11/06/19 11:53:26 INFO mapred.JobClient: Counters: 12
11/06/19 11:53:26 INFO mapred.JobClient: FileSystemCounters
11/06/19 11:53:26 INFO mapred.JobClient: FILE_BYTES_READ=18153346
11/06/19 11:53:26 INFO mapred.JobClient: FILE_BYTES_WRITTEN=9462349
11/06/19 11:53:26 INFO mapred.JobClient: Map-Reduce Framework
11/06/19 11:53:26 INFO mapred.JobClient: Reduce input groups=4
11/06/19 11:53:26 INFO mapred.JobClient: Combine output records=0
11/06/19 11:53:26 INFO mapred.JobClient: Map input records=17484
11/06/19 11:53:26 INFO mapred.JobClient: Reduce shuffle bytes=0
11/06/19 11:53:26 INFO mapred.JobClient: Reduce output records=4
11/06/19 11:53:26 INFO mapred.JobClient: Spilled Records=34968
11/06/19 11:53:26 INFO mapred.JobClient: Map output bytes=142014
11/06/19 11:53:26 INFO mapred.JobClient: Combine input records=0
11/06/19 11:53:26 INFO mapred.JobClient: Map output records=17484
11/06/19 11:53:26 INFO mapred.JobClient: Reduce input records=17484
W katalogu wynikowym powinien znajdować się także plik part-r-00000 z zawartością:
GET 15341
HEAD 1744
POST 398
PUT 1
I mamy gotowe statystyki :-)
Pełne źródła dostępne są
tutaj.