123456789_123456789_123456789_123456789_123456789_

Class: Puma::Server

Relationships & Source Files
Super Chains via Extension / Inclusion / Inheritance
Class Chain:
self, Forwardable
Instance Chain:
self, Const
Inherits: Object
Defined in: lib/puma/server.rb

Overview

The HTTP Server itself. Serves out a single Rack app.

This class is used by the Single and Cluster classes to generate one or more Server instances capable of handling requests. Each Puma process will contain one Server instance.

The Server instance pulls requests from the socket, adds them to a Reactor where they get eventually passed to a ThreadPool.

Each Server will have one reactor and one thread pool.

Constant Summary

Const - Included

CGI_VER, CHUNKED, CHUNK_SIZE, CLOSE, CLOSE_CHUNKED, CODE_NAME, COLON, CONNECTION_CLOSE, CONNECTION_KEEP_ALIVE, CONTENT_LENGTH, CONTENT_LENGTH2, CONTENT_LENGTH_S, CONTINUE, EARLY_HINTS, ERROR_RESPONSE, FAST_TRACK_KA_TIMEOUT, FIRST_DATA_TIMEOUT, GATEWAY_INTERFACE, HALT_COMMAND, HEAD, HIJACK, HIJACK_IO, HIJACK_P, HTTP, HTTPS, HTTPS_KEY, HTTP_10_200, HTTP_11, HTTP_11_100, HTTP_11_200, HTTP_CONNECTION, HTTP_EXPECT, HTTP_HOST, HTTP_INJECTION_REGEX, HTTP_VERSION, HTTP_X_FORWARDED_FOR, HTTP_X_FORWARDED_PROTO, HTTP_X_FORWARDED_SCHEME, HTTP_X_FORWARDED_SSL, KEEP_ALIVE, LINE_END, LOCALHOST, LOCALHOST_ADDR, LOCALHOST_IP, MAX_BODY, MAX_FAST_INLINE, MAX_HEADER, NEWLINE, PATH_INFO, PERSISTENT_TIMEOUT, PORT_443, PORT_80, PUMA_CONFIG, PUMA_PEERCERT, PUMA_SERVER_STRING, PUMA_SOCKET, PUMA_TMP_BASE, PUMA_VERSION, QUERY_STRING, RACK_AFTER_REPLY, RACK_INPUT, RACK_URL_SCHEME, REMOTE_ADDR, REQUEST_METHOD, REQUEST_PATH, REQUEST_URI, RESTART_COMMAND, SERVER_NAME, SERVER_PORT, SERVER_PROTOCOL, SERVER_SOFTWARE, STOP_COMMAND, TRANSFER_ENCODING, TRANSFER_ENCODING2, TRANSFER_ENCODING_CHUNKED, WORKER_CHECK_INTERVAL, WRITE_TIMEOUT

Class Method Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

.new(app, events = Events.stdio, options = {}) ⇒ Server

Create a server for the rack app #app.

#events is an object which will be called when certain error events occur to be handled. See Events for the list of current methods to implement.

#run returns a thread that you can join on to wait for the server to do its work.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 56

def initialize(app, events=Events.stdio, options={})
  @app = app
  @events = events

  @check, @notify = Puma::Util.pipe

  @status = :stop

  @min_threads = 0
  @max_threads = 16
  @auto_trim_time = 30
  @reaping_time = 1

  @thread = nil
  @thread_pool = nil
  @early_hints = nil

  @persistent_timeout = options.fetch(:persistent_timeout, PERSISTENT_TIMEOUT)
  @first_data_timeout = options.fetch(:first_data_timeout, FIRST_DATA_TIMEOUT)

  @binder = Binder.new(events)

  @leak_stack_on_error = true

  @options = options
  @queue_requests = options[:queue_requests].nil? ? true : options[:queue_requests]

  ENV['RACK_ENV'] ||= "development"

  @mode = :http

  @precheck_closing = true
end

Class Method Details

.current

[ GitHub ]

  
# File 'lib/puma/server.rb', line 1087

def self.current
  Thread.current[ThreadLocalKey]
end

Instance Attribute Details

#app (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 39

attr_accessor :app

#auto_trim_time (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 44

attr_accessor :auto_trim_time

#binder (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 90

attr_accessor :binder, :leak_stack_on_error, :early_hints

