niedziela, 19 czerwca 2011

Pierwszy Job z Hadoop

Zanim napiszemy pierwszego joba trzeba przygotować sobie środowisko. Obsługa wszystkiego z konsoli może z czasem być nużąca (chociaż ja tam lubię;).

Ja do dewelopmentu używam NetBeansa i polecam wtyczkę Karmaspehere Community Edition. Pozwala ona w łatwy sposób definiować i odpalać sobie zadania Hadoopa.
Aby zainstalować tę wtyczkę należy dodać do listy 'update sites' adres: http://hadoopstudio.org/updates/updates.xml. Ja zainstalowałem wszystkie 4 dostępne pluginy od Karmasphere. Polecam też się zarejestrować i aktywować plugin, bo będą się pojawiać komunikaty o aktywacji (ale chyba można pracować bez niej).

Ok, możemy zacząć pisać :-) Projekt oparłem o maven, pom.xml nie jest zbyt rozdbudowany:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>pl.lstachowiak</groupId>
    <artifactId>HadoopSamples</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>HadoopSamples</name>
    <url>http://blog.lstachowiak.pl</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8</version>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>org.apache.mahout.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>0.20.1</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>
</project>

Problem jaki wybrałem dla pierwszego przykładu to analiza statystyk serwera Apache. Bierzemy plik access.log i analizujemy typy żądań do serwera. Na końcu chcemy mieć liczbę żądań każdego typu.
Mój pomysł na Map i Reduce jest następujący:
Czyli:
- Hadoop wczytuje i dzieli plik na tzw 'input splits'
- przekazuje każdy split do funkcji map, gdzie kluczem jest numer linii w pliku a wartością sama linia
- implementowana przez nas funkcja map sprawdza jaki tym żądania HTTP znajduje się w linii i zwraca jako klucz typ żądania a jako wartość liczbę 1 (umownie sobie tak robimy)
- Hadoop wykonuje tzw. shuffle grupując nam wyniki z map w pary [TYP] -> [KOLEKCJA JEDYNEK]
- nasza funkcja reduce jedyne co robi to zlicza liczbę elementów w kolekcji i zwraca jako klucz typ żądania a jako wartość liczbę elementów w kolekcji co jest zarazem wynikiem końcowym
- Hadoop zapisuje dane na dysk

(Bardzo) Możliwe, że nie jest to idealne rozwiązanie, ale nie mam jeszcze wielkiego doświadczenia z Hadoop ;)

Przejdźmy do implementacji. Funkcja map:
HttpMethodMapper.java:
package pl.lstachowiak.hadoop.sample1;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class HttpMethodMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // 1

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        HttpMethod method = getMethodFromLine(value); // 2
        context.write(new Text(method.name()), new IntWritable(1)); // 3
    }

    HttpMethod getMethodFromLine(Text value) {

        if (value.find(HttpMethod.CONNECT.name()) != -1) {
            return HttpMethod.CONNECT;
        }

        if (value.find(HttpMethod.DELETE.name()) != -1) {
            return HttpMethod.DELETE;
        }

        if (value.find(HttpMethod.GET.name()) != -1) {
            return HttpMethod.GET;
        }

        if (value.find(HttpMethod.HEAD.name()) != -1) {
            return HttpMethod.HEAD;
        }

        if (value.find(HttpMethod.OPTIONS.name()) != -1) {
            return HttpMethod.OPTIONS;
        }

        if (value.find(HttpMethod.PATH.name()) != -1) {
            return HttpMethod.PATH;
        }

        if (value.find(HttpMethod.POST.name()) != -1) {
            return HttpMethod.POST;
        }

        if (value.find(HttpMethod.PUT.name()) != -1) {
            return HttpMethod.PUT;
        }

        if (value.find(HttpMethod.TRACE.name()) != -1) {
            return HttpMethod.TRACE;
        }

        return HttpMethod.UNKNOWN;
    }
}
Krótki komentarz do tego co się dzieje w kodzie:
1 - klasa mappera musi (od wersji API 0.20) rozszerzać klasę Mapper. Parametrami są kolejno:
- typ klucza (numer linii)
- typ wartości (linia w pliku)
- typ klucza wynikowego (typ żądania http)
- typ wartości wynikowe (liczba 1)
2 - pobranie typu żądania z linii, wiem, że rozwiązanie jest dalekie od idealnego, ale nie o to chodzi w przykładzie ;)
3 - zapis wyniku

Jak widać Hadoop posiada własny zestaw typów danych. W większości są to obudowane typy podstawowe. Stworzone zostały one aby w troszkę lepszy sposób serializować dane i przesyłać przez sieć. Zaleca się ich używanie. Typ HttpRequest to zwykły enum z listą dostępnych typów żądań HTTP. Dostępny jest w źródłach.

