diff --git a/Makefile b/Makefile
index 2fa756d..ecf4e09 100644
--- a/Makefile
+++ b/Makefile
@@ -14,3 +14,35 @@ distclean: clean
doc:
@./rebar doc skip_deps=true
+
+COMBO_PLT = $(HOME)/.rhc_dialyzer_plt
+APPS = kernel stdlib sasl erts eunit
+INCLUDES = -I include -I deps
+
+check_plt: all
+ dialyzer --check_plt --plt $(COMBO_PLT) --apps $(APPS) deps/*/ebin
+
+build_plt: all
+ dialyzer --build_plt --output_plt $(COMBO_PLT) --apps $(APPS) deps/*/ebin
+
+dialyzer: all
+ @echo
+ @echo Use "'make check_plt'" to check PLT prior to using this target.
+ @echo Use "'make build_plt'" to build PLT prior to using this target.
+ @echo
+ @sleep 1
+ dialyzer --verbose -Wno_return --plt $(COMBO_PLT) $(INCLUDES) ebin
+
+typer:
+ typer --plt $(COMBO_PLT) $(INCLUDES) -r src
+
+plt_info:
+ dialyzer --plt $(COMBO_PLT) --plt_info
+
+clean_plt:
+ @echo
+ @echo "Are you sure? It takes time to re-build."
+ @echo Deleting $(COMBO_PLT) in 5 seconds.
+ @echo
+ @sleep 5
+ rm $(COMBO_PLT)
\ No newline at end of file
diff --git a/include/rhc.hrl b/include/rhc.hrl
index 36ee005..6b80ef6 100644
--- a/include/rhc.hrl
+++ b/include/rhc.hrl
@@ -20,9 +20,10 @@
%%
%% -------------------------------------------------------------------
--define(DEFAULT_TIMEOUT, 60000).
+-define(DEFAULT_HTTP_TIMEOUT, 60000).
-record(rhc, {ip,
port,
prefix,
options}).
+
diff --git a/rebar.config b/rebar.config
index 0eef421..c362be9 100644
--- a/rebar.config
+++ b/rebar.config
@@ -1,5 +1,5 @@
-{erl_opts, []}.
-
+{erl_opts, [debug_info,
+ warnings_as_errors]}.
{deps,
[
%% ibrowse for doing HTTP requests
diff --git a/src/rhc.erl b/src/rhc.erl
index 9c6a46d..021f6f3 100644
--- a/src/rhc.erl
+++ b/src/rhc.erl
@@ -67,12 +67,14 @@
-include("raw_http.hrl").
-include("rhc.hrl").
+-include_lib("riakc/include/riakc.hrl").
-export_type([rhc/0]).
-opaque rhc() :: #rhc{}.
%% @doc Create a client for connecting to the default port on localhost.
%% @equiv create("127.0.0.1", 8098, "riak", [])
+-spec create() -> rhc().
create() ->
create("127.0.0.1", 8098, "riak", []).
@@ -84,7 +86,7 @@ create() ->
%% Defaults for r, w, dw, rw, and return_body may be passed in
%% the Options list. The client id can also be specified by
%% adding `{client_id, ID}' to the Options list.
-%% @spec create(string(), integer(), string(), Options::list()) -> rhc()
+-spec create(string(), integer(), string(), list()) -> rhc().
create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port),
is_list(Prefix), is_list(Opts0) ->
Opts = case proplists:lookup(client_id, Opts0) of
@@ -98,19 +100,21 @@ create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port),
#rhc{ip=IP, port=Port, prefix=Prefix, options=Opts}.
%% @doc Get the IP this client will connect to.
-%% @spec ip(rhc()) -> string()
+-spec ip(rhc()) -> string().
+
ip(#rhc{ip=IP}) -> IP.
%% @doc Get the Port this client will connect to.
-%% @spec port(rhc()) -> integer()
+-spec port(rhc()) -> integer().
port(#rhc{port=Port}) -> Port.
%% @doc Get the prefix this client will use for object URLs
-%% @spec prefix(rhc()) -> string()
+-spec prefix(rhc()) -> string().
+
prefix(#rhc{prefix=Prefix}) -> Prefix.
%% @doc Ping the server by requesting the "/ping" resource.
-%% @spec ping(rhc()) -> ok|{error, term()}
+-spec ping(rhc()) -> ok | {error, term()}.
ping(Rhc) ->
Url = ping_url(Rhc),
case request(get, Url, ["200","204"], [], [], Rhc) of
@@ -121,13 +125,14 @@ ping(Rhc) ->
end.
%% @doc Get the client ID that this client will use when storing objects.
-%% @spec get_client_id(rhc()) -> {ok, string()}
+
+-spec get_client_id(rhc()) -> {ok, string()}.
get_client_id(Rhc) ->
{ok, client_id(Rhc, [])}.
%% @doc Get some basic information about the server. The proplist returned
%% should include `node' and `server_version' entries.
-%% @spec get_server_info(rhc()) -> {ok, proplist()}|{error, term()}
+-spec get_server_info(rhc()) -> {ok, list()}|{error, term()}.
get_server_info(Rhc) ->
Url = stats_url(Rhc),
case request(get, Url, ["200"], [], [], Rhc) of
@@ -139,7 +144,7 @@ get_server_info(Rhc) ->
end.
%% @doc Get the list of full stats from a /stats call to the server.
-%% @spec get_server_stats(rhc()) -> {ok, proplist()}|{error, term()}
+-spec get_server_stats(rhc()) -> {ok, list()}|{error, term()}.
get_server_stats(Rhc) ->
Url = stats_url(Rhc),
case request(get, Url, ["200"], [], [], Rhc) of
@@ -151,7 +156,9 @@ get_server_stats(Rhc) ->
{error, Error}
end.
-%% @equiv get(Rhc, Bucket, Key, [])
+
+-spec get(rhc(), term(), term()) -> {ok, term()}|{error, term()}.
+
get(Rhc, Bucket, Key) ->
get(Rhc, Bucket, Key, []).
@@ -167,8 +174,7 @@ get(Rhc, Bucket, Key) ->
%%
%% The term in the second position of the error tuple will be
%% `notfound' if the key was not found.
-%% @spec get(rhc(), bucket(), key(), proplist())
-%% -> {ok, riakc_obj()}|{error, term()}
+-spec get(rhc(), {binary(), bucket()}, key(), list()) -> {ok, riakc_obj()}|{error, term()}.
get(Rhc, Bucket, Key, Options) ->
Qs = get_q_params(Rhc, Options),
Url = make_url(Rhc, Bucket, Key, Qs),
@@ -191,7 +197,7 @@ get(Rhc, Bucket, Key, Options) ->
{error, Error}
end.
-%% @equiv put(Rhc, Object, [])
+-spec put(rhc(),riakc_obj()) -> ok | {error, term()} | {ok, riakc_obj()}.
put(Rhc, Object) ->
put(Rhc, Object, []).
@@ -210,8 +216,8 @@ put(Rhc, Object) ->
%% response. `ok' is returned if return_body is false.
%% `{ok, Object}' is returned if return_body is true.
%%
-%% @spec put(rhc(), riakc_obj(), proplist())
-%% -> ok|{ok, riakc_obj()}|{error, term()}
+
+-spec put(rhc(),riakc_obj(),list()) -> ok | {error, term()} | {ok, riakc_obj()}.
put(Rhc, Object, Options) ->
Qs = put_q_params(Rhc, Options),
Bucket = riakc_obj:bucket(Object),
@@ -237,7 +243,6 @@ put(Rhc, Object, Options) ->
%% @doc Increment the counter stored under `bucket', `key'
%% by the given `amount'.
-%% @equiv counter_incr(Rhc, Bucket, Key, Amt, [])
-spec counter_incr(rhc(), binary(), binary(), integer()) -> ok | {ok, integer()}
| {error, term()}.
counter_incr(Rhc, Bucket, Key, Amt) ->
@@ -313,7 +318,7 @@ counter_val(Rhc, Bucket, Key, Options) ->
{error, Error}
end.
-%% @equiv delete(Rhc, Bucket, Key, [])
+-spec delete(rhc(),{binary(), bucket()}, binary()) -> ok | {error, term()}.
delete(Rhc, Bucket, Key) ->
delete(Rhc, Bucket, Key, []).
@@ -326,7 +331,7 @@ delete(Rhc, Bucket, Key) ->
%%
`timeout'
%% The server-side timeout for the write in ms
%%
-%% @spec delete(rhc(), bucket(), key(), proplist()) -> ok|{error, term()}
+-spec delete(rhc(), {binary(), bucket()}, key(), list()) -> ok | {error, term()}.
delete(Rhc, Bucket, Key, Options) ->
Qs = delete_q_params(Rhc, Options),
Url = make_url(Rhc, Bucket, Key, Qs),
@@ -342,39 +347,45 @@ delete(Rhc, Bucket, Key, Options) ->
end.
-%% @equiv delete_obj(Rhc, Obj, [])
+-spec delete_obj(rhc(), riakc_obj()) -> ok | {error, term()}.
delete_obj(Rhc, Obj) ->
delete_obj(Rhc, Obj, []).
%% @doc Delete the key of the given object, using the contained vector
%% clock if present.
-%% @equiv delete(Rhc, riakc_obj:bucket(Obj), riakc_obj:key(Obj), [{vclock, riakc_obj:vclock(Obj)}|Options])
+-spec delete_obj(rhc(), riakc_obj(), list()) -> ok | {error, term()}.
delete_obj(Rhc, Obj, Options) ->
Bucket = riakc_obj:bucket(Obj),
Key = riakc_obj:key(Obj),
VClock = riakc_obj:vclock(Obj),
delete(Rhc, Bucket, Key, [{vclock, VClock}|Options]).
+-spec list_buckets(_) -> none().
list_buckets(Rhc) ->
list_buckets(Rhc, undefined).
+-spec list_buckets(rhc(),integer()) -> {error, term()} | {ok,[binary()]}.
list_buckets(Rhc, BucketType) when is_binary(BucketType) ->
list_buckets(Rhc, BucketType, undefined);
list_buckets(Rhc, Timeout) ->
list_buckets(Rhc, undefined, Timeout).
+-spec list_buckets(rhc(), undefined | binary(),integer()) -> {error, term()} | {ok,[binary()]}.
list_buckets(Rhc, BucketType, Timeout) ->
{ok, ReqId} = stream_list_buckets(Rhc, BucketType, Timeout),
rhc_listkeys:wait_for_list(ReqId, Timeout).
+-spec stream_list_buckets(rhc()) -> {error, term()} | {ok,reference()}.
stream_list_buckets(Rhc) ->
stream_list_buckets(Rhc, undefined).
+-spec stream_list_buckets(rhc(),_) -> {error, term()} | {ok,reference()}.
stream_list_buckets(Rhc, BucketType) when is_binary(BucketType) ->
stream_list_buckets(Rhc, BucketType, undefined);
stream_list_buckets(Rhc, Timeout) ->
stream_list_buckets(Rhc, undefined, Timeout).
+-spec stream_list_buckets(rhc(), term(), integer()) -> {error, term()} | {ok, reference()}.
stream_list_buckets(Rhc, BucketType, Timeout) ->
ParamList0 = [{?Q_BUCKETS, ?Q_STREAM},
{?Q_PROPS, ?Q_FALSE}],
@@ -393,16 +404,17 @@ stream_list_buckets(Rhc, BucketType, Timeout) ->
{error, Error} -> {error, Error}
end.
+-spec list_keys(rhc(),_) -> {error,_} | {ok,[binary()]}.
list_keys(Rhc, Bucket) ->
list_keys(Rhc, Bucket, undefined).
%% @doc List the keys in the given bucket.
-%% @spec list_keys(rhc(), bucket(), integer()) -> {ok, [key()]}|{error, term()}
-
+-spec list_keys(rhc(),_,_) -> {error,_} | {ok,[binary()]}.
list_keys(Rhc, Bucket, Timeout) ->
{ok, ReqId} = stream_list_keys(Rhc, Bucket, Timeout),
- rhc_listkeys:wait_for_list(ReqId, ?DEFAULT_TIMEOUT).
+ rhc_listkeys:wait_for_list(ReqId, ?DEFAULT_HTTP_TIMEOUT).
+-spec stream_list_keys(rhc(),_) -> {error,_} | {ok,reference()}.
stream_list_keys(Rhc, Bucket) ->
stream_list_keys(Rhc, Bucket, undefined).
@@ -417,8 +429,7 @@ stream_list_keys(Rhc, Bucket) ->
%% `{error, term()}'
%% an error occurred
%%
-%% @spec stream_list_keys(rhc(), bucket(), integer()) ->
-%% {ok, reference()}|{error, term()}
+-spec stream_list_keys(rhc(),_,_) -> {error, term()} | {ok,reference()}.
stream_list_keys(Rhc, Bucket, Timeout) ->
ParamList0 = [{?Q_KEYS, ?Q_STREAM},
{?Q_PROPS, ?Q_FALSE}],
@@ -438,27 +449,23 @@ stream_list_keys(Rhc, Bucket, Timeout) ->
end.
%% @doc Query a secondary index.
-%% @spec get_index(rhc(), bucket(), index(), index_query()) ->
-%% {ok, index_results()} | {error, term()}
+-spec get_index(rhc(), bucket(), term(), term()) -> {error, term()} | {ok, index_results()}.
get_index(Rhc, Bucket, Index, Query) ->
get_index(Rhc, Bucket, Index, Query, []).
%% @doc Query a secondary index.
-%% @spec get_index(rhc(), bucket(), index(), index_query(), index_options()) ->
-%% {ok, index_results()} | {error, term()}
+-spec get_index(rhc(), bucket(), term(), term(), list()) -> {ok,index_results()} | {error, term()}.
get_index(Rhc, Bucket, Index, Query, Options) ->
{ok, ReqId} = stream_index(Rhc, Bucket, Index, Query, Options),
rhc_index:wait_for_index(ReqId).
%% @doc Query a secondary index, streaming the results back.
-%% @spec stream_index(rhc(), bucket(), index(), index_query()) ->
-%% {ok, reference()} | {error, term()}
+-spec stream_index(rhc(), bucket(), term(), term()) -> {ok,reference()} | {error,term()}.
stream_index(Rhc, Bucket, Index, Query) ->
stream_index(Rhc, Bucket, Index, Query, []).
%% @doc Query a secondary index, streaming the results back.
-%% @spec stream_index(rhc(), bucket(), index(), index_query(), index_options()) ->
-%% {ok, reference()} | {error, term()}
+-spec stream_index(rhc(),_,_,_,[{atom(),_}]) -> {error,_} | {ok,reference()}.
stream_index(Rhc, Bucket, Index, Query, Options) ->
ParamList = rhc_index:query_options([{stream, true}|Options]),
Url = index_url(Rhc, Bucket, Index, Query, ParamList),
@@ -473,7 +480,7 @@ stream_index(Rhc, Bucket, Index, Query, Options) ->
end.
%% @doc Get the properties of the given bucket.
-%% @spec get_bucket(rhc(), bucket()) -> {ok, proplist()}|{error, term()}
+-spec get_bucket(rhc(), {binary(), bucket()}) -> {ok, [proplists:property()]} | {error, term()}.
get_bucket(Rhc, Bucket) ->
Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE},
{?Q_KEYS, ?Q_FALSE}]),
@@ -496,7 +503,7 @@ get_bucket(Rhc, Bucket) ->
%% Whether or not this bucket should allow siblings to
%% be created for its keys
%%
-%% @spec set_bucket(rhc(), bucket(), proplist()) -> ok|{error, term()}
+-spec set_bucket(rhc(),_,list()) -> ok | {error,_}.
set_bucket(Rhc, Bucket, Props0) ->
Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}]),
Headers = [{"Content-Type", "application/json"}],
@@ -507,6 +514,7 @@ set_bucket(Rhc, Bucket, Props0) ->
{error, Error} -> {error, Error}
end.
+-spec reset_bucket(rhc(),_) -> ok | {error,_}.
reset_bucket(Rhc, Bucket) ->
Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}]),
case request(delete, Url, ["204"], [], [], Rhc) of
@@ -516,7 +524,7 @@ reset_bucket(Rhc, Bucket) ->
%% @doc Get the properties of the given bucket.
-%% @spec get_bucket_type (rhc(), bucket()) -> {ok, proplist()}|{error, term()}
+-spec get_bucket_type(rhc(),_) -> {error,_} | {ok,[{atom(),_}]}.
get_bucket_type(Rhc, Type) ->
Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE},
{?Q_KEYS, ?Q_FALSE}]),
@@ -530,8 +538,7 @@ get_bucket_type(Rhc, Type) ->
end.
%% @doc Set the properties of the given bucket type.
-%%
-%% @spec set_bucket_type(rhc(), bucket(), proplist()) -> ok|{error, term()}
+-spec set_bucket_type(rhc(),_,list()) -> ok | {error,_}.
set_bucket_type(Rhc, Type, Props0) ->
Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}]),
Headers = [{"Content-Type", "application/json"}],
@@ -542,6 +549,7 @@ set_bucket_type(Rhc, Type, Props0) ->
{error, Error} -> {error, Error}
end.
+-spec reset_bucket_type(rhc(),_) -> ok | {error,_}.
reset_bucket_type(Rhc, Type) ->
Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}]),
case request(delete, Url, ["204"], [], [], Rhc) of
@@ -549,23 +557,23 @@ reset_bucket_type(Rhc, Type) ->
{error, Error} -> {error, Error}
end.
-%% @equiv mapred(Rhc, Inputs, Query, DEFAULT_TIMEOUT)
+%% @equiv mapred(Rhc, Inputs, Query, DEFAULT_HTTP_TIMEOUT)
+-spec mapred(rhc(), binary(), term()) -> {ok, [{term(), term()}]} | {error, term()}.
mapred(Rhc, Inputs, Query) ->
- mapred(Rhc, Inputs, Query, ?DEFAULT_TIMEOUT).
+ mapred(Rhc, Inputs, Query, ?DEFAULT_HTTP_TIMEOUT).
%% @doc Execute a map/reduce query. See {@link
%% rhc_mapred:encode_mapred/2} for details of the allowed formats
%% for `Inputs' and `Query'.
-%% @spec mapred(rhc(), rhc_mapred:map_input(),
-%% [rhc_mapred:query_part()], integer())
-%% -> {ok, [rhc_mapred:phase_result()]}|{error, term()}
+-spec mapred(rhc(), rhc_mapred:map_input(), [rhc_mapred:query_part()], integer()) -> {ok, [rhc_mapred:phase_result()]}|{error, term()}.
mapred(Rhc, Inputs, Query, Timeout) ->
{ok, ReqId} = mapred_stream(Rhc, Inputs, Query, self(), Timeout),
rhc_mapred:wait_for_mapred(ReqId, Timeout).
-%% @equiv mapred_stream(Rhc, Inputs, Query, ClientPid, DEFAULT_TIMEOUT)
+%% @equiv mapred_stream(Rhc, Inputs, Query, ClientPid, DEFAULT_HTTP_TIMEOUT)
+-spec mapred_stream(rhc(), binary(), term(), term()) -> {ok,reference()} | {error, term()}.
mapred_stream(Rhc, Inputs, Query, ClientPid) ->
- mapred_stream(Rhc, Inputs, Query, ClientPid, ?DEFAULT_TIMEOUT).
+ mapred_stream(Rhc, Inputs, Query, ClientPid, ?DEFAULT_HTTP_TIMEOUT).
%% @doc Stream map/reduce results to a Pid. Messages sent to the Pid
%% will be of the form `{reference(), message()}',
@@ -580,9 +588,9 @@ mapred_stream(Rhc, Inputs, Query, ClientPid) ->
%% `{error, term()}'
%% an error occurred
%%
-%% @spec mapred_stream(rhc(), rhc_mapred:mapred_input(),
-%% [rhc_mapred:query_phase()], pid(), integer())
-%% -> {ok, reference()}|{error, term()}
+-spec mapred_stream(rhc(), rhc_mapred:mapred_input(),
+ [rhc_mapred:query_phase()], pid(), integer()) ->
+ {ok, reference()}|{error, term()}.
mapred_stream(Rhc, Inputs, Query, ClientPid, Timeout) ->
Url = mapred_url(Rhc),
StartRef = make_ref(),
@@ -598,13 +606,12 @@ mapred_stream(Rhc, Inputs, Query, ClientPid, Timeout) ->
%% @doc Execute a search query. This command will return an error
%% unless executed against a Riak Search cluster.
-%% @spec search(rhc(), bucket(), string()) ->
-%% {ok, [rhc_mapred:phase_result()]}|{error, term()}
+-spec search(rhc(), bucket(), string()) -> {ok,[rhc_mapred:phase_result()]} | {error,term}.
search(Rhc, Bucket, SearchQuery) ->
%% Run a Map/Reduce operation using reduce_identity to get a list
%% of BKeys.
IdentityQuery = [{reduce, {modfun, riak_kv_mapreduce, reduce_identity}, none, true}],
- case search(Rhc, Bucket, SearchQuery, IdentityQuery, ?DEFAULT_TIMEOUT) of
+ case search(Rhc, Bucket, SearchQuery, IdentityQuery, ?DEFAULT_HTTP_TIMEOUT) of
{ok, [{_, Results}]} ->
%% Unwrap the results.
{ok, Results};
@@ -615,42 +622,40 @@ search(Rhc, Bucket, SearchQuery) ->
%% query. See {@link rhc_mapred:encode_mapred/2} for details of
%% the allowed formats for `MRQuery'. This command will return an error
%% unless executed against a Riak Search cluster.
-%% @spec search(rhc(), bucket(), string(),
-%% [rhc_mapred:query_part()], integer()) ->
-%% {ok, [rhc_mapred:phase_result()]}|{error, term()}
+-spec search(rhc(), bucket(), string(),[rhc_mapred:query_part()], integer()) ->
+ {ok, [rhc_mapred:phase_result()]} | {error,term()}.
search(Rhc, Bucket, SearchQuery, MRQuery, Timeout) ->
Inputs = {modfun, riak_search, mapred_search, [Bucket, SearchQuery]},
mapred(Rhc, Inputs, MRQuery, Timeout).
-%% @equiv mapred_bucket(Rhc, Bucket, Query, DEFAULT_TIMEOUT)
+%% @equiv mapred_bucket(Rhc, Bucket, Query, DEFAULT_HTTP_TIMEOUT)
+-spec mapred_bucket(rhc(), bucket(), term()) -> {ok, [rhc_mapred:phase_result()]} | {error,term()}.
mapred_bucket(Rhc, Bucket, Query) ->
- mapred_bucket(Rhc, Bucket, Query, ?DEFAULT_TIMEOUT).
+ mapred_bucket(Rhc, Bucket, Query, ?DEFAULT_HTTP_TIMEOUT).
%% @doc Execute a map/reduce query over all keys in the given bucket.
-%% @spec mapred_bucket(rhc(), bucket(), [rhc_mapred:query_phase()],
-%% integer())
-%% -> {ok, [rhc_mapred:phase_result()]}|{error, term()}
+-spec mapred_bucket(rhc(), bucket(), [rhc_mapred:query_phase()], integer()) -> {ok, [rhc_mapred:phase_result()]}|{error, term()}.
mapred_bucket(Rhc, Bucket, Query, Timeout) ->
{ok, ReqId} = mapred_bucket_stream(Rhc, Bucket, Query, self(), Timeout),
rhc_mapred:wait_for_mapred(ReqId, Timeout).
%% @doc Stream map/reduce results over all keys in a bucket to a Pid.
%% Similar to {@link mapred_stream/5}
-%% @spec mapred_bucket_stream(rhc(), bucket(),
-%% [rhc_mapred:query_phase()], pid(), integer())
-%% -> {ok, reference()}|{error, term()}
+-spec mapred_bucket_stream(rhc(), bucket(),
+ [rhc_mapred:query_phase()], pid(), integer())
+ -> {ok, reference()}|{error, term()}.
mapred_bucket_stream(Rhc, Bucket, Query, ClientPid, Timeout) ->
mapred_stream(Rhc, Bucket, Query, ClientPid, Timeout).
%% @doc Fetches the representation of a convergent datatype from Riak.
--spec fetch_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary()) ->
+-spec fetch_type(rhc(), {binary(), bucket()}, key()) ->
{ok, riakc_datatype:datatype()} | {error, term()}.
fetch_type(Rhc, BucketAndType, Key) ->
fetch_type(Rhc, BucketAndType, Key, []).
%% @doc Fetches the representation of a convergent datatype from Riak,
%% using the given request options.
--spec fetch_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary(), [proplists:property()]) ->
+-spec fetch_type(rhc(), {binary(), bucket()}, key(), [proplists:property()]) ->
{ok, riakc_datatype:datatype()} | {error, term()}.
fetch_type(Rhc, BucketAndType, Key, Options) ->
Query = fetch_type_q_params(Rhc, Options),
@@ -664,17 +669,18 @@ fetch_type(Rhc, BucketAndType, Key, Options) ->
%% @doc Updates the convergent datatype in Riak with local
%% modifications stored in the container type.
--spec update_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary(),
+-spec update_type(rhc(), {binary(), bucket()}, key(),
Update::riakc_datatype:update(term())) ->
- ok | {ok, Key::binary()} | {ok, riakc_datatype:datatype()} |
- {ok, Key::binary(), riakc_datatype:datatype()} | {error, term()}.
+ ok | {ok, key()} | {ok, riakc_datatype:datatype()} |
+ {ok, key(), riakc_datatype:datatype()} | {error, term()}.
+
update_type(Rhc, BucketAndType, Key, Update) ->
update_type(Rhc, BucketAndType, Key, Update, []).
--spec update_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary(),
+-spec update_type(rhc(), {binary(), bucket()}, key(),
Update::riakc_datatype:update(term()), [proplists:property()]) ->
- ok | {ok, Key::binary()} | {ok, riakc_datatype:datatype()} |
- {ok, Key::binary(), riakc_datatype:datatype()} | {error, term()}.
+ ok | {ok, key()} | {ok, riakc_datatype:datatype()} |
+ {ok, key(), riakc_datatype:datatype()} | {error, term()}.
update_type(_Rhc, _BucketAndType, _Key, undefined, _Options) ->
{error, unmodified};
update_type(Rhc, BucketAndType, Key, {Type, Op, Context}, Options) ->
@@ -707,7 +713,7 @@ update_type(Rhc, BucketAndType, Key, {Type, Op, Context}, Options) ->
%% updates the datatype in Riak. If an existing value is not found,
%% but you want the updates to apply anyway, use the 'create' option.
-spec modify_type(rhc(), fun((riakc_datatype:datatype()) -> riakc_datatype:datatype()),
- {BucketType::binary(), Bucket::binary()}, Key::binary(), [proplists:property()]) ->
+ {BucketType::binary(), bucket()}, key(), [proplists:property()]) ->
ok | {ok, riakc_datatype:datatype()} | {error, term()}.
modify_type(Rhc, Fun, BucketAndType, Key, Options) ->
Create = proplists:get_value(create, Options, true),
@@ -730,7 +736,7 @@ modify_type(Rhc, Fun, BucketAndType, Key, Options) ->
%% @doc Get the client ID to use, given the passed options and client.
%% Choose the client ID in Options before the one in the client.
-%% @spec client_id(rhc(), proplist()) -> client_id()
+-spec client_id(rhc(), list()) -> any().
client_id(#rhc{options=RhcOptions}, Options) ->
case proplists:get_value(client_id, Options) of
undefined ->
@@ -740,7 +746,7 @@ client_id(#rhc{options=RhcOptions}, Options) ->
end.
%% @doc Generate a random client ID.
-%% @spec random_client_id() -> client_id()
+-spec random_client_id() -> string().
random_client_id() ->
{{Y,Mo,D},{H,Mi,S}} = erlang:universaltime(),
{_,_,NowPart} = now(),
@@ -748,7 +754,7 @@ random_client_id() ->
base64:encode_to_string(<>).
%% @doc Assemble the root URL for the given client
-%% @spec root_url(rhc()) -> iolist()
+-spec root_url(rhc()) -> iolist().
root_url(#rhc{ip=Ip, port=Port, options=Opts}) ->
Proto = case proplists:get_value(is_ssl, Opts) of
true ->
@@ -759,21 +765,23 @@ root_url(#rhc{ip=Ip, port=Port, options=Opts}) ->
[Proto, "://",Ip,":",integer_to_list(Port),"/"].
%% @doc Assemble the URL for the map/reduce resource
-%% @spec mapred_url(rhc()) -> iolist()
+-spec mapred_url(rhc()) -> iolist().
mapred_url(Rhc) ->
binary_to_list(iolist_to_binary([root_url(Rhc), "mapred/?chunked=true"])).
%% @doc Assemble the URL for the ping resource
-%% @spec ping_url(rhc()) -> iolist()
+-spec ping_url(rhc()) -> iolist().
ping_url(Rhc) ->
binary_to_list(iolist_to_binary([root_url(Rhc), "ping/"])).
%% @doc Assemble the URL for the stats resource
-%% @spec stats_url(rhc()) -> iolist()
+-spec stats_url(rhc()) -> iolist().
stats_url(Rhc) ->
binary_to_list(iolist_to_binary([root_url(Rhc), "stats/"])).
%% @doc Assemble the URL for the 2I resource
+-spec index_url(rhc(),{binary(), bucket()}, term(), term(), list()) ->
+ list() | {error, term()} | {incomplete, list(), binary()}.
index_url(Rhc, BucketAndType, Index, Query, Params) ->
{Type, Bucket} = extract_bucket_type(BucketAndType),
QuerySegments = index_query_segments(Query),
@@ -786,6 +794,7 @@ index_url(Rhc, BucketAndType, Index, Query, Params) ->
[ ["?", mochiweb_util:urlencode(Params)] || Params =/= []]]).
+-spec index_query_segments(_) -> [binary() | string()].
index_query_segments(B) when is_binary(B) ->
[ B ];
index_query_segments(I) when is_integer(I) ->
@@ -798,6 +807,9 @@ index_query_segments({I1, I2}) when is_integer(I1),
[ integer_to_list(I1), integer_to_list(I2) ];
index_query_segments(_) -> [].
+-spec index_name({binary_index, term()} |
+ {integer_index, term()} |
+ term()) -> term().
index_name({binary_index, B}) ->
[B, "_bin"];
index_name({integer_index, I}) ->
@@ -805,9 +817,8 @@ index_name({integer_index, I}) ->
index_name(Idx) -> Idx.
-
%% @doc Assemble the URL for the given bucket and key
-%% @spec make_url(rhc(), bucket(), key(), proplist()) -> iolist()
+-spec make_url(rhc(), {binary(),bucket() | undefined}, key() | undefined, list()) -> iolist().
make_url(Rhc=#rhc{}, BucketAndType, Key, Query) ->
{Type, Bucket} = extract_bucket_type(BucketAndType),
{IsKeys, IsProps, IsBuckets} = detect_bucket_flags(Query),
@@ -832,6 +843,7 @@ make_counter_url(Rhc, Bucket, Key, Query) ->
<<"buckets">>, "/", Bucket, "/", <<"counters">>, "/", Key, "?",
[ [mochiweb_util:urlencode(Query)] || Query =/= []]])).
+-spec make_datatype_url(rhc(),_, key(),list()) -> string() | {error, list(), iolist()} | {incomplete, list(), binary()}.
make_datatype_url(Rhc, BucketAndType, Key, Query) ->
case extract_bucket_type(BucketAndType) of
{undefined, _B} ->
@@ -846,6 +858,7 @@ make_datatype_url(Rhc, BucketAndType, Key, Query) ->
end.
%% @doc send an ibrowse request
+-spec request(atom(), term(), list(), list(), term(), rhc()) -> any().
request(Method, Url, Expect, Headers, Body, Rhc) ->
AuthHeader = get_auth_header(Rhc#rhc.options),
SSLOptions = get_ssl_options(Rhc#rhc.options),
@@ -862,6 +875,7 @@ request(Method, Url, Expect, Headers, Body, Rhc) ->
end.
%% @doc stream an ibrowse request
+-spec request_stream(pid(), atom(), string(), list(), term(), rhc()) -> any().
request_stream(Pid, Method, Url, Headers, Body, Rhc) ->
AuthHeader = get_auth_header(Rhc#rhc.options),
SSLOptions = get_ssl_options(Rhc#rhc.options),
@@ -875,17 +889,20 @@ request_stream(Pid, Method, Url, Headers, Body, Rhc) ->
end.
%% @doc Get the default options for the given client
-%% @spec options(rhc()) -> proplist()
+
+-spec options(rhc()) -> any().
options(#rhc{options=Options}) ->
Options.
%% @doc Extract the list of query parameters to use for a GET
-%% @spec get_q_params(rhc(), proplist()) -> proplist()
+
+-spec get_q_params(rhc(),list()) -> list().
get_q_params(Rhc, Options) ->
options_list([r,pr,timeout], Options ++ options(Rhc)).
%% @doc Extract the list of query parameters to use for a PUT
-%% @spec put_q_params(rhc(), proplist()) -> proplist()
+
+-spec put_q_params(rhc(),list()) -> list().
put_q_params(Rhc, Options) ->
options_list([r,w,dw,pr,pw,timeout,asis,{return_body,"returnbody"}],
Options ++ options(Rhc)).
@@ -897,24 +914,28 @@ counter_q_params(Rhc, Options) ->
options_list([r, pr, w, pw, dw, returnvalue, basic_quorum, notfound_ok], Options ++ options(Rhc)).
%% @doc Extract the list of query parameters to use for a DELETE
-%% @spec delete_q_params(rhc(), proplist()) -> proplist()
+
+-spec delete_q_params(rhc(),list()) -> list().
delete_q_params(Rhc, Options) ->
options_list([r,w,dw,pr,pw,rw,timeout], Options ++ options(Rhc)).
+-spec fetch_type_q_params(rhc(),list()) -> list().
fetch_type_q_params(Rhc, Options) ->
options_list([r,pr,basic_quorum,notfound_ok,timeout,include_context], Options ++ options(Rhc)).
+-spec update_type_q_params(rhc(),list()) -> list().
update_type_q_params(Rhc, Options) ->
options_list([r,w,dw,pr,pw,basic_quorum,notfound_ok,timeout,include_context,{return_body, "returnbody"}],
Options ++ options(Rhc)).
%% @doc Extract the options for the given `Keys' from the possible
%% list of `Options'.
-%% @spec options_list([Key::atom()|{Key::atom(),Alias::string()}],
-%% proplist()) -> proplist()
+
+-spec options_list(list(),_) -> list().
options_list(Keys, Options) ->
options_list(Keys, Options, []).
+-spec options_list(list(), [proplist:property()], list()) -> list().
options_list([K|Rest], Options, Acc) ->
{Key,Alias} = case K of
{_, _} -> K;
@@ -930,12 +951,15 @@ options_list([], _, Acc) ->
%% @doc Convert a stats-resource response to an erlang-term server
%% information proplist.
+-spec erlify_server_info(list()) -> [{node,_} | {server_version,_}].
erlify_server_info(Props) ->
lists:flatten([ erlify_server_info(K, V) || {K, V} <- Props ]).
+-spec erlify_server_info(_,_) -> [] | {node, term()} | {server_version, term()}.
erlify_server_info(<<"nodename">>, Name) -> {node, Name};
erlify_server_info(<<"riak_kv_version">>, Vsn) -> {server_version, Vsn};
erlify_server_info(_Ignore, _) -> [].
+-spec get_auth_header(list()) -> [{string(), string()}].
get_auth_header(Options) ->
case lists:keyfind(credentials, 1, Options) of
{credentials, User, Password} ->
@@ -946,6 +970,7 @@ get_auth_header(Options) ->
[]
end.
+-spec get_ssl_options([proplists:property()]) -> [{is_ssl,true} | {ssl_options,list()}].
get_ssl_options(Options) ->
case proplists:get_value(is_ssl, Options) of
true ->
@@ -959,6 +984,8 @@ get_ssl_options(Options) ->
[]
end.
+-spec extract_bucket_type({binary(), bucket()} | bucket()) ->
+ {undefined | binary() , bucket()}.
extract_bucket_type({<<"default">>, B}) ->
{undefined, B};
extract_bucket_type({T,B}) ->
@@ -966,6 +993,7 @@ extract_bucket_type({T,B}) ->
extract_bucket_type(B) ->
{undefined, B}.
+-spec detect_bucket_flags(list()) -> {false,boolean(),boolean()} | {true,boolean(),boolean()}.
detect_bucket_flags(Query) ->
{proplists:get_value(?Q_KEYS, Query, ?Q_FALSE) =/= ?Q_FALSE,
proplists:get_value(?Q_PROPS, Query, ?Q_FALSE) =/= ?Q_FALSE,
diff --git a/src/rhc_bucket.erl b/src/rhc_bucket.erl
index 1b074e6..98ef0ec 100644
--- a/src/rhc_bucket.erl
+++ b/src/rhc_bucket.erl
@@ -29,8 +29,10 @@
-include("raw_http.hrl").
+-spec erlify_props([any()]) -> [{atom(),_}].
erlify_props(Props) ->
lists:flatten([ erlify_prop(K, V) || {K, V} <- Props ]).
+-spec erlify_prop(_,_) -> [] | {atom(),_}.
erlify_prop(?JSON_ALLOW_MULT, AM) -> {allow_mult, AM};
erlify_prop(?JSON_BACKEND, B) -> {backend, B};
erlify_prop(?JSON_BASIC_Q, B) -> {basic_quorum, B};
@@ -55,12 +57,14 @@ erlify_prop(?JSON_W, W) -> {w, erlify_quorum(W)};
erlify_prop(?JSON_YOUNG_VC, I) -> {young_vclock, I};
erlify_prop(_Ignore, _) -> [].
+-spec erlify_quorum(_) -> 'all' | 'one' | 'quorum' | 'undefined' | integer().
erlify_quorum(?JSON_ALL) -> all;
erlify_quorum(?JSON_QUORUM) -> quorum;
erlify_quorum(?JSON_ONE) -> one;
erlify_quorum(I) when is_integer(I) -> I;
erlify_quorum(_) -> undefined.
+-spec erlify_repl(_) -> 'false' | 'fullsync' | 'realtime' | 'true' | 'undefined'.
erlify_repl(?JSON_REALTIME) -> realtime;
erlify_repl(?JSON_FULLSYNC) -> fullsync;
erlify_repl(?JSON_BOTH) -> true; %% both is equivalent to true, but only works in 1.2+
@@ -68,6 +72,7 @@ erlify_repl(true) -> true;
erlify_repl(false) -> false;
erlify_repl(_) -> undefined.
+-spec erlify_chash({'struct',[{<<_:24>>,binary()},...]}) -> {atom(),atom()}.
erlify_chash({struct, [{?JSON_MOD, Mod}, {?JSON_FUN, Fun}]}=Struct) ->
try
{binary_to_existing_atom(Mod, utf8), binary_to_existing_atom(Fun, utf8)}
@@ -77,12 +82,15 @@ erlify_chash({struct, [{?JSON_MOD, Mod}, {?JSON_FUN, Fun}]}=Struct) ->
{binary_to_atom(Mod, utf8), binary_to_atom(Fun, utf8)}
end.
+-spec erlify_linkfun({'struct',[{<<_:24>>,binary()},...]}) -> {'modfun',atom(),atom()}.
erlify_linkfun(Struct) ->
{Mod, Fun} = erlify_chash(Struct),
{modfun, Mod, Fun}.
+-spec httpify_props(list()) -> [{<<_:8,_:_*8>>,_}].
httpify_props(Props) ->
lists:flatten([ httpify_prop(K, V) || {K, V} <- Props ]).
+-spec httpify_prop(_,_) -> [] | {<<_:8,_:_*8>>,_}.
httpify_prop(allow_mult, AM) -> {?JSON_ALLOW_MULT, AM};
httpify_prop(backend, B) -> {?JSON_BACKEND, B};
httpify_prop(basic_quorum, B) -> {?JSON_BASIC_Q, B};
@@ -107,6 +115,7 @@ httpify_prop(w, Q) -> {?JSON_W, Q};
httpify_prop(young_vclock, VC) -> {?JSON_YOUNG_VC, VC};
httpify_prop(_Ignore, _) -> [].
+-spec httpify_modfun({atom(),atom()} | {'modfun',atom(),atom()}) -> {'struct',[{_,_},...]}.
httpify_modfun({modfun, M, F}) ->
httpify_modfun({M, F});
httpify_modfun({M, F}) ->
diff --git a/src/rhc_dt.erl b/src/rhc_dt.erl
index b72bf74..b5b145d 100644
--- a/src/rhc_dt.erl
+++ b/src/rhc_dt.erl
@@ -31,6 +31,7 @@
-define(FIELD_PATTERN, "^(.*)_(counter|set|register|flag|map)$").
+-spec datatype_from_json({'struct',[any()]}) -> any().
datatype_from_json({struct, Props}) ->
Value = proplists:get_value(<<"value">>, Props),
Type = binary_to_existing_atom(proplists:get_value(<<"type">>, Props), utf8),
@@ -38,6 +39,7 @@ datatype_from_json({struct, Props}) ->
Mod = riakc_datatype:module(Type),
Mod:new(decode_value(Type, Value), Context).
+-spec decode_value('counter' | 'flag' | 'map' | 'register' | 'set',_) -> any().
decode_value(counter, Value) -> Value;
decode_value(set, Value) -> Value;
decode_value(flag, Value) -> Value;
@@ -48,14 +50,17 @@ decode_value(map, {struct, Fields}) ->
{{Name,Type}, decode_value(Type, Value)}
end || {Field, Value} <- Fields ].
+-spec field_from_json(binary()) -> {binary() | [binary() | string() | char() | {integer(),integer()} | {'error',[any()],binary()} | {'incomplete',[any()],binary()}] | {integer(),integer()} | {'error',string(),binary()} | {'incomplete',string(),binary()},atom()}.
field_from_json(Bin) when is_binary(Bin) ->
{match, [Name, BinType]} = re:run(Bin, ?FIELD_PATTERN, [anchored, {capture, all_but_first, binary}]),
{Name, binary_to_existing_atom(BinType, utf8)}.
+-spec field_to_json({binary(),atom()}) -> <<_:8,_:_*8>>.
field_to_json({Name, Type}) when is_binary(Name), is_atom(Type) ->
BinType = atom_to_binary(Type, utf8),
<>.
+-spec decode_error(_,{'ok',_,_,_}) -> any().
decode_error(fetch, {ok, "404", Headers, Body}) ->
case proplists:get_value("Content-Type", Headers) of
"application/json" ->
@@ -75,6 +80,7 @@ decode_error(_, {ok, "403", _, Body}) ->
decode_error(_, {ok, _, _, Body}) ->
Body.
+-spec encode_update_request('counter' | 'flag' | 'map' | 'register' | 'set', term(), term()) -> binary() | {'struct',list()}.
encode_update_request(register, {assign, Bin}, _Context) ->
{struct, [{<<"assign">>, Bin}]};
encode_update_request(flag, Atom, _Context) ->
@@ -89,6 +95,7 @@ encode_update_request(map, {update, Ops}, Context) ->
{struct, orddict:to_list(lists:foldl(fun encode_map_op/2, orddict:new(), Ops)) ++
include_context(Context)}.
+-spec encode_map_op({'add',{binary(),atom()}} | {'remove',{binary(),atom()}} | {'update',{binary(),'counter' | 'flag' | 'map' | 'register' | 'set'},_},[{_,_}]) -> [{_,_},...].
encode_map_op({add, Entry}, Ops) ->
orddict:append(add, field_to_json(Entry), Ops);
encode_map_op({remove, Entry}, Ops) ->
@@ -103,6 +110,7 @@ encode_map_op({update, {_Key,Type}=Field, Op}, Ops) ->
orddict:store(update, {struct, [Update]}, Ops)
end.
+-spec include_context(undefined | binary() | term()) -> [{<<_:56>>,_}].
include_context(undefined) -> [];
include_context(<<>>) -> [];
include_context(Bin) -> [{<<"context">>, Bin}].
diff --git a/src/rhc_index.erl b/src/rhc_index.erl
index 06db9ca..e4497b4 100644
--- a/src/rhc_index.erl
+++ b/src/rhc_index.erl
@@ -38,6 +38,7 @@
query_options(Options) ->
lists:flatmap(fun query_option/1, Options).
+-spec query_option(_) -> [{[1..255,...], list()}].
query_option({timeout, N}) when is_integer(N) ->
[{?Q_TIMEOUT, integer_to_list(N)}];
query_option({stream, B}) when is_boolean(B) ->
@@ -54,10 +55,11 @@ query_option(_) ->
[].
%% @doc Collects 2i query results on behalf of the caller.
--spec wait_for_index(reference()) -> {ok, ?INDEX_RESULTS{}} | {error, term()}.
+-spec wait_for_index(reference()) -> {ok, index_results()} | {error, term()}.
wait_for_index(ReqId) ->
wait_for_index(ReqId, []).
+-spec wait_for_index(_,[index_stream_result()]) -> {'error',_} | {'ok', index_results()}.
wait_for_index(ReqId, Acc) ->
receive
{ReqId, {done, Continuation}} ->
@@ -70,12 +72,14 @@ wait_for_index(ReqId, Acc) ->
wait_for_index(ReqId, [Res|Acc])
end.
+-spec collect_results([index_stream_result()],'undefined' | binary()) -> index_results().
collect_results(Acc, Continuation) ->
lists:foldl(fun merge_index_results/2,
?INDEX_RESULTS{keys=[],
terms=[],
continuation=Continuation}, Acc).
+-spec merge_index_results(index_stream_result(),index_results()) -> index_results().
merge_index_results(?INDEX_STREAM_RESULT{keys=KL},
?INDEX_RESULTS{keys=K0}=Acc) when is_list(KL) ->
Acc?INDEX_RESULTS{keys=KL++K0};
@@ -92,6 +96,7 @@ index_acceptor(Pid, PidRef) ->
index_acceptor(Pid, PidRef, IbrowseRef)
end.
+-spec index_acceptor(atom() | pid() | port() | {atom(),atom()},_,_) -> {_,'done' | {'error','timeout' | {_,_}}}.
index_acceptor(Pid, PidRef, IBRef) ->
receive
{ibrowse_async_headers, IBRef, Status, Headers} ->
@@ -113,6 +118,7 @@ index_acceptor(Pid, PidRef, IBRef) ->
%% @doc Receives multipart chunks from webmachine_multipart and parses
%% them into results that can be sent to Pid.
%% @private
+-spec stream_parts_acceptor(atom() | pid() | port() | {atom(),atom()}, term(), 'done_parts' | term()) -> {term(),'done'}.
stream_parts_acceptor(Pid, PidRef, done_parts) ->
Pid ! {PidRef, done};
stream_parts_acceptor(Pid, PidRef, {{_Name, _Param, Part},Next}) ->
@@ -127,6 +133,7 @@ stream_parts_acceptor(Pid, PidRef, {{_Name, _Param, Part},Next}) ->
%% @doc Sends keys or terms to the Pid if they are present in the
%% result, otherwise sends nothing.
%% @private
+-spec maybe_send_results(term(), term(), term(), term()) -> 'ok' | {_,index_stream_result()}.
maybe_send_results(_Pid, _PidRef, undefined, undefined) -> ok;
maybe_send_results(Pid, PidRef, Keys, Results) ->
Pid ! {PidRef, ?INDEX_STREAM_RESULT{keys=Keys,
@@ -135,6 +142,7 @@ maybe_send_results(Pid, PidRef, Keys, Results) ->
%% @doc Sends the continuation to Pid if it is present in the result,
%% otherwise sends nothing.
%% @private
+-spec maybe_send_continuation(_,_,_) -> 'ok' | {_,{'done',_}}.
maybe_send_continuation(_Pid, _PidRef, undefined) -> ok;
maybe_send_continuation(Pid, PidRef, Continuation) ->
Pid ! {PidRef, {done, Continuation}}.
@@ -142,6 +150,7 @@ maybe_send_continuation(Pid, PidRef, Continuation) ->
%% @doc "next" fun for the webmachine_multipart streamer - waits for
%% an ibrowse message, and then returns it to the streamer for processing
%% @private
+-spec stream_parts_helper(_,_,_,boolean()) -> fun(() -> {_,'done' | fun(() -> any())}).
stream_parts_helper(Pid, PidRef, IbrowseRef, First) ->
fun() ->
receive
diff --git a/src/rhc_listkeys.erl b/src/rhc_listkeys.erl
index f64aeef..7f5418a 100644
--- a/src/rhc_listkeys.erl
+++ b/src/rhc_listkeys.erl
@@ -30,6 +30,7 @@
-include("raw_http.hrl").
-include("rhc.hrl").
+-include_lib("riakc/include/riakc.hrl").
-record(parse_state, {buffer=[], %% unused characters in reverse order
brace=0, %% depth of braces in current partial
@@ -40,10 +41,13 @@
%% @doc Collect all keylist results, and provide them as one list
%% instead of streaming to a Pid.
%% @spec wait_for_list(term(), integer()) ->
-%% {ok, [key()]}|{error, term()}
+%% {ok, [key()]}|{error, term()}
+-spec wait_for_list(term(), integer()) -> {ok, [key()]}|{error, term()}.
+
wait_for_list(ReqId, Timeout) ->
wait_for_list(ReqId,Timeout,[]).
%% @private
+-spec wait_for_list(_,_,[any()]) -> {'error',_} | {'ok',[any()]}.
wait_for_list(ReqId, _Timeout0, Acc) ->
receive
{ReqId, done} ->
@@ -56,6 +60,7 @@ wait_for_list(ReqId, _Timeout0, Acc) ->
%% @doc first stage of ibrowse response handling - just waits to be
%% told what ibrowse request ID to expect
+-spec list_acceptor(atom() | pid() | port() | {atom(),atom()},_,_) -> {_,'done' | {'error',_}}.
list_acceptor(Pid, PidRef, Type) ->
receive
{ibrowse_req_id, PidRef, IbrowseRef} ->
@@ -64,6 +69,7 @@ list_acceptor(Pid, PidRef, Type) ->
%% @doc main loop for ibrowse response handling - parses response and
%% sends messaged to client Pid
+-spec list_acceptor(atom() | pid() | port() | {atom(),atom()},_,_,#parse_state{},_) -> {_,'done' | {'error',_}}.
list_acceptor(Pid,PidRef,IbrowseRef,ParseState,Type) ->
receive
{ibrowse_async_response_end, IbrowseRef} ->
@@ -101,11 +107,13 @@ list_acceptor(Pid,PidRef,IbrowseRef,ParseState,Type) ->
end
end.
+-spec is_empty(#parse_state{}) -> boolean().
is_empty(#parse_state{buffer=[],brace=0,quote=false,escape=false}) ->
true;
is_empty(#parse_state{}) ->
false.
+-spec try_parse(binary(),#parse_state{}) -> {[any()],#parse_state{}}.
try_parse(Data, #parse_state{buffer=B, brace=D, quote=Q, escape=E}) ->
Parse = try_parse(binary_to_list(Data), B, D, Q, E),
{KeyLists, NewParseState} =
@@ -135,6 +143,7 @@ try_parse(Data, #parse_state{buffer=B, brace=D, quote=Q, escape=E}) ->
Parse),
{lists:flatten(KeyLists), NewParseState}.
+-spec try_parse([byte()],_,_,_,_) -> [[any(),...] | #parse_state{},...].
try_parse([], B, D, Q, E) ->
[#parse_state{buffer=B, brace=D, quote=Q, escape=E}];
try_parse([_|Rest],B,D,Q,true) ->
diff --git a/src/rhc_mapred.erl b/src/rhc_mapred.erl
index df54218..de66e02 100644
--- a/src/rhc_mapred.erl
+++ b/src/rhc_mapred.erl
@@ -48,10 +48,12 @@
%% {jsanon, {bucket(), key()}}|
%% {jsanon, binary()}
%% @type linkspec() = binary()|'_'
+-spec encode_mapred(binary() | [[any(),...] | {binary() | {_,_},_}] | {binary(),binary()} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_,_},[{'link',_,_,_} | {'map',{_,_} | {_,_,_},_,_} | {'reduce',{_,_} | {_,_,_},_,_}]) -> any().
encode_mapred(Inputs, Query) ->
mochijson2:encode(
{struct, [{<<"inputs">>, encode_mapred_inputs(Inputs)},
{<<"query">>, encode_mapred_query(Query)}]}).
+-spec encode_mapred_inputs(binary() | [[any(),...] | {binary() | {_,_},_}] | {binary(),binary()} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_,_}) -> binary() | [binary() | [any(),...]] | {'struct',[{_,_},...]}.
encode_mapred_inputs({BucketType, Bucket}) when is_binary(BucketType),
is_binary(Bucket) ->
[BucketType, Bucket];
@@ -77,6 +79,7 @@ encode_mapred_inputs({modfun, Module, Function, Options}) ->
%% [Bucket, Key]
%% or
%% [Bucket, Key, KeyData]
+-spec normalize_mapred_input([any(),...] | {binary() | {binary() | {binary(),binary()},binary()},_}) -> [any(),...].
normalize_mapred_input({Bucket, Key})
when is_binary(Bucket), is_binary(Key) ->
[Bucket, Key];
@@ -96,9 +99,11 @@ normalize_mapred_input([Bucket, Key, KeyData])
when is_binary(Bucket), is_binary(Key) ->
[Bucket, Key, KeyData].
+-spec encode_mapred_query([{'link',_,_,_} | {'map',{_,_} | {_,_,_},_,_} | {'reduce',{_,_} | {_,_,_},_,_}]) -> [{'struct',[any(),...]}].
encode_mapred_query(Query) when is_list(Query) ->
[ encode_mapred_phase(P) || P <- Query ].
+-spec encode_mapred_phase({'link',_,_,_} | {'map',{'jsanon',_} | {'jsfun',_} | {'modfun',atom(),atom()},_,_} | {'reduce',{'jsanon',_} | {'jsfun',_} | {'modfun',atom(),atom()},_,_}) -> {'struct',[{<<_:24,_:_*8>>,{_,_}},...]}.
encode_mapred_phase({MR, Fundef, Arg, Keep}) when MR =:= map;
MR =:= reduce ->
Type = if MR =:= map -> <<"map">>;
@@ -151,11 +156,13 @@ encode_mapred_phase({link, Bucket, Tag, Keep}) ->
%% @spec wait_for_mapred(term(), integer()) ->
%% {ok, [phase_result()]}|{error, term()}
%% @type phase_result() = {integer(), [term()]}
+-spec wait_for_mapred(reference(),'infinity' | non_neg_integer()) -> {'error',_} | {'ok',[{_,_}]}.
wait_for_mapred(ReqId, Timeout) ->
wait_for_mapred_first(ReqId, Timeout).
%% Wait for the first mapred result, so we know at least one phase
%% that will be delivering results.
+-spec wait_for_mapred_first(reference(),'infinity' | non_neg_integer()) -> {'error',_} | {'ok',[{_,_}]}.
wait_for_mapred_first(ReqId, Timeout) ->
case receive_mapred(ReqId, Timeout) of
done ->
@@ -173,6 +180,7 @@ wait_for_mapred_first(ReqId, Timeout) ->
%% of accumulating a single phases's outputs will be more efficient
%% than the repeated orddict:append_list/3 used when accumulating
%% outputs from multiple phases.
+-spec wait_for_mapred_one(reference(),'infinity' | non_neg_integer(),integer(),_) -> {'error',_} | {'ok',[{_,_}]}.
wait_for_mapred_one(ReqId, Timeout, Phase, Acc) ->
case receive_mapred(ReqId, Timeout) of
done ->
@@ -192,15 +200,18 @@ wait_for_mapred_one(ReqId, Timeout, Phase, Acc) ->
end.
%% Single-phase outputs are kept as a reverse list of results.
+-spec acc_mapred_one([any()],_) -> any().
acc_mapred_one([R|Rest], Acc) ->
acc_mapred_one(Rest, [R|Acc]);
acc_mapred_one([], Acc) ->
Acc.
+-spec finish_mapred_one(integer(),[any()]) -> [{integer(),[any()]},...].
finish_mapred_one(Phase, Acc) ->
[{Phase, lists:reverse(Acc)}].
%% Tracking outputs from multiple phases.
+-spec wait_for_mapred_many(reference(),'infinity' | non_neg_integer(),[tuple(),...]) -> {'error',_} | {'ok',[{_,_}]}.
wait_for_mapred_many(ReqId, Timeout, Acc) ->
case receive_mapred(ReqId, Timeout) of
done ->
@@ -216,6 +227,7 @@ wait_for_mapred_many(ReqId, Timeout, Acc) ->
%% Many-phase outputs are kepts as a proplist of reversed lists of
%% results.
+-spec acc_mapred_many(integer(),[any()],[tuple(),...]) -> [tuple(),...].
acc_mapred_many(Phase, Res, Acc) ->
case lists:keytake(Phase, 1, Acc) of
{value, {Phase, PAcc}, RAcc} ->
@@ -224,12 +236,14 @@ acc_mapred_many(Phase, Res, Acc) ->
[{Phase,acc_mapred_one(Res,[])}|Acc]
end.
+-spec finish_mapred_many([tuple(),...]) -> [{_,[any()]}].
finish_mapred_many(Acc) ->
[ {P, lists:reverse(A)} || {P, A} <- lists:keysort(1, Acc) ].
%% Receive one mapred message.
-spec receive_mapred(reference(), timeout()) ->
done | {mapred, integer(), [term()]} | {error, term()} | timeout.
+
receive_mapred(ReqId, Timeout) ->
receive {ReqId, Msg} ->
%% Msg should be `done', `{mapred, Phase, Results}', or
@@ -241,6 +255,7 @@ receive_mapred(ReqId, Timeout) ->
%% @doc first stage of ibrowse response handling - just waits to be
%% told what ibrowse request ID to expect
+-spec mapred_acceptor(atom() | pid() | port() | {atom(),atom()},_,'infinity' | non_neg_integer()) -> {_,'done' | {'error','timeout' | {_,_}}}.
mapred_acceptor(Pid, PidRef, Timeout) ->
receive
{ibrowse_req_id, PidRef, IbrowseRef} ->
@@ -251,6 +266,7 @@ mapred_acceptor(Pid, PidRef, Timeout) ->
%% @doc second stage of ibrowse response handling - waits for headers
%% and extracts the boundary of the multipart/mixed message
+-spec mapred_acceptor(atom() | pid() | port() | {atom(),atom()},_,'infinity' | non_neg_integer(),_) -> {_,'done' | {'error','timeout' | {_,_}}}.
mapred_acceptor(Pid,PidRef,Timeout,IbrowseRef) ->
receive
{ibrowse_async_headers, IbrowseRef, Status, Headers} ->
@@ -275,6 +291,7 @@ mapred_acceptor(Pid,PidRef,Timeout,IbrowseRef) ->
%% @doc driver of the webmachine_multipart streamer - handles results
%% of the parsing process (sends them to the client) and polls for
%% the next part
+-spec stream_parts_acceptor(atom() | pid() | port() | {atom(),atom()},_,'done_parts' | {{_,_,binary() | maybe_improper_list(binary() | maybe_improper_list(any(),binary() | []) | byte(),binary() | [])},fun(() -> any())}) -> {_,'done'}.
stream_parts_acceptor(Pid,PidRef,done_parts) ->
Pid ! {PidRef, done};
stream_parts_acceptor(Pid,PidRef,{{_Name, _Param, Part},Next}) ->
@@ -286,6 +303,7 @@ stream_parts_acceptor(Pid,PidRef,{{_Name, _Param, Part},Next}) ->
%% @doc "next" fun for the webmachine_multipart streamer - waits for
%% an ibrowse message, and then returns it to the streamer for processing
+-spec stream_parts_helper(_,_,_,_,boolean()) -> fun(() -> {_,'done' | fun(() -> any())}).
stream_parts_helper(Pid, PidRef, Timeout, IbrowseRef, First) ->
fun() ->
receive
diff --git a/src/rhc_obj.erl b/src/rhc_obj.erl
index 977680c..1335164 100644
--- a/src/rhc_obj.erl
+++ b/src/rhc_obj.erl
@@ -30,9 +30,10 @@
-include("raw_http.hrl").
-include("rhc.hrl").
-
+-include_lib("riakc/include/riakc.hrl").
%% HTTP -> riakc_obj
+-spec make_riakc_obj(binary() | {binary(),binary()},'undefined' | binary(),[any()],binary()) -> riakc_obj().
make_riakc_obj(Bucket, Key, Headers, Body) ->
Vclock = base64:decode(proplists:get_value(?HEAD_VCLOCK, Headers, "")),
case ctype_from_headers(Headers) of
@@ -47,10 +48,12 @@ make_riakc_obj(Bucket, Key, Headers, Body) ->
[{headers_to_metadata(Headers), Body}])
end.
+-spec ctype_from_headers([any()]) -> {[byte()],_}.
ctype_from_headers(Headers) ->
mochiweb_util:parse_header(
proplists:get_value(?HEAD_CTYPE, Headers)).
+-spec vtag_from_headers([any()]) -> any().
vtag_from_headers(Headers) ->
%% non-sibling uses ETag, sibling uses Etag
%% (note different capitalization on 't')
@@ -60,6 +63,7 @@ vtag_from_headers(Headers) ->
end.
+-spec lastmod_from_headers([any()]) -> 'undefined' | {integer(),integer(),0}.
lastmod_from_headers(Headers) ->
case proplists:get_value("Last-Modified", Headers) of
undefined ->
@@ -73,6 +77,7 @@ lastmod_from_headers(Headers) ->
0} % Microseconds
end.
+-spec decode_siblings(maybe_improper_list(),binary()) -> [{dict(),binary()}].
decode_siblings(Boundary, <<"\r\n",SibBody/binary>>) ->
decode_siblings(Boundary, SibBody);
decode_siblings(Boundary, SibBody) ->
@@ -83,6 +88,7 @@ decode_siblings(Boundary, SibBody) ->
element(1, split_binary(Body, size(Body)-2))} %% remove trailing \r\n
|| {_, {_, Headers}, Body} <- Parts ].
+-spec headers_to_metadata([any()]) -> dict().
headers_to_metadata(Headers) ->
UserMeta = extract_user_metadata(Headers),
@@ -108,13 +114,16 @@ headers_to_metadata(Headers) ->
Entries -> dict:store(?MD_INDEX, Entries, LinkMeta)
end.
+-spec extract_user_metadata([any()]) -> any().
extract_user_metadata(Headers) ->
lists:foldl(fun extract_user_metadata/2, dict:new(), Headers).
+-spec extract_user_metadata(_,_) -> any().
extract_user_metadata({?HEAD_USERMETA_PREFIX++K, V}, Dict) ->
riakc_obj:set_user_metadata_entry(Dict, {K, V});
extract_user_metadata(_, D) -> D.
+-spec extract_links([any()]) -> any().
extract_links(Headers) ->
{ok, Re} = re:compile("[^/]+/([^/]+)/([^/]+)>; *riaktag=\"(.*)\""),
Extractor = fun(L, Acc) ->
@@ -128,9 +137,11 @@ extract_links(Headers) ->
LinkHeader = proplists:get_value(?HEAD_LINK, Headers, []),
lists:foldl(Extractor, [], string:tokens(LinkHeader, ",")).
+-spec extract_indexes([any()]) -> [{binary(),binary() | integer()}].
extract_indexes(Headers) ->
[ {list_to_binary(K), decode_index_value(K,V)} || {?HEAD_INDEX_PREFIX++K, V} <- Headers].
+-spec decode_index_value([byte()],maybe_improper_list(binary() | maybe_improper_list(any(),binary() | []) | char(),binary() | [])) -> binary() | integer().
decode_index_value(K, V) ->
case lists:last(string:tokens(K, "_")) of
"bin" ->
@@ -141,9 +152,11 @@ decode_index_value(K, V) ->
%% riakc_obj -> HTTP
+-spec serialize_riakc_obj(_, riakc_obj()) -> {[{list(), list()}], binary()}.
serialize_riakc_obj(Rhc, Object) ->
{make_headers(Rhc, Object), make_body(Object)}.
+-spec make_headers(rhc:rhc(), riakc_obj()) -> [{list(),list()}].
make_headers(Rhc, Object) ->
MD = riakc_obj:get_update_metadata(Object),
CType = case dict:find(?MD_CTYPE, MD) of
@@ -164,6 +177,7 @@ make_headers(Rhc, Object) ->
encode_indexes(MD)
| encode_user_metadata(MD) ]).
+-spec encode_links(rhc:rhc(), list()) -> list().
encode_links(_, []) -> [];
encode_links(#rhc{prefix=Prefix}, Links) ->
{{FirstBucket, FirstKey}, FirstTag} = hd(Links),
@@ -174,10 +188,12 @@ encode_links(#rhc{prefix=Prefix}, Links) ->
format_link(Prefix, FirstBucket, FirstKey, FirstTag),
tl(Links)).
+-spec encode_user_metadata(dict()) -> [].
encode_user_metadata(_Metadata) ->
%% TODO
[].
+-spec encode_indexes(dict()) -> [{nonempty_maybe_improper_list(any(),[] | {_,_,_}),maybe_improper_list()}].
encode_indexes(MD) ->
case dict:find(?MD_INDEX, MD) of
{ok, Entries} ->
@@ -186,6 +202,7 @@ encode_indexes(MD) ->
[]
end.
+-spec encode_index({term(), binary() | list() | integer()}) -> {list(), list()}.
encode_index({Name, IntValue}) when is_integer(IntValue) ->
encode_index({Name, integer_to_list(IntValue)});
encode_index({Name, BinValue}) when is_binary(BinValue) ->
@@ -194,10 +211,12 @@ encode_index({Name, String}) when is_list(String) ->
{?HEAD_INDEX_PREFIX ++ unicode:characters_to_list(Name, latin1),
String}.
+-spec format_link(_, _, riakc:key(), _) -> string().
format_link(Prefix, Bucket, Key, Tag) ->
io_lib:format("~s/~s/~s>; riaktag=\"~s\"",
[Prefix, Bucket, Key, Tag]).
+-spec make_body(riakc_obj()) -> binary().
make_body(Object) ->
case riakc_obj:get_update_value(Object) of
Val when is_binary(Val) -> Val;
@@ -210,6 +229,7 @@ make_body(Object) ->
term_to_binary(Val)
end.
+-spec is_iolist(term()) -> boolean().
is_iolist(Binary) when is_binary(Binary) -> true;
is_iolist(List) when is_list(List) ->
lists:all(fun is_iolist/1, List);