首页

文章

如何实现mapreduce计算框架以有效实现迭代

发布网友 发布时间:2022-03-23 00:27

我来回答

2个回答

懂视网 时间:2022-03-23 04:49

mapreduce工作原理为:MapReduce是一种编程模型,用于大规模数据集的并行运算。MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。

  MapReduce就是”任务的分解与结果的汇总”,它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

 

  

热心网友 时间:2022-03-23 01:57

  MapRece从出现以来,已经成为Apache Hadoop计算范式的扛鼎之作。它对于符合其设计的各项工作堪称完美:大规模日志处理,ETL批处理操作等。

  随着Hadoop使用范围的不断扩大,人们已经清楚知道MapRece不是所有计算的最佳框架。Hadoop 2将资源管理器YARN作为自己的顶级组件,为其他计算引擎的接入提供了可能性。如Impala等非MapRece架构的引入,使平台具备了支持交互式SQL的能力。

  今天,Apache Spark是另一种这样的替代,并且被称为是超越MapRece的通用计算范例。也许您会好奇:MapRece一直以来已经这么有用了,怎么能突然被取代?毕竟,还有很多ETL这样的工作需要在Hadoop上进行,即使该平台目前也已经拥有其他实时功能。

  值得庆幸的是,在Spark上重新实现MapRece一样的计算是完全可能的。它们可以被更简单的维护,而且在某些情况下更快速,这要归功于Spark优化了刷写数据到磁盘的过程。Spark重新实现MapRece编程范式不过是回归本源。Spark模仿了Scala的函数式编程风格和API。而MapRece的想法来自于函数式编程语言LISP。

  尽管Spark的主要抽象是RDD(弹性分布式数据集),实现了Map,rece等操作,但这些都不是Hadoop的Mapper或Recer API的直接模拟。这些转变也往往成为开发者从Mapper和Recer类平行迁移到Spark的绊脚石。

  与Scala或Spark中经典函数语言实现的map和rece函数相比,原有Hadoop提供的Mapper和Recer API 更灵活也更复杂。这些区别对于习惯了MapRece的开发者而言也许并不明显,下列行为是针对Hadoop的实现而不是MapRece的抽象概念:
  · Mapper和Recer总是使用键值对作为输入输出。
  · 每个Recer按照Key对Value进行rece。
  · 每个Mapper和Recer对于每组输入可能产生0个,1个或多个键值对。
  · Mapper和Recer可能产生任意的keys和values,而不局限于输入的子集和变换。
  Mapper和Recer对象的生命周期可能横跨多个map和rece操作。它们支持setup和cleanup方法,在批量记录处理开始之前和结束之后被调用。

  本文将简要展示怎样在Spark中重现以上过程,您将发现不需要逐字翻译Mapper和Recer!

  作为元组的键值对
  假定我们需要计算大文本中每一行的长度,并且报告每个长度的行数。在HadoopMapRece中,我们首先使用一个Mapper,生成为以行的长度作为key,1作为value的键值对。
  public class LineLengthMapper extends
  Mapper<LongWritable, Text, IntWritable, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
  throws IOException, InterruptedException {
  context.write(new IntWritable(line.getLength()), new IntWritable(1));
  }
  }

  值得注意的是Mappers和Recers只对键值对进行操作。所以由TextInputFormat提供输入给LineLengthMapper,实际上也是以文本中位置为key(很少这么用,但是总是需要有东西作为Key),文本行为值的键值对。

  与之对应的Spark实现:

  lines.map(line => (line.length, 1))

  Spark中,输入只是String构成的RDD,而不是key-value键值对。Spark中对key-value键值对的表示是一个Scala的元组,用(A,B)这样的语法来创建。上面的map操作的结果是(Int,Int)元组的RDD。当一个RDD包含很多元组,它获得了多个方法,如receByKey,这对再现MapRece行为将是至关重要的。

  Rece
  rece()与receBykey()
  统计行的长度的键值对,需要在Recer中对每种长度作为key,计算其行数的总和作为value。
  public class LineLengthRecer extends
  Recer<IntWritable, IntWritable, IntWritable, IntWritable> {
  @Override
  protected void rece(IntWritable length, Iterable<IntWritable> counts,
  Context context) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable count : counts) {
  sum += count.get();
  }
  context.write(length, new IntWritable(sum));
  }
  }

  Spark中与上述Mapper,Recer对应的实现只要一行代码:
  val lengthCounts = lines.map(line => (line.length, 1)).receByKey(_ + _)

  Spark的RDD API有个rece方法,但是它会将所有key-value键值对rece为单个value。这并不是Hadoop MapRece的行为,Spark中与之对应的是ReceByKey。

  另外,Recer的Rece方法接收多值流,并产生0,1或多个结果。而receByKey,它接受的是一个将两个值转化为一个值的函数,在这里,就是把两个数字映射到它们的和的简单加法函数。此关联函数可以被调用者用来rece多个值到一个值。与Recer方法相比,他是一个根据Key来Rece Value的更简单而更精确的API。
  Mapper
  map() 与 flatMap()
  现在,考虑一个统计以大写字母开头的单词的个数的算法。对于每行输入文本,Mapper可能产生0个,1个或多个键值对。
  public class CountUppercaseMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
  throws IOException, InterruptedException {
  for (String word : line.toString().split(" ")) {
  if (Character.isUpperCase(word.charAt(0))) {
  context.write(new Text(word), new IntWritable(1));
  }
  }
  }
  }

  Spark对应的写法:
  lines.flatMap(
  _.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1))
  )
  简单的Spark map函数不适用于这种场景,因为map对于每个输入只能产生单个输出,但这个例子中一行需要产生多个输出。所以,和MapperAPI支持的相比,Spark的map函数语义更简单,应用范围更窄。

  Spark的解决方案是首先将每行映射为一组输出值,这组值可能为空值或多值。随后会通过flatMap函数被扁平化。数组中的词会被过滤并被转化为函数中的元组。这个例子中,真正模仿Mapper行为的是flatMap,而不是map。
  groupByKey()
  写一个统计次数的recer是简单的,在Spark中,receByKey可以被用来统计每个单词的总数。比如出于某种原因要求输出文件中每个单词都要显示为大写字母和其数量,在MapRece中,实现如下:
  public class CountUppercaseRecer extends
  Recer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void rece(Text word, Iterable<IntWritable> counts, Context context)
  throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable count : counts) {
  sum += count.get();
  }
  context
  .write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
  }
  }

  但是redeceByKey不能单独在Spark中工作,因为他保留了原来的key。为了在Spark中模拟,我们需要一些更像Recer API的操作。我们知道Recer的rece方法接受一个key和一组值,然后完成一组转换。groupByKey和一个连续的map操作能够达到这样的目标:
  groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }
  groupByKey只是将某一个key的所有值收集在一起,并且不提供rece功能。以此为基础,任何转换都可以作用在key和一系列值上。此处,将key转变为大写字母,将values直接求和。

  setup()和cleanup()

  在MapRece中,Mapper和Recer可以声明一个setup方法,在处理输入之前执行,来进行分配数据库连接等昂贵资源,同时可以用cleanup函数可以释放资源。
  public class SetupCleanupMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
  private Connection dbConnection;
  @Override
  protected void setup(Context context) {
  dbConnection = ...;
  }
  ...
  @Override
  protected void cleanup(Context context) {
  dbConnection.close();
  }
  }

  Spark中的map和flatMap方法每次只能在一个input上操作,而且没有提供在转换大批值前后执行代码的方法,看起来,似乎可以直接将setup和cleanup代码放在Sparkmap函数调用之前和之后:
  val dbConnection = ...
  lines.map(... dbConnection.createStatement(...) ...)
  dbConnection.close() // Wrong!

  然而这种方法却不可行,原因在于:
  · 它将对象dbConnection放在map函数的闭包中,这需要他是可序列化的(比如,通过java.io.Serializable实现)。而数据库连接这种对象一般不能被序列化。
  · map是一种转换,而不是操作,并且拖延执行。连接对象不能被及时关闭。
  · 即便如此,它也只能关闭driver上的连接,而不是释放被序列化拷贝版本分配的资源连接。

  事实上,map和flatMap都不是Spark中Mapper的最接近的对应函数,Spark中Mapper的最接近的对应函数是十分重要的mapPartitions()方法,这个方法能够不仅完成单值对单值的映射,也能完成一组值对另一组值的映射,很像一个批映射(bulkmap)方法。这意味着mapPartitions()方法能够在开始时从本地分配资源,并在批映射结束时释放资源。

  添加setup方法是简单的,添加cleanup会更困难,这是由于检测转换完成仍然是困难的。例如,这样是能工作的:

  lines.mapPartitions { valueIterator =>
  val dbConnection = ... // OK
  val transformedIterator = valueIterator.map(... dbConnection ...)
  dbConnection.close() // Still wrong! May not have evaluated iterator
  transformedIterator
  }

  一个完整的范式应该看起来类似于:
  lines.mapPartitions { valueIterator =>
  if (valueIterator.isEmpty) {
  Iterator[...]()
  } else {
  val dbConnection = ...
  valueIterator.map { item =>
  val transformedItem = ...
  if (!valueIterator.hasNext) {
  dbConnection.close()
  }
  transformedItem
  }
  }
  }

  虽然后者代码翻译注定不如前者优雅,但它确实能够完成工作。

  flatMapPartitions方法并不存在,然而,可以通过调用mapPartitions,后面跟一个flatMap(a= > a)的调用达到同样效果。

  带有setup和cleanup的Recer对应只需仿照上述代码使用groupByKey后面跟一个mapPartition函数。

  别急,等一下,还有更多
  MapRece的开发者会指出,还有更多的还没有被提及的API:
  · MapRece支持一种特殊类型的Recer,也称为Combiner,可以从Mapper中减少洗牌(shuffled)数据大小。
  · 它还支持同通过Partitioner实现的自定义分区,和通过分组Comparator实现的自定义分组。
  · Context对象授予Counter API的访问权限以及它的累积统计。
  · Recer在其生命周期内一直能看到已排序好的key 。
  · MapRece有自己的Writable序列化方案。
  · Mapper和Recer可以一次发射多组输出。
  · MapRece有几十个调优参数。

  有很多方法可以在Spark中实现这些方案,使用类似Accumulator的API,类似groupBy和在不同的这些方法中加入partitioner参数的方法,Java或Kryo序列化,缓存和更多。由于篇幅*,在这篇文章中就不再累赘介绍了。

  需要指出的是,MapRece的概念仍然有用。只不过现在有了一个更强大的实现,并利用函数式语言,更好地匹配其功能性。理解Spark RDD API和原来的Mapper和RecerAPI之间的差异,可以帮助开发者更好地理解所有这些函数的工作原理,以及理解如何利用Spark发挥其优势。
