#!/usr/bin/python
"""
amqpclt
"""
import argparse
import os
import pprint
import re
import sys
import time

import amqpclt
import amqpclt.client
import amqpclt.common

from mtb.conf import read_apache_config, TreeDict
from mtb.pid import \
    pid_read, pid_write, pid_quit, pid_status
from mtb.validation import mutex, reqall, reqany

import auth.credential as credential

import logging
logging.basicConfig()

from warnings import simplefilter
simplefilter("ignore")

# pretty printer used by option_parse() to dump the configuration
# when the log level is "debug"
PP = pprint.PrettyPrinter(indent=2)

# program name, used for the argparse "prog", the generated guides
# and the messages printed by option_parse()
PROG = "amqpclt"
# one-line summary shown next to the program name in the generated guides
SHORT_DESCRIPTION = "versatile AMQP client"
# Long description inserted in the DESCRIPTION section by print_rst() and
# print_pod().  The <{LIST_BEGIN}> and <{LIST_END}> markers delimit bullet
# lists: print_rst() removes them while print_pod() turns them into
# "=over" and "=back".
DESCRIPTION = """
**amqpclt** is a versatile tool to interact with messaging brokers speaking
AMQP and/or message queues (see :py:mod:`messaging.queue`) on disk.

It receives messages (see :py:mod:`messaging.message`) from an incoming
module, optionally massaging them (i.e. filtering and/or modifying), and
sends them to an outgoing module. Depending on which modules are used,
the tool can perform different operations.

Here are the supported incoming modules:
<{LIST_BEGIN}>
- broker: connect to a messaging broker using AMQP, subscribe to one
  or more queues and receive the messages sent by the broker

- queue: read messages from a message queue on disk
  (see :py:mod:`messaging.queue`)
<{LIST_END}>
Here are the supported outgoing modules:
<{LIST_BEGIN}>
- broker: connect to a messaging broker using AMQP and send the messages

- queue: store the messages in a message queue on disk
  (see :py:mod:`messaging.queue`)
<{LIST_END}>
Here are some frequently used combinations:
<{LIST_BEGIN}>
- incoming broker + outgoing queue: drain some destinations, storing
  the messages on disk

- incoming queue + outgoing broker: (re-)send messages that have been
  previously stored on disk, optionally with modifications (such as
  altering the destination)

- incoming broker + outgoing broker: shovel messages from one broker
  to another
<{LIST_END}>
See the "EXAMPLES" sections for concrete examples.
"""
# Text of the "CONFIGURATION FILE" section of the guides generated by
# print_rst() and print_pod().
# NOTE(review): the text below refers to the Perl Config::General module,
# whereas option_parse() reads the file with mtb.conf.read_apache_config;
# the wording may be inherited from another tool - confirm before changing.
CONFIGURATION_FILE = """
**amqpclt** can read its options from a configuration file. For this,
the Perl Config::General module is used and the option names are the
same as on the command line. For instance::

    daemon = true
    pidfile = /var/run/amqpclt.pid
    incoming-queue = path=/var/spool/amqpclt
    outgoing-broker-uri = amqp://broker.acme.com:5672/virtual_host
    outgoing-broker-auth = "plain name=guest pass=guest"

Alternatively, options can be nested::

    <outgoing-broker>
        uri = amqp://broker.acme.com:5672/virtual_host
        auth = "plain name=guest pass=guest"
    </outgoing-broker>

Or even::

    <outgoing>
        <broker>
            uri = amqp://broker.acme.com:5672/virtual_host
            <auth>
                scheme = plain
                name = guest
                pass = guest
            </auth>
        </broker>
    </outgoing>

The options specified on the command line have precedence over the
ones found in the configuration file.
"""
# Text of the "CALLBACK" section of the guides generated by print_rst()
# and print_pod().  The <{LIST_BEGIN}> and <{LIST_END}> markers delimit
# bullet lists and are rewritten by those functions.  The closing example
# deletes the header that the sentence introducing it talks about
# (message-id).
CALLBACK_GUIDE = """
**amqpclt** can be given python code to execute on all processed messages.
This can be used for different purposes:
<{LIST_BEGIN}>
- massaging: the code can change any part of the message, including setting
  or removing header fields

- filtering: the code can decide if the message must be given to the
  outgoing module or not

- displaying: the code can print any part of the message

- copying: the code can store a copy of the message into files or
  message queues
<{LIST_END}>
To use callbacks, the --callback-path or --callback-code option must be used.
The python code must provide functions with the following signature:
<{LIST_BEGIN}>
- start(self, DATA)
  (optional) this will be called when the program starts, with the supplied
  data (see the --callback-data option) as a list reference

- check(self, MESSAGE)
  (mandatory) this will be called when the program has one message to process;
  it will be given the message (see messaging.message.Message) and must return
  either a message (it could be the same one or a new one) or a string
  describing why the message has been dropped

- idle(self)
  (optional) this will be called when the program has no message to process

- stop(self)
  (optional) this will be called when the program stops
<{LIST_END}>
The code can be put in a file, on the command line or in the **amqpclt**
configuration file, using the "here document" syntax.

Here is an example (to be put in the **amqpclt** configuration file) that
prints on stdout a JSON array of messages::

    callback-code = <<EOF
    def start (self):
        self.count = 0
    def check(self, msg):
        if self.count:
            sys.stdout.write(", ")
        else:
            sys.stdout.write("[")
        self.count += 1
        sys.stdout.write(msg.serialize())
        return msg
    def stop(self):
        if self.count:
            sys.stdout.write("]\\n")
        else:
            sys.stdout.write("[]\\n")
    EOF

For simple callback code that only needs the check subroutine, it is enough
to supply the "inside code". If the function definition is missing,
the supplied code will be wrapped with::

    def check(self, msg):
        hdr = msg.header
        ... your code goes here ...
        return msg

This allows for instance to remove the message-id header with something like::

  $ amqpclt ... --callback-code 'del(hdr["message-id"])'
"""
# Example texts keyed by sub-section title; print_rst() and print_pod()
# render each entry as a sub-section of the "EXAMPLES" section.
# NOTE(review): the SHOVELING introduction mentions "clearing some headers"
# while its sample callback only sets the destination - confirm the wording.
EXAMPLES = {
    "SENDING": """
Here is an example of a configuration file for a message sender
daemon (from queue to broker), forcing the persistent header to true
(something which is highly recommended for reliable messaging) and
setting the destination::

    # define the source message queue
    <incoming-queue>
     path = /var/spool/sender
    </incoming-queue>
    # modify the message header on the fly
    callback-code = <<EOF
        hdr["destination"] = "/queue/app1.data"
        hdr["persistent"] = "true"
    EOF
    # define the destination broker
    <outgoing-broker>
        uri = "amqp://broker.acme.com:5672/virtual_host"
    </outgoing-broker>
    # miscellaneous options
    reliable = true
    pidfile = /var/run/sender.pid
    daemon = true
    loop = true
    remove = true
""",
    "RECEIVING": """
Here is an example of a configuration file for a message receiver
(from broker to queue)::

    # define the source broker
    <incoming-broker>
        uri = "amqp://broker.acme.com:5672/virtual_host"
        <auth>
            scheme = plain
            name = receiver
            pass = secret
        </auth>
    </incoming-broker>
    # define the subscriptions
    <subscribe>
        destination = /queue/app1.data
    </subscribe>
    <subscribe>
        destination = /queue/app2.data
    </subscribe>
    # define the destination message queue
    <outgoing-queue>
        path = /var/spool/receiver
    </outgoing-queue>
    # miscellaneous options
    pidfile = /var/run/receiver.pid

To run it as a daemon::

    $ amqpclt --conf test.conf --daemon

To use the configuration file above with some options
on the command line to drain the queues::

    $ amqpclt --conf test.conf --timeout-inactivity 10
""",
    "SHOVELING": """
Here is an example of a configuration file for a message shoveler
(from broker to broker), clearing some headers on the fly so that messages
can be replayed safely::

    # define the source broker
    <incoming-broker>
        uri = "amqp://broker.acme.com:5672/virtual_host"
    </incoming-broker>
    # define the subscriptions
    <subscribe>
        destination = /queue/app1.data
    </subscribe>
    <subscribe>
        destination = /queue/app2.data
    </subscribe>
    # define the destination broker
    <outgoing-broker>
        uri = "amqp://dev-broker.acme.com:5672/virtual_host"
    </outgoing-broker>
    # modify the message destination
    callback-code = <<EOF
        hdr["destination"] = "/queue/dest_to_be_replayed"
    EOF
""",
    "TAPPING": """
Callback code can also be used to tap messages, i.e. get a copy of all
messages processed by **amqpclt**. Here is some callback code for this purpose
that could for instance be merged with the shoveling code above.
It also shows how to use the --callback-data option::

    callback-code = <<EOF
        def start(self, path, qtype="DQS"):
            self.tap_queue = queue.new({"path" : path, "type" : qtype})

        def check(self, msg):
            self.tap_queue.add_message(msg)
            return msg
    EOF

Callback data must be given to specify which message queue to use::

    $ amqpclt --conf tap.conf --callback-data "/tmp/tap,DQS"
"""}

