From ff404f45a8cbebd065f51df09906293c26f9e066 Mon Sep 17 00:00:00 2001 From: Dmitry Kolesnikov Date: Sun, 10 May 2015 14:02:58 +0300 Subject: [PATCH] new async api, returns reference to operation --- Makefile | 245 ++++++++++++++++++++++++++++++++----- README.md | 8 ++ priv/cache.benchmark | 1 - src/cache.app.src | 2 +- src/cache.erl | 208 ++++++++++++++++++-------------- src/cache.hrl | 2 +- src/cache_bucket.erl | 281 ++++++++++++++++--------------------------- test/cache_tests.erl | 190 ++--------------------------- 8 files changed, 451 insertions(+), 486 deletions(-) diff --git a/Makefile b/Makefile index 968273e..cef73ee 100644 --- a/Makefile +++ b/Makefile @@ -1,52 +1,237 @@ -.PHONY: deps test +## @author Dmitry Kolesnikov, +## @copyright (c) 2012 - 2014 Dmitry Kolesnikov. All Rights Reserved +## +## @description +## Makefile to build and release Erlang applications using +## rebar, reltool, etc (see README for details) +## +## application version schema (based on semantic version) +## ${APP}-${VSN}+${GIT}.${ARCH}.${PLAT} +## +## @version 0.8.2 +.PHONY: test rel deps all pkg -ifeq ($(id),) -export id=cache +##################################################################### +## +## application config +## +##################################################################### +ROOT = `pwd` +PREFIX ?= /usr/local +APP ?= $(notdir $(CURDIR)) +ARCH?= $(shell uname -m) +PLAT?= $(shell uname -s) +HEAD?= $(shell git rev-parse --short HEAD) +TAG = ${HEAD}.${ARCH}.${PLAT} +TEST?= ${APP} +S3 = +GIT ?= +VMI = +NET ?= lo0 +USER = +PASS = + +## root path to benchmark framework +BB = ../basho_bench +SSHENV = /tmp/ssh-agent.conf +ADDR = $(shell ifconfig ${NET} | sed -En 's/.*inet (addr:)?(([0-9]*\.){3}[0-9]*).*/\2/p') +BRANCH = $(shell git symbolic-ref --short -q HEAD) + +## erlang flags (make run only) +EFLAGS = \ + -name ${APP}@${ADDR} \ + -setcookie nocookie \ + -pa ${ROOT}/ebin \ + -pa ${ROOT}/deps/*/ebin \ + -pa ${ROOT}/apps/*/ebin \ + -pa rel/files \ + -kernel inet_dist_listen_min 32100 \ + -kernel inet_dist_listen_max 32199 \ + +P 1000000 \ + +K true +A 160 -sbt ts + +##################################################################### +## +## internal config +## +##################################################################### +ifeq ($(wildcard rel/reltool.config),) + REL = + VSN = + TAR = + PKG = +else + IID = $(shell cat rel/reltool.config | sed -n 's/{target_dir,.*\"\([^-]*\).*\"}./\1/p') + REL = $(shell cat rel/reltool.config | sed -n 's/{target_dir,.*\"\(.*\)\"}./\1/p') + VSN = $(shell echo ${REL} | sed -n 's/.*-\(.*\)/\1/p') +ifeq (${VSN},) + VSN = $(shell cat rel/reltool.config | sed -n 's/.*{rel,.*\".*\",.*\"\(.*\)\".*/\1/p') +endif +ifeq (${config},) + RFLAGS = + VARIANT = +else + VARIANT = $(addprefix ., $(notdir $(basename ${config}))) + RFLAGS = target_dir=${REL}${VARIANT} overlay_vars=${ROOT}/${config} +endif +ifeq (${VSN},) + TAR = ${IID}${VARIANT}+${TAG}.tgz + PKG = ${IID}${VARIANT}+${TAG}.bundle +else + TAR = ${IID}-${VSN}${VARIANT}+${TAG}.tgz + PKG = ${IID}-${VSN}${VARIANT}+${TAG}.bundle +endif endif -FLAGS=\ - -name ${id}@127.0.0.1 \ - -setcookie nocookie \ - -pa ./deps/*/ebin \ - -pa ./examples/*/ebin \ - -pa ./ebin \ - +K true +A 160 -sbt ts - -BB=../basho_bench +## self-extracting bundle wrapper +BUNDLE_INIT = PREFIX=${PREFIX}\nREL=${PREFIX}/${REL}${VARIANT}\nAPP=${APP}\nVSN=${VSN}\nLINE=\`grep -a -n 'BUNDLE:\x24' \x240\`\ntail -n +\x24(( \x24{LINE\x25\x25:*} + 1)) \x240 | gzip -vdc - | tar -C ${PREFIX} -xvf - > /dev/null\n +BUNDLE_FREE = exit\nBUNDLE:\n +BUILDER = cd /tmp && git clone -b ${BRANCH} ${GIT}/${APP} && cd /tmp/${APP} && make && make rel && sleep 300 +##################################################################### +## +## build +## +##################################################################### all: rebar deps compile compile: - ./rebar compile + @./rebar compile deps: - ./rebar get-deps + @./rebar get-deps clean: - ./rebar clean - rm -rf test.*-temp-data + @./rebar clean ; \ + rm -rf test.*-temp-data ; \ + rm -rf tests ; \ + rm -rf log ; \ + rm -f *.tgz ; \ + rm -f *.bundle -distclean: clean - ./rebar delete-deps -test: all - ./rebar skip_deps=true eunit +distclean: clean + @./rebar delete-deps + +unit: all + @./rebar skip_deps=true eunit + +test: + @erl ${EFLAGS} -run deb test test/${TEST}.config docs: - ./rebar skip_deps=true doc + @./rebar skip_deps=true doc -dialyzer: compile - @dialyzer -Wno_return -c ./ebin +##################################################################### +## +## release +## +##################################################################### +ifneq (${REL},) +rel: ${TAR} + +## assemble VM release +ifeq (${PLAT},$(shell uname -s)) +${TAR}: + @./rebar generate ${RFLAGS}; \ + cd rel ; tar -zcf ${TAR} ${REL}${VARIANT}/ ; mv ${TAR} ../${TAR} ; cd - ;\ + echo "==> tarball: ${TAR}" + +else +ifneq (${VMI},) +${TAR}: + @echo "==> docker run ${VMI}" ;\ + K=`test ${PASS} && cat ${PASS}` ;\ + A=`test ${USER} && echo "mkdir -p /root/.ssh && echo \"$$K\" > /root/.ssh/id_rsa && chmod 0700 /root/.ssh/id_rsa && echo -e \"Host *\n\tUser ${USER}\n\tStrictHostKeyChecking no\n\" > /root/.ssh/config &&"` ;\ + I=`docker run -d -a stdout -a stderr ${VMI} /bin/sh -c "$$A ${BUILDER}"` ;\ + (docker attach $$I &) ;\ + docker cp $$I:/tmp/${APP}/${TAR} . 1> /dev/null 2>&1 ;\ + while [ $$? -ne 0 ] ;\ + do \ + sleep 10 ;\ + docker cp $$I:/tmp/${APP}/${TAR} . 1> /dev/null 2>&1 ;\ + done ;\ + docker kill $$I ;\ + docker rm $$I + +endif +endif + +## package VM release to executable bundle +pkg: rel/deploy.sh ${TAR} + @printf "${BUNDLE_INIT}" > ${PKG} ; \ + cat rel/deploy.sh >> ${PKG} ; \ + printf "${BUNDLE_FREE}" >> ${PKG} ; \ + cat ${TAR} >> ${PKG} ; \ + chmod ugo+x ${PKG} ; \ + echo "==> bundle: ${PKG}" + +## copy 'package' to s3 +## copy 'package' to s3 +s3: ${PKG} + aws s3 cp ${PKG} ${S3}/${APP}+${TAG}${VARIANT}.bundle + +s3-latest: ${PKG} + aws s3 cp ${PKG} ${S3}/${APP}+latest${VARIANT}.bundle +endif + +##################################################################### +## +## deploy +## +##################################################################### +ifneq (${host},) +${SSHENV}: + @echo "==> ssh: config keys" ;\ + ssh-agent -s > ${SSHENV} + +node: ${SSHENV} + @echo "==> deploy: ${host}" ;\ + . ${SSHENV} ;\ + k=`basename ${pass}` ;\ + l=`ssh-add -l | grep $$k` ;\ + if [ -z "$$l" ] ; then \ + ssh-add ${pass} ;\ + fi ;\ + rsync -cav --rsh=ssh --progress ${PKG} ${host}:${PKG} ;\ + ssh -t ${host} "sudo sh ./${PKG}" + +endif + +##################################################################### +## +## debug +## +##################################################################### run: - erl ${FLAGS} - -rebar: - curl -O https://cloud.github.com/downloads/basho/rebar/rebar - chmod ugo+x rebar + @erl ${EFLAGS} benchmark: - $(BB)/basho_bench -N bb@127.0.0.0 -C nocookie priv/${id}.benchmark - $(BB)/priv/summary.r -i tests/current + @echo "==> benchmark: ${TEST}" ;\ + $(BB)/basho_bench -N bb@127.0.0.1 -C nocookie priv/${TEST}.benchmark ;\ + $(BB)/priv/summary.r -i tests/current ;\ open tests/current/summary.png +ifneq (${REL},) +start: + @./rel/${REL}${VARIANT}/bin/${APP} start + +stop: + @./rel/${REL}${VARIANT}/bin/${APP} stop + +console: + @./rel/${REL}${VARIANT}/bin/${APP} console + +attach: + @./rel/${REL}${VARIANT}/bin/${APP} attach +endif + +##################################################################### +## +## dependencies +## +##################################################################### +rebar: + @curl -L -O https://github.com/rebar/rebar/wiki/rebar ; \ + chmod ugo+x rebar diff --git a/README.md b/README.md index 06e8c7b..21b69fb 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,10 @@ The write operation always uses youngest segment. The read operation lookup key The downside is inability to assign precise TTL per single cache entry. TTL is always approximated to nearest segment. (e.g. cache with 60 sec TTL and 10 segments has 6 sec accuracy on TTL) +## Change log + * 2.0.0 - various changes on asynchronous api, not compatible with version 1.x + * 1.0.1 - production release ## Usage @@ -16,8 +19,13 @@ The downside is inability to assign precise TTL per single cache entry. TTL is a application:start(cache). {ok, _} = cache:start_link(my_cache, [{n, 10}, {ttl, 60}]). + %% synchronous i/o ok = cache:put(my_cache, <<"my key">>, <<"my value">>). Val = cache:get(my_cache, <<"my key">>). + + %% asynchronous i/o + Ref = cache:get_(my_cache, <<"my key">>). + receive {Ref, Val} -> Val end. ``` ### configuration via Erlang `sys.config` diff --git a/priv/cache.benchmark b/priv/cache.benchmark index 7745006..627146d 100644 --- a/priv/cache.benchmark +++ b/priv/cache.benchmark @@ -16,7 +16,6 @@ ,{get, 5} ]}. -%{cache, {mycache, 'cache@127.0.0.1'}}. {local, [ {ttl, 20} ,{n, 10} diff --git a/src/cache.app.src b/src/cache.app.src index a082bfc..c4576cb 100644 --- a/src/cache.app.src +++ b/src/cache.app.src @@ -1,7 +1,7 @@ {application, cache, [ {description, "in-memory cache"}, - {vsn, "1.0.1"}, + {vsn, "2.0.0"}, {modules, []}, {registered, []}, {applications,[ diff --git a/src/cache.erl b/src/cache.erl index daa82e6..8626ad0 100644 --- a/src/cache.erl +++ b/src/cache.erl @@ -31,7 +31,7 @@ -include("cache.hrl"). --export([start/0]). +%% cache management interface -export([ start_link/1, start_link/2, @@ -39,20 +39,27 @@ purge/1, i/1, i/2, - heap/2, + heap/2 +]). +%% basic cache i/o interface +-export([ put/3, put/4, put_/3, put_/4, get/2, + get_/2, lookup/2, + lookup_/2, has/2, ttl/2, remove/2, - remove_/2, + remove_/2 +]). +%% extended cache i/o interface +-export([ acc/3, acc_/3, - % memecached like interface set/3, set/4, set_/3, @@ -72,21 +79,26 @@ delete/2, delete_/2 ]). +-export([start/0]). -export_type([cache/0]). --type(cache() :: atom() | {atom(), atom()} | {global, atom()} | pid()). --type(name() :: atom() | {global, atom()}). +-type(cache() :: atom() | pid()). -type(key() :: any()). --type(entity() :: any()). +-type(val() :: any()). -type(ttl() :: integer()). %% %% RnD start application start() -> - application:start(pq), application:start(cache). +%%%---------------------------------------------------------------------------- +%%% +%%% cache management interface +%%% +%%%---------------------------------------------------------------------------- + %% %% start new cache bucket, accepted options: %% {type, set | ordered_set} - cache segment type (default set) @@ -99,7 +111,7 @@ start() -> %% {stats, function() | {Mod, Fun}} - cache statistic aggregate functor %% {heir, atom() | pid()} - heir of evicted cache segments -spec(start_link/1 :: (list()) -> {ok, pid()} | {error, any()}). --spec(start_link/2 :: (name(), list()) -> {ok, pid()} | {error, any()}). +-spec(start_link/2 :: (atom(), list()) -> {ok, pid()} | {error, any()}). start_link(Opts) -> cache_bucket:start_link(Opts). @@ -111,41 +123,21 @@ start_link(Cache, Opts) -> %% drop cache -spec(drop/1 :: (cache()) -> ok). -drop(undefined) -> - ok; -drop({global, Cache}) -> - drop(global:whereis_name(Cache)); -drop({Cache, Node}) -> - drop(rpc:call(Node, cache, drop, [Cache])); -drop(Cache) - when is_atom(Cache) -> - drop(whereis(Cache)); -drop(Cache) - when is_pid(Cache) -> +drop(Cache) -> gen_server:call(Cache, drop). %% %% purge cache -spec(purge/1 :: (cache()) -> ok). -purge(undefined) -> - ok; -purge({global, Cache}) -> - purge(global:whereis_name(Cache)); -purge({Cache, Node}) -> - purge(rpc:call(Node, cache, purge, [Cache])); -purge(Cache) - when is_atom(Cache) -> - purge(whereis(Cache)); -purge(Cache) - when is_pid(Cache) -> +purge(Cache) -> gen_server:call(Cache, purge). %% %% return cache meta data -%% {heap, [integer()]} - cache segments references -%% {expire, [integer()]} - cache segments expire times +%% {heap, [integer()]} - references to cache segments +%% {expire, [integer()]} - cache segments expiration times %% {size, [integer()]} - cardinality of cache segments %% {memory, [integer()]} - memory occupied by each cache segment -spec(i/1 :: (cache()) -> list()). @@ -165,94 +157,114 @@ heap(Cache, N) -> gen_server:call(Cache, {heap, N}). +%%%---------------------------------------------------------------------------- +%%% +%%% basic cache i/o interface +%%% +%%%---------------------------------------------------------------------------- + %% %% synchronous cache put --spec(put/3 :: (cache(), key(), entity()) -> ok). --spec(put/4 :: (cache(), key(), entity(), ttl()) -> ok). +-spec(put/3 :: (cache(), key(), val()) -> ok). +-spec(put/4 :: (cache(), key(), val(), ttl()) -> ok). put(Cache, Key, Val) -> - gen_server:call(Cache, {put, Key, Val}, ?DEF_CACHE_TIMEOUT). + cache:put(Cache, Key, Val, undefined). put(Cache, Key, Val, TTL) -> - gen_server:call(Cache, {put, Key, Val, TTL}, ?DEF_CACHE_TIMEOUT). + call(Cache, {put, Key, Val, TTL}). %% %% asynchronous cache put --spec(put_/3 :: (cache(), key(), entity()) -> ok). --spec(put_/4 :: (cache(), key(), entity(), ttl()) -> ok). +-spec(put_/3 :: (cache(), key(), val()) -> reference()). +-spec(put_/4 :: (cache(), key(), val(), ttl()) -> reference()). put_(Cache, Key, Val) -> - gen_server:cast(Cache, {put, Key, Val}). + cache:put_(Cache, Key, Val, undefined). put_(Cache, Key, Val, TTL) -> - gen_server:cast(Cache, {put, Key, Val, TTL}). + cast(Cache, {put, Key, Val, TTL}). %% -%% synchronous get cache entry, the operation prolongs entry ttl --spec(get/2 :: (cache(), key()) -> entity() | undefined). +%% synchronous cache get, the operation prolongs value ttl +-spec(get/2 :: (cache(), key()) -> val() | undefined). get(Cache, Key) -> - gen_server:call(Cache, {get, Key}, ?DEF_CACHE_TIMEOUT). + call(Cache, {get, Key}). %% -%% synchronous lookup cache entry, the operation do not prolong entry ttl --spec(lookup/2 :: (cache(), key()) -> entity() | undefined). +%% asynchronous cache get, the operation prolongs value ttl +-spec(get_/2 :: (cache(), key()) -> reference()). + +get_(Cache, Key) -> + cast(Cache, {get, Key}). + +%% +%% synchronous cache lookup, the operation do not prolong entry ttl +-spec(lookup/2 :: (cache(), key()) -> val() | undefined). lookup(Cache, Key) -> - gen_server:call(Cache, {lookup, Key}, ?DEF_CACHE_TIMEOUT). + call(Cache, {lookup, Key}). + +%% +%% asynchronous cache lookup, the operation do not prolong entry ttl +-spec(lookup_/2 :: (cache(), key()) -> reference()). + +lookup_(Cache, Key) -> + cast(Cache, {lookup, Key}). %% %% check if cache key exists, -spec(has/2 :: (cache(), key()) -> true | false). has(Cache, Key) -> - gen_server:call(Cache, {has, Key}, ?DEF_CACHE_TIMEOUT). + call(Cache, {has, Key}). %% %% check entity at cache and return estimated ttl -spec(ttl/2 :: (cache(), key()) -> ttl() | false). ttl(Cache, Key) -> - gen_server:call(Cache, {ttl, Key}, ?DEF_CACHE_TIMEOUT). + call(Cache, {ttl, Key}). %% %% synchronous remove entry from cache -spec(remove/2 :: (cache(), key()) -> ok). remove(Cache, Key) -> - gen_server:call(Cache, {remove, Key}, ?DEF_CACHE_TIMEOUT). + call(Cache, {remove, Key}). %% %% asynchronous remove entry from cache -spec(remove_/2 :: (cache(), key()) -> ok). remove_(Cache, Key) -> - gen_server:cast(Cache, {remove, Key}). + cast(Cache, {remove, Key}). + +%%%---------------------------------------------------------------------------- +%%% +%%% extended cache i/o interface +%%% +%%%---------------------------------------------------------------------------- %% %% synchronous in-cache accumulator -spec(acc/3 :: (cache(), key(), integer() | [{integer(), integer()}]) -> integer() | undefined). acc(Cache, Key, Val) -> - gen_server:call(Cache, {acc, Key, Val}, ?DEF_CACHE_TIMEOUT). + call(Cache, {acc, Key, Val}). %% %% asynchronous in-cache accumulator -spec(acc_/3 :: (cache(), key(), integer() | {integer(), integer()}) -> ok). acc_(Cache, Key, Val) -> - gen_server:cast(Cache, {acc, Key, Val}). - -%%%---------------------------------------------------------------------------- -%%% -%%% memcached-like interface -%%% -%%%---------------------------------------------------------------------------- + cast(Cache, {acc, Key, Val}). %% %% synchronous store key/val --spec(set/3 :: (cache(), key(), entity()) -> ok). --spec(set/4 :: (cache(), key(), entity(), ttl()) -> ok). +-spec(set/3 :: (cache(), key(), val()) -> ok). +-spec(set/4 :: (cache(), key(), val(), ttl()) -> ok). set(Cache, Key, Val) -> cache:put(Cache, Key, Val). @@ -262,8 +274,8 @@ set(Cache, Key, Val, TTL) -> %% %% asynchronous store key/val --spec(set_/3 :: (cache(), key(), entity()) -> ok). --spec(set_/4 :: (cache(), key(), entity(), ttl()) -> ok). +-spec(set_/3 :: (cache(), key(), val()) -> ok). +-spec(set_/4 :: (cache(), key(), val(), ttl()) -> ok). set_(Cache, Key, Val) -> cache:put_(Cache, Key, Val). @@ -271,84 +283,79 @@ set_(Cache, Key, Val) -> set_(Cache, Key, Val, TTL) -> cache:put_(Cache, Key, Val, TTL). - %% %% synchronous store key/val only if cache does not already hold data for this key --spec(add/3 :: (cache(), key(), entity()) -> ok | conflict). --spec(add/4 :: (cache(), key(), entity(), ttl()) -> ok | conflict). +-spec(add/3 :: (cache(), key(), val()) -> ok | {error, conflict}). +-spec(add/4 :: (cache(), key(), val(), ttl()) -> ok | {error, conflict}). add(Cache, Key, Val) -> - gen_server:call(Cache, {add, Key, Val}, ?DEF_CACHE_TIMEOUT). + cache:add(Cache, Key, Val, undefined). add(Cache, Key, Val, TTL) -> - gen_server:call(Cache, {add, Key, Val, TTL}, ?DEF_CACHE_TIMEOUT). + call(Cache, {add, Key, Val, TTL}). %% %% asynchronous store key/val only if cache does not already hold data for this key --spec(add_/3 :: (cache(), key(), entity()) -> ok). --spec(add_/4 :: (cache(), key(), entity(), ttl()) -> ok). +-spec(add_/3 :: (cache(), key(), val()) -> reference()). +-spec(add_/4 :: (cache(), key(), val(), ttl()) -> reference()). add_(Cache, Key, Val) -> - gen_server:cast(Cache, {add, Key, Val}). + cache:add_(Cache, Key, Val, undefined). add_(Cache, Key, Val, TTL) -> - gen_server:cast(Cache, {add, Key, Val, TTL}). - + cast(Cache, {add, Key, Val, TTL}). %% %% synchronous store key/val only if cache does hold data for this key --spec(replace/3 :: (cache(), key(), entity()) -> ok | not_found). --spec(replace/4 :: (cache(), key(), entity(), ttl()) -> ok | not_found). +-spec(replace/3 :: (cache(), key(), val()) -> ok | {error, not_found}). +-spec(replace/4 :: (cache(), key(), val(), ttl()) -> ok | {error, not_found}). replace(Cache, Key, Val) -> - gen_server:call(Cache, {replace, Key, Val}, ?DEF_CACHE_TIMEOUT). + cache:replace(Cache, Key, Val, undefined). replace(Cache, Key, Val, TTL) -> - gen_server:call(Cache, {replace, Key, Val, TTL}, ?DEF_CACHE_TIMEOUT). - + call(Cache, {replace, Key, Val, TTL}). %% %% asynchronous store key/val only if cache does hold data for this key --spec(replace_/3 :: (cache(), key(), entity()) -> ok). --spec(replace_/4 :: (cache(), key(), entity(), ttl()) -> ok). +-spec(replace_/3 :: (cache(), key(), val()) -> reference()). +-spec(replace_/4 :: (cache(), key(), val(), ttl()) -> reference()). replace_(Cache, Key, Val) -> - gen_server:cast(Cache, {replace, Key, Val}). + cache:replace_(Cache, Key, Val). replace_(Cache, Key, Val, TTL) -> - gen_server:cast(Cache, {replace, Key, Val, TTL}). - + cast(Cache, {replace, Key, Val, TTL}). %% %% synchronously add data to existing key after existing data, the operation do not prolong entry ttl --spec(append/3 :: (cache(), key(), entity()) -> ok | not_found). +-spec(append/3 :: (cache(), key(), val()) -> ok | {error, not_found}). append(Cache, Key, Val) -> - gen_server:call(Cache, {append, Key, Val}, ?DEF_CACHE_TIMEOUT). + call(Cache, {append, Key, Val}). %% %% asynchronously add data to existing key after existing data, the operation do not prolong entry ttl --spec(append_/3 :: (cache(), key(), entity()) -> ok). +-spec(append_/3 :: (cache(), key(), val()) -> reference()). append_(Cache, Key, Val) -> - gen_server:cast(Cache, {append, Key, Val}). + cast(Cache, {append, Key, Val}). %% %% synchronously add data to existing key before existing data --spec(prepend/3 :: (cache(), key(), entity()) -> ok | not_found). +-spec(prepend/3 :: (cache(), key(), val()) -> ok | {error, not_found}). prepend(Cache, Key, Val) -> - gen_server:call(Cache, {prepend, Key, Val}, ?DEF_CACHE_TIMEOUT). + call(Cache, {prepend, Key, Val}). %% %% asynchronously add data to existing key before existing data --spec(prepend_/3 :: (cache(), key(), entity()) -> ok). +-spec(prepend_/3 :: (cache(), key(), val()) -> reference()). prepend_(Cache, Key, Val) -> gen_server:cast(Cache, {prepend, Key, Val}). - %% %% synchronous remove entry from cache -spec(delete/2 :: (cache(), key()) -> ok). @@ -363,3 +370,22 @@ delete(Cache, Key) -> delete_(Cache, Key) -> cache:remove_(Cache, Key). + +%%%---------------------------------------------------------------------------- +%%% +%%% private +%%% +%%%---------------------------------------------------------------------------- + +%% +%% synchronous call to server +call(Pid, Req) -> + gen_server:call(Pid, Req, ?CONFIG_TIMEOUT). + +%% +%% asynchronous call +cast(Pid, Req) -> + Ref = erlang:make_ref(), + erlang:send(Pid, {'$gen_call', {self(), Ref}, Req}, [noconnect]), + Ref. + diff --git a/src/cache.hrl b/src/cache.hrl index 5fb6fd1..efaa529 100644 --- a/src/cache.hrl +++ b/src/cache.hrl @@ -33,4 +33,4 @@ -define(DEF_CACHE_CHECK, 20000). %% default cache i/o timeout --define(DEF_CACHE_TIMEOUT, 60000). +-define(CONFIG_TIMEOUT, 30000). diff --git a/src/cache_bucket.erl b/src/cache_bucket.erl index a13b60a..dcf3872 100644 --- a/src/cache_bucket.erl +++ b/src/cache_bucket.erl @@ -103,83 +103,64 @@ terminate(_Reason, State) -> %%% %%%---------------------------------------------------------------------------- -handle_call({put, Key, Val}, _, S) -> - {reply, ok, cache_put(Key, Val, S)}; +handle_call({put, Key, Val, TTL}, _, State) -> + {reply, ok, cache_put(Key, Val, TTL, State)}; -handle_call({put, Key, Val, TTL}, _, S) -> - {reply, ok, cache_put(Key, Val, cache_util:now() + TTL, S)}; +handle_call({get, Key}, _, State) -> + {reply, cache_get(Key, State), State}; -handle_call({get, Key}, _, S) -> - {reply, cache_get(Key, S), S}; +handle_call({lookup, Key}, _, State) -> + {reply, cache_lookup(Key, State), State}; -handle_call({lookup, Key}, _, S) -> - {reply, cache_lookup(Key, S), S}; +handle_call({has, Key}, _, State) -> + {reply, cache_has(Key, State), State}; -handle_call({has, Key}, _, S) -> - {reply, cache_has(Key, S), S}; +handle_call({ttl, Key}, _, State) -> + {reply, cache_ttl(Key, State), State}; -handle_call({ttl, Key}, _, S) -> - {reply, cache_ttl(Key, S), S}; +handle_call({remove, Key}, _, State) -> + {reply, ok, cache_remove(Key, State)}; -handle_call({remove, Key}, _, S) -> - {reply, ok, cache_remove(Key, S)}; +handle_call({acc, Key, Val}, _, State0) -> + {Result, State1} = cache_acc(Key, Val, State0), + {reply, Result, State1}; -handle_call({acc, Key, Val}, _, S) -> - {Reply, NS} = cache_acc(Key, Val, S), - {reply, Reply, NS}; - -handle_call({add, Key, Val}, _, S) -> - case cache_has(Key, S) of +handle_call({add, Key, Val, TTL}, _, State) -> + case cache_has(Key, State) of true -> - {reply, conflict, S}; + {reply, {error, conflict}, State}; false -> - {reply, ok, cache_put(Key, Val, S)} + {reply, ok, cache_put(Key, Val, TTL, State)} end; -handle_call({add, Key, Val, TTL}, _, S) -> - case cache_has(Key, S) of +handle_call({replace, Key, Val, TTL}, _, State) -> + case cache_has(Key, State) of true -> - {reply, conflict, S}; + {reply, ok, cache_put(Key, Val, TTL, State)}; false -> - {reply, ok, cache_put(Key, Val, cache_util:now() + TTL, S)} + {reply, {error, not_found}, State} end; -handle_call({replace, Key, Val}, _, S) -> - case cache_has(Key, S) of - true -> - {reply, ok, cache_put(Key, Val, S)}; - false -> - {reply, not_found, S} - end; - -handle_call({replace, Key, Val, TTL}, _, S) -> - case cache_has(Key, S) of - true -> - {reply, ok, cache_put(Key, Val, cache_util:now() + TTL, S)}; - false -> - {reply, not_found, S} - end; - -handle_call({prepend, Key, Val}, _, S) -> +handle_call({prepend, Key, Val}, _, State) -> % @todo: reduce one write - case cache_get(Key, S) of + case cache_get(Key, State) of undefined -> - {reply, ok, cache_put(Key, [Val], S)}; + {reply, ok, cache_put(Key, [Val], undefined, State)}; X when is_list(X) -> - {reply, ok, cache_put(Key, [Val|X], S)}; + {reply, ok, cache_put(Key, [Val|X], undefined, State)}; X -> - {reply, ok, cache_put(Key, [Val,X], S)} + {reply, ok, cache_put(Key, [Val,X], undefined, State)} end; -handle_call({append, Key, Val}, _, S) -> +handle_call({append, Key, Val}, _, State) -> % @todo: reduce one write - case cache_get(Key, S) of + case cache_get(Key, State) of undefined -> - {reply, ok, cache_put(Key, [Val], S)}; + {reply, ok, cache_put(Key, [Val], undefined, State)}; X when is_list(X) -> - {reply, ok, cache_put(Key, X++[Val], S)}; + {reply, ok, cache_put(Key, X++[Val], undefined, State)}; X -> - {reply, ok, cache_put(Key, [X, Val], S)} + {reply, ok, cache_put(Key, [X, Val], undefined, State)} end; handle_call(i, _, State) -> @@ -209,73 +190,6 @@ handle_call(_, _, S) -> %% %% -handle_cast({put, Key, Val}, S) -> - {noreply, cache_put(Key, Val, S)}; - -handle_cast({put, Key, Val, TTL}, S) -> - {noreply, cache_put(Key, Val, cache_util:now() + TTL, S)}; - -handle_cast({remove, Key}, S) -> - {noreply, cache_remove(Key, S)}; - -handle_cast({acc, Key, Val}, S) -> - {_, NS} = cache_acc(Key, Val, S), - {noreply, NS}; - -handle_cast({add, Key, Val}, S) -> - case cache_has(Key, S) of - true -> - {noreply, S}; - false -> - {noreply, cache_put(Key, Val, S)} - end; - -handle_cast({add, Key, Val, TTL}, S) -> - case cache_has(Key, S) of - true -> - {noreply, S}; - false -> - {noreply, cache_put(Key, Val, cache_util:now() + TTL, S)} - end; - -handle_cast({replace, Key, Val}, S) -> - case cache_has(Key, S) of - true -> - {noreply, cache_put(Key, Val, S)}; - false -> - {noreply, S} - end; - -handle_cast({replace, Key, Val, TTL}, S) -> - case cache_has(Key, S) of - true -> - {noreply, cache_put(Key, Val, cache_util:now() + TTL, S)}; - false -> - {noreply, S} - end; - -handle_cast({prepend, Key, Val}, S) -> - % @todo: reduce one write - case cache_get(Key, S) of - undefined -> - {noreply, cache_put(Key, [Val], S)}; - X when is_list(X) -> - {noreply, cache_put(Key, [Val|X], S)}; - X -> - {noreply, cache_put(Key, [Val,X], S)} - end; - -handle_cast({append, Key, Val}, S) -> - % @todo: reduce one write - case cache_get(Key, S) of - undefined -> - {noreply, cache_put(Key, [Val], S)}; - X when is_list(X) -> - {noreply, cache_put(Key, X++[Val], S)}; - X -> - {noreply, cache_put(Key, [X, Val], S)} - end; - handle_cast(_, S) -> {noreply, S}. @@ -309,7 +223,6 @@ handle_info(_, S) -> code_change(_Vsn, S, _Extra) -> {ok, S}. - %%%---------------------------------------------------------------------------- %%% %%% private @@ -318,88 +231,83 @@ code_change(_Vsn, S, _Extra) -> %% %% insert value to cache -cache_put(Key, Val, #cache{heap=Heap}=State) -> +cache_put(Key, Val, undefined, #cache{name=_Name, heap=Heap}=State) -> {_, Head} = cache_heap:head(Heap), true = ets:insert(Head, {Key, Val}), - lists:foreach( - fun({_, X}) -> ets:delete(X, Key) end, - cache_heap:tail(Heap) - ), - cache_util:stats(State#cache.stats, {cache, State#cache.name, put}), - ?DEBUG("cache ~p: put ~p to heap ~p~n", [State#cache.name, Key, Head]), - State. + ok = heap_remove(Key, cache_heap:tail(Heap)), + _ = stats(put, State), + ?DEBUG("cache ~p: put ~p to heap ~p~n", [_Name, Key, Head]), + State; -cache_put(Key, Val, Expire, #cache{}=State) -> - Refs = cache_heap:refs(State#cache.heap), +cache_put(Key, Val, TTL, #cache{name=_Name, heap=Heap}=State) -> + Expire = cache_util:now() + TTL, + Refs = cache_heap:refs(Heap), case lists:splitwith(fun({X, _}) -> X > Expire end, Refs) of {[], _Tail} -> - cache_put(Key, Val, State); + cache_put(Key, Val, undefined, State); {Head, Tail} -> - [{_, Heap} | Rest] = lists:reverse(Head), - true = ets:insert(Heap, {Key, Val}), - lists:foreach( - fun({_, X}) -> ets:delete(X, Key) end, - Rest ++ Tail - ), - cache_util:stats(State#cache.stats, {cache, State#cache.name, put}), - ?DEBUG("cache ~p: put ~p to heap ~p~n", [State#cache.name, Key, Heap]), + [{_, Inst} | Rest] = lists:reverse(Head), + true = ets:insert(Inst, {Key, Val}), + ok = heap_remove(Key, Rest ++ Tail), + _ = stats(put, State), + ?DEBUG("cache ~p: put ~p to heap ~p~n", [_Name, Key, Inst]), State end. %% %% get cache value -cache_get(Key, #cache{policy=mru}=S) -> +cache_get(Key, #cache{policy=mru}=State) -> % cache MRU should not move key anywhere because % cache always evicts last generation % fall-back to cache lookup - cache_lookup(Key, S); + cache_lookup(Key, State); -cache_get(Key, #cache{}=S) -> - {_, Head} = cache_heap:head(S#cache.heap), - case heap_lookup(Key, cache_heap:refs(S#cache.heap)) of +cache_get(Key, #cache{name=_Name, heap=Heap}=State) -> + {_, Head} = cache_heap:head(Heap), + case heap_lookup(Key, cache_heap:refs(Heap)) of undefined -> - cache_util:stats(S#cache.stats, {cache, S#cache.name, miss}), + stats(miss, State), undefined; {Head, Val} -> - ?DEBUG("cache ~p: get ~p at cell ~p~n", [S#cache.name, Key, Head]), - cache_util:stats(S#cache.stats, {cache, S#cache.name, hit}), + ?DEBUG("cache ~p: get ~p at cell ~p~n", [_Name, Key, Head]), + stats(hit, State), Val; - {Heap, Val} -> + {Cell, Val} -> true = ets:insert(Head, {Key, Val}), - _ = ets:delete(Heap, Key), - ?DEBUG("cache ~p: get ~p at cell ~p~n", [S#cache.name, Key, Heap]), - cache_util:stats(S#cache.stats, {cache, S#cache.name, hit}), + _ = ets:delete(Cell, Key), + ?DEBUG("cache ~p: get ~p at cell ~p~n", [_Name, Key, Cell]), + stats(hit, State), Val end. %% %% lookup cache value -cache_lookup(Key, #cache{}=S) -> - case heap_lookup(Key, cache_heap:refs(S#cache.heap)) of +cache_lookup(Key, #cache{name=_Name, heap=Heap}=State) -> + case heap_lookup(Key, cache_heap:refs(Heap)) of undefined -> - cache_util:stats(S#cache.stats, {cache, S#cache.name, miss}), + stats(miss, State), undefined; - {_Heap, Val} -> - ?DEBUG("cache ~p: get ~p at cell ~p~n", [S#cache.name, Key, _Heap]), - cache_util:stats(S#cache.stats, {cache, S#cache.name, hit}), + {_Cell, Val} -> + ?DEBUG("cache ~p: get ~p at cell ~p~n", [_Name, Key, _Cell]), + stats(hit, State), Val end. %% %% check if key exists -cache_has(Key, #cache{}=S) -> - case heap_has(Key, cache_heap:refs(S#cache.heap)) of +cache_has(Key, #cache{name=_Name, heap=Heap}) -> + case heap_has(Key, cache_heap:refs(Heap)) of false -> false; _Heap -> - ?DEBUG("cache ~p: has ~p at cell ~p~n", [S#cache.name, Key, _Heap]), + ?DEBUG("cache ~p: has ~p at cell ~p~n", [_Name, Key, _Heap]), true end. %% %% check key ttl -cache_ttl(Key, #cache{}=S) -> - case heap_has(Key, cache_heap:refs(S#cache.heap)) of +cache_ttl(Key,#cache{heap=Heap}) -> + case heap_has(Key, cache_heap:refs(Heap)) of false -> undefined; {Expire, _} -> @@ -408,35 +316,32 @@ cache_ttl(Key, #cache{}=S) -> %% %% -cache_remove(Key, #cache{}=S) -> - lists:foreach( - fun({_, X}) -> ets:delete(X, Key) end, - cache_heap:refs(S#cache.heap) - ), - cache_util:stats(S#cache.stats, {cache, S#cache.name, remove}), - ?DEBUG("cache ~p: remove ~p~n", [S#cache.name, Key]), - S. +cache_remove(Key, #cache{name=_Name, heap=Heap}=State) -> + ok = heap_remove(Key, cache_heap:refs(Heap)), + _ = stats(remove, State), + ?DEBUG("cache ~p: remove ~p~n", [_Name, Key]), + State. %% %% @todo: reduce one write -cache_acc(Key, Val, S) +cache_acc(Key, Val, State) when is_integer(Val) -> - case cache_get(Key, S) of + case cache_get(Key, State) of undefined -> - {undefined, cache_put(Key, Val, S)}; + {undefined, cache_put(Key, Val, undefined, State)}; X when is_integer(X) -> - {X, cache_put(Key, X + Val, S)}; + {X, cache_put(Key, X + Val, undefined, State)}; X when is_tuple(X) -> - {erlang:element(1, X), cache_put(Key, tuple_acc({1, Val}, X), S)}; + {erlang:element(1, X), cache_put(Key, tuple_acc({1, Val}, X), undefined, State)}; _ -> - {badarg, S} + {badarg, State} end; -cache_acc(Key, Val, S) -> - case cache_get(Key, S) of +cache_acc(Key, Val, State) -> + case cache_get(Key, State) of X when is_tuple(X) -> - {X, cache_put(Key, tuple_acc(Val, X), S)}; + {X, cache_put(Key, tuple_acc(Val, X), undefined, State)}; _ -> - {badarg, S} + {badarg, State} end. tuple_acc({Pos, Val}, X) -> @@ -450,6 +355,15 @@ tuple_acc(List, X) -> List ). + +%% +%% remove key from heap segments +heap_remove(Key, Heap) -> + lists:foreach( + fun({_, Id}) -> ets:delete(Id, Key) end, + Heap + ). + %% %% heap_lookup(Key, [{_, Heap} | Tail]) -> @@ -472,3 +386,10 @@ heap_has(Key, [{_, Heap}=X | Tail]) -> heap_has(_Key, []) -> false. +%% +%% update statistic +stats(_, #cache{stats = undefined}) -> + ok; +stats(X, #cache{stats = Stats, name = Name}) -> + cache_util:stats(Stats, {cache, Name, X}). + diff --git a/test/cache_tests.erl b/test/cache_tests.erl index 27ae618..6ff92f7 100644 --- a/test/cache_tests.erl +++ b/test/cache_tests.erl @@ -44,22 +44,6 @@ cache_interface_test_() -> ] }. - -% lru_test_() -> -% { -% setup, -% fun cache_init/0, -% fun cache_free/1, -% [ -% {"put", fun cache_put/0} -% ,{"has", fun cache_has/0} -% ,{"get", fun cache_get/0} -% ,{"del", fun cache_del/0} -% ,{"acc", fun cache_acc/0} -% ,{"lifecycle 1", {timeout, 10000, fun cache_lc1/0}} -% ] -% }. - %%%---------------------------------------------------------------------------- %%% %%% factory @@ -103,8 +87,8 @@ put(Pid) -> [ ?_assertMatch(ok, cache:put(Pid, <<"key-1">>, <<"val-1">>)) ,?_assertMatch(ok, cache:put(Pid, <<"key-2">>, <<"val-2">>, 5)) - ,?_assertMatch(ok, cache:put_(Pid, <<"key-3">>, <<"val-3">>)) - ,?_assertMatch(ok, cache:put_(Pid, <<"key-4">>, <<"val-4">>, 5)) + ,?_assertMatch(ok, async(cache:put_(Pid, <<"key-3">>, <<"val-3">>))) + ,?_assertMatch(ok, async(cache:put_(Pid, <<"key-4">>, <<"val-4">>, 5))) ]. get(Pid) -> @@ -114,7 +98,7 @@ get(Pid) -> ,?_assertMatch(<<"val-1">>, cache:lookup(Pid, <<"key-1">>)) ,?_assertMatch(true, cache:has(Pid, <<"key-1">>)) - ,?_assertMatch(ok, cache:put_(Pid, <<"key-3">>, <<"val-3">>)) + ,?_assertMatch(ok, async(cache:put_(Pid, <<"key-3">>, <<"val-3">>))) ,?_assertMatch(<<"val-3">>, cache:get(Pid, <<"key-3">>)) ,?_assertMatch(<<"val-3">>, cache:lookup(Pid, <<"key-3">>)) ,?_assertMatch(true, cache:has(Pid, <<"key-3">>)) @@ -124,166 +108,8 @@ get(Pid) -> ,?_assertMatch(false, cache:has(Pid, <<"key-5">>)) ]. - -%% @todo - fix ttl and segment expire time - - - - - -cache_init() -> - cache:start_link(test, ?CACHE). - -cache_free({ok, Pid}) -> - erlang:unlink(Pid), - cache:drop(test). - -cache_put() -> - ok = cache:put(test, <<"key">>, <<"val">>). - -cache_has() -> - true = cache:has(test, <<"key">>), - false = cache:has(test, <<"yek">>). - -cache_get() -> - <<"val">> = cache:get(test, <<"key">>), - undefined = cache:get(test, <<"yek">>). - -cache_del() -> - ok = cache:remove(test, <<"key">>), - ok = cache:remove(test, <<"yek">>). - -cache_acc() -> - undefined = cache:acc(test, <<"acc">>, 10), - 10 = cache:acc(test, <<"acc">>, 10), - 20 = cache:get(test, <<"acc">>), - badarg = cache:acc(test, <<"acc">>, [{1, 10}]), - badarg = cache:acc(test, <<"acc1">>,[{1, 10}]), - ok = cache:put(test, <<"acc1">>, {10,20,30,40}), - {10, 20, 30, 40} = cache:acc(test, <<"acc1">>,[{1, 10}, {2, 10}]), - {20, 30, 30, 40} = cache:get(test, <<"acc1">>). - -cache_lc1() -> - ok = cache:put(test, key, val), - timer:sleep(1200), - val = cache:get(test, key), - timer:sleep(1200), - val = cache:get(test, key), - timer:sleep(1200), - val = cache:get(test, key), - timer:sleep(3200), - undefined = cache:get(test, key). - - -% lifecyle_2_test() -> -% cache:start(), -% {ok, _} = cache:start_link(test, [ -% {ttl, 10}, -% {evict, 100} -% ]), -% ok = cache:put(test, key, val), -% timer:sleep(6), -% {ok, val} = cache:get(test, key), -% timer:sleep(6), -% {ok, val} = cache:get(test, key), -% timer:sleep(20), -% none = cache:get(test, key), -% cache:stop(test). - -% lifecyle_3_test() -> -% cache:start(), -% {ok, _} = cache:start_link(test, [ -% {ttl, 10}, -% {evict, 5} -% ]), -% ok = cache:put(test, key1, val1), -% timer:sleep(5), -% ok = cache:put(test, key2, val2), -% timer:sleep(5), -% ok = cache:put(test, key3, val3), -% timer:sleep(5), -% ok = cache:put(test, key4, val4), - -% none = cache:get(test, key1), -% none = cache:get(test, key2), -% {ok, val3} = cache:get(test, key3), -% {ok, val4} = cache:get(test, key4), -% cache:stop(test). - -% evict_lru_1_test() -> -% cache:start(), -% {ok, _} = cache:start_link(test, [ -% {policy, lru}, -% {ttl, 100}, -% {evict, 5}, -% {size, 10}, -% {chunk, 2} -% ]), -% lists:foreach( -% fun(X) -> cache:put(test, X, X) end, -% lists:seq(1, 10) -% ), -% timer:sleep(10), -% {ok, 1} = cache:get(test, 1), -% cache:put(test, key, val), -% timer:sleep(10), -% none = cache:get(test, 2), -% cache:stop(test). - -% evict_lru_2_test() -> -% cache:start(), -% {ok, _} = cache:start_link(test, [ -% {policy, lru}, -% {ttl, 100}, -% {evict, 100}, -% {size, 10}, -% {chunk, 2} -% ]), -% lists:foreach( -% fun(X) -> cache:put(test, X, X) end, -% lists:seq(1, 10) -% ), -% {ok, 1} = cache:get(test, 1), -% cache:put(test, key, val), -% cache:evict(test), -% none = cache:get(test, 2), -% cache:stop(test). - -% evict_mru_1_test() -> -% cache:start(), -% {ok, _} = cache:start_link(test, [ -% {policy, mru}, -% {ttl, 100}, -% {evict, 5}, -% {size, 10}, -% {chunk, 2} -% ]), -% lists:foreach( -% fun(X) -> cache:put(test, X, X) end, -% lists:seq(1, 10) -% ), -% timer:sleep(10), -% {ok, 1} = cache:get(test, 1), -% cache:put(test, key, val), -% timer:sleep(10), -% none = cache:get(test, key), -% cache:stop(test). - -% evict_mru_2_test() -> -% cache:start(), -% {ok, _} = cache:start_link(test, [ -% {policy, mru}, -% {ttl, 100}, -% {evict, 100}, -% {size, 10}, -% {chunk, 2} -% ]), -% lists:foreach( -% fun(X) -> cache:put(test, X, X) end, -% lists:seq(1, 10) -% ), -% {ok, 1} = cache:get(test, 1), -% cache:put(test, key, val), -% cache:evict(test), -% none = cache:get(test, key), -% cache:stop(test). +async(Ref) -> + receive + {Ref, X} -> + X + end.