Transform serialization systems to use the serial class

This commit is contained in:
Thomas S Hatch 2011-12-13 21:15:56 -07:00
parent fd4d47ed6b
commit b79e0017c4
5 changed files with 53 additions and 43 deletions

View File

@ -71,6 +71,7 @@ class LocalClient(object):
'''
def __init__(self, c_path='/etc/salt/master'):
self.opts = salt.config.master_config(c_path)
self.serial = salt.payload.Serial(self.opts)
self.key = self.__read_master_key()
def __read_master_key(self):
@ -199,7 +200,7 @@ class LocalClient(object):
continue
while fn_ not in ret:
try:
ret[fn_] = salt.payload.load(open(retp, 'r'))
ret[fn_] = self.serial.load(open(retp, 'r'))
except:
pass
if ret and start == 999999999999:
@ -238,10 +239,10 @@ class LocalClient(object):
continue
while fn_ not in ret:
try:
ret_data = salt.payload.load(open(retp, 'r'))
ret_data = self.serial.load(open(retp, 'r'))
ret[fn_] = {'ret': ret_data}
if os.path.isfile(outp):
ret[fn_]['out'] = salt.payload.load(open(outp, 'r'))
ret[fn_]['out'] = self.serial.load(open(outp, 'r'))
except:
pass
if ret and start == 999999999999:
@ -268,7 +269,7 @@ class LocalClient(object):
loadp = os.path.join(jid_dir, '.load.p')
if os.path.isfile(loadp):
try:
load = salt.payload.load(open(loadp, 'r'))
load = self.serial.load(open(loadp, 'r'))
if load['fun'] == cmd:
# We found a match! Add the return values
ret[jid] = {}
@ -277,7 +278,7 @@ class LocalClient(object):
retp = os.path.join(host_dir, 'return.p')
if not os.path.isfile(retp):
continue
ret[jid][host] = salt.payload.load(open(retp))
ret[jid][host] = self.serial.load(open(retp))
except:
continue
else:
@ -334,7 +335,7 @@ class LocalClient(object):
return {'jid': '',
'minions': minions}
if self.opts['order_masters']:
package = salt.payload.format_payload(
package = self.serial.format_payload(
'clear',
cmd='publish',
tgt=tgt,
@ -346,7 +347,7 @@ class LocalClient(object):
jid=jid,
to=timeout)
else:
package = salt.payload.format_payload(
package = self.serial.format_payload(
'clear',
cmd='publish',
tgt=tgt,
@ -369,7 +370,7 @@ class LocalClient(object):
payload = None
for ind in range(100):
try:
payload = salt.payload.unpackage(
payload = self.serial.unpackage(
socket.recv(
zmq.NOBLOCK
)

View File

@ -97,6 +97,7 @@ class Auth(object):
'''
def __init__(self, opts):
self.opts = opts
self.serial = salt.payload.Serial(self.opts)
self.rsa_path = os.path.join(self.opts['pki_dir'], 'minion.pem')
if 'syndic_master' in self.opts:
self.mpub = 'syndic_master.pub'
@ -187,9 +188,9 @@ class Auth(object):
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(self.opts['master_uri'])
payload = salt.payload.package(self.minion_sign_in_payload())
payload = self.serial.package(self.minion_sign_in_payload())
socket.send(payload)
payload = salt.payload.unpackage(socket.recv())
payload = self.serial.unpackage(socket.recv())
if 'load' in payload:
if 'ret' in payload['load']:
if not payload['load']['ret']:
@ -243,9 +244,10 @@ class Crypticle(object):
AES_BLOCK_SIZE = 16
SIG_SIZE = hashlib.sha256().digest_size
def __init__(self, key_string, key_size=192):
def __init__(self, key_string, key_size=192, opts):
self.keys = self.extract_keys(key_string, key_size)
self.key_size = key_size
self.serial = salt.payload.Serial(opts)
@classmethod
def generate_key_string(cls, key_size=192):
@ -291,7 +293,7 @@ class Crypticle(object):
'''
Serialize and encrypt a python object
'''
return self.encrypt(self.PICKLE_PAD + salt.payload.dumps(obj))
return self.encrypt(self.PICKLE_PAD + self.serial.dumps(obj))
def loads(self, data):
'''
@ -301,7 +303,7 @@ class Crypticle(object):
# simple integrity check to verify that we got meaningful data
if not data.startswith(self.PICKLE_PAD):
return {}
return salt.payload.loads(data[len(self.PICKLE_PAD):])
return self.serial.loads(data[len(self.PICKLE_PAD):])
class SAuth(Auth):
@ -325,7 +327,7 @@ class SAuth(Auth):
print 'Failed to authenticate with the master, verify that this'\
+ ' minion\'s public key has been accepted on the salt master'
sys.exit(2)
return Crypticle(creds['aes'])
return Crypticle(creds['aes'], self.opts)
def gen_token(self, clear_tok):
'''

