import random
import time

from alive_progress import alive_bar, BARS, SPINNERS
class Progress:
    """Generator-driven wrapper around ``alive_progress.alive_bar``.

    The bar lives inside a generator that keeps the ``alive_bar`` context
    open between calls; every public method drives that generator with
    ``next()`` / ``send()``.

    NOTE(review): the bar handler is called as ``bar(incr=...)`` /
    ``bar(perc=...)`` and its return value is stored as the current
    position. That matches the alive-progress 1.x handler -- confirm against
    the installed version.

    Args:
        total: Total number of items (passed to ``alive_bar``).
        title: Bar title; titles longer than 3 characters are cut to the
            first 3 characters followed by ``"..."``.
        length: Bar length in characters.
        manual: When true the bar is driven by percentages (0..1) instead
            of increments.
        calibrate: Optional ``alive_bar`` calibration value, forwarded when
            it is not ``None``.
        **options: Extra ``alive_bar`` options. When given they take
            precedence over ``length``/``manual`` and disable the random
            bar/spinner selection.
    """

    def __init__(self, total: int, title: str, length: int = 20, manual=False,
                 calibrate=None, **options):
        self.total = total
        self.title = title if len(title) <= 3 else title[:3] + "..."
        self.length = length
        self.manual = manual
        self.calibrate = calibrate
        self.options = options
        self.current = 0
        self.closed = False
        # Must exist before the generator first runs; it is read on every
        # loop iteration.
        self.paused = False
        # None means "no bar is running"; start() creates the generator.
        self.__progressbar = None

    def _bar_options(self) -> dict:
        """Build the keyword options handed to ``alive_bar``."""
        # length/manual are always forwarded so the bar mode agrees with
        # the arguments update()/finish() send to it.
        opts = {"length": self.length, "manual": self.manual}
        if self.options:
            opts.update(self.options)
        else:
            opts["bar"] = self._randbs(BARS)
            opts["spinner"] = self._randbs(SPINNERS)
        if self.calibrate is not None:
            opts.setdefault("calibrate", self.calibrate)
        return opts

    def _send(self, args):
        """Send ``args`` to the running bar generator.

        Raises:
            RuntimeError: If the bar has not been started (or was closed).
        """
        if self.__progressbar is None:
            raise RuntimeError("progress is not started")
        self.__progressbar.send(args)

    def start(self, c=None):
        """Start the bar, optionally applying an initial update ``c``.

        Prints a notice and returns without doing anything if a bar is
        already running.
        """
        def gen():
            with alive_bar(self.total, self.title, **self._bar_options()) as bar:
                while not self.closed:
                    if self.paused:
                        # While paused, anything sent in is discarded.
                        yield
                        continue
                    args = yield
                    if not args:
                        continue
                    self.current = bar(**args)

        if self.__progressbar is not None:
            print("[!]progress is running.")
            return
        self.closed = False
        self.__progressbar = gen()
        # Prime the generator: enters the alive_bar context and stops at
        # the first yield, ready to receive updates.
        next(self.__progressbar)
        if c:
            self.update(c)

    def _randbs(self, bs: dict, n: "int | None" = None) -> str:
        """Return the ``n``-th key of ``bs``, or a random key if ``n`` is None."""
        names = list(bs)
        if n is None:
            return random.choice(names)
        return names[n]

    def update(self, c):
        """Advance the bar by ``c`` and return the new position.

        In manual mode a value greater than 1 is treated as an item count
        and converted to a fraction of ``total``; otherwise it is added to
        the current percentage as-is. In non-manual mode ``c`` is an
        increment.

        Raises:
            RuntimeError: If the bar has not been started.
        """
        if self.manual and c > 1:
            args = {'perc': self.current + c / self.total}
        elif self.manual:
            args = {'perc': self.current + c}
        else:
            args = {'incr': c}
        self._send(args)
        return self.current

    def reset(self):
        """Set a manual-mode bar back to 0 and return the position.

        Raises:
            RuntimeError: If the bar is not in manual mode, or is not
                started while its position is non-zero.
        """
        if not self.manual:
            raise RuntimeError("manual is {}".format(self.manual))
        if self.current:
            self._send({'perc': 0})
        return self.current

    def rollback(self, t=1):
        """Animate a manual-mode bar back to 0 over ``t`` seconds."""
        if not self.current:
            return self.current
        self.roll(0, t)
        return self.current

    def roll(self, perc: float, t=1):
        """Animate a manual-mode bar from its current position to ``perc``.

        The move is split into roughly one step per percentage point (at
        least one step) spread over ``t`` seconds in total.

        Raises:
            RuntimeError: If the bar is not in manual mode or not started.
            ValueError: If ``perc`` is outside ``[0, 1]``.
        """
        if not self.manual:
            raise RuntimeError("manual is {}".format(self.manual))
        if not 0 <= perc <= 1:
            raise ValueError("perc must be within [0, 1], got {}".format(perc))
        origin = self.current
        if origin == perc:
            return self.current
        delta = perc - origin
        # At least one step so tiny moves (< 1%) still reach the target.
        steps = max(1, int(abs(delta) * 100))
        for i in range(1, steps + 1):
            time.sleep(t / steps)
            # Land exactly on the target on the final step to avoid
            # accumulated floating-point drift.
            position = perc if i == steps else origin + delta * i / steps
            self._send({'perc': position})
        return self.current

    def finish(self):
        """Drive the bar to completion.

        Raises:
            RuntimeError: If the bar has not been started.
        """
        args = {'perc': 1} if self.manual else {'incr': self.total - self.current}
        self._send(args)

    def close(self):
        """Stop the bar and leave the ``alive_bar`` context.

        Safe to call more than once and before ``start()``.
        """
        if self.closed:
            return
        self.closed = True
        if self.__progressbar is None:
            return
        try:
            # Resuming with closed=True lets the generator fall out of its
            # loop and exit the alive_bar context normally.
            next(self.__progressbar)
        except StopIteration:
            pass
        self.__progressbar = None

    def pause(self):
        """Pause the bar; updates sent while paused are discarded."""
        self.paused = True
        if self.__progressbar is not None:
            next(self.__progressbar)

    def go_on(self):
        """Resume a paused bar."""
        self.paused = False
        if self.__progressbar is not None:
            next(self.__progressbar)

    def __enter__(self):
        """Return the instance; the bar itself is started via ``start()``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the bar on leaving the ``with`` block."""
        if not self.closed:
            self.close()