Hadoop MapReduce
简介
一种分布式的计算方式指定一个Map(映#x5C04;)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组
Pattern
map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)
Map输出格式和Reduce输入格式一定是相同的
基本流程
MapReduce主要是先读取文件数据,然后进行Map处理,接着Reduce处理,最后把处理结果写到文件中
详细流程
多节点下的流程
主要过程
Map Side
Record reader
记录阅读器会翻译由输入格式生成的记录,记录阅读器用于将数据解析给记录,并不分析记录自身。记录读取器的目的是将数据解析成记录,但不分析记录本身。它将数据以键值对的形式传输给mapper。通常键是位置信息,值是构成记录的数据存储块.自定义记录不在本文讨论范围之内.
Map
在映射器中用户提供的代码称为中间对。对于键值的具体定义是慎重的,因为定义对于分布式任务的完成具有重要意义.键决定了数据分类的依据,而值决定了处理器中的分析信息.本书的设计模式将会展示大量细节来解释特定键值如何选择.
Shuffle and Sort
ruduce任务以随机和排序步骤开始。此步骤写入输出文件并下载到本地计算机。这些数据采用键进行排序以把等价密钥组合到一起。
Reduce
reducer采用分组数据作为输入。该功能传递键和此键相关值的迭代器。可以采用多种方式来汇总、过滤或者合并数据。当ruduce功能完成,就会发送0个或多个键值对。
输出格式
输出格式会转换最终的键值对并写入文件。默认情况下键和值以tab分割,各记录以换行符分割。因此可以自定义更多输出格式,最终数据会写入HDFS。类似记录读取,自定义输出格式不在本书范围。
示例场景
下面给出了关于组织的电力消耗的数据。它包含每月的电力消耗和各年的年平均值。
一月 | 二月 | 三月 | 四月 | 五月 | 六月 | 七月 | 八月 | 九月 | 十月 | 十一月 | 十二月 | 平均 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979年 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980年 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981年 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984年 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985年 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
如果上述数据作为输入,我们必须编写应用程序来处理它,并产生结果,如找到最大使用年份,最小使用年份等。这是一个对于有限数量的记录的程序员的walkover。它们将简单地写入逻辑以产生所需的输出,并将数据传递给所写的应用程序。
但是,考虑一个特定国家的所有大型产业的电力消耗的数据,因为它的形成。
当我们编写应用程序来处理这样的批量数据时,
- 他们将需要很多时间来执行。
- 当我们将数据从源服务器移动到网络服务器时,会有很大的网络流量,等等。
为了解决这些问题,我们有MapReduce框架。
输入数据
上述数据保存为sample.txt并作为输入。输入文件如下所示。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25 1980 26 27 28 28 28 30 31 31 31 30 30 30 29 1981 31 32 32 32 33 34 35 36 36 34 34 34 34 1984 39 38 39 39 39 41 42 43 40 39 38 38 40 1985 38 39 39 39 39 41 41 41 00 40 39 39 45
示例程序
下面给出了程序对使用MapReduce框架的示例数据。
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,/*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line," "); String year = s.nextToken(); while(s.hasMoreTokens()) { lasttoken=s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce( Text key, Iterator <IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg=30; int val=Integer.MIN_VALUE; while (values.hasNext()) { if((val=values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(ProcessUnits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
将上述程序保存为ProcessUnits.java。程序的编译和执行说明如下。
过程单元程序的编译和执行
让我们假设在Hadoop用户的主目录(例如/home/hadoop)。。
按照以下步骤编译并执行上述程序。
第1步
以下命令是创建一个目录来存储编译的java类。
$ mkdir units
第2步
下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序。访问以下链接http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下载jar。让我们假设下载的文件夹是/ home / hadoop /。
第3步
以下命令用于编译ProcessUnits.java程序并为该程序创建一个jar。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/ .
第4步
以下命令用于在HDFS中创建输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
第5步
下命令用于复制名为sample.txt的输入文件,在HDFS的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
第6步
以下命令用于验证输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
第7步
以下命令用于通过从输入目录获取输入文件来运行Eleunit_max应用程序。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段时间,直到文件被执行。执行后,如下所示,输出将包含输入拆分的数量,Map任务的数量,reducer任务的数量等。
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=61 FILE: Number of bytes written=279400 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=546 HDFS: Number of bytes written=40 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=146137 Total time spent by all reduces in occupied slots (ms)=441 Total time spent by all map tasks (ms)=14613 Total time spent by all reduce tasks (ms)=44120 Total vcore-seconds taken by all map tasks=146137 Total vcore-seconds taken by all reduce tasks=44120 Total megabyte-seconds taken by all map tasks=149644288 Total megabyte-seconds taken by all reduce tasks=45178880 Map-Reduce Framework Map input records=5 Map output records=5 Map output bytes=45 Map output materialized bytes=67 Input split bytes=208 Combine input records=5 Combine output records=5 Reduce input groups=5 Reduce shuffle bytes=6 Reduce input records=5 Reduce output records=5 Spilled Records=10 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=948 CPU time spent (ms)=5160 Physical memory (bytes) snapshot=47749120 Virtual memory (bytes) snapshot=2899349504 Total committed heap usage (bytes)=277684224 File Output Format Counters Bytes Written=40
第8步
以下命令用于验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
第9步
以下命令用于查看Part-00000文件中的输出。此文件由HDFS生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
下面是MapReduce程序生成的输出。
1981 34 1984 40 1985 45
第10步
以下命令用于将输出文件夹从HDFS复制到本地文件系统进行分析。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
重要命令
所有Hadoop命令都由$ HADOOP_HOME / bin / hadoop命令调用。运行不带任何参数的Hadoop脚本会打印所有命令的描述。
用法 :hadoop [--config confdir] COMMAND
下表列出了可用的选项及其说明。
选项 | 描述 |
---|---|
namenode -format | 格式化DFS文件系统。 |
secondarynamenode | 运行DFS二次名称节点。 |
namenode | 运行DFS名称节点。 |
datanode | 运行DFS数据节点。 |
dfsadmin | 运行DFS管理客户端。 |
mradmin | 运行Map-Reduce管理客户端。 |
fsck | 运行DFS文件系统检查实用程序。 |
fs | 运行通用文件系统用户客户端。 |
balancer | 运行集群平衡实用程序。 |
oiv | 将离线fsimage查看器应用于fsimage。 |
fetchdt | 从NameNode获取委派令牌。 |
jobtracker | 运行MapReduce作业跟踪节点。 |
pipes | 运行管道作业。 |
tasktracker | 运行MapReduce任务跟踪节点。 |
historyserver | 作为独立的守护程序运行作业历史记录服务器。 |
job | 操作MapReduce作业。 |
queue | 获取有关JobQueues的信息。 |
version | 打印版本。 |
jar <jar> | 运行jar文件。 |
distcp <srcurl> <desturl> | 递归复制文件或目录。 |
distcp2 <srcurl> <desturl> | DistCp版本2。 |
archive -archiveName NAME -p | 创建hadoop归档。 |
<parent path> <src>* <dest> | |
classpath | 打印获取Hadoop jar所需的类路径和所需的库。 |
daemonlog | 获取/设置每个守护程序的日志级别 |
如何使用MapReduce任务交互
用法:Hadoop的工作[GENERIC_OPTIONS]
以下是在Hadoop作业的可用通用的选项。
通用选项 | 描述 |
---|---|
-submit <job-file> | 提交作业。 |
-status <job-id> | 打印映射并减少完成百分比和所有作业计数器。 |
-counter <job-id> <group-name> <countername> | 打印计数器值。 |
-kill <job-id> | 终止作业 |
-events <job-id> <fromevent-#> <#-of-events> | 打印jobtracker为给定范围接收的事件详细信息。 |
-history [all] <jobOutputDir> - history < jobOutputDir> | 打印作业详细信息,失败并停用提示详细信息。可以通过指定[all]选项查看有关作业的更多详细信息,如每个任务的成功任务和任务尝试。 |
-list[all] | 显示所有作业。 -list仅显示尚未完成的作业。 |
-kill-task <task-id> | 终止任务。已终止的任务不会计入失败的尝试次数。 |
-fail-task <task-id> | 失败的任务。失败的任务将根据失败的尝试进行计数。 |
-set-priority <job-id> <priority> | 更改作业的优先级。允许的优先级值为VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW |
查看作业的状态
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
查看job output-dir的历史
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g. $ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
终止作业
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004