Merge pull request #48705 from isbm/isbm-python37-support

Python 3.7 support
Nicole Thomas 2018-08-10 09:00:12 -04:00 committed by GitHub
commit 3de190bcdf
36 changed files with 132 additions and 119 deletions
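For context on the change set as a whole: Python 3.7 promotes `async` and `await` to reserved keywords, so any identifier spelled `async` (a parameter name, a keyword argument, an attribute, or a module path component) fails to parse before the module even imports. The snippet below is illustrative only, not taken from the Salt sources; it demonstrates the failure mode that forces the rename to `asynchronous` throughout this PR.

```python
# Runnable on any Python 3.x: code that uses 'async' as an identifier no
# longer compiles on 3.7+, it fails with a SyntaxError before import finishes.
for src in (
    "def refresh(async=True): pass",   # 'async' as a parameter name
    "refresh(async=False)",            # 'async' as a keyword argument
    "self.async = None",               # 'async' as an attribute name
):
    try:
        compile(src, "<snippet>", "exec")
        print("compiles (Python 3.6 and earlier):", src)
    except SyntaxError:
        print("SyntaxError (Python 3.7+):", src)
```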

View File

@ -283,7 +283,7 @@ class LocalClient(object):
'No command was sent, no jid was assigned.')
return {}
# don't install event subscription listeners when the request is async
# don't install event subscription listeners when the request is asynchronous
# and doesn't care. this is important as it will create event leaks otherwise
if not listen:
return pub_data

View File

@ -93,7 +93,7 @@ class APIClient(object):
The cmd dict items are as follows:
mode: either 'sync' or 'async'. Defaults to 'async' if missing
mode: either 'sync' or 'asynchronous'. Defaults to 'asynchronous' if missing
fun: required. If the function is to be run on the master using either
a wheel or runner client then the fun: includes either
'wheel.' or 'runner.' as a prefix and has three parts separated by '.'.
@ -120,7 +120,7 @@ class APIClient(object):
'''
cmd = dict(cmd) # make copy
client = 'minion' # default to local minion client
mode = cmd.get('mode', 'async') # default to 'async'
mode = cmd.get('mode', 'async')
# check for wheel or runner prefix to fun name to use wheel or runner client
funparts = cmd.get('fun', '').split('.')
@ -162,7 +162,7 @@ class APIClient(object):
'''
return self.runnerClient.master_call(**kwargs)
runner_sync = runner_async # always runner async, so works in either mode
runner_sync = runner_async # always runner asynchronous, so works in either mode
def wheel_sync(self, **kwargs):
'''

View File

@ -447,7 +447,7 @@ class SyncClientMixin(object):
class AsyncClientMixin(object):
'''
A mixin for *Client interfaces to enable easy async function execution
A mixin for *Client interfaces to enable easy asynchronous function execution
'''
client = None
tag_prefix = None
@ -499,7 +499,7 @@ class AsyncClientMixin(object):
tag = salt.utils.event.tagify(jid, prefix=self.tag_prefix)
return {'tag': tag, 'jid': jid}
def async(self, fun, low, user='UNKNOWN', pub=None):
def asynchronous(self, fun, low, user='UNKNOWN', pub=None):
'''
Execute the function in a multiprocess and return the event tag to use
to watch for the return

View File

@ -888,7 +888,7 @@ def _wait_for_async(conn, request_id):
while result.status == 'InProgress':
count = count + 1
if count > 120:
raise ValueError('Timed out waiting for async operation to complete.')
raise ValueError('Timed out waiting for asynchronous operation to complete.')
time.sleep(5)
result = conn.get_operation_status(request_id)

View File

@ -1279,7 +1279,7 @@ def _wait_for_completion(conn, promise, wait_timeout, msg):
)
raise Exception(
'Timed out waiting for async operation {0} "{1}" to complete.'.format(
'Timed out waiting for asynchronous operation {0} "{1}" to complete.'.format(
msg, six.text_type(promise['requestId'])
)
)

View File

@ -719,7 +719,7 @@ def _wait_for_ip(name, session):
def _run_async_task(task=None, session=None):
'''
Run XenAPI task in async mode to prevent timeouts
Run XenAPI task in asynchronous mode to prevent timeouts
'''
if task is None or session is None:
return None

View File

@ -1075,9 +1075,9 @@ class LocalFuncs(object):
try:
fun = load.pop('fun')
runner_client = salt.runner.RunnerClient(self.opts)
return runner_client.async(fun,
load.get('kwarg', {}),
username)
return runner_client.asynchronous(fun,
load.get('kwarg', {}),
username)
except Exception as exc:
log.exception('Exception occurred while introspecting %s')
return {'error': {'name': exc.__class__.__name__,

View File

@ -787,7 +787,7 @@ class SlackClient(object):
:param interval: time to wait between ending a loop and beginning the next
'''
log.debug('Going to run a command async')
log.debug('Going to run a command asynchronous')
runner_functions = sorted(salt.runner.Runner(__opts__).functions)
# Parse args and kwargs
cmd = msg['cmdline'][0]
@ -809,7 +809,7 @@ class SlackClient(object):
log.debug('Command %s will run via runner_functions', cmd)
# pylint is tripping
# pylint: disable=missing-whitespace-after-comma
job_id_dict = runner.async(cmd, {'args': args, 'kwargs': kwargs})
job_id_dict = runner.asynchronous(cmd, {'args': args, 'kwargs': kwargs})
job_id = job_id_dict['jid']
# Default to trying to run as a client module.

View File

@ -1935,9 +1935,9 @@ class ClearFuncs(object):
try:
fun = clear_load.pop('fun')
runner_client = salt.runner.RunnerClient(self.opts)
return runner_client.async(fun,
clear_load.get('kwarg', {}),
username)
return runner_client.asynchronous(fun,
clear_load.get('kwarg', {}),
username)
except Exception as exc:
log.error('Exception occurred while introspecting %s: %s', fun, exc)
return {'error': {'name': exc.__class__.__name__,

View File

@ -937,7 +937,7 @@ class MinionManager(MinionBase):
install_zmq()
self.io_loop = ZMQDefaultLoop.current()
self.process_manager = ProcessManager(name='MultiMinionProcessManager')
self.io_loop.spawn_callback(self.process_manager.run, async=True)
self.io_loop.spawn_callback(self.process_manager.run, **{'async': True}) # Tornado backward compat
def __del__(self):
self.destroy()
@ -1134,7 +1134,7 @@ class Minion(MinionBase):
time.sleep(sleep_time)
self.process_manager = ProcessManager(name='MinionProcessManager')
self.io_loop.spawn_callback(self.process_manager.run, async=True)
self.io_loop.spawn_callback(self.process_manager.run, **{'async': True})
# We don't have the proxy setup yet, so we can't start engines
# Engines need to be able to access __proxy__
if not salt.utils.platform.is_proxy():
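The `**{'async': True}` form above is the caller-side half of the workaround: a literal `async=True` keyword argument is a SyntaxError under Python 3.7, but dict unpacking passes the same keyword as a plain string and is only resolved at call time, so callees that still read `async` keep working. A standalone sketch (the `run` function is a stand-in, not Salt's `ProcessManager.run`):

```python
def run(**kwargs):
    # Stand-in callee that still reads the legacy keyword out of **kwargs.
    return kwargs.get('async', False)

# run(async=True)              # SyntaxError on Python 3.7+: 'async' is reserved
print(run(**{'async': True}))  # dict unpacking passes the name as a string: True
```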

View File

@ -93,6 +93,7 @@ from salt.exceptions import CommandExecutionError
# Import 3rd-party libs
from salt.ext import six
from salt.ext.six.moves import range
import salt.utils.versions
SSL_VERSION = 'ssl_version'
@ -128,7 +129,7 @@ def __virtual__():
def _async_log_errors(errors):
log.error('Cassandra_cql async call returned: %s', errors)
log.error('Cassandra_cql asynchronous call returned: %s', errors)
def _load_properties(property_name, config_option, set_default=False, default=None):
@ -361,9 +362,8 @@ def cql_query(query, contact_points=None, port=None, cql_user=None, cql_pass=Non
return ret
def cql_query_with_prepare(query, statement_name, statement_arguments, async=False,
callback_errors=None,
contact_points=None, port=None, cql_user=None, cql_pass=None):
def cql_query_with_prepare(query, statement_name, statement_arguments, callback_errors=None, contact_points=None,
port=None, cql_user=None, cql_pass=None, **kwargs):
'''
Run a query on a Cassandra cluster and return a dictionary.
@ -377,8 +377,8 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal
:type statement_name: str
:param statement_arguments: Bind parameters for the SQL statement
:type statement_arguments: list[str]
:param async: Run this query in asynchronous mode
:type async: bool
:param async: Run this query in asynchronous mode
:type async: bool
:param callback_errors: Function to call after query runs if there is an error
:type callback_errors: Function callable
:param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs.
@ -401,12 +401,14 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal
# Insert data asynchronously
salt this-node cassandra_cql.cql_query_with_prepare "name_insert" "INSERT INTO USERS (first_name, last_name) VALUES (?, ?)" \
statement_arguments=['John','Doe'], async=True
statement_arguments=['John','Doe'], asynchronous=True
# Select data, should not be asynchronous because there is not currently a facility to return data from a future
salt this-node cassandra_cql.cql_query_with_prepare "name_select" "SELECT * FROM USERS WHERE first_name=?" \
statement_arguments=['John']
'''
# Backward-compatibility with Python 3.7: "async" is a reserved word
asynchronous = kwargs.get('async', False)
try:
cluster, session = _connect(contact_points=contact_points, port=port,
cql_user=cql_user, cql_pass=cql_pass)
@ -431,7 +433,7 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal
ret = []
try:
if async:
if asynchronous:
future_results = session.execute_async(bound_statement.bind(statement_arguments))
# future_results.add_callbacks(_async_log_errors)
else:
@ -441,7 +443,7 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal
msg = "ERROR: Cassandra query failed: {0} reason: {1}".format(query, e)
raise CommandExecutionError(msg)
if not async and results:
if not asynchronous and results:
for result in results:
values = {}
for key, value in six.iteritems(result):
@ -456,7 +458,7 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal
# If this was a synchronous call, then we either have an empty list
# because there was no return, or we have a return
# If this was an async call we only return the empty list
# If this was an asynchronous call we only return the empty list
return ret
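The signature rewrite above is the callee-side counterpart: a parameter can no longer be named `async`, so the legacy keyword is read out of `**kwargs` instead, while updated callers (such as the Cassandra returner later in this diff) switch to `asynchronous=True`. A condensed sketch of the idea, accepting either spelling (a simplified stand-in, not the full Salt function):

```python
def cql_query_with_prepare(query, statement_name, statement_arguments, **kwargs):
    # 'async' cannot be a named parameter on Python 3.7+, so look for it in
    # **kwargs; also honour the new spelling used by updated callers.
    asynchronous = kwargs.get('asynchronous', kwargs.get('async', False))
    mode = 'execute_async' if asynchronous else 'execute'
    return '{0}: {1}'.format(mode, query)

print(cql_query_with_prepare('SELECT 1;', 'q', [], **{'async': True}))  # legacy caller
print(cql_query_with_prepare('SELECT 1;', 'q', [], asynchronous=True))  # updated caller
```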

View File

@ -24,6 +24,7 @@ import logging
# Import Salt libs
import salt.utils.json
import salt.utils.versions
# import third party
try:
@ -137,12 +138,13 @@ def _http_request(url,
def send(message,
async=False,
asynchronous=False,
ip_pool=None,
send_at=None,
api_url=None,
api_version=None,
api_key=None):
api_key=None,
**kwargs):
'''
Send out the email using the details from the ``message`` argument.
@ -151,14 +153,14 @@ def send(message,
sent as dictionary with at fields as specified in the Mandrill API
documentation.
async: ``False``
asynchronous: ``False``
Enable a background sending mode that is optimized for bulk sending.
In async mode, messages/send will immediately return a status of
"queued" for every recipient. To handle rejections when sending in async
In asynchronous mode, messages/send will immediately return a status of
"queued" for every recipient. To handle rejections when sending in asynchronous
mode, set up a webhook for the 'reject' event. Defaults to false for
messages with no more than 10 recipients; messages with more than 10
recipients are always sent asynchronously, regardless of the value of
async.
asynchronous.
ip_pool
The name of the dedicated ip pool that should be used to send the
@ -229,6 +231,11 @@ def send(message,
result:
True
'''
if 'async' in kwargs: # Remove this in Sodium
salt.utils.versions.warn_until('Sodium', 'Parameter "async" is renamed to "asynchronous" '
'and will be removed in version {version}.')
asynchronous = bool(kwargs['async'])
params = _get_api_params(api_url=api_url,
api_version=api_version,
api_key=api_key)
@ -238,7 +245,7 @@ def send(message,
data = {
'key': params['api_key'],
'message': message,
'async': async,
'async': asynchronous,
'ip_pool': ip_pool,
'send_at': send_at
}
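The `warn_until` guard added above gives the old keyword a deprecation window instead of a silent break: `salt.utils.versions.warn_until` fills `{version}` with the named release and emits a `DeprecationWarning` until that release ships, after which it raises so the shim cannot be forgotten. A small stand-in built on the stdlib `warnings` module (illustrative only, not the Salt helper) shows the caller-visible shape of the shim:

```python
import warnings

def send(message, asynchronous=False, **kwargs):
    if 'async' in kwargs:  # legacy pre-3.7 spelling
        warnings.warn('Parameter "async" is renamed to "asynchronous".',
                      DeprecationWarning, stacklevel=2)
        asynchronous = bool(kwargs.pop('async'))
    return {'message': message, 'async': asynchronous}

print(send('hello', **{'async': True}))  # old callers keep working, with a warning
print(send('hello', asynchronous=True))  # new spelling, no warning
```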

View File

@ -986,10 +986,11 @@ def refresh_pillar():
ret = False # Effectively a no-op, since we can't really return without an event system
return ret
pillar_refresh = salt.utils.functools.alias_function(refresh_pillar, 'pillar_refresh')
def refresh_modules(async=True):
def refresh_modules(**kwargs):
'''
Signal the minion to refresh the module and grain data
@ -1003,8 +1004,9 @@ def refresh_modules(async=True):
salt '*' saltutil.refresh_modules
'''
asynchronous = bool(kwargs.get('async', True))
try:
if async:
if asynchronous:
# If we're going to block, first setup a listener
ret = __salt__['event.fire']({}, 'module_refresh')
else:

View File

@ -529,7 +529,7 @@ described above, the most effective and most scalable way to use both Salt and
salt-api is to run commands asynchronously using the ``local_async``,
``runner_async``, and ``wheel_async`` clients.
Running async jobs results in being able to process 3x more commands per second
Running asynchronous jobs results in being able to process 3x more commands per second
for ``LocalClient`` and 17x more commands per second for ``RunnerClient``, in
addition to much less network traffic and memory requirements. Job returns can
be fetched from Salt's job cache via the ``/jobs/<jid>`` endpoint, or they can
@ -2534,7 +2534,7 @@ class WebsocketEndpoint(object):
parent_pipe, child_pipe = Pipe()
handler.pipe = parent_pipe
handler.opts = self.opts
# Process to handle async push to a client.
# Process to handle asynchronous push to a client.
# Each GET request causes a process to be kicked off.
proc = Process(target=event_stream, args=(handler, child_pipe))
proc.start()
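From Python, the asynchronous flow described above (fire the job with an asynchronous client, collect the return from the job cache later) looks roughly like the sketch below. It assumes it runs on the master with the default configuration paths; the target and function are placeholders:

```python
import salt.client
import salt.config
import salt.runner

local = salt.client.LocalClient()
jid = local.cmd_async('*', 'test.ping')   # returns immediately with a job ID

# Later: read the stored returns from the master job cache, the same data
# the /jobs/<jid> endpoint of salt-api serves.
opts = salt.config.master_config('/etc/salt/master')
runner = salt.runner.RunnerClient(opts)
print(runner.cmd('jobs.lookup_jid', [jid]))
```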

View File

@ -180,7 +180,7 @@ class SaltInfo(object):
'expr_type': 'list',
'mode': 'client',
'client': 'local',
'async': 'local_async',
'asynchronous': 'local_async',
'token': token,
})

View File

@ -194,7 +194,7 @@ class SaltInfo(object):
'expr_type': 'list',
'mode': 'client',
'client': 'local',
'async': 'local_async',
'asynchronous': 'local_async',
'token': token,
})

View File

@ -244,7 +244,7 @@ def _json_dumps(obj, **kwargs):
# # master side
# - "runner" (done)
# - "wheel" (need async api...)
# - "wheel" (need asynchronous api...)
AUTH_TOKEN_HEADER = 'X-Auth-Token'
@ -273,7 +273,7 @@ class Any(Future):
class EventListener(object):
'''
Class responsible for listening to the salt master event bus and updating
futures. This is the core of what makes this async, this allows us to do
futures. This is the core of what makes this asynchronous, this allows us to do
non-blocking work in the main processes and "wait" for an event to happen
'''
@ -336,7 +336,7 @@ class EventListener(object):
timeout=None
):
'''
Get an event (async of course) return a future that will get it later
Get an event (asynchronous of course) return a future that will get it later
'''
# if the request finished, no reason to allow event fetching, since we
# can't send back to the client
@ -653,7 +653,7 @@ class SaltAuthHandler(BaseSaltAPIHandler): # pylint: disable=W0223
self.write(self.serialize(ret))
# TODO: make async? Underlying library isn't... and we ARE making disk calls :(
# TODO: make asynchronous? Underlying library isn't... and we ARE making disk calls :(
def post(self):
'''
:ref:`Authenticate <rest_tornado-auth>` against Salt's eauth system

View File

@ -411,7 +411,7 @@ class FormattedEventsHandler(AllEventsHandler): # pylint: disable=W0223,W0232
'tgt': '*',
'token': self.token,
'mode': 'client',
'async': 'local_async',
'asynchronous': 'local_async',
'client': 'local'
})
while True:

View File

@ -202,7 +202,7 @@ def returner(ret):
__salt__['cassandra_cql.cql_query_with_prepare'](query,
'returner_return',
tuple(statement_arguments),
async=True)
asynchronous=True)
except CommandExecutionError:
log.critical('Could not insert into salt_returns with Cassandra returner.')
raise
@ -223,7 +223,7 @@ def returner(ret):
__salt__['cassandra_cql.cql_query_with_prepare'](query,
'returner_minion',
tuple(statement_arguments),
async=True)
asynchronous=True)
except CommandExecutionError:
log.critical('Could not store minion ID with Cassandra returner.')
raise
@ -265,7 +265,7 @@ def event_return(events):
try:
__salt__['cassandra_cql.cql_query_with_prepare'](query, 'salt_events',
statement_arguments,
async=True)
asynchronous=True)
except CommandExecutionError:
log.critical('Could not store events with Cassandra returner.')
raise
@ -295,7 +295,7 @@ def save_load(jid, load, minions=None):
try:
__salt__['cassandra_cql.cql_query_with_prepare'](query, 'save_load',
statement_arguments,
async=True)
asynchronous=True)
except CommandExecutionError:
log.critical('Could not save load in jids table.')
raise

View File

@ -240,13 +240,13 @@ class Runner(RunnerClient):
if self.opts.get('eauth'):
async_pub = self.cmd_async(low)
else:
async_pub = self.async(self.opts['fun'],
low,
user=user,
pub=async_pub)
async_pub = self.asynchronous(self.opts['fun'],
low,
user=user,
pub=async_pub)
# by default: info will not be enough to be printed out!
log.warning(
'Running in async mode. Results of this execution may '
'Running in asynchronous mode. Results of this execution may '
'be collected by attaching to the master event bus or '
'by examining the master job cache, if configured. '
'This execution is running under tag %s', async_pub['tag']

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
'''
React by calling async runners
React by calling asynchronous runners
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
@ -14,7 +14,7 @@ def cmd(
arg=(),
**kwargs):
'''
Execute a runner async:
Execute a runner asynchronous:
USAGE:
@ -42,7 +42,7 @@ def cmd(
func = name
local_opts = {}
local_opts.update(__opts__)
local_opts['async'] = True # ensure this will be run async
local_opts['async'] = True # ensure this will be run asynchronous
local_opts.update({
'fun': func,
'arg': arg,

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
'''
React by calling async runners
React by calling asynchronous runners
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
@ -14,7 +14,7 @@ def cmd(
arg=(),
**kwargs):
'''
Execute a runner async:
Execute a runner asynchronous:
USAGE:

View File

@ -10,7 +10,7 @@ from __future__ import absolute_import, print_function, unicode_literals
import logging
# Import Salt Libs
from salt.utils.async import SyncWrapper
from salt.utils.asynchronous import SyncWrapper
log = logging.getLogger(__name__)

View File

@ -130,7 +130,7 @@ class IPCServer(object):
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
@ -196,7 +196,7 @@ class IPCServer(object):
log.trace('IPCServer: Handling connection '
'to address: %s', address)
try:
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
stream = IOStream(
connection,
)
@ -329,7 +329,7 @@ class IPCClient(object):
break
if self.stream is None:
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
self.stream = IOStream(
socket.socket(sock_type, socket.SOCK_STREAM),
)
@ -510,7 +510,7 @@ class IPCMessagePublisher(object):
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
@ -549,7 +549,7 @@ class IPCMessagePublisher(object):
if self.opts['ipc_write_buffer'] > 0:
kwargs['max_write_buffer_size'] = self.opts['ipc_write_buffer']
log.trace('Setting IPC connection write buffer: %s', (self.opts['ipc_write_buffer']))
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
stream = IOStream(
connection,
**kwargs

View File

@ -55,7 +55,7 @@ class ReqServerChannel(object):
'''
Do anything you need post-fork. This should handle all incoming payloads
and call payload_handler. You will also be passed io_loop, for all of your
async needs
asynchronous needs
'''
pass

View File

@ -19,7 +19,7 @@ import traceback
# Import Salt Libs
import salt.crypt
import salt.utils.async
import salt.utils.asynchronous
import salt.utils.event
import salt.utils.files
import salt.utils.platform
@ -480,7 +480,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran
'tok': self.tok,
'data': data,
'tag': tag}
req_channel = salt.utils.async.SyncWrapper(
req_channel = salt.utils.asynchronous.SyncWrapper(
AsyncTCPReqChannel, (self.opts,)
)
try:
@ -607,7 +607,7 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra
self.payload_handler = payload_handler
self.io_loop = io_loop
self.serial = salt.payload.Serial(self.opts)
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if USE_LOAD_BALANCER:
self.req_server = LoadBalancerWorker(self.socket_queue,
self.handle_message,
@ -873,7 +873,7 @@ class SaltMessageClient(object):
self.io_loop = io_loop or tornado.ioloop.IOLoop.current()
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver)
self._mid = 1
@ -899,7 +899,7 @@ class SaltMessageClient(object):
if hasattr(self, '_stream') and not self._stream.closed():
# If _stream_return() hasn't completed, it means the IO
# Loop is stopped (such as when using
# 'salt.utils.async.SyncWrapper'). Ensure that
# 'salt.utils.asynchronous.SyncWrapper'). Ensure that
# _stream_return() completes by restarting the IO Loop.
# This will prevent potential errors on shutdown.
try:
@ -972,7 +972,7 @@ class SaltMessageClient(object):
'source_port': self.source_port}
else:
log.warning('If you need a certain source IP/port, consider upgrading Tornado >= 4.5')
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
self._stream = yield self._tcp_client.connect(self.host,
self.port,
ssl_options=self.opts.get('ssl'),
@ -1451,9 +1451,9 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel):
pull_uri = int(self.opts.get('tcp_master_publish_pull', 4514))
else:
pull_uri = os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
# TODO: switch to the actual async interface
# TODO: switch to the actual asynchronous interface
#pub_sock = salt.transport.ipc.IPCMessageClient(self.opts, io_loop=self.io_loop)
pub_sock = salt.utils.async.SyncWrapper(
pub_sock = salt.utils.asynchronous.SyncWrapper(
salt.transport.ipc.IPCMessageClient,
(pull_uri,)
)

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
'''
Helpers/utils for working with tornado async stuff
Helpers/utils for working with tornado asynchronous stuff
'''
from __future__ import absolute_import, print_function, unicode_literals
@ -30,9 +30,9 @@ class SyncWrapper(object):
This is used as a simple wrapper, for example:
async = AsyncClass()
asynchronous = AsyncClass()
# this method would regularly return a future
future = async.async_method()
future = asynchronous.async_method()
sync = SyncWrapper(async_factory_method, (arg1, arg2), {'kwarg1': 'val'})
# the sync wrapper will automatically wait on the future
@ -46,15 +46,15 @@ class SyncWrapper(object):
kwargs['io_loop'] = self.io_loop
with current_ioloop(self.io_loop):
self.async = method(*args, **kwargs)
self.asynchronous = method(*args, **kwargs)
def __getattribute__(self, key):
try:
return object.__getattribute__(self, key)
except AttributeError as ex:
if key == 'async':
if key == 'asynchronous':
raise ex
attr = getattr(self.async, key)
attr = getattr(self.asynchronous, key)
if hasattr(attr, '__call__'):
def wrap(*args, **kwargs):
# Overload the ioloop for the func call-- since it might call .current()
@ -75,15 +75,15 @@ class SyncWrapper(object):
def __del__(self):
'''
On deletion of the async wrapper, make sure to clean up the async stuff
On deletion of the asynchronous wrapper, make sure to clean up the asynchronous stuff
'''
if hasattr(self, 'async'):
if hasattr(self.async, 'close'):
if hasattr(self, 'asynchronous'):
if hasattr(self.asynchronous, 'close'):
# Certain things such as streams should be closed before
# their associated io_loop is closed to allow for proper
# cleanup.
self.async.close()
del self.async
self.asynchronous.close()
del self.asynchronous
self.io_loop.close()
del self.io_loop
elif hasattr(self, 'io_loop'):
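The renames in this file are not cosmetic: under Python 3.7 both `self.async` and `import salt.utils.async` fail to parse, since a reserved keyword cannot appear as an attribute name or as a module path component. Usage of the wrapper is unchanged; a short sketch modelled on the test helper further below (the `Helper` class is illustrative):

```python
import tornado.gen
from salt.utils.asynchronous import SyncWrapper  # 'salt.utils.async' no longer parses on 3.7

class Helper(object):
    def __init__(self, io_loop=None):  # SyncWrapper injects its own io_loop
        pass

    @tornado.gen.coroutine
    def sleep(self):
        yield tornado.gen.sleep(0.1)
        raise tornado.gen.Return(True)

wrapped = SyncWrapper(Helper)  # stored internally as self.asynchronous now
print(wrapped.sleep())         # blocks on the future and prints True
```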

View File

@ -72,7 +72,7 @@ def sleep_exponential_backoff(attempts):
https://docs.aws.amazon.com/general/latest/gr/api-retries.html
Failure to implement this approach results in a failure rate of >30% when using salt-cloud with
"--parallel" when creating 50 or more instances with a fixed delay of 2 seconds.
A failure rate of >10% is observed when using the salt-api with an asyncronous client
A failure rate of >10% is observed when using the salt-api with an asynchronous client
specified (runner_async).
"""
time.sleep(random.uniform(1, 2**attempts))

View File

@ -72,7 +72,7 @@ import tornado.iostream
# Import salt libs
import salt.config
import salt.payload
import salt.utils.async
import salt.utils.asynchronous
import salt.utils.cache
import salt.utils.dicttrim
import salt.utils.files
@ -225,7 +225,7 @@ class SaltEvent(object):
:param Bool keep_loop: Pass a boolean to determine if we want to keep
the io loop or destroy it when the event handle
is destroyed. This is useful when using event
loops from within third party async code
loops from within third party asynchronous code
'''
self.serial = salt.payload.Serial({'serial': 'msgpack'})
self.keep_loop = keep_loop
@ -361,7 +361,7 @@ class SaltEvent(object):
return True
if self._run_io_loop_sync:
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if self.subscriber is None:
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
self.puburi,
@ -380,7 +380,7 @@ class SaltEvent(object):
io_loop=self.io_loop
)
# For the async case, the connect will be defered to when
# For the asynchronous case, the connect will be defered to when
# set_event_handler() is invoked.
self.cpub = True
return self.cpub
@ -406,7 +406,7 @@ class SaltEvent(object):
return True
if self._run_io_loop_sync:
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if self.pusher is None:
self.pusher = salt.transport.ipc.IPCMessageClient(
self.pulluri,
@ -424,7 +424,7 @@ class SaltEvent(object):
self.pulluri,
io_loop=self.io_loop
)
# For the async case, the connect will be deferred to when
# For the asynchronous case, the connect will be deferred to when
# fire_event() is invoked.
self.cpush = True
return self.cpush
@ -629,7 +629,7 @@ class SaltEvent(object):
ret = self._check_pending(tag, match_func)
if ret is None:
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if auto_reconnect:
raise_errors = self.raise_errors
self.raise_errors = True
@ -740,7 +740,7 @@ class SaltEvent(object):
serialized_data])
msg = salt.utils.stringutils.to_bytes(event, 'utf-8')
if self._run_io_loop_sync:
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
try:
self.io_loop.run_sync(lambda: self.pusher.send(msg))
except Exception as ex:
@ -1087,7 +1087,7 @@ class EventPublisher(salt.utils.process.SignalHandlingMultiprocessingProcess):
'''
salt.utils.process.appendproctitle(self.__class__.__name__)
self.io_loop = tornado.ioloop.IOLoop()
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if self.opts['ipc_mode'] == 'tcp':
epub_uri = int(self.opts['tcp_master_pub_port'])
epull_uri = int(self.opts['tcp_master_pull_port'])

View File

@ -482,7 +482,7 @@ class ProcessManager(object):
del self._process_map[pid]
@gen.coroutine
def run(self, async=False):
def run(self, asynchronous=False):
'''
Load and start all available api modules
'''
@ -505,7 +505,7 @@ class ProcessManager(object):
# The event-based subprocesses management code was removed from here
# because os.wait() conflicts with the subprocesses management logic
# implemented in `multiprocessing` package. See #35480 for details.
if async:
if asynchronous:
yield gen.sleep(10)
else:
time.sleep(10)
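In the coroutine above the renamed flag keeps its original meaning: it decides whether the ten-second wait yields control back to the IO loop or blocks the whole process. A minimal standalone illustration of that distinction (not Salt code; assumes Tornado 4.x as used at the time):

```python
import time
from tornado import gen, ioloop

@gen.coroutine
def wait(asynchronous=False):
    if asynchronous:
        yield gen.sleep(1)  # cooperative: other scheduled callbacks keep running
    else:
        time.sleep(1)       # blocking: stalls the whole IO loop for a second

ioloop.IOLoop.current().run_sync(lambda: wait(asynchronous=True))
```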

View File

@ -707,7 +707,7 @@ def gen_min(cachedir, extra_mods='', overwrite=False, so_mods='',
'salt/utils/openstack',
'salt/utils/openstack/__init__.py',
'salt/utils/openstack/swift.py',
'salt/utils/async.py',
'salt/utils/asynchronous.py',
'salt/utils/process.py',
'salt/utils/jinja.py',
'salt/utils/rsax931.py',

View File

@ -57,7 +57,7 @@ class WheelClient(salt.client.mixins.SyncClientMixin,
return self.low(fun, kwargs, print_event=kwargs.get('print_event', True), full_return=kwargs.get('full_return', False))
# TODO: Inconsistent with runner client-- the runner client's master_call gives
# an async return, unlike this
# an asynchronous return, unlike this
def master_call(self, **kwargs):
'''
Execute a wheel function through the master network interface (eauth).
@ -120,7 +120,7 @@ class WheelClient(salt.client.mixins.SyncClientMixin,
{'jid': '20131219224744416681', 'tag': 'salt/wheel/20131219224744416681'}
'''
fun = low.pop('fun')
return self.async(fun, low)
return self.asynchronous(fun, low)
def cmd(self, fun, arg=None, pub_data=None, kwarg=None, print_event=True, full_return=False):
'''

View File

@ -21,7 +21,7 @@ import logging
# Import salt libs
import salt.utils.event
import salt.utils.async
import salt.utils.asynchronous
# Import 3rd-party libs
from tornado import gen
@ -70,7 +70,7 @@ class PyTestEngine(object):
self.sock.bind(('localhost', port))
# become a server socket
self.sock.listen(5)
with salt.utils.async.current_ioloop(self.io_loop):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
netutil.add_accept_handler(
self.sock,
self.handle_connection,

View File

@ -399,7 +399,7 @@ class TestMinionSaltAPIHandler(_SaltnadoIntegrationTestCase):
def test_post_with_incorrect_client(self):
'''
The /minions endpoint is async only, so if you try something else
The /minions endpoint is asynchronous only, so if you try something else
make sure you get an error
'''
# get a token for this test

View File

@ -13,7 +13,7 @@
# pylint: disable=repr-flag-used-in-string
# Import python libs
from __future__ import absolute_import
from __future__ import absolute_import, unicode_literals
import os
import re
import sys
@ -149,17 +149,19 @@ class ShellTestCase(TestCase, AdaptedConfigurationTestCaseMixin):
arg_str,
with_retcode=False,
catch_stderr=False,
async=False,
asynchronous=False,
timeout=60,
config_dir=None):
config_dir=None,
**kwargs):
'''
Execute salt-run
'''
asynchronous = kwargs.get('async', asynchronous)
arg_str = '-c {0}{async_flag} -t {timeout} {1}'.format(
config_dir or self.config_dir,
arg_str,
timeout=timeout,
async_flag=' --async' if async else '')
async_flag=' --async' if asynchronous else '')
return self.run_script('salt-run',
arg_str,
with_retcode=with_retcode,

View File

@ -8,7 +8,7 @@ import tornado.testing
import tornado.gen
from tornado.testing import AsyncTestCase
import salt.utils.async as async
import salt.utils.asynchronous as asynchronous
class HelperA(object):
@ -24,7 +24,7 @@ class HelperA(object):
class HelperB(object):
def __init__(self, a=None, io_loop=None):
if a is None:
a = async.SyncWrapper(HelperA)
a = asynchronous.SyncWrapper(HelperA)
self.a = a
@tornado.gen.coroutine
@ -38,7 +38,7 @@ class TestSyncWrapper(AsyncTestCase):
@tornado.testing.gen_test
def test_helpers(self):
'''
Test that the helper classes do what we expect within a regular async env
Test that the helper classes do what we expect within a regular asynchronous env
'''
ha = HelperA()
ret = yield ha.sleep()
@ -50,29 +50,29 @@ class TestSyncWrapper(AsyncTestCase):
def test_basic_wrap(self):
'''
Test that we can wrap an async caller.
Test that we can wrap an asynchronous caller.
'''
sync = async.SyncWrapper(HelperA)
sync = asynchronous.SyncWrapper(HelperA)
ret = sync.sleep()
self.assertTrue(ret)
def test_double(self):
'''
Test when the async wrapper object itself creates a wrap of another thing
Test when the asynchronous wrapper object itself creates a wrap of another thing
This works fine since the second wrap is based on the first's IOLoop so we
don't have to worry about complex start/stop mechanics
'''
sync = async.SyncWrapper(HelperB)
sync = asynchronous.SyncWrapper(HelperB)
ret = sync.sleep()
self.assertFalse(ret)
def test_double_sameloop(self):
'''
Test async wrappers initiated from the same IOLoop, to ensure that
Test asynchronous wrappers initiated from the same IOLoop, to ensure that
we don't wire up both to the same IOLoop (since it causes MANY problems).
'''
a = async.SyncWrapper(HelperA)
sync = async.SyncWrapper(HelperB, (a,))
a = asynchronous.SyncWrapper(HelperA)
sync = asynchronous.SyncWrapper(HelperB, (a,))
ret = sync.sleep()
self.assertFalse(ret)