ITPub博客

首页 > 大数据 > Hadoop > 《Hadoop管理五》MapReduce类型常用的InputFormat

《Hadoop管理五》MapReduce类型常用的InputFormat

Hadoop 作者:dudu103zw 时间:2014-02-18 16:06:04 0 删除 编辑
MapReduce过程Mapper的输出参数和Reducer的输入参数是一样的,都是中间需要处理的结果,而Reducer的输出结果便是我们想要的输出结果。所以根据需要对InputFormat进行较合理的设置,Job才能正常运行。Job过程中间的Key和Value的对应关系可以简单阐述如下:
map:                             ->     list(k2,v2)
combile:                   ->     list(k2,v2)
reduce:                    ->     list(k3,v3)
至于为什么需要显示指定中间、最终的数据类型,貌似看上去很奇怪,原因是Java的泛型机制有很多限制,类型擦出导致运行过程中类型信息并非一直可见,所以Hadoop不得不明确指定。
InputFormat的结构图如下:
《Hadoop管理五》MapReduce类型常用的InputFormat
还有想说明的是,单个reducer的默认配置对于新手而言很容易上手,但是在真实的应用中,reducer被设置成一个较大的数字,否则作业效率极低。reducer的最大个数与集群中reducer的最大个数有关,集群中reducer的最大个数由节点数与每个节点的reducer数相乘得到。该值在mapred.tasktracker.reduce.tasks.maximum决定
下面介绍一些常用的InputFormat和用法。
FileInpuFormat
FileInputFormat是所有使用文件作为数据源的InputFormat的积累。它提供两个功能:一个是定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现。自动将作业分块 作业分块大小与mapred-site.xml中的mapred.min.split.size和mapred.min.split.size和blocksize有关系。分片大小由如下公式来决定:
分片大小 = max(minimumSize, min(maximumSize, blockSize))
如果想避免文件被切分,可以采用如下两种之一,不过推荐第二种。
1)设置minimum size 大于文件大小即可
2)使用FileInputFormat子类并重载isSplitable方法返回false
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.TextInputFormat;
public class NonSplittableTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return false;
  }
}
CombileFileInputFormat
CombileFileInputFormat是为了解决大批量的小文件作业。
TextInputFormat(LongWritable,Text:字节偏移量,每行的内容)
默认的InputFormat。键是改行文件在源文件中的偏移量,值是该行内容(不包括终止符,如换行符或者回车符)。如
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
被表示成键值对如下:
<0, On the top of the Crumpetty Tree>
<33, The Quangle Wangle sat,>
<57, But his face you could not see,>
<89, On account of his Beaver Hat.>
KeyValueTextInputFormat
如果文件中的每一行就是一个键值对,使用某个分界符进行分隔,比如Tab分隔符。例如Hadoop默认的OutputFormat产生的输出,即是每行用Tab分隔符分隔开的键值对。
可以通过key.value.separator.in.input.line属性来指定分隔符,默认的值是一个Tab分隔符。(注: → 代表一个Tab分隔符)
line1→On the top of the Crumpetty Tree
line2→The Quangle Wangle sat,
line3→But his face you could not see,
line4→On account of his Beaver Hat.
被表示成键值对如下:
NLineInputFormat
以行号来分割数据源文件。N作为输入的行数,可以有mapred.line.input.format.linespermap来指定。
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
如果N是2,则一个mapper会收到前两行键值对:
<0, On the top of the Crumpetty Tree>
<33, The Quangle Wangle sat,>
另一个mapper会收到后两行:
<57, But his face you could not see,>
<89, On account of his Beaver Hat.>
posted @ 2012-07-25 20:15 hanyuanbo 阅读(232) 评论(0) 编辑
《Hadoop管理四》从Secondary Namenode恢复Namenode
介绍如何从Secondary Namenode的checkpoint点恢复Namenode,对于以后理解从Checkpoint Node 和Backup Node恢复很有帮助。
在core-site.xml的配置文件中,设置了checkpoint的时间间隔、大小限制和存储位置。
  fs.checkpoint.dir
  ${hadoop.tmp.dir}/dfs/namesecondary
  Determines where on the local filesystem the DFS secondary
      name node should store the temporary images to merge.
      If this is a comma-delimited list of directories then the image is
      replicated in all of the directories for redundancy.
 




  fs.checkpoint.period
  3600
  The number of seconds between two periodic checkpoints.
 

  fs.checkpoint.size
  67108864
  The size of the current edit log (in bytes) that triggers
       a periodic checkpoint even if the fs.checkpoint.period hasn't expired.
 
