Updated consistent semantics for SaltEvent get_event factor

node is application kind which is provided by opts['__role']
the values of application kind are 'master, 'minion', 'syndic', 'call'
This allows RAETEvent to set up its lanename correctly
on a master the manor lanename is always 'master'
on a minion the manor lanename is derived from the opts['id']
In general opts should be passed to get_event when possible salt_cloud is the exception for now
This means that can never have two masters running on the same host unless all the directories are
made unique such as cachedir and sockdir
but multiple minions can run on the same host
This commit is contained in:
Samuel M Smith 2014-09-29 20:47:16 -06:00
parent f5b93eed2f
commit 30b32ba5d7
20 changed files with 104 additions and 26 deletions

View File

@ -119,6 +119,7 @@ class LocalClient(object):
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=not self.opts.get('__worker', False))
self.returners = salt.loader.returners(self.opts, {})
@ -896,6 +897,7 @@ class LocalClient(object):
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=not self.opts.get('__worker', False))
jinfo = self.gather_job_info(jid, tgt, tgt_type)
# if we weren't assigned any jid that means the master thinks

View File

@ -61,7 +61,8 @@ class APIClient(object):
self.event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'])
self.opts['transport'],
opts=self.opts)
def run(self, cmd):
'''

View File

@ -77,6 +77,7 @@ class AsyncClientMixin(object):
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=False)
event.fire_event(data, tagify('new', base=tag))

View File

@ -170,6 +170,7 @@ class SSH(object):
'master',
opts['sock_dir'],
opts['transport'],
opts=opts,
listen=False)
else:
self.event = None

View File

@ -587,10 +587,13 @@ class SaltManorLaneSetup(ioflo.base.deeding.Deed):
'''
Run once at enter
'''
#name = "{0}{1}".format(self.opts.value.get('id', self.local.data.name), 'lane')
name = 'manor'
lanename = self.opts.value.get('id', self.local.data.lanename)
#yid = self.local.data.yid
kind = self.opts.value['__role']
if kind == 'master':
lanename = 'master'
else:
lanename = self.opts.value.get('id', self.local.data.lanename)
self.stack.value = LaneStack(
name=name,
lanename=lanename,
@ -906,8 +909,6 @@ class Eventer(ioflo.base.deeding.Deed):
Register event requests
Iterate over the registered event yards and fire!
'''
import wingdbstub
while self.event_req.value: # event subscription requests are msg with routes
self._register_event_yard(
self.event_req.value.popleft()

View File

@ -356,6 +356,7 @@ class RemoteFuncs(object):
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=False)
self.serial = salt.payload.Serial(opts)
self.ckminions = salt.utils.minions.CkMinions(opts)
@ -942,6 +943,7 @@ class LocalFuncs(object):
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=False)
# Make a client
self.local = salt.client.get_local_client(mopts=self.opts)

View File

@ -984,6 +984,7 @@ def update():
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__,
listen=False)
event.fire_event(data, tagify(['gitfs', 'update'], prefix='fileserver'))
try:

View File

@ -353,6 +353,7 @@ def update():
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__,
listen=False)
event.fire_event(data, tagify(['hgfs', 'update'], prefix='fileserver'))
try:

View File

@ -150,6 +150,7 @@ def update():
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__,
listen=False)
event.fire_event(data, tagify(['roots', 'update'], prefix='fileserver'))

View File

@ -284,6 +284,7 @@ def update():
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__,
listen=False)
event.fire_event(data, tagify(['svnfs', 'update'], prefix='fileserver'))
try:

View File

@ -13,6 +13,7 @@ import fnmatch
import hashlib
import json
import msgpack
import logging
# Import salt libs
import salt.crypt
@ -21,6 +22,7 @@ import salt.utils.event
import salt.daemons.masterapi
from salt.utils.event import tagify
log = logging.getLogger(__name__)
class KeyCLI(object):
'''
@ -416,10 +418,16 @@ class Key(object):
'''
def __init__(self, opts):
self.opts = opts
kind = self.opts.get('__role', '')
if not kind:
emsg = "Missing application kind via opts['__role']"
log.error(emsg + '\n')
raise ValueError(emsg)
self.event = salt.utils.event.get_event(
'master',
kind,
opts['sock_dir'],
opts['transport'],
opts=opts,
listen=False)
def _check_minions_directories(self):

View File

@ -83,12 +83,12 @@ def fire(data, tag):
salt '*' event.fire '{"data":"my event data"}' 'tag'
'''
try:
event = salt.utils.event.get_event(
__opts__['id'],
sock_dir=__opts__['sock_dir'],
opts=__opts__,
transport=__opts__['transport'],
listen=False)
event = salt.utils.event.get_event('minion', # was __opts__['id']
sock_dir=__opts__['sock_dir'],
transport=__opts__['transport'],
opts=__opts__,
listen=False)
return event.fire_event(data, tag)
except Exception:
return False

View File

@ -148,7 +148,8 @@ class EventListener(object):
self.event = salt.utils.event.get_event(
'master',
opts['sock_dir'],
opts['transport'])
opts['transport'],
opts=opts)
# tag -> list of futures
self.tag_map = defaultdict(list)
@ -756,7 +757,8 @@ class WebhookSaltAPIHandler(SaltAPIHandler):
self.event = salt.utils.event.get_event(
'master',
self.application.opts['sock_dir'],
self.application.opts['transport'])
self.application.opts['transport'],
opts=self.application.opts)
ret = self.event.fire_event({
'post': self.raw_data,

View File

@ -170,8 +170,10 @@ class RunnerClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
'eauth': 'pam',
})
'''
sevent = salt.utils.event.get_event('master', self.opts['sock_dir'],
self.opts['transport'])
sevent = salt.utils.event.get_event('master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts)
job = self.master_call(**low)
ret_tag = tagify('ret', base=job['tag'])

View File

@ -180,6 +180,7 @@ def process_queue(queue, quantity=1, backend='sqlite'):
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__,
listen=False)
try:
items = pop(queue=queue, quantity=quantity, backend=backend)

