MapReduce 的单词统计案例

需要导入的包

  • 注意导入 mapreduce 的新版本包 带 mapreduce 的
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

MapReduce 程序结构

public class WordCountApp {

    // 进行分割 的 map类
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
        ......
    }

    // 归并操作 的 Reducer类
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
        ......
    }

    // 定义 Drive 类(main类)
    public static void main(String[] args) throws Exception{
        ......
    }
}
  • map 类:对读取的每一行单词进行分割操作,形成键值对
  • Reducer 类:把分割完成的键值对 --> key,value,进行归并求和并输出
  • Drive 类:设置MapReduce作业提交数据的输入路径map的处理类Reducer的处理类数据的输出路径

map 分割操作的详解

Mapper 类四个泛型参数 前两个参数为 map 输入类型,后两个参数为 map 的输出类型

  • LongWritable:输入数据的首行的偏移量 (相当于Java的 Long 类型)
  • Text:输入的每一行的数据 (相当于Java的 String 类型)
  • Text:分割之后产生的 键值对 键的类型
  • LongWritable:分割之后产生的 键值对 值的类型
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    /*
         * reduce 参数:
         * LongWritable key:输入数据的行的偏移量
         * Text value:读取的每一行数据
         * Context context 上下文连接
         * */

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 接收到的每一行数据,转化成 Java 的字符串
        String line = value.toString();

        // 把字符串进行空格分隔,返回一个数组
        String[] words = line.split(" ");// 返回字符串数组

        // 利用循环 使用 context.write( key, value);组合成 k,v 形式
        for (String word: words) {
            // 通过 context (上下文连接) 把map分割的k、v输出
            context.write(new Text(word), new LongWritable(1)); //前面设置了返回值为 Text, LongWritable  类型
        }
    }
}

Reducer 归并操作的详解

Reducer 类四个泛型参数 前两个参数为 Reducer 输入类型,后两个参数为 Reducer 的输出类型

  • Text:map 的输出类型,就是 Reduse 的输入类型
  • LongWritable:map 的输出类型,就是 Reduse 的输入类型
  • Text:进行归并操作之后的键值对 --> 键的类型
  • LongWritable:进行归并操作之后的键值对 --> 的值类型
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    /*
        * reduce 参数:
        * Text key:Map操作后的键值对的 键
        * Iterable<LongWritable> values:当进行Map操作之后,一个键可能有很多对应的值 所以是一个迭代类型
        * Context context 上下文连接
        * */

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // 这里只需要 把出现的迭代类型进行求和
        long sum = 0;
        for (LongWritable value:values) {
            // 把 LongWritable 转成 Java 的数据类型进行求和
            sum += value.get();
        }

        // 最终的统计结果 通过上下文连接输出
        context.write(key, new LongWritable(sum));

    }
}

定义 Drive 类(main类)

public static void main(String[] args) throws Exception{   // 抛出异常

        // 创建一个 Configuration 对象
        Configuration configuration = new Configuration();     // 注意是 hadoop 里的

        // 创建一个 Job ,如有异常,先把异常抛出
        Job job = Job.getInstance(configuration, "wordCount");

        // 设置Job的处理类
        job.setJarByClass(WordCountApp.class);   // 类名称

        // 设置需要处理数据的输入路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));  // 路径通过 脚本参数传入

        // 设置map的处理主类
        job.setMapperClass(MyMapper.class);              // 指定 Mapper处理类
        job.setMapOutputKeyClass(Text.class);            // 设置 map 处理类的 k 输出类型
        job.setMapOutputValueClass(LongWritable.class);  // 设置 map 处理类的 v 输出类型

        // 设置 reducer 的处理主类
        job.setReducerClass(MyReducer.class);            // 指定 Reduse处理类
        job.setOutputKeyClass(Text.class);               // 设置 reducer处理类的 k 输出类型
        job.setOutputValueClass(LongWritable.class);     // 设置 reducer处理类的 v 输出类型

        // 设置作业的输出路径
        FileOutputFormat.setOutputPath(job,new Path(args[1]));  // 路径通过 脚本参数传入

        // 提交作业
        boolean b = job.waitForCompletion(true); // 参数为 true 确定提交

        // 退出程序
        System.exit(b ? 0:1); // 程序推出的状态码 0 正常
    }

上传到 hadoop 执行

  • 首先把程序打成 jar 包。idea 打 jar 教程

  • hadoop 执行 jar 命令:hadoop jar Jar名称 输入路径 输出路径

hadoop jar hadooptrain.jar hdfs://192.168.5.137:9000/words.txt hdfs://192.168.5.137:9000/output
  • 执行结果

mark

发表评论 / Comment

用心评论~