salt/tests/unit/utils/cache_test.py

103 lines
2.8 KiB
Python
Raw Normal View History

2014-03-06 02:12:20 +00:00
# -*- coding: utf-8 -*-
'''
tests.unit.utils.cache_test
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Test the salt cache objects
'''
# Import python libs
from __future__ import absolute_import
2016-11-08 05:49:10 +00:00
import os
import time
2016-11-08 05:49:10 +00:00
import tempfile
import shutil
2014-03-06 02:12:20 +00:00
# Import Salt Testing libs
from salttesting import TestCase
from salttesting.helpers import ensure_in_syspath
ensure_in_syspath('../../')
# Import salt libs
2016-11-08 05:49:10 +00:00
import salt.config
import salt.loader
2014-03-06 02:12:20 +00:00
from salt.utils import cache
class CacheDictTestCase(TestCase):
    '''
    Unit tests for ``cache.CacheDict``
    '''

    def test_sanity(self):
        '''
        A CacheDict can be instantiated and supports the basic dict
        operations: membership test, item assignment, lookup and deletion.
        '''
        cache_dict = cache.CacheDict(5)
        self.assertIsInstance(cache_dict, cache.CacheDict)

        key, value = 'foo', 'bar'

        # A fresh instance must not contain the key yet
        self.assertNotIn(key, cache_dict)

        # Store, then read back
        cache_dict[key] = value
        self.assertEqual(cache_dict[key], value)

        # Delete, then confirm it is gone again
        del cache_dict[key]
        self.assertNotIn(key, cache_dict)

    def test_ttl(self):
        '''
        An entry stored in a CacheDict built with ``0.1`` is present right
        after insertion and is gone after sleeping ``0.2`` seconds.
        '''
        expiring = cache.CacheDict(0.1)
        key, value = 'foo', 'bar'

        expiring[key] = value
        self.assertIn(key, expiring)
        self.assertEqual(expiring[key], value)

        # Wait longer than the value handed to the constructor
        time.sleep(0.2)
        self.assertNotIn(key, expiring)

        # An item lookup on the vanished key must raise a plain KeyError
        with self.assertRaises(KeyError):
            expiring[key]  # pylint: disable=pointless-statement
2016-11-08 06:40:48 +00:00
2016-11-08 05:49:10 +00:00
class CacheContextTestCase(TestCase):
    '''
    Unit tests for ``cache.ContextCache`` and for a loader module that
    decorates itself with a context cache.
    '''

    @staticmethod
    def _context_dir():
        '''
        Return the path of the ``context`` directory under the system temp
        dir. The tests point ``cachedir`` at the temp dir, so this is
        presumably where the context cache writes -- TODO confirm against
        ``cache.ContextCache``.
        '''
        return os.path.join(tempfile.gettempdir(), 'context')

    @staticmethod
    def _minion_opts():
        '''
        Return a private copy of the default minion opts with ``cachedir``
        set to the system temp dir.

        A copy is used on purpose: assigning into
        ``salt.config.DEFAULT_MINION_OPTS`` directly would change the shared
        module-level defaults for everything else running in this process.
        '''
        opts = salt.config.DEFAULT_MINION_OPTS.copy()
        opts['cachedir'] = tempfile.gettempdir()
        return opts

    def setUp(self):
        '''
        Remove a leftover ``context`` directory so each test starts clean.
        '''
        context_dir = self._context_dir()
        if os.path.exists(context_dir):
            shutil.rmtree(context_dir)

    def test_smoke_context(self):
        '''
        Smoke test the context cache
        '''
        # setUp removes this directory; if it is back already, something
        # else is using it, so skip instead of clobbering it.
        if os.path.exists(self._context_dir()):
            self.skipTest('Context dir already exists')

        context_cache = cache.ContextCache(self._minion_opts(), 'cache_test')
        context_cache.cache_context({'a': 'b'})
        ret = context_cache.get_cache_context()
        self.assertDictEqual({'a': 'b'}, ret)

    def test_context_wrapper(self):
        '''
        Test to ensure that a module which decorates itself
        with a context cache can store and retrieve its contextual
        data
        '''
        # Load the helper module from the ``cache_mods`` directory that
        # sits next to this test file.
        mods_dir = os.path.join(
            os.path.dirname(os.path.realpath(__file__)),
            'cache_mods')
        ll_ = salt.loader.LazyLoader(
            [mods_dir],
            tag='rawmodule',
            virtual_enable=False,
            opts=self._minion_opts())

        cache_test_func = ll_['cache_mod.test_context_module']

        # The first call reports 0 and the second reports 1, which shows
        # the 'called' value was carried over between the two calls.
        self.assertEqual(cache_test_func()['called'], 0)
        self.assertEqual(cache_test_func()['called'], 1)
2014-03-06 02:12:20 +00:00
if __name__ == '__main__':
    from integration import run_tests
    # Hand every TestCase defined in this module to the runner so that
    # executing the file directly covers the context-cache tests as well.
    run_tests(CacheDictTestCase, CacheContextTestCase, needs_daemon=False)