如果在规定时间内没有edits 大小没有达到上限,则Hadoop会强制执行checkpoint过程,如果没有达到时间间隔但是edits大小已经达到上限,则也会强制执行checkpoint过程。
 
 
恢复过程如下:
1. 将dfs.name.dir所指向的文件夹下的内容全部删掉。 rm -rf *
2. 将Secondary Namenode节点上core-site.xml配置文件中fs.checkpoint.dir指定路径下的文件到Namenode节点上core-site.xml配置文件中fs.checkpoint.dir指定的路径下
3. 执行 bin/hadoop namenode –importCheckpoint(需要较长的时间,需要加载到内存中)
4. 执行 bin/hadoop fsck /
5. 删除Namenode节点的namesecondary目录下的文件
6. 启动Namenode守护进程。bin/hadoop-daemon.sh start namenode
7. 执行balancer。 bin/hadoop balancer 
posted @ 2012-07-25 20:12 hanyuanbo 阅读(101) 评论(0) 编辑
《Hadoop管理三》集群移除旧节点
Hadoop集群管理员可能需要从集群中移除节点。过程其实很简单:
将待移除的节点的ip地址添加到exclude文件中,exclude文件有由hdfs-site.xml中的dfs.hosts.exclude指定的文件内容指定。
重启MapReduce集群,这是为了终止在待移除节点上运行的tasktracker。
执行命令 bin/hadoop dfsadmin -refreshNodes。 这个过程Hadoop会将待移除节点上的数据移动到其他的节点上。此时待移除节点处在Decommission in Progress。
当所有的节点变为 Decommissioned状态的时候,即可关闭待移除的节点机器
从conf/slaves、dfs.hosts.exclude和mapred.hosts.exclude指定的文件中移除待移除节点的ip
执行命令 bin/hadoop dfsadmin -refreshNodes。
 
dfs.hosts.exclude
Names a file that contains a list of hosts that are
not permitted to connect to the namenode. The full pathname of the
file must be specified. If the value is empty, no hosts are
excluded.
 
  mapred.hosts.exclude
 
  Names a file that contains the list of hosts that
  should be excluded by the jobtracker.  If the value is empty, no
  hosts are excluded.
posted @ 2012-07-25 19:56 hanyuanbo 阅读(52) 评论(0) 编辑
《Hadoop管理二》集群添加新节点
Hadoop集群管理员需要经常向集群中添加节点,过程其实很简单:
按照之前datanode上的过程在新的机器上安装JDK、无密码SSH登录、解压相应版本的Hadoop
按照之前datanode上的过程在新的机器上配置hadoop-env.sh、core-site.xml、hdfs-site.xml、mapred-site.xml四个配置文件,将JAVA_HOME、hadoop.tmp.dir和fs.default.name、 dfs.replication、mapred.job.tracker进行相应的配置
配置网络地址加入到原来的集群中去
修改Namenode上的dfs.hosts(在hdfs-site.xml中有指定文件)和mapred.hosts(在mapred-site.xml中有指定文件),将新机器的ip地址加入其中。(如果有指定文件的话必须加入新的机器的ip地址。但是dfs.hosts和mapred.hosts属性可以为空,即不指定,这样所有的机器都可以加入集群)修改conf/slaves文件将新机器的ip地址加入其中
启动守护进程,命令分别是 bin/hadoop-daemon.sh start datanode 和 bin/hadoop-daemon.sh start tasktracker
还需要进行一下balance操作,将原来集群中的文件进行负载均衡存储到新的节点上,命令为bin/start-balancer.sh。
 
