用python写MapReduce函数——以WordCount为例

时间:2023-01-26 00:20:07

尽管Hadoop框架是用java写的,但是Hadoop程序不限于java,可以用python、C++、ruby等。本例子中直接用python写一个MapReduce实例,而不是用Jython把python代码转化成jar文件。

例子的目的是统计输入文件的单词的词频。

  • 输入:文本文件
  • 输出:文本(每行包括单词和单词的词频,两者之间用'\t'隔开)

1. Python MapReduce 代码

使用python写MapReduce的“诀窍”是利用Hadoop流的API,通过STDIN(标准输入)、STDOUT(标准输出)在Map函数和Reduce函数之间传递数据。

我们唯一需要做的是利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout。Hadoop流将会帮助我们处理别的任何事情。

1.1 Map阶段:mapper.py

在这里,我们假设把文件保存到hadoop-0.20.2/test/code/mapper.py

#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
print "%s\t%s" % (word, 1)

文件从STDIN读取文件。把单词切开,并把单词和词频输出STDOUT。Map脚本不会计算单词的总数,而是输出<word> 1。在我们的例子中,我们让随后的Reduce阶段做统计工作。

为了是脚本可执行,增加mapper.py的可执行权限

chmod +x hadoop-0.20.2/test/code/mapper.py

1.2 Reduce阶段:reducer.py

在这里,我们假设把文件保存到hadoop-0.20.2/test/code/reducer.py

#!/usr/bin/env python
from operator import itemgetter
import sys current_word = None
current_count = 0
word = None for line in sys.stdin:
line = line.strip()
word, count = line.split('\t', 1)
try:
count = int(count)
except ValueError: #count如果不是数字的话,直接忽略掉
continue
if current_word == word:
current_count += count
else:
if current_word:
print "%s\t%s" % (current_word, current_count)
current_count = count
current_word = word if word == current_word: #不要忘记最后的输出
print "%s\t%s" % (current_word, current_count)

文件会读取mapper.py 的结果作为reducer.py 的输入,并统计每个单词出现的总的次数,把最终的结果输出到STDOUT。

为了是脚本可执行,增加reducer.py的可执行权限

chmod +x hadoop-0.20.2/test/code/reducer.py

细节:split(chara, m),第二个参数的作用,下面的例子很给力

str = 'server=mpilgrim&ip=10.10.10.10&port=8080'
print str.split('=', 1)[0] #1表示=只截一次
print str.split('=', 1)[1]
print str.split('=')[0]
print str.split('=')[1]

输出

server
mpilgrim&ip=10.10.10.10&port=8080
server
mpilgrim&ip 

1.3 测试代码(cat data | map | sort | reduce)

这里建议大家在提交给MapReduce job之前在本地测试mapper.py 和reducer.py脚本。否则jobs可能会成功执行,但是结果并非自己想要的。

功能性测试mapper.py 和 reducer.py

[rte@hadoop-0.20.2]$cd test/code
[rte@code]$echo "foo foo quux labs foo bar quux" | ./mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
[rte@code]$echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py
bar 1
foo 3
labs 1
quux 2

细节:sort -k1,1  参数何意?

-k, -key=POS1[,POS2]     键以pos1开始,以pos2结束

有时候经常使用sort来排序,需要预处理把需要排序的field语言在最前面。实际上这是

完全没有必要的,利用-k参数就足够了。

比如sort all

1 4
2 3
3 2
4 1
5 0

如果sort -k 2的话,那么执行结果就是

5 0
4 1
3 2
2 3
1 4

2. 在Hadoop上运行python代码

2.1 数据准备

下载以下三个文件的

我把上面三个文件放到hadoop-0.20.2/test/datas/目录下

2.2 运行

把本地的数据文件拷贝到分布式文件系统HDFS中。

bin/hadoop dfs -copyFromLocal /test/datas  hdfs_in

查看

bin/hadoop dfs -ls

结果

drwxr-xr-x   - rte supergroup          0 2014-07-05 15:40 /user/rte/hdfs_in

查看具体的文件

bin/hadoop dfs -ls /user/rte/hdfs_in

执行MapReduce job

bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file test/code/mapper.py -mapper test/code/mapper.py \
-file test/code/reducer.py -reducer test/code/reducer.py \
-input /user/rte/hdfs_in/* -output /user/rte/hdfs_out

实例输出

用python写MapReduce函数——以WordCount为例

查看输出结果是否在目标目录/user/rte/hdfs_out

bin/hadoop dfs -ls /user/rte/hdfs_out

输出

Found 2 items
drwxr-xr-x - rte supergroup 0 2014-07-05 20:51 /user/rte/hdfs_out2/_logs
-rw-r--r-- 2 rte supergroup 880829 2014-07-05 20:51 /user/rte/hdfs_out2/part-00000

查看结果

bin/hadoop dfs -cat /user/rte/hdfs_out2/part-

输出

用python写MapReduce函数——以WordCount为例

以上已经达成目的了,但是可以利用python迭代器和生成器优化

3. 利用python的迭代器和生成器优化Mapper 和 Reducer代码

3.1 python中的迭代器和生成器

看这

3.2 优化Mapper 和 Reducer代码

mapper.py

#!/usr/bin/env python
import sys
def read_input(file):
for line in file:
yield line.split() def main(separator='\t'):
data = read_input(sys.stdin)
for words in data:
for word in words:
print "%s%s%d" % (word, separator, 1) if __name__ == "__main__":
main()

reducer.py

#!/usr/bin/env python
from operator import itemgetter
from itertools import groupby
import sys def read_mapper_output(file, separator = '\t'):
for line in file:
yield line.rstrip().split(separator, 1) def main(separator = '\t'):
data = read_mapper_output(sys.stdin, separator = separator)
for current_word, group in groupby(data, itemgetter(0)):
try:
total_count = sum(int(count) for current_word, count in group)
print "%s%s%d" % (current_word, separator, total_count)
except valueError:
pass if __name__ == "__main__":
main()

细节:groupby

from itertools import groupby
from operator import itemgetter things = [('2009-09-02', 11),
('2009-09-02', 3),
('2009-09-03', 10),
('2009-09-03', 4),
('2009-09-03', 22),
('2009-09-06', 33)] sss = groupby(things, itemgetter(0))
for key, items in sss:
print key
for subitem in items:
print subitem
print '-' * 20

结果

>>>
2009-09-02
('2009-09-02', 11)
('2009-09-02', 3)
--------------------
2009-09-03
('2009-09-03', 10)
('2009-09-03', 4)
('2009-09-03', 22)
--------------------
2009-09-06
('2009-09-06', 33)
--------------------

  • groupby(things, itemgetter(0)) 以第0列为排序目标
  • groupby(things, itemgetter(1))以第1列为排序目标
  • groupby(things)以整行为排序目标

4. 参考

python中的split函数中的参数问题

Writing an Hadoop MapReduce Program in Python

shell的sort命令的-k参数

用python写MapReduce函数——以WordCount为例的更多相关文章

  1. 自动化测试(三)如何用python写一个函数,这个函数的功能是,传入一个数字,产生N条邮箱,产生的邮箱不能重复。

    写一个函数,这个函数的功能是,传入一个数字,产生N条邮箱,产生的邮箱不能重复.邮箱前面的长度是6-12之间,产生的邮箱必须包含大写字母.小写字母.数字和特殊字符 和上一期一样 代码中间有段比较混沌 有 ...

  2. python写mapReduce初步

    最近在学了python了,从mapReduce开始 ,话不多说了,直接上代码了哈 map阶段,map.py文件 import sys # 标准输入 # 在终端的话,就需要这样了 cat a.txt | ...

  3. Python实现MapReduce&comma;wordcount实例,MapReduce实现两表的Join

    Python实现MapReduce 下面使用mapreduce模式实现了一个简单的统计日志中单词出现次数的程序: from functools import reduce from multiproc ...

  4. hadoop学习笔记——用python写wordcount程序

    尝试着用3台虚拟机搭建了伪分布式系统,完整的搭建步骤等熟悉了整个分布式框架之后再写,今天写一下用python写wordcount程序(MapReduce任务)的具体步骤. MapReduce任务以来H ...

  5. 用Python写了一个postgresql函数,感觉很爽

    用Python写了一个postgresql函数,感觉很爽 CREATE LANGUAGE plpythonu; postgresql函数 CREATE OR REPLACE FUNCTION myfu ...

  6. Python 为什么没有 main 函数?为什么我不推荐写 main 函数?

    毫无疑问 Python 中没有所谓的 main 入口函数,但是网上经常看到一些文章提"Python 的 main 函数"."建议写 main 函数"-- 有些人 ...

  7. python递归练习:生成一个n级深度的字典,例如:&lbrack;1&comma;2&comma;3&comma;4&comma;5&comma;6&rsqb; 可以生成&lbrace;1&colon; &lbrace;2&colon; &lbrace;3&colon; &lbrace;4&colon; &lbrace;6&colon; 5&rcub;&rcub;&rcub;&rcub;&rcub;,写一个函数定义n级

    结果#encoding = utf-8#题目:#生成一个n级深度的字典,例如:[1,2,3,4,5,6] 可以生成{1: {2: {3: {4: {6: 5}}}}},写一个函数定义n级a=[1,2, ...

  8. Python高阶函数&lowbar;map&sol;reduce&sol;filter函数

    本篇将开始介绍python高阶函数map/reduce/filter的用法,更多内容请参考:Python学习指南 map/reduce Python内建了map()和reduce()函数. 如果你读过 ...

  9. 快速掌握用python写并行程序

    目录 一.大数据时代的现状 二.面对挑战的方法 2.1 并行计算 2.2 改用GPU处理计算密集型程序 3.3 分布式计算 三.用python写并行程序 3.1 进程与线程 3.2 全局解释器锁GIL ...

随机推荐

  1. &period;NET跨平台之旅:将示例站点从ASP&period;NET 5 Beta7升级至RC1

    今天,我们将示例站点(about.cnblogs.com,服务器操作系统是Ubuntu)从ASP.NET 5 Beta7升级到了RC1,在升级过程中只遇到了一个问题. 在运行 dnvm upgrade ...

  2. reactjs入门到实战(二)---- jxs详解

    >>>如何转换    JSX transformer   Babel    官网:http://babeljs.io/   里面有一个可以看转换的测试器,es6什么的也可以应用: 注 ...

  3. DDL、DML、

     SQL语言的分类 SQL语言共分为四大类:数据查询语言DQL,数据操纵语言DML,数据定义语言DDL,数据控制语言DCL. 1. 数据查询语言DQL数据查询语言DQL基本结构是由SELECT子句,F ...

  4. SQL之trigger(触发器)

    先来看一小段程序 有如下三张表: 帐户(编号,姓名,余额,建立日期,储蓄所编号) 储蓄所(编号,名称,地址,人数,所属城市) 借贷(帐户,借贷类型,金额,日期) create trigger tri_ ...

  5. logstash分析日志

    待处理日志格式如下: [totalCount: 298006556, count: 287347623, queryCount: 259027994, exeCount: 28319629, tota ...

  6. Python简单实现决策树

    __author__ = '糖衣豆豆' #决策树 import pandas as pda fname="~/coding/python/data/lesson.csv" data ...

  7. SQL语句整理

  8. SCOI2019d1t1平台跳跃&lbrack;高精&rsqb;

    分析 首先考虑相邻柱子之间没有浮台. 记前 \(m-1\) 个盘子为 x, 第 \(m\) 个盘子为 y,有如下过程:\(x\rightarrow C, y\rightarrow B, x\right ...

  9. 小学四则运算APP 第三阶段冲刺-第一天

    团队成员:陈淑筠.杨家安.陈曦 团队选题:小学四则运算APP 第三次冲刺阶段时间:12.12~12.19 本次发布的是音乐播放功能,可以根据用户需求一边播放音乐一边做题,也拥有暂停播放音乐的功能,增强 ...

  10. elasticsearch安装IK分词插件

    一 打开网页:https://github.com/medcl/elasticsearch-analysis-ik/releases 这个是ik相关的包,找到你想下载的版本,下载对应的zip包 二 然 ...