diff --git a/Makefile b/Makefile index 2fa756d..ecf4e09 100644 --- a/Makefile +++ b/Makefile @@ -14,3 +14,35 @@ distclean: clean doc: @./rebar doc skip_deps=true + +COMBO_PLT = $(HOME)/.rhc_dialyzer_plt +APPS = kernel stdlib sasl erts eunit +INCLUDES = -I include -I deps + +check_plt: all + dialyzer --check_plt --plt $(COMBO_PLT) --apps $(APPS) deps/*/ebin + +build_plt: all + dialyzer --build_plt --output_plt $(COMBO_PLT) --apps $(APPS) deps/*/ebin + +dialyzer: all + @echo + @echo Use "'make check_plt'" to check PLT prior to using this target. + @echo Use "'make build_plt'" to build PLT prior to using this target. + @echo + @sleep 1 + dialyzer --verbose -Wno_return --plt $(COMBO_PLT) $(INCLUDES) ebin + +typer: + typer --plt $(COMBO_PLT) $(INCLUDES) -r src + +plt_info: + dialyzer --plt $(COMBO_PLT) --plt_info + +clean_plt: + @echo + @echo "Are you sure? It takes time to re-build." + @echo Deleting $(COMBO_PLT) in 5 seconds. + @echo + @sleep 5 + rm $(COMBO_PLT) \ No newline at end of file diff --git a/include/rhc.hrl b/include/rhc.hrl index 36ee005..6b80ef6 100644 --- a/include/rhc.hrl +++ b/include/rhc.hrl @@ -20,9 +20,10 @@ %% %% ------------------------------------------------------------------- --define(DEFAULT_TIMEOUT, 60000). +-define(DEFAULT_HTTP_TIMEOUT, 60000). -record(rhc, {ip, port, prefix, options}). + diff --git a/rebar.config b/rebar.config index 0eef421..c362be9 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,5 @@ -{erl_opts, []}. - +{erl_opts, [debug_info, + warnings_as_errors]}. {deps, [ %% ibrowse for doing HTTP requests diff --git a/src/rhc.erl b/src/rhc.erl index 9c6a46d..021f6f3 100644 --- a/src/rhc.erl +++ b/src/rhc.erl @@ -67,12 +67,14 @@ -include("raw_http.hrl"). -include("rhc.hrl"). +-include_lib("riakc/include/riakc.hrl"). -export_type([rhc/0]). -opaque rhc() :: #rhc{}. %% @doc Create a client for connecting to the default port on localhost. %% @equiv create("127.0.0.1", 8098, "riak", []) +-spec create() -> rhc(). create() -> create("127.0.0.1", 8098, "riak", []). @@ -84,7 +86,7 @@ create() -> %% Defaults for r, w, dw, rw, and return_body may be passed in %% the Options list. The client id can also be specified by %% adding `{client_id, ID}' to the Options list. -%% @spec create(string(), integer(), string(), Options::list()) -> rhc() +-spec create(string(), integer(), string(), list()) -> rhc(). create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port), is_list(Prefix), is_list(Opts0) -> Opts = case proplists:lookup(client_id, Opts0) of @@ -98,19 +100,21 @@ create(IP, Port, Prefix, Opts0) when is_list(IP), is_integer(Port), #rhc{ip=IP, port=Port, prefix=Prefix, options=Opts}. %% @doc Get the IP this client will connect to. -%% @spec ip(rhc()) -> string() +-spec ip(rhc()) -> string(). + ip(#rhc{ip=IP}) -> IP. %% @doc Get the Port this client will connect to. -%% @spec port(rhc()) -> integer() +-spec port(rhc()) -> integer(). port(#rhc{port=Port}) -> Port. %% @doc Get the prefix this client will use for object URLs -%% @spec prefix(rhc()) -> string() +-spec prefix(rhc()) -> string(). + prefix(#rhc{prefix=Prefix}) -> Prefix. %% @doc Ping the server by requesting the "/ping" resource. -%% @spec ping(rhc()) -> ok|{error, term()} +-spec ping(rhc()) -> ok | {error, term()}. ping(Rhc) -> Url = ping_url(Rhc), case request(get, Url, ["200","204"], [], [], Rhc) of @@ -121,13 +125,14 @@ ping(Rhc) -> end. %% @doc Get the client ID that this client will use when storing objects. -%% @spec get_client_id(rhc()) -> {ok, string()} + +-spec get_client_id(rhc()) -> {ok, string()}. get_client_id(Rhc) -> {ok, client_id(Rhc, [])}. %% @doc Get some basic information about the server. The proplist returned %% should include `node' and `server_version' entries. -%% @spec get_server_info(rhc()) -> {ok, proplist()}|{error, term()} +-spec get_server_info(rhc()) -> {ok, list()}|{error, term()}. get_server_info(Rhc) -> Url = stats_url(Rhc), case request(get, Url, ["200"], [], [], Rhc) of @@ -139,7 +144,7 @@ get_server_info(Rhc) -> end. %% @doc Get the list of full stats from a /stats call to the server. -%% @spec get_server_stats(rhc()) -> {ok, proplist()}|{error, term()} +-spec get_server_stats(rhc()) -> {ok, list()}|{error, term()}. get_server_stats(Rhc) -> Url = stats_url(Rhc), case request(get, Url, ["200"], [], [], Rhc) of @@ -151,7 +156,9 @@ get_server_stats(Rhc) -> {error, Error} end. -%% @equiv get(Rhc, Bucket, Key, []) + +-spec get(rhc(), term(), term()) -> {ok, term()}|{error, term()}. + get(Rhc, Bucket, Key) -> get(Rhc, Bucket, Key, []). @@ -167,8 +174,7 @@ get(Rhc, Bucket, Key) -> %% %% The term in the second position of the error tuple will be %% `notfound' if the key was not found. -%% @spec get(rhc(), bucket(), key(), proplist()) -%% -> {ok, riakc_obj()}|{error, term()} +-spec get(rhc(), {binary(), bucket()}, key(), list()) -> {ok, riakc_obj()}|{error, term()}. get(Rhc, Bucket, Key, Options) -> Qs = get_q_params(Rhc, Options), Url = make_url(Rhc, Bucket, Key, Qs), @@ -191,7 +197,7 @@ get(Rhc, Bucket, Key, Options) -> {error, Error} end. -%% @equiv put(Rhc, Object, []) +-spec put(rhc(),riakc_obj()) -> ok | {error, term()} | {ok, riakc_obj()}. put(Rhc, Object) -> put(Rhc, Object, []). @@ -210,8 +216,8 @@ put(Rhc, Object) -> %% response. `ok' is returned if return_body is false. %% `{ok, Object}' is returned if return_body is true. %% -%% @spec put(rhc(), riakc_obj(), proplist()) -%% -> ok|{ok, riakc_obj()}|{error, term()} + +-spec put(rhc(),riakc_obj(),list()) -> ok | {error, term()} | {ok, riakc_obj()}. put(Rhc, Object, Options) -> Qs = put_q_params(Rhc, Options), Bucket = riakc_obj:bucket(Object), @@ -237,7 +243,6 @@ put(Rhc, Object, Options) -> %% @doc Increment the counter stored under `bucket', `key' %% by the given `amount'. -%% @equiv counter_incr(Rhc, Bucket, Key, Amt, []) -spec counter_incr(rhc(), binary(), binary(), integer()) -> ok | {ok, integer()} | {error, term()}. counter_incr(Rhc, Bucket, Key, Amt) -> @@ -313,7 +318,7 @@ counter_val(Rhc, Bucket, Key, Options) -> {error, Error} end. -%% @equiv delete(Rhc, Bucket, Key, []) +-spec delete(rhc(),{binary(), bucket()}, binary()) -> ok | {error, term()}. delete(Rhc, Bucket, Key) -> delete(Rhc, Bucket, Key, []). @@ -326,7 +331,7 @@ delete(Rhc, Bucket, Key) -> %%
`timeout'
%%
The server-side timeout for the write in ms
%% -%% @spec delete(rhc(), bucket(), key(), proplist()) -> ok|{error, term()} +-spec delete(rhc(), {binary(), bucket()}, key(), list()) -> ok | {error, term()}. delete(Rhc, Bucket, Key, Options) -> Qs = delete_q_params(Rhc, Options), Url = make_url(Rhc, Bucket, Key, Qs), @@ -342,39 +347,45 @@ delete(Rhc, Bucket, Key, Options) -> end. -%% @equiv delete_obj(Rhc, Obj, []) +-spec delete_obj(rhc(), riakc_obj()) -> ok | {error, term()}. delete_obj(Rhc, Obj) -> delete_obj(Rhc, Obj, []). %% @doc Delete the key of the given object, using the contained vector %% clock if present. -%% @equiv delete(Rhc, riakc_obj:bucket(Obj), riakc_obj:key(Obj), [{vclock, riakc_obj:vclock(Obj)}|Options]) +-spec delete_obj(rhc(), riakc_obj(), list()) -> ok | {error, term()}. delete_obj(Rhc, Obj, Options) -> Bucket = riakc_obj:bucket(Obj), Key = riakc_obj:key(Obj), VClock = riakc_obj:vclock(Obj), delete(Rhc, Bucket, Key, [{vclock, VClock}|Options]). +-spec list_buckets(_) -> none(). list_buckets(Rhc) -> list_buckets(Rhc, undefined). +-spec list_buckets(rhc(),integer()) -> {error, term()} | {ok,[binary()]}. list_buckets(Rhc, BucketType) when is_binary(BucketType) -> list_buckets(Rhc, BucketType, undefined); list_buckets(Rhc, Timeout) -> list_buckets(Rhc, undefined, Timeout). +-spec list_buckets(rhc(), undefined | binary(),integer()) -> {error, term()} | {ok,[binary()]}. list_buckets(Rhc, BucketType, Timeout) -> {ok, ReqId} = stream_list_buckets(Rhc, BucketType, Timeout), rhc_listkeys:wait_for_list(ReqId, Timeout). +-spec stream_list_buckets(rhc()) -> {error, term()} | {ok,reference()}. stream_list_buckets(Rhc) -> stream_list_buckets(Rhc, undefined). +-spec stream_list_buckets(rhc(),_) -> {error, term()} | {ok,reference()}. stream_list_buckets(Rhc, BucketType) when is_binary(BucketType) -> stream_list_buckets(Rhc, BucketType, undefined); stream_list_buckets(Rhc, Timeout) -> stream_list_buckets(Rhc, undefined, Timeout). +-spec stream_list_buckets(rhc(), term(), integer()) -> {error, term()} | {ok, reference()}. stream_list_buckets(Rhc, BucketType, Timeout) -> ParamList0 = [{?Q_BUCKETS, ?Q_STREAM}, {?Q_PROPS, ?Q_FALSE}], @@ -393,16 +404,17 @@ stream_list_buckets(Rhc, BucketType, Timeout) -> {error, Error} -> {error, Error} end. +-spec list_keys(rhc(),_) -> {error,_} | {ok,[binary()]}. list_keys(Rhc, Bucket) -> list_keys(Rhc, Bucket, undefined). %% @doc List the keys in the given bucket. -%% @spec list_keys(rhc(), bucket(), integer()) -> {ok, [key()]}|{error, term()} - +-spec list_keys(rhc(),_,_) -> {error,_} | {ok,[binary()]}. list_keys(Rhc, Bucket, Timeout) -> {ok, ReqId} = stream_list_keys(Rhc, Bucket, Timeout), - rhc_listkeys:wait_for_list(ReqId, ?DEFAULT_TIMEOUT). + rhc_listkeys:wait_for_list(ReqId, ?DEFAULT_HTTP_TIMEOUT). +-spec stream_list_keys(rhc(),_) -> {error,_} | {ok,reference()}. stream_list_keys(Rhc, Bucket) -> stream_list_keys(Rhc, Bucket, undefined). @@ -417,8 +429,7 @@ stream_list_keys(Rhc, Bucket) -> %%
`{error, term()}'
%%
an error occurred
%% -%% @spec stream_list_keys(rhc(), bucket(), integer()) -> -%% {ok, reference()}|{error, term()} +-spec stream_list_keys(rhc(),_,_) -> {error, term()} | {ok,reference()}. stream_list_keys(Rhc, Bucket, Timeout) -> ParamList0 = [{?Q_KEYS, ?Q_STREAM}, {?Q_PROPS, ?Q_FALSE}], @@ -438,27 +449,23 @@ stream_list_keys(Rhc, Bucket, Timeout) -> end. %% @doc Query a secondary index. -%% @spec get_index(rhc(), bucket(), index(), index_query()) -> -%% {ok, index_results()} | {error, term()} +-spec get_index(rhc(), bucket(), term(), term()) -> {error, term()} | {ok, index_results()}. get_index(Rhc, Bucket, Index, Query) -> get_index(Rhc, Bucket, Index, Query, []). %% @doc Query a secondary index. -%% @spec get_index(rhc(), bucket(), index(), index_query(), index_options()) -> -%% {ok, index_results()} | {error, term()} +-spec get_index(rhc(), bucket(), term(), term(), list()) -> {ok,index_results()} | {error, term()}. get_index(Rhc, Bucket, Index, Query, Options) -> {ok, ReqId} = stream_index(Rhc, Bucket, Index, Query, Options), rhc_index:wait_for_index(ReqId). %% @doc Query a secondary index, streaming the results back. -%% @spec stream_index(rhc(), bucket(), index(), index_query()) -> -%% {ok, reference()} | {error, term()} +-spec stream_index(rhc(), bucket(), term(), term()) -> {ok,reference()} | {error,term()}. stream_index(Rhc, Bucket, Index, Query) -> stream_index(Rhc, Bucket, Index, Query, []). %% @doc Query a secondary index, streaming the results back. -%% @spec stream_index(rhc(), bucket(), index(), index_query(), index_options()) -> -%% {ok, reference()} | {error, term()} +-spec stream_index(rhc(),_,_,_,[{atom(),_}]) -> {error,_} | {ok,reference()}. stream_index(Rhc, Bucket, Index, Query, Options) -> ParamList = rhc_index:query_options([{stream, true}|Options]), Url = index_url(Rhc, Bucket, Index, Query, ParamList), @@ -473,7 +480,7 @@ stream_index(Rhc, Bucket, Index, Query, Options) -> end. %% @doc Get the properties of the given bucket. -%% @spec get_bucket(rhc(), bucket()) -> {ok, proplist()}|{error, term()} +-spec get_bucket(rhc(), {binary(), bucket()}) -> {ok, [proplists:property()]} | {error, term()}. get_bucket(Rhc, Bucket) -> Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}, {?Q_KEYS, ?Q_FALSE}]), @@ -496,7 +503,7 @@ get_bucket(Rhc, Bucket) -> %%
Whether or not this bucket should allow siblings to %% be created for its keys
%% -%% @spec set_bucket(rhc(), bucket(), proplist()) -> ok|{error, term()} +-spec set_bucket(rhc(),_,list()) -> ok | {error,_}. set_bucket(Rhc, Bucket, Props0) -> Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}]), Headers = [{"Content-Type", "application/json"}], @@ -507,6 +514,7 @@ set_bucket(Rhc, Bucket, Props0) -> {error, Error} -> {error, Error} end. +-spec reset_bucket(rhc(),_) -> ok | {error,_}. reset_bucket(Rhc, Bucket) -> Url = make_url(Rhc, Bucket, undefined, [{?Q_PROPS, ?Q_TRUE}]), case request(delete, Url, ["204"], [], [], Rhc) of @@ -516,7 +524,7 @@ reset_bucket(Rhc, Bucket) -> %% @doc Get the properties of the given bucket. -%% @spec get_bucket_type (rhc(), bucket()) -> {ok, proplist()}|{error, term()} +-spec get_bucket_type(rhc(),_) -> {error,_} | {ok,[{atom(),_}]}. get_bucket_type(Rhc, Type) -> Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}, {?Q_KEYS, ?Q_FALSE}]), @@ -530,8 +538,7 @@ get_bucket_type(Rhc, Type) -> end. %% @doc Set the properties of the given bucket type. -%% -%% @spec set_bucket_type(rhc(), bucket(), proplist()) -> ok|{error, term()} +-spec set_bucket_type(rhc(),_,list()) -> ok | {error,_}. set_bucket_type(Rhc, Type, Props0) -> Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}]), Headers = [{"Content-Type", "application/json"}], @@ -542,6 +549,7 @@ set_bucket_type(Rhc, Type, Props0) -> {error, Error} -> {error, Error} end. +-spec reset_bucket_type(rhc(),_) -> ok | {error,_}. reset_bucket_type(Rhc, Type) -> Url = make_url(Rhc, {Type, undefined}, undefined, [{?Q_PROPS, ?Q_TRUE}]), case request(delete, Url, ["204"], [], [], Rhc) of @@ -549,23 +557,23 @@ reset_bucket_type(Rhc, Type) -> {error, Error} -> {error, Error} end. -%% @equiv mapred(Rhc, Inputs, Query, DEFAULT_TIMEOUT) +%% @equiv mapred(Rhc, Inputs, Query, DEFAULT_HTTP_TIMEOUT) +-spec mapred(rhc(), binary(), term()) -> {ok, [{term(), term()}]} | {error, term()}. mapred(Rhc, Inputs, Query) -> - mapred(Rhc, Inputs, Query, ?DEFAULT_TIMEOUT). + mapred(Rhc, Inputs, Query, ?DEFAULT_HTTP_TIMEOUT). %% @doc Execute a map/reduce query. See {@link %% rhc_mapred:encode_mapred/2} for details of the allowed formats %% for `Inputs' and `Query'. -%% @spec mapred(rhc(), rhc_mapred:map_input(), -%% [rhc_mapred:query_part()], integer()) -%% -> {ok, [rhc_mapred:phase_result()]}|{error, term()} +-spec mapred(rhc(), rhc_mapred:map_input(), [rhc_mapred:query_part()], integer()) -> {ok, [rhc_mapred:phase_result()]}|{error, term()}. mapred(Rhc, Inputs, Query, Timeout) -> {ok, ReqId} = mapred_stream(Rhc, Inputs, Query, self(), Timeout), rhc_mapred:wait_for_mapred(ReqId, Timeout). -%% @equiv mapred_stream(Rhc, Inputs, Query, ClientPid, DEFAULT_TIMEOUT) +%% @equiv mapred_stream(Rhc, Inputs, Query, ClientPid, DEFAULT_HTTP_TIMEOUT) +-spec mapred_stream(rhc(), binary(), term(), term()) -> {ok,reference()} | {error, term()}. mapred_stream(Rhc, Inputs, Query, ClientPid) -> - mapred_stream(Rhc, Inputs, Query, ClientPid, ?DEFAULT_TIMEOUT). + mapred_stream(Rhc, Inputs, Query, ClientPid, ?DEFAULT_HTTP_TIMEOUT). %% @doc Stream map/reduce results to a Pid. Messages sent to the Pid %% will be of the form `{reference(), message()}', @@ -580,9 +588,9 @@ mapred_stream(Rhc, Inputs, Query, ClientPid) -> %%
`{error, term()}'
%%
an error occurred
%% -%% @spec mapred_stream(rhc(), rhc_mapred:mapred_input(), -%% [rhc_mapred:query_phase()], pid(), integer()) -%% -> {ok, reference()}|{error, term()} +-spec mapred_stream(rhc(), rhc_mapred:mapred_input(), + [rhc_mapred:query_phase()], pid(), integer()) -> + {ok, reference()}|{error, term()}. mapred_stream(Rhc, Inputs, Query, ClientPid, Timeout) -> Url = mapred_url(Rhc), StartRef = make_ref(), @@ -598,13 +606,12 @@ mapred_stream(Rhc, Inputs, Query, ClientPid, Timeout) -> %% @doc Execute a search query. This command will return an error %% unless executed against a Riak Search cluster. -%% @spec search(rhc(), bucket(), string()) -> -%% {ok, [rhc_mapred:phase_result()]}|{error, term()} +-spec search(rhc(), bucket(), string()) -> {ok,[rhc_mapred:phase_result()]} | {error,term}. search(Rhc, Bucket, SearchQuery) -> %% Run a Map/Reduce operation using reduce_identity to get a list %% of BKeys. IdentityQuery = [{reduce, {modfun, riak_kv_mapreduce, reduce_identity}, none, true}], - case search(Rhc, Bucket, SearchQuery, IdentityQuery, ?DEFAULT_TIMEOUT) of + case search(Rhc, Bucket, SearchQuery, IdentityQuery, ?DEFAULT_HTTP_TIMEOUT) of {ok, [{_, Results}]} -> %% Unwrap the results. {ok, Results}; @@ -615,42 +622,40 @@ search(Rhc, Bucket, SearchQuery) -> %% query. See {@link rhc_mapred:encode_mapred/2} for details of %% the allowed formats for `MRQuery'. This command will return an error %% unless executed against a Riak Search cluster. -%% @spec search(rhc(), bucket(), string(), -%% [rhc_mapred:query_part()], integer()) -> -%% {ok, [rhc_mapred:phase_result()]}|{error, term()} +-spec search(rhc(), bucket(), string(),[rhc_mapred:query_part()], integer()) -> + {ok, [rhc_mapred:phase_result()]} | {error,term()}. search(Rhc, Bucket, SearchQuery, MRQuery, Timeout) -> Inputs = {modfun, riak_search, mapred_search, [Bucket, SearchQuery]}, mapred(Rhc, Inputs, MRQuery, Timeout). -%% @equiv mapred_bucket(Rhc, Bucket, Query, DEFAULT_TIMEOUT) +%% @equiv mapred_bucket(Rhc, Bucket, Query, DEFAULT_HTTP_TIMEOUT) +-spec mapred_bucket(rhc(), bucket(), term()) -> {ok, [rhc_mapred:phase_result()]} | {error,term()}. mapred_bucket(Rhc, Bucket, Query) -> - mapred_bucket(Rhc, Bucket, Query, ?DEFAULT_TIMEOUT). + mapred_bucket(Rhc, Bucket, Query, ?DEFAULT_HTTP_TIMEOUT). %% @doc Execute a map/reduce query over all keys in the given bucket. -%% @spec mapred_bucket(rhc(), bucket(), [rhc_mapred:query_phase()], -%% integer()) -%% -> {ok, [rhc_mapred:phase_result()]}|{error, term()} +-spec mapred_bucket(rhc(), bucket(), [rhc_mapred:query_phase()], integer()) -> {ok, [rhc_mapred:phase_result()]}|{error, term()}. mapred_bucket(Rhc, Bucket, Query, Timeout) -> {ok, ReqId} = mapred_bucket_stream(Rhc, Bucket, Query, self(), Timeout), rhc_mapred:wait_for_mapred(ReqId, Timeout). %% @doc Stream map/reduce results over all keys in a bucket to a Pid. %% Similar to {@link mapred_stream/5} -%% @spec mapred_bucket_stream(rhc(), bucket(), -%% [rhc_mapred:query_phase()], pid(), integer()) -%% -> {ok, reference()}|{error, term()} +-spec mapred_bucket_stream(rhc(), bucket(), + [rhc_mapred:query_phase()], pid(), integer()) + -> {ok, reference()}|{error, term()}. mapred_bucket_stream(Rhc, Bucket, Query, ClientPid, Timeout) -> mapred_stream(Rhc, Bucket, Query, ClientPid, Timeout). %% @doc Fetches the representation of a convergent datatype from Riak. --spec fetch_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary()) -> +-spec fetch_type(rhc(), {binary(), bucket()}, key()) -> {ok, riakc_datatype:datatype()} | {error, term()}. fetch_type(Rhc, BucketAndType, Key) -> fetch_type(Rhc, BucketAndType, Key, []). %% @doc Fetches the representation of a convergent datatype from Riak, %% using the given request options. --spec fetch_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary(), [proplists:property()]) -> +-spec fetch_type(rhc(), {binary(), bucket()}, key(), [proplists:property()]) -> {ok, riakc_datatype:datatype()} | {error, term()}. fetch_type(Rhc, BucketAndType, Key, Options) -> Query = fetch_type_q_params(Rhc, Options), @@ -664,17 +669,18 @@ fetch_type(Rhc, BucketAndType, Key, Options) -> %% @doc Updates the convergent datatype in Riak with local %% modifications stored in the container type. --spec update_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary(), +-spec update_type(rhc(), {binary(), bucket()}, key(), Update::riakc_datatype:update(term())) -> - ok | {ok, Key::binary()} | {ok, riakc_datatype:datatype()} | - {ok, Key::binary(), riakc_datatype:datatype()} | {error, term()}. + ok | {ok, key()} | {ok, riakc_datatype:datatype()} | + {ok, key(), riakc_datatype:datatype()} | {error, term()}. + update_type(Rhc, BucketAndType, Key, Update) -> update_type(Rhc, BucketAndType, Key, Update, []). --spec update_type(rhc(), {BucketType::binary(), Bucket::binary()}, Key::binary(), +-spec update_type(rhc(), {binary(), bucket()}, key(), Update::riakc_datatype:update(term()), [proplists:property()]) -> - ok | {ok, Key::binary()} | {ok, riakc_datatype:datatype()} | - {ok, Key::binary(), riakc_datatype:datatype()} | {error, term()}. + ok | {ok, key()} | {ok, riakc_datatype:datatype()} | + {ok, key(), riakc_datatype:datatype()} | {error, term()}. update_type(_Rhc, _BucketAndType, _Key, undefined, _Options) -> {error, unmodified}; update_type(Rhc, BucketAndType, Key, {Type, Op, Context}, Options) -> @@ -707,7 +713,7 @@ update_type(Rhc, BucketAndType, Key, {Type, Op, Context}, Options) -> %% updates the datatype in Riak. If an existing value is not found, %% but you want the updates to apply anyway, use the 'create' option. -spec modify_type(rhc(), fun((riakc_datatype:datatype()) -> riakc_datatype:datatype()), - {BucketType::binary(), Bucket::binary()}, Key::binary(), [proplists:property()]) -> + {BucketType::binary(), bucket()}, key(), [proplists:property()]) -> ok | {ok, riakc_datatype:datatype()} | {error, term()}. modify_type(Rhc, Fun, BucketAndType, Key, Options) -> Create = proplists:get_value(create, Options, true), @@ -730,7 +736,7 @@ modify_type(Rhc, Fun, BucketAndType, Key, Options) -> %% @doc Get the client ID to use, given the passed options and client. %% Choose the client ID in Options before the one in the client. -%% @spec client_id(rhc(), proplist()) -> client_id() +-spec client_id(rhc(), list()) -> any(). client_id(#rhc{options=RhcOptions}, Options) -> case proplists:get_value(client_id, Options) of undefined -> @@ -740,7 +746,7 @@ client_id(#rhc{options=RhcOptions}, Options) -> end. %% @doc Generate a random client ID. -%% @spec random_client_id() -> client_id() +-spec random_client_id() -> string(). random_client_id() -> {{Y,Mo,D},{H,Mi,S}} = erlang:universaltime(), {_,_,NowPart} = now(), @@ -748,7 +754,7 @@ random_client_id() -> base64:encode_to_string(<>). %% @doc Assemble the root URL for the given client -%% @spec root_url(rhc()) -> iolist() +-spec root_url(rhc()) -> iolist(). root_url(#rhc{ip=Ip, port=Port, options=Opts}) -> Proto = case proplists:get_value(is_ssl, Opts) of true -> @@ -759,21 +765,23 @@ root_url(#rhc{ip=Ip, port=Port, options=Opts}) -> [Proto, "://",Ip,":",integer_to_list(Port),"/"]. %% @doc Assemble the URL for the map/reduce resource -%% @spec mapred_url(rhc()) -> iolist() +-spec mapred_url(rhc()) -> iolist(). mapred_url(Rhc) -> binary_to_list(iolist_to_binary([root_url(Rhc), "mapred/?chunked=true"])). %% @doc Assemble the URL for the ping resource -%% @spec ping_url(rhc()) -> iolist() +-spec ping_url(rhc()) -> iolist(). ping_url(Rhc) -> binary_to_list(iolist_to_binary([root_url(Rhc), "ping/"])). %% @doc Assemble the URL for the stats resource -%% @spec stats_url(rhc()) -> iolist() +-spec stats_url(rhc()) -> iolist(). stats_url(Rhc) -> binary_to_list(iolist_to_binary([root_url(Rhc), "stats/"])). %% @doc Assemble the URL for the 2I resource +-spec index_url(rhc(),{binary(), bucket()}, term(), term(), list()) -> + list() | {error, term()} | {incomplete, list(), binary()}. index_url(Rhc, BucketAndType, Index, Query, Params) -> {Type, Bucket} = extract_bucket_type(BucketAndType), QuerySegments = index_query_segments(Query), @@ -786,6 +794,7 @@ index_url(Rhc, BucketAndType, Index, Query, Params) -> [ ["?", mochiweb_util:urlencode(Params)] || Params =/= []]]). +-spec index_query_segments(_) -> [binary() | string()]. index_query_segments(B) when is_binary(B) -> [ B ]; index_query_segments(I) when is_integer(I) -> @@ -798,6 +807,9 @@ index_query_segments({I1, I2}) when is_integer(I1), [ integer_to_list(I1), integer_to_list(I2) ]; index_query_segments(_) -> []. +-spec index_name({binary_index, term()} | + {integer_index, term()} | + term()) -> term(). index_name({binary_index, B}) -> [B, "_bin"]; index_name({integer_index, I}) -> @@ -805,9 +817,8 @@ index_name({integer_index, I}) -> index_name(Idx) -> Idx. - %% @doc Assemble the URL for the given bucket and key -%% @spec make_url(rhc(), bucket(), key(), proplist()) -> iolist() +-spec make_url(rhc(), {binary(),bucket() | undefined}, key() | undefined, list()) -> iolist(). make_url(Rhc=#rhc{}, BucketAndType, Key, Query) -> {Type, Bucket} = extract_bucket_type(BucketAndType), {IsKeys, IsProps, IsBuckets} = detect_bucket_flags(Query), @@ -832,6 +843,7 @@ make_counter_url(Rhc, Bucket, Key, Query) -> <<"buckets">>, "/", Bucket, "/", <<"counters">>, "/", Key, "?", [ [mochiweb_util:urlencode(Query)] || Query =/= []]])). +-spec make_datatype_url(rhc(),_, key(),list()) -> string() | {error, list(), iolist()} | {incomplete, list(), binary()}. make_datatype_url(Rhc, BucketAndType, Key, Query) -> case extract_bucket_type(BucketAndType) of {undefined, _B} -> @@ -846,6 +858,7 @@ make_datatype_url(Rhc, BucketAndType, Key, Query) -> end. %% @doc send an ibrowse request +-spec request(atom(), term(), list(), list(), term(), rhc()) -> any(). request(Method, Url, Expect, Headers, Body, Rhc) -> AuthHeader = get_auth_header(Rhc#rhc.options), SSLOptions = get_ssl_options(Rhc#rhc.options), @@ -862,6 +875,7 @@ request(Method, Url, Expect, Headers, Body, Rhc) -> end. %% @doc stream an ibrowse request +-spec request_stream(pid(), atom(), string(), list(), term(), rhc()) -> any(). request_stream(Pid, Method, Url, Headers, Body, Rhc) -> AuthHeader = get_auth_header(Rhc#rhc.options), SSLOptions = get_ssl_options(Rhc#rhc.options), @@ -875,17 +889,20 @@ request_stream(Pid, Method, Url, Headers, Body, Rhc) -> end. %% @doc Get the default options for the given client -%% @spec options(rhc()) -> proplist() + +-spec options(rhc()) -> any(). options(#rhc{options=Options}) -> Options. %% @doc Extract the list of query parameters to use for a GET -%% @spec get_q_params(rhc(), proplist()) -> proplist() + +-spec get_q_params(rhc(),list()) -> list(). get_q_params(Rhc, Options) -> options_list([r,pr,timeout], Options ++ options(Rhc)). %% @doc Extract the list of query parameters to use for a PUT -%% @spec put_q_params(rhc(), proplist()) -> proplist() + +-spec put_q_params(rhc(),list()) -> list(). put_q_params(Rhc, Options) -> options_list([r,w,dw,pr,pw,timeout,asis,{return_body,"returnbody"}], Options ++ options(Rhc)). @@ -897,24 +914,28 @@ counter_q_params(Rhc, Options) -> options_list([r, pr, w, pw, dw, returnvalue, basic_quorum, notfound_ok], Options ++ options(Rhc)). %% @doc Extract the list of query parameters to use for a DELETE -%% @spec delete_q_params(rhc(), proplist()) -> proplist() + +-spec delete_q_params(rhc(),list()) -> list(). delete_q_params(Rhc, Options) -> options_list([r,w,dw,pr,pw,rw,timeout], Options ++ options(Rhc)). +-spec fetch_type_q_params(rhc(),list()) -> list(). fetch_type_q_params(Rhc, Options) -> options_list([r,pr,basic_quorum,notfound_ok,timeout,include_context], Options ++ options(Rhc)). +-spec update_type_q_params(rhc(),list()) -> list(). update_type_q_params(Rhc, Options) -> options_list([r,w,dw,pr,pw,basic_quorum,notfound_ok,timeout,include_context,{return_body, "returnbody"}], Options ++ options(Rhc)). %% @doc Extract the options for the given `Keys' from the possible %% list of `Options'. -%% @spec options_list([Key::atom()|{Key::atom(),Alias::string()}], -%% proplist()) -> proplist() + +-spec options_list(list(),_) -> list(). options_list(Keys, Options) -> options_list(Keys, Options, []). +-spec options_list(list(), [proplist:property()], list()) -> list(). options_list([K|Rest], Options, Acc) -> {Key,Alias} = case K of {_, _} -> K; @@ -930,12 +951,15 @@ options_list([], _, Acc) -> %% @doc Convert a stats-resource response to an erlang-term server %% information proplist. +-spec erlify_server_info(list()) -> [{node,_} | {server_version,_}]. erlify_server_info(Props) -> lists:flatten([ erlify_server_info(K, V) || {K, V} <- Props ]). +-spec erlify_server_info(_,_) -> [] | {node, term()} | {server_version, term()}. erlify_server_info(<<"nodename">>, Name) -> {node, Name}; erlify_server_info(<<"riak_kv_version">>, Vsn) -> {server_version, Vsn}; erlify_server_info(_Ignore, _) -> []. +-spec get_auth_header(list()) -> [{string(), string()}]. get_auth_header(Options) -> case lists:keyfind(credentials, 1, Options) of {credentials, User, Password} -> @@ -946,6 +970,7 @@ get_auth_header(Options) -> [] end. +-spec get_ssl_options([proplists:property()]) -> [{is_ssl,true} | {ssl_options,list()}]. get_ssl_options(Options) -> case proplists:get_value(is_ssl, Options) of true -> @@ -959,6 +984,8 @@ get_ssl_options(Options) -> [] end. +-spec extract_bucket_type({binary(), bucket()} | bucket()) -> + {undefined | binary() , bucket()}. extract_bucket_type({<<"default">>, B}) -> {undefined, B}; extract_bucket_type({T,B}) -> @@ -966,6 +993,7 @@ extract_bucket_type({T,B}) -> extract_bucket_type(B) -> {undefined, B}. +-spec detect_bucket_flags(list()) -> {false,boolean(),boolean()} | {true,boolean(),boolean()}. detect_bucket_flags(Query) -> {proplists:get_value(?Q_KEYS, Query, ?Q_FALSE) =/= ?Q_FALSE, proplists:get_value(?Q_PROPS, Query, ?Q_FALSE) =/= ?Q_FALSE, diff --git a/src/rhc_bucket.erl b/src/rhc_bucket.erl index 1b074e6..98ef0ec 100644 --- a/src/rhc_bucket.erl +++ b/src/rhc_bucket.erl @@ -29,8 +29,10 @@ -include("raw_http.hrl"). +-spec erlify_props([any()]) -> [{atom(),_}]. erlify_props(Props) -> lists:flatten([ erlify_prop(K, V) || {K, V} <- Props ]). +-spec erlify_prop(_,_) -> [] | {atom(),_}. erlify_prop(?JSON_ALLOW_MULT, AM) -> {allow_mult, AM}; erlify_prop(?JSON_BACKEND, B) -> {backend, B}; erlify_prop(?JSON_BASIC_Q, B) -> {basic_quorum, B}; @@ -55,12 +57,14 @@ erlify_prop(?JSON_W, W) -> {w, erlify_quorum(W)}; erlify_prop(?JSON_YOUNG_VC, I) -> {young_vclock, I}; erlify_prop(_Ignore, _) -> []. +-spec erlify_quorum(_) -> 'all' | 'one' | 'quorum' | 'undefined' | integer(). erlify_quorum(?JSON_ALL) -> all; erlify_quorum(?JSON_QUORUM) -> quorum; erlify_quorum(?JSON_ONE) -> one; erlify_quorum(I) when is_integer(I) -> I; erlify_quorum(_) -> undefined. +-spec erlify_repl(_) -> 'false' | 'fullsync' | 'realtime' | 'true' | 'undefined'. erlify_repl(?JSON_REALTIME) -> realtime; erlify_repl(?JSON_FULLSYNC) -> fullsync; erlify_repl(?JSON_BOTH) -> true; %% both is equivalent to true, but only works in 1.2+ @@ -68,6 +72,7 @@ erlify_repl(true) -> true; erlify_repl(false) -> false; erlify_repl(_) -> undefined. +-spec erlify_chash({'struct',[{<<_:24>>,binary()},...]}) -> {atom(),atom()}. erlify_chash({struct, [{?JSON_MOD, Mod}, {?JSON_FUN, Fun}]}=Struct) -> try {binary_to_existing_atom(Mod, utf8), binary_to_existing_atom(Fun, utf8)} @@ -77,12 +82,15 @@ erlify_chash({struct, [{?JSON_MOD, Mod}, {?JSON_FUN, Fun}]}=Struct) -> {binary_to_atom(Mod, utf8), binary_to_atom(Fun, utf8)} end. +-spec erlify_linkfun({'struct',[{<<_:24>>,binary()},...]}) -> {'modfun',atom(),atom()}. erlify_linkfun(Struct) -> {Mod, Fun} = erlify_chash(Struct), {modfun, Mod, Fun}. +-spec httpify_props(list()) -> [{<<_:8,_:_*8>>,_}]. httpify_props(Props) -> lists:flatten([ httpify_prop(K, V) || {K, V} <- Props ]). +-spec httpify_prop(_,_) -> [] | {<<_:8,_:_*8>>,_}. httpify_prop(allow_mult, AM) -> {?JSON_ALLOW_MULT, AM}; httpify_prop(backend, B) -> {?JSON_BACKEND, B}; httpify_prop(basic_quorum, B) -> {?JSON_BASIC_Q, B}; @@ -107,6 +115,7 @@ httpify_prop(w, Q) -> {?JSON_W, Q}; httpify_prop(young_vclock, VC) -> {?JSON_YOUNG_VC, VC}; httpify_prop(_Ignore, _) -> []. +-spec httpify_modfun({atom(),atom()} | {'modfun',atom(),atom()}) -> {'struct',[{_,_},...]}. httpify_modfun({modfun, M, F}) -> httpify_modfun({M, F}); httpify_modfun({M, F}) -> diff --git a/src/rhc_dt.erl b/src/rhc_dt.erl index b72bf74..b5b145d 100644 --- a/src/rhc_dt.erl +++ b/src/rhc_dt.erl @@ -31,6 +31,7 @@ -define(FIELD_PATTERN, "^(.*)_(counter|set|register|flag|map)$"). +-spec datatype_from_json({'struct',[any()]}) -> any(). datatype_from_json({struct, Props}) -> Value = proplists:get_value(<<"value">>, Props), Type = binary_to_existing_atom(proplists:get_value(<<"type">>, Props), utf8), @@ -38,6 +39,7 @@ datatype_from_json({struct, Props}) -> Mod = riakc_datatype:module(Type), Mod:new(decode_value(Type, Value), Context). +-spec decode_value('counter' | 'flag' | 'map' | 'register' | 'set',_) -> any(). decode_value(counter, Value) -> Value; decode_value(set, Value) -> Value; decode_value(flag, Value) -> Value; @@ -48,14 +50,17 @@ decode_value(map, {struct, Fields}) -> {{Name,Type}, decode_value(Type, Value)} end || {Field, Value} <- Fields ]. +-spec field_from_json(binary()) -> {binary() | [binary() | string() | char() | {integer(),integer()} | {'error',[any()],binary()} | {'incomplete',[any()],binary()}] | {integer(),integer()} | {'error',string(),binary()} | {'incomplete',string(),binary()},atom()}. field_from_json(Bin) when is_binary(Bin) -> {match, [Name, BinType]} = re:run(Bin, ?FIELD_PATTERN, [anchored, {capture, all_but_first, binary}]), {Name, binary_to_existing_atom(BinType, utf8)}. +-spec field_to_json({binary(),atom()}) -> <<_:8,_:_*8>>. field_to_json({Name, Type}) when is_binary(Name), is_atom(Type) -> BinType = atom_to_binary(Type, utf8), <>. +-spec decode_error(_,{'ok',_,_,_}) -> any(). decode_error(fetch, {ok, "404", Headers, Body}) -> case proplists:get_value("Content-Type", Headers) of "application/json" -> @@ -75,6 +80,7 @@ decode_error(_, {ok, "403", _, Body}) -> decode_error(_, {ok, _, _, Body}) -> Body. +-spec encode_update_request('counter' | 'flag' | 'map' | 'register' | 'set', term(), term()) -> binary() | {'struct',list()}. encode_update_request(register, {assign, Bin}, _Context) -> {struct, [{<<"assign">>, Bin}]}; encode_update_request(flag, Atom, _Context) -> @@ -89,6 +95,7 @@ encode_update_request(map, {update, Ops}, Context) -> {struct, orddict:to_list(lists:foldl(fun encode_map_op/2, orddict:new(), Ops)) ++ include_context(Context)}. +-spec encode_map_op({'add',{binary(),atom()}} | {'remove',{binary(),atom()}} | {'update',{binary(),'counter' | 'flag' | 'map' | 'register' | 'set'},_},[{_,_}]) -> [{_,_},...]. encode_map_op({add, Entry}, Ops) -> orddict:append(add, field_to_json(Entry), Ops); encode_map_op({remove, Entry}, Ops) -> @@ -103,6 +110,7 @@ encode_map_op({update, {_Key,Type}=Field, Op}, Ops) -> orddict:store(update, {struct, [Update]}, Ops) end. +-spec include_context(undefined | binary() | term()) -> [{<<_:56>>,_}]. include_context(undefined) -> []; include_context(<<>>) -> []; include_context(Bin) -> [{<<"context">>, Bin}]. diff --git a/src/rhc_index.erl b/src/rhc_index.erl index 06db9ca..e4497b4 100644 --- a/src/rhc_index.erl +++ b/src/rhc_index.erl @@ -38,6 +38,7 @@ query_options(Options) -> lists:flatmap(fun query_option/1, Options). +-spec query_option(_) -> [{[1..255,...], list()}]. query_option({timeout, N}) when is_integer(N) -> [{?Q_TIMEOUT, integer_to_list(N)}]; query_option({stream, B}) when is_boolean(B) -> @@ -54,10 +55,11 @@ query_option(_) -> []. %% @doc Collects 2i query results on behalf of the caller. --spec wait_for_index(reference()) -> {ok, ?INDEX_RESULTS{}} | {error, term()}. +-spec wait_for_index(reference()) -> {ok, index_results()} | {error, term()}. wait_for_index(ReqId) -> wait_for_index(ReqId, []). +-spec wait_for_index(_,[index_stream_result()]) -> {'error',_} | {'ok', index_results()}. wait_for_index(ReqId, Acc) -> receive {ReqId, {done, Continuation}} -> @@ -70,12 +72,14 @@ wait_for_index(ReqId, Acc) -> wait_for_index(ReqId, [Res|Acc]) end. +-spec collect_results([index_stream_result()],'undefined' | binary()) -> index_results(). collect_results(Acc, Continuation) -> lists:foldl(fun merge_index_results/2, ?INDEX_RESULTS{keys=[], terms=[], continuation=Continuation}, Acc). +-spec merge_index_results(index_stream_result(),index_results()) -> index_results(). merge_index_results(?INDEX_STREAM_RESULT{keys=KL}, ?INDEX_RESULTS{keys=K0}=Acc) when is_list(KL) -> Acc?INDEX_RESULTS{keys=KL++K0}; @@ -92,6 +96,7 @@ index_acceptor(Pid, PidRef) -> index_acceptor(Pid, PidRef, IbrowseRef) end. +-spec index_acceptor(atom() | pid() | port() | {atom(),atom()},_,_) -> {_,'done' | {'error','timeout' | {_,_}}}. index_acceptor(Pid, PidRef, IBRef) -> receive {ibrowse_async_headers, IBRef, Status, Headers} -> @@ -113,6 +118,7 @@ index_acceptor(Pid, PidRef, IBRef) -> %% @doc Receives multipart chunks from webmachine_multipart and parses %% them into results that can be sent to Pid. %% @private +-spec stream_parts_acceptor(atom() | pid() | port() | {atom(),atom()}, term(), 'done_parts' | term()) -> {term(),'done'}. stream_parts_acceptor(Pid, PidRef, done_parts) -> Pid ! {PidRef, done}; stream_parts_acceptor(Pid, PidRef, {{_Name, _Param, Part},Next}) -> @@ -127,6 +133,7 @@ stream_parts_acceptor(Pid, PidRef, {{_Name, _Param, Part},Next}) -> %% @doc Sends keys or terms to the Pid if they are present in the %% result, otherwise sends nothing. %% @private +-spec maybe_send_results(term(), term(), term(), term()) -> 'ok' | {_,index_stream_result()}. maybe_send_results(_Pid, _PidRef, undefined, undefined) -> ok; maybe_send_results(Pid, PidRef, Keys, Results) -> Pid ! {PidRef, ?INDEX_STREAM_RESULT{keys=Keys, @@ -135,6 +142,7 @@ maybe_send_results(Pid, PidRef, Keys, Results) -> %% @doc Sends the continuation to Pid if it is present in the result, %% otherwise sends nothing. %% @private +-spec maybe_send_continuation(_,_,_) -> 'ok' | {_,{'done',_}}. maybe_send_continuation(_Pid, _PidRef, undefined) -> ok; maybe_send_continuation(Pid, PidRef, Continuation) -> Pid ! {PidRef, {done, Continuation}}. @@ -142,6 +150,7 @@ maybe_send_continuation(Pid, PidRef, Continuation) -> %% @doc "next" fun for the webmachine_multipart streamer - waits for %% an ibrowse message, and then returns it to the streamer for processing %% @private +-spec stream_parts_helper(_,_,_,boolean()) -> fun(() -> {_,'done' | fun(() -> any())}). stream_parts_helper(Pid, PidRef, IbrowseRef, First) -> fun() -> receive diff --git a/src/rhc_listkeys.erl b/src/rhc_listkeys.erl index f64aeef..7f5418a 100644 --- a/src/rhc_listkeys.erl +++ b/src/rhc_listkeys.erl @@ -30,6 +30,7 @@ -include("raw_http.hrl"). -include("rhc.hrl"). +-include_lib("riakc/include/riakc.hrl"). -record(parse_state, {buffer=[], %% unused characters in reverse order brace=0, %% depth of braces in current partial @@ -40,10 +41,13 @@ %% @doc Collect all keylist results, and provide them as one list %% instead of streaming to a Pid. %% @spec wait_for_list(term(), integer()) -> -%% {ok, [key()]}|{error, term()} +%% {ok, [key()]}|{error, term()} +-spec wait_for_list(term(), integer()) -> {ok, [key()]}|{error, term()}. + wait_for_list(ReqId, Timeout) -> wait_for_list(ReqId,Timeout,[]). %% @private +-spec wait_for_list(_,_,[any()]) -> {'error',_} | {'ok',[any()]}. wait_for_list(ReqId, _Timeout0, Acc) -> receive {ReqId, done} -> @@ -56,6 +60,7 @@ wait_for_list(ReqId, _Timeout0, Acc) -> %% @doc first stage of ibrowse response handling - just waits to be %% told what ibrowse request ID to expect +-spec list_acceptor(atom() | pid() | port() | {atom(),atom()},_,_) -> {_,'done' | {'error',_}}. list_acceptor(Pid, PidRef, Type) -> receive {ibrowse_req_id, PidRef, IbrowseRef} -> @@ -64,6 +69,7 @@ list_acceptor(Pid, PidRef, Type) -> %% @doc main loop for ibrowse response handling - parses response and %% sends messaged to client Pid +-spec list_acceptor(atom() | pid() | port() | {atom(),atom()},_,_,#parse_state{},_) -> {_,'done' | {'error',_}}. list_acceptor(Pid,PidRef,IbrowseRef,ParseState,Type) -> receive {ibrowse_async_response_end, IbrowseRef} -> @@ -101,11 +107,13 @@ list_acceptor(Pid,PidRef,IbrowseRef,ParseState,Type) -> end end. +-spec is_empty(#parse_state{}) -> boolean(). is_empty(#parse_state{buffer=[],brace=0,quote=false,escape=false}) -> true; is_empty(#parse_state{}) -> false. +-spec try_parse(binary(),#parse_state{}) -> {[any()],#parse_state{}}. try_parse(Data, #parse_state{buffer=B, brace=D, quote=Q, escape=E}) -> Parse = try_parse(binary_to_list(Data), B, D, Q, E), {KeyLists, NewParseState} = @@ -135,6 +143,7 @@ try_parse(Data, #parse_state{buffer=B, brace=D, quote=Q, escape=E}) -> Parse), {lists:flatten(KeyLists), NewParseState}. +-spec try_parse([byte()],_,_,_,_) -> [[any(),...] | #parse_state{},...]. try_parse([], B, D, Q, E) -> [#parse_state{buffer=B, brace=D, quote=Q, escape=E}]; try_parse([_|Rest],B,D,Q,true) -> diff --git a/src/rhc_mapred.erl b/src/rhc_mapred.erl index df54218..de66e02 100644 --- a/src/rhc_mapred.erl +++ b/src/rhc_mapred.erl @@ -48,10 +48,12 @@ %% {jsanon, {bucket(), key()}}| %% {jsanon, binary()} %% @type linkspec() = binary()|'_' +-spec encode_mapred(binary() | [[any(),...] | {binary() | {_,_},_}] | {binary(),binary()} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_,_},[{'link',_,_,_} | {'map',{_,_} | {_,_,_},_,_} | {'reduce',{_,_} | {_,_,_},_,_}]) -> any(). encode_mapred(Inputs, Query) -> mochijson2:encode( {struct, [{<<"inputs">>, encode_mapred_inputs(Inputs)}, {<<"query">>, encode_mapred_query(Query)}]}). +-spec encode_mapred_inputs(binary() | [[any(),...] | {binary() | {_,_},_}] | {binary(),binary()} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [[any(),...] | {_,_}] | {binary(),binary()} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_} | {'modfun',atom(),atom(),_} | {'index',binary() | [any()] | {_,_} | {_,_,_,_} | {_,_,_,_,_},{_,_},_,_},{'binary_index',[any()]} | {'integer_index',[any()]},_,_}) -> binary() | [binary() | [any(),...]] | {'struct',[{_,_},...]}. encode_mapred_inputs({BucketType, Bucket}) when is_binary(BucketType), is_binary(Bucket) -> [BucketType, Bucket]; @@ -77,6 +79,7 @@ encode_mapred_inputs({modfun, Module, Function, Options}) -> %% [Bucket, Key] %% or %% [Bucket, Key, KeyData] +-spec normalize_mapred_input([any(),...] | {binary() | {binary() | {binary(),binary()},binary()},_}) -> [any(),...]. normalize_mapred_input({Bucket, Key}) when is_binary(Bucket), is_binary(Key) -> [Bucket, Key]; @@ -96,9 +99,11 @@ normalize_mapred_input([Bucket, Key, KeyData]) when is_binary(Bucket), is_binary(Key) -> [Bucket, Key, KeyData]. +-spec encode_mapred_query([{'link',_,_,_} | {'map',{_,_} | {_,_,_},_,_} | {'reduce',{_,_} | {_,_,_},_,_}]) -> [{'struct',[any(),...]}]. encode_mapred_query(Query) when is_list(Query) -> [ encode_mapred_phase(P) || P <- Query ]. +-spec encode_mapred_phase({'link',_,_,_} | {'map',{'jsanon',_} | {'jsfun',_} | {'modfun',atom(),atom()},_,_} | {'reduce',{'jsanon',_} | {'jsfun',_} | {'modfun',atom(),atom()},_,_}) -> {'struct',[{<<_:24,_:_*8>>,{_,_}},...]}. encode_mapred_phase({MR, Fundef, Arg, Keep}) when MR =:= map; MR =:= reduce -> Type = if MR =:= map -> <<"map">>; @@ -151,11 +156,13 @@ encode_mapred_phase({link, Bucket, Tag, Keep}) -> %% @spec wait_for_mapred(term(), integer()) -> %% {ok, [phase_result()]}|{error, term()} %% @type phase_result() = {integer(), [term()]} +-spec wait_for_mapred(reference(),'infinity' | non_neg_integer()) -> {'error',_} | {'ok',[{_,_}]}. wait_for_mapred(ReqId, Timeout) -> wait_for_mapred_first(ReqId, Timeout). %% Wait for the first mapred result, so we know at least one phase %% that will be delivering results. +-spec wait_for_mapred_first(reference(),'infinity' | non_neg_integer()) -> {'error',_} | {'ok',[{_,_}]}. wait_for_mapred_first(ReqId, Timeout) -> case receive_mapred(ReqId, Timeout) of done -> @@ -173,6 +180,7 @@ wait_for_mapred_first(ReqId, Timeout) -> %% of accumulating a single phases's outputs will be more efficient %% than the repeated orddict:append_list/3 used when accumulating %% outputs from multiple phases. +-spec wait_for_mapred_one(reference(),'infinity' | non_neg_integer(),integer(),_) -> {'error',_} | {'ok',[{_,_}]}. wait_for_mapred_one(ReqId, Timeout, Phase, Acc) -> case receive_mapred(ReqId, Timeout) of done -> @@ -192,15 +200,18 @@ wait_for_mapred_one(ReqId, Timeout, Phase, Acc) -> end. %% Single-phase outputs are kept as a reverse list of results. +-spec acc_mapred_one([any()],_) -> any(). acc_mapred_one([R|Rest], Acc) -> acc_mapred_one(Rest, [R|Acc]); acc_mapred_one([], Acc) -> Acc. +-spec finish_mapred_one(integer(),[any()]) -> [{integer(),[any()]},...]. finish_mapred_one(Phase, Acc) -> [{Phase, lists:reverse(Acc)}]. %% Tracking outputs from multiple phases. +-spec wait_for_mapred_many(reference(),'infinity' | non_neg_integer(),[tuple(),...]) -> {'error',_} | {'ok',[{_,_}]}. wait_for_mapred_many(ReqId, Timeout, Acc) -> case receive_mapred(ReqId, Timeout) of done -> @@ -216,6 +227,7 @@ wait_for_mapred_many(ReqId, Timeout, Acc) -> %% Many-phase outputs are kepts as a proplist of reversed lists of %% results. +-spec acc_mapred_many(integer(),[any()],[tuple(),...]) -> [tuple(),...]. acc_mapred_many(Phase, Res, Acc) -> case lists:keytake(Phase, 1, Acc) of {value, {Phase, PAcc}, RAcc} -> @@ -224,12 +236,14 @@ acc_mapred_many(Phase, Res, Acc) -> [{Phase,acc_mapred_one(Res,[])}|Acc] end. +-spec finish_mapred_many([tuple(),...]) -> [{_,[any()]}]. finish_mapred_many(Acc) -> [ {P, lists:reverse(A)} || {P, A} <- lists:keysort(1, Acc) ]. %% Receive one mapred message. -spec receive_mapred(reference(), timeout()) -> done | {mapred, integer(), [term()]} | {error, term()} | timeout. + receive_mapred(ReqId, Timeout) -> receive {ReqId, Msg} -> %% Msg should be `done', `{mapred, Phase, Results}', or @@ -241,6 +255,7 @@ receive_mapred(ReqId, Timeout) -> %% @doc first stage of ibrowse response handling - just waits to be %% told what ibrowse request ID to expect +-spec mapred_acceptor(atom() | pid() | port() | {atom(),atom()},_,'infinity' | non_neg_integer()) -> {_,'done' | {'error','timeout' | {_,_}}}. mapred_acceptor(Pid, PidRef, Timeout) -> receive {ibrowse_req_id, PidRef, IbrowseRef} -> @@ -251,6 +266,7 @@ mapred_acceptor(Pid, PidRef, Timeout) -> %% @doc second stage of ibrowse response handling - waits for headers %% and extracts the boundary of the multipart/mixed message +-spec mapred_acceptor(atom() | pid() | port() | {atom(),atom()},_,'infinity' | non_neg_integer(),_) -> {_,'done' | {'error','timeout' | {_,_}}}. mapred_acceptor(Pid,PidRef,Timeout,IbrowseRef) -> receive {ibrowse_async_headers, IbrowseRef, Status, Headers} -> @@ -275,6 +291,7 @@ mapred_acceptor(Pid,PidRef,Timeout,IbrowseRef) -> %% @doc driver of the webmachine_multipart streamer - handles results %% of the parsing process (sends them to the client) and polls for %% the next part +-spec stream_parts_acceptor(atom() | pid() | port() | {atom(),atom()},_,'done_parts' | {{_,_,binary() | maybe_improper_list(binary() | maybe_improper_list(any(),binary() | []) | byte(),binary() | [])},fun(() -> any())}) -> {_,'done'}. stream_parts_acceptor(Pid,PidRef,done_parts) -> Pid ! {PidRef, done}; stream_parts_acceptor(Pid,PidRef,{{_Name, _Param, Part},Next}) -> @@ -286,6 +303,7 @@ stream_parts_acceptor(Pid,PidRef,{{_Name, _Param, Part},Next}) -> %% @doc "next" fun for the webmachine_multipart streamer - waits for %% an ibrowse message, and then returns it to the streamer for processing +-spec stream_parts_helper(_,_,_,_,boolean()) -> fun(() -> {_,'done' | fun(() -> any())}). stream_parts_helper(Pid, PidRef, Timeout, IbrowseRef, First) -> fun() -> receive diff --git a/src/rhc_obj.erl b/src/rhc_obj.erl index 977680c..1335164 100644 --- a/src/rhc_obj.erl +++ b/src/rhc_obj.erl @@ -30,9 +30,10 @@ -include("raw_http.hrl"). -include("rhc.hrl"). - +-include_lib("riakc/include/riakc.hrl"). %% HTTP -> riakc_obj +-spec make_riakc_obj(binary() | {binary(),binary()},'undefined' | binary(),[any()],binary()) -> riakc_obj(). make_riakc_obj(Bucket, Key, Headers, Body) -> Vclock = base64:decode(proplists:get_value(?HEAD_VCLOCK, Headers, "")), case ctype_from_headers(Headers) of @@ -47,10 +48,12 @@ make_riakc_obj(Bucket, Key, Headers, Body) -> [{headers_to_metadata(Headers), Body}]) end. +-spec ctype_from_headers([any()]) -> {[byte()],_}. ctype_from_headers(Headers) -> mochiweb_util:parse_header( proplists:get_value(?HEAD_CTYPE, Headers)). +-spec vtag_from_headers([any()]) -> any(). vtag_from_headers(Headers) -> %% non-sibling uses ETag, sibling uses Etag %% (note different capitalization on 't') @@ -60,6 +63,7 @@ vtag_from_headers(Headers) -> end. +-spec lastmod_from_headers([any()]) -> 'undefined' | {integer(),integer(),0}. lastmod_from_headers(Headers) -> case proplists:get_value("Last-Modified", Headers) of undefined -> @@ -73,6 +77,7 @@ lastmod_from_headers(Headers) -> 0} % Microseconds end. +-spec decode_siblings(maybe_improper_list(),binary()) -> [{dict(),binary()}]. decode_siblings(Boundary, <<"\r\n",SibBody/binary>>) -> decode_siblings(Boundary, SibBody); decode_siblings(Boundary, SibBody) -> @@ -83,6 +88,7 @@ decode_siblings(Boundary, SibBody) -> element(1, split_binary(Body, size(Body)-2))} %% remove trailing \r\n || {_, {_, Headers}, Body} <- Parts ]. +-spec headers_to_metadata([any()]) -> dict(). headers_to_metadata(Headers) -> UserMeta = extract_user_metadata(Headers), @@ -108,13 +114,16 @@ headers_to_metadata(Headers) -> Entries -> dict:store(?MD_INDEX, Entries, LinkMeta) end. +-spec extract_user_metadata([any()]) -> any(). extract_user_metadata(Headers) -> lists:foldl(fun extract_user_metadata/2, dict:new(), Headers). +-spec extract_user_metadata(_,_) -> any(). extract_user_metadata({?HEAD_USERMETA_PREFIX++K, V}, Dict) -> riakc_obj:set_user_metadata_entry(Dict, {K, V}); extract_user_metadata(_, D) -> D. +-spec extract_links([any()]) -> any(). extract_links(Headers) -> {ok, Re} = re:compile("; *riaktag=\"(.*)\""), Extractor = fun(L, Acc) -> @@ -128,9 +137,11 @@ extract_links(Headers) -> LinkHeader = proplists:get_value(?HEAD_LINK, Headers, []), lists:foldl(Extractor, [], string:tokens(LinkHeader, ",")). +-spec extract_indexes([any()]) -> [{binary(),binary() | integer()}]. extract_indexes(Headers) -> [ {list_to_binary(K), decode_index_value(K,V)} || {?HEAD_INDEX_PREFIX++K, V} <- Headers]. +-spec decode_index_value([byte()],maybe_improper_list(binary() | maybe_improper_list(any(),binary() | []) | char(),binary() | [])) -> binary() | integer(). decode_index_value(K, V) -> case lists:last(string:tokens(K, "_")) of "bin" -> @@ -141,9 +152,11 @@ decode_index_value(K, V) -> %% riakc_obj -> HTTP +-spec serialize_riakc_obj(_, riakc_obj()) -> {[{list(), list()}], binary()}. serialize_riakc_obj(Rhc, Object) -> {make_headers(Rhc, Object), make_body(Object)}. +-spec make_headers(rhc:rhc(), riakc_obj()) -> [{list(),list()}]. make_headers(Rhc, Object) -> MD = riakc_obj:get_update_metadata(Object), CType = case dict:find(?MD_CTYPE, MD) of @@ -164,6 +177,7 @@ make_headers(Rhc, Object) -> encode_indexes(MD) | encode_user_metadata(MD) ]). +-spec encode_links(rhc:rhc(), list()) -> list(). encode_links(_, []) -> []; encode_links(#rhc{prefix=Prefix}, Links) -> {{FirstBucket, FirstKey}, FirstTag} = hd(Links), @@ -174,10 +188,12 @@ encode_links(#rhc{prefix=Prefix}, Links) -> format_link(Prefix, FirstBucket, FirstKey, FirstTag), tl(Links)). +-spec encode_user_metadata(dict()) -> []. encode_user_metadata(_Metadata) -> %% TODO []. +-spec encode_indexes(dict()) -> [{nonempty_maybe_improper_list(any(),[] | {_,_,_}),maybe_improper_list()}]. encode_indexes(MD) -> case dict:find(?MD_INDEX, MD) of {ok, Entries} -> @@ -186,6 +202,7 @@ encode_indexes(MD) -> [] end. +-spec encode_index({term(), binary() | list() | integer()}) -> {list(), list()}. encode_index({Name, IntValue}) when is_integer(IntValue) -> encode_index({Name, integer_to_list(IntValue)}); encode_index({Name, BinValue}) when is_binary(BinValue) -> @@ -194,10 +211,12 @@ encode_index({Name, String}) when is_list(String) -> {?HEAD_INDEX_PREFIX ++ unicode:characters_to_list(Name, latin1), String}. +-spec format_link(_, _, riakc:key(), _) -> string(). format_link(Prefix, Bucket, Key, Tag) -> io_lib:format("; riaktag=\"~s\"", [Prefix, Bucket, Key, Tag]). +-spec make_body(riakc_obj()) -> binary(). make_body(Object) -> case riakc_obj:get_update_value(Object) of Val when is_binary(Val) -> Val; @@ -210,6 +229,7 @@ make_body(Object) -> term_to_binary(Val) end. +-spec is_iolist(term()) -> boolean(). is_iolist(Binary) when is_binary(Binary) -> true; is_iolist(List) when is_list(List) -> lists:all(fun is_iolist/1, List);