new async api, returns reference to operation

This commit is contained in:
Dmitry Kolesnikov 2015-05-10 14:02:58 +03:00
parent 572050affc
commit ff404f45a8
8 changed files with 451 additions and 486 deletions

245
Makefile
View File

@ -1,52 +1,237 @@
.PHONY: deps test
## @author Dmitry Kolesnikov, <dmkolesnikov@gmail.com>
## @copyright (c) 2012 - 2014 Dmitry Kolesnikov. All Rights Reserved
##
## @description
## Makefile to build and release Erlang applications using
## rebar, reltool, etc (see README for details)
##
## application version schema (based on semantic version)
## ${APP}-${VSN}+${GIT}.${ARCH}.${PLAT}
##
## @version 0.8.2
.PHONY: test rel deps all pkg
ifeq ($(id),)
export id=cache
#####################################################################
##
## application config
##
#####################################################################
ROOT = `pwd`
PREFIX ?= /usr/local
APP ?= $(notdir $(CURDIR))
ARCH?= $(shell uname -m)
PLAT?= $(shell uname -s)
HEAD?= $(shell git rev-parse --short HEAD)
TAG = ${HEAD}.${ARCH}.${PLAT}
TEST?= ${APP}
S3 =
GIT ?=
VMI =
NET ?= lo0
USER =
PASS =
## root path to benchmark framework
BB = ../basho_bench
SSHENV = /tmp/ssh-agent.conf
ADDR = $(shell ifconfig ${NET} | sed -En 's/.*inet (addr:)?(([0-9]*\.){3}[0-9]*).*/\2/p')
BRANCH = $(shell git symbolic-ref --short -q HEAD)
## erlang flags (make run only)
EFLAGS = \
-name ${APP}@${ADDR} \
-setcookie nocookie \
-pa ${ROOT}/ebin \
-pa ${ROOT}/deps/*/ebin \
-pa ${ROOT}/apps/*/ebin \
-pa rel/files \
-kernel inet_dist_listen_min 32100 \
-kernel inet_dist_listen_max 32199 \
+P 1000000 \
+K true +A 160 -sbt ts
#####################################################################
##
## internal config
##
#####################################################################
ifeq ($(wildcard rel/reltool.config),)
REL =
VSN =
TAR =
PKG =
else
IID = $(shell cat rel/reltool.config | sed -n 's/{target_dir,.*\"\([^-]*\).*\"}./\1/p')
REL = $(shell cat rel/reltool.config | sed -n 's/{target_dir,.*\"\(.*\)\"}./\1/p')
VSN = $(shell echo ${REL} | sed -n 's/.*-\(.*\)/\1/p')
ifeq (${VSN},)
VSN = $(shell cat rel/reltool.config | sed -n 's/.*{rel,.*\".*\",.*\"\(.*\)\".*/\1/p')
endif
ifeq (${config},)
RFLAGS =
VARIANT =
else
VARIANT = $(addprefix ., $(notdir $(basename ${config})))
RFLAGS = target_dir=${REL}${VARIANT} overlay_vars=${ROOT}/${config}
endif
ifeq (${VSN},)
TAR = ${IID}${VARIANT}+${TAG}.tgz
PKG = ${IID}${VARIANT}+${TAG}.bundle
else
TAR = ${IID}-${VSN}${VARIANT}+${TAG}.tgz
PKG = ${IID}-${VSN}${VARIANT}+${TAG}.bundle
endif
endif
FLAGS=\
-name ${id}@127.0.0.1 \
-setcookie nocookie \
-pa ./deps/*/ebin \
-pa ./examples/*/ebin \
-pa ./ebin \
+K true +A 160 -sbt ts
BB=../basho_bench
## self-extracting bundle wrapper
BUNDLE_INIT = PREFIX=${PREFIX}\nREL=${PREFIX}/${REL}${VARIANT}\nAPP=${APP}\nVSN=${VSN}\nLINE=\`grep -a -n 'BUNDLE:\x24' \x240\`\ntail -n +\x24(( \x24{LINE\x25\x25:*} + 1)) \x240 | gzip -vdc - | tar -C ${PREFIX} -xvf - > /dev/null\n
BUNDLE_FREE = exit\nBUNDLE:\n
BUILDER = cd /tmp && git clone -b ${BRANCH} ${GIT}/${APP} && cd /tmp/${APP} && make && make rel && sleep 300
#####################################################################
##
## build
##
#####################################################################
all: rebar deps compile
compile:
./rebar compile
@./rebar compile
deps:
./rebar get-deps
@./rebar get-deps
clean:
./rebar clean
rm -rf test.*-temp-data
@./rebar clean ; \
rm -rf test.*-temp-data ; \
rm -rf tests ; \
rm -rf log ; \
rm -f *.tgz ; \
rm -f *.bundle
distclean: clean
./rebar delete-deps
test: all
./rebar skip_deps=true eunit
distclean: clean
@./rebar delete-deps
unit: all
@./rebar skip_deps=true eunit
test:
@erl ${EFLAGS} -run deb test test/${TEST}.config
docs:
./rebar skip_deps=true doc
@./rebar skip_deps=true doc
dialyzer: compile
@dialyzer -Wno_return -c ./ebin
#####################################################################
##
## release
##
#####################################################################
ifneq (${REL},)
rel: ${TAR}
## assemble VM release
ifeq (${PLAT},$(shell uname -s))
${TAR}:
@./rebar generate ${RFLAGS}; \
cd rel ; tar -zcf ${TAR} ${REL}${VARIANT}/ ; mv ${TAR} ../${TAR} ; cd - ;\
echo "==> tarball: ${TAR}"
else
ifneq (${VMI},)
${TAR}:
@echo "==> docker run ${VMI}" ;\
K=`test ${PASS} && cat ${PASS}` ;\
A=`test ${USER} && echo "mkdir -p /root/.ssh && echo \"$$K\" > /root/.ssh/id_rsa && chmod 0700 /root/.ssh/id_rsa && echo -e \"Host *\n\tUser ${USER}\n\tStrictHostKeyChecking no\n\" > /root/.ssh/config &&"` ;\
I=`docker run -d -a stdout -a stderr ${VMI} /bin/sh -c "$$A ${BUILDER}"` ;\
(docker attach $$I &) ;\
docker cp $$I:/tmp/${APP}/${TAR} . 1> /dev/null 2>&1 ;\
while [ $$? -ne 0 ] ;\
do \
sleep 10 ;\
docker cp $$I:/tmp/${APP}/${TAR} . 1> /dev/null 2>&1 ;\
done ;\
docker kill $$I ;\
docker rm $$I
endif
endif
## package VM release to executable bundle
pkg: rel/deploy.sh ${TAR}
@printf "${BUNDLE_INIT}" > ${PKG} ; \
cat rel/deploy.sh >> ${PKG} ; \
printf "${BUNDLE_FREE}" >> ${PKG} ; \
cat ${TAR} >> ${PKG} ; \
chmod ugo+x ${PKG} ; \
echo "==> bundle: ${PKG}"
## copy 'package' to s3
## copy 'package' to s3
s3: ${PKG}
aws s3 cp ${PKG} ${S3}/${APP}+${TAG}${VARIANT}.bundle
s3-latest: ${PKG}
aws s3 cp ${PKG} ${S3}/${APP}+latest${VARIANT}.bundle
endif
#####################################################################
##
## deploy
##
#####################################################################
ifneq (${host},)
${SSHENV}:
@echo "==> ssh: config keys" ;\
ssh-agent -s > ${SSHENV}
node: ${SSHENV}
@echo "==> deploy: ${host}" ;\
. ${SSHENV} ;\
k=`basename ${pass}` ;\
l=`ssh-add -l | grep $$k` ;\
if [ -z "$$l" ] ; then \
ssh-add ${pass} ;\
fi ;\
rsync -cav --rsh=ssh --progress ${PKG} ${host}:${PKG} ;\
ssh -t ${host} "sudo sh ./${PKG}"
endif
#####################################################################
##
## debug
##
#####################################################################
run:
erl ${FLAGS}
rebar:
curl -O https://cloud.github.com/downloads/basho/rebar/rebar
chmod ugo+x rebar
@erl ${EFLAGS}
benchmark:
$(BB)/basho_bench -N bb@127.0.0.0 -C nocookie priv/${id}.benchmark
$(BB)/priv/summary.r -i tests/current
@echo "==> benchmark: ${TEST}" ;\
$(BB)/basho_bench -N bb@127.0.0.1 -C nocookie priv/${TEST}.benchmark ;\
$(BB)/priv/summary.r -i tests/current ;\
open tests/current/summary.png
ifneq (${REL},)
start:
@./rel/${REL}${VARIANT}/bin/${APP} start
stop:
@./rel/${REL}${VARIANT}/bin/${APP} stop
console:
@./rel/${REL}${VARIANT}/bin/${APP} console
attach:
@./rel/${REL}${VARIANT}/bin/${APP} attach
endif
#####################################################################
##
## dependencies
##
#####################################################################
rebar:
@curl -L -O https://github.com/rebar/rebar/wiki/rebar ; \
chmod ugo+x rebar

View File

@ -8,7 +8,10 @@ The write operation always uses youngest segment. The read operation lookup key
The downside is inability to assign precise TTL per single cache entry. TTL is always approximated to nearest segment. (e.g. cache with 60 sec TTL and 10 segments has 6 sec accuracy on TTL)
## Change log
* 2.0.0 - various changes on asynchronous api, not compatible with version 1.x
* 1.0.1 - production release
## Usage
@ -16,8 +19,13 @@ The downside is inability to assign precise TTL per single cache entry. TTL is a
application:start(cache).
{ok, _} = cache:start_link(my_cache, [{n, 10}, {ttl, 60}]).
%% synchronous i/o
ok = cache:put(my_cache, <<"my key">>, <<"my value">>).
Val = cache:get(my_cache, <<"my key">>).
%% asynchronous i/o
Ref = cache:get_(my_cache, <<"my key">>).
receive {Ref, Val} -> Val end.
```
### configuration via Erlang `sys.config`

