1 MapReduce概念 和 MapReduce编程模型
什么是MapReduce
MapReduce分而治之的思想
MapReduce编程分Map和Reduce阶段
MapReduce编程执行步骤
编程模型
Map(in_keyin_value)
--->(out_keyintermediate_value) list
Reduce(out_keyintermediate_value) list
--->out_value list
2 应用MRJob编写MapReduce代码
mrjob 简介
mrjob 安装
pip install mrjob
mrjob实现WordCount
from mrjob.job import MRJob
class MRWordCount(MRJob):
#每一行从line中输入
def mapper(self _ line):
for word in line.split():
yield word1
# word相同的 会走到同一个reduce
def reducer(self word counts):
yield word sum(counts)
if __name__ == __main__:
MRWordCount.run()
运行WordCount代码
打开命令行 找到一篇文本文档 敲如下命令:
python mr_word_count.py my_file.txt
运行MRJOB的不同方式
1、内嵌(-r inline)方式
特点是调试方便,启动单一进程模拟任务执行状态和结果,默认(-r inline)可以省略,输出文件使用 > output-file 或-o output-file,比如下面两种运行方式是等价的
python word_count.py -r inline input.txt > output.txt python word_count.py input.txt > output.txt
2、Hadoop(-r hadoop)方式
用于hadoop环境
python word_count.py -r hadoop hdfs:///test.txt -o hdfs:///output
支持Hadoop运行调度控制参数,如:
1)指定Hadoop任务调度优先级(VERY_HIGH|HIGH)如:--jobconf mapreduce.job.priority=VERY_HIGH。
2)Map及Reduce任务个数限制,如:--jobconf mapreduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=5
3 mrjob 实现 topN统计(实验)
统计数据中出现次数最多的前n个数据
import sys
from mrjob.job import MRJobMRStep
import heapq
class TopNWords(MRJob):
def mapper(self _ line):
if line.strip() != "":
for word in line.strip().split():
yield word1
#介于mapper和reducer之间,用于临时的将mapper输出的数据进行统计
def combiner(self word counts):
yield wordsum(counts)
def reducer_sum(self word counts):
yield None(sum(counts)word)
#利用heapq将数据进行排序,将最大的2个取出
def top_n_reducer(self_word_cnts):
for cntword in heapq.nlargest(2word_cnts):
yield wordcnt
#实现steps方法用于指定自定义的mapper,comnbiner和reducer方法
def steps(self):
#传入两个step 定义了执行的顺序
return [
MRStep(mapper=self.mapper
combiner=self.combiner
reducer=self.reducer_sum)
MRStep(reducer=self.top_n_reducer)
]
def main():
TopNWords.run()
if __name__==__main__:
main()
4 MapReduce原理详解
单机程序计算流程
输入数据--->读取数据--->处理数据--->写入数据--->输出数据
Hadoop计算流程
input data:输入数据
InputFormat:对数据进行切分,格式化处理
map:将前面切分的数据做map处理(将数据进行分类,输出(kv)键值对数据)
shuffle&sort:将相同的数据放在一起,并对数据进行排序处理
reduce:将map输出的数据进行hash计算,对每个map数据进行统计计算
OutputFormat:格式化输出数据
map:将数据进行处理
buffer in memory:达到80%数据时,将数据锁在内存上,将这部分输出到磁盘上
partitions:在磁盘上有很多"小的数据",将这些数据进行归并排序。
merge on disk:将所有的"小的数据"进行合并。
reduce:不同的reduce任务,会从map中对应的任务中copy数据,在reduce中同样要进行merge操作
5 MapReduce架构
MapReduce架构 1.X
JobTracker:负责接收客户作业提交,负责任务到作业节点上运行,检查作业的状态
TaskTracker:由JobTracker指派任务,定期向JobTracker汇报状态,在每一个工作节点上永远只会有一个TaskTracker
MapReduce2.X架构
ResourceManager:负责资源的管理,负责提交任务到NodeManager所在的节点运行,检查节点的状态
NodeManager:由ResourceManager指派任务,定期向ResourceManager汇报状态
{{image.png(uploading...)}}
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/126017.html
阅读 3513·2023-04-25 20:09
阅读 3719·2022-06-28 19:00
阅读 3035·2022-06-28 19:00
阅读 3057·2022-06-28 19:00
阅读 3130·2022-06-28 19:00
阅读 2858·2022-06-28 19:00
阅读 3013·2022-06-28 19:00
阅读 2609·2022-06-28 19:00