Ebben az oktatóanyagban megtanulja használni a Hadoop-ot a MapReduce példákkal. A felhasznált bemeneti adatok a SalesJan2009.csv. Ez az értékesítéssel kapcsolatos információkat tartalmaz, például a termék nevét, árát, fizetési módját, városát, az ügyfél országát stb. A cél az egyes országokban eladott termékek számának megismerése.
Ebben az oktatóanyagban megtanulja-
- Első Hadoop MapReduce program
- A SalesMapper osztály magyarázata
- A SalesCountryReducer osztály magyarázata
- A SalesCountryDriver osztály magyarázata
Első Hadoop MapReduce program
Ebben a MapReduce oktatóanyagban elkészítjük az első Java MapReduce programunkat:
Győződjön meg arról, hogy telepítette a Hadoop programot. Mielőtt elkezdené a tényleges folyamatot, változtassa meg a felhasználót „hduser” -re (a Hadoop konfiguráció közben használt azonosítót válthat a Hadoop programozási konfiguráció során használt felhasználói azonosítóra).
su - hduser_
1. lépés)
Hozzon létre egy új könyvtárat a MapReduceTutorial névvel shwon néven az alábbi MapReduce példában
sudo mkdir MapReduceTutorial
Adjon engedélyeket
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}
SalesCountryReducer.java
package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer{public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}
SalesCountryDriver.java
package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}
Fájlok letöltése itt
Ellenőrizze a fájlok engedélyeit
és ha hiányoznak az „olvasási” engedélyek, akkor adja meg az
2. lépés)
Exportálja a classpath-ot az alábbi Hadoop-példa szerint
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
3. lépés)
Fordítson le Java fájlokat (ezek a fájlok megtalálhatók a Final-MapReduceHandsOn könyvtárban ). Osztályfájljai a csomag könyvtárba kerülnek
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Ezt a figyelmeztetést nyugodtan figyelmen kívül lehet hagyni.
Ez az összeállítás létrehoz egy könyvtárat egy aktuális könyvtárban, amelynek neve a java forrásfájlban megadott csomagnév (azaz esetünkben a SalesCountry ), és az összes lefordított osztályfájlt beleteszi.
4. lépés)
Hozzon létre egy új fájlt Manifest.txt
sudo gedit Manifest.txt
adja hozzá a következő sorokat,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver a főosztály neve. Kérjük, vegye figyelembe, hogy a sor végén meg kell nyomnia az Enter billentyűt.
5. lépés)
Hozzon létre egy Jar fájlt
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Ellenőrizze, hogy a jar fájl létrejött-e
6. lépés)
Indítsa el a Hadoop alkalmazást
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
7. lépés)
Másolja a FileJan2009.csv fájlt a ~ / inputMapReduce fájlba
Most használja az alábbi parancsot a ~ / inputMapReduce másolásához HDFS-be.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Ezt a figyelmeztetést nyugodtan figyelmen kívül hagyhatjuk.
Ellenőrizze, hogy a fájl valóban másolva van-e.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
8. lépés)
Futtassa a MapReduce feladatot
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Ez létrehoz egy mapreduce_output_sales nevű kimeneti könyvtárat a HDFS-en. A könyvtár tartalma egy fájl lesz, amely országonként tartalmazza a termékértékesítést.
9. lépés)
Az eredmény a parancs interfészén keresztül látható,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Az eredmények egy webes felületen keresztül is megtekinthetők
Nyissa meg r webböngészőben.
Most válassza a "Tallózás a fájlrendszerben" lehetőséget, és keresse meg a / mapreduce_output_sales fájlt
Nyissa meg az r-00000 részt
A SalesMapper osztály magyarázata
Ebben a szakaszban megértjük a SalesMapper osztály megvalósítását.
1. Először megadjuk az osztályunk számára a csomag nevét. A SalesCountry a csomagunk neve. Felhívjuk figyelmét, hogy az összeállítás kimenete, a SalesMapper.class , egy könyvtárba kerül, amely a csomag neve: SalesCountry .
Ezt követően könyvtári csomagokat importálunk.
Az alábbi pillanatkép a SalesMapper class- megvalósítását mutatja be
Minta kód magyarázat:
1. SalesMapper osztály definíció-
a public class SalesMapper kiterjeszti a MapReduceBase megvalósítja a Mapper
Minden mapper osztályt ki kell terjeszteni a MapReduceBase osztályból, és meg kell valósítania a Mapper felületet.
2. A „térkép” függvény meghatározása
public void map(LongWritable key,Text value,OutputCollectoroutput,Reporter reporter) throws IOException
A Mapper osztály fő része egy 'map ()' metódus, amely négy argumentumot fogad el.
A „map ()” módszer minden hívásakor átadódik egy kulcs-érték pár ( ebben a kódban „kulcs” és „érték” ).
A 'map ()' módszer az argumentumként kapott beviteli szöveg felosztásával kezdődik. A tokenizer segítségével ezeket a sorokat szavakra bontja.
String valueString = value.toString();String[] SingleCountryData = valueString.split(",");
Itt a ',' jelölőt használjuk elválasztóként.
Ezt követően egy pár képződik az 'SingleCountryData' tömb 7. indexében lévő rekord és az '1' érték felhasználásával .
output.collect (új szöveg (SingleCountryData [7]), egy);
Mi választja rekord 7. index, mert szükségünk van Country adatokat, és ez található a 7-es index a tömbben „SingleCountryData” .
Felhívjuk figyelmét, hogy a bemenő adatok az alábbi formátumban (ahol Country pedig 7 -én index, 0 kiindulási index) -
Tranzakció_dátum, Termék, Ár, Fizetési Típus, Név, Város, Állam, Ország , Fiók_Létrehozva, Utolsó Belépés, Szélesség, Hosszúság
A mapper kimenete ismét egy kulcs-érték pár, amelyet az 'OutputCollector' 'collect ()' módszerével adnak ki .
A SalesCountryReducer osztály magyarázata
Ebben a szakaszban megértjük a SalesCountryReducer osztály megvalósítását.
1. Először megadjuk az osztályunk számára a csomag nevét. A SalesCountry az out csomag neve. Felhívjuk figyelmét, hogy az összeállítás, a SalesCountryReducer.class kimenete egy könyvtárba kerül, amelynek neve a csomag neve: SalesCountry .
Ezt követően könyvtári csomagokat importálunk.
Az alábbi pillanatkép a SalesCountryReducer class- megvalósítását mutatja be
Kód Magyarázat:
1. SalesCountryReducer Class Definition-
public class SalesCountryReducer kiterjeszti a MapReduceBase megvalósítja a Reducer
Itt az első két adattípus, a „Text” és az „IntWritable” a bemeneti kulcsérték adattípusa a reduktorhoz.
A leképező kimenete
Az utolsó két adattípus, a „Text” és az „IntWritable” a reduktor által generált kimenet adattípusa kulcs-érték pár formájában.
Minden reduktor osztályt ki kell terjeszteni a MapReduceBase osztályból, és meg kell valósítania a Reducer interfészt.
2. A „csökkentés” funkció definiálása
public void reduce( Text t_key,Iteratorvalues,OutputCollector output,Reporter reporter) throws IOException {
A reduc () metódus bevitele egy kulcs, amely több értéket tartalmaz.
Például a mi esetünkben
Ezt a reduktornak adjuk meg, mint
Tehát az ilyen formájú argumentumok elfogadásához az első két adattípust kell használni, például: Szöveg és Iterátor
A következő argumentum OutputCollector
A reduc () módszer a kulcsérték átmásolásával és a frekvenciaszám 0-ra inicializálásával kezdődik.
Szövegkulcs = t_kulcs; int frequencyForCountry = 0;
Ezután a ' while' ciklus segítségével végigvezetjük a kulcshoz tartozó értékek listáját, és az összes érték összegzésével kiszámoljuk a végső frekvenciát.
while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}
Most az eredményt a kimeneti gyűjtőhöz toljuk kulcs és kapott frekvenciaszám formájában .
A kód alatt ezt teszi-
output.collect(key, new IntWritable(frequencyForCountry));
A SalesCountryDriver osztály magyarázata
Ebben a szakaszban megértjük a SalesCountryDriver osztály megvalósítását
1. Először megadjuk az osztályunk számára a csomag nevét. A SalesCountry az out csomag neve. Felhívjuk figyelmét, hogy az összeállítás, a SalesCountryDriver.class kimenete a csomag neve: SalesCountry lesz .
Itt egy sor adja meg a csomag nevét, majd a kódot a könyvtárcsomagok importálásához.
2. Adjon meg egy illesztőprogram osztályt, amely új kliens feladatot, konfigurációs objektumot hoz létre, és hirdeti a Mapper és a Reducer osztályokat.
A járművezető osztály felelős azért, hogy a MapReduce munkánkat Hadoop-ban futtassuk. Ebben az osztályban megadjuk a feladat nevét, a bemenet / kimenet adattípusát, valamint a leképező és a csökkentő osztályok nevét .
3. Az alábbi kódrészletben be- és kimeneti könyvtárakat állítunk be, amelyeket a bemeneti adatkészlet felhasználására és a kimenet előállítására használunk.
Az arg [0] és az arg [1] azok a parancssori argumentumok, amelyeket a MapReduce hands-on paranccsal adtak meg, azaz
$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales
4. Indítsa el a munkánkat
A kód alatt a MapReduce job végrehajtásának megkezdése
try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}