Writing to a child process made non-blocking.
author Stanislaw Klekot <dozzie@jarowit.net>
Mon, 7 Aug 2017 20:14:59 +0000 (22:14 +0200)
committer Stanislaw Klekot <dozzie@jarowit.net>
Mon, 7 Aug 2017 20:14:59 +0000 (22:14 +0200)
This should solve the occasional write()/write() deadlock between brook and its
child script, which leads to what is effectively a memory leak in BrookEngine.

bin/brook

index 867ff9c..aeb8663 100755 (executable)
--- a/bin/brook
+++ b/bin/brook
@@ -9,6 +9,8 @@ import json
 import time
 import socket
 import seismometer.poll
+import fcntl
+import errno
 import subprocess
 
 #-----------------------------------------------------------------------------
@@ -72,6 +74,61 @@ if options.pub_chan is None and options.sub_chan is None:
 # sockets and file handles {{{
 
 #-----------------------------------------------------------
+# WriteBuffer {{{
+
+class WriteBuffer:
+    def __init__(self, size = 128 * 1024, chunk = 4 * 1024):
+        self._buffer = bytearray(size)
+        self._chunk = chunk
+        self._pos = 0
+        self._used = 0
+
+    def add(self, data):
+        # `data' should be `str'
+        if self._used + len(data) > len(self._buffer):
+            raise IOError("write buffer overflow")
+
+        free_start = (self._pos + self._used) % len(self._buffer)
+        if free_start + len(data) < len(self._buffer):
+            # no need to wrap
+            self._buffer[free_start:(free_start + len(data))] = data
+        else:
+            split = len(self._buffer) - free_start
+            self._buffer[free_start:] = data[0:split]
+            self._buffer[0:(len(data) - split)] = data[split:]
+        self._used += len(data)
+
+    def read(self):
+        result = self.read_peek()
+        self.read_commit()
+        return result
+
+    def read_peek(self):
+        read_size = min(self._used, self._chunk)
+
+        if self._pos + read_size < len(self._buffer):
+            return self._buffer[self._pos:(self._pos + read_size)]
+        else:
+            left = self._buffer[self._pos:]
+            right = self._buffer[0:(self._pos + read_size - len(self._buffer))]
+            return left + right
+
+    def read_commit(self):
+        read_size = min(self._used, self._chunk)
+        self._pos = (self._pos + read_size) % len(self._buffer)
+        self._used -= read_size
+        # XXX: this gives some chances of keeping used part of the buffer at
+        # the beginning, so end of the buffer may stay untouched (this helps
+        # with memory overcommitting a little); other than that, this `if' has
+        # no function whatsoever
+        if self._used == 0:
+            self._pos = 0
+
+    def __len__(self):
+        return self._used
+
+# }}}
+#-----------------------------------------------------------
 # EmptyHandle {{{
 
 class EmptyHandle:
@@ -96,6 +153,7 @@ class SenderSocket:
         conn.connect((self.host, self.port))
         conn.shutdown(socket.SHUT_RD)
         self.conn = conn
+        # TODO: manually buffered non-blocking write, similar to `WriteHandle'
 
     def __del__(self):
         self.close()
@@ -110,6 +168,9 @@ class SenderSocket:
             ]])
         self.conn.send(sendbuf)
 
+    def flush(self):
+        return
+
     def fileno(self):
         if self.conn is None:
             return None
@@ -181,25 +242,39 @@ class ReaderSocket:
 class WriteHandle:
     def __init__(self, fh):
         self.fh = fh
+        self.fd = fh.fileno() if hasattr(fh, 'fileno') else fh
+        flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
+        fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+        self.buf = WriteBuffer()
 
     def __del__(self):
         self.close()
 
     def write(self, message):
-        json.dump(message, self.fh)
-        self.fh.write("\n")
-        self.fh.flush()
+        self.buf.add(json.dumps(message))
+        self.buf.add("\n")
+        self.flush()
+
+    def flush(self):
+        while len(self.buf) > 0:
+            try:
+                os.write(self.fd, self.buf.read_peek())
+                self.buf.read_commit()
+            except (OSError, IOError), e:
+                if e.errno == errno.EWOULDBLOCK or e.errno == errno.EAGAIN:
+                    break
+                else:
+                    raise
 
     def fileno(self):
-        if self.fh is None:
-            return None
-        return self.fh.fileno()
+        return self.fd
 
     def close(self):
-        if self.fh is None:
+        if self.fd is None:
             return
         self.fh.close()
         self.fh = None
+        self.fd = None
 
 # }}}
 #-----------------------------------------------------------
@@ -331,10 +406,11 @@ message_routing = {
     reader: writefh,
     readfh: sender,
 }
+write_handles = [h for h in message_routing.values() if h is not None]
 
 try:
     while not poll.empty():
-        handles = poll.poll(timeout = None)
+        handles = poll.poll(timeout = 100)
         for h in handles:
             rec = h.read()
             if rec is None:
@@ -343,6 +419,8 @@ try:
                 message_routing[h].write(rec)
             except IOError:
                 raise StopIteration()
+        for h in write_handles:
+            h.flush()
 except StopIteration:
     pass