Class: Puma::Cluster
Relationships & Source Files | |
Namespace Children | |
Classes:
| |
Super Chains via Extension / Inclusion / Inheritance | |
Class Chain:
self,
Runner
|
|
Instance Chain:
self,
Runner
|
|
Inherits: |
Puma::Runner
|
Defined in: | lib/puma/cluster.rb |
Overview
This class is instantiated by the Launcher
and used to boot and serve a Ruby application when puma “workers” are needed i.e. when using multi-processes. For example $ puma -w 5
At the core of this class is running an instance of Server
which gets created via the start_server
method from the Runner
class that this inherits from.
An instance of this class will spawn the number of processes passed in via the #spawn_workers method call. Each worker will have it’s own instance of a Server
.
Class Method Summary
Instance Attribute Summary
- #all_workers_booted? ⇒ Boolean readonly
- #preload? ⇒ Boolean readonly
Runner
- Inherited
Instance Method Summary
- #check_workers(force = false)
- #cull_workers
- #halt
- #next_worker_index
- #phased_restart
- #redirect_io
- #reload_worker_directory
- #restart
- #run
-
#setup_signals
We do this in a separate method to keep the lambda scope of the signals handlers as small as possible.
- #spawn_workers
- #start_phased_restart
-
#stats
Inside of a child process, this will return all zeroes, as @workers is only populated in the master process.
- #stop
- #stop_blocked
- #stop_workers
- #wakeup!
- #worker(index, master)
-
#wait_workers
private
loops thru @workers, removing workers that exited, and calling
#term
if needed.
Runner
- Inherited
Constructor Details
.new(cli, events) ⇒ Cluster
# File 'lib/puma/cluster.rb', line 22
def initialize(cli, events) super cli, events @phase = 0 @workers = [] @next_check = nil @phased_state = :idle @phased_restart = false end
Instance Attribute Details
#all_workers_booted? ⇒ Boolean
(readonly)
[ GitHub ]
# File 'lib/puma/cluster.rb', line 178
def all_workers_booted? @workers.count { |w| !w.booted? } == 0 end
#preload? ⇒ Boolean
(readonly)
[ GitHub ]
# File 'lib/puma/cluster.rb', line 360
def preload? @options[:preload_app] end
Instance Method Details
#check_workers(force = false)
[ GitHub ]# File 'lib/puma/cluster.rb', line 182
def check_workers(force=false) return if !force && @next_check && @next_check >= Time.now @next_check = Time.now + Const::WORKER_CHECK_INTERVAL any = false @workers.each do |w| next if !w.booted? && !w.ping_timeout?(@options[:worker_boot_timeout]) if w.ping_timeout?(@options[:worker_timeout]) log "! Terminating timed out worker: #{w.pid}" w.kill any = true end end # If we killed any timed out workers, try to catch them # during this loop by giving the kernel time to kill them. sleep 1 if any wait_workers cull_workers spawn_workers if all_workers_booted? # If we're running at proper capacity, check to see if # we need to phase any workers out (which will restart # in the right phase). # w = @workers.find { |x| x.phase != @phase } if w if @phased_state == :idle @phased_state = :waiting log "- Stopping #{w.pid} for phased upgrade..." end unless w.term? w.term log "- #{w.signal} sent to #{w.pid}..." end end end end
#cull_workers
[ GitHub ]# File 'lib/puma/cluster.rb', line 156
def cull_workers diff = @workers.size - @options[:workers] return if diff < 1 debug "Culling #{diff.inspect} workers" workers_to_cull = @workers[-diff,diff] debug "Workers to cull: #{workers_to_cull.inspect}" workers_to_cull.each do |worker| log "- Worker #{worker.index} (pid: #{worker.pid}) terminating" worker.term end end
#halt
[ GitHub ]# File 'lib/puma/cluster.rb', line 340
def halt @status = :halt wakeup! end
#next_worker_index
[ GitHub ]# File 'lib/puma/cluster.rb', line 171
def next_worker_index all_positions = 0...@options[:workers] occupied_positions = @workers.map { |w| w.index } available_positions = all_positions.to_a - occupied_positions available_positions.first end
#phased_restart
[ GitHub ]# File 'lib/puma/cluster.rb', line 319
def phased_restart return false if @options[:preload_app] @phased_restart = true wakeup! true end
#redirect_io
[ GitHub ]# File 'lib/puma/cluster.rb', line 59
def redirect_io super @workers.each { |x| x.hup } end
#reload_worker_directory
[ GitHub ]# File 'lib/puma/cluster.rb', line 345
def reload_worker_directory dir = @launcher.restart_dir log "+ Changing to #{dir}" Dir.chdir dir end
#restart
[ GitHub ]# File 'lib/puma/cluster.rb', line 314
def restart @restart = true stop end
#run
[ GitHub ]# File 'lib/puma/cluster.rb', line 402
def run @status = :run output_header "cluster" log "* Process workers: #{@options[:workers]}" before = Thread.list if preload? log "* Preloading application" load_and_bind after = Thread.list if after.size > before.size threads = (after - before) if threads.first.respond_to? :backtrace log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot:" threads.each do |t| log "! #{t.inspect} - #{t.backtrace ? t.backtrace.first : ''}" end else log "! WARNING: Detected #{after.size-before.size} Thread(s) started in app boot" end end else log "* Phased restart available" unless @launcher.config.app_configured? error "No application configured, nothing to run" exit 1 end @launcher.binder.parse @options[:binds], self end read, @wakeup = Puma::Util.pipe setup_signals # Used by the workers to detect if the master process dies. # If select says that @check_pipe is ready, it's because the # master has exited and @suicide_pipe has been automatically # closed. # @check_pipe, @suicide_pipe = Puma::Util.pipe if daemon? log "* Daemonizing..." Process.daemon(true) else log "Use Ctrl-C to stop" end redirect_io Plugins.fire_background @launcher.write_state start_control @master_read, @worker_write = read, @wakeup @launcher.config.run_hooks :before_fork, nil spawn_workers Signal.trap "SIGINT" do stop end @launcher.events.fire_on_booted! begin force_check = false while @status == :run begin if @phased_restart start_phased_restart @phased_restart = false end check_workers force_check force_check = false res = IO.select([read], nil, nil, Const::WORKER_CHECK_INTERVAL) if res req = read.read_nonblock(1) next if !req || req == "!" result = read.gets pid = result.to_i if w = @workers.find { |x| x.pid == pid } case req when "b" w.boot! log "- Worker #{w.index} (pid: #{pid}) booted, phase: #{w.phase}" force_check = true when "e" # external term, see worker method, Signal.trap "SIGTERM" w.instance_variable_set :@term, true when "t" w.term unless w.term? force_check = true when "p" w.ping!(result.sub(/^\d+/,'').chomp) end else log "! Out-of-sync worker list, no #{pid} worker" end end rescue Interrupt @status = :stop end end stop_workers unless @status == :halt ensure @check_pipe.close @suicide_pipe.close read.close @wakeup.close end end
#setup_signals
We do this in a separate method to keep the lambda scope of the signals handlers as small as possible.
# File 'lib/puma/cluster.rb', line 366
def setup_signals Signal.trap "SIGCHLD" do wakeup! end Signal.trap "TTIN" do @options[:workers] += 1 wakeup! end Signal.trap "TTOU" do @options[:workers] -= 1 if @options[:workers] >= 2 wakeup! end master_pid = Process.pid Signal.trap "SIGTERM" do # The worker installs their own SIGTERM when booted. # Until then, this is run by the worker and the worker # should just exit if they get it. if Process.pid != master_pid log "Early termination of worker" exit! 0 else @launcher.close_binder_listeners stop_workers stop raise(SignalException, "SIGTERM") if @options[:raise_exception_on_sigterm] exit 0 # Clean exit, workers were stopped end end end
#spawn_workers
[ GitHub ]# File 'lib/puma/cluster.rb', line 128
def spawn_workers diff = @options[:workers] - @workers.size return if diff < 1 master = Process.pid diff.times do idx = next_worker_index @launcher.config.run_hooks :before_worker_fork, idx pid = fork { worker(idx, master) } if !pid log "! Complete inability to spawn new workers detected" log "! Seppuku is the only choice." exit! 1 end debug "Spawned worker: #{pid}" @workers << Worker.new(idx, pid, @phase, @options) @launcher.config.run_hooks :after_worker_fork, idx end if diff > 0 @phased_state = :idle end end
#start_phased_restart
[ GitHub ]# File 'lib/puma/cluster.rb', line 48
def start_phased_restart @phase += 1 log "- Starting phased worker restart, phase: #{@phase}" # Be sure to change the directory again before loading # the app. This way we can pick up new code. dir = @launcher.restart_dir log "+ Changing to #{dir}" Dir.chdir dir end
#stats
Inside of a child process, this will return all zeroes, as @workers is only populated in the master process.
# File 'lib/puma/cluster.rb', line 353
def stats old_worker_count = @workers.count { |w| w.phase != @phase } booted_worker_count = @workers.count { |w| w.booted? } worker_status = '[' + @workers.map { |w| %Q!{ "started_at": "#{w.started_at.utc.iso8601}", "pid": #{w.pid}, "index": #{w.index}, "phase": #{w.phase}, "booted": #{w.booted?}, "last_checkin": "#{w.last_checkin.utc.iso8601}", "last_status": #{w.last_status} }!}.join(",") + ']' %Q!{ "started_at": "#{@started_at.utc.iso8601}", "workers": #{@workers.size}, "phase": #{@phase}, "booted_workers": #{booted_worker_count}, "old_workers": #{old_worker_count}, "worker_status": #{worker_status} }! end
#stop
[ GitHub ]# File 'lib/puma/cluster.rb', line 328
def stop @status = :stop wakeup! end
#stop_blocked
[ GitHub ]#stop_workers
[ GitHub ]# File 'lib/puma/cluster.rb', line 33
def stop_workers log "- Gracefully shutting down workers..." @workers.each { |x| x.term } begin loop do wait_workers break if @workers.empty? sleep 0.2 end rescue Interrupt log "! Cancelled waiting for workers" end end
#wait_workers (private)
loops thru @workers, removing workers that exited, and calling #term
if needed
# File 'lib/puma/cluster.rb', line 539
def wait_workers @workers.reject! do |w| begin if Process.wait(w.pid, Process::WNOHANG) true else w.term if w.term? nil end rescue Errno::ECHILD true # child is already terminated end end end
#wakeup!
[ GitHub ]# File 'lib/puma/cluster.rb', line 227
def wakeup! return unless @wakeup begin @wakeup.write "!" unless @wakeup.closed? rescue SystemCallError, IOError Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue end end
#worker(index, master)
[ GitHub ]# File 'lib/puma/cluster.rb', line 237
def worker(index, master) title = "puma: cluster worker #{index}: #{master}" title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty? $0 = title Signal.trap "SIGINT", "IGNORE" @workers = [] @master_read.close @suicide_pipe.close Thread.new do Puma.set_thread_name "worker check pipe" IO.select [@check_pipe] log "! Detected parent died, dying" exit! 1 end # If we're not running under a Bundler context, then # report the info about the context we will be using if !ENV['BUNDLE_GEMFILE'] if File.exist?("Gemfile") log "+ Gemfile in context: #{File. ("Gemfile")}" elsif File.exist?("gems.rb") log "+ Gemfile in context: #{File. ("gems.rb")}" end end # Invoke any worker boot hooks so they can get # things in shape before booting the app. @launcher.config.run_hooks :before_worker_boot, index server = start_server Signal.trap "SIGTERM" do @worker_write << "e#{Process.pid}\n" rescue nil server.stop end begin @worker_write << "b#{Process.pid}\n" rescue SystemCallError, IOError Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue STDERR.puts "Master seems to have exited, exiting." return end Thread.new(@worker_write) do |io| Puma.set_thread_name "stat payload" base_payload = "p#{Process.pid}" while true sleep Const::WORKER_CHECK_INTERVAL begin b = server.backlog || 0 r = server.running || 0 t = server.pool_capacity || 0 m = server.max_threads || 0 payload = %Q!#{base_payload}{ "backlog":#{b}, "running":#{r}, "pool_capacity":#{t}, "max_threads": #{m} }\n! io << payload rescue IOError Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue break end end end server.run.join # Invoke any worker shutdown hooks so they can prevent the worker # exiting until any background operations are completed @launcher.config.run_hooks :before_worker_shutdown, index ensure @worker_write << "t#{Process.pid}\n" rescue nil @worker_write.close end