# Table of the supported options, keyed by option name (which is also the
# key used in the configuration).  For each entry, "short" and "long" are
# the flag strings and every other key is handed to
# argparse's add_argument() by _get_parser(); print_rst() and print_pod()
# use "short", "long", "action", "metavar" and "help" to document the
# options.  The "type" of an entry is also used by _clean_configuration()
# to convert values coming from the configuration file.
ARGUMENTS = {
    "callback-code": {
        "long": "--callback-code",
        "help": "execute the Python code on each message, "
                "see the \"CALLBACK\" section for more information",
        "metavar": "CODE", },
    "callback-data": {
        "long": "--callback-data",
        "help": "pass this data to the user supplied callback code, "
                "see the \"CALLBACK\" section for more information",
        "metavar": "VALUE,...", },
    "callback-path": {
        "long": "--callback-path",
        "help": "execute the Python code in the given file on each "
                "message, see the \"CALLBACK\" section for more "
                "information",
        "metavar": "PATH", },
    "conf": {
        "long": "--conf",
        "help": "use the given configuration file, see the "
                "\"CONFIGURATION FILE\" section for more information",
        "metavar": "PATH", },
    "count": {
        "long": "--count",
        "short": "-c",
        "type": int,
        "help": "process at most the given number of messages; "
                "note: when using an incoming broker, to avoid "
                "consuming more messages, it is recommended to "
                "enable the --reliable option",
        "metavar": "INTEGER", },
    "daemon": {
        "long": "--daemon",
        "action": "store_true",
        "help": "detach **amqpclt** so that it becomes a daemon "
                "running in the background", },
    "log": {
        "long": "--log",
        "help": "select logging system, one of: stdout, syslog, file, null",
        "metavar": "STRING", },
    "logfile": {
        "long": "--logfile",
        "help": "select logging file if log system file is selected",
        "metavar": "STRING", },
    "loglevel": {
        "long": "--loglevel",
        "help": "select logging level, one of: debug, info, warning and error",
        "metavar": "STRING", },
    "duration": {
        "long": "--duration",
        "type": int,
        "help": "process messages during at most the given number of "
                "seconds and then stop",
        "metavar": "SECONDS", },
#    "heartbeat": {"long": "--heartbeat",
#              "action": "store_true",
#              "help": "enable AMQP heart-beats between **amqpclt** and "
#                       "the broker(s)",
#            },
    "help": {
        "short": "-h",
        "long": "--help",
        "action": "help",
        "help": "print the help page"},
    "incoming-broker-auth": {
        "long": "--incoming-broker-auth",
        "help": "use this authentication string "
                "(see :py:mod:`auth.credential`) "
                "to authenticate to the incoming broker",
        "metavar": "STRING", },
    "incoming-broker-module": {
        "long": "--incoming-broker-module",
        "help": "module to use (pika|kombu)",
        "metavar": "STRING", },
    "incoming-broker-type": {
        "long": "--incoming-broker-type",
        "help": "set the incoming broker type; this can be useful "
                "when using features which are broker specific",
        "metavar": "STRING", },
    "incoming-broker-uri": {
        "long": "--incoming-broker-uri",
        "help": "use this authentication URI "
                "to connect to the incoming broker",
        "metavar": "URI", },
    "incoming-queue": {
        "long": "--incoming-queue",
        "help": "read incoming messages from the given message queue "
                "(see :py:mod:`messaging.queue`)",
        "metavar": "KEY=VALUE...", },
    "lazy": {"long": "--lazy",
             "action": "store_true",
             "help": "initialize the outgoing module only after having "
                     "received the first message", },
    "loop": {"long": "--loop",
             "action": "store_true",
             "help": "when using an incoming message queue, loop over it", },
    "outgoing-broker-auth": {
        "long": "--outgoing-broker-auth",
        "help": "use this authentication string "
                "(see :py:mod:`auth.credential`) "
                "to authenticate to the outgoing broker",
        "metavar": "STRING", },
    "outgoing-broker-module": {
        "long": "--outgoing-broker-module",
        "help": "module to use (pika|kombu)",
        "metavar": "STRING", },
    "outgoing-broker-type": {
        "long": "--outgoing-broker-type",
        "help": "set the outgoing broker type; this can be useful "
                "when using features which are broker specific",
        "metavar": "STRING", },
    "outgoing-broker-uri": {"long": "--outgoing-broker-uri",
                            "help": "use this authentication URI to connect to"
                                    " the outgoing broker",
                            "metavar": "URI", },
    "outgoing-queue": {"long": "--outgoing-queue",
                       "help": "store outgoing messages into the given message"
                               " queue (see :py:mod:`messaging.queue`)",
                       "metavar": "KEY=VALUE...", },
    "pidfile": {"long": "--pidfile",
                "help": "use this pid file",
                "metavar": "PATH", },
    "pod": {
        "long": "--pod",
        "action": "store_true",
        "help": "print the pod guide"},
    "prefetch": {
        "long": "--prefetch",
        "type": int,
        "help": "set the prefetch value (i.e. the maximum number of "
                "messages to receive without acknowledging them) on "
                "the incoming broker",
        "metavar": "INTEGER", },
    "quit": {
        "long": "--quit",
        "action": "store_true",
        "help": "tell another instance of **amqpclt** (identified by "
                "its pid file, as specified by the --pidfile option) "
                "to quit", },
    "reliable": {
        "long": "--reliable",
        "action": "store_true",
        "help": "use AMQP features for more reliable messaging "
                "(i.e. client side acknowledgments) at the "
                "cost of less performance", },
    "remove": {
        "long": "--remove",
        "action": "store_true",
        "help": "when using an incoming message queue, remove the "
                "processed messages", },
    "rst": {
        "long": "--rst",
        "action": "store_true",
        "help": "print the rst guide"},
    "statistics": {
        "long": "--statistics",
        "action": "store_true",
        "help": "report statistics at the end of the execution", },
    "status": {
        "long": "--status",
        "action": "store_true",
        "help": "get the status of another instance of **amqpclt** "
                "(identified by its pid file, as specified by the "
                "--pidfile option); the exit code will be zero if the "
                "instance is alive and non-zero otherwise", },
    "subscribe": {
        "long": "--subscribe",
        "action": "append",
        "help": "use these options in the AMQP subscription used "
                "with the incoming broker; this option can be given "
                "multiple times",
        "metavar": "KEY=VALUE...", },
    "timeout-connect": {
        "long": "--timeout-connect",
        "type": float,
        "help": "use this timeout when connecting to the"
                " broker; can be fractional",
        "metavar": "SECONDS", },
    "timeout-inactivity": {
        "long": "--timeout-inactivity",
        "type": float,
        "help": "use this timeout in the incoming module "
                "to stop  **amqpclt** when no new messages "
                "have been received (aka drain mode); can "
                "be fractional",
        "metavar": "SECONDS", },
    "timeout-linger": {
        "long": "--timeout-linger",
        "type": float,
        "help": "when stopping **amqpclt**, use this timeout to"
                " finish interacting with the broker; can be "
                "fractional",
        "metavar": "SECONDS", },
    "version": {
        "long": "--version",
        "action": "version",
        "version": "%s %s" % (PROG, amqpclt.VERSION),
        "help": "print the program version"},
    "window": {
        "long": "--window",
        "type": int,
        "help": "keep at most the given number of "
                "not-yet-acknowledged messages in memory",
        "metavar": "INTEGER", },
}

