未分类

需要导入的包注意导入mapreduce的新版本包带mapreduce的importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;MapReduce程序结构publicclassWordCountApp{//进行分割的map类publicstaticclassMyMapperextendsMapper<LongWritable,Text,Text,LongWritable>{......}//归并操作的Reducer类publicstaticclassMyReducerextendsReducer<Text,LongWritable,Text,LongWritable>{......}//定义Drive类(main类)publicstaticvoidmain(String[]args)throwsException{......}}map类:对读取的每一行单词进行分割操作,形成键值对Reducer类:把分割完成的键值对-->key,value,进行归并求和并输出Drive类:设置MapReduce作业提交、数据的输入路径、map的处理类、Reducer的处理类、数据的输出路径map分割操作的详解Mapper类四个泛型参数前两个参数为map输入类型,后两个参数为map的输出类型LongWritable:输入数据的首行的偏移量(相当于Java的Long类型)Text:输入的每一行的数据(相当于Java的String类型)Text:分割之后产生的键值对键的类型LongWritable:分割之后产生的键值对值的类型publicstaticclassMyMapperextendsMapper<LongWritable,Text,Text,LongWritable>{/**reduce参数:*LongWritablekey:输入数据的行的偏移量*Textvalue:读取的每一行数据*Contextcontext上下文连接**/@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{//接收到的每一行数据,转化成Java的字符串Stringline=value.toString();//把字符串进行空格分隔,返回一个数组String[]words=line.split("");//返回字符串数组//利用循环使用context.write(key,value);组合成k,v形式for(Stringword:words){//通过context(上下文连接)把map分割的k、v输出context.write(newText(word),newLongWritable(1));//前面设置了返回值为Text,LongWritable类型}}}Reducer归并操作的详解Reducer类四个泛型参数前两个参数为Reducer输入类型,后两个参数为Reducer的输出类型Text:map的输出类型,就是Reduse的输入类型LongWritable:map的输出类型,就是Reduse的输入类型Text:进行归并操作之后的键值对-->键的类型LongWritable:进行归并操作之后的键值对-->的值类型publicstaticclassMyReducerextendsReducer<Text,LongWritable,Text,LongWritable>{/**reduce参数:*Textkey:Map操作后的键值对的键*Iterable<LongWritable>values:当进行Map操作之后,一个键可能有很多对应的值所以是一个迭代类型*Contextcontext上下文连接**/@Overrideprotectedvoidreduce(Textkey,Iterable<LongWritable>values,Contextcontext)throwsIOException,InterruptedException{//这里只需要把出现的迭代类型进行求和longsum=0;for(LongWritablevalue:values){//把LongWritable转成Java的数据类型进行求和sum+=value.get();}//最终的统计结果通过上下文连接输出context.write(key,newLongWritable(sum));}}定义Drive类(main类)publicstaticvoidmain(String[]args)throwsException{//抛出异常//创建一个Configuration对象Configurationconfiguration=newConfiguration();//注意是hadoop里的//创建一个Job,如有异常,先把异常抛出Jobjob=Job.getInstance(configuration,"wordCount");//设置Job的处理类job.setJarByClass(WordCountApp.class);//类名称//设置需要处理数据的输入路径FileInputFormat.setInputPaths(job,newPath(args[0]));//路径通过脚本参数传入//设置map的处理主类job.setMapperClass(MyMapper.class);//指定Mapper处理类job.setMapOutputKeyClass(Text.class);//设置map处理类的k输出类型job.setMapOutputValueClass(LongWritable.class);//设置map处理类的v输出类型//设置reducer的处理主类job.setReducerClass(MyReducer.class);//指定Reduse处理类job.setOutputKeyClass(Text.class);//设置reducer处理类的k输出类型job.setOutputValueClass(LongWritable.class);//设置reducer处理类的v输出类型//设置作业的输出路径FileOutputFormat.setOutputPath(job,newPath(args[1]));//路径通过脚本参数传入//提交作业booleanb=job.waitForCompletion(true);//参数为true确定提交//退出程序System.exit(b?0:1);//程序推出的状态码0正常}上传到hadoop执行首先把程序打成jar包。idea打jar教程hadoop执行jar命令:hadoopjarJar名称输入路径输出路径hadoopjarhadooptrain.jarhdfs://192.168.5.137:9000/words.txthdfs://192.168.5.137:9000/output执行结果