1
0
mirror of https://github.com/meineerde/rackstash.git synced 2025-10-17 14:01:01 +00:00

Extract close!, reopen!, and write! methods from Rackstash::Flow

These methods do not rescue any thrown errors. The usual loggers will
always want to use the non-bang methods which rescue errors and attempt
to log them.
This commit is contained in:
Holger Just 2017-07-27 23:15:56 +02:00
parent dcf21d0bfa
commit fb7732ef42
2 changed files with 79 additions and 35 deletions

View File

@ -94,9 +94,18 @@ module Rackstash
# does not support closing. This method is called by the logger's {Sink}.
#
# @return [nil]
def close
def close!
@adapter.close
nil
end
# (see #close!)
#
# Any error raised by the adapter when closing it is written to the
# {#error_flow} and then swallowed. Only grave exceptions (i.e. all those
# which do not derive from `StandardError`) are re-raised.
def close
close!
rescue Exception => exception
log_error("close failed for adapter #{adapter.inspect}", exception)
raise unless exception.is_a?(StandardError)
@ -152,9 +161,18 @@ module Rackstash
# does not support reopening. This method is called by the logger's {Sink}.
#
# @return [nil]
def reopen
def reopen!
@adapter.reopen
nil
end
# (see #reopen!)
#
# Any error raised by the adapter when reopening it is written to the
# {#error_flow} and then swallowed. Only grave exceptions (i.e. all those
# which do not derive from `StandardError`) are re-raised.
def reopen
reopen!
rescue Exception => exception
log_error("reopen failed for adapter #{adapter.inspect}", exception)
raise unless exception.is_a?(StandardError)
@ -185,7 +203,7 @@ module Rackstash
# @return [Boolean] `true` if the event was written to the adapter, `false`
# otherwise
# @see Sink#write
def write(event)
def write!(event)
# Silently abort writing if any filter (and thus the while filter chain)
# returns `false`.
return false unless @filter_chain.call(event)
@ -202,6 +220,15 @@ module Rackstash
@adapter.write @encoder.encode(event)
true
end
# (see #write!)
#
# Any error raised by the adapter when writing to it is written to the
# {#error_flow} and then swallowed. Only grave exceptions (i.e. all those
# which do not derive from `StandardError`) are re-raised.
def write(event)
write!(event)
rescue Exception => exception
log_error("write failed for adapter #{adapter.inspect}", exception)
exception.is_a?(StandardError) ? false : raise

View File

@ -12,6 +12,7 @@ require 'rackstash/flow'
describe Rackstash::Flow do
let(:adapter) { Rackstash::Adapters::Null.new }
let(:flow) { described_class.new(adapter) }
let(:event) { {} }
describe '#initialize' do
it 'creates an adapter' do
@ -60,17 +61,23 @@ describe Rackstash::Flow do
end
end
describe '#close' do
describe '#close!' do
it 'calls adapter#close' do
expect(adapter).to receive(:close).and_return(true)
expect(flow.close).to be nil
end
end
describe '#close' do
it 'calls #close!' do
expect(flow).to receive(:close!)
flow.close
end
it 'rescues any exception thrown by the adapter' do
expect(adapter).to receive(:close).and_raise('ERROR')
expect(flow).to receive(:close!).and_raise('ERROR')
expect(flow).to receive(:warn).with(/^close failed for adapter/)
expect(flow.close).to be nil
flow.close
end
end
@ -176,21 +183,25 @@ describe Rackstash::Flow do
expect(adapter).to receive(:reopen).and_return(true)
expect(flow.reopen).to be nil
end
end
describe '#reopen' do
it 'calls #reopen!' do
expect(flow).to receive(:reopen!)
flow.reopen
end
it 'rescues any exception thrown by the adapter' do
expect(adapter).to receive(:reopen).and_raise('ERROR')
expect(flow).to receive(:reopen!).and_raise('ERROR')
expect(flow).to receive(:warn).with(/^reopen failed for adapter/)
expect(flow.reopen).to be nil
flow.reopen
end
end
describe '#write' do
let(:event) { {} }
describe '#write!' do
it 'calls the filter_chain' do
expect(flow.filter_chain).to receive(:call)
flow.write(event)
flow.write!(event)
end
it 'aborts if the filter_chain returns false' do
@ -198,21 +209,21 @@ describe Rackstash::Flow do
expect(flow.encoder).not_to receive(:encode)
expect(flow.adapter).not_to receive(:write)
flow.write(event)
flow.write!(event)
end
it 'concatenates message array before encoding' do
event['message'] = ["a\n", "b\n"]
expect(flow.encoder).to receive(:encode).with('message' => "a\nb\n")
flow.write(event)
flow.write!(event)
end
it 'sets message to an emoty string if deleted' do
event['message'] = nil
expect(flow.encoder).to receive(:encode).with('message' => '')
flow.write(event)
flow.write!(event)
end
it 'enforces to_s on other messages' do
@ -222,33 +233,39 @@ describe Rackstash::Flow do
expect(foo).to receive(:to_s).and_call_original
expect(flow.encoder).to receive(:encode).with('message' => 'foo')
flow.write(event)
flow.write!(event)
end
it 'encodes the event' do
expect(flow.encoder).to receive(:encode).with(event)
flow.write!(event)
end
it 'writes the encoded event to the adapter' do
expect(flow.encoder).to receive(:encode).and_return 'encoded'
expect(flow.adapter).to receive(:write).with('encoded').and_call_original
expect(flow.write!(event)).to be true
end
it 'writes the encoded event to the adapter' do
expect(flow.encoder).to receive(:encode).and_return 'encoded'
expect(flow.adapter).to receive(:write).with('encoded').and_call_original
expect(flow.write!(event)).to be true
end
end
describe '#write' do
it 'calls #write!' do
expect(flow).to receive(:write!).with(event)
flow.write(event)
end
it 'writes the encoded event to the adapter' do
expect(flow.encoder).to receive(:encode).and_return 'encoded'
expect(flow.adapter).to receive(:write).with('encoded').and_call_original
expect(flow.write(event)).to be true
end
it 'writes the encoded event to the adapter' do
expect(flow.encoder).to receive(:encode).and_return 'encoded'
expect(flow.adapter).to receive(:write).with('encoded').and_call_original
expect(flow.write(event)).to be true
end
it 'rescues any exception thrown by the adapter' do
expect(flow.adapter).to receive(:write).and_raise('ERROR')
expect(flow).to receive(:write!).and_raise('ERROR')
expect(flow).to receive(:warn).with(/^write failed for adapter/)
expect(flow.write(event)).to be false
flow.write(event)
end
end
end