Added starting and stopping daemonshepherd.
authorStanislaw Klekot <dozzie@jarowit.net>
Wed, 22 Feb 2017 20:05:44 +0000 (21:05 +0100)
committerStanislaw Klekot <dozzie@jarowit.net>
Wed, 22 Feb 2017 20:05:44 +0000 (21:05 +0100)
fluentd/plugin/out_brook_queries.rb

index eaaeb37..9809dc7 100644 (file)
@@ -33,11 +33,11 @@ class BrookEngineQueries < Fluent::Output
   config_param :bind, :string, :default => "127.0.0.1"
   config_param :backlog, :integer, :default => 1
 
-  #config_param :control_socket, :string
-  #config_param :queries, :string
-  #config_param :logging, :string
-  #config_param :brook_send_address, :string
-  #config_param :brook_read_address, :string, :default => nil
+  config_param :control_socket, :string
+  config_param :queries, :string
+  config_param :logging, :string
+  config_param :brook_send_address, :string
+  config_param :brook_read_address, :string, :default => nil
 
   def configure(conf)
     super
@@ -54,13 +54,22 @@ class BrookEngineQueries < Fluent::Output
 
     @io_thread = Thread.new(&method(:run))
 
-    # TODO: start daemonshepherd
+    if @brook_read_address == nil
+      # TODO: read the bind address from @socket
+      @brook_read_address = "#{@bind}:#{@port}"
+    end
+
+    @queries_keeper = QueriesSupervisor.new(
+      @queries, @logging, @control_socket,
+      @brook_send_address, @brook_read_address,
+    )
   end
 
   def shutdown
     @loop.watchers.each {|w| w.detach if w.attached? }
     @loop.stop
     @io_thread.join
+    @queries_keeper.stop
     @socket.close
   end
 
@@ -95,6 +104,9 @@ class BrookEngineQueries < Fluent::Output
     $log.error_backtrace
   end
 
+  #--------------------------------------------------------
+  # connection handler {{{
+
   class Handler < Coolio::Socket
     def initialize(io, pubsub)
       super(io)
@@ -123,6 +135,75 @@ class BrookEngineQueries < Fluent::Output
       @pubsub.unsubscribe(self)
     end
   end
+
+  # }}}
+  #--------------------------------------------------------
+  # daemonshepherd supervisor {{{
+
+  class QueriesSupervisor
+    class FinishThread < Exception
+    end
+
+    def initialize(queries_path, logging_path, socket_path,
+                   send_address, read_address)
+      @queries_path = queries_path
+      @logging_path = logging_path
+      @socket_path  = socket_path
+      @send_address = send_address
+      @read_address = read_address
+
+      @pid = nil
+      @keep_running = true
+      @thread = Thread.new(&method(:watch))
+    end
+
+    def stop
+      @thread.raise(FinishThread.new)
+      @thread.join
+    end
+
+    def watch
+      while true
+        if @pid == nil
+          @pid = Process.spawn(
+            {"BROOK_SEND_ADDRESS" => @send_address,
+              "BROOK_READ_ADDRESS" => @read_address},
+            ["/usr/bin/daemonshepherd", "brook-queries"],
+            "--daemons", @queries_path,
+            "--logging", @logging_path,
+            "--control-socket", @socket_path,
+          )
+          # TODO: check if fork failed
+          $log.info "starting queries supervisor process", :pid => @pid
+        end
+
+        # if the fork() call failed, don't wait for the child process (because
+        # there's none)
+        if @pid != nil
+          pid, status = Process.wait2(@pid, 0)
+          @pid = nil # FIXME: not an atomic operation with Process.wait()
+          $log.warn "queries supervisor process terminated",
+                    :pid => pid,
+                    :exit => status.exitstatus, :signal => status.termsig
+        end
+
+        # XXX: don't restart the daemonshepherd right away, first, to prevent
+        # a tight loop of exec() (in case daemonshepherd can't be executed or
+        # terminates right away), and second, to give Fluentd time to shutdown
+        # on ^C
+        sleep 1
+      end
+    rescue FinishThread
+      if @pid != nil
+        $log.info "stopping queries supervisor process", :pid => @pid
+        Process.kill "TERM", @pid
+        Process.wait2(@pid, 0)
+      end
+    end
+  end
+
+  # }}}
+  #--------------------------------------------------------
 end
 
 # vim:ft=ruby:foldmethod=marker