您现在的位置是:主页 > 数据库技术 > 数据库技术

MapReduce怎么使用

IDCBT2021-12-31服务器技术人已围观

简介本篇内容主要讲解“MapReduce怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MapReduce怎么使用”吧! 什么是MR MR是一种分布

本篇内容主要讲解“MapReduce怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MapReduce怎么使用”吧!

    什么是MR

    MR是一种分布计算模型,主要用来解决海量数据的计算问题的。它包含了两种计算函数,一个是Mapping,另外一个是Reducing。Mapping对集合内的每个目标做同一个操作,Reduceing则是遍历集合中的元素返回一个综合的结果。我们操作代码时,只需要重写map和reduce方法就行,十分简单。这两个函数的形参都是k,v对,当数据量到达10PB以上时,则会速度变慢。

    MR执行过程

    MR程序启动时,会把输入文件转化成<k1,v1>键值对传给map函数,有几个键值对就执行几次map函数,但不是说有几个键值对就有几个Mapper进程,这是不对的。经过map函数处理,变成<k2,v2>键值对。由<k2,v2>转变成reduce函数的输入<k2,{v2,.....}>的过程被称之为shuffle。shuffle并不是象map和reduce这样的某个函数,不是需要单独拿出节点运行的,它仅仅只是一个过程。<k2,{v2...}>进过reduce函数处理,变成了最后的输出<k3,v3>。在到达reduce函数之前,键值对的数目是不变的。

                  Map阶段

     (1).根据输入文件解析成<k1,v1>对,每一对调用一次map函数

     (2).根据自己编写的map函数,将键值对处理,变成新的<k2,v2>键值对输出 

     (3).对输出的键值对进行分区,不同分区对应着不同的Reducer进程

     (4).每个分区中的键值对,根据key进行排序,分组。然后把相同key的val放到同一个集合中。

      (5).进行规约(可选)

                     Reduce阶段

         (1).多个map函数输出的kv对,按照不同分区,传输到不同的reduce节点上。

          (2).将多个map函数输出的kv对合并,排序。根据reduce函数逻辑,处理<k2,{v2..}>,转换成新的键值对输出

         (3).输出保存文件

  3.简单例子

      Wordcount

public class WordCount {
 public static class  MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
  Text k2=new Text();
  LongWritable v2=new LongWritable();
  @Override
  protected void map(LongWritable k1, Text v1,Context context)
    throws IOException, InterruptedException {
    String[] words=v1.toString().split("\t");
    for (String string : words) {
     k2.set(string);
     v2.set(1L);
    context.write(k2, v2);
   }
  }
 }
 public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
  LongWritable v3=new LongWritable();
  @Override
  protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context) throws IOException, InterruptedException {
   long sum=0; 
   for (LongWritable longWritable : v2s) {
    sum=sum+longWritable.get();
   }
   v3.set(sum);
   context.write(k2, v3);
  }
  
 }
 public static void main(String[] args) throws Exception {
  Configuration conf=new Configuration();
  Job job=Job.getInstance(conf, WordCount.class.getSimpleName());
  job.setJarByClass(WordCount.class);
  job.setMapperClass(MyMapper.class);
  job.setReducerClass(MyReduce.class);
  
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(LongWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);
  
  FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/a.txt"));
  FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/out4"));
  
  job.waitForCompletion(true);
 } 
 
}

标签:

很赞哦! ()

本栏推荐