123456789_123456789_123456789_123456789_123456789_

Class: Puma::Cluster

Relationships & Source Files
Namespace Children
Classes:
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Runner
Instance Chain:
self, Runner
Inherits: Puma::Runner
Defined in: lib/puma/cluster.rb

Overview

This class is instantiated by the Launcher and used to boot and serve a Ruby application when puma “workers” are needed i.e. when using multi-processes. For example $ puma -w 5

At the core of this class is running an instance of Server which gets created via the start_server method from the Runner class that this inherits from.

An instance of this class will spawn the number of processes passed in via the #spawn_workers method call. Each worker will have it’s own instance of a Server.

Class Method Summary

Runner - Inherited

Instance Attribute Summary

Instance Method Summary

Runner - Inherited

Constructor Details

.new(cli, events) ⇒ Cluster

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 22

def initialize(cli, events)
  super cli, events

  @phase = 0
  @workers = []
  @next_check = nil

  @phased_state = :idle
  @phased_restart = false
end

Instance Attribute Details

#all_workers_booted?Boolean (readonly)

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 178

def all_workers_booted?
  @workers.count { |w| !w.booted? } == 0
end

#preload?Boolean (readonly)

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 360

def preload?
  @options[:preload_app]
end

Instance Method Details

#check_workers(force = false)

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 182

def check_workers(force=false)
  return if !force && @next_check && @next_check >= Time.now

  @next_check = Time.now + Const::WORKER_CHECK_INTERVAL

  any = false

  @workers.each do |w|
    next if !w.booted? && !w.ping_timeout?(@options[:worker_boot_timeout])
    if w.ping_timeout?(@options[:worker_timeout])
      log "! Terminating timed out worker: #{w.pid}"
      w.kill
      any = true
    end
  end

  # If we killed any timed out workers, try to catch them
  # during this loop by giving the kernel time to kill them.
  sleep 1 if any

  wait_workers
  cull_workers
  spawn_workers

  if all_workers_booted?
    # If we're running at proper capacity, check to see if
    # we need to phase any workers out (which will restart
    # in the right phase).
    #
    w = @workers.find { |x| x.phase != @phase }

    if w
      if @phased_state == :idle
        @phased_state = :waiting
        log "- Stopping #{w.pid} for phased upgrade..."
      end

      unless w.term?
        w.term
        log "- #{w.signal} sent to #{w.pid}..."
      end
    end
  end
end

#cull_workers

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 156

def cull_workers
  diff = @workers.size - @options[:workers]
  return if diff < 1

  debug "Culling #{diff.inspect} workers"

  workers_to_cull = @workers[-diff,diff]
  debug "Workers to cull: #{workers_to_cull.inspect}"

  workers_to_cull.each do |worker|
    log "- Worker #{worker.index} (pid: #{worker.pid}) terminating"
    worker.term
  end
end

#halt

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 340

def halt
  @status = :halt
  wakeup!
end

#next_worker_index

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 171

def next_worker_index
  all_positions =  0...@options[:workers]
  occupied_positions = @workers.map { |w| w.index }
  available_positions = all_positions.to_a - occupied_positions
  available_positions.first
end

#phased_restart

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 319

def phased_restart
  return false if @options[:preload_app]

  @phased_restart = true
  wakeup!

  true
end

#redirect_io

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 59

def redirect_io
  super

  @workers.each { |x| x.hup }
end

#reload_worker_directory

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 345

def reload_worker_directory
  dir = @launcher.restart_dir
  log "+ Changing to #{dir}"
  Dir.chdir dir
end

#restart

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 314

def restart
  @restart = true
  stop
end

#run

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 402