ups快递客服电话24小时 贷款记录在征信保留几年? 安徽徽商城有限公司公司简介 安徽省徽商集团新能源股份有限公司基本情况 安徽省徽商集团有限公司经营理念 2019哈尔滨煤气费怎么有税? 快手删除的作品如何恢复 体育理念体育理念 有关体育的格言和理念 什么是体育理念 万里挑一算彩礼还是见面礼 绿萝扦插多少天后发芽 绿萝扦插多久发芽 扦插绿萝多久发芽 炖牛排骨的做法和配料 网络诈骗定罪标准揭秘 “流水不争先”是什么意思? mc中钻石装备怎么做 为什么我的MC里的钻石块是这样的?我想要那种。是不是版本的问题?如果是... 带“偷儿”的诗句 “君不见巴丘古城如培塿”的出处是哪里 带“奈何”的诗句大全(229句) 里翁行()拼音版、注音及读音 带“不虑”的诗句 “鲁肃当年万人守”的出处是哪里 无尘防尘棚 进出口报关流程,越详细越好。谢谢大家指教。 双线桥不是看化合价升多少就标多少的吗?为什么CL2+2KI=2KCL+I2中I失... 出师表高锰酸钾有画面了吗 2021年幼儿园新学期致家长一封信 电脑屏幕一条黑线怎么办? 销售代理商销售代理商的特点 商业代理商业代理的特征 如何看微信有没有开通微众银行 为什么微众没有开户 微众银行怎么开户 微众银行APP开户流程是什么? 唐古拉山海拔唐古拉山海拔是多少 怎么看待取消跳广场舞的人的退休金 如何选购新鲜的蓝田水柿? 恭城水柿柿树作用 创维洗衣机使用教程 创维全自动洗衣机怎么使用 自动开门器 狗羊属相婚姻相配吗 3岁的小孩不会说话怎么办 3岁孩子不会说话,应该挂什么科? 3岁小孩不会说话正常吗 鹿茸炖乌鸡怎么做? 新型冠状肺炎吃什么药可以预防 冰箱上电后一直响 食品生产许可证编号开头为“ G”。 什么是MapReduce? mapreduce的工作流程 Hadoop的工作原理是什么 mapreduce处理海量数据的工作原理是什么? 请简要描述Hadoop计算框架MapReduce的工作原理 mapreduce实现原理是怎样的 圆通快递的电话 为什么总是打不通 一个电话打不通是啥原因? 什么情况下电话对方打不通? 为什么我打别人电话打得通,别人打我却打不通 打的电话怎么打不通? 打不通对方的电话是什么原因 固定电话打不通的原因有哪些 电话打不出,也呼不进,是什么原因电话打不通,也接不进来,怎么回事,我还是头一次见? 电话怎么打不通啊,什么原因 为什么打不通电话 电话打不通有几种原因 为什么电话打不通 手机怎么打电话打不通了,所有电话都这样? 为什么电话打不通? Hadoop和MapReduce究竟分别是做什么用的 什么是Map/Reduce-Mapreduce-about云开发 请简述mapreduce的技术思想 简述MapReduce基本思想,想想在生活中有没有相似的例子? 大数据需要掌握哪些技能 大数据培训一般都将些什么内容? 简述Hadoop的MapReduce与Googl的MapReducc 之间的关系 mapreduce中map是怎么做的?参数又是怎么解析传递给map方法的 mapreduce处理什么任务 大数据培训到底是培训什么 简述HDFS和MapReduce在Hadoop中的角色 如何快速地编写和运行一个属于自己的MapReduce例子程序 为什么手机没有欠费,但是打电话出去说我停机了? 为什么号码没有欠费却被停机了? 手机没欠费被暂停服务怎么回事 为什么我的手机没有停机但是别人打我的电话显示停机 手机没欠费为什么停机 我的手机号码没欠费,为什么会暂停服务?如何解决? 手机没有欠费,为什么是停机状态?只能打进不能打出 为什么号码没有欠费却被停机了
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com