Very simple interface for multi-threaded execution of functions in python.
import time
# EXAMPLE: ------------------------------------------------
def dummy_func(dum_var, arg_rec):
if type(arg_rec) is tuple: # important to accept various input
arg_rec = arg_rec[0]
time.sleep(1)
print(('dum_var: %s' % dum_var))
for arg in arg_rec:
print(('Another arg: ', arg))
# How to run:
if __name__ == '__main__':
dum_list = [i for i in range(50)] # generate some dummy data
# test_threads = EasyThreading(dummy_func, dum_list, 10, True, True, 'I am another arg!', 'Me too.')
test_threads = EasyThreading(dummy_func, dum_list, arg_rec=['I am another arg!', 'Me too.'], pool_size=10, verbose=True)
# starts thread execution
test_threads.start_all()
time.sleep(10)
# testing out adding additional data to queue
new_args = [y for y in range(50, 90, 2)]
test_threads.in_queue(new_args)
# wait until all the threads got executed
test_threads.join_all()
print("Everything finished")
import threading
from queue import Queue
class EasyThreading(object):
"""
To Process the functions in parallel
"""
def __init__(self,
func,
data,
*args, # put here to enforce keyword usage
pool_size=None,
daemon=True,
verbose=False,
**kwargs
):
"""
:type kwargs: object
:type args: object
"""
self.func = func
self.data = data # must be iterable!
# TODO: iter check
# https://stackoverflow.com/questions/1952464/in-python-how-do-i-determine-if-an-object-is-iterable
# number of threads according to length of iterable, else as specified:
if pool_size is None:
pool_size = len(data)
self.verbose = verbose
self.args = args
self.kwargs = kwargs
# stop sign:
self.on = True
# setting up and filling queue:
self.q = Queue()
self.in_queue(self.data)
# initializing threads:
self.threads = [threading.Thread(target=self._threader) for _ in range(pool_size)]
[t.setDaemon(daemon) for t in self.threads]
self.print_lock = threading.Lock()
def _threader(self):
# while True: # while loop crucial here, to process all jobs in queue
# while not self.q.empty(): # works as long queue is not empty requires external while loop
# while self.on: # stop sign
while True:
# generating indexed thread name in the form of: func_name.thread.001:
thread_index = int(threading.current_thread().name.split('-')[-1])
thread_name = self.func.__name__ + '.thread.' + '{0:03d}'.format(thread_index)
# gets a worker from the queue
in_put = self.q.get()
# print what thread is currently working on:
if self.verbose:
with self.print_lock:
print('{} is working on: {}'.format(thread_name, in_put))
# run the job with the available worker in queue (thread)
if self.args and self.kwargs:
self.func(in_put, self.args, self.kwargs)
elif self.args:
self.func(in_put, self.args)
elif self.kwargs:
self.func(in_put, self.kwargs)
else:
self.func(in_put)
# completed with the job
self.q.task_done()
if self.verbose:
with self.print_lock:
print(('{} finished job.'.format(thread_name)))
# ending thread:
if not self.on:
if self.q.empty(): # only if queue empty at same time
break # <-- this one ends threads!!!
def in_queue(self, in_arg):
if self.on: # accepts input as long as stop sign is not shown
[self.q.put(a) for a in in_arg if a is not None]
elif not self.on:
raise Exception('No active threads.')
def start_all(self):
[t.start() for t in self.threads]
def join_all(self):
self.on = False
[t.join() for t in self.threads]