#early_hints (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 90

attr_accessor :binder, :leak_stack_on_error, :early_hints

#events (readonly)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 38

attr_reader :events

#first_data_timeout (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 46

attr_accessor :first_data_timeout

#leak_stack_on_error (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 90

attr_accessor :binder, :leak_stack_on_error, :early_hints

#max_threads (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 42

attr_accessor :max_threads

#min_threads (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 41

attr_accessor :min_threads

#persistent_timeout (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 43

attr_accessor :persistent_timeout

#reaping_time (rw)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 45

attr_accessor :reaping_time

#shutting_down?Boolean (readonly)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 1091

def shutting_down?
  @status == :stop || @status == :restart
end

#thread (readonly)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 37

attr_reader :thread

Instance Method Details

#backlog

[ GitHub ]

  
# File 'lib/puma/server.rb', line 155

def backlog
  @thread_pool and @thread_pool.backlog
end

#begin_restart

[ GitHub ]

  
# File 'lib/puma/server.rb', line 1060

def begin_restart
  notify_safely(RESTART_COMMAND)
end

#closed_socket?(socket)

See additional method definition at line 127.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 150

def closed_socket?(socket)
  return false unless socket.kind_of? TCPSocket
  return false unless @precheck_closing

  begin
    tcp_info = socket.getsockopt(Socket::SOL_TCP, Socket::TCP_INFO)
  rescue IOError, SystemCallError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
    @precheck_closing = false
    false
  else
    state = tcp_info.unpack(UNPACK_TCP_STATE_FROM_TCP_INFO)[0]
    # TIME_WAIT: 6, CLOSE: 7, CLOSE_WAIT: 8, LAST_ACK: 9, CLOSING: 11
    (state >= 6 && state <= 9) || state == 11
  end
end

#cork_socket(socket)

6 == Socket::IPPROTO_TCP 3 == TCP_CORK 1/0 == turn on/off

See additional method definition at line 111.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 144

def cork_socket(socket)
  begin
    socket.setsockopt(6, 3, 1) if socket.kind_of? TCPSocket
  rescue IOError, SystemCallError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
  end
end

#default_server_port(env)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 621

def default_server_port(env)
  if ['on', HTTPS].include?(env[HTTPS_KEY]) || env[HTTP_X_FORWARDED_PROTO].to_s[0...5] == HTTPS || env[HTTP_X_FORWARDED_SCHEME] == HTTPS || env[HTTP_X_FORWARDED_SSL] == "on"
    PORT_443
  else
    PORT_80
  end
end

#fast_write(io, str) (private)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 1064

def fast_write(io, str)
  n = 0
  while true
    begin
      n = io.syswrite str
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK
      if !IO.select(nil, [io], nil, WRITE_TIMEOUT)
        raise ConnectionError, "Socket timeout writing data"
      end

      retry
    rescue  Errno::EPIPE, SystemCallError, IOError
      raise ConnectionError, "Socket timeout writing data"
    end

    return if n == str.bytesize
    str = str.byteslice(n..-1)
  end
end

#fetch_status_code(status) (private)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 900

def fetch_status_code(status)
  HTTP_STATUS_CODES.fetch(status) { 'CUSTOM' }
end

#graceful_shutdown

Wait for all outstanding requests to finish.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 979

def graceful_shutdown
  if @options[:shutdown_debug]
    threads = Thread.list
    total = threads.size

    pid = Process.pid

    $stdout.syswrite "#{pid}: === Begin thread backtrace dump ===\n"

    threads.each_with_index do |t,i|
      $stdout.syswrite "#{pid}: Thread #{i+1}/#{total}: #{t.inspect}\n"
      $stdout.syswrite "#{pid}: #{t.backtrace.join("\n#{pid}: ")}\n\n"
    end
    $stdout.syswrite "#{pid}: === End thread backtrace dump ===\n"
  end

  if @options[:drain_on_shutdown]
    count = 0

    while true
      ios = IO.select @binder.ios, nil, nil, 0
      break unless ios

      ios.first.each do |sock|
        begin
          if io = sock.accept_nonblock
            count += 1
            client = Client.new io, @binder.env(sock)
            @thread_pool << client
          end
        rescue SystemCallError
        end
      end
    end

    @events.debug "Drained #{count} additional connections."
  end

  if @status != :restart
    @binder.close
  end

  if @thread_pool
    if timeout = @options[:force_shutdown_after]
      @thread_pool.shutdown timeout.to_i
    else
      @thread_pool.shutdown
    end
  end
