9/13/2016 - 6:01 AM

A simple MapReduce implementation built on Python's multiprocessing module. For the original source and a usage example, see: https://pymotw.com/2/multiprocessing/mapreduce.html

import collections
import itertools
import multiprocessing

class SimpleMapReduce(object):
    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func

          Function to map inputs to intermediate data. Takes as
          argument one input value and returns a sequence of
          (key, value) tuples to be reduced.

        reduce_func

          Function to reduce partitioned version of intermediate data
          to final output. Takes as argument a single (key, values)
          tuple, where key is as produced by map_func and values is
          the sequence of values associated with that key.

        num_workers

          The number of workers to create in the pool. Defaults to the
          number of CPUs available on the current host.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        """Organize the mapped values by their key.

        Returns an unsorted sequence of tuples with a key and a sequence of values.
        """
        # Group every value under its key: {key: [value, value, ...]}.
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()
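
    def __call__(self, inputs, chunksize=1):
        """Process the inputs through the map and reduce functions given.

        inputs
          An iterable containing the input data to be processed.

        chunksize=1
          The portion of the input data to hand to each worker.  This
          can be used to tune performance during the mapping phase.
        """
        # Map phase: each worker applies map_func to its chunk of inputs;
        # every response is an iterable of (key, value) pairs.
        map_responses = self.pool.map(self.map_func, inputs, chunksize=chunksize)
        # Shuffle phase (in the parent process): flatten the responses
        # and group the values by key.
        partitioned_data = self.partition(itertools.chain(*map_responses))
        # Reduce phase: each worker receives one (key, values) tuple.
        reduced_values = self.pool.map(self.reduce_func, partitioned_data)
        return reduced_values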
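Before handing map and reduce functions to the process pool, it can help to check them serially. The sketch below is an assumed word-count example in the spirit of the PyMOTW usage; the names `words_in_line`, `count_words` and `run_serially` are hypothetical and not part of the source. `words_in_line` plays the role of `map_func` and returns a list of `(word, 1)` pairs, `count_words` plays `reduce_func` and receives one `(word, counts)` tuple, and `run_serially` mirrors the map, partition and reduce steps of `SimpleMapReduce.__call__` using the built-in `map` instead of `Pool.map`.

```python
import collections
import itertools
import string


def words_in_line(line):
    """Hypothetical map_func: return a list of (word, 1) pairs for one line."""
    # Strip surrounding punctuation and lowercase so "Spam," and "spam" match.
    words = (w.strip(string.punctuation).lower() for w in line.split())
    return [(w, 1) for w in words if w]


def count_words(item):
    """Hypothetical reduce_func: receives ONE (key, values) tuple."""
    word, occurrences = item
    return (word, sum(occurrences))


def run_serially(map_func, reduce_func, inputs):
    """Hypothetical serial stand-in for SimpleMapReduce.__call__ (no pool)."""
    # Map: one response (an iterable of (key, value) pairs) per input.
    map_responses = map(map_func, inputs)
    # Partition: group values by key, as SimpleMapReduce.partition does.
    partitioned = collections.defaultdict(list)
    for key, value in itertools.chain.from_iterable(map_responses):
        partitioned[key].append(value)
    # Reduce: call reduce_func once per (key, values) tuple.
    return [reduce_func(item) for item in partitioned.items()]


if __name__ == "__main__":
    lines = ["Spam, spam, eggs.", "Eggs and spam!"]
    print(sorted(run_serially(words_in_line, count_words, lines)))
    # prints [('and', 1), ('eggs', 2), ('spam', 3)]
```

Once the serial result looks right, the same functions can be passed to the class, e.g. `SimpleMapReduce(words_in_line, count_words)(lines)`. They must be defined at module top level so multiprocessing can pickle them, and on platforms that start workers with the spawn method (Windows, and macOS by default since Python 3.8) the pool should be created under `if __name__ == "__main__":`. The class returns its pairs unsorted, so sort them before comparing against the serial output.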