def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
raise "Cannot bypass result processing for batched requests" if args[:process_results] == false
validate_batch_size(batch_size)
sleep_time = Float(sleep_time)
Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")
@force_direct_request = true
discovered = discover
results = []
respcount = 0
if discovered.size > 0
req = new_request(action.to_s, args)
aggregate = load_aggregate_functions(action, @ddl)
if @progress && !block_given?
twirl = Progress.new
@stdout.puts
@stdout.print twirl.twirl(respcount, discovered.size)
end
if (batch_size =~ /^(\d+)%$/)
batch_size = (discovered.size / 100.0 * Integer($1)).ceil
else
batch_size = Integer(batch_size)
end
@stats.requestid = nil
processed_nodes = 0
discovered.in_groups_of(batch_size) do |hosts|
message = Message.new(req, nil, {:agent => @agent,
:type => :direct_request,
:collective => @collective,
:filter => opts[:filter],
:options => opts})
@stats.requestid = message.create_reqid unless @stats.requestid
message.requestid = @stats.requestid
message.discovered_hosts = hosts.clone.compact
@client.req(message) do |resp|
respcount += 1
if block_given?
aggregate = process_results_with_block(action, resp, block, aggregate)
else
@stdout.print twirl.twirl(respcount, discovered.size) if @progress
result, aggregate = process_results_without_block(resp, action, aggregate)
results << result
end
end
if @initial_options[:sort]
results.sort!
end
@stats.noresponsefrom.concat @client.stats[:noresponsefrom]
@stats.responses += @client.stats[:responses]
@stats.blocktime += @client.stats[:blocktime] + sleep_time
@stats.totaltime += @client.stats[:totaltime]
@stats.discoverytime += @client.stats[:discoverytime]
processed_nodes += hosts.length
if (discovered.length > processed_nodes)
sleep sleep_time
end
end
@stats.aggregate_summary = aggregate.summarize if aggregate
@stats.aggregate_failures = aggregate.failed if aggregate
else
@stderr.print("\nNo request sent, we did not discover any nodes.")
end
@stats.finish_request
RPC.stats(@stats)
@stdout.print("\n") if @progress
if block_given?
return stats
else
return [results].flatten
end
end