def run
  @status = :run

  output_header "cluster"

  log "* Process workers: #{@options[:workers]}"

  before = Thread.list

  if preload?
    log "* Preloading application"
    load_and_bind

    after = Thread.list

    if after.size > before.size
      threads = (after - before)
      if threads.first.respond_to? :backtrace
        log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot:"
        threads.each do |t|
          log "! #{t.inspect} - #{t.backtrace ? t.backtrace.first : ''}"
        end
      else
        log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot"
      end
    end
  else
    log "* Phased restart available"

    unless @launcher.config.app_configured?
      error "No application configured, nothing to run"
      exit 1
    end

    @launcher.binder.parse @options[:binds], self
  end

  read, @wakeup = Puma::Util.pipe

  setup_signals

  # Used by the workers to detect if the master process dies.
  # If select says that @check_pipe is ready, it's because the
  # master has exited and @suicide_pipe has been automatically
  # closed.
  #
  @check_pipe, @suicide_pipe = Puma::Util.pipe

  if daemon?
    log "* Daemonizing..."
    Process.daemon(true)
  else
    log "Use Ctrl-C to stop"
  end

  redirect_io

  Plugins.fire_background

  @launcher.write_state

  start_control

  @master_read, @worker_write = read, @wakeup

  @launcher.config.run_hooks :before_fork, nil

  spawn_workers

  Signal.trap "SIGINT" do
    stop
  end

  @launcher.events.fire_on_booted!

  begin
    force_check = false

    while @status == :run
      begin
        if @phased_restart
          start_phased_restart
          @phased_restart = false
        end

        check_workers force_check

        force_check = false

        res = IO.select([read], nil, nil, Const::WORKER_CHECK_INTERVAL)

        if res
          req = read.read_nonblock(1)

          next if !req || req == "!"

          result = read.gets
          pid = result.to_i

          if w = @workers.find { |x| x.pid == pid }
            case req
            when "b"
              w.boot!
              log "- Worker #{w.index} (pid: #{pid}) booted, phase: #{w.phase}"
              force_check = true
            when "e"
              # external term, see worker method, Signal.trap "SIGTERM"
              w.instance_variable_set :@term, true
            when "t"
              w.term unless w.term?
              force_check = true
            when "p"
              w.ping!(result.sub(/^\d+/,'').chomp)
            end
          else
            log "! Out-of-sync worker list, no #{pid} worker"
          end
        end

      rescue Interrupt
        @status = :stop
      end
    end

    stop_workers unless @status == :halt
  ensure
    @check_pipe.close
    @suicide_pipe.close
    read.close
    @wakeup.close
  end
end

#setup_signals

We do this in a separate method to keep the lambda scope of the signals handlers as small as possible.

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 366

def setup_signals
  Signal.trap "SIGCHLD" do
    wakeup!
  end

  Signal.trap "TTIN" do
    @options[:workers] += 1
    wakeup!
  end

  Signal.trap "TTOU" do
    @options[:workers] -= 1 if @options[:workers] >= 2
    wakeup!
  end

  master_pid = Process.pid

  Signal.trap "SIGTERM" do
    # The worker installs their own SIGTERM when booted.
    # Until then, this is run by the worker and the worker
    # should just exit if they get it.
    if Process.pid != master_pid
      log "Early termination of worker"
      exit! 0
    else
      @launcher.close_binder_listeners

      stop_workers
      stop

      raise(SignalException, "SIGTERM") if @options[:raise_exception_on_sigterm]
      exit 0 # Clean exit, workers were stopped
    end
  end
end

#spawn_workers

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 128

def spawn_workers
  diff = @options[:workers] - @workers.size
  return if diff < 1

  master = Process.pid

  diff.times do
    idx = next_worker_index
    @launcher.config.run_hooks :before_worker_fork, idx

    pid = fork { worker(idx, master) }
    if !pid
      log "! Complete inability to spawn new workers detected"
      log "! Seppuku is the only choice."
      exit! 1
    end

    debug "Spawned worker: #{pid}"
    @workers << Worker.new(idx, pid, @phase, @options)

    @launcher.config.run_hooks :after_worker_fork, idx
  end

  if diff > 0
    @phased_state = :idle
  end
end

#start_phased_restart

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 48

def start_phased_restart
  @phase += 1
  log "- Starting phased worker restart, phase: #{@phase}"

  # Be sure to change the directory again before loading
  # the app. This way we can pick up new code.
  dir = @launcher.restart_dir
  log "+ Changing to #{dir}"
  Dir.chdir dir
end

#stats

Inside of a child process, this will return all zeroes, as @workers is only populated in the master process.

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 353

