Merge pull request #32155 from cachedout/lint_32008

Lint 32008
This commit is contained in:
Mike Place 2016-03-25 12:40:21 -06:00
commit 0bde9f0aef
2 changed files with 999 additions and 58 deletions

View File

@ -6,7 +6,6 @@ Salt module to manage Kubernetes cluster
Roadmap:
* Remove python-requests dependency
* Add creation of K8S objects (pod, rc, service, ...)
* Add replace of K8S objects (pod, rc, service, ...)
* Add deletion of K8S objects (pod, rc, service, ...)
@ -19,25 +18,25 @@ from __future__ import absolute_import
import os
import re
import json
import logging as logger
import base64
import salt.ext.six as six
from salt.ext.six.moves.urllib.parse import urlparse as _urlparse # pylint: disable=no-name-in-module
# TODO Remove requests dependency
# Import third party libs
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
import salt.utils
import salt.utils.http as http
__virtualname__ = 'k8s'
# Setup the logger
log = logger.getLogger(__name__)
def __virtual__():
'''Load load if python-requests is installed.'''
if not HAS_REQUESTS:
return False
return __virtualname__
@ -45,21 +44,126 @@ def _guess_apiserver(apiserver_url=None):
'''Try to guees the kubemaster url from environ,
then from `/etc/kubernetes/config` file
'''
default_config = "/etc/kubernetes/config"
if apiserver_url is not None:
return apiserver_url
if "KUBERNETES_MASTER" in os.environ:
apiserver_url = os.environ.get("KUBERNETES_MASTER")
else:
elif __salt__['config.get']('k8s:master'):
apiserver_url = __salt__['config.get']('k8s:master')
elif os.path.exists(default_config) or __salt__['config.get']('k8s:config', ""):
config = __salt__['config.get']('k8s:config', default_config)
kubeapi_regex = re.compile("""KUBE_MASTER=['"]--master=(.*)['"]""",
re.MULTILINE)
with salt.utils.fopen("/etc/kubernetes/config") as fh_k8s:
with salt.utils.fopen(config) as fh_k8s:
for line in fh_k8s.readlines():
match_line = kubeapi_regex.match(line)
if match_line:
apiserver_url = match_line.group(1)
else:
# we failed to discover, lets use k8s default address
apiserver_url = "http://127.0.0.1:8080"
log.debug("Discoverd k8s API server address: {0}".format(apiserver_url))
return apiserver_url
def _kpost(url, data):
''' create any object in kubernetes based on URL '''
# Prepare headers
headers = {"Content-Type": "application/json"}
# Make request
log.trace("url is: {0}, data is: {1}".format(url, data))
ret = http.query(url, method='POST', header_dict=headers, data=json.dumps(data))
# Check requests status
if ret.get('error'):
return ret
else:
return json.loads(ret.get('body'))
def _kput(url, data):
''' put any object in kubernetes based on URL '''
# Prepare headers
headers = {"Content-Type": "application/json"}
# Make request
ret = http.query(url, method='PUT', header_dict=headers, data=json.dumps(data))
# Check requests status
if ret.get('error'):
return ret
else:
return json.loads(ret.get('body'))
def _kpatch(url, data):
''' patch any object in kubernetes based on URL '''
# Prepare headers
headers = {"Content-Type": "application/json-patch+json"}
# Make request
ret = http.query(url, method='PATCH', header_dict=headers,
data=json.dumps(data))
# Check requests status
if ret.get('error'):
log.error("Got an error: {0}".format(ret.get("error")))
return ret
else:
return json.loads(ret.get('body'))
def _kname(obj):
'''Get name or names out of json result from API server'''
if isinstance(obj, dict):
return [obj.get("metadata", {}).get("name", "")]
elif isinstance(obj, (list, tuple)):
names = []
for i in obj:
names.append(i.get("metadata", {}).get("name", ""))
return names
else:
return "Unknown type"
def _is_dns_subdomain(name):
''' Check that name is DNS subdomain: One or more lowercase rfc1035/rfc1123
labels separated by '.' with a maximum length of 253 characters '''
dns_subdomain = re.compile(r"""^[a-z0-9\.-]{1,253}$""")
if dns_subdomain.match(name):
log.debug("Name: {0} is valid DNS subdomain".format(name))
return True
else:
log.debug("Name: {0} is not valid DNS subdomain".format(name))
return False
def _is_port_name(name):
''' Check that name is IANA service: An alphanumeric (a-z, and 0-9) string,
with a maximum length of 15 characters, with the '-' character allowed
anywhere except the first or the last character or adjacent to another '-'
character, it must contain at least a (a-z) character '''
port_name = re.compile("""^[a-z0-9]{1,15}$""")
if port_name.match(name):
return True
else:
return False
def _is_dns_label(name):
''' Check that name is DNS label: An alphanumeric (a-z, and 0-9) string,
with a maximum length of 63 characters, with the '-' character allowed
anywhere except the first or last character, suitable for use as a hostname
or segment in a domain name '''
dns_label = re.compile(r"""^[a-z0-9][a-z0-9\.-]{1,62}$""")
if dns_label.match(name):
return True
else:
return False
def _guess_node_id(node):
'''Try to guess kube node ID using salt minion ID'''
if node is None:
@ -70,44 +174,36 @@ def _guess_node_id(node):
def _get_labels(node, apiserver_url):
'''Get all labels from a kube node.'''
# Prepare URL
url = apiserver_url + "/api/v1/nodes/" + node
url = "{0}/api/v1/nodes/{1}".format(apiserver_url, node)
# Make request
ret = requests.get(url)
ret = http.query(url)
# Check requests status
try:
ret.raise_for_status()
except requests.HTTPError as exp:
if ret.status_code == 404:
return "Node {0} doesn't exist".format(node)
else:
return exp
if 'body' in ret:
ret = json.loads(ret.get('body'))
elif ret.get('status', 0) == 404:
return "Node {0} doesn't exist".format(node)
else:
return ret
# Get and return labels
return ret.json().get('metadata', {}).get('labels', {})
return ret.get('metadata', {}).get('labels', {})
def _set_labels(node, apiserver_url, labels):
'''Replace labels dict by a new one'''
# Prepare URL
url = apiserver_url + "/api/v1/nodes/" + node
url = "{0}/api/v1/nodes/{1}".format(apiserver_url, node)
# Prepare data
data = [{"op": "replace", "path": "/metadata/labels", "value": labels}]
# Prepare headers
headers = {"Content-Type": "application/json-patch+json"}
# Make request
ret = requests.patch(url, headers=headers, json=data)
# Check requests status
try:
ret.raise_for_status()
except requests.HTTPError as exp:
if ret.status_code == 404:
return "Node {0} doesn't exist".format(node)
else:
return exp
ret = _kpatch(url, data)
if ret.get("status") == 404:
return "Node {0} doesn't exist".format(node)
return ret
def get_labels(node=None, apiserver_url=None):
'''
.. versionadded:: 2016.3.0
Get labels from the current node
CLI Example:
@ -127,15 +223,14 @@ def get_labels(node=None, apiserver_url=None):
# Get data
ret = _get_labels(node, apiserver_url)
return {"labels": ret}
def label_present(
name,
value,
node=None,
apiserver_url=None):
def label_present(name, value, node=None, apiserver_url=None):
'''
.. versionadded:: 2016.3.0
Set label to the current node
CLI Example:
@ -163,14 +258,26 @@ def label_present(
# This is a new label
ret['changes'] = {name: value}
labels[name] = str(value)
_set_labels(node, apiserver_url, labels)
ret['comment'] = "Label {0} created".format(name)
res = _set_labels(node, apiserver_url, labels)
if res.get('status') == 409:
# there is an update during operation, need to retry
log.debug("Got 409, will try later")
ret['changes'] = {}
ret['comment'] = "Could not create label {0}, please retry".format(name)
else:
ret['comment'] = "Label {0} created".format(name)
elif labels.get(name) != str(value):
# This is a old label and we are going to edit it
ret['changes'] = {name: str(value)}
labels[name] = value
_set_labels(node, apiserver_url, labels)
ret['comment'] = "Label {0} updated".format(name)
labels[name] = str(value)
res = _set_labels(node, apiserver_url, labels)
if res.get('status') == 409:
# there is an update during operation, need to retry
log.debug("Got 409, will try later")
ret['changes'] = {}
ret['comment'] = "Could not update label {0}, please retry".format(name)
else:
ret['comment'] = "Label {0} updated".format(name)
else:
# This is a old label and it has already the wanted value
ret['comment'] = "Label {0} already set".format(name)
@ -178,11 +285,10 @@ def label_present(
return ret
def label_absent(
name,
node=None,
apiserver_url=None):
def label_absent(name, node=None, apiserver_url=None):
'''
.. versionadded:: 2016.3.0
Delete label to the current node
CLI Example:
@ -213,18 +319,23 @@ def label_absent(
ret['comment'] = "Label {0} already absent".format(name)
else:
# Label needs to be delete
_set_labels(node, apiserver_url, labels)
ret['changes'] = {"deleted": name}
ret['comment'] = "Label {0} absent".format(name)
res = _set_labels(node, apiserver_url, labels)
if res.get('status') == 409:
# there is an update during operation, need to retry
log.debug("Got 409, will try later")
ret['changes'] = {}
ret['comment'] = "Could not delete label {0}, please retry".format(name)
else:
ret['changes'] = {"deleted": name}
ret['comment'] = "Label {0} absent".format(name)
return ret
def label_folder_absent(
name,
node=None,
apiserver_url=None):
def label_folder_absent(name, node=None, apiserver_url=None):
'''
.. versionadded:: 2016.3.0
Delete label folder to the current node
CLI Example:
@ -256,8 +367,438 @@ def label_folder_absent(
ret['comment'] = "Label folder {0} already absent".format(folder)
else:
# Label needs to be delete
_set_labels(node, apiserver_url, labels)
ret['changes'] = {"deleted": folder}
ret['comment'] = "Label folder {0} absent".format(folder)
res = _set_labels(node, apiserver_url, labels)
if res.get('status') == 409:
log.debug("Got 409, will try later")
ret['changes'] = {}
ret['comment'] = "Could not delete label folder {0}, please retry".format(folder)
else:
ret['changes'] = {"deleted": folder}
ret['comment'] = "Label folder {0} absent".format(folder)
return ret
# Namespaces
def _get_namespaces(apiserver_url, name=""):
'''Get namespace is namespace is defined otherwise return all namespaces'''
# Prepare URL
url = "{0}/api/v1/namespaces/{1}".format(apiserver_url, name)
# Make request
ret = http.query(url)
if ret.get("body"):
return json.loads(ret.get("body"))
else:
return None
def _create_namespace(namespace, apiserver_url):
''' create namespace on the defined k8s cluster '''
# Prepare URL
url = "{0}/api/v1/namespaces".format(apiserver_url)
# Prepare data
data = {
"kind": "Namespace",
"apiVersion": "v1",
"metadata": {
"name": namespace,
}
}
log.trace("namespace creation requests: {0}".format(data))
# Make request
ret = _kpost(url, data)
log.trace("result is: {0}".format(ret))
# Check requests status
return ret
def create_namespace(name, apiserver_url=None):
'''
.. versionadded:: 2016.3.0
Create kubernetes namespace from the name, similar to the functionality added to kubectl since v.1.2.0:
.. code-block:: bash
kubectl create namespaces namespace-name
CLI Example:
.. code-block:: bash
salt '*' k8s.create_namespace namespace_name
salt '*' k8s.create_namespace namespace_name http://kube-master.cluster.local
'''
ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
if not _get_namespaces(apiserver_url, name):
# This is a new namespace
_create_namespace(name, apiserver_url)
ret['changes'] = name
ret['comment'] = "Namespace {0} created".format(name)
else:
ret['comment'] = "Namespace {0} already present".format(name)
return ret
def get_namespaces(namespace="", apiserver_url=None):
'''
.. versionadded:: 2016.3.0
Get one or all kubernetes namespaces.
If namespace parameter is omitted, all namespaces will be returned back to user, similar to following kubectl example:
.. code-block:: bash
kubectl get namespaces -o json
In case namespace is set by user, the output will be similar to the one from kubectl:
.. code-block:: bash
kubectl get namespaces namespace_name -o json
CLI Example:
.. code-block:: bash
salt '*' k8s.get_namespaces
salt '*' k8s.get_namespaces namespace_name http://kube-master.cluster.local
'''
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# Get data
ret = _get_namespaces(apiserver_url, namespace)
return ret
# Secrets
def _get_secrets(namespace, name, apiserver_url):
'''Get secrets of the namespace.'''
# Prepare URL
url = "{0}/api/v1/namespaces/{1}/secrets/{2}".format(apiserver_url,
namespace, name)
# Make request
ret = http.query(url)
if ret.get("body"):
return json.loads(ret.get("body"))
else:
return None
def _update_secret(namespace, name, data, apiserver_url):
'''Replace secrets data by a new one'''
# Prepare URL
url = "{0}/api/v1/namespaces/{1}/secrets/{2}".format(apiserver_url,
namespace, name)
# Prepare data
data = [{"op": "replace", "path": "/data", "value": data}]
# Make request
ret = _kpatch(url, data)
if ret.get("status") == 404:
return "Node {0} doesn't exist".format(url)
return ret
def _create_secret(namespace, name, data, apiserver_url):
''' create namespace on the defined k8s cluster '''
# Prepare URL
url = "{0}/api/v1/namespaces/{1}/secrets".format(apiserver_url, namespace)
# Prepare data
request = {
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": name,
"namespace": namespace,
},
"data": data
}
# Make request
ret = _kpost(url, request)
return ret
def _is_valid_secret_file(filename):
if os.path.exists(filename) and os.path.isfile(filename):
log.debug("File: {0} is valid secret file".format(filename))
return True
log.warn("File: {0} does not exists or not file".format(filename))
return False
def _file_encode(filename):
log.trace("Encoding secret file: {0}".format(filename))
with open(filename, "rb") as f:
data = f.read()
return base64.b64encode(data)
def _decode_secrets(secrets):
items = secrets.get("items", [])
if items:
for i, secret in enumerate(items):
log.trace(i, secret)
for k, v in secret.get("data", {}).iteritems():
items[i]['data'][k] = base64.b64decode(v)
secrets["items"] = items
return secrets
else:
for k, v in secrets.get("data", {}).iteritems():
secrets['data'][k] = base64.b64decode(v)
return secrets
def get_secrets(namespace, name="", apiserver_url=None, decode=False, brief=False):
'''
Get k8s namespaces
CLI Example:
.. code-block:: bash
salt '*' k8s.get_secrets namespace_name
salt '*' k8s.get_secrets namespace_name secret_name http://kube-master.cluster.local
'''
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# Get data
if not decode:
ret = _get_secrets(namespace, name, apiserver_url)
else:
ret = _decode_secrets(_get_secrets(namespace, name, apiserver_url))
return ret
def _source_encode(source, saltenv):
try:
source_url = _urlparse(source)
except TypeError:
return '', {}, ('Invalid format for source parameter')
protos = ('salt', 'http', 'https', 'ftp', 'swift', 's3', 'file')
log.trace("parsed source looks like: {0}".format(source_url))
if not source_url.scheme or source_url.scheme == 'file':
# just a regular file
filename = os.path.abspath(source_url.path)
sname = os.path.basename(filename)
log.debug("Source is a regular local file: {0}".format(source_url.path))
if _is_dns_subdomain(sname) and _is_valid_secret_file(filename):
return sname, _file_encode(filename)
else:
if source_url.scheme in protos:
# The source is a file on a server
filename = __salt__['cp.cache_file'](source, saltenv)
if not filename:
log.warn("Source file: {0} can not be retrieved".format(source))
return "", ""
return os.path.basename(filename), _file_encode(filename)
return "", ""
def update_secret(namespace, name, sources, apiserver_url=None, force=True, saltenv='base'):
'''
.. versionadded:: 2016.3.0
alias to k8s.create_secret with update=true
CLI Example:
.. code-block:: bash
salt '*' k8s.update_secret namespace_name secret_name sources [apiserver_url] [force=true] [update=false] [saltenv='base']
sources are either dictionary of {name: path, name1: path} pairs or array of strings defining paths.
Example of paths array:
.. code-block:: bash
['/full/path/filename', "file:///full/path/filename", "salt://secret/storage/file.txt", "http://user:password@securesite.com/secret-file.json"]
Example of dictionaries:
.. code-block:: bash
{"nameit": '/full/path/fiename', name2: "salt://secret/storage/file.txt"}
optional parameters accepted:
force=[true] default value is true
if the to False, secret will not be created in case one of the files is not
valid kubernetes secret. e.g. capital letters in secret name or _
in case force is set to True, wrong files will be skipped but secret will be created any way.
saltenv=['base'] default value is base
in case 'salt://' path is used, this parameter can change the visibility of files
'''
apiserver_url = _guess_apiserver(apiserver_url)
ret = create_secret(namespace, name, sources, apiserver_url=apiserver_url,
force=force, update=True, saltenv=saltenv)
return ret
def create_secret(namespace, name, sources, apiserver_url=None, force=False, update=False, saltenv='base'):
'''
.. versionadded:: 2016.3.0
Create k8s secrets in the defined namespace from the list of files
CLI Example:
.. code-block:: bash
salt '*' k8s.create_secret namespace_name secret_name sources
salt '*' k8s.create_secret namespace_name secret_name sources
http://kube-master.cluster.local
sources are either dictionary of {name: path, name1: path} pairs or array of strings defining paths.
Example of paths array:
.. code-block:: bash
['/full/path/filename', "file:///full/path/filename", "salt://secret/storage/file.txt", "http://user:password@securesite.com/secret-file.json"]
Example of dictionaries:
.. code-block:: bash
{"nameit": '/full/path/fiename', name2: "salt://secret/storage/file.txt"}
optional parameters accepted:
update=[false] default value is false
if set to false, and secret is already present on the cluster - warning will be returned and no changes to the secret will be done.
In case it is set to "true" and secret is present but data is differ - secret will be updated.
force=[true] default value is true
if the to False, secret will not be created in case one of the files is not
valid kubernetes secret. e.g. capital letters in secret name or _
in case force is set to True, wrong files will be skipped but secret will be created any way.
saltenv=['base'] default value is base
in case 'salt://' path is used, this parameter can change the visibility of files
'''
ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}
if not sources:
return {'name': name, 'result': False, 'comment': 'No source available', 'changes': {}}
apiserver_url = _guess_apiserver(apiserver_url)
# we need namespace to create secret in it
if not _get_namespaces(apiserver_url, namespace):
if force:
_create_namespace(namespace, apiserver_url)
else:
return {'name': name, 'result': False, 'comment': "Namespace doesn't exists", 'changes': {}}
secret = _get_secrets(namespace, name, apiserver_url)
if secret and not update:
log.info("Secret {0} is already present on {1}".format(name, namespace))
return {'name': name, 'result': False,
'comment': 'Secret {0} is already present'.format(name),
'changes': {}}
data = {}
for source in sources:
log.debug("source is: {0}".format(source))
if isinstance(source, dict):
# format is array of dictionaries:
# [{public_auth: salt://public_key}, {test: "/tmp/test"}]
log.trace("source is dictionary: {0}".format(source))
for k, v in source.iteritems():
sname, encoded = _source_encode(v, saltenv)
if sname == encoded == "":
ret['comment'] += "Source file {0} is missing or name is incorrect\n".format(v)
if force:
continue
else:
return ret
data[k] = encoded
elif isinstance(source, six.string_types):
# expected format is array of filenames
sname, encoded = _source_encode(source, saltenv)
if sname == encoded == "":
if force:
ret['comment'] += "Source file {0} is missing or name is incorrect\n".format(source)
continue
else:
return ret
data[sname] = encoded
log.trace("secret data is: {0}".format(data))
if secret and update:
if not data:
ret["comment"] += "Could not find source files or your sources are empty"
ret["result"] = False
elif secret.get("data") and data != secret.get("data"):
res = _update_secret(namespace, name, data, apiserver_url)
ret['comment'] = 'Updated secret'
ret['changes'] = 'Updated secret'
else:
log.debug("Secret has not been changed on cluster, skipping it")
ret['comment'] = 'Has not been changed on cluster, skipping it'
else:
res = _create_secret(namespace, name, data, apiserver_url)
return ret
def delete_secret(namespace, name, apiserver_url=None, force=True):
'''
.. versionadded:: 2016.3.0
Delete kubernetes secret in the defined namespace. Namespace is the mandatory parameter as well as name.
CLI Example:
.. code-block:: bash
salt '*' k8s.delete_secret namespace_name secret_name
salt '*' k8s.delete_secret namespace_name secret_name http://kube-master.cluster.local
'''
ret = {'name': name, 'result': True, 'comment': '', 'changes': {}}
# Try to get kubernetes master
apiserver_url = _guess_apiserver(apiserver_url)
if apiserver_url is None:
return False
# we need namespace to delete secret in it
if not _get_namespaces(apiserver_url, namespace):
return {'name': name, 'result': False,
'comment': "Namespace doesn't exists, can't delete anything there",
'changes': {}}
url = "{0}/api/v1/namespaces/{1}/secrets/{2}".format(apiserver_url,
namespace, name)
res = http.query(url, method='DELETE')
if res.get('body'):
ret['comment'] = "Removed secret {0} in {1} namespace".format(name,
namespace)
return ret

400
tests/unit/modules/k8s_test.py Executable file
View File

@ -0,0 +1,400 @@
# Import pytohn libs
from __future__ import absolute_import
# Import Salt Testing libs
from salttesting import skipIf, TestCase
from salttesting.helpers import ensure_in_syspath, skip_if_binaries_missing
ensure_in_syspath('../../')
# from unittest import TestCase
# Import Salt execution module to test
# from salt.modules import k8s
# FIXME
import salt.modules.k8s as k8s
import json
import hashlib
import base64
import time
from subprocess import Popen, PIPE
TestCase.maxDiff = None
@skip_if_binaries_missing(['kubectl'])
class TestK8SNamespace(TestCase):
def test_get_namespaces(self):
res = k8s.get_namespaces(apiserver_url="http://127.0.0.1:8080")
a = len(res.get("items"))
proc = Popen(["kubectl", "get", "namespaces", "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
b = len(kubectl_out.get("items"))
self.assertEqual(a, b)
def test_get_one_namespace(self):
res = k8s.get_namespaces("default", apiserver_url="http://127.0.0.1:8080")
a = res.get("metadata", {}).get("name", "a")
proc = Popen(["kubectl", "get", "namespaces", "default", "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
b = kubectl_out.get("metadata", {}).get("name", "b")
self.assertEqual(a, b)
def test_create_namespace(self):
hash = hashlib.sha1()
hash.update(str(time.time()))
nsname = hash.hexdigest()[:16]
res = k8s.create_namespace(nsname, apiserver_url="http://127.0.0.1:8080")
proc = Popen(["kubectl", "get", "namespaces", nsname, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
# if creation is failed, kubernetes return non json error message
self.assertTrue(isinstance(kubectl_out, dict))
@skip_if_binaries_missing(['kubectl'])
class TestK8SSecrets(TestCase):
def setUp(self):
hash = hashlib.sha1()
hash.update(str(time.time()))
self.name = hash.hexdigest()[:16]
data = {"testsecret": base64.encodestring("teststring")}
self.request = {
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": self.name,
"namespace": "default",
},
"data": data,
}
def test_get_secrets(self):
res = k8s.get_secrets("default", apiserver_url="http://127.0.0.1:8080")
a = len(res.get("items", []))
proc = Popen(["kubectl", "--namespace=default", "get", "secrets", "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
b = len(kubectl_out.get("items", []))
self.assertEqual(a, b)
def test_get_one_secret(self):
name = self.name
filename = "/tmp/{0}.json".format(name)
with open(filename, 'w') as f:
json.dump(self.request, f)
create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE)
# wee need to give kubernetes time save data in etcd
time.sleep(0.1)
res = k8s.get_secrets("default", name, apiserver_url="http://127.0.0.1:8080")
a = res.get("metadata", {}).get("name", "a")
proc = Popen(["kubectl", "--namespace=default", "get", "secrets", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
b = kubectl_out.get("metadata", {}).get("name", "b")
self.assertEqual(a, b)
def test_get_decoded_secret(self):
name = self.name
filename = "/tmp/{0}.json".format(name)
with open(filename, 'w') as f:
json.dump(self.request, f)
create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE)
# wee need to give etcd to populate data on all nodes
time.sleep(0.1)
res = k8s.get_secrets("default", name, apiserver_url="http://127.0.0.1:8080", decode=True)
a = res.get("data", {}).get("testsecret", )
self.assertEqual(a, "teststring")
def test_create_secret(self):
name = self.name
names = []
expected_data = {}
for i in range(2):
names.append("/tmp/{0}-{1}".format(name, i))
with open("/tmp/{0}-{1}".format(name, i), 'w') as f:
expected_data["{0}-{1}".format(name, i)] = base64.b64encode("{0}{1}".format(name, i))
f.write("{0}{1}".format(name, i))
res = k8s.create_secret("default", name, names, apiserver_url="http://127.0.0.1:8080")
proc = Popen(["kubectl", "--namespace=default", "get", "secrets", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
# if creation is failed, kubernetes return non json error message
b = kubectl_out.get("data", {})
self.assertTrue(isinstance(kubectl_out, dict))
self.assertEqual(expected_data, b)
def test_update_secret(self):
name = self.name
filename = "/tmp/{0}.json".format(name)
with open(filename, 'w') as f:
json.dump(self.request, f)
create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE)
# wee need to give kubernetes time save data in etcd
time.sleep(0.1)
expected_data = {}
names = []
for i in range(3):
names.append("/tmp/{0}-{1}-updated".format(name, i))
with open("/tmp/{0}-{1}-updated".format(name, i), 'w') as f:
expected_data["{0}-{1}-updated".format(name, i)] = base64.b64encode("{0}{1}-updated".format(name, i))
f.write("{0}{1}-updated".format(name, i))
res = k8s.update_secret("default", name, names, apiserver_url="http://127.0.0.1:8080")
# if creation is failed, kubernetes return non json error message
proc = Popen(["kubectl", "--namespace=default", "get", "secrets", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
# if creation is failed, kubernetes return non json error message
b = kubectl_out.get("data", {})
self.assertTrue(isinstance(kubectl_out, dict))
self.assertEqual(expected_data, b)
def test_delete_secret(self):
name = self.name
filename = "/tmp/{0}.json".format(name)
with open(filename, 'w') as f:
json.dump(self.request, f)
create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE)
# wee need to give kubernetes time save data in etcd
time.sleep(0.1)
res = k8s.delete_secret("default", name, apiserver_url="http://127.0.0.1:8080")
time.sleep(0.1)
proc = Popen(["kubectl", "--namespace=default", "get", "secrets", name, "-o", "json"], stdout=PIPE, stderr=PIPE)
kubectl_out, err = proc.communicate()
# stdout is empty, stderr is showing something like "not found"
self.assertEqual('', kubectl_out)
self.assertEqual('Error from server: secrets "{0}" not found\n'.format(name), err)
@skip_if_binaries_missing(['kubectl'])
class TestK8SResourceQuotas(TestCase):
def setUp(self):
hash = hashlib.sha1()
hash.update(str(time.time()))
self.name = hash.hexdigest()[:16]
def test_get_resource_quotas(self):
name = self.name
namespace = self.name
create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE)
create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE)
request = """
apiVersion: v1
kind: ResourceQuota
metadata:
name: {0}
spec:
hard:
cpu: "20"
memory: 1Gi
persistentvolumeclaims: "10"
pods: "10"
replicationcontrollers: "20"
resourcequotas: "1"
secrets: "10"
services: "5"
""".format(name)
filename = "/tmp/{0}.yaml".format(name)
with open(filename, 'w') as f:
f.write(request)
create = Popen(["kubectl", "--namespace={0}".format(namespace), "create", "-f", filename], stdout=PIPE)
# wee need to give kubernetes time save data in etcd
time.sleep(0.2)
res = k8s.get_resource_quotas(namespace, apiserver_url="http://127.0.0.1:8080")
a = len(res.get("items", []))
proc = Popen(["kubectl", "--namespace={0}".format(namespace), "get", "quota", "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
b = len(kubectl_out.get("items", []))
self.assertEqual(a, b)
def test_get_one_resource_quota(self):
name = self.name
namespace = self.name
create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE)
request = """
apiVersion: v1
kind: ResourceQuota
metadata:
name: {0}
spec:
hard:
cpu: "20"
memory: 1Gi
persistentvolumeclaims: "10"
pods: "10"
replicationcontrollers: "20"
resourcequotas: "1"
secrets: "10"
services: "5"
""".format(name)
filename = "/tmp/{0}.yaml".format(name)
with open(filename, 'w') as f:
f.write(request)
create = Popen(["kubectl", "--namespace={0}".format(namespace), "create", "-f", filename], stdout=PIPE)
# wee need to give kubernetes time save data in etcd
time.sleep(0.2)
res = k8s.get_resource_quotas(namespace, name, apiserver_url="http://127.0.0.1:8080")
a = res.get("metadata", {}).get("name", "a")
proc = Popen(["kubectl", "--namespace={0}".format(namespace), "get", "quota", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
b = kubectl_out.get("metadata", {}).get("name", "b")
self.assertEqual(a, b)
def test_create_resource_quota(self):
name = self.name
namespace = self.name
create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE)
quota = {
"cpu": "20",
"memory": "1Gi"
}
res = k8s.create_resource_quota(namespace, quota, name=name, apiserver_url="http://127.0.0.1:8080")
proc = Popen(["kubectl", "--namespace={0}".format(namespace), "get", "quota", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
self.assertTrue(isinstance(kubectl_out, dict))
def test_update_resource_quota(self):
name = self.name
namespace = self.name
create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE)
request = """
apiVersion: v1
kind: ResourceQuota
metadata:
name: {0}
spec:
hard:
cpu: "20"
memory: 1Gi
persistentvolumeclaims: "10"
pods: "10"
replicationcontrollers: "20"
resourcequotas: "1"
secrets: "10"
services: "5"
""".format(name)
filename = "/tmp/{0}.yaml".format(name)
with open(filename, 'w') as f:
f.write(request)
create = Popen(["kubectl", "--namespace={0}".format(namespace), "create", "-f", filename], stdout=PIPE)
# wee need to give kubernetes time save data in etcd
time.sleep(0.2)
quota = {
"cpu": "10",
"memory": "2Gi"
}
res = k8s.create_resource_quota(namespace, quota, name=name, apiserver_url="http://127.0.0.1:8080", update=True)
proc = Popen(["kubectl", "--namespace={0}".format(namespace), "get", "quota", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
limit = kubectl_out.get("spec").get("hard").get("memory")
self.assertEqual("2Gi", limit)
@skip_if_binaries_missing(['kubectl'])
class TestK8SLimitrange(TestCase):
def setUp(self):
hash = hashlib.sha1()
hash.update(str(time.time()))
self.name = hash.hexdigest()[:16]
def test_create_limit_range(self):
name = self.name
limits = {
"Container": {
"defaultRequest": {
"cpu": "100m"
}
}
}
res = k8s.create_limit_range("default", limits, name=name, apiserver_url="http://127.0.0.1:8080")
proc = Popen(["kubectl", "--namespace=default", "get", "limits", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
self.assertTrue(isinstance(kubectl_out, dict))
def test_update_limit_range(self):
name = self.name
request = """
apiVersion: v1
kind: LimitRange
metadata:
name: {0}
spec:
limits:
- default:
cpu: 200m
memory: 512Mi
defaultRequest:
cpu: 100m
memory: 256Mi
type: Container
""".format(name)
limits = {
"Container": {
"defaultRequest": {
"cpu": "100m"
}
}
}
filename = "/tmp/{0}.yaml".format(name)
with open(filename, 'w') as f:
f.write(request)
create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE)
# wee need to give kubernetes time save data in etcd
time.sleep(0.1)
res = k8s.create_limit_range("default", limits, name=name, apiserver_url="http://127.0.0.1:8080", update=True)
proc = Popen(["kubectl", "--namespace=default", "get", "limits", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
limit = kubectl_out.get("spec").get("limits")[0].get("defaultRequest").get("cpu")
self.assertEqual("100m", limit)
def test_get_limit_ranges(self):
res = k8s.get_limit_ranges("default", apiserver_url="http://127.0.0.1:8080")
a = len(res.get("items", []))
proc = Popen(["kubectl", "--namespace=default", "get", "limits", "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
b = len(kubectl_out.get("items", []))
self.assertEqual(a, b)
def test_get_one_limit_range(self):
name = self.name
request = """
apiVersion: v1
kind: LimitRange
metadata:
name: {0}
spec:
limits:
- default:
cpu: 200m
memory: 512Mi
defaultRequest:
cpu: 100m
memory: 256Mi
type: Container
""".format(name)
filename = "/tmp/{0}.yaml".format(name)
with open(filename, 'w') as f:
f.write(request)
create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE)
# wee need to give kubernetes time save data in etcd
time.sleep(0.1)
res = k8s.get_limit_ranges("default", name, apiserver_url="http://127.0.0.1:8080")
a = res.get("metadata", {}).get("name", "a")
proc = Popen(["kubectl", "--namespace=default", "get", "limits", name, "-o", "json"], stdout=PIPE)
kubectl_out = json.loads(proc.communicate()[0])
b = kubectl_out.get("metadata", {}).get("name", "b")
self.assertEqual(a, b)
if __name__ == '__main__':
from integration import run_tests
run_tests(TestK8SNamespace,
TestK8SSecrets,
TestK8SResourceQuotas,
TestK8SLimitRange,
needs_daemon=False)