robin-lai
8/31/2015 - 2:20 PM

Secondary sort in mapreduce - Includes code for a simple program that sorts employee information by department ascending and employee name

Secondary sort in mapreduce

  • Includes code for a simple program that sorts employee information by department ascending and employee name desc.
**********************
Reference:
**********************
Hadoop the definitive guide, 3rd edition

**********************
Credits:
**********************
Data from mysql - http://dev.mysql.com/doc/employee/en.index.html
*******************************
*Results
*******************************

--Source record count
hadoop fs -cat sortProject/data/employees/employees_tsv | wc -l
2246830

--Results record count
hadoop fs -cat sortProject/data/output-secondarySortBasic/part* | wc -l
2246830

--Files generated
hadoop fs -ls -R sortProject/data/output-secondarySortBasic/part* | awk '{print $8}'
sortProject/data/output-secondarySortBasic/part-r-00000
sortProject/data/output-secondarySortBasic/part-r-00001
sortProject/data/output-secondarySortBasic/part-r-00002
sortProject/data/output-secondarySortBasic/part-r-00003
sortProject/data/output-secondarySortBasic/part-r-00004
sortProject/data/output-secondarySortBasic/part-r-00005
sortProject/data/output-secondarySortBasic/part-r-00006
sortProject/data/output-secondarySortBasic/part-r-00007

--Output
hadoop fs -cat sortProject/data/output-secondarySortBasic/part* 
d001    Zykh    Sudhanshu       205927
d001    Zykh    Nidapan 452738
..
d001    Yoshimura       Alenka  463297
d001    Yeung   Yuguang 483161
..
d001  Acton  Basim	105207
d001	Aamodt	Sreekrishna	493601
..
d002  Aamodt	Yakkov	43290
..
d003  Acton	Idoia	211583
..
d004  dAstous	Candido	59201
d004	dAstous	Berhard	427930
..
d005  Zizka	Aamer	409151
d005	Zirintsis	Xiaoqiang	52246
....




   

*******************************
*Command to run the program
*******************************

hadoop jar ~/Blog/sortProject/secondarySortBasic/jar/secondarySortBasic.jar SecondarySortBasicDriver /user/akhanolk/sortProject/data/employees/employees_tsv /user/akhanolk/sortProject/data/output-secondarySortBasic
***************************************
*Driver: SecondarySortBasicDriver
***************************************


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortBasicDriver extends Configured implements Tool {

  @Override
	public int run(String[] args) throws Exception {

		if (args.length != 2) {
			System.out
					.printf("Two parameters are required for SecondarySortBasicDriver- <input dir> <output dir>\n");
			return -1;
		}

		Job job = new Job(getConf());
		job.setJobName("Secondary sort example");

		job.setJarByClass(SecondarySortBasicDriver.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.setMapperClass(SecondarySortBasicMapper.class);
		job.setMapOutputKeyClass(CompositeKeyWritable.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setPartitionerClass(SecondarySortBasicPartitioner.class);
		job.setSortComparatorClass(SecondarySortBasicCompKeySortComparator.class);
		job.setGroupingComparatorClass(SecondarySortBasicGroupingComparator.class);
		job.setReducerClass(SecondarySortBasicReducer.class);
		job.setOutputKeyClass(CompositeKeyWritable.class);
		job.setOutputValueClass(NullWritable.class);

		job.setNumReduceTasks(8);

		boolean success = job.waitForCompletion(true);
		return success ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Configuration(),
				new SecondarySortBasicDriver(), args);
		System.exit(exitCode);
	}
}
***************************************
*Reducer: SecondarySortBasicReducer
***************************************

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondarySortBasicReducer
  	extends
		Reducer<CompositeKeyWritable, NullWritable, CompositeKeyWritable, NullWritable> {

	@Override
	public void reduce(CompositeKeyWritable key, Iterable<NullWritable> values,
			Context context) throws IOException, InterruptedException {

		for (NullWritable value : values) {

			context.write(key, NullWritable.get());
		}

	}
}
***************************************************************
*GroupingComparator: SecondarySortBasicGroupingComparator
***************************************************************

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SecondarySortBasicGroupingComparator extends WritableComparator {
  protected SecondarySortBasicGroupingComparator() {
		super(CompositeKeyWritable.class, true);
	}

	@Override
	public int compare(WritableComparable w1, WritableComparable w2) {
		CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
		CompositeKeyWritable key2 = (CompositeKeyWritable) w2;
		return key1.getDeptNo().compareTo(key2.getDeptNo());
	}
}
/***************************************************************
*SortComparator: SecondarySortBasicCompKeySortComparator
*****************************************************************/

import org.apache.hadoop.io.WritableComparator;

public class SecondarySortBasicCompKeySortComparator extends WritableComparator {

  protected SecondarySortBasicCompKeySortComparator() {
		super(CompositeKeyWritable.class, true);
	}

	@Override
	public int compare(WritableComparable w1, WritableComparable w2) {
		CompositeKeyWritable key1 = (CompositeKeyWritable) w1;
		CompositeKeyWritable key2 = (CompositeKeyWritable) w2;

		int cmpResult = key1.getDeptNo().compareTo(key2.getDeptNo());
		if (cmpResult == 0)// same deptNo
		{
			return -key1.getLNameEmpIDPair()
					.compareTo(key2.getLNameEmpIDPair());
			//If the minus is taken out, the values will be in
			//ascending order
		}
		return cmpResult;
	}
}
/***************************************************************
*Partitioner: SecondarySortBasicPartitioner
***************************************************************/

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondarySortBasicPartitioner extends
  	Partitioner<CompositeKeyWritable, NullWritable> {

	@Override
	public int getPartition(CompositeKeyWritable key, NullWritable value,
			int numReduceTasks) {

		return (key.getDeptNo().hashCode() % numReduceTasks);
	}
}
/***************************************************************
*Mapper: SecondarySortBasicMapper
***************************************************************/


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondarySortBasicMapper extends
  	Mapper<LongWritable, Text, CompositeKeyWritable, NullWritable> {

	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		if (value.toString().length() > 0) {
			String arrEmpAttributes[] = value.toString().split("\\t");

			context.write(
					new CompositeKeyWritable(
							arrEmpAttributes[6].toString(),
							(arrEmpAttributes[3].toString() + "\t"
									+ arrEmpAttributes[2].toString() + "\t" + arrEmpAttributes[0]
									.toString())), NullWritable.get());
		}

	}
}
/***************************************************************
*CustomWritable for the composite key: CompositeKeyWritable
****************************************************************/

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

/**
 * 
 * @author akhanolkar
 * 
 *         Purpose: A custom writable with two attributes- deptNo and
 *         NameEmpIDPair; 
 */

public class CompositeKeyWritable implements Writable,
  	WritableComparable<CompositeKeyWritable> {

	private String deptNo;
	private String lNameEmpIDPair;

	public CompositeKeyWritable() {
	}

	public CompositeKeyWritable(String deptNo, String lNameEmpIDPair) {
		this.deptNo = deptNo;
		this.lNameEmpIDPair = lNameEmpIDPair;
	}

	@Override
	public String toString() {
		return (new StringBuilder().append(deptNo).append("\t")
				.append(lNameEmpIDPair)).toString();
	}

	public void readFields(DataInput dataInput) throws IOException {
		deptNo = WritableUtils.readString(dataInput);
		lNameEmpIDPair = WritableUtils.readString(dataInput);
	}

	public void write(DataOutput dataOutput) throws IOException {
		WritableUtils.writeString(dataOutput, deptNo);
		WritableUtils.writeString(dataOutput, lNameEmpIDPair);
	}

	public int compareTo(CompositeKeyWritable objKeyPair) {
		// TODO:
		/*
		 * Note: This code will work as it stands; but when CompositeKeyWritable
		 * is used as key in a map-reduce program, it is de-serialized into an
		 * object for comapareTo() method to be invoked;
		 * 
		 * To do: To optimize for speed, implement a raw comparator - will
		 * support comparison of serialized representations
		 */
		int result = deptNo.compareTo(objKeyPair.deptNo);
		if (0 == result) {
			result = lNameEmpIDPair.compareTo(objKeyPair.lNameEmpIDPair);
		}
		return result;
	}

	public String getDeptNo() {
		return deptNo;
	}

	public void setDeptNo(String deptNo) {
		this.deptNo = deptNo;
	}

	public String getLNameEmpIDPair() {
		return lNameEmpIDPair;
	}

	public void setLNameEmpIDPair(String lNameEmpIDPair) {
		this.lNameEmpIDPair = lNameEmpIDPair;
	}
}
*******************************
*Expected results
*******************************
Sort order: [DeptID asc, {LName,FName,EmpID} desc]

DeptID  LName       FName       EmpID
d001    Zykh    Sudhanshu       205927
d001    Zykh    Nidapan 452738
..
d001    Yoshimura       Alenka  463297
d001    Yeung   Yuguang 483161
..
d001  Acton	Basim	105207
d001	Aamodt	Sreekrishna	493601
..
d002  Aamodt	Yakkov	43290
..
d003  Acton	Idoia	211583
..
d004  dAstous	Candido	59201
d004	dAstous	Berhard	427930
..
d005  Zizka	Aamer	409151
d005	Zirintsis	Xiaoqiang	52246
....




   
*******************************
*Sample Data
*******************************
EmpID DOB         FName     LName     Gender  Hire date     DeptID
10003 1959-12-03  Parto     Bamford   M       1986-08-28    d004
10004 1954-05-01  Chirstian Koblick   M       1986-12-01    d004
10005 1955-01-21  Kyoichi   Maliniak  M       1989-09-12    d003
....
*******************************
*Data and code download
*******************************

Data and code:
--------------

gitHub:
<<To be added>>

Email me at airawat.blog@gmail.com if you encounter any issues
 
 
Directory structure
-------------------
 
  sortProject
        data
            employees_tsv
                employees_tsv

        SecondarySortBasic
            src
                CompositeKeyWritable.java
                SecondarySortBasicMapper.java
                SecondarySortBasicPartitioner.java
                SecondarySortBasicCompKeySortComparator.java
                SecondarySortBasicGroupingComparator.java
                SecondarySortBasicReducer.java
                SecondarySortBasicDriver.java
          
            jar
                SecondarySortBasic.jar
Secondary sort in Mapreduce

With mapreduce framework, the keys are sorted but the values associated with each key 
are not.  In order for the values to be sorted, we need to write code to perform what is 
referred to a secondary sort.  The sample code in this gist demonstrates such a sort.

The input to the program is a bunch of employee attributes.
The output required is department number (deptNo) in ascending order, and the employee last name, 
first name and employee ID in descending order.

The recipe to get the effect of sorting by value is:
1) Make the key a composite of the natural key (deptNo) and the natural value (lName, fName and empNo). 
2) The sort comparator should order by the composite key, that is, the natural key and natural 
value.
3) The partitioner and grouping comparator for the composite key should consider only the natural
key for partitioning and grouping.