# Fallback values: option_parse() applies each of them with setdefault(),
# so they only take effect for options set neither on the command line
# nor in the configuration file.
DEFAULT_OPTIONS = dict(
    daemon=False,
    log="stdout",
    loglevel="warning",
    # heartbeat=False,
    lazy=False,
    loop=False,
    remove=False,
    reliable=False,
    statistics=False,
)


def print_rst():
    """ Print the rst for the web page.

    The page is assembled from the module constants: the program name and
    version, DESCRIPTION, one entry per option of ARGUMENTS (sorted by
    option name) and then the CONFIGURATION FILE, CALLBACK, EXAMPLES and
    AUTHOR sections.  The list markers (<{LIST_BEGIN}>, <{LIST_END}>) are
    removed before the result is printed on stdout.
    """
    parts = [
        "%s\n" % PROG,
        "===================\n\n",
        "%s %s - %s\n\n" % (PROG, amqpclt.VERSION, SHORT_DESCRIPTION,),
        "SYNOPSIS\n",
        "--------\n\n",
        "**%s** *[OPTIONS]*\n\n" % PROG,
        "DESCRIPTION\n",
        "-----------\n\n",
        "%s\n\n" % DESCRIPTION,
        "OPTIONS\n",
        "-------\n\n",
    ]
    # iterate over the sorted names (and not over iteritems(), which only
    # exists in Python 2) so that this works with any Python version
    for name in sorted(ARGUMENTS):
        elopts = ARGUMENTS[name]
        flags = elopts["long"]
        if "short" in elopts:
            flags = "%s, %s" % (elopts["short"], flags)
        entry = "**%s**" % flags
        # only the options without an action take a value to document
        if elopts.get("action", None) is None:
            if "metavar" in elopts:
                placeholder = elopts["metavar"]
            else:
                placeholder = elopts["long"].replace("-", "").upper()
            entry += " *%s*" % placeholder
        parts.append("%s\n\t%s\n\n" % (entry, elopts.get("help", "")))
    for title, text in [("CONFIGURATION FILE", CONFIGURATION_FILE),
                        ("CALLBACK", CALLBACK_GUIDE),
                        ("EXAMPLES", EXAMPLES),
                        ("AUTHOR", "%s - %s" %
                         (amqpclt.AUTHOR, amqpclt.COPYRIGHT))]:
        if title == "EXAMPLES":
            # EXAMPLES is a dictionary: one sub-section per entry
            parts.append("%s\n%s\n\n" % (title, len(title) * "-"))
            for stitle, example in text.items():
                parts.append("%s\n%s\n\n%s\n\n" %
                             (stitle, len(stitle) * ".", example))
        else:
            parts.append("%s\n%s\n\n%s\n\n" %
                         (title, len(title) * "-", text))
    out = "".join(parts)
    out = out.replace("<{LIST_BEGIN}>", "")\
             .replace("<{LIST_END}>", "")
    print(out)


