Ja do dewelopmentu używam NetBeansa i polecam wtyczkę Karmaspehere Community Edition. Pozwala ona w łatwy sposób definiować i odpalać sobie zadania Hadoopa.
Aby zainstalować tę wtyczkę należy dodać do listy 'update sites' adres: http://hadoopstudio.org/updates/updates.xml. Ja zainstalowałem wszystkie 4 dostępne pluginy od Karmasphere. Polecam też się zarejestrować i aktywować plugin, bo będą się pojawiać komunikaty o aktywacji (ale chyba można pracować bez niej).
Ok, możemy zacząć pisać :-) Projekt oparłem o maven, pom.xml nie jest zbyt rozdbudowany:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>pl.lstachowiak</groupId> <artifactId>HadoopSamples</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>HadoopSamples</name> <url>http://blog.lstachowiak.pl</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.8</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.mahout.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>0.20.1</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> </dependency> </dependencies> </project>
Problem jaki wybrałem dla pierwszego przykładu to analiza statystyk serwera Apache. Bierzemy plik access.log i analizujemy typy żądań do serwera. Na końcu chcemy mieć liczbę żądań każdego typu.
Mój pomysł na Map i Reduce jest następujący:
Czyli:
- Hadoop wczytuje i dzieli plik na tzw 'input splits'
- przekazuje każdy split do funkcji map, gdzie kluczem jest numer linii w pliku a wartością sama linia
- implementowana przez nas funkcja map sprawdza jaki tym żądania HTTP znajduje się w linii i zwraca jako klucz typ żądania a jako wartość liczbę 1 (umownie sobie tak robimy)
- Hadoop wykonuje tzw. shuffle grupując nam wyniki z map w pary [TYP] -> [KOLEKCJA JEDYNEK]
- nasza funkcja reduce jedyne co robi to zlicza liczbę elementów w kolekcji i zwraca jako klucz typ żądania a jako wartość liczbę elementów w kolekcji co jest zarazem wynikiem końcowym
- Hadoop zapisuje dane na dysk
(Bardzo) Możliwe, że nie jest to idealne rozwiązanie, ale nie mam jeszcze wielkiego doświadczenia z Hadoop ;)
Przejdźmy do implementacji. Funkcja map:
HttpMethodMapper.java:
package pl.lstachowiak.hadoop.sample1; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class HttpMethodMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // 1 @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { HttpMethod method = getMethodFromLine(value); // 2 context.write(new Text(method.name()), new IntWritable(1)); // 3 } HttpMethod getMethodFromLine(Text value) { if (value.find(HttpMethod.CONNECT.name()) != -1) { return HttpMethod.CONNECT; } if (value.find(HttpMethod.DELETE.name()) != -1) { return HttpMethod.DELETE; } if (value.find(HttpMethod.GET.name()) != -1) { return HttpMethod.GET; } if (value.find(HttpMethod.HEAD.name()) != -1) { return HttpMethod.HEAD; } if (value.find(HttpMethod.OPTIONS.name()) != -1) { return HttpMethod.OPTIONS; } if (value.find(HttpMethod.PATH.name()) != -1) { return HttpMethod.PATH; } if (value.find(HttpMethod.POST.name()) != -1) { return HttpMethod.POST; } if (value.find(HttpMethod.PUT.name()) != -1) { return HttpMethod.PUT; } if (value.find(HttpMethod.TRACE.name()) != -1) { return HttpMethod.TRACE; } return HttpMethod.UNKNOWN; } }Krótki komentarz do tego co się dzieje w kodzie:
1 - klasa mappera musi (od wersji API 0.20) rozszerzać klasę Mapper. Parametrami są kolejno:
- typ klucza (numer linii)
- typ wartości (linia w pliku)
- typ klucza wynikowego (typ żądania http)
- typ wartości wynikowe (liczba 1)
2 - pobranie typu żądania z linii, wiem, że rozwiązanie jest dalekie od idealnego, ale nie o to chodzi w przykładzie ;)
3 - zapis wyniku
Jak widać Hadoop posiada własny zestaw typów danych. W większości są to obudowane typy podstawowe. Stworzone zostały one aby w troszkę lepszy sposób serializować dane i przesyłać przez sieć. Zaleca się ich używanie. Typ HttpRequest to zwykły enum z listą dostępnych typów żądań HTTP. Dostępny jest w źródłach.
Funkcja reduce:
HttpMethodReducer.java:
package pl.lstachowiak.hadoop.sample1; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; public class HttpMethodReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // 1 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 2 int counter = countValues(values); // 3 context.write(key, new IntWritable(counter)); // 4 } <T> int countValues(Iterable<T> values) { int counter = 0; for (T value : values) { counter++; } return counter; } }Ponownie krótki komentarz:
1 - musimy rozszerzać klasę Reducer, parametry kolejno to:
- typ klucza z Map
- typ wartości z Map
- typ klucza wynikowego
- typ wartości wynikowej
2 - tutaj widać efekt działania operacji shuffle. Zamiast otrzymywać tą jedynkę z funkcji Map otrzymujemy iterator po kolekcji jedynek (w naszym przykładzie).
3 - zliczamy elementy
4 - zapisujemy w wynikach :-)
No to jeszcze program do uruchomienia zadania:
HttpMethodJob.java:
package pl.lstachowiak.hadoop.sample1; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class HttpMethodJob { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: HttpMethodJob <input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(HttpMethodJob.class); // 1 FileInputFormat.addInputPath(job, new Path(args[0])); // 2 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 3 job.setMapperClass(HttpMethodMapper.class); // 4 job.setReducerClass(HttpMethodReducer.class); // 5 job.setOutputKeyClass(Text.class); // 6 job.setOutputValueClass(IntWritable.class); // 7 System.exit(job.waitForCompletion(true) ? 0 : 1); // gogogo! } }
1 - tworzymy zadanie i podajemy klasę główna
2,3 - podajemy źródło danych oraz miejsce zapisu danych wynikowych (wczytywane z argumentów)
4,5 - podajemy klasę mappera i reducera
6,7 - konfiguracja typów danych wynikowych
I uruchomienie zadania :-)
Zanim jednak zaczniemy potrzebne są jeszcze jakieś dane do analizy. Ja wygooglałem coś tutaj (4,3MB).
Uruchomienie
Tutaj obrazkowo:) Przechodzimy do zakładki Services (NetBeans) -> Servers -> Hadoop Jobs -> New Job. Wybieramy nazwę dla Joba i typ 'Hadoop Job from pre-existing JAR file'.
Następnie wpisujemy ścieżkę do JARa (ja podałem z targeta) oraz klasę z funkcją main.
W następnym oknie wybieramy klaster dla wersji 0.20 oraz wpisujemy argumenty dla zadania. Kolejno plik access.log oraz katalog w którym mają być składowane wyniki końcowe.
I klikamy 'finish' :-)
Teraz możemy odpalać!
Prawy guzik na nowego Joba -> Run Job -> Run.
11/06/19 11:53:26 INFO mapred.JobClient: Job complete: job_local_0001 11/06/19 11:53:26 INFO mapred.JobClient: Counters: 12 11/06/19 11:53:26 INFO mapred.JobClient: FileSystemCounters 11/06/19 11:53:26 INFO mapred.JobClient: FILE_BYTES_READ=18153346 11/06/19 11:53:26 INFO mapred.JobClient: FILE_BYTES_WRITTEN=9462349 11/06/19 11:53:26 INFO mapred.JobClient: Map-Reduce Framework 11/06/19 11:53:26 INFO mapred.JobClient: Reduce input groups=4 11/06/19 11:53:26 INFO mapred.JobClient: Combine output records=0 11/06/19 11:53:26 INFO mapred.JobClient: Map input records=17484 11/06/19 11:53:26 INFO mapred.JobClient: Reduce shuffle bytes=0 11/06/19 11:53:26 INFO mapred.JobClient: Reduce output records=4 11/06/19 11:53:26 INFO mapred.JobClient: Spilled Records=34968 11/06/19 11:53:26 INFO mapred.JobClient: Map output bytes=142014 11/06/19 11:53:26 INFO mapred.JobClient: Combine input records=0 11/06/19 11:53:26 INFO mapred.JobClient: Map output records=17484 11/06/19 11:53:26 INFO mapred.JobClient: Reduce input records=17484W katalogu wynikowym powinien znajdować się także plik part-r-00000 z zawartością:
GET 15341 HEAD 1744 POST 398 PUT 1
I mamy gotowe statystyki :-)
Pełne źródła dostępne są tutaj.
Brak komentarzy:
Prześlij komentarz