注:
dfs.hosts和mapred.hosts与conf/slaves 的区别。
dfs.host和mapred.hosts属性指定的文件不同于slaves文件。前者供Namenode和jobtracker使用,用来决定可以链接哪个工作节点。Hadoop控制脚本使用slaves文件执行集群范围的操作,例如重启集群。Hadoop守护进程从不使用slaves文件
balancer
1)如果不执行balancer命令,那么集群会把新的数据都存放在新的node上,这样会降低mapred的工作效率 
2)设置平衡阈值,默认是10%,值越低各节点越平衡,但消耗时间也更长。bin/start-balancer.sh -threshold 5 
3)设置balance的带宽,dfs.balance.bandwidthPerSec属性,默认只有1M/s。(在hdfs-site.xml中)
  dfs.balance.bandwidthPerSec
  1048576
 
        Specifies the maximum amount of bandwidth that each datanode
        can utilize for the balancing purpose in term of
        the number of bytes per second.
 
 
dfs.hosts
  dfs.hosts
 
  Names a file that contains a list of hosts that are
  permitted to connect to the namenode. The full pathname of the file
  must be specified.  If the value is empty, all hosts are
  permitted.
 
mapred.hosts
dfs.hosts

  dfs.hosts
 
  Names a file that contains a list of hosts that are
  permitted to connect to the namenode. The full pathname of the file
  must be specified.  If the value is empty, all hosts are
  permitted.
posted @ 2012-07-25 17:39 hanyuanbo 阅读(128) 评论(0) 编辑
《Hadoop管理一》checkpoint原理和过程
理解这两个概念,对于理解Hadoop是如何管理备份,Secondary Namenode、Checkpoint Namenode和Backup Node如何工作的很重要。
fsimage:文件是文件系统元数据的一个永久性检查点,包含文件系统中的所有目录和文件idnode的序列化信息。
edits:文件系统的写操作首先把它记录在edit中
将文件系统个元数据操作分开操作,是为了提升内存的处理效率。如果不分开处理,即所有的写操作均记录在一个文件中,比如,fsimage中,那么每个操作都会对这个文件进行修改,因为这个文件可能会很大,所以每次进行写操作的时候就会很慢,随着fsimage越来越大,速度便会越来越低。
Hadoop的解决方案是辅助Namenode节点,文件系统的写操作不是直接被修改到fsimage中,二是edits中,辅助Namenode节点负责将两者在自己的内存中整合然后进行相应的替换操作,这样会频繁的对edits进行修改而不是fsimage,而edits文件的大小是可以接受的,而且大小也不会不断增加。具体的checkpoint执行过程如下:
《Hadoop管理五》MapReduce类型常用的InputFormat
 
以下即是checkpoint过程:
辅助Namenode请求主Namenode停止使用edits文件,暂时将新的写操作记录到一个新文件中,如edits.new。
辅助Namenode节点从主Namenode节点获取fsimage和edits文件(采用HTTP GET)
辅助Namenode将fsimage文件载入到内存,逐一执行edits文件中的操作,创建新的fsimage文件
辅助Namenode将新的fsimage文件发送回主Namenode(使用HTTP POST)
主Namenode节点将从辅助Namenode节点接收的fsimage文件替换旧的fsimage文件,用步骤1产生的edits.new文件替换旧的edits文件(即改名)。同时更新fstime文件来记录检查点执行的时间
注:从Hadoop0.21.0开始,辅助Namenode已经放弃不用,由checkpoint节点取而代之,功能不变。新版本同时引入一种新的Namenode,名为BackupNode。
<!-- 正文结束 -->

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/22115641/viewspace-1119926/,如需转载,请注明出处,否则将追究法律责任。

上一篇: 没有了~
下一篇: 没有了~
请登录后发表评论 登录
全部评论

注册时间:2009-07-15