def print_pod():
    """ Print the pod for the man page.

    The page is assembled from the module constants: the program name and
    version, DESCRIPTION, one "=item" per option of ARGUMENTS (sorted by
    option name) and then the CONFIGURATION FILE, CALLBACK, EXAMPLES and
    AUTHOR sections.  The rst flavoured markup of the texts is finally
    converted: "::" becomes ":", the bold program name becomes B<...>,
    the list markers become "=over"/"=back" and the "- " bullets become
    "=item *".  The result is printed on stdout.
    """
    parts = [
        "=head1 NAME\n\n",
        "%s %s - %s\n\n" % (PROG, amqpclt.VERSION, SHORT_DESCRIPTION,),
        "=head1 SYNOPSIS\n\n",
        "B<%s> I<[OPTIONS]>\n\n" % PROG,
        "=head1 DESCRIPTION\n\n",
        "%s\n\n" % DESCRIPTION,
        "=head1 OPTIONS\n\n=over\n\n",
    ]
    # iterate over the sorted names (and not over iteritems(), which only
    # exists in Python 2) so that this works with any Python version
    for name in sorted(ARGUMENTS):
        elopts = ARGUMENTS[name]
        flags = elopts["long"]
        if "short" in elopts:
            flags = "%s, %s" % (elopts["short"], flags)
        entry = "=item B<%s>" % flags
        # only the options without an action take a value to document
        if elopts.get("action") is None:
            if "metavar" in elopts:
                placeholder = elopts["metavar"]
            else:
                placeholder = elopts["long"].replace("-", "").upper()
            entry += " I<%s>" % placeholder
        parts.append("%s\n\n%s\n\n" % (entry, elopts.get("help", "")))
    parts.append("=back\n\n")
    for title, text in [("CONFIGURATION FILE", CONFIGURATION_FILE),
                        ("CALLBACK", CALLBACK_GUIDE),
                        ("EXAMPLES", EXAMPLES),
                        ("AUTHOR", "%s - %s" %
                         (amqpclt.AUTHOR, amqpclt.COPYRIGHT))]:
        if title == "EXAMPLES":
            # EXAMPLES is a dictionary: one sub-section per entry
            parts.append("=head1 %s\n\n" % (title, ))
            for stitle, example in text.items():
                parts.append("=head2 %s\n\n%s\n\n" % (stitle, example))
        else:
            parts.append("=head1 %s\n\n%s\n\n" % (title, text))
    out = "".join(parts)
    out = out.replace("::", ":")\
             .replace("**%s**" % PROG, "B<%s>" % PROG)\
             .replace("<{LIST_BEGIN}>", "\n=over\n")\
             .replace("<{LIST_END}>", "\n=back\n")\
             .replace("\n- ", "\n=item *\n\n")
    print(out)


