18

Hadoop简介

Hadoop469 views

集群搭建

  • 通常,集群里的一台机器被指定为 NameNode,另一台的机器被指定为JobTracker,这些机器是masters。余下的机器即作为DataNode也作为TaskTracker。这些机器是slaves
  • 对Hadoop的配置通过conf/目录下的两个重要配置文件完成:
  • 此外,通过设置conf/hadoop-env.sh中的变量为集群特有的值,你可以对bin/目录下的Hadoop脚本进行控制。
  • 配置Hadoop守护进程的运行参数
  • 这部分涉及Hadoop集群的重要参数,这些参数在conf/hadoop-site.xml中指定。
  1. hadoop-default.xml – 只读的默认配置。
  2. hadoop-site.xml – 集群特有的配置。
参数 取值 备注
fs.default.name NameNode的URI。 hdfs://主机名/
mapred.job.tracker JobTracker的主机(或者IP)和端口。 主机:端口
dfs.name.dir NameNode持久存储名字空间及事务日志的本地文件系统路径。 当这个值是一个逗号分割的目录列表时,nametable数据将会被复制到所有目录中做冗余备份。
dfs.data.dir DataNode存放块数据的本地文件系统路径,逗号分割的列表。 当这个值是逗号分割的目录列表时,数据将被存储在所有目录下,通常分布在不同设备上。
mapred.system.dir Map/Reduce框架存储系统文件的HDFS路径。比如/hadoop/mapred/system/。 这个路径是默认文件系统(HDFS)下的路径, 须从服务器和客户端上均可访问。
mapred.local.dir 本地文件系统下逗号分割的路径列表,Map/Reduce临时数据存放的地方。 多路径有助于利用磁盘i/o。
mapred.tasktracker.{map|reduce}.tasks.maximum 某一TaskTracker上可运行的最大Map/Reduce任务数,这些任务将同时各自运行。 默认为2(2个map和2个reduce),可依据硬件情况更改。
dfs.block.size 每个block的大小,byte  
dfs.replication Block缺省的副本数量  
mapred.reduce.tasks 每个任务启动的reduce task数量  

 

HDFS架构和设计简介

  • HDFS采用master/slave架构。一个HDFS集群是由一个Namenode和一定数目的Datanodes组成。Namenode是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问。集群中的Datanode一般是一个节点一个,负责管理它所在节点上的存储。HDFS暴露了文件系统的名字空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组Datanode上。Namenode执行文件系统的名字空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体Datanode节点的映射。Datanode负责处理文件系统客户端的读写请求。在Namenode的统一调度下进行数据块的创建、删除和复制。
  • 文件系统的名字空间 (namespace)

HDFS支持传统的层次型文件组织结构。 Namenode负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将被Namenode记录下来

  • 数据复制

每个文件存储成一系列的数据块,除了最后一个,所有的数据块都是同样大小的。为了容错,文件的所有数据块都会有副本

  • 副本选择

为了降低整体的带宽消耗和读取延时,HDFS会尽量让读取程序读取离它最近的副本

  • 安全模式

Namenode启动后会进入一个称为安全模式的特殊状态。 Namenode从所有的 Datanode接收心跳信号和块状态报告,块状态报告包括了某个Datanode所有的数据块列表。

使用指南

  • 启动Hadoop

bin/start-dfs.sh

