mirror of
https://github.com/meineerde/rackstash.git
synced 2026-03-02 23:31:45 +00:00
Allow to set Flow#raise_on_error? to aid in debugging flows
This commit is contained in:
parent
0605fc433d
commit
96f102875f
@ -78,14 +78,23 @@ module Rackstash
|
|||||||
# @param error_flow [Flow] a special flow which is used to write details
|
# @param error_flow [Flow] a special flow which is used to write details
|
||||||
# about any occured errors during writing. By default, we use the global
|
# about any occured errors during writing. By default, we use the global
|
||||||
# {Rackstash.error_flow} which logs JSON-formatted messages to `STDERR`.
|
# {Rackstash.error_flow} which logs JSON-formatted messages to `STDERR`.
|
||||||
|
# @param raise_on_error [Bool] set to `true` to re-raise any occured errors
|
||||||
|
# after logging them to the {#error_flow}. This can aid in debugging. By
|
||||||
|
# default, we swallow errors after having logging them to not cause
|
||||||
|
# additional issues to the production application just because the logger
|
||||||
|
# doesn't work.
|
||||||
# @yieldparam flow [self] if the given block accepts an argument, we yield
|
# @yieldparam flow [self] if the given block accepts an argument, we yield
|
||||||
# `self` as a parameter, else, the block is directly executed in the
|
# `self` as a parameter, else, the block is directly executed in the
|
||||||
# context of `self`.
|
# context of `self`.
|
||||||
def initialize(adapter, encoder: nil, filters: [], error_flow: nil, &block)
|
def initialize(adapter, encoder: nil, filters: [],
|
||||||
|
error_flow: nil, raise_on_error: false,
|
||||||
|
&block
|
||||||
|
)
|
||||||
@adapter = Rackstash::Adapter[adapter]
|
@adapter = Rackstash::Adapter[adapter]
|
||||||
self.encoder = encoder || @adapter.default_encoder
|
self.encoder = encoder || @adapter.default_encoder
|
||||||
@filter_chain = Rackstash::FilterChain.new(filters)
|
@filter_chain = Rackstash::FilterChain.new(filters)
|
||||||
self.error_flow = error_flow
|
self.error_flow = error_flow
|
||||||
|
self.raise_on_error = raise_on_error
|
||||||
|
|
||||||
if block_given?
|
if block_given?
|
||||||
if block.arity == 0
|
if block.arity == 0
|
||||||
@ -115,6 +124,8 @@ module Rackstash
|
|||||||
rescue Exception => exception
|
rescue Exception => exception
|
||||||
log_error("close failed for adapter #{adapter.inspect}", exception)
|
log_error("close failed for adapter #{adapter.inspect}", exception)
|
||||||
raise unless exception.is_a?(StandardError)
|
raise unless exception.is_a?(StandardError)
|
||||||
|
raise if raise_on_error?
|
||||||
|
nil
|
||||||
end
|
end
|
||||||
|
|
||||||
# Get or set the encoder for the log {#adapter}. If this value is not
|
# Get or set the encoder for the log {#adapter}. If this value is not
|
||||||
@ -209,6 +220,22 @@ module Rackstash
|
|||||||
end
|
end
|
||||||
alias filter_prepend filter_unshift
|
alias filter_prepend filter_unshift
|
||||||
|
|
||||||
|
# @return [Bool] `true` if we re-raise any occured errors after logging them
|
||||||
|
# to the {#error_flow}. This can aid in debugging. By default, i.e., with
|
||||||
|
# {#raise_on_error?} being `false`, we swallow errors after logging them
|
||||||
|
# to not cause additional issues to the production application just
|
||||||
|
# because Rackstash doesn't work.
|
||||||
|
def raise_on_error?
|
||||||
|
@raise_on_error
|
||||||
|
end
|
||||||
|
|
||||||
|
# @param bool [Bool] `true` to cause occured errors to be re-raised after
|
||||||
|
# logging them to the {#error_flow}, `false` to swallow them after
|
||||||
|
# logging.
|
||||||
|
def raise_on_error=(bool)
|
||||||
|
@raise_on_error = !!bool
|
||||||
|
end
|
||||||
|
|
||||||
# Re-open the log adapter if supported. This might be a no-op if the adapter
|
# Re-open the log adapter if supported. This might be a no-op if the adapter
|
||||||
# does not support reopening.
|
# does not support reopening.
|
||||||
#
|
#
|
||||||
@ -228,6 +255,8 @@ module Rackstash
|
|||||||
rescue Exception => exception
|
rescue Exception => exception
|
||||||
log_error("reopen failed for adapter #{adapter.inspect}", exception)
|
log_error("reopen failed for adapter #{adapter.inspect}", exception)
|
||||||
raise unless exception.is_a?(StandardError)
|
raise unless exception.is_a?(StandardError)
|
||||||
|
raise if raise_on_error?
|
||||||
|
nil
|
||||||
end
|
end
|
||||||
|
|
||||||
# Filter, encode and write the given `event` to the configured {#adapter}.
|
# Filter, encode and write the given `event` to the configured {#adapter}.
|
||||||
@ -263,7 +292,9 @@ module Rackstash
|
|||||||
write!(event)
|
write!(event)
|
||||||
rescue Exception => exception
|
rescue Exception => exception
|
||||||
log_error("write failed for adapter #{adapter.inspect}", exception)
|
log_error("write failed for adapter #{adapter.inspect}", exception)
|
||||||
exception.is_a?(StandardError) ? false : raise
|
raise unless exception.is_a?(StandardError)
|
||||||
|
raise if raise_on_error?
|
||||||
|
false
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
@ -286,8 +317,10 @@ module Rackstash
|
|||||||
# place to be in and there is very little we can sensibly do now.
|
# place to be in and there is very little we can sensibly do now.
|
||||||
#
|
#
|
||||||
# To aid in availability of the app using Rackstash, we swallow any
|
# To aid in availability of the app using Rackstash, we swallow any
|
||||||
# StandardErrors here and just continue, hoping that things will turn out
|
# StandardErrors by default and just continue, hoping that things will
|
||||||
# to be okay in the end.
|
# turn out to be okay in the end.
|
||||||
|
|
||||||
|
raise if raise_on_error?
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@ -11,7 +11,8 @@ require 'rackstash/flow'
|
|||||||
|
|
||||||
describe Rackstash::Flow do
|
describe Rackstash::Flow do
|
||||||
let(:adapter) { Rackstash::Adapter::Null.new }
|
let(:adapter) { Rackstash::Adapter::Null.new }
|
||||||
let(:flow) { described_class.new(adapter) }
|
let(:flow_args) { {} }
|
||||||
|
let(:flow) { described_class.new(adapter, **flow_args) }
|
||||||
let(:event) { {} }
|
let(:event) { {} }
|
||||||
|
|
||||||
describe '#initialize' do
|
describe '#initialize' do
|
||||||
@ -81,32 +82,72 @@ describe Rackstash::Flow do
|
|||||||
flow.close
|
flow.close
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'rescues any exception thrown by the adapter' do
|
context 'with raise_on_error: false' do
|
||||||
error_flow = instance_double(described_class)
|
before do
|
||||||
expect(error_flow).to receive(:write!)
|
flow_args[:raise_on_error] = false
|
||||||
.with(
|
end
|
||||||
'error' => 'RuntimeError',
|
|
||||||
'error_message' => 'ERROR',
|
|
||||||
'error_trace' => instance_of(String),
|
|
||||||
|
|
||||||
'tags' => [],
|
it 'rescues any exception thrown by the adapter' do
|
||||||
'message' => [instance_of(Rackstash::Message)],
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!)
|
||||||
|
.with(
|
||||||
|
'error' => 'RuntimeError',
|
||||||
|
'error_message' => 'ERROR',
|
||||||
|
'error_trace' => instance_of(String),
|
||||||
|
|
||||||
'@timestamp' => instance_of(Time)
|
'tags' => [],
|
||||||
)
|
'message' => [instance_of(Rackstash::Message)],
|
||||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
|
||||||
|
|
||||||
expect(flow).to receive(:close!).and_raise('ERROR')
|
'@timestamp' => instance_of(Time)
|
||||||
expect(flow.close).to be nil
|
)
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:close!).and_raise('ERROR')
|
||||||
|
expect(flow.close).to be nil
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'rescues errors thrown by the error_flow' do
|
||||||
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:close!).and_raise('ERROR')
|
||||||
|
expect(flow.close).to be nil
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'rescues errors thrown by the error_flow' do
|
context 'with raise_on_error: true' do
|
||||||
error_flow = instance_double(described_class)
|
before do
|
||||||
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
flow_args[:raise_on_error] = true
|
||||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
end
|
||||||
|
|
||||||
expect(flow).to receive(:close!).and_raise('ERROR')
|
it 'rescues any exception thrown by the adapter' do
|
||||||
expect(flow.close).to be nil
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!)
|
||||||
|
.with(
|
||||||
|
'error' => 'RuntimeError',
|
||||||
|
'error_message' => 'ERROR',
|
||||||
|
'error_trace' => instance_of(String),
|
||||||
|
|
||||||
|
'tags' => [],
|
||||||
|
'message' => [instance_of(Rackstash::Message)],
|
||||||
|
|
||||||
|
'@timestamp' => instance_of(Time)
|
||||||
|
)
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:close!).and_raise('ERROR')
|
||||||
|
expect { flow.close }.to raise_error RuntimeError, 'ERROR'
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'rescues errors thrown by the error_flow' do
|
||||||
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:close!).and_raise('ERROR')
|
||||||
|
expect { flow.close }.to raise_error RuntimeError, 'DOUBLE ERROR'
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -254,6 +295,26 @@ describe Rackstash::Flow do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe '#raise_on_error?' do
|
||||||
|
it 'defaults to false' do
|
||||||
|
expect(flow.raise_on_error?).to eql false
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'can set a boolean' do
|
||||||
|
flow.raise_on_error = 'something'
|
||||||
|
expect(flow.raise_on_error?).to eql true
|
||||||
|
|
||||||
|
flow.raise_on_error = nil
|
||||||
|
expect(flow.raise_on_error?).to eql false
|
||||||
|
|
||||||
|
flow.raise_on_error = true
|
||||||
|
expect(flow.raise_on_error?).to eql true
|
||||||
|
|
||||||
|
flow.raise_on_error = false
|
||||||
|
expect(flow.raise_on_error?).to eql false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe '#reopen' do
|
describe '#reopen' do
|
||||||
it 'calls adapter#reopen' do
|
it 'calls adapter#reopen' do
|
||||||
expect(adapter).to receive(:reopen).and_return(true)
|
expect(adapter).to receive(:reopen).and_return(true)
|
||||||
@ -267,32 +328,72 @@ describe Rackstash::Flow do
|
|||||||
flow.reopen
|
flow.reopen
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'rescues any exception thrown by the adapter' do
|
context 'with raise_on_error: false' do
|
||||||
error_flow = instance_double(described_class)
|
before do
|
||||||
expect(error_flow).to receive(:write!)
|
flow_args[:raise_on_error] = false
|
||||||
.with(
|
end
|
||||||
'error' => 'RuntimeError',
|
|
||||||
'error_message' => 'ERROR',
|
|
||||||
'error_trace' => instance_of(String),
|
|
||||||
|
|
||||||
'tags' => [],
|
it 'rescues any exception thrown by the adapter' do
|
||||||
'message' => [instance_of(Rackstash::Message)],
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!)
|
||||||
|
.with(
|
||||||
|
'error' => 'RuntimeError',
|
||||||
|
'error_message' => 'ERROR',
|
||||||
|
'error_trace' => instance_of(String),
|
||||||
|
|
||||||
'@timestamp' => instance_of(Time)
|
'tags' => [],
|
||||||
)
|
'message' => [instance_of(Rackstash::Message)],
|
||||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
|
||||||
|
|
||||||
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
'@timestamp' => instance_of(Time)
|
||||||
expect(flow.reopen).to be nil
|
)
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
||||||
|
expect(flow.reopen).to be nil
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'rescues errors thrown by the error_flow' do
|
||||||
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
||||||
|
expect(flow.reopen).to be nil
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'rescues errors thrown by the error_flow' do
|
context 'with raise_on_error: true' do
|
||||||
error_flow = instance_double(described_class)
|
before do
|
||||||
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
flow_args[:raise_on_error] = true
|
||||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
end
|
||||||
|
|
||||||
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
it 'rescues any exception thrown by the adapter' do
|
||||||
expect(flow.reopen).to be nil
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!)
|
||||||
|
.with(
|
||||||
|
'error' => 'RuntimeError',
|
||||||
|
'error_message' => 'ERROR',
|
||||||
|
'error_trace' => instance_of(String),
|
||||||
|
|
||||||
|
'tags' => [],
|
||||||
|
'message' => [instance_of(Rackstash::Message)],
|
||||||
|
|
||||||
|
'@timestamp' => instance_of(Time)
|
||||||
|
)
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
||||||
|
expect { flow.reopen }.to raise_error RuntimeError, 'ERROR'
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'rescues errors thrown by the error_flow' do
|
||||||
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:reopen!).and_raise('ERROR')
|
||||||
|
expect { flow.reopen }.to raise_error RuntimeError, 'DOUBLE ERROR'
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -336,32 +437,72 @@ describe Rackstash::Flow do
|
|||||||
flow.write(event)
|
flow.write(event)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'rescues any exception thrown by the adapter' do
|
context 'with raise_on_error: false' do
|
||||||
error_flow = instance_double(described_class)
|
before do
|
||||||
expect(error_flow).to receive(:write!)
|
flow_args[:raise_on_error] = false
|
||||||
.with(
|
end
|
||||||
'error' => 'RuntimeError',
|
|
||||||
'error_message' => 'ERROR',
|
|
||||||
'error_trace' => instance_of(String),
|
|
||||||
|
|
||||||
'tags' => [],
|
it 'rescues any exception thrown by the adapter' do
|
||||||
'message' => [instance_of(Rackstash::Message)],
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!)
|
||||||
|
.with(
|
||||||
|
'error' => 'RuntimeError',
|
||||||
|
'error_message' => 'ERROR',
|
||||||
|
'error_trace' => instance_of(String),
|
||||||
|
|
||||||
'@timestamp' => instance_of(Time)
|
'tags' => [],
|
||||||
)
|
'message' => [instance_of(Rackstash::Message)],
|
||||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
|
||||||
|
|
||||||
expect(flow).to receive(:write!).and_raise('ERROR')
|
'@timestamp' => instance_of(Time)
|
||||||
expect(flow.write(event)).to be false
|
)
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:write!).and_raise('ERROR')
|
||||||
|
expect(flow.write(event)).to be false
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'rescues errors thrown by the error_flow' do
|
||||||
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:write!).and_raise('ERROR')
|
||||||
|
expect(flow.write(event)).to be false
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'rescues errors thrown by the error_flow' do
|
context 'with raise_on_error: true' do
|
||||||
error_flow = instance_double(described_class)
|
before do
|
||||||
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
flow_args[:raise_on_error] = true
|
||||||
expect(flow).to receive(:error_flow).and_return(error_flow)
|
end
|
||||||
|
|
||||||
expect(flow).to receive(:write!).and_raise('ERROR')
|
it 'rescues any exception thrown by the adapter' do
|
||||||
expect(flow.write(event)).to be false
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!)
|
||||||
|
.with(
|
||||||
|
'error' => 'RuntimeError',
|
||||||
|
'error_message' => 'ERROR',
|
||||||
|
'error_trace' => instance_of(String),
|
||||||
|
|
||||||
|
'tags' => [],
|
||||||
|
'message' => [instance_of(Rackstash::Message)],
|
||||||
|
|
||||||
|
'@timestamp' => instance_of(Time)
|
||||||
|
)
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:write!).and_raise('ERROR')
|
||||||
|
expect { flow.write(event) }.to raise_error RuntimeError, 'ERROR'
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'rescues errors thrown by the error_flow' do
|
||||||
|
error_flow = instance_double(described_class)
|
||||||
|
expect(error_flow).to receive(:write!).and_raise('DOUBLE ERROR')
|
||||||
|
expect(flow).to receive(:error_flow).and_return(error_flow)
|
||||||
|
|
||||||
|
expect(flow).to receive(:write!).and_raise('ERROR')
|
||||||
|
expect { flow.write(event) }.to raise_error RuntimeError, 'DOUBLE ERROR'
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user