关于Hadoop-Streaming学习中碰到的问题

时间:2022-11-12 04:50:47
Hadoop在分布式计算方面很强大,而Python在文本处理也是相当方便,那么有这两者的结合吗?有,答案就是Hadoop-Streaming。Hadoop-Streaming可以将Hadoop与主流语言结合起来,使用方便,效果很好。个人觉得Pig在处理数据集时很不方便,特别是在计算百分比等运算时,而Hadoop-Streaming是可以替代Pig的。

1.Streaming固定的代码,该代码可以放在shell脚本中

RED_NUM=47

mapper_file="wordcount.py"
mapper_cmd="$mapper_file"
reducer_cmd="cat"

Hadoop="/usr/bin/hadoop"
Streaming="/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u1.jar"

hadoop fs -rmr $OUTPUT
${Hadoop} jar ${Streaming} \
-D mapred.job.name="$JOBNAME" \
-D mapred.job.queue.name=$queuename \
-D mapred.map.tasks=500 \
-D mapred.min.split.size=1073741824 \
-D mapred.reduce.tasks="$RED_NUM" \
-D stream.num.map.output.key.fields=3 \
-D num.key.fields.for.partition=3 \
-input "$INPUT" \
-output "$OUTPUT" \
-mapper "$mapper_cmd" \
-reducer "$reducer_cmd" \
-file "$mapper_file" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
有几个重要参数解释下

(1)-input:输入文件路径

(2)-output:输出文件路径

(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本

(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本

(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。

         这个一般是必须有的,因为mapper和reducer函数都是写在本地的文件中,因此需要将文件上传到集群中才能被执行

(6)-partitioner:用户自定义的partitioner程序

(7)-D:作业的一些属性(以前用的是-jonconf),具体有:
              1)mapred.map.tasks:map task数目  

          设置的数目与实际运行的值并不一定相同,若输入文件含有M个part,而此处设置的map_task数目超过M,那么实际运行map_task仍然是M

        2)mapred.reduce.tasks:reduce task数目  不设置的话,默认值就为1
              3)num.key.fields.for.partition=N:shuffle阶段将数据集的前N列作为Key;所以对于wordcount程序,map输出为“word  1”,shuffle是以word作为Key,因此这里N=1

(8)-D stream.num.map.output.key.fields=3 这个是指在reduce之前将数据按前3列做排序,一般情况下可以去掉

这里说一下,之前在执行Streaming-Python时,总是报找不到-file上传的文件,找这个错误花了好长时间;最后终于发现是文件格式的问题:我在Linux上写的mapper文件不知咋的变为dos类型文件格式:

关于Hadoop-Streaming学习中碰到的问题

于是将dos文件转换为UNIX文件:dos2unix  xxx.py

之后就上传成功!并成功执行!

2.MapReduce是万能的吗?

之前我以为是的,于是一碰到数据量大的就尝试使用MapReduce。但是最近的工作中发现有种情况并不适用于MapReduce,就是涉及到整个数据集的一些运算!

举个例子,在Wordcount中,如果要计算每个单词出现的词频占整个文档中单词词频的百分比,这时候写MapReduce程序就要注意,在每个reducer中“整个文档中单词词频”必须是真正的“整个文档中单词词频”;这个例子还比较特殊,涉及到整个数据集的仅仅是总词频,可以通过传入多个输入来解决。但是如果涉及到的一些运算必须要整个数据集,那么就没法解决了。或许你可以在每个mapper中输出为“flag    word    1”,然后num.key.fields.for.partition=1  这样看似可以解决,其实这样就将所有数据都reduce到一台机器上,成了单机计算,而数据集过大的话,单机是无法完成任务的。

3.Streaming程序中,当map或reduce中有一个是cat时

需要注意到底谁是cat,如果只是纯粹的过滤数据或打印,两种情况都行;但如果涉及到聚合操作,这里面就有区别了。


未完待续... ...

4.执行过程中,报“1: import: command not found”

这个就是Python文件的顶部没有写上执行方式: #!/usr/bin/python

5.执行过程中,报Broken Pipe错误:
2014-10-31 21:28:14,319 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/usr/bin/python, getBidFeatureMap_suning.py]
2014-10-31 21:28:14,449 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2014-10-31 21:28:14,450 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
2014-10-31 21:28:14,473 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=100/0/0 in:NA [rec/s] out:NA [rec/s]
2014-10-31 21:28:14,516 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=155/0/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 HOST=null
USER=lming_08
HADOOP_USER=null
last tool output: |null|

java.io.IOException: Broken pipe
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

刚开始以为是map程序中对空行未处理导致的,后来发现是map中有对一个上传的文件进行读取,而该文件超过100MB,在处理该文件时会执行10多亿次循环,导致超时了。

6.
SyntaxError: Non-ASCII character '\xe6' in file /hadoop/yarn/local/usercache/lming_08/appcache/application_1415110953023_38985/container_1415110953023_38985_01_000003/./get_visitor_company_info_map.py on line 8, but no encoding declared; see http://www.python.org/peps/pep-0263.html for details
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.impl.MetricsSystemImpl).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Python文件中出现中文,导致上传到服务器中无法执行
 
 
7.Streaming error: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
报code 1 错误可能是因为输出路径不合法,例如路径中含空格、tab等
 
 
 

7.如果想对第一列和第二列排序,其中第一列相同时,第二列逆排序
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k1,1 -k2rn" \
-k1,1表示对第一列到第一列正排序(即对第一列正排序)
-k2rn 表示对第二列逆排序
未完待续... ...

参考资料:

http://dongxicheng.org/mapreduce/hadoop-streaming-programming/