Hadoop – Map Reduce – Development – Java

Requirements

Here are some things that we need:

  • Java Compiler
  • Hadoop Jar files
  • Text Editor
  • Hadoop Installation


Sample Data – Introduction

Any well-formatted ASCII/Unicode sample data can be used.  Interestingly enough, most of the sample applications on the Internet use character-delimited data files (the delimiter often being a comma).  One would expect XML.

But, to make things easier for those not as versed in XML, let us use a simple ASCII file.

Sample Data – What type of data

To facilitate quick familiarization, let us use a simple personal checkbook.

The columns / fields will be:

  • payee
  • timestamp
  • category
  • amount

Our purpose will be to read the first and fourth columns and aggregate the fourth column, the amount, for each payee.  For example, Home Depot appears three times in the file below (10.00, 60.0, and 80.00), so its aggregated amount will be 150.0.

Text Data File (checkbook__201304.txt)

Home Depot//2013-02-09 12:09//House Keeping//10.00
Safeway//2013-02-23 09:12//Grocery//20.00
In and Out Burger//2013-02-19 12:19//Restaurant//30.00
USPS//2013-02-05 10:15//Office Supply//40.0
Shell//2013-02-03 16:17//Auto-Gas//50.00
Home Depot//2013-02-19 24:11//House Keeping//60.0
Safeway//2013-02-13 09:12//Grocery//70.0
Home Depot//2013-02-27 24:11//House Keeping//80.00
Safeway//2013-02-18 09:22//Grocery//90.
Toshiba//2013-02-19 11:45//Computer//2450.00
Lucent//2013-02-20 10:00//Communication//1000
Lucent//2013-02-21 13:00//Communication//3275.52

Hadoop Jar Files

On the Hadoop server, issue the find command to locate the Hadoop-related jar files:

find /  -name  "*hadoop*.jar" 2> /dev/null 
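On a Hadoop 1.x install, the listing will typically include hadoop-core-<version>.jar.  The paths below are hypothetical examples only; the jar names match the ones used in the Compilation section further down:

/usr/local/hadoop/hadoop-core-1.0.3.jar
/usr/local/hadoop/lib/commons-cli-1.2.jar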

[Screenshot: Hadoop - MapReduce - Development - Jar Files]

Code

  • We have chosen to create three public classes (bookKeepingRoot, bookKeepingMap, bookKeepingReducer)
  • The bookKeepingRoot file will be the root/driver class that acts as the controller
  • The bookKeepingMap file will map the input data files into our two output columns – Key and Value
  • The bookKeepingReducer file will perform our analysis and “pivot” the data

bookKeepingRoot

  • Sets the main class to bookKeeping.bookKeepingRoot.class – as our class (bookKeepingRoot) is part of a package, we indicate the full package name
  • Sets the job name to bookKeeping
  • Sets the output key class to Text; that is, a string field
  • Sets the output value class to DoubleWritable; a lot of examples on the Internet are simple integer counts, but our example targets summarized amount balances
  • Sets the mapper class to bookKeeping.bookKeepingMap.class – again using the package name
  • Sets the combiner class to bookKeeping.bookKeepingReducer.class – the combiner is a placeholder here; reusing the reducer as a combiner is safe because summation is associative and commutative
  • Sets the reducer class to bookKeeping.bookKeepingReducer.class – again using the package name
  • Sets the input format to TextInputFormat.class
  • Sets the output format to TextOutputFormat.class
  • Sets the input path (FileInputFormat) to the 2nd command line argument
  • Sets the output path (FileOutputFormat) to the 3rd command line argument
  • If the user does not pass in three command line arguments, prints the usage message and exits


package bookKeeping;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class bookKeepingRoot
{

    private static String jobType = null;
    private static String invocationErr =
        "[operation] /path/to/2kh/files /path/to/output";

    public void setJobType(String jobType)
    {
        bookKeepingRoot.jobType = jobType;
    }

    public static void main(String[] args) throws Exception
    {

        // Validate the command line before configuring the job
        if (args.length != 3)
        {
            System.out.println("Usage:");
            System.out.println(invocationErr);
            System.exit(1);
        }

        bookKeepingRoot.jobType = args[0];

        JobConf conf = new JobConf(bookKeeping.bookKeepingRoot.class);
        conf.setJobName("bookKeeping");

        // Key is the payee (Text); value is the amount (DoubleWritable)
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(bookKeepingMap.class);
        conf.setCombinerClass(bookKeeping.bookKeepingReducer.class);
        conf.setReducerClass(bookKeeping.bookKeepingReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Input and output paths come from the 2nd and 3rd command line arguments
        FileInputFormat.setInputPaths(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        JobClient.runJob(conf);

    } //main
}

bookKeepingMap – Method – map

  • Our text file is delimited by a separator ("//")
  • We use Java's String.split method to parse out the individual fields; split takes a regular expression, and since the forward slash is not a regex metacharacter, "//" matches the literal delimiter (see the snippet just below)
  • The third method argument is the output argument.  Data to be pipelined is set by calling the collect method of the output variable
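
As a quick standalone illustration of what the split produces for one line of our sample file:

String line = "Home Depot//2013-02-09 12:09//House Keeping//10.00";
String[] fields = line.split("//");
// fields[0] = "Home Depot"  -> payee  (our key)
// fields[3] = "10.00"       -> amount (our value)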


package bookKeeping;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class bookKeepingMap
extends MapReduceBase
implements Mapper<LongWritable, Text, Text, DoubleWritable>
{

   // TextInputFormat hands the mapper the line's byte offset (LongWritable)
   // as the key and the line itself (Text) as the value
   private static final String SPLITTER = "//";

   private static long lRow = 0;

   private String line = null;
   private String[] arrayData = null;
   private String aMessage = null;

   private String strVendor = null;
   private String strAmount = null;
   private double dblAmount = 0;

   private DoubleWritable dblValue = new DoubleWritable();

   public void map( LongWritable key
                  , Text value
                  , OutputCollector<Text, DoubleWritable> output
                  , Reporter reporter
                  )
   throws IOException
   {

      line = value.toString();

      // Split once and pick off the payee (column 1) and the amount (column 4)
      arrayData = line.split(SPLITTER);
      strVendor = arrayData[0];
      strAmount = arrayData[3];

      lRow = lRow + 1;

      try
      {
         dblAmount = Double.parseDouble(strAmount);
      }
      catch (Exception ex)
      {
         aMessage = "Error with Column Amount : "
                  + " Row # : " + lRow
                  + " Amount " + strAmount
                  + " Error " + ex.getMessage()
                  ;

         System.out.println(aMessage);

         System.exit(-2);
      }

      dblValue.set(dblAmount);

      output.collect(new Text(strVendor), dblValue);

   } //map

} //class


BookKeeping – Reducer

bookKeepingReducer – Method – reduce

  • The map method arranges data into keys and corresponding values
  • For each key, sum up the corresponding values
  • The third method argument is the output argument.  Data to be pipelined is set by calling the collect method of the output variable

package bookKeeping;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class bookKeepingReducer
extends MapReduceBase
implements Reducer<Text, DoubleWritable, Text, DoubleWritable>
{

    private DoubleWritable dblSummation = new DoubleWritable();

    public void reduce( Text key
                      , Iterator<DoubleWritable> values
                      , OutputCollector<Text, DoubleWritable> output
                      , Reporter reporter
                      )
    throws IOException
    {

        double summation = 0;

        // Walk the values that arrived for this key and accumulate the total
        while (values.hasNext())
        {
            summation += values.next().get();
        }

        dblSummation.set(summation);

        output.collect(key, dblSummation);

    } // method reduce

} //class bookKeepingReducer
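
As a sanity check, the reduce logic can be exercised locally, outside of Hadoop.  The harness below is a minimal sketch, not part of the original deployment; the class name bookKeepingReducerCheck is hypothetical, and it assumes the hadoop-core jar and the classes above are on the classpath:

package bookKeeping;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;

public class bookKeepingReducerCheck
{
    public static void main(String[] args) throws Exception
    {
        // The three Home Depot amounts from the sample data file
        Iterator<DoubleWritable> values = Arrays.asList(
              new DoubleWritable(10.00)
            , new DoubleWritable(60.00)
            , new DoubleWritable(80.00)).iterator();

        // A collector that simply prints each key/value pair
        OutputCollector<Text, DoubleWritable> output =
            new OutputCollector<Text, DoubleWritable>()
            {
                public void collect(Text key, DoubleWritable value)
                {
                    System.out.println(key + "\t" + value);
                }
            };

        // Reporter is unused by our reduce, so null is acceptable here
        new bookKeepingReducer()
            .reduce(new Text("Home Depot"), values, output, null);

        // Expected: Home Depot	150.0
    }
}

Running it should print Home Depot followed by 150.0, matching the three Home Depot rows in the sample data file.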

Compilation

The Map and Reducer classes are compiled first; the Root class is compiled last, with java_classes on its classpath, as it references the other two.  The hadoop-core and commons-cli jar paths will vary per installation.

 mkdir java_classes

 rm  java_classes/*.class

 rm  bookKeeping.jar

 javac -cp hadoop-core-1.0.3.jar:commons-cli-1.2/commons-cli-1.2.jar \
 	 -d java_classes \
 	 bookKeepingMap.java

 javac -cp hadoop-core-1.0.3.jar:commons-cli-1.2/commons-cli-1.2.jar \
 	 -d java_classes \
 	 bookKeepingReducer.java

 javac \
-cp hadoop-core-1.0.3.jar:commons-cli-1.2/commons-cli-1.2.jar:java_classes \
  -d java_classes \
  bookKeepingRoot.java

 jar -cfv bookKeeping.jar -C java_classes .
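
To confirm the package layout inside the jar, list its contents:

 jar -tf bookKeeping.jar

The listing should include bookKeeping/bookKeepingRoot.class, bookKeeping/bookKeepingMap.class, and bookKeeping/bookKeepingReducer.class.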


Deployment


scp – copy files


scp -r Checkbook dadeniji@<hadoop>:/tmp



Copy the data file from the local host to the Hadoop HDFS file system



bin/hadoop dfs -copyFromLocal /tmp/checkbook__201304.txt  /user/hduser/bookKeeping/
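
To confirm the file landed in HDFS, list the target directory:

bin/hadoop dfs -ls /user/hduser/bookKeeping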


Invoke


Invoke Map Reduce Job



bin/hadoop jar bookKeeping.jar bookKeeping CAT /user/hduser/bookKeeping  /user/hduser/bookKeeping-output-20130226__0841

Invoke Map Reduce Job – Files in package

As bookKeepingRoot is declared inside the bookKeeping package, the fully qualified class name (bookKeeping.bookKeepingRoot) must be passed to hadoop jar; the unqualified name used above will not resolve.



bin/hadoop jar bookKeeping.jar bookKeeping.bookKeepingRoot CAT /user/hduser/bookKeeping  /user/hduser/bookKeeping-output-20130226__1200

Output

[Screenshot: Hadoop - BookKeeping - Output]
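
Summing the sample amounts by payee by hand, the job's part file should read roughly as follows (keys sorted, tab separated, values rendered by DoubleWritable):

Home Depot	150.0
In and Out Burger	30.0
Lucent	4275.52
Safeway	180.0
Shell	50.0
Toshiba	2450.0
USPS	40.0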

