# Sends +action+ to the agent and gathers the replies.
#
# A request whose args[:process_results] is false, or any request made
# while @reply_to is set, is delegated to fire_and_forget_request and that
# method's return value is handed straight back. In every other case
# args[:process_results] is set to true on the hash passed in by the caller.
#
# action - action name; converted with to_s before the request is built
# args   - hash of request arguments (may be modified, see above)
# opts   - options hash; opts[:filter] and the hash itself go to Message.new
# disc   - :auto to call discover, otherwise the host collection to target
# block  - optional; when given, every reply is passed to
#          process_results_with_block together with this block
#
# Returns the value of +stats+ when a block was given, otherwise a
# flattened array of the values produced by process_results_without_block.
def call_agent(action, args, opts, disc=:auto, &block)
  return fire_and_forget_request(action, args) if args[:process_results] == false || @reply_to

  args[:process_results] = true
  streaming = block_given?

  # :auto triggers discovery. Any other value is used as the host list
  # as-is, and direct requests are forced when the configuration reports
  # direct addressing.
  targets = if disc == :auto
              discover
            else
              @force_direct_request = true if Config.instance.direct_addressing
              disc
            end

  request = new_request(action.to_s, args)
  message_params = {
    :agent => @agent,
    :type => :request,
    :collective => @collective,
    :filter => opts[:filter],
    :options => opts
  }
  message = Message.new(request, nil, message_params)
  # The message receives its own copy of the host list.
  message.discovered_hosts = targets.clone

  collected = []
  reply_count = 0

  if targets.size > 0
    message.type = :direct_request if @force_direct_request

    # The progress indicator is only drawn when replies are collected here
    # rather than handed to a caller-supplied block.
    if @progress && !streaming
      spinner = Progress.new
      @stdout.puts
      @stdout.print spinner.twirl(reply_count, targets.size)
    end

    aggregator = load_aggregate_functions(action, @ddl)

    @client.req(message) do |reply|
      reply_count += 1

      if streaming
        aggregator = process_results_with_block(action, reply, block, aggregator)
      else
        @stdout.print spinner.twirl(reply_count, targets.size) if @progress
        item, aggregator = process_results_without_block(reply, action, aggregator)
        collected << item
      end
    end

    collected.sort! if @initial_options[:sort]

    if aggregator
      @stats.aggregate_summary = aggregator.summarize
      @stats.aggregate_failures = aggregator.failed
    end
    @stats.client_stats = @client.stats
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request
  RPC.stats(@stats)
  @stdout.print("\n\n") if @progress

  streaming ? stats : collected.flatten
end