# Opens the connection to the RabbitMQ middleware unless one already exists.
#
# Reads the "rabbitmq.*" options, builds one host entry for every pool member
# from 1 up to "rabbitmq.pool.size", and hands the resulting parameter hash to
# +connector.new+. The created object is stored in @connection.
#
# When "rabbitmq.prompt_user" / "rabbitmq.prompt_password" are enabled, the
# credentials are read from the terminal and passed to get_env_or_option as its
# third argument for every pool member (presumably the fallback used when
# neither the environment variable nor the config option is set -- that helper
# is not visible here, confirm against its definition).
#
# @param connector [#new] factory receiving the connection parameter hash;
#   defaults to ::Stomp::Connection
# @return [nil, Object] nil when a connection already exists, otherwise the
#   object returned by +connector.new+
# @raise [ClientTimeoutError] re-raised untouched
# @raise [RuntimeError] "Could not connect to RabbitMQ Server: ..." wrapping any
#   other StandardError raised while reading options or connecting
def connect(connector = ::Stomp::Connection)
  if @connection
    Log.debug("Already connection, not re-initializing connection")
    return
  end

  begin
    @base64 = get_bool_option("rabbitmq.base64", "false")

    pools = Integer(get_option("rabbitmq.pool.size"))

    middleware_user = ''
    middleware_password = ''
    prompt_for_username = get_bool_option("rabbitmq.prompt_user", "false")
    prompt_for_password = get_bool_option("rabbitmq.prompt_password", "false")

    if prompt_for_username
      Log.debug("No previous user exists and rabbitmq.prompt-user is set to true")
      print "Please enter user to connect to middleware: "
      middleware_user = STDIN.gets.chomp
    end

    if prompt_for_password
      Log.debug("No previous password exists and rabbitmq.prompt-password is set to true")
      middleware_password = MCollective::Util.get_hidden_input("Please enter password: ")
      print "\n"
    end

    # Pool members are numbered from 1 in the configuration.
    hosts = (1..pools).map do |poolnum|
      prefix = "rabbitmq.pool.#{poolnum}"

      host = {}
      host[:host] = get_option("#{prefix}.host")
      host[:port] = get_option("#{prefix}.port", 61613).to_i
      host[:ssl] = get_bool_option("#{prefix}.ssl", "false")

      host[:login] = get_env_or_option("STOMP_USER", "#{prefix}.user", middleware_user)
      # The prompted value was not the one selected: tell the user the prompt
      # was redundant.
      if prompt_for_username && host[:login] != middleware_user
        Log.info("Using #{host[:login]} from config file to connect to #{host[:host]}. "+
                 "plugin.rabbitmq.prompt_user should be set to false to remove the prompt.")
      end

      host[:passcode] = get_env_or_option("STOMP_PASSWORD", "#{prefix}.password", middleware_password)
      if prompt_for_password && host[:passcode] != middleware_password
        Log.info("Using password from config file to connect to #{host[:host]}. "+
                 "plugin.rabbitmq.prompt_password should be set to false to remove the prompt.")
      end

      # When SSL is enabled the boolean flag is replaced by whatever
      # ssl_parameters returns for this pool member.
      if host[:ssl]
        host[:ssl] = ssl_parameters(poolnum, get_bool_option("#{prefix}.ssl.fallback", "false"))
      end

      Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
      host
    end

    raise "No hosts found for the RabbitMQ connection pool" if hosts.empty?

    connection = {
      :hosts => hosts,
      :use_exponential_back_off => @use_exponential_back_off,
      :initial_reconnect_delay => @initial_reconnect_delay,
      :back_off_multiplier => @back_off_multiplier,
      :max_reconnect_delay => @max_reconnect_delay,
      :max_reconnect_attempts => Integer(get_option("rabbitmq.max_reconnect_attempts", 0)),
      :randomize => get_bool_option("rabbitmq.randomize", "false"),
      :backup => get_bool_option("rabbitmq.backup", "false"),
      :timeout => Integer(get_option("rabbitmq.timeout", -1)),
      :connect_timeout => Integer(get_option("rabbitmq.connect_timeout", 30)),
      :reliable => true,
      :max_hbrlck_fails => Integer(get_option("rabbitmq.max_hbrlck_fails", 0)),
      :max_hbread_fails => Integer(get_option("rabbitmq.max_hbread_fails", 2)),
      :connect_headers => connection_headers,
      :logger => EventLogger.new
    }

    @connection = connector.new(connection)
  rescue ClientTimeoutError
    # Must stay ahead of the generic handler so timeouts are not wrapped.
    raise
  rescue StandardError => e
    # Deliberately not `rescue Exception`: Interrupt (Ctrl-C at the prompts),
    # SystemExit and similar must propagate instead of being reported as a
    # connection failure.
    raise("Could not connect to RabbitMQ Server: #{e}")
  end
end