end

#halt(sync = false)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 1055

def halt(sync=false)
  notify_safely(HALT_COMMAND)
  @thread.join if @thread && sync
end

#handle_check

This method is for internal use only.
[ GitHub ]

  
# File 'lib/puma/server.rb', line 443

def handle_check
  cmd = @check.read(1)

  case cmd
  when STOP_COMMAND
    @status = :stop
    return true
  when HALT_COMMAND
    @status = :halt
    return true
  when RESTART_COMMAND
    @status = :restart
    return true
  end

  return false
end

#handle_request(req, lines)

Takes the request req, invokes the Rack application to construct the response and writes it back to req.io.

The second parameter lines is a IO-like object unique to this thread. This is normally an instance of IOBuffer.

It’ll return false when the connection is closed, this doesn’t mean that the response wasn’t successful.

It’ll return :async if the connection remains open but will be handled elsewhere, i.e. the connection has been hijacked by the Rack application.

Finally, it’ll return true on keep-alive connections.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 642

def handle_request(req, lines)
  env = req.env
  client = req.io

  return false if closed_socket?(client)

  normalize_env env, req

  env[PUMA_SOCKET] = client

  if env[HTTPS_KEY] && client.peercert
    env[PUMA_PEERCERT] = client.peercert
  end

  env[HIJACK_P] = true
  env[HIJACK] = req

  body = req.body

  head = env[REQUEST_METHOD] == HEAD

  env[RACK_INPUT] = body
  env[RACK_URL_SCHEME] = default_server_port(env) == PORT_443 ? HTTPS : HTTP

  if @early_hints
    env[EARLY_HINTS] = lambda { |headers|
      begin
        fast_write client, "HTTP/1.1 103 Early Hints\r\n".freeze

        headers.each_pair do |k, vs|
          if vs.respond_to?(:to_s) && !vs.to_s.empty?
            vs.to_s.split(NEWLINE).each do |v|
              next if possible_header_injection?(v)
              fast_write client, "#{k}: #{v}\r\n"
            end
          else
            fast_write client, "#{k}: #{vs}\r\n"
          end
        end

        fast_write client, "\r\n".freeze
      rescue ConnectionError
        # noop, if we lost the socket we just won't send the early hints
      end
    }
  end

  # Fixup any headers with , in the name to have _ now. We emit
  # headers with , in them during the parse phase to avoid ambiguity
  # with the - to _ conversion for critical headers. But here for
  # compatibility, we'll convert them back. This code is written to
  # avoid allocation in the common case (ie there are no headers
  # with , in their names), that's why it has the extra conditionals.

  to_delete = nil
  to_add = nil

  env.each do |k,v|
    if k.start_with?("HTTP_") and k.include?(",") and k != "HTTP_TRANSFER,ENCODING"
      if to_delete
        to_delete << k
      else
        to_delete = [k]
      end

      unless to_add
        to_add = {}
      end

      to_add[k.tr(",", "_")] = v
    end
  end

  if to_delete
    to_delete.each { |k| env.delete(k) }
    env.merge! to_add
  end

  # A rack extension. If the app writes #call'ables to this
  # array, we will invoke them when the request is done.
  #
  after_reply = env[RACK_AFTER_REPLY] = []

  begin
    begin
      status, headers, res_body = @app.call(env)

      return :async if req.hijacked

      status = status.to_i

      if status == -1
        unless headers.empty? and res_body == []
          raise "async response must have empty headers and body"
        end

        return :async
      end
    rescue ThreadPool::ForceShutdown => e
      @events.log "Detected force shutdown of a thread, returning 503"
      @events.unknown_error self, e, "Rack app"

      status = 503
      headers = {}
      res_body = ["Request was internally terminated early\n"]

    rescue Exception => e
      @events.unknown_error self, e, "Rack app", env

      status, headers, res_body = lowlevel_error(e, env)
    end

    content_length = nil
    no_body = head

    if res_body.kind_of? Array and res_body.size == 1
      content_length = res_body[0].bytesize
    end

    cork_socket client

    line_ending = LINE_END
    colon = COLON

    http_11 = if env[HTTP_VERSION] == HTTP_11
      allow_chunked = true
      keep_alive = env.fetch(HTTP_CONNECTION, "").downcase != CLOSE
      include_keepalive_header = false

      # An optimization. The most common response is 200, so we can
      # reply with the proper 200 status without having to compute
      # the response header.
      #
      if status == 200
        lines << HTTP_11_200
      else
        lines.append "HTTP/1.1 ", status.to_s, " ",
                     fetch_status_code(status), line_ending

        no_body ||= status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
      end
      true
    else
      allow_chunked = false
      keep_alive = env.fetch(HTTP_CONNECTION, "").downcase == KEEP_ALIVE
      include_keepalive_header = keep_alive

      # Same optimization as above for HTTP/1.1
      #
      if status == 200
        lines << HTTP_10_200
      else
        lines.append "HTTP/1.0 ", status.to_s, " ",
                     fetch_status_code(status), line_ending

        no_body ||= status < 200 || STATUS_WITH_NO_ENTITY_BODY[status]
      end
      false
    end

    response_hijack = nil

    headers.each do |k, vs|
      case k.downcase
      when CONTENT_LENGTH2
        next if possible_header_injection?(vs)
        content_length = vs
        next
      when TRANSFER_ENCODING
        allow_chunked = false
        content_length = nil
      when HIJACK
        response_hijack = vs
        next
      end

      if vs.respond_to?(:to_s) && !vs.to_s.empty?
        vs.to_s.split(NEWLINE).each do |v|
          next if possible_header_injection?(v)
          lines.append k, colon, v, line_ending
        end
      else
        lines.append k, colon, line_ending
      end
    end

    if include_keepalive_header
      lines << CONNECTION_KEEP_ALIVE
    elsif http_11 && !keep_alive
      lines << CONNECTION_CLOSE
    end

    if no_body
      if content_length and status != 204
        lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending
      end

      lines << line_ending
      fast_write client, lines.to_s
      return keep_alive
    end

    if content_length
      lines.append CONTENT_LENGTH_S, content_length.to_s, line_ending
      chunked = false
    elsif !response_hijack and allow_chunked
      lines << TRANSFER_ENCODING_CHUNKED
      chunked = true
    end

    lines << line_ending

    fast_write client, lines.to_s

    if response_hijack
      response_hijack.call client
      return :async
    end

    begin
      res_body.each do |part|
        next if part.bytesize.zero?
        if chunked
          fast_write client, part.bytesize.to_s(16)
          fast_write client, line_ending
          fast_write client, part
          fast_write client, line_ending
        else
          fast_write client, part
        end

        client.flush
      end

      if chunked
        fast_write client, CLOSE_CHUNKED
        client.flush
      end
    rescue SystemCallError, IOError
      raise ConnectionError, "Connection error detected during write"
    end

  ensure
    begin
      uncork_socket client

      body.close
      req.tempfile.unlink if req.tempfile
    ensure
      res_body.close if res_body.respond_to? :close
    end

    after_reply.each { |o| o.call }
  end

  return keep_alive
