author:魏静崎
2024年9月26日
Hadoop权威指南
笔记来源:晓之木初
该文章由于学校文献考试而整理
第一章 Hadoop概述
Hadoop不仅仅是一个项目
- 提到大数据,不可避免的会提到Hadoop,其实Hadoop已经不再是一个分布式存储和计算引擎了。
- 它已经超越了批处理本身,成为了一个拥有多个涉及分布式计算和大规模数据处理子项目的大数据生态系统
为什么需要Hadoop
- 从磁盘读数据的瓶颈是磁盘的寻道时间,磁盘寻道时间的提升远低于数据传输速率的提升
- 数据库系统,只更新一小部分,B树非常适合;如果大量更新,B树将涉及排序/合并,效率明显落后于MR
- RDBMS适合于建立索引后的数据集的点查询和更新,更适合持续更新的数据集;MR适合一次写入、多次读取数据的应用
- MR对数据计算任务是可以线性扩展的,任务扩展为N倍,只需要将集群规模扩展为N倍;这是RDBMS无法达到的
第二章 MapReduce概述
Hadoop利用MapReduce(简称MR),实现分布式计算,顾名思义MR包含两个操作:
- Map操作:map在编程语言中有映射含义,即对每条数据执行相同的操作,从而实现将原始的输入数据转化为key-value形式。也就是说,通过map操作可以进行数据准备。
- Reduce操作:对map操作的结果(即中间结果)进行汇总,如求和、求平均等,从而得到最终结果 —— 一个新的key-value集合。
- 其中,map操作的结果将存储在本地磁盘,又叫中间结果;reduce操作的结果将存入本地HDFS,然后按照副本放置策略将副本存储到其他节点。
总结:
- MR通过map进行数据预处理,得到中间结果存入本地磁盘;接着,使用shuffle将中间结果按照key进行排序和分组,使到达reduce的数据是有序的数据集合;然后,reduce函数按照指定的方式对数据进行汇总,并汇总后的数据写入HDFS。
- MR更关注的是用户可编程的map和reduce操作,实际上shuffle操作也至关重要。它可以将map的输出进行归纳归纳整理,减少map与reduce间的数据传输量
MR的数据流
split和map任务
- 对于一个MR作业,输入的数据会被划分成等大的数据块,被称为Split(分片)
- 每个split对应一个map任务,map任务通过运行用户自定义的map函数来实现对split的处理
- 关于split大小:
① split越小,并行处理split的时间会远小于处理整体数据的时间;同时,节点间的负载均衡效果也会更好
② split过分细分,管理split和创建map任务时间将会决定整个作业的执行时间
③ split默认是hdfs一个数据块的大小,128MB
。这样,一个split不会跨越多个节点,能满足数据本地化原理,可以提高map任务的效率 —— 因为,跨节点时难以保证一个节点存在多个数据块,需要在节点间进行网络传输 - split与map任务可能的三种情况:
① 存储split的节点有空闲slot,map任务可以在节点上运行
② 存储split的所有节点均没有空闲slot,map任务优先在同一机架的空闲节点上运行。这时,split会在同一机架的两个节点间传输
② 同一机架也没有空闲节点,map任务只能在不同机架的空闲节点上运行。这时split会跨机架传输
不同类型的数据流
- 关于任务数: map任务的数据量由split的数量决定,而reduce任务的数量需要独立指定。
- 关于分区: 如果存在多个reduce任务,map任务会为每个reduce任务创建分区。一般,按照输出数据的key将数据分配到不同的分区。同时,同一key的所有输出均在同一分区。
多个map任务,一个reduce任务
- 所有map任务的输出,会先存储到本地磁盘。作业完成后,才会删除中间结果。如果在执行map任务的节点在将中间结果传输给reduce任务之前失败,Hadoop会在其他节点重试map任务
- 排序过的map输出,通过网络传递给reduce节点,并在reduce端进行合并。
- reduce对合并后的数据进行处理,处理后的结果优先存到本地HDFS,其他副本按照放置策略存储到其他节点。
多个map任务,多个reduce任务
- map任务的输出按照一定的策略分配到不同的分区
- map输出到reduce输入之间的数据处理,就是shuffle
无reduce任务
- 有时,map任务处理后的数据,无需进行shuffle,可能也就无需reduce任务
- 此时,map任务会将输出优先写入本地hdfs和其他节点的hjdfs
第三章 Hadoop的全方位文件系统
1. HDFS概述
1. HDFS的特性、应用场景(适合/不适合什么?why?)
- HDFS的是Hadoop的分布式文件系统,全称
Hadoop Distributed Filesystem
HDFS具有以下特性:
- 超大文件:几百MB、几百GB甚至几百TB的大文件,甚至还出现了存储PB级文件的Hadoop集群
- 流式数据访问:HDFS基于一次写入、多次读取是高效的文件访问模式构建的。
- 商用硬件(普通硬件):Hadoop将运行在商用硬件集群上,节点故障将会是常态。HDFS在遇到节点故障时,能继续提供服务而不被用户感知。
HDFS不适用的场景
- 大量的小文件:HDFS中,文件的元数据信息存储在namenode的内存中。一般,一个文件的元数据信息为
150 Byte
。如果,HDFS中存在大量的小文件(如数十亿个小文件),则会超过内存的存储容量。 - 低延迟数据访问:HDFS专为大文件实现高吞吐,可能会以提高数据时延为代价。因此,HDFS不适合低延迟数据访问。 —— HBase是更好的选择
- 随机写/多用户写:因为HDFS只允许顺序写,支持单个用户写入。
4. 总结
- 说先是对HDFS的一个认识吧:HDFS适合什么场景(超大文件、流式文件访问、商用硬件)?不适合什么场景(低延迟访问、多用户写入/随机写、小文件存储)?why?
- HDFS的两种节点:
① namenode(namespace管理)、datanode(文件存储/检索、block信息上报)
② namenode的容错机制:NFS、辅助namenode(只是负责编辑日志与namespace镜像文件的合并,减轻namenode压力);或者NFS + 辅助namenode - namenode的HA:
① namenode存在单点故障,Hadoop 2提供对HDFS的HA支持:提出了active namenode
和standby namenode
②active namenode
:任意时刻处于active状态,接受客户端操作;standby namenode
:拥有足够多的状态信息,可以提供快速的故障转移
③ 通过QJM(群体日志管理器),HDFS专用的日志编辑器,要求至少有3个节点,支持 ( n − 1 ) / 2 (n-1)/2 (n−1)/2的节点故障 - Hadoop的文件系统:
① 提供一个抽象的文件系统概念,FileSystem
是Hadoop的抽象文件接口
② 基于FileSystem
实现了多种文件系统:Local、HDFS、webHDFS、S3等
③ 文件系统的命令:hadoop fs
,以及专门针对HDFS的hdfs dfs
④ 通过FileSystem
进行简单的HDFS文件操作:读写、创建目录、删除文件/目录、获取元数据信息
⑤hdfs dfs -ls
,查看文件系统的相关信息(文件权限、修改时间、副本数等) - HDFS文件的读写:
①FSDataInputStream
实现了Seekable
接口和PositionedReadable
接口,从而支持随机读
②FSDataOutputStream
,只有getPos()
方法,不支持随机写
第四章 HDFS的读写流程
HDFS的读流程:
- open()方法 + get block locations,获取block的副本位置,返回
FSDataInputStream
,内含DFSDataStream
- 调用
FSDataInputStream
的read()
方法,实际由DFSDataStream
与最佳DataNode创建连接,不断调用read()
读取数据; - 结束一个block的读取,
DFSDataStream
与下一个block的最佳DataNode连接连接,继续进行数据读取 - 必要时,
DFSDataStream
从NameNode处获取下一批block的位置 - 调用
FSDataInputStream
的close()
方法,关闭输入流
- open()方法 + get block locations,获取block的副本位置,返回
HDFS读流程中的故障:
- DataNode故障,记住该DataNode,切换到最邻近的、有相同副本的DataNode
- 读数据时进行checksum,发现block损坏,切换到另一副本继续读。同时向NameNode上报损坏的block
HDFS的三种数据块:数据存储单位block(
128 MB
)、数据传输单位packet(64 kb
)、数据校验单位chunk(512 byte
+4 byte的校验信息
)HDFS基于chunk、packet、DataQueue/AckQueue的三层缓存
- chunk写满,校验后写入packet
- packet写满,放到DataQueue
DataStreamer
将DataQueue中的packet发送给DataNode,并在未确认写入ok时,将packet存入AckQueue- 收到足够的应答,
ResponseProcessor
将packet从AckQueue移除;否则,移回DataQueue中,重新写入
HDFS的写流程
- create() + create, NameNode进行权限、是否已存在等校验;通过校验,向EditLog新增一条记录(WAL)并返回
FSDataOutputStream
,内含DFSOutputStream
;否则,抛出IOException
异常 - 调用
FSDataOutputStream
的write()
方法,实际由DFSOutputStream
将数据分成一个个packet放入DataQueue DataStreamer
负责选取一组DataNode进行数据写入,这些DataNode之间会建立pipeline,packet通过pipeline进行传输- 为确认写入ok的packet放入AckQueue,只有收到足够的ack packet,才会从AckQueue中移除
- 调用
FSDataOutputStream
的close()
方法,关闭写入流;NameNode执行complete,等待文件写入完成前,确认成功写入
- create() + create, NameNode进行权限、是否已存在等校验;通过校验,向EditLog新增一条记录(WAL)并返回
HDFS写流程故障:
- 删除pipeline、将AckQueue中的所有packet移动到DataQueue的前端,确保数据不会丢失
- 标记当前block,并向NameNode上报信息,等到故障DataNode恢复,删除不完整的block
- 在正常的DataNode之间重新建立pipeline,继续进行数据写入
- NameNode发现副本数不足,在其他DataNode创建新的副本(异步副本复制?)
- 写入成功:只要满足最小副本数,即可认为写入成功
副本的放置策略:随机选择,尽量不在一个机架放置过多的副本
hflush()
和hsync()
方法:以适合的频率调用,保证写入数据对新的reader的可见性ditscp
:实质是只有map任务的MR作业,可以通过-m
指定map任务数
第五章 Hadoop YARN入门
YARN概述
Hadoop 1.x,MR既要负责分布式计算,还需要负责计算过程中的资管管理和任务调度
- Hadoop2.x,更新了Hadoop的架构,使用YARN(
Yet Another Resource Negotiator
,另一种资源判决者/调度者)进行专门的资源组管理和任务调度,而MR通过调用YRN的API实现分布式计算,减轻了MR的压力 - 同时,YARN还具备足够的通用性,除了能支持MR之外,它还能支持其他的分布式计算模式,如Spark、Tez等
- 可以说,YARN是一种通用的资源管理和任务调度框架。
- 大部分时候,用户并不是直接使用YARN请求和使用集群资源的API,而是调用构建在YARN之上的分布式计算框架的API实现分布式计算。
- YARN很好地向用户隐藏了资源管理的细节,书中将HDF、HBase称为集群中的存储层,YARN称为是集群中的计算层。而像MR、Spark这样的分布式计算框架,是集群中的应用层。
YARN的两种守护进程及application master
守护进程1:
Resource Manager
,一个全局的资源管理器,负责管理和分配整个集群中的计算资源
守护进程2:
Node Manager
,运行在所有的节点上,是YARN在每个节点上的代理,管理集群中的单个计算节点,负责启动和监控container- container是有资源限制的(如内存和CPU)、用于执行特定应用程序的进程
应用程序中Application master
- 应用程序级别(
application-specified
)的进程,负责管理运行在YARN上的应用程序
YARN应用的运行机制
- 书中给出了一幅很简单的图,用于描述YARN应用的运行机制
- 相对详细的图,如果需要了解更多的内部机制可以参考:[yarn详解][yarn]
YARN应用的运行流程如下:
client向YARN的
Resource Manager
提交应用程序,包括启动Application master
的必需信息,如Application master程序、启动命令和用户程序等。Resource manager
为该应用程序分配第一个container,并与container所在的Node manager
通信,要求Node manager
启动container并在container中运行Application master
。Application master
首先向Resource manager
注册自己,并与Resource maneger
保持心跳。 —— 用户可以通过Resource maneger
获取应用程序的运行状态。Application master
采用轮询的方式、通过RPC向Resource manager
申请更多的container申请到container后,
Application master
会与container所在的Node manager
建立通信,要求Node manager启动container,从而运行具体的应用程序任务各个任务会通过RPC向
Application master
上报自己的进度和运行状态,这样Application master
便可以对任务进行监控和管理- 当任务运行失败时,
Application master
可以向Resource manager
申请新的container,以重新运行该任务 - client也可以通过RPC直接访问
Application master
,获取任务的运行状态
- 当任务运行失败时,
应用程序运行完成后,
Application master
向Resource manager
注销并关闭自己。这时,Application master
分配到的container,可以被Resource manager
回收
通过运行机制,总结三种角色的作用
Resource maneger
- 与client交互,处理来自client的请求(提交作业、查询应用程序状态等)
- 启动并管理
Application master
:Application master
运行失败,会重启Application master
。 - 资源管理与调度: 接收
Application master
的资源请求,并为之分配资源 ( 其实就是分配container) - 管理
Node Manager
: 接收来自Node Manager
的资源汇报信息,并向Node Manager
下达管理指令 —— 这条我没有啥体会 😂
Node manager
- 接收并处理来自
Application master
启动/停止container的各种请求 - 启动和监控container
- 以心跳的形式向
Resource manager
汇报本节点上的资源使用情况和各个Container的运行状态 —— 这条我没有啥体会 😂
Application master
- 向
Resource maneger
申请container - 将申请到的contianer进一步分配给应用程序中的任务 (资源的二次分配)
- 与
Node manager
通信,从而启动/停止container - 监控任务的运行状态,在任务运行失败时,重新申请资源以重试任务。
YARN的优势:
1. 可扩展性(分离架构克服了jobtracker的扩展性瓶颈,Application master与Google的MapReduce论文更接近)
2. 可用性:jobtracker难以提供HA,YARN中分而治之的HA
3. 利用率(slot方式的利用率低,基于Node manager的资源池实现资源的灵活分配)
4. 多租户:YARN是一种通用的资源管理和任务调度框架,并非仅支持MR
第六章 YARN调度队列
- 三种调度器各自的优缺点:
- FIFO调度器: 简单易懂,先进先出,容易使小作业阻塞,不适合共享集群
- capacity调度器和fair调度器:适合共享集群,既能允许长时间运行的作业及时完成,又能保证小作业在合适的时间内完成
- capacity调度器: 为特定应用分配具有特定资源的队列,保证小作业立即执行,牺牲了集群的利用率,大作业执行时间变长
- fair调度器: 为正在运行的应用动态平衡资源,提高了集群利用率,有保证小作业能及时完成
capacity调度器:
- 队列内,采用FIFO调度策略
- 支持弹性队列: 允许将其他队列的空闲资源分配给资源不足的队里,即使操作队列声明的容量;为避免过度占用其他队列的资源,需要设置最大容量
- capacity调度器的配置:不设置最大容量,则可以使用所在层级的所有资源
- 队列放置:应用应该放置到哪个队列,先按specified,不存在则报错;不指定,按default
fair调度器:
- fair调度器中,队列的资源配置并非
1: 1
,而是由分配文件中比例决定 - 开源Hadoop默认使用capacity调度器,使用fair调度器需要进行配置才能开启
- fair调度器的配置:队列内的调度策略可以是fair、FIFO和DRF
- 放置策略:默认放置策略是先specified,再user
- 支持抢占:终止超过资源份额的队列中的container,将释放后的资源资源分配非低于份额的队列中的container;最小共享抢占和公平共享抢占
- fair调度器中,队列的资源配置并非
延迟调度:
- 等待一定次数的调度机会,才放松本地限制,以金最大可能地利用本地化,提高集群效率
- 调度机会:
Node manager
向Resource manager
发送心跳,是一次潜在的调度机会 - capacity调度器的配置:设置转去匹配同一机架其他节点的最大等待的调度机会次数
- fair调度器的配置:转去匹配同一机架其他节点的调度机会比例,转去匹配其他机架节点的等待时间阈值。
第七章 校验和压缩
3. 总结
3.1 checksum
checksum的常见措施:第一次写入前计算checksum,存储数据和checksum;后续读取时,计算数据的checksum,并与原始的checksum做对比
Hadoop中的checksum:
- HDFS使用
CRC-32
的变体CRC-32C
,该变体会更高效 - Hadoop中,
ChecksumFileSystem
类使用CRC-32,可以对不支持checksum的文件系统进行装饰,使其支持checksum
- HDFS使用
3.1.1 HDFS的checksum
基于packet计算checksum,将packet和checksum一起写入HDFS
通过
dfs.bytes-per-checksum
设置计算checksum的数据大小三种校验checksum的场景:
- client将数据写入DataNode:client负责划分packet并计算checksum,pipeline中的最后一个DataNode负责校验,校验出错返回
IOException
的子类;或DataNode从其他DataNode复制block - client从DataNode读取数据:DataNode永久存储checksum日志文件,记录block最近一次的校验时间;client完成checksum校验后,会报告给DataNode,从而更新checksum日志文件;可以检查磁盘损坏
- DataNode的后台线程DataBlockScanner,定期扫描DataNode上的所有block,是预防物理存储介质
bit rot
的有力措施
- client将数据写入DataNode:client负责划分packet并计算checksum,pipeline中的最后一个DataNode负责校验,校验出错返回
HDFS对数据块损坏的操作:
- 报告给NameNode,再返回
ChecksumException
异常; - NameNode会标记block,并不在将client的读请求发送到该block对应的DataNode;
- NameNode发现副本数不够,在其他DataNode新建block的副本;
- 删除DataNode中已损坏的block
- 报告给NameNode,再返回
3.1.2 LocalFileSystem
- 文件名为
file
,对应的checksum元数据:同一目录的.file.crc
- 当文件系统本身就支持checksum是,可以使用
RawLocalFileSystem
代替LocalFileSystem
3.2 压缩
常见的压缩格式,及对应的压缩算法、文件扩展名、是否支持split、是否有java实现、是否有native实现等
特殊的:
- gzip使用DEFALTE算法,文件在DEFLATE文件基础上增加了文件头和文件尾
- bzip2唯一支持split的压缩算法,且无native实现
- LZO代码库使用GPL,使用时需要单独配置
- Hadoop安装包,64为linux创建了native类库:
libhadoop.so
压缩算法的实现:
CompressionCodec
接口及具体的codec实现、creteOutputStream(OutputStream out)
和createInputStream(InputStream in)
CompressionCodecFactory
,根据压缩文件的扩展名,获取对应的Codec实例CodecPool
:避免频繁创建Codec,实现codec的重复使用HDFS中压缩格式的选择:
- 使用容器文件:avro、ORC、parquet等,它们同时支持压缩和split
- 使用支持split的算法:bzip2;通过为split点创建索引,从而支持split的LZO
- 文件先分块,为每个分块创建压缩文件
- 存储未经压缩的文件
MR中压缩的配置:
- 为reduce的输出设置压缩:
mapreduce.output.fileoutputformat.compress
及mapreduce.output.fileoutputformat.compress.codec
、mapreduce.output.fileoutputformat.compress.type
- 为map任务的输出设置压缩:
mapreduce.map.output..compress
及mapreduce.map.output.compress.codec
- 为reduce的输出设置压缩:
第八章 Hadoop的序列化
3. 总结
3.1 关于序列化
序列化:结构化对象转为字节流
RPC中的序列化:消息转为二进制流
RPC中序列化应该具备的四大理想属性
- 紧凑:减少size
- 快速:序列化或反序列化的性能开销要小
- 可扩展:支持新协议的引入(服务器接口对client的向前兼容)
- 支持互操作:支持多种语言的client,提供一种统一的接口
数据的持久化存储也应该满足以上四大理想属性:
- 紧凑:减少size
- 快速:降低读写时的性能开销
- 可扩展:支持读老格式的数据
- 支持互操作:支持多种语言读写数据
3.2 Hadoop中的序列化
基于
Writable
接口,Hadoop实现了很多序列化类,整体类图如下序列化的统一接口
Writable
以
IntWritable
为例,体会其是如何实现直接比较二进制流,无需反序列化的:IntWritable
实现WritableComparable
接口RawComparator
接口定义了直接比较二进制流的方法,WritableComparator
类实现了RawComparator
接口WritableComparator
类:
① 实现了原始的compare方法,以及RawComparator
中compare方法;
② 提供工厂方法:支持根据WritableComparable
接口的实现类,获取对应的WritableComparator
对象
Java基本数据类型对应的Writable类
Text与String的比较:
- 内部实现:String为
private final char[]
, Text为private byte[] bytes
- 编码方式:String的编码方式取决于char的编码方式,Text固定使用
UTF-8
编码 - 内容是否可变:String内容不可变,Text内容可变
- API:String有丰富的API,Text没有,但
toString()
后可以使用String的API
- 内部实现:String为
BytesWritable
类:- 两大核心数据结构:
int size
和byte[] bytes
getLength()
获取实际的数据长度,getBytes().length
或getCapacity()
有时并不能反映数据的的真实长度
- 两大核心数据结构:
第九章 Hadoop的顺序文件
关于顺序文件:
- 二进制key-value的持久存储格式,适合日志文件存储、大量小文件的合并
- 顺序文件的写:通过
createWriter()
获取SequenceFile.Writer
对象,通过append()
实现顺序写,通过getLength()
获取写入指针的当前位置 - 顺序文件的读:通过构造方法获取
SequenceFile.Reader
对象,通过next()
按记录读取顺序文件(文件末尾返回false
),通过getPosition()
获取文件的读指针的位置 - 关于同步点:
reader.syncSeen()
获取当前位置是否为同步点,reader.sync()
跳到指定position的之后的第一个同步点,writer.sync()
插入同步点 - 顺序文件的格式:文件头 + records + record中穿插的同步点;基于
RECORD
和BLOCK
压缩的顺序文件,结构差异
关于MapFile
- 带索引的、按key排序后的顺序文件
- 索引是一个顺序文件,包含主数据文件中的一小部分key(默认128个key),可以加载到内存中,提供对主数据文件的快速查找
Hadoop中常见的文件格式:
- 顺序文件、MapFile和Avro都是行式存储
- ORC和parquet都是列式存储
行式存储与列式存储
- 行式存储与列式存储的概念
- 行式存储:适合随机的增删该查、适合访问一行中的大部分数据、适合频繁的增加和删除操作
- 列式存储:适合访问一行中的少部分数据、适合按行压缩以减少存储空间、适合并发计算以提交查询效率
第十章 MR应用开发
关于Configuration:
- 配置Hadoop的API
- 通过
addResource()
添加xml配置文件,通过get()
获取属性值(可以为不存在的属性设置默认值)、属性的覆盖(final属性无法被覆盖)
关于开发环境的配置
- Hadoop中HDFS、yarn、MapReduce等的配置,可用配置及描述的查看方式:① 安装包的
share/doc
,② 网站的ConfigurationLA栏中查看 - 通过
-conf
指定运行时连接的Hadoop集群 - 开发MR程序常用的maven依赖:hadoop-client、用于测试的:junit、mrunit、miniCluster
junit
+ mrunit
对mapper函数和reducer函数进行单元测试
任务调试的基础知识
- application ID、job ID、task ID(从0开始计数)、task attempt ID(从0开始计数)的构建方式
- 如何在mapper中添加调试:context的
setStatus()
和getCounter()
- 远程调试:本地重现错误、JVM堆转储、任务分析
- 如果通过Resource Manager的web UI,查看/分析作业、任务的运行情况
第十一章 MR的工作机制
各实体的责任
client:负责提交MR作业
Resource Manager
- 负责接收clinet请求(提交Application、查询Application的运行状态)
- 管理Node Manager:接收Node Manager的资源报告信息,向Node Manager下达管理指令
- 启动并管理Application master,在Application master失败时,重启Application master
- 资源管理与分配:响应Application master的资源请求,为其分配Node Maneger上的container以运行任务
Node Manager
- 接收并处理来自Manager和Application master的启动或停止container的请求
- 启动并监管container(会在任务JVM失败时,告知Application master)
- 向Resource Manager上报自身的资源信息和所有运行中的container状态
Application master
- 向Resource Manager申请container,并将container进一步分配给Application中的具体任务
- 与Node manager通信,从而启动或停止container
- 监控task:可以在task失败时,重新申请container以重试task
分布式文件系统(一般为HDFS):在其他实体间共享作业文件
MR的运行流程
调用Job的
submit()
方法,提交一个MR作业。实质上,submit()
方法会创建一个Submitter
实例,并其submitJobInternal()
提交作业。作业提交后,
waitForComletion()
方法每隔一秒会轮询作业的状态,当作业失败或成功时,会退出该方法对应图中的步骤1
Submitter
实例向Resource Manager申请一个新的Application ID对应图中的步骤2
Submitter会检查输出说明: 如果没有指定输出目录或输出目录已经存在,则不提交作业
Submitter计算作业的输入split: 如果输入split无法计算,如输入目录不存在,则不提交作业
submitter负责将作业运行所需的资源拷贝到以
job ID
命名的共享目录下,包括jar文件、配置文件、计算所得的输入split对应图中的步骤3
Submitter调用Resource Manager的
submitApplication()
方法,向yarn提交一个Application对应图中的步骤4
Resource Manager中的scheduler为Application master分配一个容器
在Resource Manger的指令下,Node Manager启动该容器并运行Application master,其主类是
MRAppMaster
对应图中的步骤5
Application master初始化作业:创建多个簿记对象以保持对作业进度的跟踪
对应图中的步骤6
Application master接收共享目录中的、计算出的输入split,创建对应数量的map任务。同时,也根据设置创建确定数量的reduce任务
对应图中的步骤7
接下来,Application master需要根据作业的大小决定是否需要向Resource Manager申请container以运行map或reducer任务
(1)如果任务可以与Application master在同一个JVM中运行:其开销小于为这些任务单独申请container并运行的开销 —— 这样的作业称为uberized
(2)任务无法与Application master在同一个JVM中运行,则向Resource Manager为所有的map任务和reduce任务申请container
(3)container的请求顺序:首先为map任务请求container,且其优先级高于reduce任务的优先级。
(4) 实际上,直到有5%的map任务运行完成,reduce任务的资源请求才会发出对应图中的步骤8
一旦scheduler分配好了container,则Application master会与container所在的Node Manager建立通信,以启动container,从而运行任务
对应图中的步骤9
任务的运行是由名为
YarnChild
的主类来执行的,YarnChild
首先会将任务所需资源本地化(jar包、作业配置、输入数据等),然后再运行具体的map任务或reduce任务对应图中的步骤10和11
Shuffle
- 在同事的技术分享中,我发现无论是MR的shuffle还是spark的shuffle都是其
灵魂
- 所谓shuffle,英文意思是洗牌。这不是发牌者的那种混洗,而是持牌人的合并、排序
- 针对MR,从map任务的输出到reduce任务的输入,这一中间过程就是shuffle
- 具体来说,包括map端的shuffle和reduce端的shuffle
总结
结合yarn,描述MR任务的具体工作流程
- client提交作业,获取到一个submitter实例;执行
waitForCompletion()
轮询作业状态,作业失败或成功,退出该方法 - submitter向Resource Manager申请Application ID
- submitter负责检查作业的输出说明;计算split;将运行作业所需的各种资源组copy到一个共享目录下
- submitter调用
submitApplication()
方法,向yarn提交一个application - Resource Manager分配一个容器,用于运行Application master,其主类为
MRAppMaster
- Application master进行作业的初始化:创建簿籍对象以跟踪作业进度
- Application master从共享目录获取资源,从而计算出map任务和reduce任务数
- Application master向Resource Manager申请container:可能是uber任务,与Application master运行在同一个JVM;先申请map任务的container,再申请reduce任务的container
- Application master与Node Manager通信,使其启动container以运行任务
- 任务放到
YarnChild
中运行,YarnChild
会先从共享目录获取资源,然后再执行任务
- client提交作业,获取到一个submitter实例;执行
MR任务再运行过程中,可能出现的各种失败情况
任务失败:
- 三种情况会被视为任务失败:① 用户代码抛出异常,导致任务失败;② JVM突然退出;③ task在规定时间未向Application master上报信息
- 失败的任务会记入重试次数,如果超过重试次数,则整个作业失败
- 任务被kill不会记入重试次数
Application master失败
- 由Resource Manager负责检测Application master的运行情况
Node Manager的失败
- Node Manager崩溃或运行缓慢导致心跳超时,则被Resource Manager认为失败
- 重新运行Node Manager上未完成的任务、已完成的map任务
Resource Manager失败
- 需要提供高可用的Resource Manager,避免单点故障
关于shuffle
- map端的shuffle:环形缓冲区、分区、排序(可能存在combiner)、溢写、溢写文件的合并(可能存在combiner)
- reduce端的shuffle:map输出中对应分区的copy、分区的合并
第十二章 MR的其他知识
任务推测执行
- 为了避免运行缓慢的任务拖后腿,调度器会跟踪任务运行速度,为明显低于平均水平的任务创建推测副本
- 任务推测执行的目的:以牺牲集群的整体吞吐量为代价,通过冗余任务来提高作业的运行效率
- 不适合推测执行的三种情况:软件缺陷、非幂等任务、reduce任务
MR的提交协议
4. 自己的理解:作业或任务在执行的不同阶段,系统所需完成的操作
5. 书上的介绍:提交协议可以保证作业或任务,完全失败或成功
6. 作业的提交协议:
- setupJob()
,创建临时工作目录;
- commitJob()
,作业成功,删除临时工作空间、创建_SUCCESS
标志文件;
- abortJob()
,作业执行失败或被终止,删除临时工作空间(默认实现)
7. 任务的提交协议:
- setupTask()
,不做任何操作(默认实现);
- needsTask()
, 如果将其设置为false
,关闭提交阶段,不会执行commitTask()
或abortTask()
;
- commitTask()
,任务执行成功,将临时输出目录中的内容移动到输出目录
- abortTask()
,任务执行不成功,删除临时输出目录
- 如果存在推测执行,先执行成功的任务调用commitTask()
,否则调用abortTask()
;如果失败重试,重试成功的任务调用commitTask()
,之前失败的任务调用 abortTask()
MR的默认设置
- 输入、输出格式的默认设置:
TextInputFormat
、TextOutputFormat
- map和reduce类的设置:
Mapper
、Reducer
,partition类的设置:HashPartitioner
- map的输出key-value类型设置:使用默认mapper,将与输入的key-value类型一致
- reduce的输出key-value类型设置:使用默认reducer,将与输入的key-value类型一致
- 设置reduce任务数:
setNumReduceTasks(1)
streaming作业的默认设置
- map和reduce任务,输入输出的默认分隔符为
\t
,可以通过属性自定义分隔符 - 可以将字段组合,作为key
第十三章 MR的分片、输入输出格式
略
第十四章 MR的组成
各种计数器
第十五章 构建Hadoop集群
为何不使用RAID?
NameNode需要永久性储存文件元数据,可以使用RAID(磁盘阵列)做存储器
但是DataNode不建议使用RAID做存储器,主要原因有三个:
- 原因一: HDFS的多副本已经能满足冗余需求,无需再使用RAID
- 原因二: Hadoop的
JBOD
(just bound of disk)技术,通过实验表明会比RAID的读写更快。因为,JBOD再所有磁盘间循环调度数据块,取决于磁盘的平均读写速度。然而,RAID却取决于读写速度最慢的磁盘。 - 原因三: JBOD的某一磁盘故障,HDFS可以忽略该故障,而RAID中某一磁盘故障则整体瘫痪
守护进程的RPC和HTTP端口
- RPC端口,用于守护进程间相互通信
- HTTP端口,用于提供守护进程与用户交互的web页面
- 端口号,一方面决定了守护进程绑定的网络接口;另一方面决定了用户或其他守护进程与之交互的网络接口。
垃圾回收
- Hadoop文件系统也支持垃圾回收,对应的属性为
fs.trash.interval
- 默认值为0,表示不开启垃圾回收机制
- 垃圾回收是用户级特性,只有通过shell命令删除文件,才能被回收
- 如果通过编写应用程序,实现文件删除,则无法被回收
- 例外: 通过
moveToTrash()
方法将指定文件放入垃圾回收站,成功返回true
;垃圾回收特性未开启,或文件已经在垃圾回收站中,返回false
- 题外话: 运维清理过期数据,输入
rm -rf
命令立马enter后,发现命令使用错误,用户的最新数据没了。然后从故障复盘文档中,发现运维又通过垃圾回收恢复了文件。 - 因此,自己也才对垃圾回收特性有所关注的 😂
慢启动
- 一个普通的MR作业,包含map任务和reduce任务,map任务在reduce任务之前。
- 等所有的map任务都运行完成,再调度reduce任务?或者,一开始就让为map和reduce任务都调度?
- 为了不让reduce任务存在浪费容器不做事的情况,默认当map任务完成
5%
后,才调度reduce任务 - 实际使用时,为了提高集群额吞吐率,可以提高到
80%
Hadoop的验证机制
为了避免Hadoop文件系统的数据,被伪装的恶意用户删除,需要进行安全认证
雅虎工程师,基于
kerberos
(一种成熟的开源网络认证协议)实现了基于kerberos的安全认证kerberos认证只负责鉴定登陆账号是否为声称的用户,而用户的访问权限则是由Hadoop自己负责管理
kerberos的认证过程
- 用户向认证服务器发送一条请求报文,认证服务器返回一个带时间戳的票据授予票据(TGT)
- 用户使用TGT向票据授予服务器(TGS)请求一个服务票据
- 用户向真实的服务器出示服务票据,以证实自己的合法性。
- 注意: TGT具有一定的有效期,无需每次访问服务器都需要进行kerberos认证
第十六章 管理Hadoop集群
NameNode的内存和磁盘会存储了哪些信息?
- 内存:文件系统元数据、block的位置映射
- 磁盘:永久存储edits和fsimage
辅助NameNode如何创建fsimage(永久性检查点)?
- 主NameNode的roll操作
- 辅助NameNode从主NameNode获取最近的edits和fsimage文件,更新seen_txid
- 辅助NameNode基于fsimage逐条执行edits中的事务,创建fsimage.ckpt文件
- 辅助NameNode将fsimage.ckpt被发送给主NameNode
- 主NameNode将fsimage.ckpt更名为fsimage
NameNode的安全模式
- 启动NameNode,直到
99.99%
的block满足最小级别副本的过程 - 期间,对DataNode的写操作都会失败
- 例外:启动一个刚刚格式化的HDFS集群,NameNode不会进入安全模式
fsck工具:
- 能检查:过多副本的块、仍需复制的块、错误复制的块(实质:错误放置的块)、损坏的块(所有副本已损坏)、缺失的副本
- 需要关注:损坏的块、缺失的副本
节点的添加和移除
- include文件和exclude文件,针对DataNode和NodeManager的配置属性
- include文件和slaves文件的却别:前者针对特定守护进程,后者针对集群层面的所有守护进程
- 基于exclude和include文件,如何进行移除DataNode或NodeManager的判断
- 添加或移除文件的过程
第十七章 Flume
Flume概述
简单的Flume代理
- Flume的作用:向HDFS批量导入基于事件的海量数据
- Flume代理的三大基本组件:soure —> channel —> sink
- Flume代理中两个独立的事务,保证了source收集的事件,最终都会到达sink,保证at least once语义
- channel两种类型:file channel 和 memory channel,是否支持久化存储?是否具有较高吞吐等
- Flume的配置
一些典型的Flume配置:
基于HDFS sink的配置:通过timestamp拦截器,实现数据分区
扇出:一个source硬连接多个sink,分流写入或不分流写入,可设置optional channel
分层代理:利用多层代理,实现事件汇总;基于Avro sink - soure对的多层代理;多层代理的交付保证:Avro sink —> Avro source —> channel 2整个流程ok,返回确认 ;才能认为:channel 1 —> Avro sink的事务可以提交
基于Avro sink-source对连接的两层代理sink组:一个source 软连接多个sink,有利于负载均衡或故障转移
- 本文作者: 魏静崎
- 本文链接: https://slightwjq.github.io/2024/10/06/观Hadoop权威指南笔记的笔记/
- 版权声明: 该文章来源及最终解释权归作者所有