Skip to content

Fluentd running on ServerEngine

Sadayuki Furuhashi edited this page Mar 7, 2016 · 5 revisions

Fluentd running on ServerEngine

Logging

  • ServerEngine initializes a logger
    • config[:log] config parameter is required. Fluentd will set it.
  • Fluentd::Log writes logs to ServerEngine's logger so that it can use ServerEngine::DaemonLogger's log rotation config.
    • Fluentd::Log#puts(msg) calls ::Logger#<<(msg) (note: ServerEngine::DaemonLogger extends ::Logger)
  • Fluentd::Log#initialize takes initialize(logger, opts={}) as the arguments (logger is ::Logger) instead of initialize(out, level, opts).
    • Fluentd::Log#initialize copies @level from logger.level.
    • maybe, Fluentd::Log#initialize overwrites logger.level= so that config reloading resets level of Fluentd::Log:
set_logger_orig = logger.class.public_instance_method(:level=).bind(logger)
me = self
logger.define_singleton_method(:level=) {|level| set_logger_orig.call(level); me.level = self.level + 1 }

Signal handler

  • signal handler: ServerEngine needs improvements so that signal can be configurable (for example, SIGTERM is now hardcoded as graceful stop. but it should be configurable like SIGQUIT).
    • ServerEngine's default signal handlers:
      • TERM: stop(graceful: true)
      • QUIT: stop(graceful: false) (not available on Windows)
      • USR1: restart(graceful: true) (not available on Windows)
      • HUP: restart(graceful: false) (not available on Windows)
      • USR2: reload (not available on Windows)
      • INT: live detach (if supervisor is enabled)
      • CONT: Sigdump.dump (not available on Windows)
    • update ServerEngine::Server#install_signal_handlers so that it takes signal name from @config if set.

Daemonize

  • Server::Daemon uses fork. But it needs to fallback to spawn on windows (linux/unix should be able to keep using fork). Spawn needs command name instead of a Proc. So,
    • if on Windows, Server::Daemon#main takes cmdline of the subprocess from config. Then call spawn instead of fork:
if ServerEngine.windows?
  windows_daemon_cmdline = config[:windows_daemon_cmdline]
  Process.spwan(*Array(windows_daemon_cmdline))
  ...
  pid = ...
else
  Process.fork do
    ...
  end
  pid = rpipe.gets.to_i
end

if @pid_path
  File.open(@pid_path, "w") {|f|
    f.write "#{pid}\n"
  }
end

...
  • Add some utility methods so that users (including Flunetd) can implement :windows_daemon_cmdline easily. Like:
def Daemon.run_server(server_module, worker_module, load_config_proc={}, &block)
  Daemon.new(server_module, worker_module, load_config_proc, &block).server_main
end

def Daemon#server_main
  $0 = @daemon_process_name if @daemon_process_name

  Daemon.change_privilege(@chuser, @chgroup)
  File.umask(@chumask) if @chumask

  s = create_server(create_logger)

  STDIN.reopen(File::NULL)
  STDOUT.reopen(File::NULL, "wb")
  STDERR.reopen(File::NULL, "wb")

  s.install_signal_handlers

  s.main
end

def ServerEngine.ruby_bin_path
  if ServerEngine.windows?
    ruby_path = "\0" * 256
    GetModuleFileName.call(0, ruby_path, 256)
    return ruby_path.rstrip.gsub(/\\/, '/')
  else
    return File.join(RbConfig::CONFIG["bindir"], RbConfig::CONFIG["RUBY_INSTALL_NAME"]) + RbConfig::CONFIG["EXEEXT"]
  end
end
  • user code will be like:
    • config[:windows_daemon_cmdline] = [ServerEngine.ruby_bin_path, File.join(File.dirname(__FILE__), "fluentd", "daemon.rb"), MyServer.name, MyWorker.name, config_path, js]
    • daemon.rb:
require 'serverengine'
require 'fluentd'
ServerEngine::Daemon.run_server(const_get(ARGV[0]), const_get(ARGV[1])) { Fluentd.load_config(ARGV[2], JSON.parse(ARGV[3]))) }

Worker processes

  • Use worker_type: "spawn"

  • maybe add something similar to Daemon.run_server for convenience

  • ServerEngine loads configuration by calling a block given to ServerEngine.create. So, Fluentd gives a proc that loads a fluentd.config and convert it to ServerEngine config:

def Fluentd.load_config(path, overwrite_params_parsed_from_cmdline)
  fluentd_conf = Fluent::Config.parse(...)
  system_config = ...
  se_config = {
    worker_type: "spawn",
    log_level: system_config.log_level,
    ...,
    windows_daemon_cmdline: [ServerEngine.ruby_bin_path, File.join(File.dirname(__FILE__), "fluentd", "daemon.rb"), MyServer.name, MyWorker.name, path, JSON.dump(overwrite_params_parsed_from_cmdline)]
    fluentd: fluentd_conf,
  }
  return se_config
end

ServerEngine.create(Fluent::ServerModule, Fluent::WorkerModule) do
  Fluentd.load_config(path, overwrite_parameters_from_cmdline)
end
  • ServerModule calls the block when Daemon, Supervisor, and Server are created (3 times). it might be better to have cache (e.g. reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed).

  • ServerModule initializes SocketManager (not supervisor) so that restarting fluentd closes all sockets.

Misc

  • Daemon.change_privilege needs to use Etc instead of id command so that it can work on windows