end

#handle_servers

[ GitHub ]

  
# File 'lib/puma/server.rb', line 368

def handle_servers
  begin
    check = @check
    sockets = [check] + @binder.ios
    pool = @thread_pool
    queue_requests = @queue_requests

    remote_addr_value = nil
    remote_addr_header = nil

    case @options[:remote_address]
    when :value
      remote_addr_value = @options[:remote_address_value]
    when :header
      remote_addr_header = @options[:remote_address_header]
    end

    while @status == :run
      begin
        ios = IO.select sockets
        ios.first.each do |sock|
          if sock == check
            break if handle_check
          else
            begin
              if io = sock.accept_nonblock
                client = Client.new io, @binder.env(sock)
                if remote_addr_value
                  client.peerip = remote_addr_value
                elsif remote_addr_header
                  client.remote_addr_header = remote_addr_header
                end

                pool << client
                busy_threads = pool.wait_until_not_full
                if busy_threads == 0
                  @options[:out_of_band].each(&:call) if @options[:out_of_band]
                end
              end
            rescue SystemCallError
              # nothing
            rescue Errno::ECONNABORTED
              # client closed the socket even before accept
              begin
                io.close
              rescue
                Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
              end
            end
          end
        end
      rescue Object => e
        @events.unknown_error self, e, "Listen loop"
      end
    end

    @events.fire :state, @status

    graceful_shutdown if @status == :stop || @status == :restart
    if queue_requests
      @reactor.clear!
      @reactor.shutdown
    end
  rescue Exception => e
    STDERR.puts "Exception handling servers: #{e.message} (#{e.class})"
    STDERR.puts e.backtrace
  ensure
    @check.close
    @notify.close
  end

  @events.fire :state, :done
end

#handle_servers_lopez_mode

[ GitHub ]

  
# File 'lib/puma/server.rb', line 217

def handle_servers_lopez_mode
  begin
    check = @check
    sockets = [check] + @binder.ios
    pool = @thread_pool

    while @status == :run
      begin
        ios = IO.select sockets
        ios.first.each do |sock|
          if sock == check
            break if handle_check
          else
            begin
              if io = sock.accept_nonblock
                client = Client.new io, nil
                pool << client
              end
            rescue SystemCallError
              # nothing
            rescue Errno::ECONNABORTED
              # client closed the socket even before accept
              begin
                io.close
              rescue
                Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
              end
            end
          end
        end
      rescue Object => e
        @events.unknown_error self, e, "Listen loop"
      end
    end

    @events.fire :state, @status

    graceful_shutdown if @status == :stop || @status == :restart

  rescue Exception => e
    STDERR.puts "Exception handling servers: #{e.message} (#{e.class})"
    STDERR.puts e.backtrace
  ensure
    begin
      @check.close
    rescue
      Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
    end

    # Prevent can't modify frozen IOError (RuntimeError)
    begin
      @notify.close
    rescue IOError
      # no biggy
    end
  end

  @events.fire :state, :done
end

#inherit_binder(bind)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 94

def inherit_binder(bind)
  @binder = bind
end

#lowlevel_error(e, env)

A fallback rack response if @app raises as exception.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 961

def lowlevel_error(e, env)
  if handler = @options[:lowlevel_error_handler]
    if handler.arity == 1
      return handler.call(e)
    else
      return handler.call(e, env)
    end
  end

  if @leak_stack_on_error
    [500, {}, ["Puma caught this error: #{e.message} (#{e.class})\n#{e.backtrace.join("\n")}"]]
  else
    [500, {}, ["An unhandled lowlevel error occurred. The application logs may have details.\n"]]
  end
end

#normalize_env(env, client)

Given a Hash env for the request read from client, add and fixup keys to comply with Rack’s env guidelines.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 568

def normalize_env(env, client)
  if host = env[HTTP_HOST]
    if colon = host.index(":")
      env[SERVER_NAME] = host[0, colon]
      env[SERVER_PORT] = host[colon+1, host.bytesize]
    else
      env[SERVER_NAME] = host
      env[SERVER_PORT] = default_server_port(env)
    end
  else
    env[SERVER_NAME] = LOCALHOST
    env[SERVER_PORT] = default_server_port(env)
  end

  unless env[REQUEST_PATH]
    # it might be a dumbass full host request header
    uri = URI.parse(env[REQUEST_URI])
    env[REQUEST_PATH] = uri.path

    raise "No REQUEST PATH" unless env[REQUEST_PATH]

    # A nil env value will cause a LintError (and fatal errors elsewhere),
    # so only set the env value if there actually is a value.
    env[QUERY_STRING] = uri.query if uri.query
  end

  env[PATH_INFO] = env[REQUEST_PATH]

  # From http://www.ietf.org/rfc/rfc3875 :
  # "Script authors should be aware that the REMOTE_ADDR and
  # REMOTE_HOST meta-variables (see sections 4.1.8 and 4.1.9)
  # may not identify the ultimate source of the request.
  # They identify the client for the immediate request to the
  # server; that client may be a proxy, gateway, or other
  # intermediary acting on behalf of the actual source client."
  #

  unless env.key?(REMOTE_ADDR)
    begin
      addr = client.peerip
    rescue Errno::ENOTCONN
      # Client disconnects can result in an inability to get the
      # peeraddr from the socket; default to localhost.
      addr = LOCALHOST_IP
    end

    # Set unix socket addrs to localhost
    addr = LOCALHOST_IP if addr.empty?

    env[REMOTE_ADDR] = addr
  end
