segement heir + cache inc/dec

This commit is contained in:
Dmitry Kolesnikov 2014-01-21 16:52:52 +02:00
parent b3382c75db
commit 3c13aaaad6
3 changed files with 150 additions and 7 deletions

View File

@ -1,7 +1,7 @@
{application, cache,
[
{description, "in-memory cache"},
{vsn, "0.9.1"},
{vsn, "0.10.0"},
{modules, [
cache,
cache_bucket,

View File

@ -34,6 +34,7 @@
-export([
start_link/2,
drop/1,
purge/1,
i/1,
i/2,
heap/2,
@ -47,6 +48,10 @@
ttl/2,
remove/2,
remove_/2,
inc/3,
inc_/3,
dec/3,
dec_/3,
% memecached like interface
set/3,
set/4,
@ -85,6 +90,7 @@
%% {ttl, integer()} - default time-to-live in seconds
%% {quota, integer()} - frequency of quota check in seconds
%% {stats, function() | {Mod, Fun}} - cache statistic aggregate functor
%% {heir, atom() | pid()} - heir of evicted cache segments
-spec(start_link/2 :: (name(), list()) -> {ok, pid()} | {error, any()}).
start_link(Cache, Opts) ->
@ -105,7 +111,25 @@ drop(Cache)
drop(whereis(Cache));
drop(Cache)
when is_pid(Cache) ->
erlang:exit(Cache, shutdown).
gen_server:call(Cache, drop).
%%
%% purge cache
-spec(purge/1 :: (cache()) -> ok).
purge(undefined) ->
ok;
purge({global, Cache}) ->
purge(global:whereis_name(Cache));
purge({Cache, Node}) ->
purge(rpc:call(Node, cache, purge, [Cache]));
purge(Cache)
when is_atom(Cache) ->
purge(whereis(Cache));
purge(Cache)
when is_pid(Cache) ->
gen_server:call(Cache, purge).
%%
%% return cache meta data
@ -194,6 +218,28 @@ remove(Cache, Key) ->
remove_(Cache, Key) ->
gen_server:cast(Cache, {remove, Key}).
%%
%% synchronous cache inc/dec
-spec(inc/3 :: (cache(), key(), integer() | {integer(), integer()}) -> integer() | undefined).
-spec(dec/3 :: (cache(), key(), integer() | {integer(), integer()}) -> integer() | undefined).
inc(Cache, Key, Val) ->
gen_server:call(Cache, {inc, Key, Val}, ?DEF_CACHE_TIMEOUT).
dec(Cache, Key, Val) ->
gen_server:call(Cache, {dec, Key, Val}, ?DEF_CACHE_TIMEOUT).
%%
%% asynchronous cache inc/dec
-spec(inc_/3 :: (cache(), key(), integer() | {integer(), integer()}) -> ok).
-spec(dec_/3 :: (cache(), key(), integer() | {integer(), integer()}) -> ok).
inc_(Cache, Key, Val) ->
gen_server:cast(Cache, {inc, Key, Val}).
dec_(Cache, Key, Val) ->
gen_server:cast(Cache, {dec, Key, Val}).
%%%----------------------------------------------------------------------------
%%%

View File

@ -44,9 +44,10 @@
memory = undefined :: integer(), %% cache memory limit
quota = ?DEF_CACHE_QUOTA :: integer(), %% quota enforcement timer
evict = undefined, %% evict timer
evict = undefined %% evict timer
stats = undefined :: any() %% stats aggregation functor
,stats = undefined :: any() %% stats aggregation functor
,heir = undefined :: pid() %% the heir of evicted cache segment
}).
%% cache segment
@ -93,6 +94,9 @@ init([{quota, X} | Opts], S) ->
init([{stats, X} | Opts], S) ->
init(Opts, S#cache{stats=X});
init([{heir, X} | Opts], S) ->
init(Opts, S#cache{heir=X});
init([_ | Opts], S) ->
init(Opts, S);
@ -108,8 +112,13 @@ init([], S) ->
%%
%%
terminate(_Reason, _S) ->
ok.
terminate(_Reason, S) ->
lists:foreach(
fun(X) ->
destroy_heap(X#heap.id, S#cache.heir)
end,
S#cache.heap
).
%%%----------------------------------------------------------------------------
%%%
@ -138,6 +147,14 @@ handle_call({ttl, Key}, _, S) ->
handle_call({remove, Key}, _, S) ->
{reply, ok, cache_remove(Key, S)};
handle_call({inc, Key, Val}, _, S) ->
{Reply, NS} = cache_inc(Key, Val, S),
{reply, Reply, NS};
handle_call({dec, Key, Val}, _, S) ->
{Reply, NS} = cache_dec(Key, Val, S),
{reply, Reply, NS};
handle_call({add, Key, Val}, _, S) ->
case cache_has(Key, S) of
true ->
@ -207,6 +224,18 @@ handle_call({heap, N}, _, S) ->
{reply, badarg, S}
end;
handle_call(drop, _, S) ->
{stop, normal, ok, S};
handle_call(purge, _, S) ->
lists:foreach(
fun(X) ->
destroy_heap(X#heap.id, S#cache.heir)
end,
S#cache.heap
),
{reply, ok, init_heap(S#cache{heap = []})};
handle_call(_, _, S) ->
{noreply, S}.
@ -221,6 +250,15 @@ handle_cast({put, Key, Val, TTL}, S) ->
handle_cast({remove, Key}, S) ->
{noreply, cache_remove(Key, S)};
handle_cast({inc, Key, Val}, S) ->
{_, NS} = cache_inc(Key, Val, S),
{noreply, NS};
handle_cast({dec, Key, Val}, S) ->
{_, NS} = cache_dec(Key, Val, S),
{noreply, NS};
handle_cast({add, Key, Val}, S) ->
case cache_has(Key, S) of
true ->
@ -437,6 +475,50 @@ cache_remove(Key, #cache{}=S) ->
?DEBUG("cache ~p: remove ~p~n", [S#cache.name, Key]),
S.
%%
%% @todo: reduce one write
cache_inc(Key, {Pos, Val}, S) ->
case cache_get(Key, S) of
X when is_tuple(X) ->
Old = erlang:element(Pos, X),
{Old, cache_put(Key, erlang:setelement(Pos, X, Old + Val), S)};
_ ->
{undefined, S}
end;
cache_inc(Key, Val, S) ->
case cache_get(Key, S) of
undefined ->
{undefined, cache_put(Key, Val, S)};
X when is_integer(X) ->
{X, cache_put(Key, X + Val, S)};
_ ->
{undefined, S}
end.
%%
%%
cache_dec(Key, {Pos, Val}, S) ->
case cache_get(Key, S) of
X when is_tuple(X) ->
Old = erlang:element(Pos, X),
{Old, cache_put(Key, erlang:setelement(Pos, X, Old - Val), S)};
_ ->
{undefined, S}
end;
cache_dec(Key, Val, S) ->
case cache_get(Key, S) of
undefined ->
{undefined, cache_put(Key, - Val, S)};
X when is_integer(X) ->
{X, cache_put(Key, X - Val, S)};
_ ->
{undefined, S}
end.
%%
%%
heap_lookup(Key, [H | Tail]) ->
@ -480,8 +562,8 @@ init_heap(#cache{}=S) ->
free_heap(#cache{}=S) ->
[H | Tail] = lists:reverse(S#cache.heap),
Size = ets:info(H#heap.id, size),
ets:delete(H#heap.id),
cache_util:stats(S#cache.stats, {cache, S#cache.name, evicted}, Size),
destroy_heap(H#heap.id, S#cache.heir),
?DEBUG("cache ~p: free heap ~p~n", [S#cache.name, H#heap.id]),
init_heap(
S#cache{
@ -489,6 +571,21 @@ free_heap(#cache{}=S) ->
}
).
destroy_heap(Id, undefined) ->
ets:delete(Id);
destroy_heap(Id, Heir)
when is_pid(Heir) ->
ets:give_away(Id, Heir, evicted);
destroy_heap(Id, Heir)
when is_atom(Heir) ->
case erlang:whereis(Heir) of
undefined ->
ets:delete(Id);
Pid ->
ets:give_away(Id, Pid, evicted)
end.
%%
%% heap policy check
is_heap_out_of_quota(#heap{}=H) ->