mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 08:58:59 +00:00
Merge pull request #19979 from SmithSamuelM/sam_20150122
Fixed conflict More preparatory work for master cluster
This commit is contained in:
commit
98ab9af354
@ -38,7 +38,25 @@ framer inbound be inactive first start
|
||||
do salt raet road stack service rx
|
||||
do salt raet lane stack service rx
|
||||
|
||||
framer bootstrap be inactive first join
|
||||
|
||||
framer bootstrap be inactive first setup
|
||||
|
||||
frame setup
|
||||
enter
|
||||
do salt raet road clustered per inode ".salt.road.manor."
|
||||
do salt raet road usher minion setup per inode ".salt.road.manor."
|
||||
go clustermaster
|
||||
go multimaster
|
||||
|
||||
frame clustermaster
|
||||
let if salt.road.manor.cluster.clustered
|
||||
print Setting Up Master Cluster ....
|
||||
go join
|
||||
|
||||
frame multimaster
|
||||
print Setting Up Master or MultiMaster
|
||||
go join
|
||||
|
||||
frame join
|
||||
print Joining...
|
||||
enter
|
||||
@ -49,7 +67,6 @@ framer bootstrap be inactive first join
|
||||
|
||||
go next if joined in .salt.road.manor.status
|
||||
go abort if rejected in .salt.road.manor.status
|
||||
#go abort if elapsed >= 10
|
||||
|
||||
frame joined
|
||||
print Joined
|
||||
@ -69,6 +86,11 @@ framer bootstrap be inactive first join
|
||||
print Allowed
|
||||
go next if elapsed >= 0.5
|
||||
|
||||
frame clustering
|
||||
print Cluster Setup ...
|
||||
do salt raet road cluster load setup
|
||||
go next
|
||||
|
||||
frame manager
|
||||
# start the manager framer
|
||||
bid start manager #start alive presence from minion side
|
||||
@ -81,8 +103,7 @@ framer bootstrap be inactive first join
|
||||
go router
|
||||
|
||||
frame router
|
||||
do salt raet router
|
||||
|
||||
do salt raet router minion
|
||||
|
||||
frame abort
|
||||
bid stop all
|
||||
|
@ -124,32 +124,65 @@ class SaltRaetRoadClustered(ioflo.base.deeding.Deed):
|
||||
self.clustered.update(value=self.opts.value.get('cluster_mode', False))
|
||||
|
||||
|
||||
class SaltRaetRoadClusterMinionSetup(ioflo.base.deeding.Deed):
|
||||
class SaltRaetRoadUsherMinionSetup(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Setups shares for master cluster
|
||||
Set up .ushers which is initial list of masters to bootstrap
|
||||
into road
|
||||
|
||||
FloScript:
|
||||
|
||||
do salt raet road cluster minion setup at enter
|
||||
do salt raet road usher minion setup at enter
|
||||
|
||||
'''
|
||||
Ioinits = odict(
|
||||
inode='.salt.road.manor.',
|
||||
masters='masters',
|
||||
loads=odict(ipath='cluster.loads', ival=odict()),
|
||||
opts='.salt.opts',)
|
||||
inode=".salt.road.manor.",
|
||||
ushers='ushers',
|
||||
opts='.salt.opts')
|
||||
|
||||
def action(self, **kwa):
|
||||
def action(self):
|
||||
'''
|
||||
Generate list of masters
|
||||
Assign .ushers by parsing opts
|
||||
'''
|
||||
self.masters.value = daemons.extract_masters(self.opts.value,
|
||||
'cluster_masters')
|
||||
masters = 'master'
|
||||
port = None
|
||||
if self.opts.value.get('cluster_mode', False):
|
||||
masters = 'cluster_masters'
|
||||
|
||||
self.ushers.value = daemons.extract_masters(self.opts.value,
|
||||
masters=masters,
|
||||
port=port)
|
||||
|
||||
|
||||
class SaltRaetRoadUsherMasterSetup(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Set up .ushers which is initial list of masters to bootstrap
|
||||
into road
|
||||
|
||||
FloScript:
|
||||
|
||||
do salt raet road usher master setup at enter
|
||||
|
||||
'''
|
||||
Ioinits = odict(
|
||||
inode=".salt.road.manor.",
|
||||
ushers='ushers',
|
||||
opts='.salt.opts')
|
||||
|
||||
def action(self):
|
||||
'''
|
||||
Assign .ushers by parsing opts
|
||||
'''
|
||||
masters = 'cluster_masters'
|
||||
port = 'raet_port'
|
||||
|
||||
self.ushers.value = daemons.extract_masters(self.opts.value,
|
||||
masters=masters,
|
||||
port=port)
|
||||
|
||||
|
||||
class SaltRaetRoadClusterLoadSetup(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Sets up cluster.loads for load balancing
|
||||
Sets up cluster.masters for load balancing
|
||||
|
||||
FloScript:
|
||||
|
||||
@ -158,7 +191,7 @@ class SaltRaetRoadClusterLoadSetup(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Ioinits = odict(
|
||||
inode='.salt.road.manor.',
|
||||
loads='cluster.loads',
|
||||
masters={'ipath': 'cluster.masters', 'ival': odict()},
|
||||
stack='stack',
|
||||
opts='.salt.opts',)
|
||||
|
||||
@ -168,51 +201,8 @@ class SaltRaetRoadClusterLoadSetup(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
if self.opts.value.get('cluster_mode'):
|
||||
for remote in self.stack.value.remotes.values():
|
||||
self.loads.value[remote.name] = odict(load=0.0, expire=self.store.stamp)
|
||||
|
||||
|
||||
class SaltRaetRoadClusterMasterSetup(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Setups shares for master cluster
|
||||
|
||||
FloScript:
|
||||
|
||||
do salt raet road cluster master setup at enter
|
||||
|
||||
'''
|
||||
Ioinits = odict(
|
||||
inode='.salt.road.manor.',
|
||||
masters='masters',
|
||||
opts='.salt.opts',)
|
||||
|
||||
def action(self, **kwa):
|
||||
'''
|
||||
Generate list of masters
|
||||
'''
|
||||
self.masters.value = daemons.extract_masters(self.opts.value,
|
||||
'cluster_masters',
|
||||
'raet_port')
|
||||
|
||||
|
||||
class SaltRaetRoadMasterSetup(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Sets up list of masters for minoin
|
||||
|
||||
FloScript:
|
||||
|
||||
do salt raet road master setup at enter
|
||||
|
||||
'''
|
||||
Ioinits = odict(
|
||||
inode=".salt.road.manor.",
|
||||
masters='masters',
|
||||
opts='.salt.opts')
|
||||
|
||||
def action(self):
|
||||
'''
|
||||
Assign .masters by parsing opts
|
||||
'''
|
||||
self.masters.value = daemons.extract_masters(self.opts.value)
|
||||
if remote.kind == kinds.applKinds.master:
|
||||
self.masters.value[remote.name] = odict(load=0.0, expire=self.store.stamp)
|
||||
|
||||
|
||||
class SaltRaetRoadStackSetup(ioflo.base.deeding.Deed):
|
||||
@ -342,13 +332,13 @@ class SaltRaetRoadStackJoiner(ioflo.base.deeding.Deed):
|
||||
|
||||
assumes that prior the following has been run to setup .masters
|
||||
|
||||
do salt raet road master setup
|
||||
do salt raet road usher minion setup
|
||||
|
||||
'''
|
||||
Ioinits = odict(
|
||||
inode=".salt.road.manor.",
|
||||
stack='stack',
|
||||
masters='masters',
|
||||
ushers='ushers',
|
||||
opts='.salt.opts')
|
||||
|
||||
def action(self, **kwa):
|
||||
@ -357,7 +347,6 @@ class SaltRaetRoadStackJoiner(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
stack = self.stack.value
|
||||
if stack and isinstance(stack, RoadStack):
|
||||
# minion should default
|
||||
refresh = (self.opts.value.get('raet_clear_remotes', True) or
|
||||
not stack.remotes)
|
||||
|
||||
@ -367,14 +356,15 @@ class SaltRaetRoadStackJoiner(ioflo.base.deeding.Deed):
|
||||
|
||||
stack.puid = stack.Uid # reset puid so reuse same uid each time
|
||||
|
||||
for master in self.masters.value:
|
||||
for master in self.ushers.value:
|
||||
mha = master['external']
|
||||
stack.addRemote(RemoteEstate(stack=stack,
|
||||
fuid=0, # vacuous join
|
||||
sid=0, # always 0 for join
|
||||
ha=mha))
|
||||
for remote in stack.remotes.values():
|
||||
stack.join(uid=remote.uid, timeout=0.0)
|
||||
if remote.kind == kinds.applKinds.master:
|
||||
stack.join(uid=remote.uid, timeout=0.0)
|
||||
|
||||
|
||||
class SaltRaetRoadStackJoined(ioflo.base.deeding.Deed):
|
||||
@ -403,8 +393,8 @@ class SaltRaetRoadStackJoined(ioflo.base.deeding.Deed):
|
||||
joined = False
|
||||
if stack and isinstance(stack, RoadStack):
|
||||
if stack.remotes:
|
||||
for remote in stack.remotes.values():
|
||||
joined = any([remote.joined for remote in stack.remotes.values()])
|
||||
joined = any([remote.joined for remote in stack.remotes.values()
|
||||
if remote.kind == kinds.applKinds.master])
|
||||
self.status.update(joined=joined)
|
||||
|
||||
|
||||
@ -434,9 +424,9 @@ class SaltRaetRoadStackRejected(ioflo.base.deeding.Deed):
|
||||
rejected = False
|
||||
if stack and isinstance(stack, RoadStack):
|
||||
if stack.remotes:
|
||||
for remote in stack.remotes.values():
|
||||
rejected = all([remote.acceptance == raeting.acceptances.rejected
|
||||
for remote in stack.remotes.values()])
|
||||
for remote in stack.remotes.values()
|
||||
if remote.kind == kinds.applKinds.master])
|
||||
else: # no remotes so assume rejected
|
||||
rejected = True
|
||||
self.status.update(rejected=rejected)
|
||||
@ -461,8 +451,9 @@ class SaltRaetRoadStackAllower(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
stack = self.stack.value
|
||||
if stack and isinstance(stack, RoadStack):
|
||||
stack.allow(timeout=0.0)
|
||||
return None
|
||||
for remote in stack.remotes.values():
|
||||
if remote.kind == kinds.applKinds.master:
|
||||
stack.allow(uid=remote.uid, timeout=0.0)
|
||||
|
||||
|
||||
class SaltRaetRoadStackAllowed(ioflo.base.deeding.Deed):
|
||||
@ -491,8 +482,8 @@ class SaltRaetRoadStackAllowed(ioflo.base.deeding.Deed):
|
||||
allowed = False
|
||||
if stack and isinstance(stack, RoadStack):
|
||||
if stack.remotes:
|
||||
for remote in stack.remotes.values():
|
||||
allowed = any([remote.allowed for remote in stack.remotes.values()])
|
||||
allowed = any([remote.allowed for remote in stack.remotes.values()
|
||||
if remote.kind == kinds.applKinds.master])
|
||||
self.status.update(allowed=allowed)
|
||||
|
||||
|
||||
@ -947,11 +938,7 @@ class SaltRaetRouter(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Routes the communication in and out of Road and Lane connections
|
||||
|
||||
This is the initial static salt router, we want to create a dynamic
|
||||
router that takes a map that defines where packets are send
|
||||
FloScript:
|
||||
|
||||
do salt raet router
|
||||
This is a base class
|
||||
|
||||
'''
|
||||
Ioinits = {'opts': '.salt.opts',
|
||||
@ -971,61 +958,15 @@ class SaltRaetRouter(ioflo.base.deeding.Deed):
|
||||
'laters': {'ipath': '.salt.lane.manor.laters', # requeuing when not yet routable
|
||||
'ival': deque()}}
|
||||
|
||||
def _process_udp_rxmsg(self, msg, sender):
|
||||
def _process_road_rxmsg(self, msg, sender):
|
||||
'''
|
||||
Send to the right queue
|
||||
msg is the message body dict
|
||||
sender is the unique name of the remote estate that sent the message
|
||||
'''
|
||||
try:
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
d_estate, d_yard, d_share = msg['route']['dst']
|
||||
except (ValueError, IndexError):
|
||||
log.error('Received invalid message: {0}'.format(msg))
|
||||
return
|
||||
pass
|
||||
|
||||
if s_estate is None: # drop
|
||||
return
|
||||
|
||||
log.debug("**** Road Router rxMsg **** id={0} estate={1} yard={2}\n"
|
||||
" msg= {3}\n".format(
|
||||
self.opts.value['id'],
|
||||
self.road_stack.value.local.name,
|
||||
self.lane_stack.value.local.name,
|
||||
msg))
|
||||
|
||||
if d_estate is not None and d_estate != self.road_stack.value.local.name:
|
||||
log.error(
|
||||
'Road Router Received message for wrong estate: {0}'.format(d_estate))
|
||||
return
|
||||
|
||||
if d_yard is not None:
|
||||
# Meant for another yard, send it off!
|
||||
if d_yard in self.lane_stack.value.nameRemotes:
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.nameRemotes[d_yard].uid)
|
||||
return
|
||||
return
|
||||
if d_share is None:
|
||||
# No queue destination!
|
||||
log.error('Received message without share: {0}'.format(msg))
|
||||
return
|
||||
elif d_share == 'local_cmd':
|
||||
# Refuse local commands over the wire
|
||||
log.error('Received local command remotely! Ignoring: {0}'.format(msg))
|
||||
return
|
||||
elif d_share == 'remote_cmd':
|
||||
# Send it to a remote worker
|
||||
if 'load' in msg:
|
||||
role = self.road_stack.value.nameRemotes[sender].role
|
||||
msg['load']['id'] = role # sender # should this be role XXXX
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.fetchUidByName(next(self.workers.value)))
|
||||
elif d_share == 'fun':
|
||||
if self.road_stack.value.kind == kinds.applKinds.minion:
|
||||
self.fun.value.append(msg)
|
||||
|
||||
def _process_uxd_rxmsg(self, msg, sender):
|
||||
def _process_lane_rxmsg(self, msg, sender):
|
||||
'''
|
||||
Send uxd messages tot he right queue or forward them to the correct
|
||||
yard etc.
|
||||
@ -1033,84 +974,7 @@ class SaltRaetRouter(ioflo.base.deeding.Deed):
|
||||
msg is message body dict
|
||||
sender is unique name of remote that sent the message
|
||||
'''
|
||||
try:
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
d_estate, d_yard, d_share = msg['route']['dst']
|
||||
except (ValueError, IndexError):
|
||||
log.error('Lane Router Received invalid message: {0}'.format(msg))
|
||||
return
|
||||
|
||||
if s_yard is None:
|
||||
return # drop message
|
||||
|
||||
if s_estate is None: # substitute local estate
|
||||
s_estate = self.road_stack.value.local.name
|
||||
msg['route']['src'] = (s_estate, s_yard, s_share)
|
||||
|
||||
log.debug("**** Lane Router rxMsg **** id={0} estate={1} yard={2}\n"
|
||||
" msg={3}\n".format(
|
||||
self.opts.value['id'],
|
||||
self.road_stack.value.local.name,
|
||||
self.lane_stack.value.local.name,
|
||||
msg))
|
||||
|
||||
if d_estate is None:
|
||||
pass
|
||||
elif d_estate != self.road_stack.value.local.name:
|
||||
# Forward to the correct estate
|
||||
if d_estate in self.road_stack.value.nameRemotes:
|
||||
self.road_stack.value.message(msg,
|
||||
self.road_stack.value.nameRemotes[d_estate].uid)
|
||||
return
|
||||
|
||||
if d_share == 'pub_ret':
|
||||
# only publish to available minions
|
||||
msg['return']['ret']['minions'] = self._availablize(msg['return']['ret']['minions'])
|
||||
if msg.get('__worker_verify') == self.worker_verify.value:
|
||||
self.publish.value.append(msg)
|
||||
|
||||
if d_yard is None:
|
||||
pass
|
||||
elif d_yard != self.lane_stack.value.local.name:
|
||||
# Meant for another yard, send it off!
|
||||
if d_yard in self.lane_stack.value.nameRemotes:
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.nameRemotes[d_yard].uid)
|
||||
return
|
||||
return
|
||||
if d_share is None:
|
||||
# No queue destination!
|
||||
log.error('Lane Router Received message without share: {0}'.format(msg))
|
||||
return
|
||||
elif d_share == 'local_cmd':
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.fetchUidByName(next(self.workers.value)))
|
||||
elif d_share == 'event_req':
|
||||
self.event_req.value.append(msg)
|
||||
#log.debug("\n**** Event Subscribe \n {0}\n".format(msg))
|
||||
elif d_share == 'event_fire':
|
||||
self.event.value.append(msg)
|
||||
#log.debug("\n**** Event Fire \n {0}\n".format(msg))
|
||||
elif d_share == 'presence_req':
|
||||
self.presence_req.value.append(msg)
|
||||
#log.debug("\n**** Presence Request \n {0}\n".format(msg))
|
||||
elif d_share == 'remote_cmd': # assume minion to master or salt-call
|
||||
if not self.road_stack.value.remotes:
|
||||
log.error("**** Lane Router: Missing joined master. Unable to route "
|
||||
"remote_cmd. Requeuing".format())
|
||||
self.laters.value.append((msg, sender))
|
||||
return
|
||||
d_estate = self._get_master_estate_name(clustered=self.opts.get('cluster_mode', False))
|
||||
if not d_estate:
|
||||
log.error("**** Lane Router: No available destination estate for 'remote_cmd'."
|
||||
"Unable to route. Requeuing".format())
|
||||
self.laters.value.append((msg, sender))
|
||||
return
|
||||
msg['route']['dst'] = (d_estate, d_yard, d_share)
|
||||
log.debug("**** Lane Router: Missing destination estate for 'remote_cmd'. "
|
||||
"Using default route={0}.".format(msg['route']['dst']))
|
||||
self.road_stack.value.message(msg,
|
||||
self.road_stack.value.nameRemotes[d_estate].uid)
|
||||
pass
|
||||
|
||||
def _get_master_estate_name(self, clustered=False):
|
||||
'''
|
||||
@ -1154,13 +1018,276 @@ class SaltRaetRouter(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
while self.road_stack.value.rxMsgs:
|
||||
msg, sender = self.road_stack.value.rxMsgs.popleft()
|
||||
self._process_udp_rxmsg(msg=msg, sender=sender)
|
||||
self._process_road_rxmsg(msg=msg, sender=sender)
|
||||
while self.laters.value: # process requeued LaneMsgs
|
||||
msg, sender = self.laters.value.popleft()
|
||||
self.lane_stack.value.rxMsgs.append((msg, sender))
|
||||
while self.lane_stack.value.rxMsgs:
|
||||
msg, sender = self.lane_stack.value.rxMsgs.popleft()
|
||||
self._process_uxd_rxmsg(msg=msg, sender=sender)
|
||||
self._process_lane_rxmsg(msg=msg, sender=sender)
|
||||
|
||||
|
||||
class SaltRaetRouterMaster(SaltRaetRouter):
|
||||
'''
|
||||
Routes the communication in and out of Road and Lane connections
|
||||
Specific to Master
|
||||
|
||||
do salt raet router master
|
||||
|
||||
'''
|
||||
def _process_road_rxmsg(self, msg, sender):
|
||||
'''
|
||||
Send to the right queue
|
||||
msg is the message body dict
|
||||
sender is the unique name of the remote estate that sent the message
|
||||
'''
|
||||
try:
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
d_estate, d_yard, d_share = msg['route']['dst']
|
||||
except (ValueError, IndexError):
|
||||
log.error('Received invalid message: {0}'.format(msg))
|
||||
return
|
||||
|
||||
if s_estate is None: # drop
|
||||
return
|
||||
|
||||
log.debug("**** Road Router rxMsg **** id={0} estate={1} yard={2}\n"
|
||||
" msg= {3}\n".format(
|
||||
self.opts.value['id'],
|
||||
self.road_stack.value.local.name,
|
||||
self.lane_stack.value.local.name,
|
||||
msg))
|
||||
|
||||
if d_estate is not None and d_estate != self.road_stack.value.local.name:
|
||||
log.error(
|
||||
'Road Router Received message for wrong estate: {0}'.format(d_estate))
|
||||
return
|
||||
|
||||
if d_yard is not None:
|
||||
# Meant for another yard, send it off!
|
||||
if d_yard in self.lane_stack.value.nameRemotes:
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.nameRemotes[d_yard].uid)
|
||||
return
|
||||
if d_share is None:
|
||||
# No queue destination!
|
||||
log.error('Received message without share: {0}'.format(msg))
|
||||
return
|
||||
elif d_share == 'event_fire': # rebroadcast events from other masters
|
||||
self.event.value.append(msg)
|
||||
#log.debug("\n**** Event Fire \n {0}\n".format(msg))
|
||||
return
|
||||
elif d_share == 'local_cmd':
|
||||
# Refuse local commands over the wire
|
||||
log.error('Received local command remotely! Ignoring: {0}'.format(msg))
|
||||
return
|
||||
elif d_share == 'remote_cmd':
|
||||
# Send it to a remote worker
|
||||
if 'load' in msg:
|
||||
role = self.road_stack.value.nameRemotes[sender].role
|
||||
msg['load']['id'] = role # sender # should this be role XXXX
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.fetchUidByName(next(self.workers.value)))
|
||||
|
||||
|
||||
def _process_lane_rxmsg(self, msg, sender):
|
||||
'''
|
||||
Send uxd messages tot he right queue or forward them to the correct
|
||||
yard etc.
|
||||
|
||||
msg is message body dict
|
||||
sender is unique name of remote that sent the message
|
||||
'''
|
||||
try:
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
d_estate, d_yard, d_share = msg['route']['dst']
|
||||
except (ValueError, IndexError):
|
||||
log.error('Lane Router Received invalid message: {0}'.format(msg))
|
||||
return
|
||||
|
||||
if s_yard is None:
|
||||
return # drop message
|
||||
|
||||
if s_estate is None: # substitute local estate
|
||||
s_estate = self.road_stack.value.local.name
|
||||
msg['route']['src'] = (s_estate, s_yard, s_share)
|
||||
|
||||
log.debug("**** Lane Router rxMsg **** id={0} estate={1} yard={2}\n"
|
||||
" msg={3}\n".format(
|
||||
self.opts.value['id'],
|
||||
self.road_stack.value.local.name,
|
||||
self.lane_stack.value.local.name,
|
||||
msg))
|
||||
|
||||
if d_estate is None:
|
||||
pass
|
||||
elif d_estate != self.road_stack.value.local.name:
|
||||
# Forward to the correct estate
|
||||
if d_estate in self.road_stack.value.nameRemotes:
|
||||
self.road_stack.value.message(msg,
|
||||
self.road_stack.value.nameRemotes[d_estate].uid)
|
||||
return
|
||||
|
||||
if d_share == 'pub_ret':
|
||||
# only publish to available minions
|
||||
msg['return']['ret']['minions'] = self._availablize(msg['return']['ret']['minions'])
|
||||
if msg.get('__worker_verify') == self.worker_verify.value:
|
||||
self.publish.value.append(msg)
|
||||
|
||||
if d_yard is None:
|
||||
pass
|
||||
elif d_yard != self.lane_stack.value.local.name:
|
||||
# Meant for another yard, send it off!
|
||||
if d_yard in self.lane_stack.value.nameRemotes:
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.nameRemotes[d_yard].uid)
|
||||
return
|
||||
if d_share is None:
|
||||
# No queue destination!
|
||||
log.error('Lane Router Received message without share: {0}'.format(msg))
|
||||
return
|
||||
elif d_share == 'local_cmd':
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.fetchUidByName(next(self.workers.value)))
|
||||
elif d_share == 'event_req':
|
||||
self.event_req.value.append(msg)
|
||||
#log.debug("\n**** Event Subscribe \n {0}\n".format(msg))
|
||||
elif d_share == 'event_fire':
|
||||
self.event.value.append(msg)
|
||||
#log.debug("\n**** Event Fire \n {0}\n".format(msg))
|
||||
elif d_share == 'presence_req':
|
||||
self.presence_req.value.append(msg)
|
||||
#log.debug("\n**** Presence Request \n {0}\n".format(msg))
|
||||
|
||||
|
||||
class SaltRaetRouterMinion(SaltRaetRouter):
|
||||
'''
|
||||
Routes the communication in and out of Road and Lane connections
|
||||
Specific to Minions
|
||||
|
||||
do salt raet router minion
|
||||
|
||||
'''
|
||||
def _process_road_rxmsg(self, msg, sender):
|
||||
'''
|
||||
Send to the right queue
|
||||
msg is the message body dict
|
||||
sender is the unique name of the remote estate that sent the message
|
||||
'''
|
||||
try:
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
d_estate, d_yard, d_share = msg['route']['dst']
|
||||
except (ValueError, IndexError):
|
||||
log.error('Received invalid message: {0}'.format(msg))
|
||||
return
|
||||
|
||||
if s_estate is None: # drop
|
||||
return
|
||||
|
||||
log.debug("**** Road Router rxMsg **** id={0} estate={1} yard={2}\n"
|
||||
" msg= {3}\n".format(
|
||||
self.opts.value['id'],
|
||||
self.road_stack.value.local.name,
|
||||
self.lane_stack.value.local.name,
|
||||
msg))
|
||||
|
||||
if d_estate is not None and d_estate != self.road_stack.value.local.name:
|
||||
log.error(
|
||||
'Road Router Received message for wrong estate: {0}'.format(d_estate))
|
||||
return
|
||||
|
||||
if d_yard is not None:
|
||||
# Meant for another yard, send it off!
|
||||
if d_yard in self.lane_stack.value.nameRemotes:
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.nameRemotes[d_yard].uid)
|
||||
return
|
||||
return
|
||||
if d_share is None:
|
||||
# No queue destination!
|
||||
log.error('Received message without share: {0}'.format(msg))
|
||||
return
|
||||
|
||||
elif d_share == 'fun':
|
||||
if self.road_stack.value.kind == kinds.applKinds.minion:
|
||||
self.fun.value.append(msg)
|
||||
|
||||
def _process_lane_rxmsg(self, msg, sender):
|
||||
'''
|
||||
Send uxd messages tot he right queue or forward them to the correct
|
||||
yard etc.
|
||||
|
||||
msg is message body dict
|
||||
sender is unique name of remote that sent the message
|
||||
'''
|
||||
try:
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
d_estate, d_yard, d_share = msg['route']['dst']
|
||||
except (ValueError, IndexError):
|
||||
log.error('Lane Router Received invalid message: {0}'.format(msg))
|
||||
return
|
||||
|
||||
if s_yard is None:
|
||||
return # drop message
|
||||
|
||||
if s_estate is None: # substitute local estate
|
||||
s_estate = self.road_stack.value.local.name
|
||||
msg['route']['src'] = (s_estate, s_yard, s_share)
|
||||
|
||||
log.debug("**** Lane Router rxMsg **** id={0} estate={1} yard={2}\n"
|
||||
" msg={3}\n".format(
|
||||
self.opts.value['id'],
|
||||
self.road_stack.value.local.name,
|
||||
self.lane_stack.value.local.name,
|
||||
msg))
|
||||
|
||||
if d_estate is None:
|
||||
pass
|
||||
elif d_estate != self.road_stack.value.local.name:
|
||||
# Forward to the correct estate
|
||||
if d_estate in self.road_stack.value.nameRemotes:
|
||||
self.road_stack.value.message(msg,
|
||||
self.road_stack.value.nameRemotes[d_estate].uid)
|
||||
return
|
||||
|
||||
if d_yard is None:
|
||||
pass
|
||||
elif d_yard != self.lane_stack.value.local.name:
|
||||
# Meant for another yard, send it off!
|
||||
if d_yard in self.lane_stack.value.nameRemotes:
|
||||
self.lane_stack.value.transmit(msg,
|
||||
self.lane_stack.value.nameRemotes[d_yard].uid)
|
||||
return
|
||||
return
|
||||
if d_share is None:
|
||||
# No queue destination!
|
||||
log.error('Lane Router Received message without share: {0}'.format(msg))
|
||||
return
|
||||
|
||||
elif d_share == 'event_req':
|
||||
self.event_req.value.append(msg)
|
||||
#log.debug("\n**** Event Subscribe \n {0}\n".format(msg))
|
||||
elif d_share == 'event_fire':
|
||||
self.event.value.append(msg)
|
||||
#log.debug("\n**** Event Fire \n {0}\n".format(msg))
|
||||
|
||||
elif d_share == 'remote_cmd': # assume minion to master or salt-call
|
||||
if not self.road_stack.value.remotes:
|
||||
log.error("**** Lane Router: Missing joined master. Unable to route "
|
||||
"remote_cmd. Requeuing".format())
|
||||
self.laters.value.append((msg, sender))
|
||||
return
|
||||
d_estate = self._get_master_estate_name(clustered=self.opts.get('cluster_mode', False))
|
||||
if not d_estate:
|
||||
log.error("**** Lane Router: No available destination estate for 'remote_cmd'."
|
||||
"Unable to route. Requeuing".format())
|
||||
self.laters.value.append((msg, sender))
|
||||
return
|
||||
msg['route']['dst'] = (d_estate, d_yard, d_share)
|
||||
log.debug("**** Lane Router: Missing destination estate for 'remote_cmd'. "
|
||||
"Using default route={0}.".format(msg['route']['dst']))
|
||||
self.road_stack.value.message(msg,
|
||||
self.road_stack.value.nameRemotes[d_estate].uid)
|
||||
|
||||
|
||||
class SaltRaetEventer(ioflo.base.deeding.Deed):
|
||||
@ -1177,7 +1304,9 @@ class SaltRaetEventer(ioflo.base.deeding.Deed):
|
||||
'event_req': '.salt.event.event_req',
|
||||
'module_refresh': '.salt.var.module_refresh',
|
||||
'pillar_refresh': '.salt.var.pillar_refresh',
|
||||
'lane_stack': '.salt.lane.manor.stack'}
|
||||
'lane_stack': '.salt.lane.manor.stack',
|
||||
'road_stack': '.salt.road.manor.stack',
|
||||
'availables': '.salt.var.presence.availables',}
|
||||
|
||||
def _register_event_yard(self, msg):
|
||||
'''
|
||||
@ -1190,6 +1319,8 @@ class SaltRaetEventer(ioflo.base.deeding.Deed):
|
||||
Forward an event message to all subscribed yards
|
||||
Event message has a route
|
||||
'''
|
||||
import wingdbstub
|
||||
|
||||
rm_ = []
|
||||
if msg.get('tag') == 'pillar_refresh':
|
||||
self.pillar_refresh.value = True
|
||||
@ -1221,6 +1352,34 @@ class SaltRaetEventer(ioflo.base.deeding.Deed):
|
||||
)
|
||||
|
||||
|
||||
class SaltRaetEventerMaster(SaltRaetEventer):
|
||||
'''
|
||||
Fire events!
|
||||
FloScript:
|
||||
|
||||
do salt raet eventer master
|
||||
|
||||
'''
|
||||
def _forward_event(self, msg):
|
||||
'''
|
||||
Forward an event message to all subscribed yards
|
||||
Event message has a route
|
||||
Also rebroadcast to all masters in cluster
|
||||
'''
|
||||
super(SaltRaetEventerMaster, self)._forward_event(msg)
|
||||
if self.opts.value.get('cluster_mode'):
|
||||
if msg.get('origin') is None:
|
||||
masters = (self.availables.value &
|
||||
set((remote.name for remote in self.road_stack.value.remotes.values()
|
||||
if remote.kind == kinds.applKinds.master)))
|
||||
for remote in masters:
|
||||
msg['origin'] = self.road_stack.value.name
|
||||
s_estate, s_yard, s_share = msg['route']['src']
|
||||
msg['route']['src'] = (self.road_stack.value.name, s_yard, s_share)
|
||||
msg['route']['dst'] = (remote.name, None, 'event_fire')
|
||||
self.road_stack.value.message(msg, remote.uid)
|
||||
|
||||
|
||||
class SaltRaetPresenter(ioflo.base.deeding.Deed):
|
||||
'''
|
||||
Fire presence events!
|
||||
|
@ -34,6 +34,8 @@ framer masterudpstack be active first setup
|
||||
frame start
|
||||
# Start the message receive framer
|
||||
bid start inbound
|
||||
# Start the cluster bootstrap framer
|
||||
bid start bootstrap
|
||||
# Start the message receive framer
|
||||
bid start uxdrouter
|
||||
# Start the event framer
|
||||
@ -60,15 +62,69 @@ framer inbound be inactive first start
|
||||
do salt raet road stack service rx
|
||||
do salt raet lane stack service rx
|
||||
|
||||
# Bootstrap framer
|
||||
framer bootstrap be inactive first setup
|
||||
frame setup
|
||||
enter
|
||||
do salt raet road clustered per inode ".salt.road.manor."
|
||||
|
||||
go clustermaster
|
||||
go quit
|
||||
|
||||
frame clustermaster
|
||||
let if salt.road.manor.cluster.clustered
|
||||
print Setting Up Master Cluster ....
|
||||
do salt raet road usher master setup per inode ".salt.road.manor."
|
||||
go join
|
||||
|
||||
frame join
|
||||
print Joining...
|
||||
enter
|
||||
do salt raet road stack joiner per inode ".salt.road.manor."
|
||||
recur
|
||||
do salt raet road stack joined per inode ".salt.road.manor."
|
||||
do salt raet road stack rejected per inode ".salt.road.manor."
|
||||
|
||||
go next if joined in .salt.road.manor.status
|
||||
#go abort if rejected in .salt.road.manor.status
|
||||
|
||||
frame joined
|
||||
print Joined
|
||||
go next if elapsed >= 0.5
|
||||
|
||||
frame allow
|
||||
print Allowing...
|
||||
enter
|
||||
do salt raet road stack allower per inode ".salt.road.manor."
|
||||
recur
|
||||
do salt raet road stack allowed per inode ".salt.road.manor."
|
||||
|
||||
go next if allowed in .salt.road.manor.status
|
||||
|
||||
frame allowed
|
||||
print Allowed
|
||||
go next
|
||||
|
||||
frame clustering
|
||||
print Cluster Setup ...
|
||||
do salt raet road cluster load setup
|
||||
go next
|
||||
|
||||
frame quit
|
||||
bid stop me
|
||||
|
||||
frame abort
|
||||
bid stop all
|
||||
|
||||
# Router framer
|
||||
framer uxdrouter be inactive first start
|
||||
frame start
|
||||
do salt raet router
|
||||
do salt raet router master
|
||||
|
||||
# Event bus framer
|
||||
framer events be inactive first start
|
||||
frame start
|
||||
do salt raet eventer
|
||||
do salt raet eventer master
|
||||
|
||||
# Presence framer
|
||||
framer presence be inactive first start
|
||||
|
@ -46,20 +46,17 @@ framer bootstrap be inactive first setup
|
||||
frame setup
|
||||
enter
|
||||
do salt raet road clustered per inode ".salt.road.manor."
|
||||
do salt raet road usher minion setup per inode ".salt.road.manor."
|
||||
go clustermaster
|
||||
go multimaster
|
||||
|
||||
frame clustermaster
|
||||
let if salt.road.manor.cluster.clustered
|
||||
print Setting Up Master Cluster ....
|
||||
enter
|
||||
do salt raet road cluster minion setup per inode ".salt.road.manor."
|
||||
go join
|
||||
|
||||
frame multimaster
|
||||
print Setting Up Master or MultiMaster
|
||||
enter
|
||||
do salt raet road master setup per inode ".salt.road.manor."
|
||||
go join
|
||||
|
||||
frame join
|
||||
@ -72,7 +69,6 @@ framer bootstrap be inactive first setup
|
||||
|
||||
go next if joined in .salt.road.manor.status
|
||||
go abort if rejected in .salt.road.manor.status
|
||||
#go abort if elapsed >= 10
|
||||
|
||||
frame joined
|
||||
print Joined
|
||||
@ -86,14 +82,13 @@ framer bootstrap be inactive first setup
|
||||
do salt raet road stack allowed per inode ".salt.road.manor."
|
||||
|
||||
go next if allowed in .salt.road.manor.status
|
||||
#go abort if elapsed >= 5
|
||||
|
||||
frame allowed
|
||||
print Allowed
|
||||
go next if elapsed >= 0.5
|
||||
|
||||
frame balancing
|
||||
print Balancing Setup
|
||||
frame clustering
|
||||
print Cluster Setup ...
|
||||
do salt raet road cluster load setup
|
||||
go next
|
||||
|
||||
@ -112,7 +107,7 @@ framer bootstrap be inactive first setup
|
||||
frame router
|
||||
# start the manager framer
|
||||
bid start manager #start alive presence from minion side
|
||||
do salt raet router
|
||||
do salt raet router minion
|
||||
go pillar if .salt.var.pillar_refresh
|
||||
go loading if .salt.var.module_refresh
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user