def _get_parser():
    """ Build the command line parser out of the ARGUMENTS table.

    Options that are not given do not appear at all in the parsed
    result (argparse.SUPPRESS is the default for every argument).
    """
    def make_formatter(prog):
        """ Help formatter with a wider column for the option names. """
        return argparse.RawDescriptionHelpFormatter(
            prog, max_help_position=33)

    author = "AUTHOR\n\n%s - %s" % (amqpclt.AUTHOR, amqpclt.COPYRIGHT)
    parser = argparse.ArgumentParser(
        prog=PROG,
        epilog=author,
        argument_default=argparse.SUPPRESS,
        formatter_class=make_formatter)
    for name, elopts in ARGUMENTS.items():
        # argparse registers -h/--help by itself
        if name == "help":
            continue
        settings = elopts.copy()
        # the flag strings are positional for add_argument(), the
        # remaining settings are passed as keywords
        flags = [settings.pop(kind)
                 for kind in ("short", "long") if kind in settings]
        settings["dest"] = name
        parser.add_argument(*flags, **settings)
    return parser


def _read_args():
    """ Parse the command line and return the given options as a dict.

    When --pod or --rst is given, the matching guide is printed and
    the program exits with code 0.
    """
    namespace = _get_parser().parse_args()
    # pod is checked before rst, only the first one requested is printed
    for flag, printer in (("pod", print_pod), ("rst", print_rst)):
        if getattr(namespace, flag, False):
            printer()
            sys.exit(0)
    return vars(namespace)

