mirror of
https://github.com/valitydev/salt.git
synced 2024-11-06 16:45:27 +00:00
Merge pull request #50996 from s0undt3ch/hotfix/deprecate-transport-channel
Stop using the deprecated `salt.transport.Channel.factory`
This commit is contained in:
commit
90e26db585
@ -20,6 +20,7 @@ import salt.minion
|
||||
import salt.output
|
||||
import salt.payload
|
||||
import salt.transport
|
||||
import salt.transport.client
|
||||
import salt.utils.args
|
||||
import salt.utils.files
|
||||
import salt.utils.jid
|
||||
@ -328,7 +329,7 @@ class ZeroMQCaller(BaseCaller):
|
||||
'''
|
||||
Return the data up to the master
|
||||
'''
|
||||
channel = salt.transport.Channel.factory(self.opts, usage='salt_call')
|
||||
channel = salt.transport.client.ReqChannel.factory(self.opts, usage='salt_call')
|
||||
load = {'cmd': '_return', 'id': self.opts['id']}
|
||||
for key, value in six.iteritems(ret):
|
||||
load[key] = value
|
||||
|
@ -31,7 +31,7 @@ import salt.config
|
||||
import salt.cache
|
||||
import salt.defaults.exitcodes
|
||||
import salt.payload
|
||||
import salt.transport
|
||||
import salt.transport.client
|
||||
import salt.loader
|
||||
import salt.utils.args
|
||||
import salt.utils.event
|
||||
@ -1722,9 +1722,9 @@ class LocalClient(object):
|
||||
|
||||
master_uri = 'tcp://' + salt.utils.zeromq.ip_bracket(self.opts['interface']) + \
|
||||
':' + six.text_type(self.opts['ret_port'])
|
||||
channel = salt.transport.Channel.factory(self.opts,
|
||||
crypt='clear',
|
||||
master_uri=master_uri)
|
||||
channel = salt.transport.client.ReqChannel.factory(self.opts,
|
||||
crypt='clear',
|
||||
master_uri=master_uri)
|
||||
|
||||
try:
|
||||
# Ensure that the event subscriber is connected.
|
||||
|
@ -28,7 +28,7 @@ import salt.utils.process
|
||||
import salt.utils.state
|
||||
import salt.utils.user
|
||||
import salt.utils.versions
|
||||
import salt.transport
|
||||
import salt.transport.client
|
||||
import salt.log.setup
|
||||
from salt.ext import six
|
||||
|
||||
@ -136,9 +136,9 @@ class SyncClientMixin(object):
|
||||
'''
|
||||
load = kwargs
|
||||
load['cmd'] = self.client
|
||||
channel = salt.transport.Channel.factory(self.opts,
|
||||
crypt='clear',
|
||||
usage='master_call')
|
||||
channel = salt.transport.client.ReqChannel.factory(self.opts,
|
||||
crypt='clear',
|
||||
usage='master_call')
|
||||
ret = channel.send(load)
|
||||
if isinstance(ret, collections.Mapping):
|
||||
if 'error' in ret:
|
||||
|
@ -23,7 +23,7 @@ import salt.client
|
||||
import salt.crypt
|
||||
import salt.loader
|
||||
import salt.payload
|
||||
import salt.transport
|
||||
import salt.transport.client
|
||||
import salt.fileserver
|
||||
import salt.utils.data
|
||||
import salt.utils.files
|
||||
@ -1008,7 +1008,7 @@ class RemoteClient(Client):
|
||||
'''
|
||||
def __init__(self, opts):
|
||||
Client.__init__(self, opts)
|
||||
self.channel = salt.transport.Channel.factory(self.opts)
|
||||
self.channel = salt.transport.client.ReqChannel.factory(self.opts)
|
||||
if hasattr(self.channel, 'auth'):
|
||||
self.auth = self.channel.auth
|
||||
else:
|
||||
@ -1018,7 +1018,7 @@ class RemoteClient(Client):
|
||||
'''
|
||||
Reset the channel, in the event of an interruption
|
||||
'''
|
||||
self.channel = salt.transport.Channel.factory(self.opts)
|
||||
self.channel = salt.transport.client.ReqChannel.factory(self.opts)
|
||||
return self.channel
|
||||
|
||||
def get_file(self,
|
||||
|
@ -27,6 +27,7 @@ from binascii import crc32
|
||||
from salt.ext import six
|
||||
from salt.ext.six.moves import range
|
||||
from salt.utils.zeromq import zmq, ZMQDefaultLoop, install_zmq, ZMQ_VERSION_INFO
|
||||
import salt.transport.client
|
||||
import salt.defaults.exitcodes
|
||||
|
||||
from salt.utils.ctx import RequestContext
|
||||
@ -1388,7 +1389,7 @@ class Minion(MinionBase):
|
||||
sig = salt.crypt.sign_message(minion_privkey_path, salt.serializers.msgpack.serialize(load))
|
||||
load['sig'] = sig
|
||||
|
||||
channel = salt.transport.Channel.factory(self.opts)
|
||||
channel = salt.transport.client.ReqChannel.factory(self.opts)
|
||||
return channel.send(load, timeout=timeout)
|
||||
|
||||
@tornado.gen.coroutine
|
||||
@ -2321,7 +2322,7 @@ class Minion(MinionBase):
|
||||
'''
|
||||
Send mine data to the master
|
||||
'''
|
||||
channel = salt.transport.Channel.factory(self.opts)
|
||||
channel = salt.transport.client.ReqChannel.factory(self.opts)
|
||||
data['tok'] = self.tok
|
||||
try:
|
||||
ret = channel.send(data)
|
||||
|
@ -21,7 +21,7 @@ import salt.utils.path
|
||||
import salt.utils.templates
|
||||
import salt.utils.url
|
||||
import salt.crypt
|
||||
import salt.transport
|
||||
import salt.transport.client
|
||||
from salt.exceptions import CommandExecutionError
|
||||
from salt.ext.six.moves.urllib.parse import urlparse as _urlparse # pylint: disable=import-error,no-name-in-module
|
||||
|
||||
@ -829,7 +829,7 @@ def push(path, keep_symlinks=False, upload_path=None, remove_source=False):
|
||||
'path': load_path_list,
|
||||
'size': os.path.getsize(path),
|
||||
'tok': auth.gen_token(b'salt')}
|
||||
channel = salt.transport.Channel.factory(__opts__)
|
||||
channel = salt.transport.client.ReqChannel.factory(__opts__)
|
||||
with salt.utils.files.fopen(path, 'rb') as fp_:
|
||||
init_send = False
|
||||
while True:
|
||||
|
@ -17,7 +17,7 @@ import salt.crypt
|
||||
import salt.utils.event
|
||||
import salt.utils.zeromq
|
||||
import salt.payload
|
||||
import salt.transport
|
||||
import salt.transport.client
|
||||
from salt.ext import six
|
||||
|
||||
__proxyenabled__ = ['*']
|
||||
@ -46,7 +46,7 @@ def fire_master(data, tag, preload=None):
|
||||
log.warning('Local mode detected. Event with tag %s will NOT be sent.', tag)
|
||||
return False
|
||||
if __opts__['transport'] == 'raet':
|
||||
channel = salt.transport.Channel.factory(__opts__)
|
||||
channel = salt.transport.client.ReqChannel.factory(__opts__)
|
||||
load = {'id': __opts__['id'],
|
||||
'tag': tag,
|
||||
'data': data,
|
||||
@ -83,7 +83,7 @@ def fire_master(data, tag, preload=None):
|
||||
load.update(preload)
|
||||
|
||||
for master in masters:
|
||||
channel = salt.transport.Channel.factory(__opts__, master_uri=master)
|
||||
channel = salt.transport.client.ReqChannel.factory(__opts__, master_uri=master)
|
||||
try:
|
||||
channel.send(load)
|
||||
# channel.send was successful.
|
||||
|
@ -74,7 +74,7 @@ def _mine_get(load, opts):
|
||||
'Mine could not be retrieved.'
|
||||
)
|
||||
return False
|
||||
channel = salt.transport.Channel.factory(opts)
|
||||
channel = salt.transport.client.ReqChannel.factory(opts)
|
||||
ret = channel.send(load)
|
||||
return ret
|
||||
|
||||
|
@ -11,8 +11,8 @@ import logging
|
||||
# Import salt libs
|
||||
import salt.crypt
|
||||
import salt.payload
|
||||
import salt.transport
|
||||
import salt.utils.args
|
||||
import salt.transport.client
|
||||
from salt.exceptions import SaltReqTimeoutError, SaltInvocationError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -120,7 +120,7 @@ def _publish(
|
||||
'id': __opts__['id'],
|
||||
'no_parse': __opts__.get('no_parse', [])}
|
||||
|
||||
channel = salt.transport.Channel.factory(__opts__, master_uri=master_uri)
|
||||
channel = salt.transport.client.ReqChannel.factory(__opts__, master_uri=master_uri)
|
||||
try:
|
||||
peer_data = channel.send(load)
|
||||
except SaltReqTimeoutError:
|
||||
@ -323,7 +323,7 @@ def runner(fun, arg=None, timeout=5):
|
||||
'id': __opts__['id'],
|
||||
'no_parse': __opts__.get('no_parse', [])}
|
||||
|
||||
channel = salt.transport.Channel.factory(__opts__)
|
||||
channel = salt.transport.client.ReqChannel.factory(__opts__)
|
||||
try:
|
||||
return channel.send(load)
|
||||
except SaltReqTimeoutError:
|
||||
|
@ -10,8 +10,8 @@ import logging
|
||||
|
||||
# Import salt libs
|
||||
import salt.payload
|
||||
import salt.transport
|
||||
import salt.utils.args
|
||||
import salt.transport.client
|
||||
from salt.exceptions import SaltReqTimeoutError
|
||||
|
||||
# Import 3rd party libs
|
||||
@ -83,7 +83,7 @@ def _publish(
|
||||
'form': form,
|
||||
'id': __opts__['id']}
|
||||
|
||||
channel = salt.transport.Channel.factory(__opts__)
|
||||
channel = salt.transport.client.ReqChannel.factory(__opts__)
|
||||
try:
|
||||
peer_data = channel.send(load)
|
||||
except SaltReqTimeoutError:
|
||||
@ -229,7 +229,7 @@ def runner(fun, arg=None, timeout=5):
|
||||
'tmo': timeout,
|
||||
'id': __opts__['id']}
|
||||
|
||||
channel = salt.transport.Channel.factory(__opts__)
|
||||
channel = salt.transport.client.ReqChannel.factory(__opts__)
|
||||
try:
|
||||
return channel.send(load)
|
||||
except SaltReqTimeoutError:
|
||||
|
@ -45,7 +45,6 @@ import salt.client.ssh.client
|
||||
import salt.payload
|
||||
import salt.runner
|
||||
import salt.state
|
||||
import salt.transport
|
||||
import salt.utils.args
|
||||
import salt.utils.event
|
||||
import salt.utils.extmods
|
||||
@ -56,6 +55,7 @@ import salt.utils.path
|
||||
import salt.utils.process
|
||||
import salt.utils.url
|
||||
import salt.wheel
|
||||
import salt.transport.client
|
||||
|
||||
HAS_PSUTIL = True
|
||||
try:
|
||||
@ -697,6 +697,7 @@ def sync_output(saltenv=None, refresh=True, extmod_whitelist=None, extmod_blackl
|
||||
refresh_modules()
|
||||
return ret
|
||||
|
||||
|
||||
sync_outputters = salt.utils.functools.alias_function(sync_output, 'sync_outputters')
|
||||
|
||||
|
||||
@ -1375,7 +1376,7 @@ def regen_keys():
|
||||
pass
|
||||
# TODO: move this into a channel function? Or auth?
|
||||
# create a channel again, this will force the key regen
|
||||
channel = salt.transport.Channel.factory(__opts__)
|
||||
channel = salt.transport.client.ReqChannel.factory(__opts__)
|
||||
|
||||
|
||||
def revoke_auth(preserve_minion_cache=False):
|
||||
@ -1402,7 +1403,7 @@ def revoke_auth(preserve_minion_cache=False):
|
||||
masters.append(__opts__['master_uri'])
|
||||
|
||||
for master in masters:
|
||||
channel = salt.transport.Channel.factory(__opts__, master_uri=master)
|
||||
channel = salt.transport.client.ReqChannel.factory(__opts__, master_uri=master)
|
||||
tok = channel.auth.gen_token(b'salt')
|
||||
load = {'cmd': 'revoke_auth',
|
||||
'id': __opts__['id'],
|
||||
|
@ -20,7 +20,7 @@ import salt.loader
|
||||
import salt.fileclient
|
||||
import salt.minion
|
||||
import salt.crypt
|
||||
import salt.transport
|
||||
import salt.transport.client
|
||||
import salt.utils.args
|
||||
import salt.utils.cache
|
||||
import salt.utils.crypt
|
||||
@ -202,7 +202,7 @@ class RemotePillar(RemotePillarMixin):
|
||||
self.ext = ext
|
||||
self.grains = grains
|
||||
self.minion_id = minion_id
|
||||
self.channel = salt.transport.Channel.factory(opts)
|
||||
self.channel = salt.transport.client.ReqChannel.factory(opts)
|
||||
if pillarenv is not None:
|
||||
self.opts['pillarenv'] = pillarenv
|
||||
self.pillar_override = pillar_override or {}
|
||||
|
@ -44,6 +44,7 @@ import salt.utils.platform
|
||||
import salt.utils.process
|
||||
import salt.utils.url
|
||||
import salt.syspaths as syspaths
|
||||
import salt.transport.client
|
||||
from salt.serializers.msgpack import serialize as msgpack_serialize, deserialize as msgpack_deserialize
|
||||
from salt.template import compile_template, compile_template_str
|
||||
from salt.exceptions import (
|
||||
@ -4164,7 +4165,7 @@ class RemoteHighState(object):
|
||||
self.grains = grains
|
||||
self.serial = salt.payload.Serial(self.opts)
|
||||
# self.auth = salt.crypt.SAuth(opts)
|
||||
self.channel = salt.transport.Channel.factory(self.opts['master_uri'])
|
||||
self.channel = salt.transport.client.ReqChannel.factory(self.opts['master_uri'])
|
||||
|
||||
def compile_master(self):
|
||||
'''
|
||||
|
@ -2,9 +2,13 @@
|
||||
'''
|
||||
Encapsulate the different transports available to Salt.
|
||||
'''
|
||||
# Import Python libs
|
||||
from __future__ import absolute_import, print_function, unicode_literals
|
||||
import logging
|
||||
|
||||
# Import Salt libs
|
||||
import salt.utils.versions
|
||||
|
||||
# Import third party libs
|
||||
from salt.ext import six
|
||||
from salt.ext.six.moves import range
|
||||
@ -33,22 +37,11 @@ def iter_transport_opts(opts):
|
||||
class Channel(object):
|
||||
@staticmethod
|
||||
def factory(opts, **kwargs):
|
||||
# Default to ZeroMQ for now
|
||||
ttype = 'zeromq'
|
||||
|
||||
# determine the ttype
|
||||
if 'transport' in opts:
|
||||
ttype = opts['transport']
|
||||
elif 'transport' in opts.get('pillar', {}).get('master', {}):
|
||||
ttype = opts['pillar']['master']['transport']
|
||||
|
||||
# the raet ioflo implementation still uses this channel, we need
|
||||
# this as compatibility
|
||||
if ttype == 'raet':
|
||||
import salt.transport.raet
|
||||
return salt.transport.raet.RAETReqChannel(opts, **kwargs)
|
||||
# TODO: deprecation warning, should use
|
||||
# salt.transport.channel.Channel.factory()
|
||||
salt.utils.versions.warn_until(
|
||||
'Sodium',
|
||||
'Stop using salt.transport.Channel and instead use salt.transport.client.ReqChannel',
|
||||
stacklevel=3
|
||||
)
|
||||
from salt.transport.client import ReqChannel
|
||||
return ReqChannel.factory(opts, **kwargs)
|
||||
|
||||
|
@ -88,6 +88,7 @@ import salt.utils.zeromq
|
||||
import salt.log.setup
|
||||
import salt.defaults.exitcodes
|
||||
import salt.transport.ipc
|
||||
import salt.transport.client
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@ -1334,7 +1335,7 @@ class StateFire(object):
|
||||
'tok': self.auth.gen_token(b'salt'),
|
||||
})
|
||||
|
||||
channel = salt.transport.Channel.factory(self.opts)
|
||||
channel = salt.transport.client.ReqChannel.factory(self.opts)
|
||||
try:
|
||||
channel.send(load)
|
||||
except Exception:
|
||||
@ -1364,7 +1365,7 @@ class StateFire(object):
|
||||
'tag': tag,
|
||||
'data': running[stag],
|
||||
})
|
||||
channel = salt.transport.Channel.factory(self.opts)
|
||||
channel = salt.transport.client.ReqChannel.factory(self.opts)
|
||||
try:
|
||||
channel.send(load)
|
||||
except Exception:
|
||||
|
@ -11,9 +11,9 @@ import collections
|
||||
import salt.client.mixins
|
||||
import salt.config
|
||||
import salt.loader
|
||||
import salt.transport
|
||||
import salt.utils.error
|
||||
import salt.utils.zeromq
|
||||
import salt.transport.client
|
||||
|
||||
# Import 3rd-party libs
|
||||
from salt.ext import six
|
||||
@ -67,12 +67,14 @@ class WheelClient(salt.client.mixins.SyncClientMixin,
|
||||
interface = self.opts['interface']
|
||||
if interface == '0.0.0.0':
|
||||
interface = '127.0.0.1'
|
||||
master_uri = 'tcp://' + salt.utils.zeromq.ip_bracket(interface) + \
|
||||
':' + six.text_type(self.opts['ret_port'])
|
||||
channel = salt.transport.Channel.factory(self.opts,
|
||||
crypt='clear',
|
||||
master_uri=master_uri,
|
||||
usage='master_call')
|
||||
master_uri = 'tcp://{}:{}'.format(
|
||||
salt.utils.zeromq.ip_bracket(interface),
|
||||
six.text_type(self.opts['ret_port'])
|
||||
)
|
||||
channel = salt.transport.client.ReqChannel.factory(self.opts,
|
||||
crypt='clear',
|
||||
master_uri=master_uri,
|
||||
usage='master_call')
|
||||
ret = channel.send(load)
|
||||
if isinstance(ret, collections.Mapping):
|
||||
if 'error' in ret:
|
||||
|
@ -22,7 +22,7 @@ from tests.support.mock import (
|
||||
import salt.utils.files
|
||||
import salt.utils.templates as templates
|
||||
import salt.utils.platform
|
||||
import salt.transport
|
||||
import salt.transport.client
|
||||
import salt.modules.cp as cp
|
||||
from salt.exceptions import CommandExecutionError
|
||||
|
||||
@ -143,14 +143,14 @@ class CpTestCase(TestCase, LoaderModuleMockMixin):
|
||||
_auth=MagicMock(**{'return_value.gen_token.return_value': 'token'}),
|
||||
__opts__={'id': 'abc', 'file_buffer_size': 10}), \
|
||||
patch('salt.utils.files.fopen', mock_open(read_data=b'content')) as m_open, \
|
||||
patch('salt.transport.Channel.factory', MagicMock()):
|
||||
patch('salt.transport.client.ReqChannel.factory', MagicMock()):
|
||||
response = cp.push(filename)
|
||||
assert response, response
|
||||
num_opens = len(m_open.filehandles[filename])
|
||||
assert num_opens == 1, num_opens
|
||||
fh_ = m_open.filehandles[filename][0]
|
||||
assert fh_.read.call_count == 2, fh_.read.call_count
|
||||
salt.transport.Channel.factory({}).send.assert_called_once_with(
|
||||
salt.transport.client.ReqChannel.factory({}).send.assert_called_once_with(
|
||||
dict(
|
||||
loc=fh_.tell(), # pylint: disable=resource-leakage
|
||||
cmd='_file_recv',
|
||||
|
@ -42,7 +42,7 @@ class EventTestCase(TestCase, LoaderModuleMockMixin):
|
||||
Test for Fire an event off up to the master server
|
||||
'''
|
||||
with patch('salt.crypt.SAuth') as salt_crypt_sauth, \
|
||||
patch('salt.transport.Channel.factory') as salt_transport_channel_factory:
|
||||
patch('salt.transport.client.ReqChannel.factory') as salt_transport_channel_factory:
|
||||
|
||||
preload = {'id': 'id', 'tag': 'tag', 'data': 'data',
|
||||
'tok': 'salt', 'cmd': '_minion_event'}
|
||||
|
@ -73,7 +73,7 @@ class PublishTestCase(TestCase, LoaderModuleMockMixin):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.channel_patcher = patch('salt.transport.Channel', Channel())
|
||||
cls.channel_patcher = patch('salt.transport.client.ReqChannel', Channel())
|
||||
cls.channel_patcher.start()
|
||||
|
||||
@classmethod
|
||||
|
@ -17,7 +17,7 @@ from tests.support.mock import (
|
||||
|
||||
# Import Salt Libs
|
||||
import salt.modules.raet_publish as raet_publish
|
||||
import salt.transport
|
||||
import salt.transport.client
|
||||
from salt.exceptions import SaltReqTimeoutError
|
||||
|
||||
|
||||
@ -50,7 +50,7 @@ class RaetPublishTestCase(TestCase, LoaderModuleMockMixin):
|
||||
the data from the runner function
|
||||
'''
|
||||
with patch.dict(raet_publish.__opts__, {'id': 'id'}):
|
||||
with patch.object(salt.transport.Channel, 'factory', MagicMock()):
|
||||
with patch.object(salt.transport.client.ReqChannel, 'factory', MagicMock()):
|
||||
self.assertTrue(raet_publish.runner('fun'))
|
||||
|
||||
class MockFactory(object):
|
||||
@ -67,6 +67,6 @@ class RaetPublishTestCase(TestCase, LoaderModuleMockMixin):
|
||||
raise SaltReqTimeoutError(load)
|
||||
|
||||
with patch.dict(raet_publish.__opts__, {'id': 'id'}):
|
||||
with patch.object(salt.transport.Channel, 'factory',
|
||||
with patch.object(salt.transport.client.ReqChannel, 'factory',
|
||||
MagicMock(return_value=MockFactory())):
|
||||
self.assertEqual(raet_publish.runner(1), "'1' runner publish timed out")
|
||||
|
@ -727,7 +727,7 @@ foo_wildcard:
|
||||
|
||||
|
||||
@skipIf(NO_MOCK, NO_MOCK_REASON)
|
||||
@patch('salt.transport.Channel.factory', MagicMock())
|
||||
@patch('salt.transport.client.ReqChannel.factory', MagicMock())
|
||||
class RemotePillarTestCase(TestCase):
|
||||
'''
|
||||
Tests for instantiating a RemotePillar in salt.pillar
|
||||
@ -826,7 +826,7 @@ class RemotePillarTestCase(TestCase):
|
||||
'pass_to_ext_pillars': ['path_to_add']}
|
||||
mock_channel = MagicMock(
|
||||
crypted_transfer_decode_dictentry=MagicMock(return_value={}))
|
||||
with patch('salt.transport.Channel.factory',
|
||||
with patch('salt.transport.client.ReqChannel.factory',
|
||||
MagicMock(return_value=mock_channel)):
|
||||
pillar = salt.pillar.RemotePillar(opts, self.grains,
|
||||
'mocked_minion', 'fake_env')
|
||||
|
Loading…
Reference in New Issue
Block a user