Implemented support for publishing and subscribing different channels.
authorStanislaw Klekot <dozzie@jarowit.net>
Wed, 10 May 2017 21:32:11 +0000 (23:32 +0200)
committerStanislaw Klekot <dozzie@jarowit.net>
Wed, 10 May 2017 21:32:11 +0000 (23:32 +0200)
This should make it easier for queries to avoid data loops.

CLI client only receives all the channels at the moment.

bin/brook
examples/brookengine.conf
fluentd/lib/fluent/plugin/out_brook_queries.rb

index 6f7787e..f485863 100755 (executable)
--- a/bin/brook
+++ b/bin/brook
@@ -139,6 +139,16 @@ class ReaderSocket:
     def __del__(self):
         self.close()
 
+    def subscribe(self, channel = None):
+        if channel is None:
+            channel = "*"
+        self.conn.send("+%s\n" % (channel,))
+
+    def unsubscribe(self, channel = None):
+        if channel is None:
+            channel = "*"
+        self.conn.send("-%s\n" % (channel,))
+
     def read(self):
         line = self.fh.readline()
         if line == "":
@@ -269,6 +279,7 @@ if options.sender:
 if options.reader:
     host, port = options.read_address.split(":")
     reader = ReaderSocket(host, int(port))
+    reader.subscribe()
 
 if options.mode == "stdio":
     if options.sender:
index 73487f6..2e90c21 100644 (file)
@@ -8,8 +8,9 @@
   type null
 </match>
 
-<match **>
+<match ssmm.**>
   type brook_queries
+  strip_tag ssmm
   #port 5268
   #bind 127.0.0.1
 
@@ -20,4 +21,8 @@
   #brook_read_address 127.0.0.1:5268
 </match>
 
+<match **>
+  type null
+</match>
+
 # vim:ft=fluentd
index 15b4aae..b524577 100644 (file)
@@ -81,9 +81,9 @@ class BrookEngineQueries < Fluent::Output
   def emit(tag, es, chain)
     es.each {|time, record|
       message = record.to_json + "\n"
-      @subscribers.each_key {|sock|
+      @subscribers.each {|sock,channels|
         begin
-          sock.write(message)
+          sock.write(message) if channels.has_key? "*" or channels.has_key? tag
         rescue
           # broken connection will be closed without our intervention
         end
@@ -93,13 +93,21 @@ class BrookEngineQueries < Fluent::Output
   end
 
   def subscribe(sock)
-    @subscribers[sock] = true
+    @subscribers[sock] = {}
   end
 
   def unsubscribe(sock)
     @subscribers.delete(sock)
   end
 
+  def subscribe_channel(sock, channel)
+    @subscribers[sock][channel] = true
+  end
+
+  def unsubscribe_channel(sock, channel)
+    @subscribers[sock].delete(channel)
+  end
+
   def run
     @loop.run(0.1) # 100ms, so the loop doesn't stuck on shutdown
   rescue => e
@@ -123,6 +131,7 @@ class BrookEngineQueries < Fluent::Output
       v = io.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPIDLE, opt)
 
       @pubsub = pubsub
+      @sub_buffer = ""
     end
 
     def on_connect
@@ -130,8 +139,22 @@ class BrookEngineQueries < Fluent::Output
     end
 
     def on_read(data)
-      # no data expected on this socket, so anything that comes is an error
-      close
+      @sub_buffer << data
+      commands = @sub_buffer.split(/\n/)
+      if @sub_buffer[-1] == "\n"
+        @sub_buffer.clear()
+      else
+        @sub_buffer = commands.pop()
+      end
+      for cmd in commands
+        if cmd[0] == "+"
+          @pubsub.subscribe_channel(self, cmd[1..-1])
+        elsif cmd[0] == "-"
+          @pubsub.unsubscribe_channel(self, cmd[1..-1])
+        else
+          close
+        end
+      end
     end
 
     def on_close