james-l
3/9/2017 - 10:47 PM

Amazon boto3 SQS example

Amazon boto3 SQS example

def next_task(self):
    """Get the next task from the configured SQS queues.

    Polls every queue in ``self.sqs_queues`` in order, requesting at most one
    message per queue, and returns a task built from the first message whose
    body is a JSON object containing both ``'Message'`` and ``'job_id'``.

    A message that cannot be used (body is not valid JSON, is not a JSON
    object, or lacks a required key) is logged and skipped; it is not
    deleted here, and polling moves on to the next queue.

    Returns:
        dict: ``{'md5': ..., 'job_id': ..., 'message': <received message>}``
        for the first usable message, or an empty dict when no queue
        produced one.
    """
    task = {}
    self.logger.info('get next task from SQS start')
    for sqs_queue in self.sqs_queues:
        messages = sqs_queue.receive_messages(
            MaxNumberOfMessages=1,
            VisibilityTimeout=60 * 20,  # 20 minutes
            WaitTimeSeconds=15,
        )
        if not messages:
            continue

        message = messages[0]
        try:
            msg_dict = json.loads(message.body)
        except ValueError:
            # exc_info attaches the active traceback to the log record.
            self.logger.warning('SQS message body is not valid JSON', exc_info=True)
            continue

        # Valid JSON is not necessarily an object: a bare number, string or
        # list would break the key checks/lookups below, so reject it here.
        if (
            not isinstance(msg_dict, dict)
            or 'Message' not in msg_dict
            or 'job_id' not in msg_dict
        ):
            self.logger.error('md5 or job_id is not in SQS message')
            continue

        # The md5 travels in the 'Message' field of the payload.
        task['md5'] = msg_dict['Message']
        task['job_id'] = msg_dict['job_id']
        task['message'] = message
        self.logger.info('get next task from SQS finish, md5[%s]', task['md5'])
        return task

    self.logger.info('get next task from SQS finish, no task found')
    return task



def load_sqs_queues(self):
    """Connect to SQS and collect the configured queues.

    Creates a boto3 SQS resource from the ``sqs_region_name``,
    ``sqs_aws_access_key_id`` and ``sqs_aws_secret_access_key`` config
    entries, then looks up each queue named in the ``sqs_apk_queues`` config
    entry and appends it to ``self.sqs_queues`` in the configured order.
    """
    connection_args = {
        'region_name': self.config.get('sqs_region_name'),
        'aws_access_key_id': self.config.get('sqs_aws_access_key_id'),
        'aws_secret_access_key': self.config.get('sqs_aws_secret_access_key'),
    }
    resource = boto3.resource('sqs', **connection_args)

    queue_names = self.config.get('sqs_apk_queues')
    for queue_name in queue_names:
        found = resource.get_queue_by_name(QueueName=queue_name)
        self.sqs_queues.append(found)
import boto3
import json

# Minimal standalone example: receive one message from a queue, print its
# 'md5' field, then delete the message.
sqs = boto3.resource(
    'sqs',
    region_name='us-west-1',
    # Placeholder credentials; replace with real values before running.
    aws_access_key_id='xxx',
    aws_secret_access_key='xxxx',
)

queue = sqs.get_queue_by_name(QueueName='testonly')

messages = queue.receive_messages(
    MaxNumberOfMessages=1,
    VisibilityTimeout=10,
    WaitTimeSeconds=20,
)

# The receive call can return an empty list when nothing arrives in time,
# so guard the index instead of failing with IndexError.
if messages:
    message = messages[0]
    data = json.loads(message.body)
    print(data.get('md5'))
    message.delete()
else:
    print('no message received')