# Using Python threads to archive objects in parallel
#!/usr/bin/env python
#coding: utf-8
import os
import sys
import logging
from threading import Thread
import Queue
# Configure the root logger once at import time: every record is prefixed
# with a timestamp and messages at INFO level and above are emitted.
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
class Worker(Thread):
    """Thread that pulls tasks from a shared queue and hands them to a callback.

    The loop ends when a ``None`` item is read from the queue. Every item
    read, including that ``None``, is acknowledged with ``task_done()``.
    """

    def __init__(self, do_work, task_queue, workid):
        """Remember the callback, the queue to read from and this worker's id.

        do_work    -- callable invoked as ``do_work(task, workid)``.
        task_queue -- object providing ``get()`` and ``task_done()``.
        workid     -- identifier passed back to ``do_work`` with each task.
        """
        Thread.__init__(self)
        self.workid = workid
        self.task_queue = task_queue
        self.do_work = do_work

    def run(self):
        """Process queue items until the ``None`` stop marker is received."""
        pending = self.task_queue
        while True:
            stop_requested = False
            try:
                job = pending.get()
                if job is None:
                    stop_requested = True
                else:
                    self.do_work(job, self.workid)
            except Exception as err:
                # A failing task is reported and the worker keeps going.
                logging.warning('Failed to execute task: %s' % err)
            finally:
                # Acknowledge the item whether it succeeded, failed or was
                # the stop marker, so the queue's join() can complete.
                pending.task_done()
            if stop_requested:
                return
class ThreadPool(object):
    """Fixed number of Worker threads that all read from one shared queue."""

    def __init__(self, do_work, nworker=20):
        """Create the (still idle) pool.

        do_work -- callable each worker invokes as ``do_work(task, workid)``.
        nworker -- number of worker threads created by ``start()``.
        """
        self.task_queue = Queue.Queue()
        self.workid = 0
        self.nworker = nworker
        self.do_work = do_work

    def start(self):
        """Create and start ``nworker`` workers, numbered from 0."""
        for index in range(self.nworker):
            # The pool keeps the id of the most recently started worker.
            self.workid = index
            worker = Worker(self.do_work, self.task_queue, index)
            worker.start()

    def put_task(self, task):
        """Append ``task`` to the shared queue."""
        self.task_queue.put(task)

    def join(self):
        """Wait until every queued task is acknowledged, then stop the workers."""
        shared = self.task_queue
        shared.join()
        # One None per worker: each worker leaves its loop on reading None.
        for _ in range(self.nworker):
            shared.put(None)
class Task(object):
    """Unit of work: a repository id, a repository version and an object id."""

    def __init__(self, repo_id, repo_version, obj_id):
        """Store the three identifying values unchanged as attributes."""
        self.repo_id, self.repo_version, self.obj_id = (
            repo_id, repo_version, obj_id)
class ObjMigrateWorker(Thread):
    """Thread that walks one storage tree and queues a Task per object found.

    The tree is read as ``top_path/<repo_id>/<prefix>/<rest>``: three nested
    directory levels, where the object id is ``prefix + rest``.
    """

    def __init__(self, top_path, stype, dst_pool):
        """Set up the walker and its private thread pool.

        top_path -- directory whose entries are treated as repository ids.
        stype    -- storage type label used in log messages and task paths.
        dst_pool -- destination identifier; stored, not used by this class.
        """
        Thread.__init__(self)
        self.top_path = top_path
        self.stype = stype
        self.dst_pool = dst_pool
        self.thread_pool = ThreadPool(self.do_work)

    def run(self):
        """Start the pool, queue every object, then wait for the pool to drain.

        Any exception raised by ``migrate()`` still propagates, but only
        after the pool has been joined.
        """
        logging.info('Start to migrate [%s] object' % self.stype)
        self.thread_pool.start()
        try:
            self.migrate()
        finally:
            # Always join: the pool's join() is what sends the stop markers.
            # Without it a failure while listing directories would leave the
            # worker threads blocked on the queue and the process could
            # never exit.
            self.thread_pool.join()
        logging.info('Complete migrate [%s] object' % self.stype)

    def do_work(self, task, workid):
        """Handle one task; currently this only reports which object it is."""
        obj_path = os.path.join(self.stype, task.repo_id, task.obj_id)
        # A single pre-formatted argument is valid both as a Python 2 print
        # statement and as a print() call, and emits the same text as the
        # original comma-separated form.
        print("workerID: %s --task is: %s" % (workid, obj_path))

    def migrate(self):
        """Queue one Task for every object found under ``top_path``.

        Raises OSError if any level of the tree cannot be listed, for
        example when an entry is a plain file instead of a directory.
        """
        top_path = self.top_path
        for repo_id in os.listdir(top_path):
            repo_path = os.path.join(top_path, repo_id)
            for prefix in os.listdir(repo_path):
                prefix_path = os.path.join(repo_path, prefix)
                for rest in os.listdir(prefix_path):
                    # The object id is the directory name joined with the
                    # file name; the repository version is always 1 here.
                    task = Task(repo_id, 1, prefix + rest)
                    self.thread_pool.put_task(task)
def main():
    """Start one ObjMigrateWorker per storage type under the given root.

    The root directory is taken from the first command-line argument.
    """
    storage_root = sys.argv[1]
    # Maps each storage type (also its sub-directory name) to the value
    # handed to the worker as its destination pool.
    pools = {'commits': 'commits', 'fs': 'fs', 'blocks': 'blocks'}
    for stype, dst_pool in pools.items():
        source_dir = os.path.join(storage_root, stype)
        migrator = ObjMigrateWorker(source_dir, stype, dst_pool)
        migrator.start()
if __name__ == '__main__':
    # sys.argv[0] is the script name, so the storage path is present only
    # when there are at least two entries. Stop with a non-zero status
    # instead of letting main() fail on the missing argument.
    if len(sys.argv) < 2:
        print("Top storage absolute path is needed.")
        sys.exit(1)
    main()