def stats
  old_worker_count = @workers.count { |w| w.phase != @phase }
  booted_worker_count = @workers.count { |w| w.booted? }
  worker_status = '[' + @workers.map { |w| %Q!{ "started_at": "#{w.started_at.utc.iso8601}", "pid": #{w.pid}, "index": #{w.index}, "phase": #{w.phase}, "booted": #{w.booted?}, "last_checkin": "#{w.last_checkin.utc.iso8601}", "last_status": #{w.last_status} }!}.join(",") + ']'
  %Q!{ "started_at": "#{@started_at.utc.iso8601}", "workers": #{@workers.size}, "phase": #{@phase}, "booted_workers": #{booted_worker_count}, "old_workers": #{old_worker_count}, "worker_status": #{worker_status} }!
end

#stop

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 328

def stop
  @status = :stop
  wakeup!
end

#stop_blocked

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 333

def stop_blocked
  @status = :stop if @status == :run
  wakeup!
  @control.stop(true) if @control
  Process.waitall
end

#stop_workers

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 33

def stop_workers
  log "- Gracefully shutting down workers..."
  @workers.each { |x| x.term }

  begin
    loop do
      wait_workers
      break if @workers.empty?
      sleep 0.2
    end
  rescue Interrupt
    log "! Cancelled waiting for workers"
  end
end

#wait_workers (private)

loops thru @workers, removing workers that exited, and calling #term if needed

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 539

def wait_workers
  @workers.reject! do |w|
    begin
      if Process.wait(w.pid, Process::WNOHANG)
        true
      else
        w.term if w.term?
        nil
      end
    rescue Errno::ECHILD
      true # child is already terminated
    end
  end
end

#wakeup!

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 227

def wakeup!
  return unless @wakeup

  begin
    @wakeup.write "!" unless @wakeup.closed?
  rescue SystemCallError, IOError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
  end
end

#worker(index, master)

[ GitHub ]

  
# File 'lib/puma/cluster.rb', line 237

def worker(index, master)
  title  = "puma: cluster worker #{index}: #{master}"
  title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty?
  $0 = title

  Signal.trap "SIGINT", "IGNORE"

  @workers = []
  @master_read.close
  @suicide_pipe.close

  Thread.new do
    Puma.set_thread_name "worker check pipe"
    IO.select [@check_pipe]
    log "! Detected parent died, dying"
    exit! 1
  end

  # If we're not running under a Bundler context, then
  # report the info about the context we will be using
  if !ENV['BUNDLE_GEMFILE']
    if File.exist?("Gemfile")
      log "+ Gemfile in context: #{File.expand_path("Gemfile")}"
    elsif File.exist?("gems.rb")
      log "+ Gemfile in context: #{File.expand_path("gems.rb")}"
    end
  end

  # Invoke any worker boot hooks so they can get
  # things in shape before booting the app.
  @launcher.config.run_hooks :before_worker_boot, index

  server = start_server

  Signal.trap "SIGTERM" do
    @worker_write << "e#{Process.pid}\n" rescue nil
    server.stop
  end

  begin
    @worker_write << "b#{Process.pid}\n"
  rescue SystemCallError, IOError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
    STDERR.puts "Master seems to have exited, exiting."
    return
  end

  Thread.new(@worker_write) do |io|
    Puma.set_thread_name "stat payload"
    base_payload = "p#{Process.pid}"

    while true
      sleep Const::WORKER_CHECK_INTERVAL
      begin
        b = server.backlog || 0
        r = server.running || 0
        t = server.pool_capacity || 0
        m = server.max_threads || 0
        payload = %Q!#{base_payload}{ "backlog":#{b}, "running":#{r}, "pool_capacity":#{t}, "max_threads": #{m} }\n!
        io << payload
      rescue IOError
        Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
        break
      end
    end
  end

  server.run.join

  # Invoke any worker shutdown hooks so they can prevent the worker
  # exiting until any background operations are completed
  @launcher.config.run_hooks :before_worker_shutdown, index
ensure
  @worker_write << "t#{Process.pid}\n" rescue nil
  @worker_write.close
end