package cn.lmj.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount
{
    // mapper: emit (word, 1) for every token on the input line
    public static class WordCountMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable>
    {
        private final LongWritable count = new LongWritable(1);
        private final Text content = new Text();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException
        {
            // split the line into words
            String str = value.toString();
            String[] arr = str.split(" ");
            for(String s : arr)
            {
                content.set(s);
                output.collect(content, count);
            }
        }
    }
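    // A quick illustration (a sketch, not part of the original job): for the
    // input line "hello hadoop hello", the mapper above emits
    //
    //   (hello, 1), (hadoop, 1), (hello, 1)
    //
    // Note that String.split(" ") keeps interior empty tokens, so a line with
    // consecutive spaces also emits ("", 1) pairs; splitting on "\\s+" would
    // avoid counting the empty word.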
    // reducer: sum the counts collected for each word
    public static class WordCountReduce extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable>
    {
        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException
        {
            // accumulate the values that share the same key
            long sum = 0;
            while(values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }
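    // Why WordCountReduce can also serve as the combiner (registered via
    // conf.setCombinerClass below): summing counts is associative and
    // commutative, so pre-aggregating each mapper's local output, e.g.
    // (hello, 1), (hello, 1) -> (hello, 2), leaves the final totals unchanged
    // while reducing the data shuffled across the network.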
    public static void main(String[] args) throws Exception
    {
        // create a JobConf for this job
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("lmj");

        // set the output key/value types
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        // set the Map, Combine and Reduce classes
        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReduce.class);
        conf.setReducerClass(WordCountReduce.class);

        // set the input and output formats
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // set the input and output paths
        FileInputFormat.setInputPaths(conf, new Path("/aaa/hadoop.txt"));
        FileOutputFormat.setOutputPath(conf, new Path("/aaa/output"));

        // submit the job and wait for completion
        JobClient.runJob(conf);
    }
}
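// Usage sketch, under the assumption that the class is packaged into a jar
// named wordcount.jar (an illustrative name) and that /aaa/hadoop.txt exists
// on HDFS:
//
//   hadoop jar wordcount.jar cn.lmj.mapreduce.WordCount
//
// The output directory /aaa/output must not exist before the run; otherwise
// FileOutputFormat fails the job. For an input file containing the single
// line "hello hadoop hello", the result in /aaa/output/part-00000 is:
//
//   hadoop  1
//   hello   2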