Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/ts_config.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@
total_server_weights=0,
job_notify_port,
max_ssh_startup = 20,
tag
tag,
modules_source = [], % list of paths of modules to be compiled and distributed
modules_beam = [] % list of module names (atoms) to be distributed
}).


Expand Down
9 changes: 9 additions & 0 deletions src/tsung_controller/ts_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,15 @@ parse(Element = #xmlElement{name=option, attributes=Attrs},
case getAttr(atom, Attrs, type) of
"" ->
case getAttr(Attrs, name) of
"module_distribution" ->
case getAttr(atom, Attrs, module_type) of
beam ->
ModuleName = getAttr(atom, Attrs, value),
lists:foldl( fun parse/2, Conf#config{modules_beam = [ModuleName | Conf#config.modules_beam]}, Element#xmlElement.content);
source ->
Path = getAttr(string, Attrs, value),
lists:foldl( fun parse/2, Conf#config{modules_source = [Path | Conf#config.modules_source]}, Element#xmlElement.content)
end;
"thinktime" ->
Val = getAttr(float_or_integer,Attrs, value),
ets:insert(Tab,{{thinktime, value}, Val}),
Expand Down
9 changes: 9 additions & 0 deletions src/tsung_controller/ts_config_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ handle_call({read_config, ConfigFile}, _From, State=#state{logdir=LogDir}) ->
%% Compute per phase popularities
NewConfig = compute_popularities(ConfigTmp),
ts_job_notify:listen(NewConfig#config.job_notify_port),
ts_module_distribution:load_modules_locally(NewConfig),
case check_config(NewConfig) of
ok ->
{reply, ok, State#state{config=NewConfig, static_users=NewConfig#config.static_users,total_weight = Sum}};
Expand Down Expand Up @@ -407,6 +408,14 @@ handle_cast({newbeams, HostList}, State=#state{logdir = LogDir,
check_remotes_ok(RemoteNodes),
?LOG("All remote beams started, syncing ~n",?NOTICE),
global:sync(),
?LOG("Syncing done~n", ?DEB),
case length(Config#config.modules_source) + length(Config#config.modules_beam) of
0 ->
ok;
_ ->
?LOG("Preparing custom erlang modules for distribution~n", ?NOTICE),
ts_module_distribution:distribute_modules(RemoteNodes, Config)
end,
?LOG("Syncing done, start remote tsung application ~n", ?INFO),
{Resl, BadNodes} = rpc:multicall(RemoteNodes,tsung,start,[],?RPC_TIMEOUT),
?LOGF("RPC result: ~p ~p ~n",[Resl,BadNodes],?DEB),
Expand Down
96 changes: 96 additions & 0 deletions src/tsung_controller/ts_module_distribution.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
%%% Author : Sebastian Cohnen <[email protected]>
%%%
%%% This program is free software; you can redistribute it and/or modify
%%% it under the terms of the GNU General Public License as published by
%%% the Free Software Foundation; either version 2 of the License, or
%%% (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%%% GNU General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License
%%% along with this program; if not, write to the Free Software
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
%%%
%%% In addition, as a special exception, you have the permission to
%%% link the code of this program with any library released under
%%% the EPL license and distribute linked combinations including
%%% the two; the MPL (Mozilla Public License), which EPL (Erlang
%%% Public License) is based on, is included in this exception.

-module(ts_module_distribution).

-vc('$Id$ ').
-author('Sebastian Cohnen <[email protected]>').

-include("ts_config.hrl").

-export([ distribute_modules/2, load_modules_locally/1 ]).

-define(RPC_TIMEOUT, 30000).

load_modules_locally(#config{modules_source=Paths, modules_beam=Modules}=Config) ->
lists:foreach(
fun({Module, Binary, Filename}) ->
code:load_binary(Module, Filename, Binary)
end, compile_modules(Paths) ++ load_modules(Modules)).

distribute_modules(Nodes, #config{modules_source=Paths, modules_beam=Modules}) ->
ModuleSpecs = compile_modules(Paths) ++ load_modules(Modules),
distribute_modules_to_nodes(Nodes, ModuleSpecs).

compile_modules([]) -> [];
compile_modules(Paths) ->
lists:map(fun(Path) ->
case compile:file(Path, [binary, compressed, return]) of
{error, Reason} ->
?LOGF("Module from code ~p could not be compiled! Reason: ~p~n", [Path, Reason], ?ERR),
exit(module_compilation_error);
{ok, Module, Binary, []} ->
?LOGF("Module ~p from ~p was compiled successfully!~n", [Module, Path], ?NOTICE),
{Module, Binary, Path};
{ok, Module, Binary, Warnings} ->
?LOGF("There were warnings when compiling module ~p from ~p: ~p~n", [Module, Path, Warnings], ?WARN),
{Module, Binary, Path}
end
end, Paths).

load_modules([]) -> [];
load_modules(Modules) ->
lists:map(fun(Module) ->
case code:get_object_code(Module) of
error ->
?LOGF("Module ~p could not be loaded! Check given load paths.~n", [Module], ?ERR),
exit(module_load_error);
{Module, Binary, Filename} ->
?LOGF("Module ~p from ~p was loaded successfully!~n", [Module, Filename], ?NOTICE),
{Module, Binary, Filename}
end
end, Modules).

distribute_modules_to_nodes([], _) -> ok;
distribute_modules_to_nodes(_, []) -> ok;
distribute_modules_to_nodes(Nodes, Modules) ->
Fun = fun(Module) -> distribute_module(Nodes, Module) end,
DistributionResult = ts_utils:pmap(Fun, Modules, 10),

case lists:all(fun(Status) -> Status == ok end, DistributionResult) of
true ->
ok;
false ->
ts_mon:abort(),
exit(module_distribution_error)
end.

% Distribute given {Module, Binary, Filename} to all given Nodes
distribute_module(Nodes, {Module, Binary, Filename}) ->
{_Resl, BadNodes} = rpc:multicall(Nodes, code, load_binary, [Module, Filename, Binary], ?RPC_TIMEOUT),
case BadNodes of
[] ->
ok;
Bad ->
?LOGF("Can't distribute ~n to all nodes ~p~n", [Module, Bad], ?ERR),
error
end.
3 changes: 2 additions & 1 deletion tsung-1.0.dtd
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@
min NMTOKEN #IMPLIED
max NMTOKEN #IMPLIED
type (ts_http | ts_jabber | ts_pgsql | ts_amqp) #IMPLIED
value CDATA #IMPLIED>
value CDATA #IMPLIED
module_type (beam | source) #IMPLIED>

<!ELEMENT set_option (user_agent*| certificate)>
<!ATTLIST set_option
Expand Down