bin/start-dfs.sh脚本会参照NameNode上${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上启动DataNode守护进程。

bin/start-mapred.sh

bin/start-mapred.sh脚本会参照JobTracker上${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上启动TaskTracker守护进程。

bin/start-all.sh

  • 停止Hadoop

bin/stop-dfs.sh

bin/stop-mapred.sh

bin/stop-all.sh

  • Web接口
  • hdfs

http://analyze03.ad.zzzz:50070/dfshealth.jsp

  • Map/reduce

http://analyze03.ad.zzzz:50030/jobtracker.jsp

FS Shell使用指南

  • 调用文件系统(FS)Shell命令应使用 bin/hadoop fs <args>的形式。
    • cat
    • copyFromLocal
    • copyToLocal
    • cp
    • get
    • getmerge
    • ls
    • lsr
    • mkdir
    • movefromLocal
    • mv
    • put
    • rm

rmr

Map/Reduce核心功能描述

Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。

虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用 Java来写 。

Hadoop Streaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序 (例如:Shell工具)来做为mapper和reducer。

Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。

  • Mapper

Mapper将输入键值对(key/value pair)映射到一组中间格式的键值对集合。

Map是一类将输入记录集转换为中间格式记录集的独立任务。 这种转换的中间格式记录集不需要与输入记录集的类型一致。一个给定的输入键值对可以映射成0个或多个输出键值对。

Hadoop Map/Reduce框架为每一个InputSplit产生一个map任务,而每个InputSplit是由该作业的InputFormat产生的。

  • Reducer

Reducer将与一个key关联的一组中间数值集归约(reduce)为一个更小的数值集。

用户可以通过 JobConf.setNumReduceTasks(int)设定一个作业中reduce任务的数目。

Reducer有3个主要阶段:shuffle、sort和reduce。

  • Shuffle

Reducer的输入就是Mapper已经排好序的输出。在这个阶段,框架通过HTTP为每个Reducer获得所有Mapper输出中与之相关的分块。

  • Sort

这个阶段,框架将按照key的值对Reducer的输入进行分组 (因为不同mapper的输出中可能会有相同的key)。

  • Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取回一边被合并的。
  • 无Reducer

如果没有归约要进行,那么设置reduce任务的数目为零是合法的。

这种情况下,map任务的输出会直接被写入由 setOutputPath(Path)指定的输出路径。

  • Partitioner

Partitioner用于划分键值空间(key space)。

Partitioner负责控制map输出结果key的分割。Key(或者一个key子集)被用于产生分区,通常使用的是Hash函数。分区的数目与一个作业的reduce任务的数目是一样的。因此,它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。

  • Reporter

Reporter是用于Map/Reduce应用程序报告进度,设定应用级别的状态消息, 更新Counters(计数器)的机制。

  • OutputCollector

OutputCollector是一个Map/Reduce框架提供的用于收集 Mapper或Reducer输出数据的通用机制 (包括中间输出结果和作业的输出结果)。

作业配置

  • JobConf代表一个Map/Reduce作业的配置。
  • JobConf是用户向Hadoop框架描述一个Map/Reduce作业如何执行的主要接口。框架会按照JobConf描述的信息忠实地去尝试完成这个作业,然而:
  • 通常,JobConf会指明Mapper、Combiner(如果有的话)、 Partitioner、Reducer、InputFormat和 OutputFormat的具体实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path…) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件应该写在哪儿 (setOutputPath(Path))。
  • TaskTracker是在一个单独的jvm上以子进程的形式执行 Mapper/Reducer任务(Task)的。
  • 子任务会继承父TaskTracker的环境。用户可以通过JobConf中的 mapred.child.java.opts配置参数来设定子jvm上的附加选项,例如: 通过-Djava.library.path=<> 将一个非标准路径设为运行时的链接用以搜索共享库,等等。如果mapred.child.java.opts包含一个符号@taskid@, 它会被替换成map/reduce的taskid的值。
  • 用户或管理员也可以使用mapred.child.ulimit设定运行的子任务的最大虚拟内存。mapred.child.ulimit的值以(KB)为单位,并且必须大于或等于-Xmx参数传给JavaVM的值,否则VM会无法启动。
  • JobClient是用户提交的作业与JobTracker交互的主要接口。
  • JobClient 提供提交作业,追踪进程,访问子任务的日志记录,获得Map/Reduce集群状态信息等功能。
  • 作业提交过程包括:
  1. 一些参数可能会被管理者标记为 final,这意味它们不能被更改。
  2. 一些作业的参数可以被直截了当地进行设置(例如: setNumReduceTasks(int)),而另一些参数则与框架或者作业的其他参数之间微妙地相互影响,并且设置起来比较复杂(例如: setNumMapTasks(int))。

任务的执行和环境

作业的提交与监控

  1. 检查作业输入输出样式细节
  2. 为作业计算InputSplit值。
  3. 如果需要的话,为作业的DistributedCache建立必须的统计信息。
  4. 拷贝作业的jar包和配置文件到FileSystem上的Map/Reduce系统目录下。
  5. 提交作业到JobTracker并且监控它的状态。

作业的历史文件记录到指定目录的”_logs/history/”子目录下。这个指定目录由hadoop.job.history.user.location设定,默认是作业输出的目录。因此默认情况下,文件会存放在mapred.output.dir/_logs/history目录下。

  • 用户使用下面的命令可以看到在指定目录下的历史日志记录的摘要。

$ bin/hadoop job -history output-dir

这个命令会打印出作业的细节,以及失败的和被杀死的任务细节。

  • 要查看有关作业的更多细节例如成功的任务、每个任务尝试的次数(task attempt)等,可以使用下面的命令

$ bin/hadoop job -history all output-dir

作业的输入

  • InputFormat 为Map/Reduce作业描述输入的细节规范。
  • Map/Reduce框架根据作业的InputFormat来:
  1. 检查作业输入的有效性。
  2. 把输入文件切分成多个逻辑InputSplit实例, 并把每一实例分别分发给一个 Mapper。
  3. 提供RecordReader的实现,这个RecordReader从逻辑InputSplit中获得输入记录, 这些记录将由Mapper处理。

基于文件的InputFormat实现(通常是 FileInputFormat的子类) 默认行为是按照输入文件的字节大小,把输入数据切分成逻辑分块(logical InputSplit )。 其中输入文件所在的FileSystem的数据块尺寸是分块大小的上限。下限可以设置mapred.min.split.size 的值。

考虑到边界情况,对于很多应用程序来说,很明显按照文件大小进行逻辑分割是不能满足需求的。 在这种情况下,应用程序需要实现一个RecordReader来处理记录的边界并为每个任务提供一个逻辑分块的面向记录的视图。

  • TextInputFormat 是默认的InputFormat。

如果一个作业的Inputformat是TextInputFormat, 并且框架检测到输入文件的后缀是.gz和.lzo,就会使用对应的CompressionCodec自动解压缩这些文件。 但是需要注意,上述带后缀的压缩文件不会被切分,并且整个压缩文件会分给一个mapper来处理。

作业的输出

  • OutputFormat 描述Map/Reduce作业的输出样式。
  • Map/Reduce框架根据作业的OutputFormat来:
  • TextOutputFormat是默认的 OutputFormat。
  • Counters
  1. 检验作业的输出,例如检查输出路径是否已经存在。
  2. 提供一个RecordWriter的实现,用来输出作业结果。 输出文件保存在FileSystem上。

其他有用的特性

Counters 是多个由Map/Reduce框架或者应用程序定义的全局计数器。 每一个Counter可以是任何一种 Enum类型。同一特定Enum类型的Counter可以汇集到一个组,其类型为Counters.Group。

应用程序可以定义任意(Enum类型)的Counters并且可以通过 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架会汇总这些全局counters。

  • DistributedCache

DistributedCache 可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。

DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。

DistributedCache 根据缓存文档修改的时间戳进行追踪。 在作业执行期间,当前应用程序或者外部程序不能修改缓存文件。

Java版WordCount示例

public static class MapClass extends MapReduceBase

    implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

    public void map(LongWritable key, Text value,

                    OutputCollector<Text, IntWritable> output,

                    Reporter reporter) throws IOException {

      String line = value.toString();

      StringTokenizer itr = new StringTokenizer(line);

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        output.collect(word, one);

      }

    }

  }

