Added BrookEngine client.
authorStanislaw Klekot <dozzie@jarowit.net>
Wed, 10 May 2017 19:47:13 +0000 (21:47 +0200)
committerStanislaw Klekot <dozzie@jarowit.net>
Wed, 10 May 2017 19:47:13 +0000 (21:47 +0200)
bin/brook [new file with mode: 0755]

diff --git a/bin/brook b/bin/brook
new file mode 100755 (executable)
index 0000000..6f7787e
--- /dev/null
+++ b/bin/brook
@@ -0,0 +1,325 @@
+#!/usr/bin/python
+
+import optparse
+import os
+import sys
+import signal
+import msgpack
+import json
+import time
+import socket
+import seismometer.poll
+import subprocess
+
+#-----------------------------------------------------------------------------
+# command line options {{{
+
+parser = optparse.OptionParser(
+    usage = "\n  <...> | %prog --sender [--send-address=<addr>]"
+            "\n  %prog --reader [--read-address=<addr>] | <...>"
+            "\n  %prog --exec [--reader] [--sender] [options] -- <command> <args> ..."
+)
+
+parser.add_option(
+    "--exec", dest = "mode",
+    action = "store_const", const = "exec", default = "stdio",
+    #help = "",
+)
+parser.add_option(
+    "--sender", dest = "sender",
+    action = "store_true", default = False,
+    #help = "",
+)
+parser.add_option(
+    "--reader", dest = "reader",
+    action = "store_true", default = False,
+    #help = "",
+)
+parser.add_option(
+    "--send-address", dest = "send_address",
+    default = os.environ.get("BROOK_SEND_ADDRESS"), # may still be `None'
+    #help = "(default: 127.0.0.1:5168)",
+    metavar = "HOST:PORT",
+)
+parser.add_option(
+    "--read-address", dest = "read_address",
+    default = os.environ.get("BROOK_READ_ADDRESS"), # may still be `None'
+    #help = "(default: 127.0.0.1:5268)",
+    metavar = "HOST:PORT",
+)
+
+(options, args) = parser.parse_args()
+
+if options.mode == "exec" and len(args) == 0:
+    parser.error("no command to execute provided")
+
+if options.mode == "stdio" and options.sender and options.reader:
+    parser.error("reader and sender are mutually exclusive in STDIO mode")
+
+if options.mode == "exec" and not options.sender and not options.reader:
+    # detect mode(s) based on which addresses are provided
+    options.sender = (options.send_address is not None)
+    options.reader = (options.read_address is not None)
+
+if not options.sender and not options.reader:
+    parser.error("no operation mode (reader/sender) selected")
+
+if options.sender and options.send_address is None:
+    options.send_address = "127.0.0.1:5168"
+if options.reader and options.read_address is None:
+    options.read_address = "127.0.0.1:5268"
+
+# }}}
+#-----------------------------------------------------------------------------
+# sockets and file handles {{{
+
+#-----------------------------------------------------------
+# EmptyHandle {{{
+
+class EmptyHandle:
+    def fileno(self):
+        return None
+
+# }}}
+#-----------------------------------------------------------
+# SenderSocket {{{
+
+class SenderSocket:
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.tag = "ssmm.input"
+        self.conn = None
+        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        conn.settimeout(5)
+        conn.connect((self.host, self.port))
+        conn.shutdown(socket.SHUT_RD)
+        self.conn = conn
+
+    def __del__(self):
+        self.close()
+
+    def write(self, message):
+        timestamp = int(time.time())
+        if isinstance(message, dict):
+            sendbuf = msgpack.packs([self.tag, timestamp, message])
+        else: # list of messages
+            sendbuf = msgpack.packs([self.tag, [
+                [timestamp, rec] for rec in message
+            ]])
+        self.conn.send(sendbuf)
+
+    def fileno(self):
+        if self.conn is None:
+            return None
+        return self.conn.fileno()
+
+    def close(self):
+        if self.conn is None:
+            return
+        self.conn.close()
+        self.conn = None
+
+# }}}
+#-----------------------------------------------------------
+# ReaderSocket {{{
+
+class ReaderSocket:
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.conn = None
+        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        conn.settimeout(5)
+        conn.connect((self.host, self.port))
+        #conn.shutdown(socket.SHUT_WR) # don't; Cool.io closes such socket
+        self.conn = conn
+        self.fh = self.conn.makefile()
+
+    def __del__(self):
+        self.close()
+
+    def read(self):
+        line = self.fh.readline()
+        if line == "":
+            return None
+        try:
+            return json.loads(line)
+        except:
+            return None
+
+    def fileno(self):
+        if self.conn is None:
+            return None
+        return self.conn.fileno()
+
+    def close(self):
+        if self.conn is None:
+            return
+        self.fh.close()
+        self.fh = None
+        self.conn.close()
+        self.conn = None
+
+# }}}
+#-----------------------------------------------------------
+# WriteHandle {{{
+
+class WriteHandle:
+    def __init__(self, fh):
+        self.fh = fh
+
+    def __del__(self):
+        self.close()
+
+    def write(self, message):
+        json.dump(message, self.fh)
+        self.fh.write("\n")
+        self.fh.flush()
+
+    def fileno(self):
+        if self.fh is None:
+            return None
+        return self.fh.fileno()
+
+    def close(self):
+        if self.fh is None:
+            return
+        self.fh.close()
+        self.fh = None
+
+# }}}
+#-----------------------------------------------------------
+# ReadHandle {{{
+
+class ReadHandle:
+    def __init__(self, fh):
+        self.fh = fh
+
+    def __del__(self):
+        self.close()
+
+    def read(self):
+        line = self.fh.readline()
+        if line == "":
+            return None
+        try:
+            return json.loads(line)
+        except:
+            return None
+
+    def fileno(self):
+        if self.fh is None:
+            return None
+        return self.fh.fileno()
+
+    def close(self):
+        if self.fh is None:
+            return
+        self.fh.close()
+        self.fh = None
+
+# }}}
+#-----------------------------------------------------------
+
+# }}}
+#-----------------------------------------------------------------------------
+# signal handlers {{{
+
+def quit_daemon(sig, stack_frame):
+    sys.exit(0)
+
+signal.signal(signal.SIGHUP, quit_daemon)
+signal.signal(signal.SIGINT, quit_daemon)
+signal.signal(signal.SIGTERM, quit_daemon)
+
+# }}}
+#-----------------------------------------------------------------------------
+# terminate_child() {{{
+
+def terminate_child(process):
+    if process is None:
+        return
+    try:
+        process.stdin.close()
+    except: # both close() errors and pipe not being open
+        pass
+    try:
+        process.stdout.close()
+    except: # both close() errors and pipe not being open
+        pass
+    process.terminate()
+    process.wait()
+
+# }}}
+#-----------------------------------------------------------------------------
+# handles setup
+
+sender = None
+writefh = None
+reader = EmptyHandle()
+readfh = EmptyHandle()
+
+child_process = None
+
+if options.sender:
+    host, port = options.send_address.split(":")
+    sender = SenderSocket(host, int(port))
+
+if options.reader:
+    host, port = options.read_address.split(":")
+    reader = ReaderSocket(host, int(port))
+
+if options.mode == "stdio":
+    if options.sender:
+        readfh = ReadHandle(sys.stdin)
+        sys.stdout.close()
+    else: # options.sender
+        writefh = WriteHandle(sys.stdout)
+        sys.stdin.close()
+else:
+    devnull = open("/dev/null", "rw")
+    child_process = subprocess.Popen(
+        args,
+        shell = False,
+        close_fds = True,
+        stdin  = subprocess.PIPE if options.reader else devnull,
+        stdout = subprocess.PIPE if options.sender else devnull,
+    )
+    devnull.close()
+    if child_process.stdout is not None:
+        readfh = ReadHandle(child_process.stdout)
+    if child_process.stdin is not None:
+        writefh = WriteHandle(child_process.stdin)
+
+#-----------------------------------------------------------------------------
+
+poll = seismometer.poll.Poll()
+poll.add(reader)
+poll.add(readfh)
+
+# mapping read_handle => write_handle
+message_routing = {
+    reader: writefh,
+    readfh: sender,
+}
+
+try:
+    while not poll.empty():
+        handles = poll.poll(timeout = None)
+        for h in handles:
+            rec = h.read()
+            if rec is None:
+                raise StopIteration()
+            try:
+                message_routing[h].write(rec)
+            except IOError:
+                raise StopIteration()
+except StopIteration:
+    pass
+
+terminate_child(child_process)
+sys.exit()
+
+#-----------------------------------------------------------------------------
+# vim:ft=python:foldmethod=marker