Funkcja reduce:
HttpMethodReducer.java:
package pl.lstachowiak.hadoop.sample1;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;


public class HttpMethodReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // 1

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 2
        int counter = countValues(values); // 3
        context.write(key, new IntWritable(counter)); // 4
    }
    
    <T> int countValues(Iterable<T> values) {
        int counter = 0;
        for (T value : values) {
            counter++;
        }
        
        return counter;
    }
}
Ponownie krótki komentarz:
1 - musimy rozszerzać klasę Reducer, parametry kolejno to:
- typ klucza z Map
- typ wartości z Map
- typ klucza wynikowego
- typ wartości wynikowej
2 - tutaj widać efekt działania operacji shuffle. Zamiast otrzymywać tą jedynkę z funkcji Map otrzymujemy iterator po kolekcji jedynek (w naszym przykładzie).
3 - zliczamy elementy
4 - zapisujemy w wynikach :-)

No to jeszcze program do uruchomienia zadania:
HttpMethodJob.java:
package pl.lstachowiak.hadoop.sample1;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class HttpMethodJob {
    
    public static void main(String[] args) throws Exception {
        
        if (args.length != 2) {
            System.err.println("Usage: HttpMethodJob <input path> <output path>");
            System.exit(-1);
        }
        
        Job job = new Job();
        job.setJarByClass(HttpMethodJob.class); // 1
        
        FileInputFormat.addInputPath(job, new Path(args[0])); // 2
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // 3
        
        job.setMapperClass(HttpMethodMapper.class); // 4
        job.setReducerClass(HttpMethodReducer.class); // 5
        
        job.setOutputKeyClass(Text.class); // 6
        job.setOutputValueClass(IntWritable.class); // 7
        
        System.exit(job.waitForCompletion(true) ? 0 : 1); // gogogo!
        
    }
}

1 - tworzymy zadanie i podajemy klasę główna
2,3 - podajemy źródło danych oraz miejsce zapisu danych wynikowych (wczytywane z argumentów)
4,5 - podajemy klasę mappera i reducera
6,7 - konfiguracja typów danych wynikowych
I uruchomienie zadania :-)

Zanim jednak zaczniemy potrzebne są jeszcze jakieś dane do analizy. Ja wygooglałem coś tutaj (4,3MB).

Uruchomienie

Tutaj obrazkowo:) Przechodzimy do zakładki Services (NetBeans) -> Servers -> Hadoop Jobs -> New Job. Wybieramy nazwę dla Joba i typ 'Hadoop Job from pre-existing JAR file'.
Następnie wpisujemy ścieżkę do JARa (ja podałem z targeta) oraz klasę z funkcją main.
W następnym oknie wybieramy klaster dla wersji 0.20 oraz wpisujemy argumenty dla zadania. Kolejno plik access.log oraz katalog w którym mają być składowane wyniki końcowe.
I klikamy 'finish' :-)

Teraz możemy odpalać!
Prawy guzik na nowego Joba -> Run Job -> Run.

Wszystko powinno pójść dobrze, stan konsoli na końcu (statystyki):
11/06/19 11:53:26 INFO mapred.JobClient: Job complete: job_local_0001
11/06/19 11:53:26 INFO mapred.JobClient: Counters: 12
11/06/19 11:53:26 INFO mapred.JobClient:   FileSystemCounters
11/06/19 11:53:26 INFO mapred.JobClient:     FILE_BYTES_READ=18153346
11/06/19 11:53:26 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=9462349
11/06/19 11:53:26 INFO mapred.JobClient:   Map-Reduce Framework
11/06/19 11:53:26 INFO mapred.JobClient:     Reduce input groups=4
11/06/19 11:53:26 INFO mapred.JobClient:     Combine output records=0
11/06/19 11:53:26 INFO mapred.JobClient:     Map input records=17484
11/06/19 11:53:26 INFO mapred.JobClient:     Reduce shuffle bytes=0
11/06/19 11:53:26 INFO mapred.JobClient:     Reduce output records=4
11/06/19 11:53:26 INFO mapred.JobClient:     Spilled Records=34968
11/06/19 11:53:26 INFO mapred.JobClient:     Map output bytes=142014
11/06/19 11:53:26 INFO mapred.JobClient:     Combine input records=0
11/06/19 11:53:26 INFO mapred.JobClient:     Map output records=17484
11/06/19 11:53:26 INFO mapred.JobClient:     Reduce input records=17484
W katalogu wynikowym powinien znajdować się także plik part-r-00000 z zawartością:
GET 15341
HEAD 1744
POST 398
PUT 1

I mamy gotowe statystyki :-)
Pełne źródła dostępne są tutaj.

Brak komentarzy:

Prześlij komentarz