View File

@ -28,20 +28,21 @@ import salt.utils
log = logging.getLogger(__name__)
def prep_jid(cachedir, load):
def prep_jid(opts, load):
'''
Parses the job return directory, generates a job id and sets up the
job id directory.
'''
serial = salt.payload.Serial(opts)
jid_root = os.path.join(cachedir, 'jobs')
jid = "{0:%Y%m%d%H%M%S%f}".format(datetime.datetime.now())
jid_dir = os.path.join(jid_root, jid)
if not os.path.isdir(jid_dir):
os.makedirs(jid_dir)
salt.payload.dump(load, open(os.path.join(jid_dir, '.load.p'), 'w+'))
serial.dump(load, open(os.path.join(jid_dir, '.load.p'), 'w+'))
else:
return prep_jid(load)
return prep_jid(cachedir, load)
return jid
@ -62,7 +63,7 @@ class SMaster(object):
'''
Return the crypticle used for AES
'''
return salt.crypt.Crypticle(self.opts['aes'])
return salt.crypt.Crypticle(self.opts['aes'], self.opts)
def __prep_key(self):
'''
@ -229,6 +230,7 @@ class MWorker(multiprocessing.Process):
clear_funcs):
multiprocessing.Process.__init__(self)
self.opts = opts
self.serial = salt.payload.Serial(opts)
self.crypticle = crypticle
self.aes_funcs = aes_funcs
self.clear_funcs = clear_funcs
@ -247,8 +249,8 @@ class MWorker(multiprocessing.Process):
while True:
package = socket.recv()
payload = salt.payload.unpackage(package)
ret = salt.payload.package(self._handle_payload(payload))
payload = self.serial.unpackage(package)
ret = self.serial.package(self._handle_payload(payload))
socket.send(ret)
def _handle_payload(self, payload):
@ -302,6 +304,7 @@ class AESFuncs(object):
#
def __init__(self, opts, crypticle):
self.opts = opts
self.serial = salt.payload.Serial(opts)
self.crypticle = crypticle
# Make a client
self.local = salt.client.LocalClient(self.opts['conf_file'])
@ -424,10 +427,10 @@ class AESFuncs(object):
hn_dir = os.path.join(jid_dir, load['id'])
if not os.path.isdir(hn_dir):
os.makedirs(hn_dir)
salt.payload.dump(load['return'],
self.serial.dump(load['return'],
open(os.path.join(hn_dir, 'return.p'), 'w+'))
if 'out' in load:
salt.payload.dump(load['out'],
self.serial.dump(load['out'],
open(os.path.join(hn_dir, 'out.p'), 'w+'))
def _syndic_return(self, load):
@ -497,7 +500,7 @@ class AESFuncs(object):
if not good:
return {}
# Set up the publication payload
jid = prep_jid(self.opts['cachedir'], clear_load)
jid = prep_jid(self.opts, clear_load)
payload = {'enc': 'aes'}
load = {
'fun': clear_load['fun'],
@ -522,7 +525,7 @@ class AESFuncs(object):
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
pub_sock.connect(pull_uri)
pub_sock.send(salt.payload.package(payload))
pub_sock.send(self.serial.dumps(payload))
# Run the client get_returns method
return self.local.get_returns(
jid,
@ -561,6 +564,7 @@ class ClearFuncs(object):
# _auth
def __init__(self, opts, key, master_key, crypticle):
self.opts = opts
self.serial = salt.payload.Serial(opts)
self.key = key
self.master_key = master_key
self.crypticle = crypticle
@ -695,7 +699,7 @@ class ClearFuncs(object):
if not os.path.isdir(jid_dir):
os.makedirs(jid_dir)
# Save the invocation information
salt.payload.dump(clear_load, open(os.path.join(jid_dir, '.load.p'), 'w+'))
self.serial.dump(clear_load, open(os.path.join(jid_dir, '.load.p'), 'w+'))
# Set up the payload
payload = {'enc': 'aes'}
load = {
@ -717,6 +721,6 @@ class ClearFuncs(object):
os.path.join(self.opts['sock_dir'], 'publish_pull.ipc')
)
pub_sock.connect(pull_uri)
pub_sock.send(salt.payload.package(payload))
pub_sock.send(self.serial.dumps(payload))
return {'enc': 'clear',
'load': {'jid': clear_load['jid']}}

View File

@ -78,6 +78,7 @@ class Minion(object):
Pass in the options dict
'''
self.opts = opts
self.serial = salt.payload.Serial(self.opts)
self.mod_opts = self.__prep_mod_opts()
self.functions, self.returners = self.__load_modules()
self.matcher = Matcher(self.opts, self.functions)
@ -291,7 +292,7 @@ class Minion(object):
except KeyError:
pass
payload['load'] = self.crypticle.dumps(load)
socket.send(salt.payload.dumps(payload))
socket.send(self.serial.dumps(payload))
return socket.recv()
def authenticate(self):
@ -312,7 +313,7 @@ class Minion(object):
time.sleep(10)
self.aes = creds['aes']
self.publish_port = creds['publish_port']
self.crypticle = salt.crypt.Crypticle(self.aes)
self.crypticle = salt.crypt.Crypticle(self.aes, self.opts)
def passive_refresh(self):
'''
@ -350,7 +351,7 @@ class Minion(object):
while True:
payload = None
try:
payload = salt.payload.loads(socket.recv(1))
payload = self.serial.loads(socket.recv(1))
self._handle_payload(payload)
last = time.time()
except:
@ -370,7 +371,7 @@ class Minion(object):
while True:
payload = None
try:
payload = salt.payload(socket.recv(1))
payload = self.serial(socket.recv(1))
self._handle_payload(payload)
except:
pass
@ -580,6 +581,7 @@ class FileClient(object):
'''
def __init__(self, opts):
self.opts = opts
self.serial = salt.payload.Serial(self.opts)
self.auth = salt.crypt.SAuth(opts)
self.socket = self.__get_socket()
@ -624,8 +626,8 @@ class FileClient(object):
else:
load['loc'] = fn_.tell()
payload['load'] = self.auth.crypticle.dumps(load)
self.socket.send(salt.payload.dumps(payload))
data = self.auth.crypticle.loads(salt.payload.loads(self.socket.recv()))
self.socket.send(self.serial.dumps(payload))
data = self.auth.crypticle.loads(self.serial.loads(self.socket.recv()))
if not data['data']:
break
if not fn_:
@ -687,8 +689,8 @@ class FileClient(object):
load = {'env': env,
'cmd': '_file_list'}
payload['load'] = self.auth.crypticle.dumps(load)
self.socket.send(salt.payload.dumps(payload))
return self.auth.crypticle.loads(salt.payload.loads(self.socket.recv()))
self.socket.send(self.serial.dumps(payload))
return self.auth.crypticle.loads(self.serial.loads(self.socket.recv()))
def hash_file(self, path, env='base'):
'''
@ -702,8 +704,8 @@ class FileClient(object):
'env': env,
'cmd': '_file_hash'}
payload['load'] = self.auth.crypticle.dumps(load)
self.socket.send(salt.payload.dumps(payload))
return self.auth.crypticle.loads(salt.payload.loads(self.socket.recv()))
self.socket.send(self.serial.dumps(payload))
return self.auth.crypticle.loads(self.serial.loads(self.socket.recv()))
def list_env(self, path, env='base'):
'''
@ -713,8 +715,8 @@ class FileClient(object):
load = {'env': env,
'cmd': '_file_list'}
payload['load'] = self.auth.crypticle.dumps(load)
self.socket.send(salt.payload.dumps(payload))
return self.auth.crypticle.loads(salt.payload.loads(self.socket.recv()))
self.socket.send(self.serial.dumps(payload))
return self.auth.crypticle.loads(self.serial.loads(self.socket.recv()))
def get_state(self, sls, env):
'''
@ -737,5 +739,5 @@ class FileClient(object):
payload = {'enc': 'aes'}
load = {'cmd': '_master_opts'}
payload['load'] = self.auth.crypticle.dumps(load)
self.socket.send(salt.payload.dumps(payload))
return self.auth.crypticle.loads(salt.payload.loads(self.socket.recv()))
self.socket.send(self.serial.dumps(payload))
return self.auth.crypticle.loads(self.serial.loads(self.socket.recv()))

View File

@ -5,7 +5,7 @@ Publish a command from a minion to a target
import zmq
import salt.crypt
import salt.salt.payload
import salt.payload
def _get_socket():
@ -36,6 +36,7 @@ def publish(tgt, fun, arg=None, expr_form='glob', returner=''):
salt system.example.com publish.publish '*' cmd.run 'ls -la /tmp'
'''
serial = salt.payload.Serial(__opts__)
if fun == 'publish.publish':
# Need to log something here
return {}
@ -56,5 +57,5 @@ def publish(tgt, fun, arg=None, expr_form='glob', returner=''):
'id': __opts__['id']}
payload['load'] = auth.crypticle.dumps(load)
socket = _get_socket()
socket.send(salt.payload.dumps(payload))
return auth.crypticle.loads(salt.payload.loads(socket.recv()))
socket.send(serial.dumps(payload))
return auth.crypticle.loads(serial.loads(socket.recv()))