diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 89de3a67db..122f89ac70 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -119,6 +119,7 @@ class LocalClient(object): 'master', self.opts['sock_dir'], self.opts['transport'], + opts=self.opts, listen=not self.opts.get('__worker', False)) self.returners = salt.loader.returners(self.opts, {}) @@ -896,6 +897,7 @@ class LocalClient(object): 'master', self.opts['sock_dir'], self.opts['transport'], + opts=self.opts, listen=not self.opts.get('__worker', False)) jinfo = self.gather_job_info(jid, tgt, tgt_type) # if we weren't assigned any jid that means the master thinks diff --git a/salt/client/api.py b/salt/client/api.py index 77020a3bcc..fd977ea07c 100644 --- a/salt/client/api.py +++ b/salt/client/api.py @@ -61,7 +61,8 @@ class APIClient(object): self.event = salt.utils.event.get_event( 'master', self.opts['sock_dir'], - self.opts['transport']) + self.opts['transport'], + opts=self.opts) def run(self, cmd): ''' diff --git a/salt/client/mixins.py b/salt/client/mixins.py index 87f6cff391..f225362bdc 100644 --- a/salt/client/mixins.py +++ b/salt/client/mixins.py @@ -77,6 +77,7 @@ class AsyncClientMixin(object): 'master', self.opts['sock_dir'], self.opts['transport'], + opts=self.opts, listen=False) event.fire_event(data, tagify('new', base=tag)) diff --git a/salt/client/ssh/__init__.py b/salt/client/ssh/__init__.py index 1bfae5a697..4c340aab62 100644 --- a/salt/client/ssh/__init__.py +++ b/salt/client/ssh/__init__.py @@ -170,6 +170,7 @@ class SSH(object): 'master', opts['sock_dir'], opts['transport'], + opts=opts, listen=False) else: self.event = None diff --git a/salt/daemons/flo/core.py b/salt/daemons/flo/core.py index 6e4cba45b6..6541ea8855 100644 --- a/salt/daemons/flo/core.py +++ b/salt/daemons/flo/core.py @@ -587,10 +587,13 @@ class SaltManorLaneSetup(ioflo.base.deeding.Deed): ''' Run once at enter ''' - #name = "{0}{1}".format(self.opts.value.get('id', self.local.data.name), 'lane') name = 'manor' - lanename = self.opts.value.get('id', self.local.data.lanename) - #yid = self.local.data.yid + kind = self.opts.value['__role'] + if kind == 'master': + lanename = 'master' + else: + lanename = self.opts.value.get('id', self.local.data.lanename) + self.stack.value = LaneStack( name=name, lanename=lanename, @@ -906,8 +909,6 @@ class Eventer(ioflo.base.deeding.Deed): Register event requests Iterate over the registered event yards and fire! ''' - import wingdbstub - while self.event_req.value: # event subscription requests are msg with routes self._register_event_yard( self.event_req.value.popleft() diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 605b44865f..7bde88c735 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -356,6 +356,7 @@ class RemoteFuncs(object): 'master', self.opts['sock_dir'], self.opts['transport'], + opts=self.opts, listen=False) self.serial = salt.payload.Serial(opts) self.ckminions = salt.utils.minions.CkMinions(opts) @@ -942,6 +943,7 @@ class LocalFuncs(object): 'master', self.opts['sock_dir'], self.opts['transport'], + opts=self.opts, listen=False) # Make a client self.local = salt.client.get_local_client(mopts=self.opts) diff --git a/salt/fileserver/gitfs.py b/salt/fileserver/gitfs.py index d5ee48e859..e178c5663a 100644 --- a/salt/fileserver/gitfs.py +++ b/salt/fileserver/gitfs.py @@ -984,6 +984,7 @@ def update(): 'master', __opts__['sock_dir'], __opts__['transport'], + opts=__opts__, listen=False) event.fire_event(data, tagify(['gitfs', 'update'], prefix='fileserver')) try: diff --git a/salt/fileserver/hgfs.py b/salt/fileserver/hgfs.py index 955cd8df95..8be1868212 100644 --- a/salt/fileserver/hgfs.py +++ b/salt/fileserver/hgfs.py @@ -353,6 +353,7 @@ def update(): 'master', __opts__['sock_dir'], __opts__['transport'], + opts=__opts__, listen=False) event.fire_event(data, tagify(['hgfs', 'update'], prefix='fileserver')) try: diff --git a/salt/fileserver/roots.py b/salt/fileserver/roots.py index 297a2ea3eb..29e220d0c5 100644 --- a/salt/fileserver/roots.py +++ b/salt/fileserver/roots.py @@ -150,6 +150,7 @@ def update(): 'master', __opts__['sock_dir'], __opts__['transport'], + opts=__opts__, listen=False) event.fire_event(data, tagify(['roots', 'update'], prefix='fileserver')) diff --git a/salt/fileserver/svnfs.py b/salt/fileserver/svnfs.py index 11fed18bed..20d47c56c1 100644 --- a/salt/fileserver/svnfs.py +++ b/salt/fileserver/svnfs.py @@ -284,6 +284,7 @@ def update(): 'master', __opts__['sock_dir'], __opts__['transport'], + opts=__opts__, listen=False) event.fire_event(data, tagify(['svnfs', 'update'], prefix='fileserver')) try: diff --git a/salt/key.py b/salt/key.py index 3adaec5587..a319371365 100644 --- a/salt/key.py +++ b/salt/key.py @@ -13,6 +13,7 @@ import fnmatch import hashlib import json import msgpack +import logging # Import salt libs import salt.crypt @@ -21,6 +22,7 @@ import salt.utils.event import salt.daemons.masterapi from salt.utils.event import tagify +log = logging.getLogger(__name__) class KeyCLI(object): ''' @@ -416,10 +418,16 @@ class Key(object): ''' def __init__(self, opts): self.opts = opts + kind = self.opts.get('__role', '') + if not kind: + emsg = "Missing application kind via opts['__role']" + log.error(emsg + '\n') + raise ValueError(emsg) self.event = salt.utils.event.get_event( - 'master', + kind, opts['sock_dir'], opts['transport'], + opts=opts, listen=False) def _check_minions_directories(self): diff --git a/salt/modules/event.py b/salt/modules/event.py index 60618c3e06..ca84341c1f 100644 --- a/salt/modules/event.py +++ b/salt/modules/event.py @@ -83,12 +83,12 @@ def fire(data, tag): salt '*' event.fire '{"data":"my event data"}' 'tag' ''' try: - event = salt.utils.event.get_event( - __opts__['id'], - sock_dir=__opts__['sock_dir'], - opts=__opts__, - transport=__opts__['transport'], - listen=False) + event = salt.utils.event.get_event('minion', # was __opts__['id'] + sock_dir=__opts__['sock_dir'], + transport=__opts__['transport'], + opts=__opts__, + listen=False) + return event.fire_event(data, tag) except Exception: return False diff --git a/salt/netapi/rest_tornado/saltnado.py b/salt/netapi/rest_tornado/saltnado.py index e334a81a5a..c4422a46f4 100644 --- a/salt/netapi/rest_tornado/saltnado.py +++ b/salt/netapi/rest_tornado/saltnado.py @@ -148,7 +148,8 @@ class EventListener(object): self.event = salt.utils.event.get_event( 'master', opts['sock_dir'], - opts['transport']) + opts['transport'], + opts=opts) # tag -> list of futures self.tag_map = defaultdict(list) @@ -756,7 +757,8 @@ class WebhookSaltAPIHandler(SaltAPIHandler): self.event = salt.utils.event.get_event( 'master', self.application.opts['sock_dir'], - self.application.opts['transport']) + self.application.opts['transport'], + opts=self.application.opts) ret = self.event.fire_event({ 'post': self.raw_data, diff --git a/salt/runner.py b/salt/runner.py index f6e1421e0e..4eea171d75 100644 --- a/salt/runner.py +++ b/salt/runner.py @@ -170,8 +170,10 @@ class RunnerClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object): 'eauth': 'pam', }) ''' - sevent = salt.utils.event.get_event('master', self.opts['sock_dir'], - self.opts['transport']) + sevent = salt.utils.event.get_event('master', + self.opts['sock_dir'], + self.opts['transport'], + opts=self.opts) job = self.master_call(**low) ret_tag = tagify('ret', base=job['tag']) diff --git a/salt/runners/queue.py b/salt/runners/queue.py index ddf3fb0824..16269d2e78 100644 --- a/salt/runners/queue.py +++ b/salt/runners/queue.py @@ -180,6 +180,7 @@ def process_queue(queue, quantity=1, backend='sqlite'): 'master', __opts__['sock_dir'], __opts__['transport'], + opts=__opts__, listen=False) try: items = pop(queue=queue, quantity=quantity, backend=backend) diff --git a/salt/runners/state.py b/salt/runners/state.py index c2fbe10a02..7729af1d8b 100644 --- a/salt/runners/state.py +++ b/salt/runners/state.py @@ -241,7 +241,8 @@ def event(tagmatch='*', count=-1, quiet=False, sock_dir=None, pretty=False): sevent = salt.utils.event.get_event( 'master', sock_dir or __opts__['sock_dir'], - __opts__['transport']) + __opts__['transport'], + opts=__opts__) while True: ret = sevent.get_event(full=True) diff --git a/salt/states/saltmod.py b/salt/states/saltmod.py index 8b908bb551..a384d3047a 100644 --- a/salt/states/saltmod.py +++ b/salt/states/saltmod.py @@ -460,7 +460,8 @@ def wait_for_event( sevent = salt.utils.event.get_event( 'master', __opts__['sock_dir'], - __opts__['transport']) + __opts__['transport'], + opts=__opts__) del_counter = 0 starttime = time.time() diff --git a/salt/transport/__init__.py b/salt/transport/__init__.py index 4f742d2e38..dae6c5d5a6 100644 --- a/salt/transport/__init__.py +++ b/salt/transport/__init__.py @@ -88,17 +88,38 @@ class RAETChannel(Channel): not already setup such as in salt-call to communicate to-from the minion ''' + kind = self.opts.get('__role', '') # application kind 'master', 'minion', etc + if not kind: + emsg = ("Missing opts['__role']. required to setup RAETChannel.") + log.error(emsg + "\n") + raise ValueError(emsg) + if kind == 'master': + lanename = 'master' + elif kind == 'minion': + role = self.opts.get('id', '') + if not role: + emsg = ("Missing opts['id']. required to setup RAETChannel.") + log.error(emsg + "\n") + raise ValueError(emsg) + lanename = role # add kind later + else: + emsg = ("Unsupported application kind '{0}' for RAETChannel " + "Raet.".format(self.node)) + log.error(emsg + '\n') + raise ValueError(emsg) + + mid = self.opts.get('id', 'master') uid = nacling.uuid(size=18) name = 'channel' + uid stack = LaneStack(name=name, - lanename=mid, + lanename=lanename, sockdirpath=self.opts['sock_dir']) stack.Pk = raeting.packKinds.pack stack.addRemote(RemoteYard(stack=stack, name='manor', - lanename=mid, + lanename=lanename, dirpath=self.opts['sock_dir'])) log.debug("Created Channel Jobber Stack {0}\n".format(stack.name)) return stack diff --git a/salt/utils/raetevent.py b/salt/utils/raetevent.py index aeea1d2e77..7390ae46d3 100644 --- a/salt/utils/raetevent.py +++ b/salt/utils/raetevent.py @@ -32,7 +32,7 @@ class SaltEvent(object): ''' Set up the stack and remote yard ''' - self.node = node # this is usually 'master', 'minion' or opts['id'] + self.node = node # application kind 'master', 'minion', 'syndic', 'call' etc self.sock_dir = sock_dir self.listen = listen if opts is None: @@ -41,6 +41,34 @@ class SaltEvent(object): self.__prep_stack() def __prep_stack(self): + if self.node == 'master': + lanename = 'master' + if self.opts: + kind = self.opts.get('__role', '') # opts optional for master + if kind and kind != self.node: + emsg = ("Mismatch between node '{}' and kind '{}' in setup " + "of SaltEvent on Raet.".format(self.node, kind)) + log.error(emsg + '\n') + raise ValueError(emsg) + elif self.node == 'minion': + role = self.opts.get('id', '') #opts required for minion + if not role: + emsg = ("Missing opts['id'] required by SaltEvent on Raet with " + "node kind {0}.".format(self.node)) + log.error(emsg + '\n') + raise ValueError(emsg) + kind = self.opts.get('__role', '') + if kind != self.node: + emsg = ("Mismatch between node '{}' and kind '{}' in setup " + "of SaltEvent on Raet.".format(self.node, kind)) + log.error(emsg + '\n') + raise ValueError(emsg) + lanename = role # add '_minion' + else: + emsg = ("Unsupported application node kind '{0}' for SaltEvent " + "Raet.".format(self.node)) + log.error(emsg + '\n') + raise ValueError(emsg) self.yid = nacling.uuid(size=18) name = 'event' + self.yid cachedir = self.opts.get('cachedir', os.path.join(syspaths.CACHE_DIR, self.node)) @@ -48,12 +76,12 @@ class SaltEvent(object): self.stack = LaneStack( name=name, uid=self.yid, - lanename=self.node, + lanename=lanename, sockdirpath=self.sock_dir) self.stack.Pk = raeting.packKinds.pack self.router_yard = RemoteYard( stack=self.stack, - lanename=self.node, + lanename=lanename, uid=0, name='manor', dirpath=self.sock_dir) diff --git a/salt/wheel/__init__.py b/salt/wheel/__init__.py index aff9b346df..9766808931 100644 --- a/salt/wheel/__init__.py +++ b/salt/wheel/__init__.py @@ -98,8 +98,10 @@ class WheelClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object): }) {'minions': {'jerry': '5d:f6:79:43:5e:d4:42:3f:57:b8:45:a8:7e:a4:6e:ca'}} ''' - sevent = salt.utils.event.get_event('master', self.opts['sock_dir'], - self.opts['transport']) + sevent = salt.utils.event.get_event('master', + self.opts['sock_dir'], + self.opts['transport'], + opts=self.opts) job = self.master_call(**low) ret_tag = tagify('ret', base=job['tag'])