From 0967593010d995ce5ecf55029e5ad7ee29d5bb78 Mon Sep 17 00:00:00 2001 From: Holger Just Date: Wed, 5 Dec 2018 18:44:43 +0100 Subject: [PATCH] Perform actions on flows asynchronously by default Each flow now has an associated executor which performs all actions (writing events, closing, reopening) asynchronously by default using a Concurrent::SingleThreadExecutor. This improves the responsiveness of the application by performing the (usually) IO-bound task of writing the logs to a background thread. By creating a flow with `synchronous: true`, all actions are run in the calling thread as before, making the flow blocking. --- lib/rackstash.rb | 10 +- lib/rackstash/flow.rb | 180 ++++++++--------- spec/rackstash/flow_spec.rb | 256 +++++++++++-------------- spec/rackstash/rack/middleware_spec.rb | 9 +- spec/rackstash_spec.rb | 4 +- 5 files changed, 212 insertions(+), 247 deletions(-) diff --git a/lib/rackstash.rb b/lib/rackstash.rb index 49df40b..9f5aab4 100644 --- a/lib/rackstash.rb +++ b/lib/rackstash.rb @@ -140,10 +140,10 @@ module Rackstash # # @return [Rackstash::Flow] the default error flow def self.error_flow - @error_flow ||= Rackstash::Flow.new(STDERR) + @error_flow ||= Rackstash::Flow.new(STDERR, synchronous: true) end - # Set a {Flow} which is used bythe normal logger {Flow}s to write details + # Set a {Flow} which is used by the normal logger {Flow}s to write details # of any unexpected errors during interaction with their {Adapter}s. # # You can set a different `error_flow` for each {Flow} if required. You can @@ -154,11 +154,15 @@ module Rackstash # external issues, it is usually desireable to chose a local and mostly # relibable log target. # + # This flow should always be synchronous. + # # @param flow [Flow, Adapter::Adapter, Object] a single {Flow} or an object # which can be used as a {Flow}'s adapter. See {Flow#initialize}. # @return [Rackstash::Flow] the given `flow` def self.error_flow=(flow) - flow = Flow.new(flow) unless flow.is_a?(Rackstash::Flow) + unless flow.is_a?(Rackstash::Flow) + flow = Flow.new(flow, synchronous: true) + end @error_flow = flow end end diff --git a/lib/rackstash/flow.rb b/lib/rackstash/flow.rb index 7190df8..d8dccfa 100644 --- a/lib/rackstash/flow.rb +++ b/lib/rackstash/flow.rb @@ -78,11 +78,6 @@ module Rackstash # @param error_flow [Flow] a special flow which is used to write details # about any occured errors during writing. By default, we use the global # {Rackstash.error_flow} which logs JSON-formatted messages to `STDERR`. - # @param raise_on_error [Bool] set to `true` to re-raise any occured errors - # after logging them to the {#error_flow}. This can aid in debugging. By - # default, we swallow errors after having logging them to not cause - # additional issues to the production application just because the logger - # doesn't work. # @param auto_flush [Bool] set to `true` to write added fields or messages # added to a {Buffer} to this flow immediately. With each write, this flow # will then receive all current fields of the {Buffer} but only the @@ -90,20 +85,35 @@ module Rackstash # receive the full event with all fields and messages of the Buffer after # an explicit call to {Buffer#flush} for a buffering Buffer or after each # added message or fields for a non-bufering Buffer. + # @param synchronous [Bool] set to `true` to write events synchronously. + # When writing events, the caller will thus block until the event was + # written or an error occured (which will be raised to the caller after + # being logged to the {#error_flow}). By default (or when explicitly + # setting the `synchronous` attribute to `false`), we write events + # asynchronously. Here, we return to the caller immediately on {#write}. + # Any errors occuring during logging will be logged to the {#error_flow} + # but will not be re-raised to the caller. # @yieldparam flow [self] if the given block accepts an argument, we yield # `self` as a parameter, else, the block is directly executed in the # context of `self`. def initialize(adapter, encoder: nil, filters: [], - error_flow: nil, raise_on_error: false, auto_flush: false, + error_flow: nil, auto_flush: false, synchronous: false, &block ) @adapter = Rackstash::Adapter[adapter] self.encoder = encoder || @adapter.default_encoder @filter_chain = Rackstash::FilterChain.new(filters) self.error_flow = error_flow - self.raise_on_error = raise_on_error self.auto_flush = auto_flush + @synchronous = !!synchronous + @executor = if synchronous? + ::Concurrent::ImmediateExecutor.new + else + ::Concurrent::SingleThreadExecutor.new(fallback_policy: :abort) + end + + if block_given? if block.arity == 0 instance_eval(&block) @@ -153,26 +163,24 @@ module Rackstash end # Close the log adapter if supported. This might be a no-op if the adapter - # does not support closing. - # - # @return [nil] - def close! - @adapter.close - nil - end - - # (see #close!) + # does not support closing. This method blocks if the flow is + # {#synchronous?}. # # Any error raised by the adapter when closing it is logged to the - # {#error_flow} and then swallowed. Grave exceptions (i.e. all those which - # do not derive from `StandardError`) are logged and then re-raised. + # {#error_flow}. If the current flow is {#synchronous?}, the error is + # re-raised. + # + # @return [true] def close - close! - rescue Exception => exception - log_error("close failed for adapter #{adapter.inspect}", exception) - raise unless exception.is_a?(StandardError) - raise if raise_on_error? - nil + @executor.post do + begin + @adapter.close + rescue Exception => exception + log_error("close failed for adapter #{adapter.inspect}", exception) + raise unless exception.is_a?(StandardError) + raise if synchronous? + end + end end # Get or set the encoder for the log {#adapter}. If this value is not @@ -224,8 +232,12 @@ module Rackstash # This allows you to also give an adapter or just a plain log target which # can be wrapped in an adapter. # + # When setting the `error_flow` to nil, we reset any custom `error_flow` on + # this current Flow and will use the global {Rackstash.error_flow} to log + # any errors. + # # @param error_flow [Flow, Adapter, Object, nil] the separate error flow or - # `nil` to unset the custom error_flow ant to use the global + # `nil` to unset the custom error_flow and to use the global # {Rackstash.error_flow} again # @return [Rackstash::Flow] the newly set error_flow def error_flow=(error_flow) @@ -267,63 +279,41 @@ module Rackstash end alias filter_prepend filter_unshift - # Get or set the raise_on_error setting. Only if set to `true`, we will - # re-raise errors occuring during logging. If set to `false` (the default), - # we will only log the exception to the {#error_flow} and swallow it. + # Return the current value of the {#synchronous} flag. # - # @param bool [Bool] the value to set. If omitted, we just return the - # current setting - # @return [Bool] the updated or current `raise_on_error` setting - def raise_on_error(bool = UNDEFINED) - self.raise_on_error = bool unless UNDEFINED.equal? bool - raise_on_error? - end - - # Set the {#raise_on_error} flag to true in order to re-raise any exceptions - # which occured during logging. + # When set to `true`, we will block on writing events until it was either + # written to the adapter or an error occured (which will be raised to the + # caller after being logged to the {#error_flow}). # - # @see raise_on_error - def raise_on_error! - self.raise_on_error = true - end - - # @return [Bool] `true` if we re-raise any occured errors after logging them - # to the {#error_flow}. This can aid in debugging. By default, i.e., with - # {#raise_on_error?} being `false`, we swallow errors after logging them - # to not cause additional issues to the production application just - # because Rackstash doesn't work. - def raise_on_error? - @raise_on_error - end - - # @param bool [Bool] `true` to cause occured errors to be re-raised after - # logging them to the {#error_flow}, `false` to swallow them after - # logging. - def raise_on_error=(bool) - @raise_on_error = !!bool + # By default (or when explicitly setting the `synchronous` attribute to + # `false`), we write events asynchronously. Here, we return to the caller + # immediately on {#write}. Any errors occuring during logging will be logged + # to the {#error_flow} but will not be re-raised to the caller. + # + # @return [Bool] return the current value of the {#synchronous} flag + def synchronous? + @synchronous end # Re-open the log adapter if supported. This might be a no-op if the adapter - # does not support reopening. - # - # @return [nil] - def reopen! - @adapter.reopen - nil - end - - # (see #reopen!) + # does not support reopening. This method blocks if the flow is + # {#synchronous?}. # # Any error raised by the adapter when reopening it is logged to the - # {#error_flow} and then swallowed. Grave exceptions (i.e. all those which - # do not derive from `StandardError`) are logged and then re-raised. + # {#error_flow}. If the current flow is {#synchronous?}, the error is + # re-raised. + # + # @return [true] def reopen - reopen! - rescue Exception => exception - log_error("reopen failed for adapter #{adapter.inspect}", exception) - raise unless exception.is_a?(StandardError) - raise if raise_on_error? - nil + @executor.post do + begin + @adapter.reopen + rescue Exception => exception + log_error("reopen failed for adapter #{adapter.inspect}", exception) + raise unless exception.is_a?(StandardError) + raise if synchronous? + end + end end # Filter, encode and write the given `event` to the configured {#adapter}. @@ -339,29 +329,27 @@ module Rackstash # 3. Finally, the encoded event will be passed to the {#adapter} to be sent # to the actual log target, e.g. a file or an external log receiver. # + # Any error raised by a filter, the encoder, or the adapter when writing is + # logged to the {#error_flow}. If the current flow is {#synchronous?}, the + # error is re-raised. + # # @param event [Hash] an event hash # @return [Boolean] `true` if the event was written to the adapter, `false` # otherwise - def write!(event) - # Silently abort writing if any filter (and thus the while filter chain) - # returns `false`. - return false unless @filter_chain.call(event) - @adapter.write @encoder.encode(event) - true - end - - # (see #write!) - # - # Any error raised by the adapter when writing to it is logged to the - # {#error_flow} and then swallowed. Grave exceptions (i.e. all those which - # do not derive from `StandardError`) are logged and then re-raised. def write(event) - write!(event) - rescue Exception => exception - log_error("write failed for adapter #{adapter.inspect}", exception) - raise unless exception.is_a?(StandardError) - raise if raise_on_error? - false + @executor.post do + begin + # Silently abort writing if any filter (and thus the whole filter chain) + # returned `false`. + next false unless @filter_chain.call(event) + @adapter.write @encoder.encode(event) + true + rescue Exception => exception + log_error("write failed for adapter #{adapter.inspect}", exception) + raise unless exception.is_a?(StandardError) + raise if synchronous? + end + end end private @@ -378,7 +366,7 @@ module Rackstash FIELD_MESSAGE => [message], FIELD_TIMESTAMP => message.time } - error_flow.write!(error_event) + error_flow.write(error_event) rescue # At this place, writing to the error log has also failed. This is a bad # place to be in and there is very little we can sensibly do now. @@ -386,8 +374,8 @@ module Rackstash # To aid in availability of the app using Rackstash, we swallow any # StandardErrors by default and just continue, hoping that things will # turn out to be okay in the end. - - raise if raise_on_error? + raise unless exception.is_a?(StandardError) + raise if synchronous? end end end diff --git a/spec/rackstash/flow_spec.rb b/spec/rackstash/flow_spec.rb index 8944f64..d9896d0 100644 --- a/spec/rackstash/flow_spec.rb +++ b/spec/rackstash/flow_spec.rb @@ -15,9 +15,14 @@ RSpec.describe Rackstash::Flow do let(:flow) { described_class.new(adapter, **flow_args) } let(:event) { {} } + after(:each) do + # ensure that the asynchonous call was actually performed + flow.instance_variable_get('@executor').shutdown + flow.instance_variable_get('@executor').wait_for_termination(5) + end + describe '#initialize' do it 'creates an adapter' do - expect(Rackstash::Adapter).to receive(:[]).with(nil).and_call_original expect(described_class.new(nil).adapter).to be_a Rackstash::Adapter::Null end @@ -120,27 +125,21 @@ RSpec.describe Rackstash::Flow do end end - describe '#close!' do - it 'calls adapter#close' do - expect(adapter).to receive(:close).and_return(true) - expect(flow.close).to be nil - end - end - describe '#close' do - it 'calls #close!' do - expect(flow).to receive(:close!) + it 'calls adapter#close' do + expect(adapter).to receive(:close) flow.close end - context 'with raise_on_error: false' do + context 'when asynchronous' do before do - flow_args[:raise_on_error] = false + flow_args[:synchronous] = false end - it 'rescues any exception thrown by the adapter' do + it 'logs errors thrown by the adapter' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!) + allow(flow).to receive(:error_flow).and_return(error_flow) + expect(error_flow).to receive(:write) .with( 'error' => 'RuntimeError', 'error_message' => 'ERROR', @@ -149,30 +148,31 @@ RSpec.describe Rackstash::Flow do 'message' => [instance_of(Rackstash::Message)], '@timestamp' => instance_of(Time) ) - expect(flow).to receive(:error_flow).and_return(error_flow) - - expect(flow).to receive(:close!).and_raise('ERROR') - expect(flow.close).to be nil + expect(adapter).to receive(:close).and_raise('ERROR') + expect { flow.close }.not_to raise_error end - it 'rescues errors thrown by the error_flow' do + it 'ignores errors thrown by the error_flow' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR') - expect(flow).to receive(:error_flow).and_return(error_flow) + allow(flow).to receive(:error_flow).and_return(error_flow) - expect(flow).to receive(:close!).and_raise('ERROR') - expect(flow.close).to be nil + expect(adapter).to receive(:close).and_raise('ERROR') + expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR') + + expect { flow.close }.not_to raise_error end end - context 'with raise_on_error: true' do + context 'when synchronous' do before do - flow_args[:raise_on_error] = true + flow_args[:synchronous] = true end - it 'rescues any exception thrown by the adapter' do + it 'logs and re-raises errors thrown by the adapter' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!) + allow(flow).to receive(:error_flow).and_return(error_flow) + + expect(error_flow).to receive(:write) .with( 'error' => 'RuntimeError', 'error_message' => 'ERROR', @@ -181,18 +181,17 @@ RSpec.describe Rackstash::Flow do 'message' => [instance_of(Rackstash::Message)], '@timestamp' => instance_of(Time) ) - expect(flow).to receive(:error_flow).and_return(error_flow) - - expect(flow).to receive(:close!).and_raise('ERROR') + expect(adapter).to receive(:close).and_raise('ERROR') expect { flow.close }.to raise_error RuntimeError, 'ERROR' end - it 'rescues errors thrown by the error_flow' do + it 're-raises errors thrown by the error_flow' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR') - expect(flow).to receive(:error_flow).and_return(error_flow) + allow(flow).to receive(:error_flow).and_return(error_flow) + + expect(adapter).to receive(:close).and_raise('ERROR') + expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR') - expect(flow).to receive(:close!).and_raise('ERROR') expect { flow.close }.to raise_error RuntimeError, 'DOUBLE ERROR' end end @@ -361,60 +360,37 @@ RSpec.describe Rackstash::Flow do end end - describe '#raise_on_error?' do + describe '#synchronous?' do it 'defaults to false' do - expect(flow.raise_on_error?).to eql false - expect(flow.raise_on_error).to eql false + expect(flow.synchronous?).to eql false end it 'can set to true or false' do - expect(flow.raise_on_error('something')).to eql true - expect(flow.raise_on_error).to eql true - expect(flow.raise_on_error?).to eql true + expect(described_class.new(adapter, synchronous: true).synchronous?).to eql true + expect(described_class.new(adapter, synchronous: false).synchronous?).to eql false - expect(flow.raise_on_error(nil)).to eql false - expect(flow.raise_on_error).to eql false - expect(flow.raise_on_error?).to eql false - expect(flow.raise_on_error(true)).to eql true - expect(flow.raise_on_error).to eql true - expect(flow.raise_on_error?).to eql true - - expect(flow.raise_on_error(false)).to eql false - expect(flow.raise_on_error).to eql false - expect(flow.raise_on_error?).to eql false - end - end - - describe '#raise_on_error!' do - it 'sets the flag to true' do - expect { flow.raise_on_error! }.to change { flow.raise_on_error? } - .from(false) - .to(true) + expect(described_class.new(adapter, synchronous: 'true').synchronous?).to eql true + expect(described_class.new(adapter, synchronous: 42).synchronous?).to eql true + expect(described_class.new(adapter, synchronous: nil).synchronous?).to eql false end end describe '#reopen' do it 'calls adapter#reopen' do - expect(adapter).to receive(:reopen).and_return(true) - expect(flow.reopen).to be nil - end - end - - describe '#reopen' do - it 'calls #reopen!' do - expect(flow).to receive(:reopen!) + expect(adapter).to receive(:reopen) flow.reopen end - context 'with raise_on_error: false' do + context 'when asynchronous' do before do - flow_args[:raise_on_error] = false + flow_args[:synchronous] = false end - it 'rescues any exception thrown by the adapter' do + it 'logs errors thrown by the adapter' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!) + allow(flow).to receive(:error_flow).and_return(error_flow) + expect(error_flow).to receive(:write) .with( 'error' => 'RuntimeError', 'error_message' => 'ERROR', @@ -423,30 +399,31 @@ RSpec.describe Rackstash::Flow do 'message' => [instance_of(Rackstash::Message)], '@timestamp' => instance_of(Time) ) - expect(flow).to receive(:error_flow).and_return(error_flow) - - expect(flow).to receive(:reopen!).and_raise('ERROR') - expect(flow.reopen).to be nil + expect(adapter).to receive(:reopen).and_raise('ERROR') + expect { flow.reopen }.not_to raise_error end - it 'rescues errors thrown by the error_flow' do + it 'ignores errors thrown by the error_flow' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR') - expect(flow).to receive(:error_flow).and_return(error_flow) + allow(flow).to receive(:error_flow).and_return(error_flow) - expect(flow).to receive(:reopen!).and_raise('ERROR') - expect(flow.reopen).to be nil + expect(adapter).to receive(:reopen).and_raise('ERROR') + expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR') + + expect { flow.reopen }.not_to raise_error end end - context 'with raise_on_error: true' do + context 'when synchronous' do before do - flow_args[:raise_on_error] = true + flow_args[:synchronous] = true end - it 'rescues any exception thrown by the adapter' do + it 'logs and re-raises errors thrown by the adapter' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!) + allow(flow).to receive(:error_flow).and_return(error_flow) + + expect(error_flow).to receive(:write) .with( 'error' => 'RuntimeError', 'error_message' => 'ERROR', @@ -455,71 +432,56 @@ RSpec.describe Rackstash::Flow do 'message' => [instance_of(Rackstash::Message)], '@timestamp' => instance_of(Time) ) - expect(flow).to receive(:error_flow).and_return(error_flow) - - expect(flow).to receive(:reopen!).and_raise('ERROR') + expect(adapter).to receive(:reopen).and_raise('ERROR') expect { flow.reopen }.to raise_error RuntimeError, 'ERROR' end - it 'rescues errors thrown by the error_flow' do + it 're-raises errors thrown by the error_flow' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR') - expect(flow).to receive(:error_flow).and_return(error_flow) + allow(flow).to receive(:error_flow).and_return(error_flow) + + expect(adapter).to receive(:reopen).and_raise('ERROR') + expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR') - expect(flow).to receive(:reopen!).and_raise('ERROR') expect { flow.reopen }.to raise_error RuntimeError, 'DOUBLE ERROR' end end end describe '#write!' do - it 'calls the filter_chain' do - expect(flow.filter_chain).to receive(:call) - flow.write!(event) - end - - it 'aborts if the filter_chain returns false' do - expect(flow.filter_chain).to receive(:call).and_return(false) - - expect(flow.encoder).not_to receive(:encode) - expect(flow.adapter).not_to receive(:write) - flow.write!(event) - end - - it 'encodes the event' do - expect(flow.encoder).to receive(:encode).with(event) - flow.write!(event) - end - - it 'writes the encoded event to the adapter' do - expect(flow.encoder).to receive(:encode).and_return 'encoded' - expect(flow.adapter).to receive(:write).with('encoded').and_call_original - - expect(flow.write!(event)).to be true - end - - it 'writes the encoded event to the adapter' do - expect(flow.encoder).to receive(:encode).and_return 'encoded' - expect(flow.adapter).to receive(:write).with('encoded').and_call_original - - expect(flow.write!(event)).to be true - end end describe '#write' do - it 'calls #write!' do - expect(flow).to receive(:write!).with(event) + it 'calls write on the filter_chain' do + expect(flow.filter_chain).to receive(:call) flow.write(event) end - context 'with raise_on_error: false' do + it 'aborts if the filter_chain returns false' do + allow(flow.filter_chain).to receive(:call).and_return(false) + + expect(flow.encoder).not_to receive(:encode) + expect(flow.adapter).not_to receive(:write) + + flow.write(event) + end + + it 'writes the encoded event to the adapter' do + expect(flow.encoder).to receive(:encode).and_return 'encoded' + expect(flow.adapter).to receive(:write).with('encoded').and_call_original + + flow.write(event) + end + + context 'when asynchronous' do before do - flow_args[:raise_on_error] = false + flow_args[:synchronous] = false end - it 'rescues any exception thrown by the adapter' do + it 'logs errors thrown by the adapter' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!) + allow(flow).to receive(:error_flow).and_return(error_flow) + expect(error_flow).to receive(:write) .with( 'error' => 'RuntimeError', 'error_message' => 'ERROR', @@ -528,30 +490,31 @@ RSpec.describe Rackstash::Flow do 'message' => [instance_of(Rackstash::Message)], '@timestamp' => instance_of(Time) ) - expect(flow).to receive(:error_flow).and_return(error_flow) - - expect(flow).to receive(:write!).and_raise('ERROR') - expect(flow.write(event)).to be false + expect(adapter).to receive(:write).and_raise('ERROR') + expect { flow.write(event) }.not_to raise_error end - it 'rescues errors thrown by the error_flow' do + it 'ignores errors thrown by the error_flow' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR') - expect(flow).to receive(:error_flow).and_return(error_flow) + allow(flow).to receive(:error_flow).and_return(error_flow) - expect(flow).to receive(:write!).and_raise('ERROR') - expect(flow.write(event)).to be false + expect(adapter).to receive(:write).and_raise('ERROR') + expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR') + + expect { flow.write(event) }.not_to raise_error end end - context 'with raise_on_error: true' do + context 'when synchronous' do before do - flow_args[:raise_on_error] = true + flow_args[:synchronous] = true end - it 'rescues any exception thrown by the adapter' do + it 'logs and re-raises errors thrown by the adapter' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!) + allow(flow).to receive(:error_flow).and_return(error_flow) + + expect(error_flow).to receive(:write) .with( 'error' => 'RuntimeError', 'error_message' => 'ERROR', @@ -560,18 +523,19 @@ RSpec.describe Rackstash::Flow do 'message' => [instance_of(Rackstash::Message)], '@timestamp' => instance_of(Time) ) - expect(flow).to receive(:error_flow).and_return(error_flow) + expect(adapter).to receive(:write).and_raise('ERROR') - expect(flow).to receive(:write!).and_raise('ERROR') + # flow.write(event) expect { flow.write(event) }.to raise_error RuntimeError, 'ERROR' end - it 'rescues errors thrown by the error_flow' do + it 're-raises errors thrown by the error_flow' do error_flow = instance_double(described_class) - expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR') - expect(flow).to receive(:error_flow).and_return(error_flow) + allow(flow).to receive(:error_flow).and_return(error_flow) + + expect(adapter).to receive(:write).and_raise('ERROR') + expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR') - expect(flow).to receive(:write!).and_raise('ERROR') expect { flow.write(event) }.to raise_error RuntimeError, 'DOUBLE ERROR' end end diff --git a/spec/rackstash/rack/middleware_spec.rb b/spec/rackstash/rack/middleware_spec.rb index b39e914..1862346 100644 --- a/spec/rackstash/rack/middleware_spec.rb +++ b/spec/rackstash/rack/middleware_spec.rb @@ -28,7 +28,14 @@ RSpec.describe Rackstash::Rack::Middleware do } let(:log) { [] } - let(:logger) { Rackstash::Logger.new ->(event) { log << event } } + + let(:flow) { + Rackstash::Flow.new( + ->(event) { log << event }, + synchronous: true + ) + } + let(:logger) { Rackstash::Logger.new flow } let(:args) { {} } let(:middleware) { described_class.new(app, logger, **args) } diff --git a/spec/rackstash_spec.rb b/spec/rackstash_spec.rb index fc42734..aebdcdd 100644 --- a/spec/rackstash_spec.rb +++ b/spec/rackstash_spec.rb @@ -163,7 +163,9 @@ RSpec.describe Rackstash do it 'wraps a non-flow' do adapter = 'spec.log' - expect(Rackstash::Flow).to receive(:new).with(adapter).and_return(flow) + expect(Rackstash::Flow).to receive(:new) + .with(adapter, synchronous: true) + .and_return(flow) described_class.error_flow = adapter expect(described_class.error_flow).to equal flow