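"""
spock
5/12/2017 - 11:32 AM

Very simple interface for multi-threaded execution of functions in Python.

`EasyThreading` applies one function to every item of an iterable using a
pool of worker threads fed from a shared queue; more items can be queued
while the pool is running.
"""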

import time

# EXAMPLE: ------------------------------------------------
def dummy_func(dum_var, arg_rec, delay=1):
    """
    Example job: wait ``delay`` seconds, then print the item and each extra arg.

    :param dum_var: the queued item being processed.
    :param arg_rec: the extra arguments. EasyThreading forwards positional
        extras as one tuple and keyword extras as one dict, so both shapes
        are unwrapped here: a tuple yields its first element, a dict yields
        its ``'arg_rec'`` entry (empty if missing). Anything else is
        iterated as-is.
    :param delay: seconds to sleep to simulate work (default 1).
    """
    # important to accept various input shapes (see :param arg_rec:)
    if isinstance(arg_rec, tuple):
        arg_rec = arg_rec[0]
    elif isinstance(arg_rec, dict):
        arg_rec = arg_rec.get('arg_rec', ())

    time.sleep(delay)
    # str.format instead of '%' so a tuple item is printed, not unpacked
    print('dum_var: {}'.format(dum_var))
    for arg in arg_rec:
        # two arguments, not a tuple: prints "Another arg:  x"
        print('Another arg: ', arg)


import threading
from queue import Queue


class EasyThreading:
    """
    Run ``func`` over every item of ``data`` on a fixed pool of worker threads.

    Items are pulled from a FIFO queue. More items can be added with
    ``in_queue()`` while the pool is running. ``join_all()`` waits until every
    queued item has been processed, then stops the workers.

    Calling convention for ``func`` (kept for backward compatibility): extra
    positional arguments are passed as ONE tuple and extra keyword arguments
    as ONE dict; they are not unpacked::

        func(item)                  # no extras
        func(item, args)            # only positional extras
        func(item, kwargs)          # only keyword extras
        func(item, args, kwargs)    # both
    """

    # Private marker queued by join_all(), one per worker, to stop the loop.
    _STOP = object()

    def __init__(self,
                 func,
                 data,
                 *args,  # put here to enforce keyword usage of the options below
                 pool_size=None,
                 daemon=True,
                 verbose=False,
                 **kwargs
                 ):
        """
        :param func: callable applied to every queued item.
        :param data: iterable of initial items; ``None`` items are skipped.
            One-shot iterables (e.g. generators) are materialized to a list.
        :param args: extra positional values, forwarded to ``func`` as a tuple.
        :param pool_size: number of worker threads; defaults to the number of
            initial items, but at least 1.
        :param daemon: whether the worker threads are daemon threads.
        :param verbose: print a line when a worker starts/finishes an item.
        :param kwargs: extra keyword values, forwarded to ``func`` as a dict.
        """
        self.func = func
        # len() is needed for the default pool size, so materialize iterables
        # that don't support it (generators, iterators).
        if not hasattr(data, '__len__'):
            data = list(data)
        self.data = data

        if pool_size is None:
            # One worker per initial item, but never zero workers: a pool
            # without threads would silently drop later in_queue() items.
            pool_size = max(len(data), 1)

        self.verbose = verbose
        self.args = args
        self.kwargs = kwargs

        # stop sign: becomes False once join_all() has been called.
        self.on = True
        # Makes "check self.on, then put" atomic with respect to join_all(),
        # so no real item can be queued behind the stop markers.
        self._state_lock = threading.Lock()
        self.print_lock = threading.Lock()

        # setting up and filling queue:
        self.q = Queue()
        self.in_queue(self.data)

        # Explicit names ("func_name.thread.001", ...) instead of parsing the
        # default "Thread-N" name, whose format changed in Python 3.10.
        func_name = getattr(func, '__name__', type(func).__name__)
        self.threads = [
            threading.Thread(
                target=self._threader,
                name='{}.thread.{:03d}'.format(func_name, index),
                daemon=daemon,
            )
            for index in range(1, pool_size + 1)
        ]

    def _call(self, in_put):
        """Invoke ``func`` on one item using the tuple/dict convention in the class docstring."""
        if self.args and self.kwargs:
            return self.func(in_put, self.args, self.kwargs)
        if self.args:
            return self.func(in_put, self.args)
        if self.kwargs:
            return self.func(in_put, self.kwargs)
        return self.func(in_put)

    def _threader(self):
        """Worker loop: process queued items until this worker's stop marker arrives."""
        thread_name = threading.current_thread().name
        while True:
            # blocks until an item (or a stop marker) is available
            in_put = self.q.get()
            if in_put is self._STOP:
                self.q.task_done()
                break  # <-- this one ends threads

            if self.verbose:
                with self.print_lock:
                    print('{} is working on: {}'.format(thread_name, in_put))

            try:
                self._call(in_put)
            finally:
                # Mark the item done even if func raised, so Queue.join()
                # cannot hang; the exception still propagates (ending this worker).
                self.q.task_done()

            if self.verbose:
                with self.print_lock:
                    print('{} finished job.'.format(thread_name))

    def in_queue(self, in_arg):
        """
        Queue every non-None item of ``in_arg`` for processing.

        :raises RuntimeError: if join_all() has already been called.
        """
        with self._state_lock:
            if not self.on:
                raise RuntimeError('No active threads.')
            for item in in_arg:
                if item is not None:
                    self.q.put(item)

    def start_all(self):
        """Start every worker thread."""
        for thread in self.threads:
            thread.start()

    def join_all(self):
        """
        Stop accepting work, let the workers drain the queue, and wait for them.

        One stop marker per worker is queued behind all pending items. The
        queue is FIFO, so every item queued before this call is processed first.
        """
        with self._state_lock:
            self.on = False
            for _ in self.threads:
                self.q.put(self._STOP)
        for thread in self.threads:
            thread.join()


# How to run (placed after the class so EasyThreading is defined when it runs):
if __name__ == '__main__':
    dum_list = list(range(50))  # generate some dummy data

    # Extras passed positionally reach dummy_func as a 1-tuple, which it unwraps.
    test_threads = EasyThreading(dummy_func, dum_list, ['I am another arg!', 'Me too.'],
                                 pool_size=10, verbose=True)

    # starts thread execution
    test_threads.start_all()

    time.sleep(10)
    # testing out adding additional data to queue
    new_args = list(range(50, 90, 2))
    test_threads.in_queue(new_args)

    # wait until all the threads got executed
    test_threads.join_all()
    print("Everything finished")