问题描述:现在有n个文本文件,使用MapReduce的方法实现词频统计。
附上统计词频的关键代码,首先是一个通用的MapReduce模块:
class MapReduce: __doc__ = '''提供map_reduce功能''' @staticmethod def map_reduce(i, mapper, reducer): """ map_reduce方法 :param i: 需要MapReduce的集合 :param mapper: 自定义mapper方法 :param reducer: 自定义reducer方法 :return: 以自定义reducer方法的返回值为元素的一个列表 """ intermediate = [] # 存放所有的(intermediate_key, intermediate_value) for (key, value) in i.items(): intermediate.extend(mapper(key, value)) # sorted返回一个排序好的list,因为list中的元素是一个个的tuple,key设定按照tuple中第几个元素排序 # groupby把迭代器中相邻的重复元素挑出来放在一起,key设定按照tuple中第几个元素为关键字来挑选重复元素 # 下面的循环中groupby返回的key是intermediate_key,而group是个list,是1个或多个 # 有着相同intermediate_key的(intermediate_key, intermediate_value) groups = {} for key, group in itertools.groupby(sorted(intermediate, key=lambda im: im[0]), key=lambda x: x[0]): groups[key] = [y for x, y in group] # groups是一个字典,其key为上面说到的intermediate_key,value为所有对应intermediate_key的intermediate_value # 组成的一个列表 return [reducer(intermediate_key, groups[intermediate_key]) for intermediate_key in groups]
然后需要针对词频统计这个实际问题写好自己的mapper方法和reducer方法:
class WordCount: __doc__ = '''词频统计''' def mapper(self, input_key, input_value): """ 词频统计的mapper方法 :param input_key: 文件名 :param input_value: 文本内容 :return: 以(词,1)为元素的一个列表 """ return [(word, 1) for word in self.remove_punctuation(input_value.lower()).split()] def reducer(self, intermediate_key, intermediate_value_list): """ 词频统计的reducer方法 :param intermediate_key: 某个词 :param intermediate_value_list: 出现记录列表,如[1,1,1] :return: (词,词频) """ return intermediate_key, sum(intermediate_value_list) @staticmethod def remove_punctuation(text): """ 去掉字符串中的标点符号 :param text: 文本 :return: 去掉标点的文本 """ return re.sub(u"\p{P}+", "", text)
用3个文本文件进行测试:
text\a.tex:
The quick brown fox jumped over the lazy grey dogs.
text\b.txt:
That's one small step for a man, one giant leap for mankind.
text\c.txt:
Mary had a little lamb, Its fleece was white as snow; And everywhere that Mary went, The lamb was sure to go.
调用如下:
filenames = ["text\\a.txt", "text\\b.txt", "text\\c.txt"] i = {} for filename in filenames: f = open(filename) i[filename] = f.read() f.close() wc = WordCount() print(MapReduce.map_reduce(i, wc.mapper, wc.reducer))
输出结果:
[('white', 1), ('little', 1), ('sure', 1), ('snow;', 1), ('went,', 1), ('as', 1), ('lamb,', 1), ('go.', 1), ('lamb', 1), ('its', 1), ('a', 1), ('was', 2), ('to', 1), ('fleece', 1), ('that', 1), ('the', 1), ('mary', 2), ('everywhere', 1), ('had', 1), ('and', 1)]
上面提出的方法只使用了最基本的MapReduce思想,所以不支持大数据量的测试,毕竟各种调度之类的内容没有考虑到。
参考资料
1: