mirror of
https://github.com/meineerde/rackstash.git
synced 2026-02-01 01:37:12 +00:00
Perform actions on flows asynchronously by default
Each flow now has an associated executor which performs all actions (writing events, closing, reopening) asynchronously by default using a Concurrent::SingleThreadExecutor. This improves the responsiveness of the application by performing the (usually) IO-bound task of writing the logs to a background thread. By creating a flow with `synchronous: true`, all actions are run in the calling thread as before, making the flow blocking.
This commit is contained in:
parent
009d0c3cdd
commit
0967593010
@ -140,10 +140,10 @@ module Rackstash
|
||||
#
|
||||
# @return [Rackstash::Flow] the default error flow
|
||||
def self.error_flow
|
||||
@error_flow ||= Rackstash::Flow.new(STDERR)
|
||||
@error_flow ||= Rackstash::Flow.new(STDERR, synchronous: true)
|
||||
end
|
||||
|
||||
# Set a {Flow} which is used bythe normal logger {Flow}s to write details
|
||||
# Set a {Flow} which is used by the normal logger {Flow}s to write details
|
||||
# of any unexpected errors during interaction with their {Adapter}s.
|
||||
#
|
||||
# You can set a different `error_flow` for each {Flow} if required. You can
|
||||
@ -154,11 +154,15 @@ module Rackstash
|
||||
# external issues, it is usually desireable to chose a local and mostly
|
||||
# relibable log target.
|
||||
#
|
||||
# This flow should always be synchronous.
|
||||
#
|
||||
# @param flow [Flow, Adapter::Adapter, Object] a single {Flow} or an object
|
||||
# which can be used as a {Flow}'s adapter. See {Flow#initialize}.
|
||||
# @return [Rackstash::Flow] the given `flow`
|
||||
def self.error_flow=(flow)
|
||||
flow = Flow.new(flow) unless flow.is_a?(Rackstash::Flow)
|
||||
unless flow.is_a?(Rackstash::Flow)
|
||||
flow = Flow.new(flow, synchronous: true)
|
||||
end
|
||||
@error_flow = flow
|
||||
end
|
||||
end
|
||||
|
||||
@ -78,11 +78,6 @@ module Rackstash
|
||||
# @param error_flow [Flow] a special flow which is used to write details
|
||||
# about any occured errors during writing. By default, we use the global
|
||||
# {Rackstash.error_flow} which logs JSON-formatted messages to `STDERR`.
|
||||
# @param raise_on_error [Bool] set to `true` to re-raise any occured errors
|
||||
# after logging them to the {#error_flow}. This can aid in debugging. By
|
||||
# default, we swallow errors after having logging them to not cause
|
||||
# additional issues to the production application just because the logger
|
||||
# doesn't work.
|
||||
# @param auto_flush [Bool] set to `true` to write added fields or messages
|
||||
# added to a {Buffer} to this flow immediately. With each write, this flow
|
||||
# will then receive all current fields of the {Buffer} but only the
|
||||
@ -90,20 +85,35 @@ module Rackstash
|
||||
# receive the full event with all fields and messages of the Buffer after
|
||||
# an explicit call to {Buffer#flush} for a buffering Buffer or after each
|
||||
# added message or fields for a non-bufering Buffer.
|
||||
# @param synchronous [Bool] set to `true` to write events synchronously.
|
||||
# When writing events, the caller will thus block until the event was
|
||||
# written or an error occured (which will be raised to the caller after
|
||||
# being logged to the {#error_flow}). By default (or when explicitly
|
||||
# setting the `synchronous` attribute to `false`), we write events
|
||||
# asynchronously. Here, we return to the caller immediately on {#write}.
|
||||
# Any errors occuring during logging will be logged to the {#error_flow}
|
||||
# but will not be re-raised to the caller.
|
||||
# @yieldparam flow [self] if the given block accepts an argument, we yield
|
||||
# `self` as a parameter, else, the block is directly executed in the
|
||||
# context of `self`.
|
||||
def initialize(adapter, encoder: nil, filters: [],
|
||||
error_flow: nil, raise_on_error: false, auto_flush: false,
|
||||
error_flow: nil, auto_flush: false, synchronous: false,
|
||||
&block
|
||||
)
|
||||
@adapter = Rackstash::Adapter[adapter]
|
||||
self.encoder = encoder || @adapter.default_encoder
|
||||
@filter_chain = Rackstash::FilterChain.new(filters)
|
||||
self.error_flow = error_flow
|
||||
self.raise_on_error = raise_on_error
|
||||
self.auto_flush = auto_flush
|
||||
|
||||
@synchronous = !!synchronous
|
||||
@executor = if synchronous?
|
||||
::Concurrent::ImmediateExecutor.new
|
||||
else
|
||||
::Concurrent::SingleThreadExecutor.new(fallback_policy: :abort)
|
||||
end
|
||||
|
||||
|
||||
if block_given?
|
||||
if block.arity == 0
|
||||
instance_eval(&block)
|
||||
@ -153,26 +163,24 @@ module Rackstash
|
||||
end
|
||||
|
||||
# Close the log adapter if supported. This might be a no-op if the adapter
|
||||
# does not support closing.
|
||||
#
|
||||
# @return [nil]
|
||||
def close!
|
||||
@adapter.close
|
||||
nil
|
||||
end
|
||||
|
||||
# (see #close!)
|
||||
# does not support closing. This method blocks if the flow is
|
||||
# {#synchronous?}.
|
||||
#
|
||||
# Any error raised by the adapter when closing it is logged to the
|
||||
# {#error_flow} and then swallowed. Grave exceptions (i.e. all those which
|
||||
# do not derive from `StandardError`) are logged and then re-raised.
|
||||
# {#error_flow}. If the current flow is {#synchronous?}, the error is
|
||||
# re-raised.
|
||||
#
|
||||
# @return [true]
|
||||
def close
|
||||
close!
|
||||
rescue Exception => exception
|
||||
log_error("close failed for adapter #{adapter.inspect}", exception)
|
||||
raise unless exception.is_a?(StandardError)
|
||||
raise if raise_on_error?
|
||||
nil
|
||||
@executor.post do
|
||||
begin
|
||||
@adapter.close
|
||||
rescue Exception => exception
|
||||
log_error("close failed for adapter #{adapter.inspect}", exception)
|
||||
raise unless exception.is_a?(StandardError)
|
||||
raise if synchronous?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Get or set the encoder for the log {#adapter}. If this value is not
|
||||
@ -224,8 +232,12 @@ module Rackstash
|
||||
# This allows you to also give an adapter or just a plain log target which
|
||||
# can be wrapped in an adapter.
|
||||
#
|
||||
# When setting the `error_flow` to nil, we reset any custom `error_flow` on
|
||||
# this current Flow and will use the global {Rackstash.error_flow} to log
|
||||
# any errors.
|
||||
#
|
||||
# @param error_flow [Flow, Adapter, Object, nil] the separate error flow or
|
||||
# `nil` to unset the custom error_flow ant to use the global
|
||||
# `nil` to unset the custom error_flow and to use the global
|
||||
# {Rackstash.error_flow} again
|
||||
# @return [Rackstash::Flow] the newly set error_flow
|
||||
def error_flow=(error_flow)
|
||||
@ -267,63 +279,41 @@ module Rackstash
|
||||
end
|
||||
alias filter_prepend filter_unshift
|
||||
|
||||
# Get or set the raise_on_error setting. Only if set to `true`, we will
|
||||
# re-raise errors occuring during logging. If set to `false` (the default),
|
||||
# we will only log the exception to the {#error_flow} and swallow it.
|
||||
# Return the current value of the {#synchronous} flag.
|
||||
#
|
||||
# @param bool [Bool] the value to set. If omitted, we just return the
|
||||
# current setting
|
||||
# @return [Bool] the updated or current `raise_on_error` setting
|
||||
def raise_on_error(bool = UNDEFINED)
|
||||
self.raise_on_error = bool unless UNDEFINED.equal? bool
|
||||
raise_on_error?
|
||||
end
|
||||
|
||||
# Set the {#raise_on_error} flag to true in order to re-raise any exceptions
|
||||
# which occured during logging.
|
||||
# When set to `true`, we will block on writing events until it was either
|
||||
# written to the adapter or an error occured (which will be raised to the
|
||||
# caller after being logged to the {#error_flow}).
|
||||
#
|
||||
# @see raise_on_error
|
||||
def raise_on_error!
|
||||
self.raise_on_error = true
|
||||
end
|
||||
|
||||
# @return [Bool] `true` if we re-raise any occured errors after logging them
|
||||
# to the {#error_flow}. This can aid in debugging. By default, i.e., with
|
||||
# {#raise_on_error?} being `false`, we swallow errors after logging them
|
||||
# to not cause additional issues to the production application just
|
||||
# because Rackstash doesn't work.
|
||||
def raise_on_error?
|
||||
@raise_on_error
|
||||
end
|
||||
|
||||
# @param bool [Bool] `true` to cause occured errors to be re-raised after
|
||||
# logging them to the {#error_flow}, `false` to swallow them after
|
||||
# logging.
|
||||
def raise_on_error=(bool)
|
||||
@raise_on_error = !!bool
|
||||
# By default (or when explicitly setting the `synchronous` attribute to
|
||||
# `false`), we write events asynchronously. Here, we return to the caller
|
||||
# immediately on {#write}. Any errors occuring during logging will be logged
|
||||
# to the {#error_flow} but will not be re-raised to the caller.
|
||||
#
|
||||
# @return [Bool] return the current value of the {#synchronous} flag
|
||||
def synchronous?
|
||||
@synchronous
|
||||
end
|
||||
|
||||
# Re-open the log adapter if supported. This might be a no-op if the adapter
|
||||
# does not support reopening.
|
||||
#
|
||||
# @return [nil]
|
||||
def reopen!
|
||||
@adapter.reopen
|
||||
nil
|
||||
end
|
||||
|
||||
# (see #reopen!)
|
||||
# does not support reopening. This method blocks if the flow is
|
||||
# {#synchronous?}.
|
||||
#
|
||||
# Any error raised by the adapter when reopening it is logged to the
|
||||
# {#error_flow} and then swallowed. Grave exceptions (i.e. all those which
|
||||
# do not derive from `StandardError`) are logged and then re-raised.
|
||||
# {#error_flow}. If the current flow is {#synchronous?}, the error is
|
||||
# re-raised.
|
||||
#
|
||||
# @return [true]
|
||||
def reopen
|
||||
reopen!
|
||||
rescue Exception => exception
|
||||
log_error("reopen failed for adapter #{adapter.inspect}", exception)
|
||||
raise unless exception.is_a?(StandardError)
|
||||
raise if raise_on_error?
|
||||
nil
|
||||
@executor.post do
|
||||
begin
|
||||
@adapter.reopen
|
||||
rescue Exception => exception
|
||||
log_error("reopen failed for adapter #{adapter.inspect}", exception)
|
||||
raise unless exception.is_a?(StandardError)
|
||||
raise if synchronous?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Filter, encode and write the given `event` to the configured {#adapter}.
|
||||
@ -339,29 +329,27 @@ module Rackstash
|
||||
# 3. Finally, the encoded event will be passed to the {#adapter} to be sent
|
||||
# to the actual log target, e.g. a file or an external log receiver.
|
||||
#
|
||||
# Any error raised by a filter, the encoder, or the adapter when writing is
|
||||
# logged to the {#error_flow}. If the current flow is {#synchronous?}, the
|
||||
# error is re-raised.
|
||||
#
|
||||
# @param event [Hash] an event hash
|
||||
# @return [Boolean] `true` if the event was written to the adapter, `false`
|
||||
# otherwise
|
||||
def write!(event)
|
||||
# Silently abort writing if any filter (and thus the while filter chain)
|
||||
# returns `false`.
|
||||
return false unless @filter_chain.call(event)
|
||||
@adapter.write @encoder.encode(event)
|
||||
true
|
||||
end
|
||||
|
||||
# (see #write!)
|
||||
#
|
||||
# Any error raised by the adapter when writing to it is logged to the
|
||||
# {#error_flow} and then swallowed. Grave exceptions (i.e. all those which
|
||||
# do not derive from `StandardError`) are logged and then re-raised.
|
||||
def write(event)
|
||||
write!(event)
|
||||
rescue Exception => exception
|
||||
log_error("write failed for adapter #{adapter.inspect}", exception)
|
||||
raise unless exception.is_a?(StandardError)
|
||||
raise if raise_on_error?
|
||||
false
|
||||
@executor.post do
|
||||
begin
|
||||
# Silently abort writing if any filter (and thus the whole filter chain)
|
||||
# returned `false`.
|
||||
next false unless @filter_chain.call(event)
|
||||
@adapter.write @encoder.encode(event)
|
||||
true
|
||||
rescue Exception => exception
|
||||
log_error("write failed for adapter #{adapter.inspect}", exception)
|
||||
raise unless exception.is_a?(StandardError)
|
||||
raise if synchronous?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
@ -378,7 +366,7 @@ module Rackstash
|
||||
FIELD_MESSAGE => [message],
|
||||
FIELD_TIMESTAMP => message.time
|
||||
}
|
||||
error_flow.write!(error_event)
|
||||
error_flow.write(error_event)
|
||||
rescue
|
||||
# At this place, writing to the error log has also failed. This is a bad
|
||||
# place to be in and there is very little we can sensibly do now.
|
||||
@ -386,8 +374,8 @@ module Rackstash
|
||||
# To aid in availability of the app using Rackstash, we swallow any
|
||||
# StandardErrors by default and just continue, hoping that things will
|
||||
# turn out to be okay in the end.
|
||||
|
||||
raise if raise_on_error?
|
||||
raise unless exception.is_a?(StandardError)
|
||||
raise if synchronous?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -15,9 +15,14 @@ RSpec.describe Rackstash::Flow do
|
||||
let(:flow) { described_class.new(adapter, **flow_args) }
|
||||
let(:event) { {} }
|
||||
|
||||
after(:each) do
|
||||
# ensure that the asynchonous call was actually performed
|
||||
flow.instance_variable_get('@executor').shutdown
|
||||
flow.instance_variable_get('@executor').wait_for_termination(5)
|
||||
end
|
||||
|
||||
describe '#initialize' do
|
||||
it 'creates an adapter' do
|
||||
expect(Rackstash::Adapter).to receive(:[]).with(nil).and_call_original
|
||||
expect(described_class.new(nil).adapter).to be_a Rackstash::Adapter::Null
|
||||
end
|
||||
|
||||
@ -120,27 +125,21 @@ RSpec.describe Rackstash::Flow do
|
||||
end
|
||||
end
|
||||
|
||||
describe '#close!' do
|
||||
it 'calls adapter#close' do
|
||||
expect(adapter).to receive(:close).and_return(true)
|
||||
expect(flow.close).to be nil
|
||||
end
|
||||
end
|
||||
|
||||
describe '#close' do
|
||||
it 'calls #close!' do
|
||||
expect(flow).to receive(:close!)
|
||||
it 'calls adapter#close' do
|
||||
expect(adapter).to receive(:close)
|
||||
flow.close
|
||||
end
|
||||
|
||||
context 'with raise_on_error: false' do
|
||||
context 'when asynchronous' do
|
||||
before do
|
||||
flow_args[:raise_on_error] = false
|
||||
flow_args[:synchronous] = false
|
||||
end
|
||||
|
||||
it 'rescues any exception thrown by the adapter' do
|
||||
it 'logs errors thrown by the adapter' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
expect(error_flow).to receive(:write)
|
||||
.with(
|
||||
'error' => 'RuntimeError',
|
||||
'error_message' => 'ERROR',
|
||||
@ -149,30 +148,31 @@ RSpec.describe Rackstash::Flow do
|
||||
'message' => [instance_of(Rackstash::Message)],
|
||||
'@timestamp' => instance_of(Time)
|
||||
)
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(flow).to receive(:close!).and_raise('ERROR')
|
||||
expect(flow.close).to be nil
|
||||
expect(adapter).to receive(:close).and_raise('ERROR')
|
||||
expect { flow.close }.not_to raise_error
|
||||
end
|
||||
|
||||
it 'rescues errors thrown by the error_flow' do
|
||||
it 'ignores errors thrown by the error_flow' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(flow).to receive(:close!).and_raise('ERROR')
|
||||
expect(flow.close).to be nil
|
||||
expect(adapter).to receive(:close).and_raise('ERROR')
|
||||
expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR')
|
||||
|
||||
expect { flow.close }.not_to raise_error
|
||||
end
|
||||
end
|
||||
|
||||
context 'with raise_on_error: true' do
|
||||
context 'when synchronous' do
|
||||
before do
|
||||
flow_args[:raise_on_error] = true
|
||||
flow_args[:synchronous] = true
|
||||
end
|
||||
|
||||
it 'rescues any exception thrown by the adapter' do
|
||||
it 'logs and re-raises errors thrown by the adapter' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(error_flow).to receive(:write)
|
||||
.with(
|
||||
'error' => 'RuntimeError',
|
||||
'error_message' => 'ERROR',
|
||||
@ -181,18 +181,17 @@ RSpec.describe Rackstash::Flow do
|
||||
'message' => [instance_of(Rackstash::Message)],
|
||||
'@timestamp' => instance_of(Time)
|
||||
)
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(flow).to receive(:close!).and_raise('ERROR')
|
||||
expect(adapter).to receive(:close).and_raise('ERROR')
|
||||
expect { flow.close }.to raise_error RuntimeError, 'ERROR'
|
||||
end
|
||||
|
||||
it 'rescues errors thrown by the error_flow' do
|
||||
it 're-raises errors thrown by the error_flow' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(adapter).to receive(:close).and_raise('ERROR')
|
||||
expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR')
|
||||
|
||||
expect(flow).to receive(:close!).and_raise('ERROR')
|
||||
expect { flow.close }.to raise_error RuntimeError, 'DOUBLE ERROR'
|
||||
end
|
||||
end
|
||||
@ -361,60 +360,37 @@ RSpec.describe Rackstash::Flow do
|
||||
end
|
||||
end
|
||||
|
||||
describe '#raise_on_error?' do
|
||||
describe '#synchronous?' do
|
||||
it 'defaults to false' do
|
||||
expect(flow.raise_on_error?).to eql false
|
||||
expect(flow.raise_on_error).to eql false
|
||||
expect(flow.synchronous?).to eql false
|
||||
end
|
||||
|
||||
it 'can set to true or false' do
|
||||
expect(flow.raise_on_error('something')).to eql true
|
||||
expect(flow.raise_on_error).to eql true
|
||||
expect(flow.raise_on_error?).to eql true
|
||||
expect(described_class.new(adapter, synchronous: true).synchronous?).to eql true
|
||||
expect(described_class.new(adapter, synchronous: false).synchronous?).to eql false
|
||||
|
||||
expect(flow.raise_on_error(nil)).to eql false
|
||||
expect(flow.raise_on_error).to eql false
|
||||
expect(flow.raise_on_error?).to eql false
|
||||
|
||||
expect(flow.raise_on_error(true)).to eql true
|
||||
expect(flow.raise_on_error).to eql true
|
||||
expect(flow.raise_on_error?).to eql true
|
||||
|
||||
expect(flow.raise_on_error(false)).to eql false
|
||||
expect(flow.raise_on_error).to eql false
|
||||
expect(flow.raise_on_error?).to eql false
|
||||
end
|
||||
end
|
||||
|
||||
describe '#raise_on_error!' do
|
||||
it 'sets the flag to true' do
|
||||
expect { flow.raise_on_error! }.to change { flow.raise_on_error? }
|
||||
.from(false)
|
||||
.to(true)
|
||||
expect(described_class.new(adapter, synchronous: 'true').synchronous?).to eql true
|
||||
expect(described_class.new(adapter, synchronous: 42).synchronous?).to eql true
|
||||
expect(described_class.new(adapter, synchronous: nil).synchronous?).to eql false
|
||||
end
|
||||
end
|
||||
|
||||
describe '#reopen' do
|
||||
it 'calls adapter#reopen' do
|
||||
expect(adapter).to receive(:reopen).and_return(true)
|
||||
expect(flow.reopen).to be nil
|
||||
end
|
||||
end
|
||||
|
||||
describe '#reopen' do
|
||||
it 'calls #reopen!' do
|
||||
expect(flow).to receive(:reopen!)
|
||||
expect(adapter).to receive(:reopen)
|
||||
flow.reopen
|
||||
end
|
||||
|
||||
context 'with raise_on_error: false' do
|
||||
context 'when asynchronous' do
|
||||
before do
|
||||
flow_args[:raise_on_error] = false
|
||||
flow_args[:synchronous] = false
|
||||
end
|
||||
|
||||
it 'rescues any exception thrown by the adapter' do
|
||||
it 'logs errors thrown by the adapter' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
expect(error_flow).to receive(:write)
|
||||
.with(
|
||||
'error' => 'RuntimeError',
|
||||
'error_message' => 'ERROR',
|
||||
@ -423,30 +399,31 @@ RSpec.describe Rackstash::Flow do
|
||||
'message' => [instance_of(Rackstash::Message)],
|
||||
'@timestamp' => instance_of(Time)
|
||||
)
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
||||
expect(flow.reopen).to be nil
|
||||
expect(adapter).to receive(:reopen).and_raise('ERROR')
|
||||
expect { flow.reopen }.not_to raise_error
|
||||
end
|
||||
|
||||
it 'rescues errors thrown by the error_flow' do
|
||||
it 'ignores errors thrown by the error_flow' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
||||
expect(flow.reopen).to be nil
|
||||
expect(adapter).to receive(:reopen).and_raise('ERROR')
|
||||
expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR')
|
||||
|
||||
expect { flow.reopen }.not_to raise_error
|
||||
end
|
||||
end
|
||||
|
||||
context 'with raise_on_error: true' do
|
||||
context 'when synchronous' do
|
||||
before do
|
||||
flow_args[:raise_on_error] = true
|
||||
flow_args[:synchronous] = true
|
||||
end
|
||||
|
||||
it 'rescues any exception thrown by the adapter' do
|
||||
it 'logs and re-raises errors thrown by the adapter' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(error_flow).to receive(:write)
|
||||
.with(
|
||||
'error' => 'RuntimeError',
|
||||
'error_message' => 'ERROR',
|
||||
@ -455,71 +432,56 @@ RSpec.describe Rackstash::Flow do
|
||||
'message' => [instance_of(Rackstash::Message)],
|
||||
'@timestamp' => instance_of(Time)
|
||||
)
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
||||
expect(adapter).to receive(:reopen).and_raise('ERROR')
|
||||
expect { flow.reopen }.to raise_error RuntimeError, 'ERROR'
|
||||
end
|
||||
|
||||
it 'rescues errors thrown by the error_flow' do
|
||||
it 're-raises errors thrown by the error_flow' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(adapter).to receive(:reopen).and_raise('ERROR')
|
||||
expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR')
|
||||
|
||||
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
||||
expect { flow.reopen }.to raise_error RuntimeError, 'DOUBLE ERROR'
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#write!' do
|
||||
it 'calls the filter_chain' do
|
||||
expect(flow.filter_chain).to receive(:call)
|
||||
flow.write!(event)
|
||||
end
|
||||
|
||||
it 'aborts if the filter_chain returns false' do
|
||||
expect(flow.filter_chain).to receive(:call).and_return(false)
|
||||
|
||||
expect(flow.encoder).not_to receive(:encode)
|
||||
expect(flow.adapter).not_to receive(:write)
|
||||
flow.write!(event)
|
||||
end
|
||||
|
||||
it 'encodes the event' do
|
||||
expect(flow.encoder).to receive(:encode).with(event)
|
||||
flow.write!(event)
|
||||
end
|
||||
|
||||
it 'writes the encoded event to the adapter' do
|
||||
expect(flow.encoder).to receive(:encode).and_return 'encoded'
|
||||
expect(flow.adapter).to receive(:write).with('encoded').and_call_original
|
||||
|
||||
expect(flow.write!(event)).to be true
|
||||
end
|
||||
|
||||
it 'writes the encoded event to the adapter' do
|
||||
expect(flow.encoder).to receive(:encode).and_return 'encoded'
|
||||
expect(flow.adapter).to receive(:write).with('encoded').and_call_original
|
||||
|
||||
expect(flow.write!(event)).to be true
|
||||
end
|
||||
end
|
||||
|
||||
describe '#write' do
|
||||
it 'calls #write!' do
|
||||
expect(flow).to receive(:write!).with(event)
|
||||
it 'calls write on the filter_chain' do
|
||||
expect(flow.filter_chain).to receive(:call)
|
||||
flow.write(event)
|
||||
end
|
||||
|
||||
context 'with raise_on_error: false' do
|
||||
it 'aborts if the filter_chain returns false' do
|
||||
allow(flow.filter_chain).to receive(:call).and_return(false)
|
||||
|
||||
expect(flow.encoder).not_to receive(:encode)
|
||||
expect(flow.adapter).not_to receive(:write)
|
||||
|
||||
flow.write(event)
|
||||
end
|
||||
|
||||
it 'writes the encoded event to the adapter' do
|
||||
expect(flow.encoder).to receive(:encode).and_return 'encoded'
|
||||
expect(flow.adapter).to receive(:write).with('encoded').and_call_original
|
||||
|
||||
flow.write(event)
|
||||
end
|
||||
|
||||
context 'when asynchronous' do
|
||||
before do
|
||||
flow_args[:raise_on_error] = false
|
||||
flow_args[:synchronous] = false
|
||||
end
|
||||
|
||||
it 'rescues any exception thrown by the adapter' do
|
||||
it 'logs errors thrown by the adapter' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
expect(error_flow).to receive(:write)
|
||||
.with(
|
||||
'error' => 'RuntimeError',
|
||||
'error_message' => 'ERROR',
|
||||
@ -528,30 +490,31 @@ RSpec.describe Rackstash::Flow do
|
||||
'message' => [instance_of(Rackstash::Message)],
|
||||
'@timestamp' => instance_of(Time)
|
||||
)
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(flow).to receive(:write!).and_raise('ERROR')
|
||||
expect(flow.write(event)).to be false
|
||||
expect(adapter).to receive(:write).and_raise('ERROR')
|
||||
expect { flow.write(event) }.not_to raise_error
|
||||
end
|
||||
|
||||
it 'rescues errors thrown by the error_flow' do
|
||||
it 'ignores errors thrown by the error_flow' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(flow).to receive(:write!).and_raise('ERROR')
|
||||
expect(flow.write(event)).to be false
|
||||
expect(adapter).to receive(:write).and_raise('ERROR')
|
||||
expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR')
|
||||
|
||||
expect { flow.write(event) }.not_to raise_error
|
||||
end
|
||||
end
|
||||
|
||||
context 'with raise_on_error: true' do
|
||||
context 'when synchronous' do
|
||||
before do
|
||||
flow_args[:raise_on_error] = true
|
||||
flow_args[:synchronous] = true
|
||||
end
|
||||
|
||||
it 'rescues any exception thrown by the adapter' do
|
||||
it 'logs and re-raises errors thrown by the adapter' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(error_flow).to receive(:write)
|
||||
.with(
|
||||
'error' => 'RuntimeError',
|
||||
'error_message' => 'ERROR',
|
||||
@ -560,18 +523,19 @@ RSpec.describe Rackstash::Flow do
|
||||
'message' => [instance_of(Rackstash::Message)],
|
||||
'@timestamp' => instance_of(Time)
|
||||
)
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
expect(adapter).to receive(:write).and_raise('ERROR')
|
||||
|
||||
expect(flow).to receive(:write!).and_raise('ERROR')
|
||||
# flow.write(event)
|
||||
expect { flow.write(event) }.to raise_error RuntimeError, 'ERROR'
|
||||
end
|
||||
|
||||
it 'rescues errors thrown by the error_flow' do
|
||||
it 're-raises errors thrown by the error_flow' do
|
||||
error_flow = instance_double(described_class)
|
||||
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||
allow(flow).to receive(:error_flow).and_return(error_flow)
|
||||
|
||||
expect(adapter).to receive(:write).and_raise('ERROR')
|
||||
expect(error_flow).to receive(:write).and_raise('DOUBLE ERROR')
|
||||
|
||||
expect(flow).to receive(:write!).and_raise('ERROR')
|
||||
expect { flow.write(event) }.to raise_error RuntimeError, 'DOUBLE ERROR'
|
||||
end
|
||||
end
|
||||
|
||||
@ -28,7 +28,14 @@ RSpec.describe Rackstash::Rack::Middleware do
|
||||
}
|
||||
|
||||
let(:log) { [] }
|
||||
let(:logger) { Rackstash::Logger.new ->(event) { log << event } }
|
||||
|
||||
let(:flow) {
|
||||
Rackstash::Flow.new(
|
||||
->(event) { log << event },
|
||||
synchronous: true
|
||||
)
|
||||
}
|
||||
let(:logger) { Rackstash::Logger.new flow }
|
||||
|
||||
let(:args) { {} }
|
||||
let(:middleware) { described_class.new(app, logger, **args) }
|
||||
|
||||
@ -163,7 +163,9 @@ RSpec.describe Rackstash do
|
||||
|
||||
it 'wraps a non-flow' do
|
||||
adapter = 'spec.log'
|
||||
expect(Rackstash::Flow).to receive(:new).with(adapter).and_return(flow)
|
||||
expect(Rackstash::Flow).to receive(:new)
|
||||
.with(adapter, synchronous: true)
|
||||
.and_return(flow)
|
||||
|
||||
described_class.error_flow = adapter
|
||||
expect(described_class.error_flow).to equal flow
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user