一、HDFS简介

本节大纲:

  1. Hadoop2介绍
  2. HDFS概述
  3. HDFS读写流程

 1.  Hadoop2介绍

Hadoop是Apache软件基金会旗下的一个分布式系统基础架构。Hadoop2的框架最核心的设计就是HDFS、MapReduce和YARN,为海量的数据提供了存储和计算。

HDFS主要是Hadoop的存储,用于海量数据的存储;

MapReduce主要运用于分布式计算;

YARN是Hadoop2中的资源管理系统。

Hadoop1和Hadoop2的结构对比:

Hadoop2主要改进:

YARN

通过YARN实现资源的调度与管理,从而使Hadoop 2.0可以运行更多种类的计算框架,如Spark等;

NameNode HA

实现了NameNode的HA方案,即同时有2个NameNode(一个Active另一个Standby),如果ActiveNameNode挂掉的话,另一个NameNode会转入Active状态提供服务,保证了整个集群的高可用;

HDFS federation

实现了HDFS federation,由于元数据放在NameNode的内存当中,内存限制了整个集群的规模,通过HDFS federation使多个NameNode组成一个联邦共同管理DataNode,这样就可以扩大集群规模;

Hadoop RPC序列化扩展性

Hadoop RPC序列化扩展性好,通过将数据类型模块从RPC中独立出来,成为一个独立的可插拔模块。

2. HDFS概述

HDFS是一个分布式文件系统,具有高容错的特点,它可以部署在廉价的通用硬件上,提供高吞吐率的数据访问,适合那些需要处理海量数据集的应用程序。

HDFS主要特性:

支持超大文件

一般是几百MB、GB甚至TB级别的文件

检测和快读应对硬件故障

HDFS的检测和冗余机制很好的克服了大量通用硬件平台上硬件故障的问题;

流式数据访问

HDFS处理的数据规模都很大,应用一次需要访问大量数据,同时,这些应用一般是批量处理,而不是用户交互式处理,HDFS使用户能够以流的形式访问数据集,注重的是数据的吞吐;

简化一致性模型

大部分的HDFS程序操作文件时,需要一次写入,多次读取,在HDFS中,一个文件一旦经过创建写入,关闭后一般不需要修改,这样简单的一致性模型有利于提高吞吐量。

HDFS不适合的场景:

低延迟数据访问

大量的小文件

多用户写入文件,修改文件

HDFS体系结构:

Namenode上保存着HDFS的名字空间,任何对文件系统元数据产生修改的操作都会作用于Namenode上。

Datanode将HDFS数据以文件的形式存储在本地的文件系统中,它并不知道有关HDFS文件的信息,它把每个HDFS数据块存储在本地文件系统的一个单独的文件中。

数据块:

HDFS也有块的概念,Hadoop2中HDFS块默认大小为128MB(此大小可以根据各自的业务情况进行配置,Hadoop1中HDFS块默认大小为64MB),以Linux上普通文件的形式保存在数据节点的文件系统中,数据块是HDFS的文件存储处理的单元。

数据块的好处:

HDFS可以保存比存储节点单一磁盘大的文件

简化了存储子系统和存储管理,也消除了分布式管理文件元数据的复杂性

方便容错,有利于数据复制

3. HDFS读写流程

读流程:

1、客户端client使用open函数打开文件;

2、DistributedFileSystem用RPC调用元数据节点,得到文件的数据块信息;

3、对于每一个数据块,元数据节点返回保存数据块的数据节点的地址;

4、DistributedFileSystem返回FSDataInputStream给客户端,用来读取数据;

5、客户端调用FSDataInputStream的read函数开始读取数据;

6、FSDataInputStream连接保存此文件第一个数据块的最近的数据节点;

7、Data从数据节点读到客户端;

8、当此数据块读取完毕时,FSDataInputStream关闭和此数据节点的连接,然后连接此文件下一个数据块的最近的数据节点;

9、当客户端读取数据完毕时,调用FSDataInputStream的close函数;

