
Class: Puma::ThreadPool

Relationships & Source Files
Namespace Children
Classes: Automaton
Exceptions: ForceShutdown
Inherits: Object
Defined in: lib/puma/thread_pool.rb

Overview

Internal documentation for a simple thread pool management object.

Each Puma “worker” has a thread pool to process requests.

First, a connection to a client is made in Server. It is wrapped in a Client instance and then passed to the Reactor to ensure the whole request is buffered into memory. Once the request is ready, it is passed into the thread pool via the #<< operator, where it is stored in the @todo array.

Each thread in the pool has an internal loop where it pulls a request from the @todo array and processes it.
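
A minimal usage sketch of that flow, assuming the puma gem is loaded; the string below is a hypothetical stand-in for a fully buffered Client:

require 'puma'

# The block is the work each pool thread performs per item of work.
pool = Puma::ThreadPool.new(0, 4) do |request|
  puts "processing #{request.inspect}"
end

pool << "fake request"   # lands in @todo; a pool thread picks it up
pool.shutdown            # wait for all threads to finish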


Constructor Details

.new(min, max, *extra, &block) ⇒ ThreadPool

Maintain a minimum of min and maximum of max threads in the pool.

The block passed is the work that will be performed in each thread.

# File 'lib/puma/thread_pool.rb', line 32

def initialize(min, max, *extra, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @min = Integer(min)
  @max = Integer(max)
  @block = block
  @extra = extra

  @shutdown = false

  @trim_requested = 0

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times { spawn_thread }
  end

  @clean_thread_locals = false
end
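
Each element of extra is expected to be a class; every pool thread builds its own instances of these and passes them to the block after the work item (see #spawn_thread below). A hedged sketch, using a hypothetical ScratchBuffer helper:

require 'puma'

# Hypothetical per-thread helper: instantiated once per thread in
# spawn_thread, so it needs no locking of its own.
class ScratchBuffer
  attr_reader :buf
  def initialize
    @buf = String.new
  end
end

pool = Puma::ThreadPool.new(1, 8, ScratchBuffer) do |work, scratch|
  # scratch is the ScratchBuffer created by this particular thread
  scratch.buf << work.to_s
end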

Class Method Details

.clean_thread_locals

Clear all thread-local variables on the current thread, except Ruby's internal :__recursive_key__.

# File 'lib/puma/thread_pool.rb', line 66

def self.clean_thread_locals
  Thread.current.keys.each do |key| # rubocop: disable Performance/HashEachMethods
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end
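
A small illustration of the effect; the :example key is arbitrary:

require 'puma'

Thread.current[:example] = "stale per-request state"
Puma::ThreadPool.clean_thread_locals
Thread.current[:example]  # => nil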

Instance Attribute Details

#clean_thread_locals (rw)

When set to true, each pool thread calls .clean_thread_locals before processing each unit of work (see #spawn_thread).

# File 'lib/puma/thread_pool.rb', line 64

attr_accessor :clean_thread_locals

#spawned (readonly)

The number of threads currently spawned by the pool.

# File 'lib/puma/thread_pool.rb', line 63

attr_reader :spawned, :trim_requested, :waiting

#trim_requested (readonly)

The number of outstanding trim requests, i.e. threads that have been asked to exit but have not yet done so.

# File 'lib/puma/thread_pool.rb', line 63

attr_reader :spawned, :trim_requested, :waiting

#waiting (readonly)

The number of threads currently waiting for work.

# File 'lib/puma/thread_pool.rb', line 63

attr_reader :spawned, :trim_requested, :waiting

Instance Method Details

#<<(work)

Add work to the todo list for a thread to pick up and process. Raises if the pool is shutting down, and spawns an additional thread when more work is queued than there are waiting threads (up to the maximum).

# File 'lib/puma/thread_pool.rb', line 154

def <<(work)
  @mutex.synchronize do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end

#auto_reap!(timeout = 5)

Start a reaper that periodically (every timeout seconds) calls #reap to remove dead threads from the pool.

# File 'lib/puma/thread_pool.rb', line 278

def auto_reap!(timeout=5)
  @reaper = Automaton.new(self, timeout, "threadpool reaper", :reap)
  @reaper.start!
end

#auto_trim!(timeout = 30)

Start a trimmer that periodically (every timeout seconds) calls #trim to shrink the pool back toward the minimum when threads are idle.

# File 'lib/puma/thread_pool.rb', line 273

def auto_trim!(timeout=30)
  @auto_trim = Automaton.new(self, timeout, "threadpool trimmer", :trim)
  @auto_trim.start!
end
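
These are normally started by the Server once the pool is created, with the timeouts taken from configuration. A sketch of the call pattern; handle is a hypothetical request handler:

require 'puma'

# Hypothetical handler for a unit of work.
def handle(work)
  puts "handled #{work.inspect}"
end

pool = Puma::ThreadPool.new(0, 16) { |work| handle(work) }
pool.auto_trim!(30)   # check for idle threads to trim every 30 seconds
pool.auto_reap!(5)    # sweep dead threads from the pool every 5 seconds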

#backlog

How many objects have yet to be processed by the pool?

# File 'lib/puma/thread_pool.rb', line 74

def backlog
  @mutex.synchronize { @todo.size }
end

#pool_capacity

The number of additional requests the pool can take on: waiting threads plus the number of threads that could still be spawned (@max - spawned).

# File 'lib/puma/thread_pool.rb', line 78

def pool_capacity
  waiting + (@max - spawned)
end
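
A worked example with hypothetical numbers:

# @max = 16, spawned = 10, waiting = 4
# pool_capacity  # => 4 + (16 - 10) => 10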

#reap

If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created again.

# File 'lib/puma/thread_pool.rb', line 231

def reap
  @mutex.synchronize do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers.delete_if do |w|
      dead_workers.include?(w)
    end
  end
end

#shutdown(timeout = -1)

Tell all threads in the pool to exit and wait for them to finish.

# File 'lib/puma/thread_pool.rb', line 285

def shutdown(timeout=-1)
  threads = @mutex.synchronize do
    @shutdown = true
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim.stop if @auto_trim
    @reaper.stop if @reaper
    # dup workers so that we join them all safely
    @workers.dup
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    # Wait for threads to finish after n attempts (timeout).
    # If threads are still running, it will forcefully kill them.
    timeout.times do
      threads.delete_if do |t|
        t.join 1
      end

      if threads.empty?
        break
      else
        sleep 1
      end
    end

    threads.each do |t|
      t.raise ForceShutdown
    end

    threads.each do |t|
      t.join SHUTDOWN_GRACE_TIME
    end
  end

  @spawned = 0
  @workers = []
end
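
A sketch of the two shutdown modes, given a pool built as in the earlier examples:

pool.shutdown        # timeout == -1: join every thread, however long it takes
pool.shutdown(10)    # make up to 10 passes of one-second joins, then raise
                     # ForceShutdown in any threads still running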

#spawn_thread (private)

This method is for internal use only.

Must be called with @mutex held!

# File 'lib/puma/thread_pool.rb', line 86

def spawn_thread
  @spawned += 1

  th = Thread.new(@spawned) do |spawned|
    Puma.set_thread_name 'threadpool %03i' % spawned
    todo  = @todo
    block = @block
    mutex = @mutex
    not_empty = @not_empty
    not_full = @not_full

    extra = @extra.map { |i| i.new }

    while true
      work = nil

      continue = true

      mutex.synchronize do
        while todo.empty?
          if @trim_requested > 0
            @trim_requested -= 1
            continue = false
            not_full.signal
            break
          end

          if @shutdown
            continue = false
            break
          end

          @waiting += 1
          not_full.signal
          not_empty.wait mutex
          @waiting -= 1
        end

        work = todo.shift if continue
      end

      break unless continue

      if @clean_thread_locals
        ThreadPool.clean_thread_locals
      end

      begin
        block.call(work, *extra)
      rescue Exception => e
        STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
      end
    end

    mutex.synchronize do
      @spawned -= 1
      @workers.delete th
    end
  end

  @workers << th

  th
end

#trim(force = false)

If too many threads are in the pool, tell one to finish its current work and exit. If force is true, a trim is requested even if all threads are currently being utilized.

# File 'lib/puma/thread_pool.rb', line 220

def trim(force=false)
  @mutex.synchronize do
    if (force or @waiting > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end

#wait_until_not_full

This method is used by Server to let the server know when the thread pool can pull more requests from the socket and pass to the reactor.

The general idea is that the thread pool can only work on a fixed number of requests at the same time. If it is already processing that number of requests then it is at capacity. If another ::Puma process has spare capacity, then the request can be left on the socket so the other worker can pick it up and process it.

For example: if there are 5 threads, but only 4 working on requests, this method will not wait and the Server can pull a request right away.

If there are 5 threads and all 5 of them are busy, then it will pause here and wait until the not_full condition variable is signaled; usually this indicates that a request has been processed.

It’s important to note that even though the server might accept another request, it might not be added to the @todo array right away. For example, if a slow client has sent only a header but not a body, then the @todo array would stay the same size as the reactor works to buffer the request. In that scenario the next call to this method would not block, and another request would be added into the reactor by the server. This would continue until a fully buffered request makes it through the reactor and can then be processed by the thread pool.

Returns the current number of busy threads, or nil if shutting down.

# File 'lib/puma/thread_pool.rb', line 199

def wait_until_not_full
  @mutex.synchronize do
    while true
      return if @shutdown

      # If we can still spin up new threads and there
      # is work queued that cannot be handled by waiting
      # threads, then accept more work until we would
      # spin up the max number of threads.
      busy_threads = @spawned - @waiting + @todo.size
      return busy_threads if @max > busy_threads

      @not_full.wait @mutex
    end
  end
end
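
A worked example of the busy_threads check with hypothetical numbers:

# spawned = 5, waiting = 1, todo.size = 0  =>  busy_threads = 5 - 1 + 0 = 4
# @max = 5 > 4, so the method returns 4 and the Server may accept another request.
# If all 5 threads were busy (waiting = 0), busy_threads would be 5 and the call
# would block on @not_full until a finishing thread signals it.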