View File

@ -241,7 +241,8 @@ def event(tagmatch='*', count=-1, quiet=False, sock_dir=None, pretty=False):
sevent = salt.utils.event.get_event(
'master',
sock_dir or __opts__['sock_dir'],
__opts__['transport'])
__opts__['transport'],
opts=__opts__)
while True:
ret = sevent.get_event(full=True)

View File

@ -460,7 +460,8 @@ def wait_for_event(
sevent = salt.utils.event.get_event(
'master',
__opts__['sock_dir'],
__opts__['transport'])
__opts__['transport'],
opts=__opts__)
del_counter = 0
starttime = time.time()

View File

@ -88,17 +88,38 @@ class RAETChannel(Channel):
not already setup such as in salt-call to communicate to-from the minion
'''
kind = self.opts.get('__role', '') # application kind 'master', 'minion', etc
if not kind:
emsg = ("Missing opts['__role']. required to setup RAETChannel.")
log.error(emsg + "\n")
raise ValueError(emsg)
if kind == 'master':
lanename = 'master'
elif kind == 'minion':
role = self.opts.get('id', '')
if not role:
emsg = ("Missing opts['id']. required to setup RAETChannel.")
log.error(emsg + "\n")
raise ValueError(emsg)
lanename = role # add kind later
else:
emsg = ("Unsupported application kind '{0}' for RAETChannel "
"Raet.".format(self.node))
log.error(emsg + '\n')
raise ValueError(emsg)
mid = self.opts.get('id', 'master')
uid = nacling.uuid(size=18)
name = 'channel' + uid
stack = LaneStack(name=name,
lanename=mid,
lanename=lanename,
sockdirpath=self.opts['sock_dir'])
stack.Pk = raeting.packKinds.pack
stack.addRemote(RemoteYard(stack=stack,
name='manor',
lanename=mid,
lanename=lanename,
dirpath=self.opts['sock_dir']))
log.debug("Created Channel Jobber Stack {0}\n".format(stack.name))
return stack

View File

@ -32,7 +32,7 @@ class SaltEvent(object):
'''
Set up the stack and remote yard
'''
self.node = node # this is usually 'master', 'minion' or opts['id']
self.node = node # application kind 'master', 'minion', 'syndic', 'call' etc
self.sock_dir = sock_dir
self.listen = listen
if opts is None:
@ -41,6 +41,34 @@ class SaltEvent(object):
self.__prep_stack()
def __prep_stack(self):
if self.node == 'master':
lanename = 'master'
if self.opts:
kind = self.opts.get('__role', '') # opts optional for master
if kind and kind != self.node:
emsg = ("Mismatch between node '{}' and kind '{}' in setup "
"of SaltEvent on Raet.".format(self.node, kind))
log.error(emsg + '\n')
raise ValueError(emsg)
elif self.node == 'minion':
role = self.opts.get('id', '') #opts required for minion
if not role:
emsg = ("Missing opts['id'] required by SaltEvent on Raet with "
"node kind {0}.".format(self.node))
log.error(emsg + '\n')
raise ValueError(emsg)
kind = self.opts.get('__role', '')
if kind != self.node:
emsg = ("Mismatch between node '{}' and kind '{}' in setup "
"of SaltEvent on Raet.".format(self.node, kind))
log.error(emsg + '\n')
raise ValueError(emsg)
lanename = role # add '_minion'
else:
emsg = ("Unsupported application node kind '{0}' for SaltEvent "
"Raet.".format(self.node))
log.error(emsg + '\n')
raise ValueError(emsg)
self.yid = nacling.uuid(size=18)
name = 'event' + self.yid
cachedir = self.opts.get('cachedir', os.path.join(syspaths.CACHE_DIR, self.node))
@ -48,12 +76,12 @@ class SaltEvent(object):
self.stack = LaneStack(
name=name,
uid=self.yid,
lanename=self.node,
lanename=lanename,
sockdirpath=self.sock_dir)
self.stack.Pk = raeting.packKinds.pack
self.router_yard = RemoteYard(
stack=self.stack,
lanename=self.node,
lanename=lanename,
uid=0,
name='manor',
dirpath=self.sock_dir)

View File

@ -98,8 +98,10 @@ class WheelClient(mixins.SyncClientMixin, mixins.AsyncClientMixin, object):
})
{'minions': {'jerry': '5d:f6:79:43:5e:d4:42:3f:57:b8:45:a8:7e:a4:6e:ca'}}
'''
sevent = salt.utils.event.get_event('master', self.opts['sock_dir'],
self.opts['transport'])
sevent = salt.utils.event.get_event('master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts)
job = self.master_call(**low)
ret_tag = tagify('ret', base=job['tag'])