View File

@ -16,7 +16,6 @@
,{get, 5}
]}.
%{cache, {mycache, 'cache@127.0.0.1'}}.
{local, [
{ttl, 20}
,{n, 10}

View File

@ -1,7 +1,7 @@
{application, cache,
[
{description, "in-memory cache"},
{vsn, "1.0.1"},
{vsn, "2.0.0"},
{modules, []},
{registered, []},
{applications,[

View File

@ -31,7 +31,7 @@
-include("cache.hrl").
-export([start/0]).
%% cache management interface
-export([
start_link/1,
start_link/2,
@ -39,20 +39,27 @@
purge/1,
i/1,
i/2,
heap/2,
heap/2
]).
%% basic cache i/o interface
-export([
put/3,
put/4,
put_/3,
put_/4,
get/2,
get_/2,
lookup/2,
lookup_/2,
has/2,
ttl/2,
remove/2,
remove_/2,
remove_/2
]).
%% extended cache i/o interface
-export([
acc/3,
acc_/3,
% memecached like interface
set/3,
set/4,
set_/3,
@ -72,21 +79,26 @@
delete/2,
delete_/2
]).
-export([start/0]).
-export_type([cache/0]).
-type(cache() :: atom() | {atom(), atom()} | {global, atom()} | pid()).
-type(name() :: atom() | {global, atom()}).
-type(cache() :: atom() | pid()).
-type(key() :: any()).
-type(entity() :: any()).
-type(val() :: any()).
-type(ttl() :: integer()).
%%
%% RnD start application
start() ->
application:start(pq),
application:start(cache).
%%%----------------------------------------------------------------------------
%%%
%%% cache management interface
%%%
%%%----------------------------------------------------------------------------
%%
%% start new cache bucket, accepted options:
%% {type, set | ordered_set} - cache segment type (default set)
@ -99,7 +111,7 @@ start() ->
%% {stats, function() | {Mod, Fun}} - cache statistic aggregate functor
%% {heir, atom() | pid()} - heir of evicted cache segments
-spec(start_link/1 :: (list()) -> {ok, pid()} | {error, any()}).
-spec(start_link/2 :: (name(), list()) -> {ok, pid()} | {error, any()}).
-spec(start_link/2 :: (atom(), list()) -> {ok, pid()} | {error, any()}).
start_link(Opts) ->
cache_bucket:start_link(Opts).
@ -111,41 +123,21 @@ start_link(Cache, Opts) ->
%% drop cache
-spec(drop/1 :: (cache()) -> ok).
drop(undefined) ->
ok;
drop({global, Cache}) ->
drop(global:whereis_name(Cache));
drop({Cache, Node}) ->
drop(rpc:call(Node, cache, drop, [Cache]));
drop(Cache)
when is_atom(Cache) ->
drop(whereis(Cache));
drop(Cache)
when is_pid(Cache) ->
drop(Cache) ->
gen_server:call(Cache, drop).
%%
%% purge cache
-spec(purge/1 :: (cache()) -> ok).
purge(undefined) ->
ok;
purge({global, Cache}) ->
purge(global:whereis_name(Cache));
purge({Cache, Node}) ->
purge(rpc:call(Node, cache, purge, [Cache]));
purge(Cache)
when is_atom(Cache) ->
purge(whereis(Cache));
purge(Cache)
when is_pid(Cache) ->
purge(Cache) ->
gen_server:call(Cache, purge).
%%
%% return cache meta data
%% {heap, [integer()]} - cache segments references
%% {expire, [integer()]} - cache segments expire times
%% {heap, [integer()]} - references to cache segments
%% {expire, [integer()]} - cache segments expiration times
%% {size, [integer()]} - cardinality of cache segments
%% {memory, [integer()]} - memory occupied by each cache segment
-spec(i/1 :: (cache()) -> list()).
@ -165,94 +157,114 @@ heap(Cache, N) ->
gen_server:call(Cache, {heap, N}).
%%%----------------------------------------------------------------------------
%%%
%%% basic cache i/o interface
%%%
%%%----------------------------------------------------------------------------
%%
%% synchronous cache put
-spec(put/3 :: (cache(), key(), entity()) -> ok).
-spec(put/4 :: (cache(), key(), entity(), ttl()) -> ok).
-spec(put/3 :: (cache(), key(), val()) -> ok).
-spec(put/4 :: (cache(), key(), val(), ttl()) -> ok).
put(Cache, Key, Val) ->
gen_server:call(Cache, {put, Key, Val}, ?DEF_CACHE_TIMEOUT).
cache:put(Cache, Key, Val, undefined).
put(Cache, Key, Val, TTL) ->
gen_server:call(Cache, {put, Key, Val, TTL}, ?DEF_CACHE_TIMEOUT).
call(Cache, {put, Key, Val, TTL}).
%%
%% asynchronous cache put
-spec(put_/3 :: (cache(), key(), entity()) -> ok).
-spec(put_/4 :: (cache(), key(), entity(), ttl()) -> ok).
-spec(put_/3 :: (cache(), key(), val()) -> reference()).
-spec(put_/4 :: (cache(), key(), val(), ttl()) -> reference()).
put_(Cache, Key, Val) ->
gen_server:cast(Cache, {put, Key, Val}).
cache:put_(Cache, Key, Val, undefined).
put_(Cache, Key, Val, TTL) ->
gen_server:cast(Cache, {put, Key, Val, TTL}).
cast(Cache, {put, Key, Val, TTL}).
%%
%% synchronous get cache entry, the operation prolongs entry ttl
-spec(get/2 :: (cache(), key()) -> entity() | undefined).
%% synchronous cache get, the operation prolongs value ttl
-spec(get/2 :: (cache(), key()) -> val() | undefined).
get(Cache, Key) ->
gen_server:call(Cache, {get, Key}, ?DEF_CACHE_TIMEOUT).
call(Cache, {get, Key}).
%%
%% synchronous lookup cache entry, the operation do not prolong entry ttl
-spec(lookup/2 :: (cache(), key()) -> entity() | undefined).
%% asynchronous cache get, the operation prolongs value ttl
-spec(get_/2 :: (cache(), key()) -> reference()).
get_(Cache, Key) ->
cast(Cache, {get, Key}).
%%
%% synchronous cache lookup, the operation do not prolong entry ttl
-spec(lookup/2 :: (cache(), key()) -> val() | undefined).
lookup(Cache, Key) ->
gen_server:call(Cache, {lookup, Key}, ?DEF_CACHE_TIMEOUT).
call(Cache, {lookup, Key}).
%%
%% asynchronous cache lookup, the operation do not prolong entry ttl
-spec(lookup_/2 :: (cache(), key()) -> reference()).
lookup_(Cache, Key) ->
cast(Cache, {lookup, Key}).
%%
%% check if cache key exists,
-spec(has/2 :: (cache(), key()) -> true | false).
has(Cache, Key) ->
gen_server:call(Cache, {has, Key}, ?DEF_CACHE_TIMEOUT).
call(Cache, {has, Key}).
%%
%% check entity at cache and return estimated ttl
-spec(ttl/2 :: (cache(), key()) -> ttl() | false).
ttl(Cache, Key) ->
gen_server:call(Cache, {ttl, Key}, ?DEF_CACHE_TIMEOUT).
call(Cache, {ttl, Key}).
%%
%% synchronous remove entry from cache
-spec(remove/2 :: (cache(), key()) -> ok).
remove(Cache, Key) ->
gen_server:call(Cache, {remove, Key}, ?DEF_CACHE_TIMEOUT).
call(Cache, {remove, Key}).
%%
%% asynchronous remove entry from cache
-spec(remove_/2 :: (cache(), key()) -> ok).
remove_(Cache, Key) ->
gen_server:cast(Cache, {remove, Key}).
cast(Cache, {remove, Key}).
%%%----------------------------------------------------------------------------
%%%
%%% extended cache i/o interface
%%%
%%%----------------------------------------------------------------------------
%%
%% synchronous in-cache accumulator
-spec(acc/3 :: (cache(), key(), integer() | [{integer(), integer()}]) -> integer() | undefined).
acc(Cache, Key, Val) ->
gen_server:call(Cache, {acc, Key, Val}, ?DEF_CACHE_TIMEOUT).
call(Cache, {acc, Key, Val}).
%%
%% asynchronous in-cache accumulator
-spec(acc_/3 :: (cache(), key(), integer() | {integer(), integer()}) -> ok).
acc_(Cache, Key, Val) ->
gen_server:cast(Cache, {acc, Key, Val}).
%%%----------------------------------------------------------------------------
%%%
%%% memcached-like interface
%%%
%%%----------------------------------------------------------------------------
cast(Cache, {acc, Key, Val}).
%%
%% synchronous store key/val
-spec(set/3 :: (cache(), key(), entity()) -> ok).
-spec(set/4 :: (cache(), key(), entity(), ttl()) -> ok).
-spec(set/3 :: (cache(), key(), val()) -> ok).
-spec(set/4 :: (cache(), key(), val(), ttl()) -> ok).
set(Cache, Key, Val) ->
cache:put(Cache, Key, Val).
@ -262,8 +274,8 @@ set(Cache, Key, Val, TTL) ->
%%
%% asynchronous store key/val
-spec(set_/3 :: (cache(), key(), entity()) -> ok).
-spec(set_/4 :: (cache(), key(), entity(), ttl()) -> ok).
-spec(set_/3 :: (cache(), key(), val()) -> ok).
-spec(set_/4 :: (cache(), key(), val(), ttl()) -> ok).
set_(Cache, Key, Val) ->
cache:put_(Cache, Key, Val).
@ -271,84 +283,79 @@ set_(Cache, Key, Val) ->
set_(Cache, Key, Val, TTL) ->
cache:put_(Cache, Key, Val, TTL).
%%
%% synchronous store key/val only if cache does not already hold data for this key
-spec(add/3 :: (cache(), key(), entity()) -> ok | conflict).
-spec(add/4 :: (cache(), key(), entity(), ttl()) -> ok | conflict).
-spec(add/3 :: (cache(), key(), val()) -> ok | {error, conflict}).
-spec(add/4 :: (cache(), key(), val(), ttl()) -> ok | {error, conflict}).
add(Cache, Key, Val) ->
gen_server:call(Cache, {add, Key, Val}, ?DEF_CACHE_TIMEOUT).
cache:add(Cache, Key, Val, undefined).
add(Cache, Key, Val, TTL) ->
gen_server:call(Cache, {add, Key, Val, TTL}, ?DEF_CACHE_TIMEOUT).
call(Cache, {add, Key, Val, TTL}).
%%
%% asynchronous store key/val only if cache does not already hold data for this key
-spec(add_/3 :: (cache(), key(), entity()) -> ok).
-spec(add_/4 :: (cache(), key(), entity(), ttl()) -> ok).
-spec(add_/3 :: (cache(), key(), val()) -> reference()).
-spec(add_/4 :: (cache(), key(), val(), ttl()) -> reference()).
add_(Cache, Key, Val) ->
gen_server:cast(Cache, {add, Key, Val}).
cache:add_(Cache, Key, Val, undefined).
add_(Cache, Key, Val, TTL) ->
gen_server:cast(Cache, {add, Key, Val, TTL}).
cast(Cache, {add, Key, Val, TTL}).
%%
%% synchronous store key/val only if cache does hold data for this key
-spec(replace/3 :: (cache(), key(), entity()) -> ok | not_found).
-spec(replace/4 :: (cache(), key(), entity(), ttl()) -> ok | not_found).
-spec(replace/3 :: (cache(), key(), val()) -> ok | {error, not_found}).
-spec(replace/4 :: (cache(), key(), val(), ttl()) -> ok | {error, not_found}).
replace(Cache, Key, Val) ->
gen_server:call(Cache, {replace, Key, Val}, ?DEF_CACHE_TIMEOUT).
cache:replace(Cache, Key, Val, undefined).
replace(Cache, Key, Val, TTL) ->
gen_server:call(Cache, {replace, Key, Val, TTL}, ?DEF_CACHE_TIMEOUT).
call(Cache, {replace, Key, Val, TTL}).
%%
%% asynchronous store key/val only if cache does hold data for this key
-spec(replace_/3 :: (cache(), key(), entity()) -> ok).
-spec(replace_/4 :: (cache(), key(), entity(), ttl()) -> ok).
-spec(replace_/3 :: (cache(), key(), val()) -> reference()).
-spec(replace_/4 :: (cache(), key(), val(), ttl()) -> reference()).
replace_(Cache, Key, Val) ->
gen_server:cast(Cache, {replace, Key, Val}).
cache:replace_(Cache, Key, Val).
replace_(Cache, Key, Val, TTL) ->
gen_server:cast(Cache, {replace, Key, Val, TTL}).
cast(Cache, {replace, Key, Val, TTL}).
%%
%% synchronously add data to existing key after existing data, the operation do not prolong entry ttl
-spec(append/3 :: (cache(), key(), entity()) -> ok | not_found).
-spec(append/3 :: (cache(), key(), val()) -> ok | {error, not_found}).
append(Cache, Key, Val) ->
gen_server:call(Cache, {append, Key, Val}, ?DEF_CACHE_TIMEOUT).
call(Cache, {append, Key, Val}).
%%
%% asynchronously add data to existing key after existing data, the operation do not prolong entry ttl
-spec(append_/3 :: (cache(), key(), entity()) -> ok).
-spec(append_/3 :: (cache(), key(), val()) -> reference()).
append_(Cache, Key, Val) ->
gen_server:cast(Cache, {append, Key, Val}).
cast(Cache, {append, Key, Val}).
%%
%% synchronously add data to existing key before existing data
-spec(prepend/3 :: (cache(), key(), entity()) -> ok | not_found).
-spec(prepend/3 :: (cache(), key(), val()) -> ok | {error, not_found}).
prepend(Cache, Key, Val) ->
gen_server:call(Cache, {prepend, Key, Val}, ?DEF_CACHE_TIMEOUT).
call(Cache, {prepend, Key, Val}).
%%
%% asynchronously add data to existing key before existing data
-spec(prepend_/3 :: (cache(), key(), entity()) -> ok).
-spec(prepend_/3 :: (cache(), key(), val()) -> reference()).
prepend_(Cache, Key, Val) ->
gen_server:cast(Cache, {prepend, Key, Val}).
%%
%% synchronous remove entry from cache
-spec(delete/2 :: (cache(), key()) -> ok).
@ -363,3 +370,22 @@ delete(Cache, Key) ->
delete_(Cache, Key) ->
cache:remove_(Cache, Key).
%%%----------------------------------------------------------------------------
%%%
%%% private
%%%
%%%----------------------------------------------------------------------------
%%
%% synchronous call to server
call(Pid, Req) ->
gen_server:call(Pid, Req, ?CONFIG_TIMEOUT).
%%
%% asynchronous call
cast(Pid, Req) ->
Ref = erlang:make_ref(),
erlang:send(Pid, {'$gen_call', {self(), Ref}, Req}, [noconnect]),
Ref.

View File

@ -33,4 +33,4 @@
-define(DEF_CACHE_CHECK, 20000).
%% default cache i/o timeout
-define(DEF_CACHE_TIMEOUT, 60000).
-define(CONFIG_TIMEOUT, 30000).

View File

@ -103,83 +103,64 @@ terminate(_Reason, State) ->
%%%
%%%----------------------------------------------------------------------------
handle_call({put, Key, Val}, _, S) ->
{reply, ok, cache_put(Key, Val, S)};
handle_call({put, Key, Val, TTL}, _, State) ->
{reply, ok, cache_put(Key, Val, TTL, State)};
handle_call({put, Key, Val, TTL}, _, S) ->
{reply, ok, cache_put(Key, Val, cache_util:now() + TTL, S)};
handle_call({get, Key}, _, State) ->
{reply, cache_get(Key, State), State};
handle_call({get, Key}, _, S) ->
{reply, cache_get(Key, S), S};
handle_call({lookup, Key}, _, State) ->
{reply, cache_lookup(Key, State), State};
handle_call({lookup, Key}, _, S) ->
{reply, cache_lookup(Key, S), S};
handle_call({has, Key}, _, State) ->
{reply, cache_has(Key, State), State};
handle_call({has, Key}, _, S) ->
{reply, cache_has(Key, S), S};
handle_call({ttl, Key}, _, State) ->
{reply, cache_ttl(Key, State), State};
handle_call({ttl, Key}, _, S) ->
{reply, cache_ttl(Key, S), S};
handle_call({remove, Key}, _, State) ->
{reply, ok, cache_remove(Key, State)};
handle_call({remove, Key}, _, S) ->
{reply, ok, cache_remove(Key, S)};
handle_call({acc, Key, Val}, _, State0) ->
{Result, State1} = cache_acc(Key, Val, State0),
{reply, Result, State1};
handle_call({acc, Key, Val}, _, S) ->
{Reply, NS} = cache_acc(Key, Val, S),
{reply, Reply, NS};
handle_call({add, Key, Val}, _, S) ->
case cache_has(Key, S) of
handle_call({add, Key, Val, TTL}, _, State) ->
case cache_has(Key, State) of
true ->
{reply, conflict, S};
{reply, {error, conflict}, State};
false ->
{reply, ok, cache_put(Key, Val, S)}
{reply, ok, cache_put(Key, Val, TTL, State)}
end;
handle_call({add, Key, Val, TTL}, _, S) ->
case cache_has(Key, S) of
handle_call({replace, Key, Val, TTL}, _, State) ->
case cache_has(Key, State) of
true ->
{reply, conflict, S};
{reply, ok, cache_put(Key, Val, TTL, State)};
false ->
{reply, ok, cache_put(Key, Val, cache_util:now() + TTL, S)}
{reply, {error, not_found}, State}
end;
handle_call({replace, Key, Val}, _, S) ->
case cache_has(Key, S) of
true ->
{reply, ok, cache_put(Key, Val, S)};
false ->
{reply, not_found, S}
end;
handle_call({replace, Key, Val, TTL}, _, S) ->
case cache_has(Key, S) of
true ->
{reply, ok, cache_put(Key, Val, cache_util:now() + TTL, S)};
false ->
{reply, not_found, S}
end;
handle_call({prepend, Key, Val}, _, S) ->
handle_call({prepend, Key, Val}, _, State) ->
% @todo: reduce one write
case cache_get(Key, S) of
case cache_get(Key, State) of
undefined ->
{reply, ok, cache_put(Key, [Val], S)};
{reply, ok, cache_put(Key, [Val], undefined, State)};
X when is_list(X) ->
{reply, ok, cache_put(Key, [Val|X], S)};
{reply, ok, cache_put(Key, [Val|X], undefined, State)};
X ->
{reply, ok, cache_put(Key, [Val,X], S)}
{reply, ok, cache_put(Key, [Val,X], undefined, State)}
end;
handle_call({append, Key, Val}, _, S) ->
handle_call({append, Key, Val}, _, State) ->
% @todo: reduce one write
case cache_get(Key, S) of
case cache_get(Key, State) of
undefined ->
{reply, ok, cache_put(Key, [Val], S)};
{reply, ok, cache_put(Key, [Val], undefined, State)};
X when is_list(X) ->
{reply, ok, cache_put(Key, X++[Val], S)};
{reply, ok, cache_put(Key, X++[Val], undefined, State)};
X ->
{reply, ok, cache_put(Key, [X, Val], S)}
{reply, ok, cache_put(Key, [X, Val], undefined, State)}
end;
handle_call(i, _, State) ->
@ -209,73 +190,6 @@ handle_call(_, _, S) ->
%%
%%
handle_cast({put, Key, Val}, S) ->
{noreply, cache_put(Key, Val, S)};
handle_cast({put, Key, Val, TTL}, S) ->
{noreply, cache_put(Key, Val, cache_util:now() + TTL, S)};
handle_cast({remove, Key}, S) ->
{noreply, cache_remove(Key, S)};
handle_cast({acc, Key, Val}, S) ->
{_, NS} = cache_acc(Key, Val, S),
{noreply, NS};
handle_cast({add, Key, Val}, S) ->
case cache_has(Key, S) of
true ->
{noreply, S};
false ->
{noreply, cache_put(Key, Val, S)}
end;
handle_cast({add, Key, Val, TTL}, S) ->
case cache_has(Key, S) of
true ->
{noreply, S};
false ->
{noreply, cache_put(Key, Val, cache_util:now() + TTL, S)}
end;
handle_cast({replace, Key, Val}, S) ->
case cache_has(Key, S) of
true ->
{noreply, cache_put(Key, Val, S)};
false ->
{noreply, S}
end;
handle_cast({replace, Key, Val, TTL}, S) ->
case cache_has(Key, S) of
true ->
{noreply, cache_put(Key, Val, cache_util:now() + TTL, S)};
false ->
{noreply, S}
end;
handle_cast({prepend, Key, Val}, S) ->
% @todo: reduce one write
case cache_get(Key, S) of
undefined ->
{noreply, cache_put(Key, [Val], S)};
X when is_list(X) ->
{noreply, cache_put(Key, [Val|X], S)};
X ->
{noreply, cache_put(Key, [Val,X], S)}
end;
handle_cast({append, Key, Val}, S) ->
% @todo: reduce one write
case cache_get(Key, S) of
undefined ->
{noreply, cache_put(Key, [Val], S)};
X when is_list(X) ->
{noreply, cache_put(Key, X++[Val], S)};
X ->
{noreply, cache_put(Key, [X, Val], S)}
end;
handle_cast(_, S) ->
{noreply, S}.
@ -309,7 +223,6 @@ handle_info(_, S) ->
code_change(_Vsn, S, _Extra) ->
{ok, S}.
%%%----------------------------------------------------------------------------
%%%
%%% private
@ -318,88 +231,83 @@ code_change(_Vsn, S, _Extra) ->
%%
%% insert value to cache
cache_put(Key, Val, #cache{heap=Heap}=State) ->
cache_put(Key, Val, undefined, #cache{name=_Name, heap=Heap}=State) ->
{_, Head} = cache_heap:head(Heap),
true = ets:insert(Head, {Key, Val}),
lists:foreach(
fun({_, X}) -> ets:delete(X, Key) end,
cache_heap:tail(Heap)
),
cache_util:stats(State#cache.stats, {cache, State#cache.name, put}),
?DEBUG("cache ~p: put ~p to heap ~p~n", [State#cache.name, Key, Head]),
State.
ok = heap_remove(Key, cache_heap:tail(Heap)),
_ = stats(put, State),
?DEBUG("cache ~p: put ~p to heap ~p~n", [_Name, Key, Head]),
State;
cache_put(Key, Val, Expire, #cache{}=State) ->
Refs = cache_heap:refs(State#cache.heap),
cache_put(Key, Val, TTL, #cache{name=_Name, heap=Heap}=State) ->
Expire = cache_util:now() + TTL,
Refs = cache_heap:refs(Heap),
case lists:splitwith(fun({X, _}) -> X > Expire end, Refs) of
{[], _Tail} ->
cache_put(Key, Val, State);
cache_put(Key, Val, undefined, State);
{Head, Tail} ->
[{_, Heap} | Rest] = lists:reverse(Head),
true = ets:insert(Heap, {Key, Val}),
lists:foreach(
fun({_, X}) -> ets:delete(X, Key) end,
Rest ++ Tail
),
cache_util:stats(State#cache.stats, {cache, State#cache.name, put}),
?DEBUG("cache ~p: put ~p to heap ~p~n", [State#cache.name, Key, Heap]),
[{_, Inst} | Rest] = lists:reverse(Head),
true = ets:insert(Inst, {Key, Val}),
ok = heap_remove(Key, Rest ++ Tail),
_ = stats(put, State),
?DEBUG("cache ~p: put ~p to heap ~p~n", [_Name, Key, Inst]),
State
end.
%%
%% get cache value
cache_get(Key, #cache{policy=mru}=S) ->
cache_get(Key, #cache{policy=mru}=State) ->
% cache MRU should not move key anywhere because
% cache always evicts last generation
% fall-back to cache lookup
cache_lookup(Key, S);
cache_lookup(Key, State);
cache_get(Key, #cache{}=S) ->
{_, Head} = cache_heap:head(S#cache.heap),
case heap_lookup(Key, cache_heap:refs(S#cache.heap)) of
cache_get(Key, #cache{name=_Name, heap=Heap}=State) ->
{_, Head} = cache_heap:head(Heap),
case heap_lookup(Key, cache_heap:refs(Heap)) of
undefined ->
cache_util:stats(S#cache.stats, {cache, S#cache.name, miss}),
stats(miss, State),
undefined;
{Head, Val} ->
?DEBUG("cache ~p: get ~p at cell ~p~n", [S#cache.name, Key, Head]),
cache_util:stats(S#cache.stats, {cache, S#cache.name, hit}),
?DEBUG("cache ~p: get ~p at cell ~p~n", [_Name, Key, Head]),
stats(hit, State),
Val;
{Heap, Val} ->
{Cell, Val} ->
true = ets:insert(Head, {Key, Val}),
_ = ets:delete(Heap, Key),
?DEBUG("cache ~p: get ~p at cell ~p~n", [S#cache.name, Key, Heap]),
cache_util:stats(S#cache.stats, {cache, S#cache.name, hit}),
_ = ets:delete(Cell, Key),
?DEBUG("cache ~p: get ~p at cell ~p~n", [_Name, Key, Cell]),
stats(hit, State),
Val
end.
%%
%% lookup cache value
cache_lookup(Key, #cache{}=S) ->
case heap_lookup(Key, cache_heap:refs(S#cache.heap)) of
cache_lookup(Key, #cache{name=_Name, heap=Heap}=State) ->
case heap_lookup(Key, cache_heap:refs(Heap)) of
undefined ->
cache_util:stats(S#cache.stats, {cache, S#cache.name, miss}),
stats(miss, State),
undefined;
{_Heap, Val} ->
?DEBUG("cache ~p: get ~p at cell ~p~n", [S#cache.name, Key, _Heap]),
cache_util:stats(S#cache.stats, {cache, S#cache.name, hit}),
{_Cell, Val} ->
?DEBUG("cache ~p: get ~p at cell ~p~n", [_Name, Key, _Cell]),
stats(hit, State),
Val
end.
%%
%% check if key exists
cache_has(Key, #cache{}=S) ->
case heap_has(Key, cache_heap:refs(S#cache.heap)) of
cache_has(Key, #cache{name=_Name, heap=Heap}) ->
case heap_has(Key, cache_heap:refs(Heap)) of
false ->
false;
_Heap ->
?DEBUG("cache ~p: has ~p at cell ~p~n", [S#cache.name, Key, _Heap]),
?DEBUG("cache ~p: has ~p at cell ~p~n", [_Name, Key, _Heap]),
true
end.
%%
%% check key ttl
cache_ttl(Key, #cache{}=S) ->
case heap_has(Key, cache_heap:refs(S#cache.heap)) of
cache_ttl(Key,#cache{heap=Heap}) ->
case heap_has(Key, cache_heap:refs(Heap)) of
false ->
undefined;
{Expire, _} ->
@ -408,35 +316,32 @@ cache_ttl(Key, #cache{}=S) ->
%%
%%
cache_remove(Key, #cache{}=S) ->
lists:foreach(
fun({_, X}) -> ets:delete(X, Key) end,
cache_heap:refs(S#cache.heap)
),
cache_util:stats(S#cache.stats, {cache, S#cache.name, remove}),
?DEBUG("cache ~p: remove ~p~n", [S#cache.name, Key]),
S.
cache_remove(Key, #cache{name=_Name, heap=Heap}=State) ->
ok = heap_remove(Key, cache_heap:refs(Heap)),
_ = stats(remove, State),
?DEBUG("cache ~p: remove ~p~n", [_Name, Key]),
State.
%%
%% @todo: reduce one write
cache_acc(Key, Val, S)
cache_acc(Key, Val, State)
when is_integer(Val) ->
case cache_get(Key, S) of
case cache_get(Key, State) of
undefined ->
{undefined, cache_put(Key, Val, S)};
{undefined, cache_put(Key, Val, undefined, State)};
X when is_integer(X) ->
{X, cache_put(Key, X + Val, S)};
{X, cache_put(Key, X + Val, undefined, State)};
X when is_tuple(X) ->
{erlang:element(1, X), cache_put(Key, tuple_acc({1, Val}, X), S)};
{erlang:element(1, X), cache_put(Key, tuple_acc({1, Val}, X), undefined, State)};
_ ->
{badarg, S}
{badarg, State}
end;
cache_acc(Key, Val, S) ->
case cache_get(Key, S) of
cache_acc(Key, Val, State) ->
case cache_get(Key, State) of
X when is_tuple(X) ->
{X, cache_put(Key, tuple_acc(Val, X), S)};
{X, cache_put(Key, tuple_acc(Val, X), undefined, State)};
_ ->
{badarg, S}
{badarg, State}
end.
tuple_acc({Pos, Val}, X) ->
@ -450,6 +355,15 @@ tuple_acc(List, X) ->
List
).
%%
%% remove key from heap segments
heap_remove(Key, Heap) ->
lists:foreach(
fun({_, Id}) -> ets:delete(Id, Key) end,
Heap
).
%%
%%
heap_lookup(Key, [{_, Heap} | Tail]) ->
@ -472,3 +386,10 @@ heap_has(Key, [{_, Heap}=X | Tail]) ->
heap_has(_Key, []) ->
false.
%%
%% update statistic
stats(_, #cache{stats = undefined}) ->
ok;
stats(X, #cache{stats = Stats, name = Name}) ->
cache_util:stats(Stats, {cache, Name, X}).

View File

@ -44,22 +44,6 @@ cache_interface_test_() ->
]
}.
% lru_test_() ->
% {
% setup,
% fun cache_init/0,
% fun cache_free/1,
% [
% {"put", fun cache_put/0}
% ,{"has", fun cache_has/0}
% ,{"get", fun cache_get/0}
% ,{"del", fun cache_del/0}
% ,{"acc", fun cache_acc/0}
% ,{"lifecycle 1", {timeout, 10000, fun cache_lc1/0}}
% ]
% }.
%%%----------------------------------------------------------------------------
%%%
%%% factory
@ -103,8 +87,8 @@ put(Pid) ->
[
?_assertMatch(ok, cache:put(Pid, <<"key-1">>, <<"val-1">>))
,?_assertMatch(ok, cache:put(Pid, <<"key-2">>, <<"val-2">>, 5))
,?_assertMatch(ok, cache:put_(Pid, <<"key-3">>, <<"val-3">>))
,?_assertMatch(ok, cache:put_(Pid, <<"key-4">>, <<"val-4">>, 5))
,?_assertMatch(ok, async(cache:put_(Pid, <<"key-3">>, <<"val-3">>)))
,?_assertMatch(ok, async(cache:put_(Pid, <<"key-4">>, <<"val-4">>, 5)))
].
get(Pid) ->
@ -114,7 +98,7 @@ get(Pid) ->
,?_assertMatch(<<"val-1">>, cache:lookup(Pid, <<"key-1">>))
,?_assertMatch(true, cache:has(Pid, <<"key-1">>))
,?_assertMatch(ok, cache:put_(Pid, <<"key-3">>, <<"val-3">>))
,?_assertMatch(ok, async(cache:put_(Pid, <<"key-3">>, <<"val-3">>)))
,?_assertMatch(<<"val-3">>, cache:get(Pid, <<"key-3">>))
,?_assertMatch(<<"val-3">>, cache:lookup(Pid, <<"key-3">>))
,?_assertMatch(true, cache:has(Pid, <<"key-3">>))
@ -124,166 +108,8 @@ get(Pid) ->
,?_assertMatch(false, cache:has(Pid, <<"key-5">>))
].
%% @todo - fix ttl and segment expire time
cache_init() ->
cache:start_link(test, ?CACHE).
cache_free({ok, Pid}) ->
erlang:unlink(Pid),
cache:drop(test).
cache_put() ->
ok = cache:put(test, <<"key">>, <<"val">>).
cache_has() ->
true = cache:has(test, <<"key">>),
false = cache:has(test, <<"yek">>).
cache_get() ->
<<"val">> = cache:get(test, <<"key">>),
undefined = cache:get(test, <<"yek">>).
cache_del() ->
ok = cache:remove(test, <<"key">>),
ok = cache:remove(test, <<"yek">>).
cache_acc() ->
undefined = cache:acc(test, <<"acc">>, 10),
10 = cache:acc(test, <<"acc">>, 10),
20 = cache:get(test, <<"acc">>),
badarg = cache:acc(test, <<"acc">>, [{1, 10}]),
badarg = cache:acc(test, <<"acc1">>,[{1, 10}]),
ok = cache:put(test, <<"acc1">>, {10,20,30,40}),
{10, 20, 30, 40} = cache:acc(test, <<"acc1">>,[{1, 10}, {2, 10}]),
{20, 30, 30, 40} = cache:get(test, <<"acc1">>).
cache_lc1() ->
ok = cache:put(test, key, val),
timer:sleep(1200),
val = cache:get(test, key),
timer:sleep(1200),
val = cache:get(test, key),
timer:sleep(1200),
val = cache:get(test, key),
timer:sleep(3200),
undefined = cache:get(test, key).
% lifecyle_2_test() ->
% cache:start(),
% {ok, _} = cache:start_link(test, [
% {ttl, 10},
% {evict, 100}
% ]),
% ok = cache:put(test, key, val),
% timer:sleep(6),
% {ok, val} = cache:get(test, key),
% timer:sleep(6),
% {ok, val} = cache:get(test, key),
% timer:sleep(20),
% none = cache:get(test, key),
% cache:stop(test).
% lifecyle_3_test() ->
% cache:start(),
% {ok, _} = cache:start_link(test, [
% {ttl, 10},
% {evict, 5}
% ]),
% ok = cache:put(test, key1, val1),
% timer:sleep(5),
% ok = cache:put(test, key2, val2),
% timer:sleep(5),
% ok = cache:put(test, key3, val3),
% timer:sleep(5),
% ok = cache:put(test, key4, val4),
% none = cache:get(test, key1),
% none = cache:get(test, key2),
% {ok, val3} = cache:get(test, key3),
% {ok, val4} = cache:get(test, key4),
% cache:stop(test).
% evict_lru_1_test() ->
% cache:start(),
% {ok, _} = cache:start_link(test, [
% {policy, lru},
% {ttl, 100},
% {evict, 5},
% {size, 10},
% {chunk, 2}
% ]),
% lists:foreach(
% fun(X) -> cache:put(test, X, X) end,
% lists:seq(1, 10)
% ),
% timer:sleep(10),
% {ok, 1} = cache:get(test, 1),
% cache:put(test, key, val),
% timer:sleep(10),
% none = cache:get(test, 2),
% cache:stop(test).
% evict_lru_2_test() ->
% cache:start(),
% {ok, _} = cache:start_link(test, [
% {policy, lru},
% {ttl, 100},
% {evict, 100},
% {size, 10},
% {chunk, 2}
% ]),
% lists:foreach(
% fun(X) -> cache:put(test, X, X) end,
% lists:seq(1, 10)
% ),
% {ok, 1} = cache:get(test, 1),
% cache:put(test, key, val),
% cache:evict(test),
% none = cache:get(test, 2),
% cache:stop(test).
% evict_mru_1_test() ->
% cache:start(),
% {ok, _} = cache:start_link(test, [
% {policy, mru},
% {ttl, 100},
% {evict, 5},
% {size, 10},
% {chunk, 2}
% ]),
% lists:foreach(
% fun(X) -> cache:put(test, X, X) end,
% lists:seq(1, 10)
% ),
% timer:sleep(10),
% {ok, 1} = cache:get(test, 1),
% cache:put(test, key, val),
% timer:sleep(10),
% none = cache:get(test, key),
% cache:stop(test).
% evict_mru_2_test() ->
% cache:start(),
% {ok, _} = cache:start_link(test, [
% {policy, mru},
% {ttl, 100},
% {evict, 100},
% {size, 10},
% {chunk, 2}
% ]),
% lists:foreach(
% fun(X) -> cache:put(test, X, X) end,
% lists:seq(1, 10)
% ),
% {ok, 1} = cache:get(test, 1),
% cache:put(test, key, val),
% cache:evict(test),
% none = cache:get(test, key),
% cache:stop(test).
async(Ref) ->
receive
{Ref, X} ->
X
end.