##################


_KEYVALUE_RE = re.compile("[, ]")


def _get_kv_from_string(given):
    """ Explode a string in key=value format. """
    result = dict()
    kvs = _KEYVALUE_RE.split(given)
    for kv_item in kvs:
        try:
            key, value = kv_item.split("=")
            result[key] = value
        except ValueError:
            raise ValueError("%s value is invalid: %s" % (given, kv_item))
    return result


def _clean_configuration(config):
    """ Clean configuration.

    The given configuration is modified in place:

    - the options declared as int or float in ARGUMENTS are converted to
      that type (values read from a configuration file are not converted
      by argparse);
    - the "incoming-queue", "outgoing-queue" and "subscribe" options given
      as key=value strings are exploded into dictionaries, also when they
      are elements of a list (this is the case of --subscribe, which can
      be repeated on the command line);
    - the "incoming-broker-auth" and "outgoing-broker-auth" options are
      replaced by checked credential objects, built with
      credential.parse() from a string or with credential.new() from
      a mapping.

    Raise ValueError if a numeric option cannot be converted or if
    a key=value string is invalid.
    """
    for arg, arg_props in ARGUMENTS.items():
        wanted = arg_props.get("type", str)
        val = config.get(arg)
        if val is None or wanted not in (int, float):
            continue
        try:
            config[arg] = wanted(val)
        except (TypeError, ValueError):
            # name both the option and the expected type
            raise ValueError(
                "%s must be of %s type" % (arg, wanted.__name__))

    for arg in ["incoming-queue", "outgoing-queue", "subscribe"]:
        val = config.get(arg)
        if val is None:
            continue
        if isinstance(val, (str, unicode)):
            # explode it
            config[arg] = _get_kv_from_string(val)
        elif isinstance(val, list):
            newval = list()
            for value in val:
                # explode each string element, keep the other ones
                # (e.g. nested configuration sections) untouched
                if isinstance(value, (str, unicode)):
                    value = _get_kv_from_string(value)
                newval.append(value)
            config[arg] = newval

    for auth in ["incoming-broker-auth", "outgoing-broker-auth"]:
        val = config.get(auth)
        if val is None:
            continue
        if isinstance(val, str):
            cred = credential.parse(val)
        elif isinstance(val, unicode):
            cred = credential.parse(val.encode())
        else:
            # nested section: a mapping of credential attributes
            for key, value in val.items():
                val[key] = value.encode()
            cred = credential.new(**val)
        cred.check()
        config[auth] = cred


