Merge pull request #47609 from rrroo/improve-aws-retries

Unify retries for AWS API
This commit is contained in:
Nicole Thomas 2018-05-22 08:50:03 -04:00 committed by GitHub
commit 70b62074b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 82 additions and 97 deletions

View File

@ -75,7 +75,6 @@ import time
import uuid import uuid
import pprint import pprint
import logging import logging
import random
# Import libs for talking to the EC2 API # Import libs for talking to the EC2 API
import hmac import hmac
@ -302,8 +301,8 @@ def query(params=None, setname=None, requesturl=None, location=None,
# Retrieve access credentials from meta-data, or use provided # Retrieve access credentials from meta-data, or use provided
access_key_id, secret_access_key, token = aws.creds(provider) access_key_id, secret_access_key, token = aws.creds(provider)
attempts = 5 attempts = 0
while attempts > 0: while attempts < aws.AWS_MAX_RETRIES:
params_with_headers = params.copy() params_with_headers = params.copy()
timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
@ -364,15 +363,15 @@ def query(params=None, setname=None, requesturl=None, location=None,
querystring = querystring.replace('+', '%20') querystring = querystring.replace('+', '%20')
canonical_request = method + '\n' + canonical_uri + '\n' + \ canonical_request = method + '\n' + canonical_uri + '\n' + \
querystring + '\n' + canonical_headers + '\n' + \ querystring + '\n' + canonical_headers + '\n' + \
signed_headers + '\n' + payload_hash signed_headers + '\n' + payload_hash
algorithm = 'AWS4-HMAC-SHA256' algorithm = 'AWS4-HMAC-SHA256'
credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request' credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request'
string_to_sign = algorithm + '\n' + amz_date + '\n' + \ string_to_sign = algorithm + '\n' + amz_date + '\n' + \
credential_scope + '\n' + \ credential_scope + '\n' + \
salt.utils.hashutils.sha256_digest(canonical_request) salt.utils.hashutils.sha256_digest(canonical_request)
kDate = sign(('AWS4' + provider['key']).encode('utf-8'), datestamp) kDate = sign(('AWS4' + provider['key']).encode('utf-8'), datestamp)
kRegion = sign(kDate, region) kRegion = sign(kDate, region)
@ -381,12 +380,11 @@ def query(params=None, setname=None, requesturl=None, location=None,
signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'), signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'),
hashlib.sha256).hexdigest() hashlib.sha256).hexdigest()
#sig = binascii.b2a_base64(hashed)
authorization_header = algorithm + ' ' + 'Credential=' + \ authorization_header = algorithm + ' ' + 'Credential=' + \
provider['id'] + '/' + credential_scope + \ provider['id'] + '/' + credential_scope + \
', ' + 'SignedHeaders=' + signed_headers + \ ', ' + 'SignedHeaders=' + signed_headers + \
', ' + 'Signature=' + signature ', ' + 'Signature=' + signature
headers = {'x-amz-date': amz_date, 'Authorization': authorization_header} headers = {'x-amz-date': amz_date, 'Authorization': authorization_header}
log.debug('EC2 Request: %s', requesturl) log.debug('EC2 Request: %s', requesturl)
@ -407,15 +405,14 @@ def query(params=None, setname=None, requesturl=None, location=None,
# check to see if we should retry the query # check to see if we should retry the query
err_code = data.get('Errors', {}).get('Error', {}).get('Code', '') err_code = data.get('Errors', {}).get('Error', {}).get('Code', '')
if attempts > 0 and err_code and err_code in EC2_RETRY_CODES: if err_code and err_code in EC2_RETRY_CODES:
attempts -= 1 attempts += 1
log.error( log.error(
'EC2 Response Status Code and Error: [%s %s] %s; ' 'EC2 Response Status Code and Error: [%s %s] %s; '
'Attempts remaining: %s', 'Attempts remaining: %s',
exc.response.status_code, exc, data, attempts exc.response.status_code, exc, data, attempts
) )
# Wait a bit before continuing to prevent throttling aws.sleep_exponential_backoff(attempts)
time.sleep(2)
continue continue
log.error( log.error(
@ -1562,29 +1559,21 @@ def _modify_eni_properties(eni_id, properties=None, vm_=None):
for k, v in six.iteritems(properties): for k, v in six.iteritems(properties):
params[k] = v params[k] = v
retries = 5 result = aws.query(params,
while retries > 0: return_root=True,
retries = retries - 1 location=get_location(vm_),
provider=get_provider(),
opts=__opts__,
sigver='4')
result = aws.query(params, if isinstance(result, dict) and result.get('error'):
return_root=True, raise SaltCloudException(
location=get_location(vm_), 'Could not change interface <{0}> attributes <\'{1}\'>'.format(
provider=get_provider(), eni_id, properties
opts=__opts__, )
sigver='4')
if isinstance(result, dict) and result.get('error'):
time.sleep(1)
continue
return result
raise SaltCloudException(
'Could not change interface <{0}> attributes '
'<\'{1}\'> after 5 retries'.format(
eni_id, properties
) )
) else:
return result
def _associate_eip_with_interface(eni_id, eip_id, private_ip=None, vm_=None): def _associate_eip_with_interface(eni_id, eip_id, private_ip=None, vm_=None):
@ -1597,44 +1586,35 @@ def _associate_eip_with_interface(eni_id, eip_id, private_ip=None, vm_=None):
be NATted to - useful if you have multiple IP addresses assigned to an be NATted to - useful if you have multiple IP addresses assigned to an
interface. interface.
''' '''
retries = 5 params = {'Action': 'AssociateAddress',
while retries > 0: 'NetworkInterfaceId': eni_id,
params = {'Action': 'AssociateAddress', 'AllocationId': eip_id}
'NetworkInterfaceId': eni_id,
'AllocationId': eip_id}
if private_ip: if private_ip:
params['PrivateIpAddress'] = private_ip params['PrivateIpAddress'] = private_ip
retries = retries - 1 result = aws.query(params,
result = aws.query(params, return_root=True,
return_root=True, location=get_location(vm_),
location=get_location(vm_), provider=get_provider(),
provider=get_provider(), opts=__opts__,
opts=__opts__, sigver='4')
sigver='4')
if isinstance(result, dict) and result.get('error'): if not result[2].get('associationId'):
time.sleep(1) raise SaltCloudException(
continue 'Could not associate elastic ip address '
'<{0}> with network interface <{1}>'.format(
if not result[2].get('associationId'): eip_id, eni_id
break )
log.debug(
'Associated ElasticIP address %s with interface %s',
eip_id, eni_id
) )
return result[2].get('associationId') log.debug(
'Associated ElasticIP address %s with interface %s',
raise SaltCloudException( eip_id, eni_id
'Could not associate elastic ip address '
'<{0}> with network interface <{1}>'.format(
eip_id, eni_id
)
) )
return result[2].get('associationId')
def _update_enis(interfaces, instance, vm_=None): def _update_enis(interfaces, instance, vm_=None):
config_enis = {} config_enis = {}
@ -2011,7 +1991,8 @@ def request_instance(vm_=None, call=None):
params[termination_key] = six.text_type(set_del_root_vol_on_destroy).lower() params[termination_key] = six.text_type(set_del_root_vol_on_destroy).lower()
# Use default volume type if not specified # Use default volume type if not specified
if ex_blockdevicemappings and dev_index < len(ex_blockdevicemappings) and 'Ebs.VolumeType' not in ex_blockdevicemappings[dev_index]: if ex_blockdevicemappings and dev_index < len(ex_blockdevicemappings) and \
'Ebs.VolumeType' not in ex_blockdevicemappings[dev_index]:
type_key = '{0}BlockDeviceMapping.{1}.Ebs.VolumeType'.format(spot_prefix, dev_index) type_key = '{0}BlockDeviceMapping.{1}.Ebs.VolumeType'.format(spot_prefix, dev_index)
params[type_key] = rd_type params[type_key] = rd_type
@ -2182,8 +2163,7 @@ def query_instance(vm_=None, call=None):
provider = get_provider(vm_) provider = get_provider(vm_)
attempts = 0 attempts = 0
# perform exponential backoff and wait up to one minute (2**6 seconds) while attempts < aws.AWS_MAX_RETRIES:
while attempts < 7:
data, requesturl = aws.query(params, # pylint: disable=unbalanced-tuple-unpacking data, requesturl = aws.query(params, # pylint: disable=unbalanced-tuple-unpacking
location=location, location=location,
provider=provider, provider=provider,
@ -2205,7 +2185,7 @@ def query_instance(vm_=None, call=None):
else: else:
break break
time.sleep(random.uniform(1, 2**attempts)) aws.sleep_exponential_backoff(attempts)
attempts += 1 attempts += 1
continue continue
else: else:
@ -2215,7 +2195,6 @@ def query_instance(vm_=None, call=None):
def __query_ip_address(params, url): # pylint: disable=W0613 def __query_ip_address(params, url): # pylint: disable=W0613
data = aws.query(params, data = aws.query(params,
#requesturl=url,
location=location, location=location,
provider=provider, provider=provider,
opts=__opts__, opts=__opts__,
@ -3028,9 +3007,9 @@ def set_tags(name=None,
params['Tag.{0}.Key'.format(idx)] = tag_k params['Tag.{0}.Key'.format(idx)] = tag_k
params['Tag.{0}.Value'.format(idx)] = tag_v params['Tag.{0}.Value'.format(idx)] = tag_v
attempts = 5 attempts = 0
while attempts >= 0: while attempts < aws.AWS_MAX_RETRIES:
result = aws.query(params, aws.query(params,
setname='tagSet', setname='tagSet',
location=location, location=location,
provider=get_provider(), provider=get_provider(),
@ -3064,9 +3043,8 @@ def set_tags(name=None,
if failed_to_set_tags: if failed_to_set_tags:
log.warning('Failed to set tags. Remaining attempts %s', attempts) log.warning('Failed to set tags. Remaining attempts %s', attempts)
attempts -= 1 attempts += 1
# Just a little delay between attempts... aws.sleep_exponential_backoff(attempts)
time.sleep(1)
continue continue
return settags return settags
@ -3405,8 +3383,8 @@ def _get_node(name=None, instance_id=None, location=None):
provider = get_provider() provider = get_provider()
attempts = 10 attempts = 0
while attempts >= 0: while attempts < aws.AWS_MAX_RETRIES:
try: try:
instances = aws.query(params, instances = aws.query(params,
location=location, location=location,
@ -3416,13 +3394,12 @@ def _get_node(name=None, instance_id=None, location=None):
instance_info = _extract_instance_info(instances).values() instance_info = _extract_instance_info(instances).values()
return next(iter(instance_info)) return next(iter(instance_info))
except IndexError: except IndexError:
attempts -= 1 attempts += 1
log.debug( log.debug(
'Failed to get the data for node \'%s\'. Remaining ' 'Failed to get the data for node \'%s\'. Remaining '
'attempts: %s', instance_id or name, attempts 'attempts: %s', instance_id or name, attempts
) )
# Just a little delay between attempts... aws.sleep_exponential_backoff(attempts)
time.sleep(0.5)
return {} return {}
@ -3946,7 +3923,8 @@ def register_image(kwargs=None, call=None):
.. code-block:: bash .. code-block:: bash
salt-cloud -f register_image my-ec2-config ami_name=my_ami description="my description" root_device_name=/dev/xvda snapshot_id=snap-xxxxxxxx salt-cloud -f register_image my-ec2-config ami_name=my_ami description="my description"
root_device_name=/dev/xvda snapshot_id=snap-xxxxxxxx
''' '''
if call != 'function': if call != 'function':

View File

@ -52,6 +52,8 @@ AWS_RETRY_CODES = [
] ]
AWS_METADATA_TIMEOUT = 3.05 AWS_METADATA_TIMEOUT = 3.05
AWS_MAX_RETRIES = 7
IROLE_CODE = 'use-instance-role-credentials' IROLE_CODE = 'use-instance-role-credentials'
__AccessKeyId__ = '' __AccessKeyId__ = ''
__SecretAccessKey__ = '' __SecretAccessKey__ = ''
@ -61,6 +63,21 @@ __Location__ = ''
__AssumeCache__ = {} __AssumeCache__ = {}
def sleep_exponential_backoff(attempts):
"""
backoff an exponential amount of time to throttle requests
during "API Rate Exceeded" failures as suggested by the AWS documentation here:
https://docs.aws.amazon.com/AWSEC2/latest/APIReference/query-api-troubleshooting.html
and also here:
https://docs.aws.amazon.com/general/latest/gr/api-retries.html
Failure to implement this approach results in a failure rate of >30% when using salt-cloud with
"--parallel" when creating 50 or more instances with a fixed delay of 2 seconds.
A failure rate of >10% is observed when using the salt-api with an asyncronous client
specified (runner_async).
"""
time.sleep(random.uniform(1, 2**attempts))
def creds(provider): def creds(provider):
''' '''
Return the credentials for AWS signing. This could be just the id and key Return the credentials for AWS signing. This could be just the id and key
@ -441,9 +458,8 @@ def query(params=None, setname=None, requesturl=None, location=None,
) )
headers = {} headers = {}
MAX_RETRIES = 6
attempts = 0 attempts = 0
while attempts < MAX_RETRIES: while attempts < AWS_MAX_RETRIES:
log.debug('AWS Request: %s', requesturl) log.debug('AWS Request: %s', requesturl)
log.trace('AWS Request Parameters: %s', params_with_headers) log.trace('AWS Request Parameters: %s', params_with_headers)
try: try:
@ -461,23 +477,14 @@ def query(params=None, setname=None, requesturl=None, location=None,
# check to see if we should retry the query # check to see if we should retry the query
err_code = data.get('Errors', {}).get('Error', {}).get('Code', '') err_code = data.get('Errors', {}).get('Error', {}).get('Code', '')
if attempts < MAX_RETRIES and err_code and err_code in AWS_RETRY_CODES: if attempts < AWS_MAX_RETRIES and err_code and err_code in AWS_RETRY_CODES:
attempts += 1 attempts += 1
log.error( log.error(
'AWS Response Status Code and Error: [%s %s] %s; ' 'AWS Response Status Code and Error: [%s %s] %s; '
'Attempts remaining: %s', 'Attempts remaining: %s',
exc.response.status_code, exc, data, attempts exc.response.status_code, exc, data, attempts
) )
# backoff an exponential amount of time to throttle requests sleep_exponential_backoff(attempts)
# during "API Rate Exceeded" failures as suggested by the AWS documentation here:
# https://docs.aws.amazon.com/AWSEC2/latest/APIReference/query-api-troubleshooting.html
# and also here:
# https://docs.aws.amazon.com/general/latest/gr/api-retries.html
# Failure to implement this approach results in a failure rate of >30% when using salt-cloud with
# "--parallel" when creating 50 or more instances with a fixed delay of 2 seconds.
# A failure rate of >10% is observed when using the salt-api with an asyncronous client
# specified (runner_async).
time.sleep(random.uniform(1, 2**attempts))
continue continue
log.error( log.error(