public static class Reduce extends MapReduceBase

    implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,

                       OutputCollector<Text, IntWritable> output,

                       Reporter reporter) throws IOException {

      int sum = 0;

      while (values.hasNext()) {

        sum += values.next().get();

      }

      output.collect(key, new IntWritable(sum));

    }

  }

JobConf conf = new JobConf(getConf(), WordCount.class);

    conf.setJobName(“wordcount”);

    // the keys are words (strings)

    conf.setOutputKeyClass(Text.class);

    // the values are counts (ints)

    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);       

    conf.setCombinerClass(Reduce.class);

    conf.setReducerClass(Reduce.class);

    …

    FileInputFormat.setInputPaths(conf, other_args.get(0));

    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

    JobClient.runJob(conf);

Hadoop Streaming

  • Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。

Streaming工作原理

  • mapper和reducer从标准输入读入数据(一行一行读),并把计算结果发给标准输出。Streaming工具会创建一个Map/Reduce作业,并把它发送给合适的集群,同时监视这个作业的整个执行过程。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。如果没有tab,整行作为key值,value值为null。不过,这可以定制。

将文件打包到提交的作业中

  • 任何可执行文件都可以被指定为mapper/reducer。这些可执行文件不需要事先存放在集群上;如果在集群上还没有,则需要用-file选项让framework把可执行文件作为作业的一部分,一起打包提交。
  • 除了可执行文件外,其他mapper或reducer需要用到的辅助文件(比如字典,配置文件等)也可以用这种方式打包上传。
  • 只使用Mapper的作业

Streaming选项与用法

有时只需要map函数处理输入数据。这时只需把mapred.reduce.tasks设置为零,Map/reduce框架就不会创建reducer任务,mapper任务的输出就是整个作业的最终输出。

为了做到向下兼容,Hadoop Streaming也支持“-reduce None”选项,它与“-jobconf mapred.reduce.tasks=0”等价。

  • 为作业指定其他插件

和其他普通的Map/Reduce作业一样,用户可以为streaming作业指定其他插件:

-inputformat JavaClassName -outputformat JavaClassName -partitioner JavaClassName -combiner JavaClassName 用于处理输入格式的类要能返回Text类型的key/value对。如果不指定输入格式,则默认会使用TextInputFormat。因为TextInputFormat得到的key值是LongWritable类型的(其实key值并不是输入文件中的内容,而是value偏移量),所以key会被丢弃,只把value用管道方式发给mapper。

用户提供的定义输出格式的类需要能够处理Text类型的key/value对。如果不指定输出格式,则默认会使用TextOutputFormat类。

  • Hadoop Streaming中的大文件和档案

任务使用-cacheFile和-cacheArchive选项在集群中分发文件和档案,选项的参数是用户已上传至HDFS的文件或档案的URI。这些文件和档案在不同的作业间缓存。用户可以通过fs.default.name.config配置参数的值得到文件所在的host和fs_port。

这个是使用-cacheFile选项的例子:

-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink 在上面的例子里,url中#后面的部分是建立在任务当前工作目录下的符号链接的名字。这里的任务的当前工作目录下有一个“testlink”符号链接,它指向testfile.txt文件在本地的拷贝。如果有多个文件,选项可以写成:

-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2

为作业指定附加配置参数

  • 用户可以使用“-jobconf <n>=<v>”增加一些配置变量。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

    -input myInputDirs \

    -output myOutputDir \

    -mapper org.apache.hadoop.mapred.lib.IdentityMapper\

    -reducer /bin/wc \

    -jobconf mapred.reduce.tasks=2

上面的例子中,-jobconf mapred.reduce.tasks=2表明用两个reducer完成作业。

选项 可选/必须 描述
-cluster name 可选 在本地Hadoop集群与一个或多个远程集群间切换
-dfs host:port or local 可选 覆盖作业的HDFS配置
-jt host:port or local 可选 覆盖作业的JobTracker配置
-additionalconfspec specfile 可选 用一个类似于hadoop-site.xml的XML文件保存所有配置,从而不需要用多个”-jobconf”类型的选项单独为每个配置变量赋值
-cmdenv 可选 传递环境变量给streaming命令
-cacheFile fileNameURI 可选 指定一个上传到HDFS的文件
-cacheArchive fileNameURI 可选 指定一个上传到HDFS的jar文件,这个jar文件会被自动解压缩到当前工作目录下
-inputreader JavaClassName 可选 为了向下兼容:指定一个record reader类(而不是input format类)
-verbose 可选 详细输出

Streaming示例

  • /opt/hadoop/bin/hadoop

jar /opt/hadoop/contrib/streaming/hadoop-0.19.1-streaming.jar

-mapper /opt/hadoop/mapper.php

-reducer /opt/hadoop/reducer.php

-input /test/*

-output /testout

-file /opt/hadoop/mapper.php

-file /opt/hadoop/reducer.php

运行结果

distribution    7

feature 1

shows   1

vmware  1

and     29

developing      1

doing   1

freetds 1

machine 3

ming    5

Map代码

#!/opt/php/bin/php

<?

$word2count = array();

// input comes from STDIN (standard input)

while (($line = fgets(STDIN)) !== false) {

        // remove leading and trailing whitespace and lowercase

        $line = strtolower(trim($line));

        // split the line into words while removing any empty string

        $words = preg_split(‘/\W/’, $line, 0, PREG_SPLIT_NO_EMPTY);

        // increase counters

        foreach ($words as $word) {

                $word2count[$word] += 1; 

        }

}

// write the results to STDOUT (standard output)

// what we output here will be the input for the

// Reduce step, i.e. the input for reducer.py

foreach ($word2count as $word => $count) {

        // tab-delimited

        echo $word, chr(9),$count, PHP_EOL;

}

?>

Reducer代码

#!/usr/bin/php

<?

$word2count = array();

// input comes from STDIN

while (($line = fgets(STDIN)) !== false) {

         // remove leading and trailing whitespace

         $line = trim($line);   

         // parse the input we got from mapper.php   

         list($word, $count) = explode(chr(9), $line);

         // convert count (currently a string) to int

         $count = intval($count);

         // sum counts

         if ($count > 0) $word2count[$word] += $count;

}

// sort the words lexigraphically

//

// this set is NOT required, we just do it so that our

// final output will look more like the official Hadoop

// word count examples

ksort($word2count);

// write the results to STDOUT (standard output)

foreach ($word2count as $word => $count) {

         echo $word, chr(9), $count, PHP_EOL;

}

?>


Tags:

作者:Jock

Leave a Reply

You must be logged in to post a comment.

Switch to our mobile site