python click
#!/usr/bin/env python
from __future__ import print_function
import click
import django
django.setup() # noqa
from osqa.publish.models import Receiver, Record
from osqa.drago.models.company import DragoPosition
from osqa.publish.tasks import publish_ranking_results, do_publish_ranking_result
@click.group()
def cli():
    # Root command group; the sub-commands in this module attach to it
    # through ``@cli.command()``.  Documented with a ``#`` comment because
    # click would show a docstring as the group's --help text.
    return None
@cli.command()
def receiver_list():
    # Print a summary block for every stored Receiver.
    # (``#`` comment rather than a docstring: click would otherwise show
    # the docstring as this command's --help text.)
    receivers = Receiver.objects.all()
    for entry in receivers:
        display_receiver(entry)
@cli.command()
@click.option(
    '--position-id',
    prompt='Position ID',
    help='The position id',
)
@click.option(
    '--callback-url',
    prompt='Callback url',
    help='Callback url',
)
@click.option(
    '--publisher',
    prompt='Publisher',
    help='Publisher',
)
def receiver_add(position_id, callback_url, publisher):
    # Create a Receiver attached to the DragoPosition with the given id and
    # print the newly created row.
    # (``#`` comment rather than a docstring: click would otherwise show
    # the docstring as this command's --help text.)
    position = DragoPosition.objects.get(id=position_id)
    created = Receiver.objects.create(
        position=position,
        callback_url=callback_url,
        publisher=publisher,
    )
    display_receiver(created)
@cli.command()
@click.option(
    '--id',
    prompt='Receiver ID',
    help='receiver id',
)
def receiver_delete(id):
    # Look the Receiver up by primary key, delete it, then confirm on stdout.
    # The parameter keeps the name ``id`` because click derives it from the
    # ``--id`` option.
    # (``#`` comment rather than a docstring: click would otherwise show
    # the docstring as this command's --help text.)
    Receiver.objects.get(id=id).delete()
    confirmation = 'receiver {} deleted'.format(id)
    click.echo(confirmation)
@cli.command()
@click.option(
    '--id',
    prompt='Receiver ID',
    help='receiver id',
)
@click.option(
    '--attribute',
    prompt='Attribute to update',
    help='Attribute to update',
)
@click.option('--value', prompt='Value')
def receiver_update(id, attribute, value):
    # Set a single attribute on a Receiver and save it; the receiver is
    # printed afterwards whether or not the update happened.
    # NOTE(review): ``--value`` declares no type, so it is presumably
    # assigned as the raw string click delivers - confirm.
    # (``#`` comment rather than a docstring: click would otherwise show
    # the docstring as this command's --help text.)
    target = Receiver.objects.get(id=id)
    if not hasattr(target, attribute):
        # Unknown attribute name: report it and leave the row untouched.
        click.secho('Receiver does not have this attribute.', fg='red')
    else:
        setattr(target, attribute, value)
        target.save()
        click.secho('Done', fg='green')
    display_receiver(target)
def display_receiver(receiver):
    """Echo a receiver's id, position, callback URL and publisher.

    Each value is written on its own labelled line, followed by a line of
    asterisks that separates consecutive receivers.
    """
    labelled_attributes = (
        ("ID: {}", "id"),
        ("Position: {}", "position"),
        ("Callback URL: {}", "callback_url"),
        ("Publisher: {}", "publisher"),
    )
    # Read and echo one attribute at a time so output is produced in the
    # same order the attributes are accessed.
    for template, attribute in labelled_attributes:
        click.echo(template.format(getattr(receiver, attribute)))
    click.echo("*****************")
@cli.command(help="publish updated applications of a position")
@click.option('--position-id', '-p', prompt='Position ID',
              help='Position id')
# ``async`` is a reserved keyword from Python 3.7 on, so it cannot be used as
# a parameter name.  The flag keeps its ``--async`` spelling on the command
# line and is bound to the ``run_async`` argument instead.
@click.option('--async', 'run_async', is_flag=True)
def publish(position_id, run_async):
    """Publish the ranking results of one position.

    Args:
        position_id: Id of the position, forwarded unchanged to the
            publishing functions.
        run_async: True when ``--async`` was passed.  The work is then
            handed to ``publish_ranking_results.apply_async``; otherwise
            ``do_publish_ranking_result`` is called directly in this
            process.
    """
    if run_async:
        publish_ranking_results.apply_async(args=(position_id,))
    else:
        do_publish_ranking_result(position_id)
@cli.command(help='list publish records of a position')
@click.option(
    '--position-id', '-p',
    prompt='Position ID',
    help='Position id',
)
def record_list(position_id):
    """Print every publish record of every receiver of a position.

    The position is looked up by id; its ``publish_receivers`` are walked
    and each receiver's ``publish_records`` are printed one per line.
    """
    owner = DragoPosition.objects.get(id=position_id)
    for receiver in owner.publish_receivers.all():
        records = receiver.publish_records.all()
        for entry in records:
            print(entry)
@cli.command()
@click.option(
    '--receiver-id', '-r',
    prompt='Receiver ID',
    help='Receiver id',
)
def record_delete(receiver_id):
    # Delete all publish records that belong to the given receiver, then
    # print a green confirmation.
    # (``#`` comment rather than a docstring: click would otherwise show
    # the docstring as this command's --help text.)
    owner = Receiver.objects.get(id=receiver_id)
    records = owner.publish_records.all()
    records.delete()
    click.secho('Done', fg='green')
@cli.command(help='reset record, publish application next time')
@click.option(
    '--record-id', '-r',
    multiple=True,
    help='Record id',
)
def record_reset(record_id):
    """Reset ``last_slscore`` to -1 on each of the given records.

    ``record_id`` is the tuple of ids collected from the repeatable
    ``--record-id`` option.  Each record is saved and echoed after the
    reset, and a green confirmation is printed once all ids are processed.
    """
    for pk in record_id:
        entry = Record.objects.get(id=pk)
        entry.last_slscore = -1
        entry.save()
        click.echo(entry)
    click.secho('Done', fg='green')
def _print_attribute(name, value):
    """Echo ``name: value`` with the value rendered in green.

    Args:
        name: Label printed before the colon.
        value: Any object; it is converted with ``str`` before styling.
    """
    label = click.style('{}: '.format(name))
    rendered = click.style(str(value), fg='green')
    click.echo(label + rendered)


# The entry point comes last so that every module-level helper is defined
# before ``cli()`` runs; in click's default standalone mode the call does not
# return, so anything placed after it is never executed when the file is run
# as a script.
if __name__ == '__main__':
    cli()