MapReduce-related programs
Published: 2019-06-19


Test_1.java

/**
 * Hadoop online course template program
 * Author: James
 */
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Version without a Reducer
 */
public class Test_1 extends Configured implements Tool {

    /**
     * Counter, used to count malformed records
     */
    enum Counter {
        LINESKIP,   // lines that caused an error
    }

    /**
     * MAP task
     */
    public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();                 // read a source line
            try {
                // process the record
                String[] lineSplit = line.split(" ");
                String month = lineSplit[0];
                String time = lineSplit[1];
                String mac = lineSplit[6];
                Text out = new Text(month + ' ' + time + ' ' + mac);

                context.write(NullWritable.get(), out);     // emit
            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // increment the counter on error
                return;
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "Test_1");                        // job name
        job.setJarByClass(Test_1.class);                          // class containing the job

        FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path

        job.setMapperClass(Map.class);                           // use the Map class above as the map task
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);               // output KEY type
        job.setOutputValueClass(Text.class);                     // output VALUE type

        job.waitForCompletion(true);

        // print job status
        System.out.println("Job name: " + job.getJobName());
        System.out.println("Job successful: " + (job.isSuccessful() ? "yes" : "no"));
        System.out.println("Input lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue());
        System.out.println("Output lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("Skipped lines: " + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Print usage information
     * Set up and run the MapReduce job
     */
    public static void main(String[] args) throws Exception {
        // check the argument count;
        // if run with no arguments, print the usage text instead
        if (args.length != 2) {
            System.err.println("");
            System.err.println("Usage: Test_1 < input path > < output path > ");
            System.err.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1 hdfs://localhost:9000/home/james/output");
            System.err.println("Counter:");
            System.err.println("\t" + "LINESKIP" + "\t" + "Lines which are too short");
            System.exit(-1);
        }

        // record the start time
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();

        // run the job
        int res = ToolRunner.run(new Configuration(), new Test_1(), args);

        // print elapsed time
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("Job started: " + formatter.format(start));
        System.out.println("Job ended: " + formatter.format(end));
        System.out.println("Elapsed time: " + String.valueOf(time) + " minutes");

        System.exit(res);
    }
}
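A side note: although Test_1 is described as the version without a Reducer, run() never disables the reduce phase, so Hadoop still schedules one default (identity) reducer and a sort/shuffle pass. To make it a true map-only job, one would add a single line in run(), sketched here:

job.setNumReduceTasks(0);   // map output is written straight to HDFS; no sort/shuffle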

Test_1 data

Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:52 hostapd: wlan0: STA 74:e5:0b:04:28:f2
Apr 23 11:49:50 hostapd: wlan0: STA cc:af:78:cc:d5:5d
Apr 23 11:49:44 hostapd: wlan0: STA cc:af:78:cc:d5:5d
Apr 23 11:49:43 hostapd: wlan0: STA 74:e5:0b:04:28:f2
Apr 23 11:49:42 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
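To see why the mapper reads indices 0, 1 and 6, here is a minimal standalone check (plain Java, no Hadoop needed; the class name SplitCheck is purely illustrative) that splits the first record above on single spaces:

public class SplitCheck {
    public static void main(String[] args) {
        String line = "Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84";
        String[] f = line.split(" ");
        System.out.println(f[0]); // Apr (the "month")
        System.out.println(f[1]); // 23  (the day; Test_1 names this field "time")
        System.out.println(f[6]); // 14:7d:c5:9e:fb:84 (the MAC address)
    }
}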

Test_2.java

/**
 * Hadoop online course template program
 * Author: James
 */
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Version with a Reducer
 */
public class Test_2 extends Configured implements Tool {

    /**
     * Counter, used to count malformed records
     */
    enum Counter {
        LINESKIP,   // lines that caused an error
    }

    /**
     * MAP task
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();                 // read a source line
            try {
                // process the record: caller number, callee number
                String[] lineSplit = line.split(" ");
                String anum = lineSplit[0];
                String bnum = lineSplit[1];

                context.write(new Text(bnum), new Text(anum)); // emit, keyed by the callee
            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // increment the counter on error
                return;
            }
        }
    }

    /**
     * REDUCE task
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String valueString;
            String out = "";

            for (Text value : values) {
                valueString = value.toString();
                out += valueString + "|";   // join all callers with '|'
            }

            context.write(key, new Text(out));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "Test_2");                        // job name
        job.setJarByClass(Test_2.class);                          // class containing the job

        FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path

        job.setMapperClass(Map.class);                           // use the Map class above as the map task
        job.setReducerClass(Reduce.class);                       // use the Reduce class above as the reduce task
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);                       // output KEY type
        job.setOutputValueClass(Text.class);                     // output VALUE type

        job.waitForCompletion(true);

        // print job status
        System.out.println("Job name: " + job.getJobName());
        System.out.println("Job successful: " + (job.isSuccessful() ? "yes" : "no"));
        System.out.println("Input lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue());
        System.out.println("Output lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("Skipped lines: " + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Print usage information
     * Set up and run the MapReduce job
     */
    public static void main(String[] args) throws Exception {
        // check the argument count;
        // if run with no arguments, print the usage text instead
        if (args.length != 2) {
            System.err.println("");
            System.err.println("Usage: Test_2 < input path > < output path > ");
            System.err.println("Example: hadoop jar ~/Test_2.jar hdfs://localhost:9000/home/james/Test_2 hdfs://localhost:9000/home/james/output");
            System.err.println("Counter:");
            System.err.println("\t" + "LINESKIP" + "\t" + "Lines which are too short");
            System.exit(-1);
        }

        // record the start time
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();

        // run the job
        int res = ToolRunner.run(new Configuration(), new Test_2(), args);

        // print elapsed time
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("Job started: " + formatter.format(start));
        System.out.println("Job ended: " + formatter.format(end));
        System.out.println("Elapsed time: " + String.valueOf(time) + " minutes");

        System.exit(res);
    }
}
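One detail worth calling out in run(): setOutputKeyClass/setOutputValueClass declare the reduce output types and, by default, are assumed for the map output as well. Test_2 works because its map and reduce both emit (Text, Text); if the types differed, the map output types would have to be declared separately. A minimal sketch (the IntWritable map output type is hypothetical, purely for illustration; it uses the same deprecated Job constructor as Test_2):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class OutputTypesSketch {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "types-demo");
        job.setMapOutputKeyClass(Text.class);          // map emits (Text, IntWritable)
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);             // reduce emits (Text, Text)
        job.setOutputValueClass(Text.class);
        System.out.println(job.getJobName());          // configures only; nothing is submitted
    }
}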

Test_2 data

13599999999 10086
13899999999    120
13944444444 13800138000
13722222222 13800138000
18800000000 120
13722222222 10086
18944444444 10086
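As a mental model of what happens between Map and Reduce here, this standalone sketch (plain Java, no Hadoop; the class name is illustrative) groups (caller, callee) pairs from the data above by callee, which is roughly what the shuffle does before reduce() joins each group:

import java.util.LinkedHashMap;
import java.util.Map;

public class ShuffleSketch {
    public static void main(String[] args) {
        String[] lines = { "13599999999 10086", "13722222222 10086", "18944444444 10086" };
        Map<String, StringBuilder> grouped = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split(" ");                 // f[0] = caller, f[1] = callee
            grouped.computeIfAbsent(f[1], k -> new StringBuilder())
                   .append(f[0]).append('|');             // same '|' join as the Reduce task
        }
        grouped.forEach((k, v) -> System.out.println(k + "\t" + v));
        // prints: 10086	13599999999|13722222222|18944444444|
    }
}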

Exercise_1.java

/**
 * Hadoop online course assignment program
 * Author: James
 */
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Exercise_1 extends Configured implements Tool {

    /**
     * Counter, used to count malformed records
     */
    enum Counter {
        LINESKIP,   // lines that caused an error
    }

    /**
     * MAP task
     */
    public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();                 // read a source line
            try {
                // process the record
                String[] lineSplit = line.split(" ");
                String month = lineSplit[0];
                String time = lineSplit[1];
                String mac = lineSplit[6];

                /** The part to pay attention to **/
                String name = context.getConfiguration().get("name");
                Text out = new Text(name + ' ' + month + ' ' + time + ' ' + mac);
                /** The part to pay attention to **/

                context.write(NullWritable.get(), out);     // emit
            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // increment the counter on error
                return;
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        /** The part to pay attention to **/
        conf.set("name", args[2]);
        /** The part to pay attention to **/

        Job job = new Job(conf, "Exercise_1");                    // job name
        job.setJarByClass(Exercise_1.class);                      // class containing the job

        FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path

        job.setMapperClass(Map.class);                           // use the Map class above as the map task
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);               // output KEY type
        job.setOutputValueClass(Text.class);                     // output VALUE type

        job.waitForCompletion(true);

        // print job status
        System.out.println("Job name: " + job.getJobName());
        System.out.println("Job successful: " + (job.isSuccessful() ? "yes" : "no"));
        System.out.println("Input lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue());
        System.out.println("Output lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("Skipped lines: " + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Print usage information
     * Set up and run the MapReduce job
     */
    public static void main(String[] args) throws Exception {
        // check the argument count;
        // if run with no arguments, print the usage text instead
        if (args.length != 3) {
            System.err.println("");
            System.err.println("Usage: Exercise_1 < input path > < output path > < name >");
            System.err.println("Example: hadoop jar ~/Exercise_1.jar hdfs://localhost:9000/home/james/Test_1 hdfs://localhost:9000/home/james/output hadoop");
            System.err.println("Counter:");
            System.err.println("\t" + "LINESKIP" + "\t" + "Lines which are too short");
            System.exit(-1);
        }

        // record the start time
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();

        // run the job
        int res = ToolRunner.run(new Configuration(), new Exercise_1(), args);

        // print elapsed time
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("Job started: " + formatter.format(start));
        System.out.println("Job ended: " + formatter.format(end));
        System.out.println("Elapsed time: " + String.valueOf(time) + " minutes");

        System.exit(res);
    }
}
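The pattern Exercise_1 relies on is a plain set/get round trip through the job Configuration: anything set on the client side is serialized with the job and becomes visible to every task via context.getConfiguration(). Note that conf.set must happen before the Job is created, because Job takes a copy of the configuration. A minimal sketch (needs hadoop-common on the classpath; the class name is illustrative):

import org.apache.hadoop.conf.Configuration;

public class ConfRoundTrip {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("name", "hadoop");           // done in run(), before new Job(conf, ...)
        System.out.println(conf.get("name")); // what the mapper sees: "hadoop"
    }
}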

result_1

hadoop Apr 23 14:7d:c5:9e:fb:84
hadoop Apr 23 74:e5:0b:04:28:f2
hadoop Apr 23 cc:af:78:cc:d5:5d
hadoop Apr 23 cc:af:78:cc:d5:5d
hadoop Apr 23 74:e5:0b:04:28:f2
hadoop Apr 23 14:7d:c5:9e:fb:84

Exercise_2.java

/**
 * Hadoop online course assignment program
 * Author: James
 */
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Exercise_2 extends Configured implements Tool {

    /**
     * Counter, used to count malformed records
     */
    enum Counter {
        LINESKIP,   // lines that caused an error
    }

    /**
     * MAP task
     */
    public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> {

        /** The part to pay attention to **/
        private String name;

        public void setup(Context context) {
            this.name = context.getConfiguration().get("name"); // read the name once, when the task starts
        }
        /** The part to pay attention to **/

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();                 // read a source line
            try {
                // process the record
                String[] lineSplit = line.split(" ");
                String month = lineSplit[0];
                String time = lineSplit[1];
                String mac = lineSplit[6];

                /** The part to pay attention to **/
                Text out = new Text(this.name + ' ' + month + ' ' + time + ' ' + mac);
                /** The part to pay attention to **/

                context.write(NullWritable.get(), out);     // emit
            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // increment the counter on error
                return;
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        /** The part to pay attention to **/
        conf.set("name", args[2]);
        /** The part to pay attention to **/

        Job job = new Job(conf, "Exercise_2");                    // job name
        job.setJarByClass(Exercise_2.class);                      // class containing the job

        FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path

        job.setMapperClass(Map.class);                           // use the Map class above as the map task
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);               // output KEY type
        job.setOutputValueClass(Text.class);                     // output VALUE type

        job.waitForCompletion(true);

        // print job status
        System.out.println("Job name: " + job.getJobName());
        System.out.println("Job successful: " + (job.isSuccessful() ? "yes" : "no"));
        System.out.println("Input lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue());
        System.out.println("Output lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("Skipped lines: " + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Print usage information
     * Set up and run the MapReduce job
     */
    public static void main(String[] args) throws Exception {
        // check the argument count;
        // if run with no arguments, print the usage text instead
        if (args.length != 3) {
            System.err.println("");
            System.err.println("Usage: Exercise_2 < input path > < output path > < name >");
            System.err.println("Example: hadoop jar ~/Exercise_2.jar hdfs://localhost:9000/home/james/Test_1 hdfs://localhost:9000/home/james/output hadoop");
            System.err.println("Counter:");
            System.err.println("\t" + "LINESKIP" + "\t" + "Lines which are too short");
            System.exit(-1);
        }

        // record the start time
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();

        // run the job
        int res = ToolRunner.run(new Configuration(), new Exercise_2(), args);

        // print elapsed time
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("Job started: " + formatter.format(start));
        System.out.println("Job ended: " + formatter.format(end));
        System.out.println("Elapsed time: " + String.valueOf(time) + " minutes");

        System.exit(res);
    }
}
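Why read the configuration in setup() rather than in map(), as Exercise_1 did? The framework calls setup() once per task but map() once per input record, so caching the value avoids one configuration lookup per line. The default Mapper.run() has roughly this shape (paraphrased from the Hadoop source; this is framework behavior, not code to add):

public void run(Context context) throws IOException, InterruptedException {
    setup(context);                 // once per task: Exercise_2 caches "name" here
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context); // once per record
    }
    cleanup(context);               // once per task, at the end
}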

Modified Test_2 (parameter passed through to the Reducer)

/**
 * Hadoop online course template program
 * Author: James
 */
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Version with a Reducer
 */
public class Test_2 extends Configured implements Tool {

    /**
     * Counter, used to count malformed records
     */
    enum Counter {
        LINESKIP,   // lines that caused an error
    }

    /**
     * MAP task
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();                 // read a source line
            try {
                // process the record: caller number, callee number
                String[] lineSplit = line.split(" ");
                String anum = lineSplit[0];
                String bnum = lineSplit[1];

                context.write(new Text(bnum), new Text(anum)); // emit, keyed by the callee
            } catch (java.lang.ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1); // increment the counter on error
                return;
            }
        }
    }

    /**
     * REDUCE task
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String valueString;
            String out = "";

            String name = context.getConfiguration().get("name"); // read the name from the configuration

            for (Text value : values) {
                valueString = value.toString();
                out += valueString + "|";   // join all callers with '|'
            }

            context.write(key, new Text(out + name)); // 'out' already ends with '|'
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        conf.set("name", args[2]);

        Job job = new Job(conf, "Test_2");                        // job name
        job.setJarByClass(Test_2.class);                          // class containing the job

        FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path

        job.setMapperClass(Map.class);                           // use the Map class above as the map task
        job.setReducerClass(Reduce.class);                       // use the Reduce class above as the reduce task
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);                       // output KEY type
        job.setOutputValueClass(Text.class);                     // output VALUE type

        job.waitForCompletion(true);

        // print job status
        System.out.println("Job name: " + job.getJobName());
        System.out.println("Job successful: " + (job.isSuccessful() ? "yes" : "no"));
        System.out.println("Input lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue());
        System.out.println("Output lines: " + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue());
        System.out.println("Skipped lines: " + job.getCounters().findCounter(Counter.LINESKIP).getValue());

        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Print usage information
     * Set up and run the MapReduce job
     */
    public static void main(String[] args) throws Exception {
        // check the argument count;
        // if run with no arguments, print the usage text instead
        if (args.length != 3) {
            System.err.println("");
            System.err.println("Usage: Test_2 < input path > < output path > < name >");
            System.err.println("Example: hadoop jar ~/Test_2.jar hdfs://localhost:9000/home/james/Test_2 hdfs://localhost:9000/home/james/output hadoop");
            System.err.println("Counter:");
            System.err.println("\t" + "LINESKIP" + "\t" + "Lines which are too short");
            System.exit(-1);
        }

        // record the start time
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date start = new Date();

        // run the job
        int res = ToolRunner.run(new Configuration(), new Test_2(), args);

        // print elapsed time
        Date end = new Date();
        float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
        System.out.println("Job started: " + formatter.format(start));
        System.out.println("Job ended: " + formatter.format(end));
        System.out.println("Elapsed time: " + String.valueOf(time) + " minutes");

        System.exit(res);
    }
}
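A note on the write call in this Reduce: Text is not a String, so an expression like new Text(out) + "|" + name would collapse to a String (via toString()) and no longer match context.write(Text, Text). The joined string has to be built first and wrapped in a Text once. A standalone illustration (needs hadoop-common on the classpath; the values are taken from result_2 below):

import org.apache.hadoop.io.Text;

public class TextConcat {
    public static void main(String[] args) {
        String out = "13599999999|13722222222|18944444444|"; // joined callers; already ends with '|'
        String name = "hadoop";
        Text value = new Text(out + name);   // build the String, then wrap it once
        System.out.println(value);           // 13599999999|13722222222|18944444444|hadoop
    }
}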

 

result_2

10086    13599999999|13722222222|18944444444|hadoop
120    18800000000|hadoop
13800138000    13944444444|13722222222|hadoop

Reposted from: https://www.cnblogs.com/zl0372/p/map_reduce.html
