Merge pull request #9406 from cro/salt_transport

Salt transport
This commit is contained in:
Thomas S Hatch 2013-12-23 09:27:33 -08:00
commit 15764276fc
13 changed files with 237 additions and 94 deletions

View File

@ -39,6 +39,7 @@ import getpass
# Import salt libs
import salt.config
import salt.payload
import salt.transport
import salt.utils
import salt.utils.verify
import salt.utils.event
@ -1320,12 +1321,15 @@ class LocalClient(object):
if self.opts['order_masters']:
payload_kwargs['to'] = timeout
sreq = salt.payload.SREQ(
#'tcp://{0[interface]}:{0[ret_port]}'.format(self.opts),
'tcp://' + salt.utils.ip_bracket(self.opts['interface']) +
':' + str(self.opts['ret_port']),
)
payload = sreq.send('clear', payload_kwargs)
# sreq = salt.payload.SREQ(
# #'tcp://{0[interface]}:{0[ret_port]}'.format(self.opts),
# 'tcp://' + salt.utils.ip_bracket(self.opts['interface']) +
# ':' + str(self.opts['ret_port']),
# )
master_uri = 'tcp://' + salt.utils.ip_bracket(self.opts['interface']) + \
':' + str(self.opts['ret_port'])
sreq = salt.transport.Channel.factory(self.opts, crypt='clear', master_uri=master_uri)
payload = sreq.send(payload_kwargs)
if not payload:
# The master key could have changed out from under us! Regen
@ -1335,7 +1339,7 @@ class LocalClient(object):
return payload
self.key = key
payload_kwargs['key'] = self.key
payload = sreq.send('clear', payload_kwargs)
payload = sreq.send(payload_kwargs)
if not payload:
return payload

View File

