kashimotoxiang
4/15/2019 - 4:47 PM

Hadoop

wget http://mirrors.advancedhosters.com/apache/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz
tar -zxvf hadoop-2.8.5.tar.gz
mv hadoop-2.8.5 /usr/lib/hadoop

Map

// Mapper for word count: emits (token, 1) for every whitespace-separated
// token of each input line. The LongWritable key (presumably the line's
// byte offset when reading plain text input — unused here) is ignored.
public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
  // Reusable output objects so we don't allocate per token.
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // Tokenize the line on whitespace and emit a count of 1 per token.
    StringTokenizer tokens = new StringTokenizer(value.toString());
    while (tokens.hasMoreTokens()) {
      word.set(tokens.nextToken());
      output.collect(word, one);
    }
  }
}

Reduce

// Reducer for word count: sums the 1's emitted by the mapper for a given
// word and outputs a single (word, total) pair.
public static class ReduceClass extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    // A key may arrive with many values — one per occurrence of the word.
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        // Accumulate the total occurrence count for this word.
        int total = 0;
        for (Iterator<IntWritable> it = values; it.hasNext(); ) {
            total += it.next().get();
        }
        // Emit the aggregated key/value pair.
        output.collect(key, new IntWritable(total));
    }
}

Driver

// Tells Hadoop how to run your Map-Reduce job
// Tells Hadoop how to run your Map-Reduce job.
// inputPath:  path (HDFS or local) holding the input text files.
// outputPath: destination directory for the results; Hadoop requires that
//             it does not already exist.
public void run(String inputPath, String outputPath) throws Exception {
  // The job. WordCount contains MapClass and ReduceClass.
  JobConf conf = new JobConf(WordCount.class);
  conf.setJobName("wordcount");

  // the keys are words (strings)
  conf.setOutputKeyClass(Text.class);
  // the values are counts (ints)
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(MapClass.class);
  // Fixed: the reducer defined above is named ReduceClass, not Reduce.
  conf.setReducerClass(ReduceClass.class);

  // Wire up where to read the input and where to write the output.
  FileInputFormat.addInputPath(conf, new Path(inputPath));
  FileOutputFormat.setOutputPath(conf, new Path(outputPath));

  // Submit the job and block until it completes (throws on failure).
  JobClient.runJob(conf);
}
# yarn kill application
yarn application -kill application_1556751503367_0053
# view the logs of a yarn application
yarn logs -applicationId application_1556751503367_0053
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <stream_name>