mirror of
https://github.com/valitydev/salt.git
synced 2024-11-07 17:09:03 +00:00
Add packet conflict resolver on multi-master environment
This commit is contained in:
parent
565f81f6e2
commit
bb3f1322b4
@ -23,6 +23,7 @@ JSON-based service discovery protocol, used by minions to find running Master.
|
||||
import datetime
|
||||
import time
|
||||
import logging
|
||||
import random
|
||||
import socket
|
||||
from collections import OrderedDict
|
||||
|
||||
@ -134,6 +135,32 @@ class SSDPFactory(SSDPBase):
|
||||
'''
|
||||
self.transport = transport
|
||||
|
||||
def _sendto(self, data, addr=None, attempts=10):
|
||||
'''
|
||||
On multi-master environments, running on the same machine,
|
||||
transport sending to the destination can be allowed only at once.
|
||||
Since every machine will immediately respond, high chance to
|
||||
get sending fired at the same time, which will result to a PermissionError
|
||||
at socket level. We are attempting to send it in a different time.
|
||||
|
||||
:param data:
|
||||
:param addr:
|
||||
:return:
|
||||
'''
|
||||
tries = 0
|
||||
slp_time = lambda: 0.5 / random.randint(10, 30)
|
||||
slp = slp_time()
|
||||
while tries < 3:
|
||||
try:
|
||||
self.transport.sendto(data, addr=addr)
|
||||
self.log.debug('Sent successfully')
|
||||
return
|
||||
except AttributeError as ex:
|
||||
self.log.debug('Permission error: {0}'.format(ex))
|
||||
time.sleep(slp)
|
||||
tries += 1
|
||||
slp += slp_time()
|
||||
|
||||
def datagram_received(self, data, addr):
|
||||
'''
|
||||
On datagram receive.
|
||||
@ -149,21 +176,21 @@ class SSDPFactory(SSDPBase):
|
||||
except TypeError:
|
||||
self.log.debug('Received invalid timestamp in package from %s' % ("%s:%s" % addr))
|
||||
if self.disable_hidden:
|
||||
self.transport.sendto('{0}#ERROR#{1}'.format(self.signature, 'Invalid timestamp'), addr)
|
||||
self._sendto('{0}#ERROR#{1}'.format(self.signature, 'Invalid timestamp'), addr)
|
||||
return
|
||||
|
||||
if datetime.datetime.fromtimestamp(timestamp) < (datetime.datetime.now() - datetime.timedelta(seconds=20)):
|
||||
if self.disable_hidden:
|
||||
self.transport.sendto('{0}#ERROR#{1}'.format(self.signature, 'Timestamp is too old'), addr)
|
||||
self._sendto('{0}#ERROR#{1}'.format(self.signature, 'Timestamp is too old'), addr)
|
||||
self.log.debug('Received outdated package from %s' % ("%s:%s" % addr))
|
||||
return
|
||||
|
||||
self.log.debug('Received %r from %s' % (message, "%s:%s" % addr))
|
||||
self.transport.sendto('{0}#OK#{1}'.format(self.signature,
|
||||
self._sendto('{0}#OK#{1}'.format(self.signature,
|
||||
json.dumps(self.answer)), addr)
|
||||
else:
|
||||
if self.disable_hidden:
|
||||
self.transport.sendto('{0}#ERROR#{1}'.format(self.signature,
|
||||
self._sendto('{0}#ERROR#{1}'.format(self.signature,
|
||||
'Invalid packet signature').encode(), addr)
|
||||
self.log.debug('Received bad signature from %s:%s' % addr)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user