@ -177,7 +177,8 @@ VALID_OPTS = {
'jinja_trim_blocks': bool,
'minion_id_caching': bool,
'sign_pub_messages': bool,
'keysize': int
'keysize': int,
'salt_transport': str,
}
# default configurations
@ -269,7 +270,8 @@ DEFAULT_MINION_OPTS = {
'modules_max_memory': -1,
'grains_refresh_every': 0,
'minion_id_caching': True,
'keysize': 4096
'keysize': 4096,
'salt_transport': 'zeromq',
}
DEFAULT_MASTER_OPTS = {
@ -377,7 +379,8 @@ DEFAULT_MASTER_OPTS = {
'jinja_lstrip_blocks': False,
'jinja_trim_blocks': False,
'sign_pub_messages': False,
'keysize': 4096
'keysize': 4096,
'salt_transport': 'zeromq',
}
# ----- Salt Cloud Configuration Defaults ----------------------------------->

View File

@ -20,6 +20,7 @@ import salt.client
import salt.crypt
import salt.loader
import salt.payload
import salt.transport
import salt.utils
import salt.utils.templates
import salt.utils.gzip_util
@ -852,28 +853,29 @@ class RemoteClient(Client):
'''
def __init__(self, opts):
Client.__init__(self, opts)
self.auth = salt.crypt.SAuth(opts)
self.sreq = salt.payload.SREQ(self.opts['master_uri'])
def _crypted_transfer(self, load, tries=3, timeout=60, payload='aes'):
'''
In case of authentication errors, try to renegotiate authentication
and retry the method.
Indeed, we can fail too early in case of a master restart during a
minion state execution call
'''
def _do_transfer():
return self.auth.crypticle.loads(
self.sreq.send(payload,
self.auth.crypticle.dumps(load),
tries,
timeout)
)
try:
return _do_transfer()
except salt.crypt.AuthenticationError:
self.auth = salt.crypt.SAuth(self.opts)
return _do_transfer()
self.channel = salt.transport.Channel.factory(opts)
# self.auth = salt.crypt.SAuth(opts)
# self.sreq = salt.payload.SREQ(self.opts['master_uri'])
#
# def _crypted_transfer(self, load, tries=3, timeout=60, payload='aes'):
# '''
# In case of authentication errors, try to renegotiate authentication
# and retry the method.
# Indeed, we can fail too early in case of a master restart during a
# minion state execution call
# '''
# def _do_transfer():
# return self.auth.crypticle.loads(
# self.sreq.send(payload,
# self.auth.crypticle.dumps(load),
# tries,
# timeout)
# )
# try:
# return _do_transfer()
# except salt.crypt.AuthenticationError:
# self.auth = salt.crypt.SAuth(self.opts)
# return _do_transfer()
def get_file(self,
path,
@ -947,7 +949,7 @@ class RemoteClient(Client):
else:
load['loc'] = fn_.tell()
try:
data = self._crypted_transfer(load)
data = self.channel.send(load)
except SaltReqTimeoutError:
return ''
@ -1012,7 +1014,7 @@ class RemoteClient(Client):
'prefix': prefix,
'cmd': '_file_list'}
try:
return self._crypted_transfer(load)
return self.channel.send(load)
except SaltReqTimeoutError:
return ''
@ -1034,7 +1036,7 @@ class RemoteClient(Client):
'prefix': prefix,
'cmd': '_file_list_emptydirs'}
try:
self._crypted_transfer(load)
self.channel.send(load)
except SaltReqTimeoutError:
return ''
@ -1056,7 +1058,7 @@ class RemoteClient(Client):
'prefix': prefix,
'cmd': '_dir_list'}
try:
return self._crypted_transfer(load)
return self.channel.send(load)
except SaltReqTimeoutError:
return ''
@ -1068,7 +1070,7 @@ class RemoteClient(Client):
'prefix': prefix,
'cmd': '_symlink_list'}
try:
return self._crypted_transfer(load)
return self.channel.send(load)
except SaltReqTimeoutError:
return ''
@ -1105,7 +1107,7 @@ class RemoteClient(Client):
'saltenv': saltenv,
'cmd': '_file_hash'}
try:
return self._crypted_transfer(load)
return self.channel.send(load)
except SaltReqTimeoutError:
return ''
@ -1126,7 +1128,7 @@ class RemoteClient(Client):
load = {'saltenv': saltenv,
'cmd': '_file_list'}
try:
return self._crypted_transfer(load)
return self.channel.send(load)
except SaltReqTimeoutError:
return ''
@ -1136,7 +1138,7 @@ class RemoteClient(Client):
'''
load = {'cmd': '_master_opts'}
try:
return self._crypted_transfer(load)
return self.channel.send(load)
except SaltReqTimeoutError:
return ''
@ -1148,8 +1150,8 @@ class RemoteClient(Client):
load = {'cmd': '_ext_nodes',
'id': self.opts['id'],
'opts': self.opts,
'tok': self.auth.gen_token('salt')}
'tok': self.channel.auth.gen_token('salt')}
try:
return self._crypted_transfer(load)
return self.channel.send(load)
except SaltReqTimeoutError:
return ''

View File

@ -12,6 +12,7 @@ import salt.minion
import salt.fileclient
import salt.utils
import salt.crypt
import salt.transport
from salt.exceptions import CommandExecutionError
log = logging.getLogger(__name__)
@ -607,13 +608,16 @@ def push(path):
'id': __opts__['id'],
'path': path.lstrip(os.sep),
'tok': auth.gen_token('salt')}
sreq = salt.payload.SREQ(__opts__['master_uri'])
sreq = salt.transport.Channel.factory(__opts__)
# sreq = salt.payload.SREQ(__opts__['master_uri'])
with salt.utils.fopen(path, 'rb') as fp_:
while True:
load['loc'] = fp_.tell()
load['data'] = fp_.read(__opts__['file_buffer_size'])
if not load['data']:
return True
ret = sreq.send('aes', auth.crypticle.dumps(load))
# ret = sreq.send('aes', auth.crypticle.dumps(load))
ret = sreq.send(load)
if not ret:
return ret

View File

@ -8,6 +8,7 @@ master to the minion and vice-versa.
import salt.crypt
import salt.utils.event
import salt.payload
import salt.transport
def fire_master(data, tag, preload=None):
@ -31,9 +32,10 @@ def fire_master(data, tag, preload=None):
'tok': auth.gen_token('salt'),
'cmd': '_minion_event'})
sreq = salt.payload.SREQ(__opts__['master_uri'])
# sreq = salt.payload.SREQ(__opts__['master_uri'])
sreq = salt.transport.Channel.factory(__opts__)
try:
sreq.send('aes', auth.crypticle.dumps(load))
sreq.send(load)
except Exception:
pass
return True

View File

@ -79,9 +79,13 @@ def update(clear=False):
'clear': clear,
'tok': auth.gen_token('salt'),
}
sreq = salt.payload.SREQ(__opts__['master_uri'])
ret = sreq.send('aes', auth.crypticle.dumps(load))
return auth.crypticle.loads(ret)
# Changed for transport plugin
# sreq = salt.payload.SREQ(__opts__['master_uri'])
# ret = sreq.send('aes', auth.crypticle.dumps(load))
# return auth.crypticle.loads(ret)
sreq = salt.transport.Channel.factory(__opts__)
ret = sreq.send(load)
return ret
def send(func, *args, **kwargs):
@ -128,9 +132,13 @@ def send(func, *args, **kwargs):
'id': __opts__['id'],
'tok': auth.gen_token('salt'),
}
sreq = salt.payload.SREQ(__opts__['master_uri'])
ret = sreq.send('aes', auth.crypticle.dumps(load))
return auth.crypticle.loads(ret)
# Changed for transport plugin
# sreq = salt.payload.SREQ(__opts__['master_uri'])
# ret = sreq.send('aes', auth.crypticle.dumps(load))
# return auth.crypticle.loads(ret)
sreq = salt.transport.Channel.factory(__opts__)
ret = sreq.send(load)
return ret
def get(tgt, fun, expr_form='glob'):
@ -179,9 +187,13 @@ def get(tgt, fun, expr_form='glob'):
'expr_form': expr_form,
'tok': auth.gen_token('salt'),
}
sreq = salt.payload.SREQ(__opts__['master_uri'])
ret = sreq.send('aes', auth.crypticle.dumps(load))
return auth.crypticle.loads(ret)
# Changed for transport plugin
# sreq = salt.payload.SREQ(__opts__['master_uri'])
# ret = sreq.send('aes', auth.crypticle.dumps(load))
# return auth.crypticle.loads(ret)
sreq = salt.transport.Channel.factory(__opts__)
ret = sreq.send(load)
return ret
def delete(fun):
@ -206,9 +218,13 @@ def delete(fun):
'fun': fun,
'tok': auth.gen_token('salt'),
}
sreq = salt.payload.SREQ(__opts__['master_uri'])
ret = sreq.send('aes', auth.crypticle.dumps(load))
return auth.crypticle.loads(ret)
# Changed for transport plugin
# sreq = salt.payload.SREQ(__opts__['master_uri'])
# ret = sreq.send('aes', auth.crypticle.dumps(load))
# return auth.crypticle.loads(ret)
sreq = salt.transport.Channel.factory(__opts__)
ret = sreq.send(load)
return ret
def flush():
@ -229,6 +245,10 @@ def flush():
'id': __opts__['id'],
'tok': auth.gen_token('salt'),
}
sreq = salt.payload.SREQ(__opts__['master_uri'])
ret = sreq.send('aes', auth.crypticle.dumps(load))
return auth.crypticle.loads(ret)
# Changed for transport plugin
# sreq = salt.payload.SREQ(__opts__['master_uri'])
# ret = sreq.send('aes', auth.crypticle.dumps(load))
# return auth.crypticle.loads(ret)
sreq = salt.transport.Channel.factory(__opts__)
ret = sreq.send(load)
return ret

View File

@ -11,6 +11,7 @@ import logging
# Import salt libs
import salt.crypt
import salt.payload
import salt.transport
from salt.exceptions import SaltReqTimeoutError
from salt._compat import string_types, integer_types
@ -52,7 +53,7 @@ def _publish(
arg = _normalize_arg(arg)
log.info('Publishing {0!r} to {master_uri}'.format(fun, **__opts__))
sreq = salt.payload.SREQ(__opts__['master_uri'])
# sreq = salt.payload.SREQ(__opts__['master_uri'])
auth = salt.crypt.SAuth(__opts__)
tok = auth.gen_token('salt')
load = {'cmd': 'minion_pub',
@ -66,9 +67,11 @@ def _publish(
'form': form,
'id': __opts__['id']}
sreq = salt.transport.Channel.factory(__opts__)
try:
peer_data = auth.crypticle.loads(
sreq.send('aes', auth.crypticle.dumps(load), 1))
peer_data = sreq.send(load)
# peer_data = auth.crypticle.loads(
# sreq.send('aes', auth.crypticle.dumps(load), 1))
except SaltReqTimeoutError:
return '{0!r} publish timed out'.format(fun)
if not peer_data:
@ -79,8 +82,9 @@ def _publish(
'id': __opts__['id'],
'tok': tok,
'jid': peer_data['jid']}
ret = auth.crypticle.loads(
sreq.send('aes', auth.crypticle.dumps(load), 5))
ret = sreq.send(load)
# auth.crypticle.loads(
# sreq.send('aes', auth.crypticle.dumps(load), 5))
if form == 'clean':
cret = {}
for host in ret:
@ -212,7 +216,7 @@ def runner(fun, arg=None):
arg = _normalize_arg(arg)
log.info('Publishing runner {0!r} to {master_uri}'.format(fun, **__opts__))
sreq = salt.payload.SREQ(__opts__['master_uri'])
# sreq = salt.payload.SREQ(__opts__['master_uri'])
auth = salt.crypt.SAuth(__opts__)
tok = auth.gen_token('salt')
load = {'cmd': 'minion_runner',
@ -220,8 +224,11 @@ def runner(fun, arg=None):
'arg': arg,
'tok': tok,
'id': __opts__['id']}
sreq = salt.transport.Channel.factory(__opts__)
try:
return auth.crypticle.loads(
sreq.send('aes', auth.crypticle.dumps(load), 1))
return sreq.send(load)
# return auth.crypticle.loads(
# sreq.send('aes', auth.crypticle.dumps(load), 1))
except SaltReqTimeoutError:
return '{0!r} runner publish timed out'.format(fun)

View File

@ -20,6 +20,7 @@ import salt.payload
import salt.state
import salt.client
import salt.utils
import salt.transport
from salt.exceptions import SaltReqTimeoutError
from salt._compat import string_types
@ -515,15 +516,18 @@ def revoke_auth():
salt '*' saltutil.revoke_auth
'''
sreq = salt.payload.SREQ(__opts__['master_uri'])
# sreq = salt.payload.SREQ(__opts__['master_uri'])
auth = salt.crypt.SAuth(__opts__)
tok = auth.gen_token('salt')
load = {'cmd': 'revoke_auth',
'id': __opts__['id'],
'tok': tok}
sreq = salt.transport.Channel.factory(__opts__)
try:
return auth.crypticle.loads(
sreq.send('aes', auth.crypticle.dumps(load), 1))
sreq.send(load)
# return auth.crypticle.loads(
# sreq.send('aes', auth.crypticle.dumps(load), 1))
except SaltReqTimeoutError:
return False
return False

