Very easy interface for multi-processed execution of functions in python.
import time
# EXAMPLE: ------------------------------------------------
def dummy_func(dum_var, arg_rec):
if type(arg_rec) is tuple: # important to accept various input
arg_rec = arg_rec[0]
time.sleep(1)
print(('dum_var: %s' % dum_var))
for arg in arg_rec:
print(('Another arg: ', arg))
# How to run:
if __name__ == '__main__':
dum_list = [i for i in range(20)]
# test_threads = EasyThreading2(dummy_func, dum_list, 10, True, True, 'I am another arg!', 'Me too.')
test_threads = EasyMultiProcessing(dummy_func, dum_list, arg_rec=['I am another arg!', 'Me too.'], verbose=True)
# starts process execution
test_threads.start_all()
time.sleep(5)
# testing out adding to queue
new_args = [y for y in range(50, 90, 2)]
test_threads.fill_queue(new_args)
# wait until all the threads got executed
test_threads.join_all()
print("Everything finished")
import multiprocessing
class EasyMultiProcessing(object):
"""
To Process the functions in parallel
"""
def __init__(self,
func,
data,
*args, # put here to enforce keyword usage
pool_size=None,
daemon=True,
verbose=False,
**kwargs
):
"""
:type kwargs: object
:type args: object
"""
self.func = func
self.data = data # must be iterable!
# number of threads according to length of iterable, else as specified:
if pool_size is None:
pool_size = multiprocessing.cpu_count()
self.verbose = verbose
self.args = args
self.kwargs = kwargs
# stop sign:
self.on = True
# setting up and filling queue:
self.q = multiprocessing.JoinableQueue()
self.fill_queue(self.data)
# initializing threads:
self.processes = [multiprocessing.Process(target=self._processor) for _ in range(pool_size)]
for p in self.processes:
p.daemon = daemon
self.print_lock = multiprocessing.Lock()
self.sentinel = "SENTINEL"
def _processor(self):
# while True: # while loop crucial here, to process all jobs in queue
# while not self.q.empty(): # works as long queue is not empty requires external while loop
# while self.on: # stop sign
while True:
# gets task (=data item) from the queue
task = self.q.get()
if task == self.sentinel:
break
# print what process is currently working on:
if self.verbose:
# generating indexed thread name in the form of: func_name.thread.001:
process_index = int(multiprocessing.current_process().name.split('-')[-1])
process_name = self.func.__name__ + '.process.' + '{0:03d}'.format(process_index)
with self.print_lock:
print('{} is working on: {}'.format(process_name, task))
# run the job with the available worker in queue (process)
if self.args and self.kwargs:
self.func(task, self.args, self.kwargs)
elif self.args:
self.func(task, self.args)
elif self.kwargs:
self.func(task, self.kwargs)
else:
self.func(task)
# on job completion
self.q.task_done()
if self.verbose:
with self.print_lock:
print(('{} finished job.'.format(process_name)))
def fill_queue(self, in_arg):
if self.on: # accepts input as long as stop sign is not shown
[self.q.put(a) for a in in_arg if a is not None]
elif not self.on:
raise Exception('No active threads.')
def start_all(self):
[p.start() for p in self.processes]
def join_all(self):
self.on = False
[self.q.put(self.sentinel) for _ in self.processes] # putting sentinels in Queue
[p.join() for p in self.processes]