Map
// Imports shared by all three fragments of the WordCount class
// (classic org.apache.hadoop.mapred API):
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  // key is the byte offset of the line; value is the line itself
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String line = value.toString();
    StringTokenizer itr = new StringTokenizer(line);
    // Emit (word, 1) for every whitespace-separated token in the line
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, one);
    }
  }
}
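
To see what the mapper emits, here is a minimal sketch that calls map() directly on one line of input; the demoMap helper and its printing OutputCollector are illustrative assumptions, not part of the original example:

// Hypothetical helper (an assumption, for illustration only): run MapClass on one
// line of text and print every (word, 1) pair it emits.
public static void demoMap() throws IOException {
  OutputCollector<Text, IntWritable> printer = new OutputCollector<Text, IntWritable>() {
    public void collect(Text k, IntWritable v) {
      System.out.println(k + "\t" + v.get());
    }
  };
  new MapClass().map(new LongWritable(0), new Text("to be or not to be"),
                     printer, Reporter.NULL);
  // Prints six pairs: to 1, be 1, or 1, not 1, to 1, be 1
}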
Reduce
public static class ReduceClass extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  // A single key may arrive with many values: here the key is a word
  // and the values are the list of 1's emitted for it by the mappers.
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    // Sum the counts
    while (values.hasNext()) {
      sum += values.next().get();
    }
    // Emit the new (word, total count) pair
    output.collect(key, new IntWritable(sum));
  }
}
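
The reducer can be exercised the same way. Below is a minimal sketch (again an assumption, not in the original) that feeds ReduceClass three 1's for the word "be" and prints the result; it additionally needs java.util.Arrays:

// Hypothetical helper (for illustration only): sum three 1's for the key "be".
public static void demoReduce() throws IOException {
  Iterator<IntWritable> ones = Arrays.asList(
      new IntWritable(1), new IntWritable(1), new IntWritable(1)).iterator();
  new ReduceClass().reduce(new Text("be"), ones,
      new OutputCollector<Text, IntWritable>() {
        public void collect(Text k, IntWritable v) {
          System.out.println(k + "\t" + v.get());  // prints: be  3
        }
      },
      Reporter.NULL);
}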
Driver
// Tells Hadoop how to run your MapReduce job
public void run(String inputPath, String outputPath) throws Exception {
  // The job: the enclosing WordCount class contains MapClass and ReduceClass
  JobConf conf = new JobConf(WordCount.class);
  conf.setJobName("wordcount");

  // The keys are words (strings)
  conf.setOutputKeyClass(Text.class);
  // The values are counts (ints)
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(MapClass.class);
  conf.setReducerClass(ReduceClass.class);

  FileInputFormat.addInputPath(conf, new Path(inputPath));
  FileOutputFormat.setOutputPath(conf, new Path(outputPath));

  // Submit the job and block until it completes
  JobClient.runJob(conf);
}
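
A minimal sketch of how run() might be invoked from the command line; the main() method and its argument handling are assumptions, not part of the original snippet:

// Hypothetical entry point (an assumption): expects an input path and an output path.
public static void main(String[] args) throws Exception {
  if (args.length != 2) {
    System.err.println("Usage: WordCount <input path> <output path>");
    System.exit(1);
  }
  new WordCount().run(args[0], args[1]);
}

Because the per-word sums are associative and commutative, the same ReduceClass could also be registered as a combiner with conf.setCombinerClass(ReduceClass.class), pre-aggregating counts on each map node before the shuffle.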