end

#notify_safely(message) (private)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 1030

def notify_safely(message)
  begin
    @notify << message
  rescue IOError
     # The server, in another thread, is shutting down
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
  rescue RuntimeError => e
    # Temporary workaround for https://bugs.ruby-lang.org/issues/13239
    if e.message.include?('IOError')
      Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
    else
      raise e
    end
  end
end

#pool_capacity

This number represents the number of requests that the server is capable of taking right now.

For example if the number is 5 then it means there are 5 threads sitting idle ready to take a request. If one request comes in, then the value would be 4 until it finishes processing.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 171

def pool_capacity
  @thread_pool and @thread_pool.pool_capacity
end

#possible_header_injection?(header_value) ⇒ Boolean (private)

[ GitHub ]

  
# File 'lib/puma/server.rb', line 1095

def possible_header_injection?(header_value)
  HTTP_INJECTION_REGEX =~ header_value.to_s
end

#process_client(client, buffer)

Given a connection on client, handle the incoming requests.

This method support HTTP Keep-Alive so it may, depending on if the client indicates that it supports keep alive, wait for another request before returning.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 467

def process_client(client, buffer)
  begin

    clean_thread_locals = @options[:clean_thread_locals]
    close_socket = true

    requests = 0

    while true
      case handle_request(client, buffer)
      when false
        return
      when :async
        close_socket = false
        return
      when true
        return unless @queue_requests
        buffer.reset

        ThreadPool.clean_thread_locals if clean_thread_locals

        requests += 1

        # Closing keepalive sockets after they've made a reasonable
        # number of requests allows Puma to service many connections
        # fairly, even when the number of concurrent connections exceeds
        # the size of the threadpool. It also allows cluster mode Pumas
        # to keep load evenly distributed across workers, because clients
        # are randomly assigned a new worker when opening a new connection.
        #
        # Previously, Puma would kick connections in this conditional back
        # to the reactor. However, because this causes the todo set to increase
        # in size, the wait_until_full mutex would never unlock, leaving
        # any additional connections unserviced.
        break if requests >= MAX_FAST_INLINE

        check_for_more_data = @status == :run

        unless client.reset(check_for_more_data)
          close_socket = false
          client.set_timeout @persistent_timeout
          @reactor.add client
          return
        end
      end
    end

  # The client disconnected while we were reading data
  rescue ConnectionError
    # Swallow them. The ensure tries to close client down

  # SSL handshake error
  rescue MiniSSL::SSLError => e
    lowlevel_error(e, client.env)

    ssl_socket = client.io
    addr = ssl_socket.peeraddr.last
    cert = ssl_socket.peercert

    close_socket = true

    @events.ssl_error self, addr, cert, e

  # The client doesn't know HTTP well
  rescue HttpParserError => e
    lowlevel_error(e, client.env)

    client.write_error(400)

    @events.parse_error self, client.env, e
  rescue HttpParserError501 => e
    lowlevel_error(e, client.env)

    client.write_error(501)

    @events.parse_error self, client.env, e
  # Server error
  rescue StandardError => e
    lowlevel_error(e, client.env)

    client.write_error(500)

    @events.unknown_error self, e, "Read"

  ensure
    buffer.reset

    begin
      client.close if close_socket
    rescue IOError, SystemCallError
      Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
      # Already closed
    rescue StandardError => e
      @events.unknown_error self, e, "Client"
    end
  end
end

#read_body(env, client, body, cl)

Given the request env from client and the partial body body plus a potential Content-Length value cl, finish reading the body and return it.

If the body is larger than MAX_BODY, a Tempfile object is used for the body, otherwise a StringIO is used.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 912

def read_body(env, client, body, cl)
  content_length = cl.to_i

  remain = content_length - body.bytesize

  return StringIO.new(body) if remain <= 0

  # Use a Tempfile if there is a lot of data left
  if remain > MAX_BODY
    stream = Tempfile.new(Const::PUMA_TMP_BASE)
    stream.binmode
  else
    # The body[0,0] trick is to get an empty string in the same
    # encoding as body.
    stream = StringIO.new body[0,0]
  end

  stream.write body

  # Read an odd sized chunk so we can read even sized ones
  # after this
  chunk = client.readpartial(remain % CHUNK_SIZE)

  # No chunk means a closed socket
  unless chunk
    stream.close
    return nil
  end

  remain -= stream.write(chunk)

  # Raed the rest of the chunks
  while remain > 0
    chunk = client.readpartial(CHUNK_SIZE)
    unless chunk
      stream.close
      return nil
    end

    remain -= stream.write(chunk)
  end

  stream.rewind

  return stream