View File

@ -13,12 +13,14 @@ import salt.loader
import salt.fileclient
import salt.minion
import salt.crypt
import salt.transport
from salt._compat import string_types
from salt.template import compile_template
from salt.utils.dictupdate import update
from salt.utils.odict import OrderedDict
from salt.version import __version__
log = logging.getLogger(__name__)
@ -52,8 +54,8 @@ class RemotePillar(object):
self.grains = grains
self.id_ = id_
self.serial = salt.payload.Serial(self.opts)
self.sreq = salt.payload.SREQ(self.opts['master_uri'])
self.auth = salt.crypt.SAuth(opts)
self.sreq = salt.transport.Channel.factory(opts)
# self.auth = salt.crypt.SAuth(opts)
def compile_pillar(self):
'''
@ -66,11 +68,14 @@ class RemotePillar(object):
'cmd': '_pillar'}
if self.ext:
load['ext'] = self.ext
ret = self.sreq.send('aes', self.auth.crypticle.dumps(load), 3, 7200)
key = self.auth.get_keys()
aes = key.private_decrypt(ret['key'], 4)
pcrypt = salt.crypt.Crypticle(self.opts, aes)
ret_pillar = pcrypt.loads(ret['pillar'])
# ret = self.sreq.send(load, tries=3, timeout=7200)
ret_pillar = self.sreq.crypted_transfer_decode_dictentry(load, dictkey='pillar', tries=3, timeout=7200)
# key = self.auth.get_keys()
# aes = key.private_decrypt(ret['key'], 4)
# pcrypt = salt.crypt.Crypticle(self.opts, aes)
# ret_pillar = pcrypt.loads(ret['pillar'])
if not isinstance(ret_pillar, dict):
log.error(
'Got a bad pillar from master, type {0}, expecting dict: '

View File

@ -172,10 +172,11 @@ class RunnerClient(object):
'''
load = kwargs
load['cmd'] = 'runner'
sreq = salt.payload.SREQ(
'tcp://{0[interface]}:{0[ret_port]}'.format(self.opts),
)
ret = sreq.send('clear', load)
# sreq = salt.payload.SREQ(
# 'tcp://{0[interface]}:{0[ret_port]}'.format(self.opts),
# )
sreq = salt.transport.Channel.factory(opts, crypt='clear')
ret = sreq.send(load)
if isinstance(ret, collections.Mapping):
if 'error' in ret:
raise_error(**ret['error'])