10、在读取数据的过程中,如果客户端在与数据节点通信出现错误,则尝试连接包含此数据块的下一个数据节点。失败的数据节点将被记录,以后不再连接。

写流程:

1、客户端client调用create函数创建文件;

2、DistributedFileSystem用RPC调用元数据节点,在文件系统的命名空间中创建一个新的文件;

3、元数据节点首先确定文件是否存在,并且客户端是否有创建文件的权限,然后创建新文件;

4、DistributedFileSystem返回FSDataOutputStream给客户端用于写数据;

5、客户端开始写入数据,FSDataOutputStream将数据分成块,写入data queue;

6、Data queue由DataStreamer读取,并通知元数据节点分配数据节点,用来存储数据块(每块默认复制3块),分配的数据节点放在一个pipeline里;

7、DataStreamer将数据块写入pipeline中的第一个数据节点,第一个数据节点将数据块发送给第二个数据节点,第二个数据节点将数据发送给第三个数据节点;

8、FSDataOutputStream为发出去的数据块保存了ack queue,等待pipeline中的数据节点告知数据已经写入成功;

9、如果数据节点在写入的过程中失败,则进行以下几个操作:一是关闭pipeline并将ack queue中的数据块放入data queue的开始;二是当前数据块在已写入的数据节点中被元数据节点赋予新的标示,错误节点重启后察觉其数据块过时而被删除;三是失败的数据节点从pipeline中移除,另外的数据块则写入pipeline中的另外两个数据节点;四是元数据节点被通知此数据块的复制块数不足,从而再创建第三份备份;

10、当客户端结束写入数据,则调用close函数将所有的数据块写入pipeline中的数据节点,并等待ack queue返回成功,最后通知元数据节点写入完毕。

创建子路径流程:

删除数据流程:

二、HDFS开发实例

大纲:

演示hdfs部分功能开发过程

使用eclipse开发时,可以新建java工程然后手动导入jar包,也可以直接新建Map/Reduce工程,新建的同时会将相关jar包导入。

下面这个例子为展示hdfs文件系统指定目录下的文件。

在main方法里配置如下信息:

Configuration conf = new Configuration();//这一步需要引入import org.apache.hadoop.conf.Configuration;
conf.set("fs.defaultFS","hdfs://HDFS访问地址:9000"); //如果将文件系统的地址写在了src上的话,就不需要在这里再次设置了。
//文件系统的API,通过它来获得文件系统的一个实例
FileSystem fs = FileSystem.get(conf); //写这一步时需要抛出异常,并引入import org.apache.hadoop.fs.FileSystem;如果上一步没有设置配置文件,那么这一句代码就应该为下面这样:
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.149:9000"),conf);
//fs即hdfs文件系统的一个实例
//通过这个实例来获取某个路径下的一个元数据,假设文件路径为根目录
String dir = "/";
FileStatus[] filestatus = fs.listStatus(new Path(dir));//这一步需要引入import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path;
Path[] list = FileUtil.stat2Paths(filestatus);//这一步需要引入import org.apache.hadoop.fs.FileUtil;
for(Path path : list){
	System.out.println(path.toString());
}
fs.close();

同样的可以调用文件系统的其他方法:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.149:9000"),conf);
fs.mkdirs(new Path("/hi")); //创建一个文件夹
fs.delete(new Path("/hi"), true);//删除一个文件夹,true代表可以删除非空文件夹
fs.close();

put一个文件

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.149:9000"),conf);
Path src = new Path("要上传文件的路径");
Path dst = new Path("/hello");
fs.copyFromLocalFile(src, dst);
fs.close();

最终结果:

package com.jdk.hdfs;
 
import java.io.IOException;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
 
public class ls {
 
