# -*- coding: utf-8 -*- ''' Unit Tests for the k8s execution module. ''' # Import Python libs from __future__ import absolute_import import json import hashlib import base64 import time from subprocess import Popen, PIPE # Import Salt Testing libs from tests.support.unit import TestCase from tests.support.helpers import skip_if_binaries_missing # Import Salt libs import salt.utils import salt.modules.k8s as k8s # Import 3rd-party libs from salt.ext.six.moves import range # pylint: disable=import-error @skip_if_binaries_missing(['kubectl']) class TestK8SNamespace(TestCase): maxDiff = None def test_get_namespaces(self): res = k8s.get_namespaces(apiserver_url="http://127.0.0.1:8080") a = len(res.get("items")) proc = Popen(["kubectl", "get", "namespaces", "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) b = len(kubectl_out.get("items")) self.assertEqual(a, b) def test_get_one_namespace(self): res = k8s.get_namespaces("default", apiserver_url="http://127.0.0.1:8080") a = res.get("metadata", {}).get("name", "a") proc = Popen(["kubectl", "get", "namespaces", "default", "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) b = kubectl_out.get("metadata", {}).get("name", "b") self.assertEqual(a, b) def test_create_namespace(self): hash = hashlib.sha1() hash.update(str(time.time())) nsname = hash.hexdigest()[:16] res = k8s.create_namespace(nsname, apiserver_url="http://127.0.0.1:8080") proc = Popen(["kubectl", "get", "namespaces", nsname, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) # if creation is failed, kubernetes return non json error message self.assertTrue(isinstance(kubectl_out, dict)) @skip_if_binaries_missing(['kubectl']) class TestK8SSecrets(TestCase): maxDiff = None def setUp(self): hash = hashlib.sha1() hash.update(str(time.time())) self.name = hash.hexdigest()[:16] data = {"testsecret": base64.encodestring("teststring")} self.request = { "apiVersion": "v1", "kind": "Secret", "metadata": { "name": self.name, "namespace": "default", }, "data": data, } def test_get_secrets(self): res = k8s.get_secrets("default", apiserver_url="http://127.0.0.1:8080") a = len(res.get("items", [])) proc = Popen(["kubectl", "--namespace=default", "get", "secrets", "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) b = len(kubectl_out.get("items", [])) self.assertEqual(a, b) def test_get_one_secret(self): name = self.name filename = "/tmp/{0}.json".format(name) with salt.utils.fopen(filename, 'w') as f: json.dump(self.request, f) create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE) # wee need to give kubernetes time save data in etcd time.sleep(0.1) res = k8s.get_secrets("default", name, apiserver_url="http://127.0.0.1:8080") a = res.get("metadata", {}).get("name", "a") proc = Popen(["kubectl", "--namespace=default", "get", "secrets", name, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) b = kubectl_out.get("metadata", {}).get("name", "b") self.assertEqual(a, b) def test_get_decoded_secret(self): name = self.name filename = "/tmp/{0}.json".format(name) with salt.utils.fopen(filename, 'w') as f: json.dump(self.request, f) create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE) # wee need to give etcd to populate data on all nodes time.sleep(0.1) res = k8s.get_secrets("default", name, apiserver_url="http://127.0.0.1:8080", decode=True) a = res.get("data", {}).get("testsecret", ) self.assertEqual(a, "teststring") def test_create_secret(self): name = self.name names = [] expected_data = {} for i in range(2): names.append("/tmp/{0}-{1}".format(name, i)) with salt.utils.fopen("/tmp/{0}-{1}".format(name, i), 'w') as f: expected_data["{0}-{1}".format(name, i)] = base64.b64encode("{0}{1}".format(name, i)) f.write("{0}{1}".format(name, i)) res = k8s.create_secret("default", name, names, apiserver_url="http://127.0.0.1:8080") proc = Popen(["kubectl", "--namespace=default", "get", "secrets", name, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) # if creation is failed, kubernetes return non json error message b = kubectl_out.get("data", {}) self.assertTrue(isinstance(kubectl_out, dict)) self.assertEqual(expected_data, b) def test_update_secret(self): name = self.name filename = "/tmp/{0}.json".format(name) with salt.utils.fopen(filename, 'w') as f: json.dump(self.request, f) create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE) # wee need to give kubernetes time save data in etcd time.sleep(0.1) expected_data = {} names = [] for i in range(3): names.append("/tmp/{0}-{1}-updated".format(name, i)) with salt.utils.fopen("/tmp/{0}-{1}-updated".format(name, i), 'w') as f: expected_data["{0}-{1}-updated".format(name, i)] = base64.b64encode("{0}{1}-updated".format(name, i)) f.write("{0}{1}-updated".format(name, i)) res = k8s.update_secret("default", name, names, apiserver_url="http://127.0.0.1:8080") # if creation is failed, kubernetes return non json error message proc = Popen(["kubectl", "--namespace=default", "get", "secrets", name, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) # if creation is failed, kubernetes return non json error message b = kubectl_out.get("data", {}) self.assertTrue(isinstance(kubectl_out, dict)) self.assertEqual(expected_data, b) def test_delete_secret(self): name = self.name filename = "/tmp/{0}.json".format(name) with salt.utils.fopen(filename, 'w') as f: json.dump(self.request, f) create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE) # wee need to give kubernetes time save data in etcd time.sleep(0.1) res = k8s.delete_secret("default", name, apiserver_url="http://127.0.0.1:8080") time.sleep(0.1) proc = Popen(["kubectl", "--namespace=default", "get", "secrets", name, "-o", "json"], stdout=PIPE, stderr=PIPE) kubectl_out, err = proc.communicate() # stdout is empty, stderr is showing something like "not found" self.assertEqual('', kubectl_out) self.assertEqual('Error from server: secrets "{0}" not found\n'.format(name), err) @skip_if_binaries_missing(['kubectl']) class TestK8SResourceQuotas(TestCase): maxDiff = None def setUp(self): hash = hashlib.sha1() hash.update(str(time.time())) self.name = hash.hexdigest()[:16] def test_get_resource_quotas(self): name = self.name namespace = self.name create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE) create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE) request = """ apiVersion: v1 kind: ResourceQuota metadata: name: {0} spec: hard: cpu: "20" memory: 1Gi persistentvolumeclaims: "10" pods: "10" replicationcontrollers: "20" resourcequotas: "1" secrets: "10" services: "5" """.format(name) filename = "/tmp/{0}.yaml".format(name) with salt.utils.fopen(filename, 'w') as f: f.write(request) create = Popen(["kubectl", "--namespace={0}".format(namespace), "create", "-f", filename], stdout=PIPE) # wee need to give kubernetes time save data in etcd time.sleep(0.2) res = k8s.get_resource_quotas(namespace, apiserver_url="http://127.0.0.1:8080") a = len(res.get("items", [])) proc = Popen(["kubectl", "--namespace={0}".format(namespace), "get", "quota", "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) b = len(kubectl_out.get("items", [])) self.assertEqual(a, b) def test_get_one_resource_quota(self): name = self.name namespace = self.name create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE) request = """ apiVersion: v1 kind: ResourceQuota metadata: name: {0} spec: hard: cpu: "20" memory: 1Gi persistentvolumeclaims: "10" pods: "10" replicationcontrollers: "20" resourcequotas: "1" secrets: "10" services: "5" """.format(name) filename = "/tmp/{0}.yaml".format(name) with salt.utils.fopen(filename, 'w') as f: f.write(request) create = Popen(["kubectl", "--namespace={0}".format(namespace), "create", "-f", filename], stdout=PIPE) # wee need to give kubernetes time save data in etcd time.sleep(0.2) res = k8s.get_resource_quotas(namespace, name, apiserver_url="http://127.0.0.1:8080") a = res.get("metadata", {}).get("name", "a") proc = Popen(["kubectl", "--namespace={0}".format(namespace), "get", "quota", name, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) b = kubectl_out.get("metadata", {}).get("name", "b") self.assertEqual(a, b) def test_create_resource_quota(self): name = self.name namespace = self.name create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE) quota = { "cpu": "20", "memory": "1Gi" } res = k8s.create_resource_quota(namespace, quota, name=name, apiserver_url="http://127.0.0.1:8080") proc = Popen(["kubectl", "--namespace={0}".format(namespace), "get", "quota", name, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) self.assertTrue(isinstance(kubectl_out, dict)) def test_update_resource_quota(self): name = self.name namespace = self.name create_namespace = Popen(["kubectl", "create", "namespace", namespace], stdout=PIPE) request = """ apiVersion: v1 kind: ResourceQuota metadata: name: {0} spec: hard: cpu: "20" memory: 1Gi persistentvolumeclaims: "10" pods: "10" replicationcontrollers: "20" resourcequotas: "1" secrets: "10" services: "5" """.format(name) filename = "/tmp/{0}.yaml".format(name) with salt.utils.fopen(filename, 'w') as f: f.write(request) create = Popen(["kubectl", "--namespace={0}".format(namespace), "create", "-f", filename], stdout=PIPE) # wee need to give kubernetes time save data in etcd time.sleep(0.2) quota = { "cpu": "10", "memory": "2Gi" } res = k8s.create_resource_quota(namespace, quota, name=name, apiserver_url="http://127.0.0.1:8080", update=True) proc = Popen(["kubectl", "--namespace={0}".format(namespace), "get", "quota", name, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) limit = kubectl_out.get("spec").get("hard").get("memory") self.assertEqual("2Gi", limit) @skip_if_binaries_missing(['kubectl']) class TestK8SLimitRange(TestCase): maxDiff = None def setUp(self): hash = hashlib.sha1() hash.update(str(time.time())) self.name = hash.hexdigest()[:16] def test_create_limit_range(self): name = self.name limits = { "Container": { "defaultRequest": { "cpu": "100m" } } } res = k8s.create_limit_range("default", limits, name=name, apiserver_url="http://127.0.0.1:8080") proc = Popen(["kubectl", "--namespace=default", "get", "limits", name, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) self.assertTrue(isinstance(kubectl_out, dict)) def test_update_limit_range(self): name = self.name request = """ apiVersion: v1 kind: LimitRange metadata: name: {0} spec: limits: - default: cpu: 200m memory: 512Mi defaultRequest: cpu: 100m memory: 256Mi type: Container """.format(name) limits = { "Container": { "defaultRequest": { "cpu": "100m" } } } filename = "/tmp/{0}.yaml".format(name) with salt.utils.fopen(filename, 'w') as f: f.write(request) create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE) # wee need to give kubernetes time save data in etcd time.sleep(0.1) res = k8s.create_limit_range("default", limits, name=name, apiserver_url="http://127.0.0.1:8080", update=True) proc = Popen(["kubectl", "--namespace=default", "get", "limits", name, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) limit = kubectl_out.get("spec").get("limits")[0].get("defaultRequest").get("cpu") self.assertEqual("100m", limit) def test_get_limit_ranges(self): res = k8s.get_limit_ranges("default", apiserver_url="http://127.0.0.1:8080") a = len(res.get("items", [])) proc = Popen(["kubectl", "--namespace=default", "get", "limits", "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) b = len(kubectl_out.get("items", [])) self.assertEqual(a, b) def test_get_one_limit_range(self): name = self.name request = """ apiVersion: v1 kind: LimitRange metadata: name: {0} spec: limits: - default: cpu: 200m memory: 512Mi defaultRequest: cpu: 100m memory: 256Mi type: Container """.format(name) filename = "/tmp/{0}.yaml".format(name) with salt.utils.fopen(filename, 'w') as f: f.write(request) create = Popen(["kubectl", "--namespace=default", "create", "-f", filename], stdout=PIPE) # wee need to give kubernetes time save data in etcd time.sleep(0.1) res = k8s.get_limit_ranges("default", name, apiserver_url="http://127.0.0.1:8080") a = res.get("metadata", {}).get("name", "a") proc = Popen(["kubectl", "--namespace=default", "get", "limits", name, "-o", "json"], stdout=PIPE) kubectl_out = json.loads(proc.communicate()[0]) b = kubectl_out.get("metadata", {}).get("name", "b") self.assertEqual(a, b)