View File

@ -2611,8 +2611,8 @@ class RemoteHighState(object):
self.opts = opts
self.grains = grains
self.serial = salt.payload.Serial(self.opts)
self.auth = salt.crypt.SAuth(opts)
self.sreq = salt.payload.SREQ(self.opts['master_uri'])
# self.auth = salt.crypt.SAuth(opts)
self.sreq = salt.transport.Channel.factory(self.opts['master_uri'])
def compile_master(self):
'''
@ -2622,10 +2622,11 @@ class RemoteHighState(object):
'opts': self.opts,
'cmd': '_master_state'}
try:
return self.auth.crypticle.loads(self.sreq.send(
'aes',
self.auth.crypticle.dumps(load),
3,
72000))
return self.sreq.send(load, tries=3, timeout=72000)
# return self.auth.crypticle.loads(self.sreq.send(
# 'aes',
# self.auth.crypticle.dumps(load),
# 3,
# 72000))
except SaltReqTimeoutError:
return {}

View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
'''
Encapsulate the different transports available to Salt. Currently this is only ZeroMQ.
'''
import salt.payload
import salt.auth
class Channel(object):
@staticmethod
def factory(opts, **kwargs):
# Default to ZeroMQ for now
ttype = 'zeromq'
if 'transport_type' in opts: ttype = opts['transport_type']
elif 'transport_type' in opts['pillar']['master']:
ttype = opts['pillar']['master']['transport_type']
if ttype == 'zeromq':
return ZeroMQChannel(opts, **kwargs)
else:
raise Exception("Channels are only defined for ZeroMQ")
# return NewKindOfChannel(opts, **kwargs)
class ZeroMQChannel(Channel):
'''
Encapsulate sending routines to ZeroMQ.
ZMQ Channels default to 'crypt=aes'
'''
def __init__(self, opts, **kwargs):
self.opts = opts
# crypt defaults to 'aes'
self.crypt = kwargs['crypt'] if 'crypt' in kwargs else 'aes'
self.serial = salt.payload.Serial(opts)
if self.crypt != 'clear':
self.auth = salt.crypt.SAuth(opts)
if 'master_uri' in kwargs:
master_uri = kwargs['master_uri']
else:
master_uri = opts['master_uri']
self.sreq = salt.payload.SREQ(master_uri)
def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60):
ret = self.sreq.send('aes', self.auth.crypticle.dumps(load), tries, timeout)
key = self.auth.get_keys()
aes = key.private_decrypt(ret['key'], 4)
pcrypt = salt.crypt.Crypticle(self.opts, aes)
return pcrypt.loads(ret[dictkey])
def _crypted_transfer(self, load, tries=3, timeout=60):
'''
In case of authentication errors, try to renegotiate authentication
and retry the method.
Indeed, we can fail too early in case of a master restart during a
minion state execution call
'''
def _do_transfer():
return self.auth.crypticle.loads(
self.sreq.send(self.crypt,
self.auth.crypticle.dumps(load),
tries,
timeout)
)
try:
return _do_transfer()
except salt.crypt.AuthenticationError:
self.auth = salt.crypt.SAuth(self.opts)
return _do_transfer()
def _uncrypted_transfer(self, load, tries=3, timeout=60):
return self.sreq.send(self.crypt, load, tries, timeout)
def send(self, load, tries=3, timeout=60):
if self.crypt != 'clear':
return self._crypted_transfer(load, tries, timeout)
else:
return self._uncrypted_transfer(load, tries, timeout)
# Do we ever do non-crypted transfers?

View File

@ -639,9 +639,9 @@ class StateFire(object):
'cmd': '_minion_event',
'tok': self.auth.gen_token('salt')})
sreq = salt.payload.SREQ(self.opts['master_uri'])
sreq = salt.transport.Channel.factory(self.opts)
try:
sreq.send('aes', self.auth.crypticle.dumps(load))
sreq.send(load)
except Exception:
pass
return True
@ -669,9 +669,9 @@ class StateFire(object):
{'tag': tag,
'data': running[stag]}
)
sreq = salt.payload.SREQ(self.opts['master_uri'])
sreq = salt.transport.Channel.factory(self.opts)
try:
sreq.send('aes', self.auth.crypticle.dumps(load))
sreq.send(load)
except Exception:
pass
return True