需要导入的包
注意导入 mapreduce 的新版本包 带 mapreduce 的
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException;
MapReduce 程序结构
public class WordCountApp { // 进行分割 的 map类 public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ ...... } // 归并操作 的 Reducer类 public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ ...... } // 定义 Drive 类(main类) public static void main(String[] args) throws Exception{ ...... } }
- map 类:对读取的每一行单词进行分割操作,形成
键值对
- Reducer 类:把分割完成的
键值对 --> key,value
,进行归并求和并输出 - Drive 类:设置
MapReduce作业提交
、数据的输入路径
、map的处理类
、Reducer的处理类
、数据的输出路径
map 分割操作的详解
Mapper 类四个泛型参数 前两个参数为 map 输入类型,后两个参数为 map 的输出类型
- LongWritable:输入数据的首行的偏移量 (相当于Java的 Long 类型)
- Text:输入的每一行的数据 (相当于Java的 String 类型)
- Text:分割之后产生的
键值对
键的类型 - LongWritable:分割之后产生的
键值对
值的类型
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ /* * reduce 参数: * LongWritable key:输入数据的行的偏移量 * Text value:读取的每一行数据 * Context context 上下文连接 * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 接收到的每一行数据,转化成 Java 的字符串 String line = value.toString(); // 把字符串进行空格分隔,返回一个数组 String[] words = line.split(" ");// 返回字符串数组 // 利用循环 使用 context.write( key, value);组合成 k,v 形式 for (String word: words) { // 通过 context (上下文连接) 把map分割的k、v输出 context.write(new Text(word), new LongWritable(1)); //前面设置了返回值为 Text, LongWritable 类型 } } }
Reducer 归并操作的详解
Reducer 类四个泛型参数 前两个参数为 Reducer 输入类型,后两个参数为 Reducer 的输出类型
- Text:map 的输出类型,就是 Reduse 的输入类型
- LongWritable:map 的输出类型,就是 Reduse 的输入类型
- Text:进行归并操作之后的
键值对 --> 键的类型
- LongWritable:进行归并操作之后的
键值对 --> 的值类型
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ /* * reduce 参数: * Text key:Map操作后的键值对的 键 * Iterable<LongWritable> values:当进行Map操作之后,一个键可能有很多对应的值 所以是一个迭代类型 * Context context 上下文连接 * */ @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { // 这里只需要 把出现的迭代类型进行求和 long sum = 0; for (LongWritable value:values) { // 把 LongWritable 转成 Java 的数据类型进行求和 sum += value.get(); } // 最终的统计结果 通过上下文连接输出 context.write(key, new LongWritable(sum)); } }
定义 Drive 类(main类)
public static void main(String[] args) throws Exception{ // 抛出异常 // 创建一个 Configuration 对象 Configuration configuration = new Configuration(); // 注意是 hadoop 里的 // 创建一个 Job ,如有异常,先把异常抛出 Job job = Job.getInstance(configuration, "wordCount"); // 设置Job的处理类 job.setJarByClass(WordCountApp.class); // 类名称 // 设置需要处理数据的输入路径 FileInputFormat.setInputPaths(job,new Path(args[0])); // 路径通过 脚本参数传入 // 设置map的处理主类 job.setMapperClass(MyMapper.class); // 指定 Mapper处理类 job.setMapOutputKeyClass(Text.class); // 设置 map 处理类的 k 输出类型 job.setMapOutputValueClass(LongWritable.class); // 设置 map 处理类的 v 输出类型 // 设置 reducer 的处理主类 job.setReducerClass(MyReducer.class); // 指定 Reduse处理类 job.setOutputKeyClass(Text.class); // 设置 reducer处理类的 k 输出类型 job.setOutputValueClass(LongWritable.class); // 设置 reducer处理类的 v 输出类型 // 设置作业的输出路径 FileOutputFormat.setOutputPath(job,new Path(args[1])); // 路径通过 脚本参数传入 // 提交作业 boolean b = job.waitForCompletion(true); // 参数为 true 确定提交 // 退出程序 System.exit(b ? 0:1); // 程序推出的状态码 0 正常 }
上传到 hadoop 执行
首先把程序打成 jar 包。idea 打 jar 教程
hadoop 执行 jar 命令:
hadoop jar Jar名称 输入路径 输出路径
hadoop jar hadooptrain.jar hdfs://192.168.5.137:9000/words.txt hdfs://192.168.5.137:9000/output
- 执行结果
版权声明:《 MapReduce 的单词统计案例 》为明妃原创文章,转载请注明出处!
最后编辑:2019-10-28 09:10:30