end

#run(background = true)

Runs the server.

If background is true (the default) then a thread is spun up in the background to handle requests. Otherwise requests are handled synchronously.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 282

def run(background=true)
  BasicSocket.do_not_reverse_lookup = true

  @events.fire :state, :booting

  @status = :run

  if @mode == :tcp
    return run_lopez_mode(background)
  end

  queue_requests = @queue_requests

  @thread_pool = ThreadPool.new(@min_threads,
                                @max_threads,
                                IOBuffer) do |client, buffer|

    # Advertise this server into the thread
    Thread.current[ThreadLocalKey] = self

    process_now = false

    begin
      if queue_requests
        process_now = client.eagerly_finish
      else
        client.finish
        process_now = true
      end
    rescue MiniSSL::SSLError => e
      ssl_socket = client.io
      addr = ssl_socket.peeraddr.last
      cert = ssl_socket.peercert

      client.close

      @events.ssl_error self, addr, cert, e
    rescue HttpParserError => e
      client.write_error(400)
      client.close

      @events.parse_error self, client.env, e
    rescue HttpParserError501 => e
      client.write_error(501)
      client.close
      @events.parse_error self, client.env, e
    rescue ConnectionError, EOFError
      client.close
    else
      if process_now
        process_client client, buffer
      else
        client.set_timeout @first_data_timeout
        @reactor.add client
      end
    end
  end

  @thread_pool.clean_thread_locals = @options[:clean_thread_locals]

  if queue_requests
    @reactor = Reactor.new self, @thread_pool
    @reactor.run_in_thread
  end

  if @reaping_time
    @thread_pool.auto_reap!(@reaping_time)
  end

  if @auto_trim_time
    @thread_pool.auto_trim!(@auto_trim_time)
  end

  @events.fire :state, :running

  if background
    @thread = Thread.new do
      Puma.set_thread_name "server"
      handle_servers
    end
    return @thread
  else
    handle_servers
  end
end

#run_lopez_mode(background = true)

Lopez Mode == raw tcp apps

[ GitHub ]

  
# File 'lib/puma/server.rb', line 177

def run_lopez_mode(background=true)
  @thread_pool = ThreadPool.new(@min_threads,
                                @max_threads,
                                Hash) do |client, tl|

    io = client.to_io
    addr = io.peeraddr.last

    if addr.empty?
      # Set unix socket addrs to localhost
      addr = "127.0.0.1:0"
    else
      addr = "#{addr}:#{io.peeraddr[1]}"
    end

    env = { 'thread' => tl, REMOTE_ADDR => addr }

    begin
      @app.call env, client.to_io
    rescue Object => e
      STDERR.puts "! Detected exception at toplevel: #{e.message} (#{e.class})"
      STDERR.puts e.backtrace
    end

    client.close unless env['detach']
  end

  @events.fire :state, :running

  if background
    @thread = Thread.new do
      Puma.set_thread_name "server"
      handle_servers_lopez_mode
    end
    return @thread
  else
    handle_servers_lopez_mode
  end
end

#running

[ GitHub ]

  
# File 'lib/puma/server.rb', line 159

def running
  @thread_pool and @thread_pool.spawned
end

#stop(sync = false)

Stops the acceptor thread and then causes the worker threads to finish off the request queue before finally exiting.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 1050

def stop(sync=false)
  notify_safely(STOP_COMMAND)
  @thread.join if @thread && sync
end

#tcp_mode!

[ GitHub ]

  
# File 'lib/puma/server.rb', line 98

def tcp_mode!
  @mode = :tcp
end

#uncork_socket(socket)

See additional method definition at line 119.

[ GitHub ]

  
# File 'lib/puma/server.rb', line 147

def uncork_socket(socket)
  begin
    socket.setsockopt(6, 3, 0) if socket.kind_of? TCPSocket
  rescue IOError, SystemCallError
    Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
  end
end