Build a Java Map Reduce Application

(Last Updated On: )

I will explain how to set up a mapper, reducer, combiner, path filter, partitioner and output format in Java, using Eclipse with Maven. If you need to know how to install Eclipse, go here. Remember that these are not complete programs, just snippets to get you going.

I used this tutorial as a starting point; however, it was written against an older version of the Hadoop API.

Mapper: Maps input key/value pairs to a set of intermediate key/value pairs.
Reducer: Reduces a set of intermediate values which share a key to a smaller set of values.
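Partitioner: Decides which reducer receives each intermediate key/value pair. See http://www.tutorialspoint.com/map_reduce/map_reduce_partitioner.htm
Combiner: Pre-aggregates each mapper's output locally before it is sent to the reducers. See http://www.tutorialspoint.com/map_reduce/map_reduce_combiners.htm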

First, create a Maven project. If you haven't done that before, any Maven getting-started tutorial will walk you through it.

pom.xml:

<!-- Inside <project> in your pom.xml. The property keeps every Hadoop artifact on the same version. -->
<properties>
      <hadoop.version>2.7.2</hadoop.version>
</properties>
<!-- The <dependency> entries must be wrapped in a single <dependencies> element (a sibling of <properties>). -->
<dependencies>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-api</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-common</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-nodemanager</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
</dependencies>

Job Driver:

public class JobDriver extends Configured implements Tool {
      private Configuration conf;
      private static String hdfsURI = "hdfs://localhost:54310";

      public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new JobDriver(), args);
            System.exit(res);
      }

      @Override
      public int run(String[] args) throws Exception {
            BasicConfigurator.configure();
            conf = this.getConf();

            //The paths for the configuration
            final String HADOOP_HOME = System.getenv("HADOOP_HOME");
            conf.addResource(new Path(HADOOP_HOME, "etc/hadoop/core-site.xml"));
            conf.addResource(new Path(HADOOP_HOME, "etc/hadoop/hdfs-site.xml"));
            conf.addResource(new Path(HADOOP_HOME, "etc/hadoop/yarn-site.xml"));
            hdfsURI = conf.get("fs.defaultFS");

            Job job = Job.getInstance(conf, YOURJOBNAME);
            //You can setup additional configuration information by doing the below.
            job.getConfiguration().set("NAME", "VALUE");

            job.setJarByClass(JobDriver.class);

            //If you are going to use a mapper class
            job.setMapperClass(MAPPERCLASS.class);

            //If you are going to use a combiner class
            job.setCombinerClass(COMBINERCLASS.class);

            //If you plan on splitting the output
            job.setPartitionerClass(PARTITIONERCLASS.class);
            job.setNumReduceTasks(NUMOFREDUCERS);

            //if you plan on use a reducer
            job.setReducerClass(REDUCERCLASS.class);

            //You need to set the output key and value types. We will just use Text for this example
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //If you want to use an input filter class
            FileInputFormat.setInputPathFilter(job, INPUTPATHFILTER.class);

            //You must setup what the input path is for the files you want to parse. It takes either string or Path
            FileInputFormat.setInputPaths(job, inputPaths);

            //Once you parse the data you must put it somewhere.
            job.setOutputFormatClass(OUTPUTFORMATCLASS.class);
            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));

            return job.waitForCompletion(true) ? 0 : 1;
      }
}

INPUTPATHFILTER:

public class InputPathFilter extends Configured implements PathFilter {
      Configuration conf;
      FileSystem fs;
      Pattern includePattern = null;
      Pattern excludePattern = null;

      @Override
      public void setConf(Configuration conf) {
            this.conf = conf;

            if (conf != null) {
                  try {
                        fs = FileSystem.get(conf);

                        //If you want you can always pass in regex patterns from the job driver class and filter that way. Up to you!
                        if (conf.get("file.includePattern") != null)
                              includePattern = conf.getPattern("file.includePattern", null);

                        if (conf.get("file.excludePattern") != null)
                              excludePattern = conf.getPattern("file.excludePattern", null);
                  } catch (IOException e) {
                        e.printStackTrace();
                  }
            }
      }

      @Override
      public boolean accept(Path path) {
            //Here you could filter based on your include or exclude regex or file size.
            //Remember if you have sub directories you have to return true for that

            if (fs.isDirectory(path)) {
                  return true;
            }
            else {
                  //You can also do this to get file size in case you want to do anything when files are certains size, etc
                  FileStatus file = fs.getFileStatus(path);
                  String size = FileUtils.byteCountToDisplaySize(file.getLen());

                  //You can also move files in this section
                  boolean move_success = fs.rename(path, new Path(NEWPATH + path.getName()));
            }
      }
}

MAPPERCLASS:

//Remember at the beginning I said we will use key and value as Text. That is the second part of the extends mapper
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
      //Do whatever setup you would like. Remember in the job drive you could set things to configuration well you can access them here now
      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
      }

      //This is the main map method.
      @Override
      public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //This will get the file name you are currently processing if you want. However not necessary.
            String filename = ((FileSplit) context.getInputSplit()).getPath().toString();

            //Do whatever you want in the mapper. The context is what you print out to.

            //If you want to embed javascript go <a href="http://www.gaudreault.ca/java-embed-javascript/" target="_blank">here</a>.
            //If you want to embed Python go <a href="http://www.gaudreault.ca/java-embed-python/" target="_blank">here</a>.
      }
}

If you decide to embed Python or JavaScript, you will need scripts like these examples: map_python and map_js.

COMBINERCLASS:

public class MyCombiner extends Reducer<Text, Text, Text, Text> {
      //Do whatever setup you would like. Remember in the job drive you could set things to configuration well you can access them here now
      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
      }

      @Override
      protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            //Do whatever you want in the mapper. The context is what you print out to.
            //If you want to embed javascript go <a href="http://www.gaudreault.ca/java-embed-javascript/" target="_blank">here</a>.
            //If you want to embed Python go <a href="http://www.gaudreault.ca/java-embed-python/" target="_blank">here</a>.
      }
}

If you decide to embed Python or JavaScript, you will need scripts like these examples: combiner_python and combiner_js.

REDUCERCLASS:

public class MyReducer extends Reducer<Text, Text, Text, Text> {
      //Do whatever setup you would like. Remember in the job drive you could set things to configuration well you can access them here now
      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
      }

      @Override
      protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            //Do whatever you want in the mapper. The context is what you print out to.
            //If you want to embed javascript go <a href="http://www.gaudreault.ca/java-embed-javascript/" target="_blank">here</a>.
            //If you want to embed Python go <a href="http://www.gaudreault.ca/java-embed-python/" target="_blank">here</a>.
      }
}

If you decide to embed Python or JavaScript, you will need scripts like these examples: reduce_python and reduce_js.

PARTITIONERCLASS:

public class MyPartitioner extends Partitioner<Text, Text> implements Configurable
{
      private Configuration conf;

      @Override
      public Configuration getConf() {
            return conf;
      }

      //Do whatever setup you would like. Remember in the job drive you could set things to configuration well you can access them here now
      @Override
      public void setConf(Configuration conf) {
            this.conf = conf;
      }

      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
            Integer partitionNum = 0;

            //Do whatever logic you would like to figure out the way you want to partition.
            //If you want to embed javascript go <a href="http://www.gaudreault.ca/java-embed-javascript/" target="_blank">here</a>.
            //If you want to embed Python go <a href="http://www.gaudreault.ca/java-embed-python/" target="_blank">here</a>.

            return partionNum;
      }
}

If you decide to embed Python or JavaScript, you will need scripts like these examples: partitioner_python and partitioner_js.

OUTPUTFORMATCLASS:

public class MyOutputFormat<K, V> extends FileOutputFormat<K, V> {
      protected static int outputCount = 0;

      protected static class JsonRecordWriter<K, V> extends RecordWriter<K, V> {
            protected DataOutputStream out;

            public JsonRecordWriter(DataOutputStream out) throws IOException {
                  this.out = out;
            }

            @Override
            public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
                  out.writeBytes(WRITE_WHATEVER_YOU_WANT);
                  out.close();
            }

            @Override
            public void write(K key, V value) throws IOException, InterruptedException {
                  //write the value
                  //You could also send to a database here if you wanted. Up to you how you want to deal with it.
            }
      }

      @Override
      public RecordWriter<K, V> getRecordWriter(TaskAttemptContext tac) throws IOException, InterruptedException {
            Configuration conf = tac.getConfiguration();
            Integer numReducers = conf.getInt("mapred.reduce.tasks", 0);
            //you can set output filename in the config from the job driver if you want
            String outputFileName = conf.get("outputFileName");
            outputCount++;

            //If you used a partitioner you need to split out the content so you should break the output filename into parts
            if (numReducers > 1) {
                  //Do whatever logic you need to in order to get unique filenames per split
            }

            Path file = FileOutputFormat.getOutputPath(tac);
            Path fullPath = new Path(file, outputFileName);
            FileSystem fs = file.getFileSystem(conf);
            FSDataOutputStream fileout = fs.create(fullPath);
            return new JsonRecordWriter<K, V>(fileout);
      }
}