Amazon boto3 SQS example
def next_task(self):
    """Fetch the next usable task from the configured SQS queues.

    Each queue in ``self.sqs_queues`` is polled in order for at most one
    message. The first message whose body decodes to a JSON object holding
    both the ``'Message'`` and ``'job_id'`` keys is converted into a task.

    Unusable messages (body is not valid JSON, decodes to something other
    than a JSON object, or lacks a required key) are logged and skipped;
    this method does not delete them, and polling moves on to the next queue.

    Returns:
        dict: ``{'md5': ..., 'job_id': ..., 'message': <received message>}``
        for the first usable message, where ``'md5'`` is taken from the
        ``'Message'`` key of the body. An empty dict if no queue produced a
        usable message.
    """
    self.logger.info('get next task from SQS start')
    for sqs_queue in self.sqs_queues:
        messages = sqs_queue.receive_messages(
            MaxNumberOfMessages=1,
            VisibilityTimeout=60 * 20,
            WaitTimeSeconds=15,
        )
        if not messages:
            # Nothing arrived on this queue; try the next one.
            continue
        message = messages[0]
        try:
            msg_dict = json.loads(message.body)
        except (ValueError, TypeError):
            # ValueError: body is not valid JSON.
            # TypeError: body is not a str/bytes value at all.
            self.logger.warning(traceback.format_exc())
            continue
        if not isinstance(msg_dict, dict):
            # Valid JSON that is not an object (number, null, string, list)
            # would otherwise break the key checks and lookups below.
            self.logger.error('SQS message body is not a JSON object')
            continue
        if 'Message' not in msg_dict or 'job_id' not in msg_dict:
            self.logger.error('md5 or job_id is not in SQS message')
            continue
        task = {
            'md5': msg_dict['Message'],
            'job_id': msg_dict['job_id'],
            # Keep the received message object alongside the parsed fields.
            'message': message,
        }
        self.logger.info('get next task from SQS finish, md5[%s]', task['md5'])
        return task
    self.logger.info('get next task from SQS finish, no task found')
    return {}
def load_sqs_queues(self):
    """Look up every configured queue and store the resulting queue objects.

    A boto3 SQS resource is created from the ``sqs_region_name``,
    ``sqs_aws_access_key_id`` and ``sqs_aws_secret_access_key`` config
    entries. The queue found for each name listed under ``sqs_apk_queues``
    is then appended to ``self.sqs_queues``.
    """
    settings = self.config
    connection_args = {
        'region_name': settings.get('sqs_region_name'),
        'aws_access_key_id': settings.get('sqs_aws_access_key_id'),
        'aws_secret_access_key': settings.get('sqs_aws_secret_access_key'),
    }
    resource = boto3.resource('sqs', **connection_args)
    for queue_name in settings.get('sqs_apk_queues'):
        self.sqs_queues.append(resource.get_queue_by_name(QueueName=queue_name))
import boto3
import json
# Minimal example: receive one message from an SQS queue, print its 'md5'
# field, then delete the message.
#
# NOTE(review): the key values below are placeholders. Do not commit real
# credentials in source; prefer boto3's standard credential lookup
# (environment variables, shared credentials file, IAM role).
sqs = boto3.resource(
    'sqs',
    region_name='us-west-1',
    aws_access_key_id='xxx',
    aws_secret_access_key='xxxx',
)
queue = sqs.get_queue_by_name(QueueName='testonly')
messages = queue.receive_messages(
    MaxNumberOfMessages=1,
    VisibilityTimeout=10,
    WaitTimeSeconds=20,
)
# receive_messages() returns an empty list when nothing arrives during the
# wait, so iterate instead of indexing messages[0] (IndexError when empty).
for message in messages:
    data = json.loads(message.body)
    print(data.get('md5'))
    # Delete only after the body has been parsed and printed.
    message.delete()