	/**
	 * @param args
	 * @throws IOException 
	 */
	public static void main(String[] args) throws IOException {
//		查看文件夹
//		String dir = "/";
//		Configuration conf = new Configuration(); 
////		conf.set("fs.defaultfs", "hdfs://196.168.152.141:9000/");  //如果是在本地运行,则可以直接使用conf,不需要加URI链接,如果运行的是集群,则需要添加URI链接,下同
//		FileSystem fs = FileSystem.get(conf);
//		FileStatus[] filestatus = fs.listStatus(new Path(dir));
//		Path[] list = FileUtil.stat2Paths(filestatus);
//		for (Path path : list){
//			System.out.println(path.toString());
//		}
//		fs.close();
		
//		新建文件夹
//		Configuration conf = new Configuration(); 
//		FileSystem fs = FileSystem.get(URI.create("hdfs://196.168.152.141:9000/"),conf);
//		fs.mkdirs(new Path("/hi"));    // 文件“hi”即为创建的文件夹
//		fs.close();
		
//		删除文件夹
//		Configuration conf = new Configuration(); 
//		FileSystem fs = FileSystem.get(URI.create("hdfs://196.168.152.141:9000/"),conf);
//		fs.delete(new Path("/hi"),true);    // 文件“hi”即为删除的文件夹
//		fs.close();
		
//		导入文件
//		Configuration conf = new Configuration(); 
//		FileSystem fs = FileSystem.get(URI.create("hdfs://196.168.152.141:9000/"),conf);
//		Path src = new Path("需要导入的文件的路径");
//		Path dst = new Path("导入到HDFS的目标文件夹,如刚才创建的“/hi”文件");
//		fs.close();
		
	}
}

读取hdfs文件

String dir = "/hello/study_hdfs.txt";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.1.149:9000"),conf);
FSDataInputStream file = fs.open(new Path(dir)); //需import org.apache.hadoop.fs.FSDataInputStream;
BufferedReader in = null; //需import java.io.BufferedReader;
String line;
in = new BufferedReader(new InputStreamReader(file, "UTF-8")); //需import java.io.InputStreamReader;
while(line = in.readLine() = null){
	System.out.println(line);
}
if(in != null){
	in.close();
}
fs.close();

最终结果:

package com.jdk.hdfs;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
public class ReadHDFS {
 
	/**
	 * @param args
	 * @throws IOException 
	 */
	public static void main(String[] args) throws IOException {
//		读取HDFS中的文件
		String dir = "HDFS中需要读取的目标文件及其目录";
		Configuration conf = new Configuration(); 
		FileSystem fs = FileSystem.get(conf);
		FSDataInputStream file = fs.open(new Path(dir));
		BufferedReader in = null;
		String line;
		in = new BufferedReader(new InputStreamReader(file,"UTF-8"));
		while((line = in.readLine()) != null){
			System.out.print(line);
		}
		if(in != null){
			in.close();
		}
		fs.close();
 
	}
 
}

直接在java文件上击右键导出,保存为jar包,然后放入集群中。

通过使用hadoop jar命令即可执行。

三、YARN原理介绍

大纲:

YARN产生背景

YARN基本架构及原理

1、YARN产生背景

Hadoop1.x 中的MapReduce构成图:

Hadoop 1.0的弊端包括:

  1. 扩展性差:JobTracker兼备资源管理和作业控制两个功能,这是整个系统的最大瓶颈,它严重制约了整个集群的可扩展性。
  2. 可靠性差:JobTracker存在单点故障,JobTracker出现问题将导致整个集群不可用。
  3. 资源利用率低:资源无法在多个任务间共享或合理分配,导致无法有效利用各种资源。
  4. 无法支持多种计算框架:Hadoop 1.0只支持MapReduce这种离线批处理计算模式,而无法支持内存计算、流式计算、迭代式计算等。

正是由于Hadoop 1.0存在以上这些弊端,所以Hadoop 2.0推出了资源管理器YARN,有效解决了上述问题。

2、YARN基本架构及原理

定义:

YARN是Hadoop 2.0的资源管理器。它是一个通用的资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

应用场景:

通用的统一的资源管理系统:

1)长应用程序;

2)短应用程序

YARN基本结构:

YARN的基本设计思想是将Hadoop 1.0中的JobTracker拆分成了两个独立的服务:一个全局的资源管理器ResourceManager和每个应用程序特有的ApplicationMaster。其中ResourceManager负责整个系统的资源管理和分配,而ApplicationMaster负责单个应用程序的管理,其基本架构如下图所示:

YARN总体上仍然是Master/Slave结构。在整个资源管理框架中,ResourceManager为Master,NodeManager为Slave,并通过HA方案实现了ResourceManager的高可用。ResourceManager负责对各个NodeManager上的资源进行统一管理和调度。当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的ApplicationMaster,它负责向ResourceManager申请资源,并要求NodeManger启动可以占用一定资源的任务。由于不同的ApplicationMaster被分布到不同的节点上,因此它们之间不会相互影响。

ResourceManager:它是一个全局的资源管理器,负责整个系统的资源管理和分配,主要由调度器和应用程序管理器两个组件构成。

调度器:根据容量、队列等限制条件,将系统中的资源分配给各个正在运行的应用程序。调度器仅根据应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(简称Container)表示,Container是一个动态资源分配单位,它将内存、CPU、磁盘、网络等资源封装在一起,从而限定每个任务使用的资源量。

应用程序管理器:负责管理整个系统中所有的应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。

ApplicationMaster:用户提交的每个应用程序均包含1个ApplicationMaster,主要功能包括与ResourceManager调度器协商以获取资源、将得到的任务进一步分配给内部的任务、与NodeManager通信以启动/停止任务、监控所有任务运行状态并在任务运行失败时重新为任务申请资源以重启任务等。

NodeManager:它是每个节点上的资源和任务管理器,它不仅定时向ResourceManager汇报本节点上的资源使用情况和各个Container的运行状态,还接收并处理来自ApplicationMaster的Container启动/停止等各种请求。

Container:它是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当ApplicationMaster向ResourceManager申请资源时,返回的资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。

YARN工作流程:

YARN的工作流程如下图所示:

1:用户向YARN中提交应用程序,其中包括用户程序、ApplicationMaster程序、ApplicationMaster启动命令等。

2:ResourceManager为应用程序分配第一个Container,并与对应的NodeManager通信,要求它在这个Container中启动应用程序的ApplicationMaster。

3:ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后ApplicationMaster为各个任务申请资源,并监控它们的运行状态,直到运行结束,即重复步骤4-7。

4:ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。

5:一旦ApplicationMaster成功申请到资源,便开始与对应的NodeManager通信,要求它启动任务。

6:NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。

7:各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,使ApplicationMaster能够随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。

8:应用程序运行完成后,ApplicationMaster通过RPC协议向ResourceManager注销并关闭自己。

四、MapReduce原理介绍

大纲:

MapReduce介绍

MapReduce2运行原理

shuffle及排序

1、MapReduce介绍

定义:

MapReduce是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法,是Hadoop面向大数据并行处理的计算模型、框架和平台。

MapReduce执行流包括input、map、shuffle、reduce和output共5个过程,如下图所示:

2、MapReduce2运行原理

YARN框架下的Mapreduce工作流程如下图所示:

6

1:客户端向集群提交作业。

2:Job从ResourceManager获取新的作业应用程序ID。

3:客户端检查作业的输出情况,计算输入分片,并将作业jar包、配置、分片信息等作业资源复制到HDFS。

4:Job向ResourceManager提交作业。

5:ResourceManager接收到作业后,将作业请求传递给调度器,调度器根据作业信息为ResourceManager分配一个container,然后ResourceManager在NodeManager的管理下,在container中启动一个ApplicationMaster进程。

6:ApplicationMaster对作业进行初始化,并保持对作业的跟踪,判断作业是否完成。

7:ApplicationMaster根据存储在HDFS中的分片信息确定Map和Reduce的数量。

8:ApplicationMaster为本次作业的Map和Reduce以轮询的方式向ResourceManager申请container。

9:ApplicationMaster获取到container后,与NodeManager进行通信启动container。

10:container从HDFS中获取作业的jar包、配置和分布式缓存文件等,将任务需要的资源本地化。

11:container启动Map或Reduce任务。

MapReduce2进度状态更新:

7

shuffle及排序:

Mapreduce的map端输出作为输入传递给reduce端,并按键排序的过程称为shuffle。shuffle的字面含义是洗牌,即将map产生的数据通过分区、排序等过程分配给了不同的reduce任务。Mapreduce的数据处理流程如下图所示:

8

Map阶段:

1、每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为64M,可设置)为一个分片。map输出的结果会暂时放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制)。当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。

2、在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行combine操作,这样做可以有效减少磁盘IO和网络IO。

3、当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combine操作,这样做是为了尽量减少每次写入磁盘的数据量和尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。

4、将分区中的数据拷贝给相对应的reduce任务。那么分区中的数据如何知道它对应的reduce是哪个呢? ApplicationMaster保存了整个作业的宏观信息,只要reduce任务向ApplicationMaster获取对应的map输出位置就可以了。

Reduce阶段:

1、Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce接受的数据量相当小,则直接存储在内存中,如果数据量超过了该缓冲区大小的一定比例,则对数据合并后溢写到磁盘中。

2、随着溢写文件的增多,后台线程会将它们合并成一个更大的有序文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序、合并操作,所以说排序是hadoop的灵魂。

3、在合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。

五、MapReduce实例上

大纲:

演示实例讲解

演示编写mapreduce实例

1、演示实例讲解

处理前数据:

1

需求:

字符串相同的数字相加

2、演示编写mapreduce实例

新建一个mapreduce工程,名为study2。

创建一个com.jdb.mapreduce

创建一个名为MRClazz的java文件

引入下图中的包:

2

在这个里面首先创建一个继承于Mapper类的ClazzMap类:

public static class ClazzMap extends Mapper<Object,Text,Text,Intwritable>{
	@Override
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
		String[] line = value.toString().split("\t");
		if(line.length == 2){
			context.write(new Text(line[0]), new IntWritable(Integer.parseInt(line[1])));
		}
	}
}

public static class ClazzReduce extends Reducer<Text, IntWritable, Text, Text>{
	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
		int i = 0;
		for(IntWritable v : values){
			i += v.get();
		}

		context.write(key, new Text(String.valueOf(i)));
	}
}

public static void main(String[] args){
	//args0 : dst
	//args1 : out
	//args2 : split MB
	//args3 : reduce num

	String dst = args[0];
	String out = args[1];
	int splitMB = Integer.valueOf(args[2]);
	int reduceNum = Integer.valueOf(args[3]);

	Configuration conf = new Configuration();
	conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(splitMB * 1024 * 1024));
	conf.set("mapred.min.split.size", String.valueOf(splitMB * 1024 * 1024));
	conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(splitMB * 1024 * 1024));
	conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(splitMB * 1024 * 1024));

	Job job = new Job(conf);
	FileInputFormat.addInputPath(job, new Path(dst));
	FileOutputFormat.setOutputPaht(job, new Path(out));

	job.setMapperClass(ClazzMap.class);
	job.setReducerClass(ClazzReduce.class);

	job.setNumReduceTasks(reduceNum);

	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(IntWritable.class);

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

	job.setJarByClass(MRClazz.class);

	job.waitForCompletion(true);
}

导出该类,取名为mrclazz.jar

放到集群上,使用hadoop jar mrclazz.jar com.jdb.mapreduce.MRClazz /edu/mr1 /edu/out1 128 2

数据量比较大的时候,可以设置combiner

首先需要改输出,Reduce的输出要还给Reduce,即输出保持一致:

public static class ClazzReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
		int i = 0;
		for(IntWritable v : values){
			i += v.get();
		}

		context.write(key, new IntWritable(i));
	}
}

main函数中需要设置:

job.setCombinerClass(ClazzReduce.class);

打包为mrclazz2.jar

执行:hadoop jar mrclazz2.jar com.jdb.mapreduce.MRClazz /edu/mr1 /edu/out2 128 2

查看输出结果:hadoop fs -cat /edu/out2/part-r-00000 | head -10 (后面两个参数表示查看前十行)。

如果觉得我的文章对你有用,请随意赞赏