👩🏻💻 github.com/rosa
🌐 rosa.codes
Principal programmer at 37signals
We'll be using Puma
as our web server
and nginx
on top of it
The code shown is from Rails 7.0.x
and Rack 2.2.x
(but the relevant parts are the same)
Started GET "/boxes/210.json" for ...
Processing by BoxesController#show as JSON
Parameters: {"id"=>"210"}
Oauth::Client Load (0.7ms) SELECT `oauth_clients`.* FROM `oauth_clients` ..
Session Load (0.7ms) SELECT `sessions`.* FROM `sessions`
WHERE `sessions`.`token` = '...' AND `sessions`.`identity_id` = 4 LIMIT 1
Authenticated Identity#4 from OAuth access token
Identity Load (0.6ms) SELECT `identities`.* FROM `identities` WHERE `identities`.`id` = 4 LIMIT 1
Box Load (0.7ms) SELECT `boxes`.* FROM `boxes` ...
Posting Load (0.7ms) SELECT `postings`.* FROM `postings` ...
Completed 304 Not Modified in 13ms
Started GET "/boxes/210.json" for ...
Processing by BoxesController#show as JSON
Parameters: {"id"=>"210"}
CACHE Box Load (0.0ms) SELECT `boxes`.* FROM `boxes`
WHERE `boxes`.`identity_id` = 82
AND `boxes`.`id` = 210 ORDER BY `boxes`.`id` ASC LIMIT 1
CACHE Contact Load (0.0ms) SELECT `contacts`.* FROM `contacts` INNER JOIN `users` ON `contacts`.`contactable_id` = `users`.`id`
WHERE `users`.`identity_id` = 82
AND `contacts`.`contactable_type` = 'User' ORDER BY `contacts`.`id` ASC LIMIT 1
Completed 404 Not Found in 3ms
And some other strange features
class Current < ActiveSupport::CurrentAttributes
attribute :session
delegate :identity, to: :session, allow_nil: true
end
Abstract super class that provides a thread-isolated attributes singleton, which resets automatically before and after each request. Ruby on Rails API docs
Abstract super class that provides a thread-isolated attributes singleton, which resets automatically before and after each request. Ruby on Rails API docs
module Authenticate
included do
before_action :require_session
end
def require_session
resume_session || request_session
end
def resume_session
Current.session ||= resume_session_by_cookie || resume_session_by_token
end
def resume_session
Current.session ||= resume_session_by_cookie || resume_session_by_token
end
def resume_session
Current.session = resume_session_by_cookie || resume_session_by_token
end
initializer "active_support.reset_execution_context" do |app|
app.executor.to_run { ActiveSupport::ExecutionContext.clear }
app.executor.to_complete { ActiveSupport::ExecutionContext.clear }
end
initializer "active_support.reset_all_current_attributes_instances" do |app|
app.executor.to_run { ActiveSupport::CurrentAttributes.reset_all }
app.executor.to_complete { ActiveSupport::CurrentAttributes.reset_all }
# ...
to_run
and to_complete
callbacksrun!
/complete!
or #wrap(&block)
to do the actual wrapping
$ bin/rails middleware
use ActionDispatch::HostAuthorization
use Rack::Sendfile
use ActionDispatch::Static
use ActionDispatch::Executor
use ActionDispatch::ServerTiming
use Rack::Runtime
use Rack::MethodOverride
use ActionDispatch::RequestId
use ActionDispatch::RemoteIp
...
module ActionDispatch
class Executor
def initialize(app, executor)
@app, @executor = app, executor
end
def call(env)
state = @executor.run!
begin
response = @app.call(env)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
end
end
@app, @executor = app, executor
state = @executor.run!
state.complete!
@app, @executor = app, executor
middleware.use ::ActionDispatch::Executor, app.executor # ::ActionDispatch::Executor.new(_app, app.executor)
attr_reader :reloaders, :reloader, :executor, :autoloaders
# ...
def initialize(initial_variable_values = {}, &block)
# ...
@executor = Class.new(ActiveSupport::Executor)
@reloader = Class.new(ActiveSupport::Reloader)
@reloader.executor = @executor
# ...
end
module ActiveSupport
class Executor < ExecutionWrapper
end
end
module ActiveSupport
class ExecutionWrapper
include ActiveSupport::Callbacks
Null = Object.new
def Null.complete!
end
define_callbacks :run
define_callbacks :complete
def self.to_run(*args, &block) # app.executor.to_run { ActiveSupport::CurrentAttributes.reset_all }
set_callback(:run, *args, &block)
end
def self.to_complete(*args, &block) # app.executor.to_complete { ActiveSupport::CurrentAttributes.reset_all }
set_callback(:complete, *args, &block)
end
# Run this execution.
#
# Returns an instance, whose +complete!+ method *must* be invoked
# after the work has been performed.
def self.run!
if active?
Null
else
new.tap do |instance|
success = nil
begin
instance.run!
success = true
ensure
instance.complete! unless success
end
end
end
end
def self.active_key # app.executor.object_id, app.executor is Class.new(ActiveSupport::Executor)
@active_key ||= :"active_execution_wrapper_#{object_id}"
end
def self.active?
IsolatedExecutionState.key?(active_key)
end
def run!
IsolatedExecutionState[self.class.active_key] = self
run
end
def run # :nodoc:
run_callbacks(:run)
end
# Complete this in-flight execution. This method *must* be called
# exactly once on the result of any call to +run!+.
def complete!
complete
ensure
IsolatedExecutionState.delete(self.class.active_key)
end
def complete # :nodoc:
run_callbacks(:complete)
end
state = @executor.run!
complete!
to_run
callbacksrun
, it calls complete!
in the instancestate.complete!
to_complete
callbacks
module ActionDispatch
class Executor
def initialize(app, executor)
@app, @executor = app, executor
end
def call(env)
state = @executor.run!
begin
response = @app.call(env)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
end
end
module ActionDispatch
class Executor
def initialize(app, executor)
@app, @executor = app, executor
end
def call(env)
state = @executor.run!
begin
response = @app.call(env) # response = [ status, header, body ]
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
end
end
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
response = [ status, header, body ]
body = response.pop
response = [ status, header ]
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
response = [ status, header, body ]
body = response.pop
response = [ status, header ]
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
response = [ status, header, body ]
body = response.pop
response = [ status, header ]
proxied_body = ::Rack::BodyProxy.new(body, &block)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
response = [ status, header, body ]
body = response.pop
response = [ status, header ]
proxied_body = ::Rack::BodyProxy.new(body, &block)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
response = [ status, header, body ]
body = response.pop
response = [ status, header ]
proxied_body = ::Rack::BodyProxy.new(body, &block)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
response = [ status, header, body ]
body = response.pop
response = [ status, header ]
proxied_body = ::Rack::BodyProxy.new(body, &block)
response << proxied_body
response = [ status, header, proxied_body
]
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
response = [ status, header, body ]
body = response.pop
response = [ status, header ]
proxied_body = ::Rack::BodyProxy.new(body, &block)
response << proxied_body
response = [ status, header, proxied_body ]
returned = response
::Rack::BodyProxy.new(body) { state.complete! }
# Proxy for response bodies allowing calling a block when
# the response body is closed (after the response has been fully
# sent to the client).
class Rack::BodyProxy
def initialize(body, &block)
@body = body
@block = block
@closed = false
end
def close
return if @closed
@closed = true
begin
@body.close if @body.respond_to?(:close)
ensure
@block.call
end
end
end
def call(env)
state = @executor.run!
begin
response = @app.call(env) # response = [ status, header, body ]
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
@executor
)state = @executor.run!
before invoking the next middleware::Rack::BodyProxy
state.complete!
as a block that will run when the server calls #close
in the response bodystate.complete!
if something went wrong
initializer "active_support.reset_execution_context" do |app|
app.executor.to_run { ActiveSupport::ExecutionContext.clear }
app.executor.to_complete { ActiveSupport::ExecutionContext.clear }
end
initializer "active_support.reset_all_current_attributes_instances" do |app|
app.executor.to_run { ActiveSupport::CurrentAttributes.reset_all }
app.executor.to_complete { ActiveSupport::CurrentAttributes.reset_all }
# ...
run!
returns an active (not complete) executor, to_run
callback wouldn't runcomplete!
over the return value would do nothingcomplete!
is not called over the current executor, the to_complete
callback wouldn't run
def call(env)
state = @executor.run!
begin
response = @app.call(env)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
def call(env)
→ state = @executor.run! # Class object_id: 16536, current attributes reset
begin
response = @app.call(env)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
def call(env)
state = @executor.run!
begin
response = @app.call(env)
→ returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
def call(env)
state = @executor.run!
begin
response = @app.call(env)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
→ state.complete! unless returned
end
end
def call(env)
state = @executor.run!
begin
response = @app.call(env) # response = [ status, header, body ]
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
→ state.complete! unless returned
# returned = [ status, headers, proxied_body ]
end
end
def call(env)
→ state = @executor.run! # ActiveSupport::ExecutionWrapper::Null, no reset
begin
response = @app.call(env) # response = [ status, header, body ]
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
def call(env)
state = @executor.run! # ActiveSupport::ExecutionWrapper::Null, no reset
begin
response = @app.call(env) # response = [ status, header, body ]
→ returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
module CurrentRequestPinning
extend ActiveSupport::Concern
prepended do
attribute :request_id
end
def request_id=(current_request_id)
if request_id
raise "Current.request_id is already pinned. Pinned: #{request_id} New: #{current_request_id}"
else
super
end
end
end
class Current < ActiveSupport::CurrentAttributes
prepend CurrentRequestPinning
attribute :session
ActiveSupport.on_load :action_controller do
before_action do
Current.request_id = request.request_id # ActionDispatch::RequestId, X-Request-Id header
end
end
RuntimeError (Current.request_id is already pinned.
Pinned: 6be33aff-d56d-429c-a934-2cb076c29979 New: f558cd87-9d8b-4d91-9313-892aef106ce7)
app/models/concerns/current_request_pinning.rb:22:in 'CurrentRequestPinning#guard_request_id_reassignment'
app/models/concerns/current_request_pinning.rb:10:in 'CurrentRequestPinning#request_id='
config/initializers/set_current_request_details.rb:5:in 'block (2 levels)'
A 200 response from the Rails app
Started GET "/topics/1020541119"
Processing by TopicsController#show as HTML
Parameters: {"id" => "1020541119"}
...
Completed 200 OK in 197ms
But a 500 error logged by Nginx!
"GET /topics/1020541119 HTTP/1.1" 500
lowlevel_error_handler do |exception, env|
Sentry.with_scope do |scope|
scope.set_transaction_name "puma"
scope.set_contexts puma: { env: env }
Sentry.capture_exception exception, level: :warning
end
[ 500, { "Content-Type" => "text/html" }, Rails.root.join("public/500.html").open ]
end
The error pointed to a mysterious Critter::UnicornQueueTiming
middleware that wasn't being listed in our middleware stack 😕
# This file is used by Rack-based servers to start the application.
require_relative 'config/environment'
# Not Unicorn-specific. Tracks delay since the X-Queue-Start header set by
# the earliest load balancer that accepts HTTP requests.
use Critter::UnicornQueueTiming if defined? Critter
run Rails.application
--- Puma ---
use Critter::UnicornQueueTiming 💥
use Rack::Sendfile
use ActionDispatch::Static
use ActionDispatch::Executor ⬅️
...
The mysterious Critter::UnicornQueueTiming
middleware wasn't thread-safe and was crashing from time to time 🤦🏻♀️
The immediate fix was easy
config.ru
# Not Unicorn-specific. Tracks delay since the X-Queue-Start header set by
# the earliest load balancer that accepts HTTP requests.
use Critter::UnicornQueueTiming if defined? Critter
run Rails.application
The immediate fix was easy
config.ru
# Not Unicorn-specific. Tracks delay since the X-Queue-Start header set by
# the earliest load balancer that accepts HTTP requests.
#
# *cough* Well, it does assume a single-threaded Rack adapter. Disabled
# pending thread-safe implementation.
# use Critter::UnicornQueueTiming if defined? Critter
run Rails.application
Narrator: they never did the thread-safe implementation 😆
class GuardedExecutor < ActionDispatch::Executor
class ExecutorStateViolation < RuntimeError; end
def call(env)
guard(env) { super }
end
private
def guard(env)
check! :before
response = yield
# Knotty syntax. Match superclass implementation.
returned = response << ::Rack::BodyProxy.new(response.pop) { check! :body_close }
rescue Exception
check! :rescued
raise
end
def check!(whence)
if @executor.active?
raise ExecutorStateViolation, whence
end
end
end
Rails.application.configure do
config.middleware.swap ActionDispatch::Executor, GuardedExecutor, executor
end
We still had to upstream all this to Rails
... but life had other plans
A change in Puma::Request#handle_request
inadvertently made a call to res_body.close
to not always happen.
Yes, that res_body
:
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
def self.run!(reset: false)
if reset
lost_instance = IsolatedExecutionState.delete(active_key)
lost_instance&.complete!
else
return Null if active?
end
new.tap do |instance|
success = nil
begin
instance.run!
success = true
ensure
instance.complete! unless success
end
end
end
module ActionDispatch
class Executor
def initialize(app, executor)
@app, @executor = app, executor
end
def call(env)
state = @executor.run!(reset: true)
begin
response = @app.call(env)
returned = response << ::Rack::BodyProxy.new(response.pop) { state.complete! }
rescue => error
@executor.error_reporter.report(error, handled: false)
raise
ensure
state.complete! unless returned
end
end
end
end