def option_parse():
    """ Parse options.

    Merge the configuration file (if any, see --conf) with the command
    line options, the latter having precedence, clean the result and
    check it with the mtb.validation helpers.

    The --quit and --status options are handled here and end the
    program.  Otherwise the defaults are applied, the subscriptions are
    parsed and the resulting configuration is returned.

    Raise ValueError if a subscription has no destination.
    """
    cli_options = _read_args()
    conf_file = cli_options.get("conf", None)
    if conf_file is None:
        merged = dict()
    else:
        merged = read_apache_config(
            os.path.abspath(conf_file),
            {"InterPolateVars": True, })
    # the command line overrides the configuration file
    merged.update(cli_options)
    config = TreeDict(merged)
    _clean_configuration(config)

    mutex(config, "quit", "status")
    reqall(config, "quit", "pidfile")
    reqall(config, "status", "pidfile")

    def still_running(pid):
        """ Probe the process with signal 0 (nothing is delivered). """
        try:
            os.kill(pid, 0)
        except OSError:
            return False
        return True

    if "quit" in config:
        pidfile = config["pidfile"]
        pid = pid_read(pidfile)
        remaining = config.get("timeout-linger")
        if pid and remaining is not None:
            print("%s (pid %d) is being told to quit..." % (PROG, pid))
            pid_write(pidfile, pid, "quit")
            # poll once per second until the process is gone or the
            # linger timeout is exhausted
            while remaining >= 0 and still_running(pid):
                time.sleep(1)
                remaining -= 1
            if not still_running(pid):
                print("%s (pid %d) does not seem to be running anymore" %
                      (PROG, pid))
                pid_quit(pidfile)
                sys.exit(0)
        pid_quit(pidfile, PROG)
        sys.exit(0)
    if "status" in config:
        (status, message) = pid_status(config["pidfile"], 60)
        print("%s %s" % (PROG, message))
        sys.exit(status)

    reqany(config, None, "incoming-broker", "incoming-queue")
    reqany(config, None, "outgoing-broker", "outgoing-queue")
    reqall(config, "loop", "incoming-queue")
    reqall(config, "remove", "incoming-queue")
    reqall(config, "prefetch", "incoming-broker")
    reqall(config, "subscribe", "incoming-broker")
    reqall(config, "incoming-broker", "subscribe")
    mutex(config, "incoming-broker", "incoming-queue")
    mutex(config, "outgoing-broker", "outgoing-queue")
    mutex(config, "callback-code", "callback-path")
    mutex(config, "daemon", "statistics")
    reqany(config, "callback-data", "callback-code", "callback-path")
    # NOTE(review): no "heart-beat" option is declared in this file (the
    # disabled one is spelled "heartbeat") - confirm the intended key
    reqany(config, "heart-beat", "incoming-broker", "outgoing-broker")

    for opt, val in DEFAULT_OPTIONS.items():
        config.setdefault(opt, val)

    requested = config.get("subscribe", [])
    if type(requested) is not list:
        requested = [requested, ]
    parsed = list()
    for entry in requested:
        if type(entry) is str:
            # single key=value string not exploded yet
            entry = dict([tuple(entry.split("=")), ])
        destination = entry.get("destination", None)
        if destination is None:
            raise ValueError("subscribe destination must be provided")
        parsed.append(
            amqpclt.common.parse_subscribe_destination(destination))
    config["subscribe"] = parsed

    # in reliable mode with a message count, derive the prefetch from
    # the count (capped at 100) unless it has been set explicitly
    wants_prefetch = config["reliable"] and config.get("count")
    if "prefetch" not in config and wants_prefetch:
        config["prefetch"] = min(config.get("count", 100), 100)

    if config["loglevel"] == "debug":
        PP.pprint(config)
    return config


def main():
    """ Launch execution.

    Build the client from the parsed options and run its three
    phases in order: initialize, work and clean.
    """
    worker = amqpclt.client.Client(option_parse())
    worker.initialize()
    worker.work()
    worker.clean()

if __name__ == "__main__":
    main()
