1
0
mirror of https://github.com/meineerde/rackstash.git synced 2025-10-17 14:01:01 +00:00

Implement the Flow to transform and persist log events

A single Buffer can be sent to one or more flows which in turn each
write to a different adapter. A Flow object is responsible for
filtering, encoding, and finally persisting the event to an adapter.

Each Flow object can be configured differently which allows writing a
single log event to multiple targets as required.
This commit is contained in:
Holger Just 2017-07-19 23:55:49 +02:00
parent 882c10e596
commit 54a6f5d160
2 changed files with 459 additions and 2 deletions

View File

@ -5,10 +5,213 @@
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.
require 'rackstash/adapters'
require 'rackstash/encoders'
require 'rackstash/filters'
require 'rackstash/filter_chain'
module Rackstash
  # A Flow is responsible for taking a raw log event (originally corresponding
  # to a single {Buffer}), transforming it and finally sending it to an adapter
  # for persistence. A Flow instance is normally tied to a {Flows} list which in
  # turn belongs to a log {Sink}.
  #
  # In order to transform and persist log events, a Flow uses several
  # components:
  #
  # * Any number of {Filters} (zero or more). The filters can change the log
  #   event before it is passed to the adapter by adding, changing, or removing
  #   fields. The filters also have access to the array of {Message} objects in
  #   `event["messages"]` which provide the original severity and timestamp of
  #   each message.
  # * An `Encoder` which is responsible to transform the filtered event into a
  #   format suitable for the final log adapter. Most of the time, the encoder
  #   generates a String but can also produce other formats. Be sure to choose
  #   an encoder which matches the adapter's expectations. Usually, this is one
  #   of the {Encoders}.
  # * And finally the log `Adapter` which is responsible to send the encoded log
  #   event to an external log target, e.g. a file or an external log receiver.
  #   When setting up the flow, you can either provide an existing adapter
  #   object or provide an object which can be wrapped in an adapter. See
  #   {Adapters} for a list of pre-defined log adapters.
  #
  # You can build a Flow using a simple DSL:
  #
  #     flow = Rackstash::Flow.new(STDOUT) do
  #       encoder Rackstash::Encoders::JSON.new
  #
  #       # Anonymize IPs in the remote_ip field.
  #       filter Rackstash::Filters::AnonymizeIPMask.new('remote_ip')
  #
  #       # Add the maximum severity of any message in the event into the
  #       # severity and severity_label fields.
  #       filter do |event|
  #         severity = event['messages'].max_by { |message| message.severity }
  #         severity_label = Rackstash.severity_label(severity)
  #
  #         event['severity'] = severity
  #         event['severity_label'] = severity_label
  #       end
  #     end
  #
  #     # Write an event. This is normally done by the responsible Rackstash::Sink
  #     flow.write(an_event)
  #
  # The event which eventually gets written to the flow is created by the {Sink}
  # of a {Logger}.
  class Flow
    # @return [Adapters::Adapter] the log adapter
    attr_reader :adapter

    # @return [FilterChain] the mutable filter chain.
    attr_reader :filter_chain

    # @param adapter [Adapters::Adapter, Object] an adapter or an object which
    #   can be wrapped in an adapter. See {Adapters.[]}
    # @param encoder [#encode] an encoder, usually one of the {Encoders}. If
    #   this is not given, the adapter's default_encoder will be used.
    # @param filters [Array<#call>] an array of filters. Can be one of the
    #   pre-defined {Filters}, a `Proc`, or any other object which responds to
    #   `call`.
    # @yieldparam flow [self] if the given block accepts an argument, we yield
    #   `self` as a parameter, else, the block is directly executed in the
    #   context of `self`.
    def initialize(adapter, encoder: nil, filters: [], &block)
      @adapter = Rackstash::Adapters[adapter]
      self.encoder(encoder || @adapter.default_encoder)
      @filter_chain = Rackstash::FilterChain.new(filters)

      if block_given?
        if block.arity == 0
          # A zero-arity block is evaluated in the context of the flow itself
          # to allow the DSL-style configuration shown above.
          instance_eval(&block)
        else
          yield self
        end
      end
    end

    # Close the log adapter if supported. This might be a no-op if the adapter
    # does not support closing. This method is called by the logger's {Sink}.
    #
    # @return [nil]
    def close
      @adapter.close
      nil
    rescue Exception => exception
      # NOTE: non-StandardError exceptions (e.g. SignalException) are logged
      # and deliberately re-raised so they still terminate the process.
      log_error("close failed for adapter #{adapter.inspect}", exception)
      raise unless exception.is_a?(StandardError)
    end

    # Get or set the encoder for the log {#adapter}. If this value is not
    # explicitly defined, it defaults to the {#adapter}'s default encoder.
    #
    # @param encoder [#encode, nil] if given, set the flow's encoder to this
    #   object
    # @raise [TypeError] if the given encoder does not respond to the `encode`
    #   method
    # @return [#encode] the new encoder if given or the currently defined one
    def encoder(encoder = nil)
      return @encoder if encoder.nil?
      raise TypeError, 'must provide an encoder' unless encoder.respond_to?(:encode)

      @encoder = encoder
    end

    # (see FilterChain#insert_after)
    def filter_after(index, filter = nil, &block)
      @filter_chain.insert_after(index, filter, &block)
      self
    end

    # (see FilterChain#append)
    def filter_append(filter = nil, &block)
      @filter_chain.append(filter, &block)
      self
    end
    alias filter filter_append

    # (see FilterChain#delete)
    def filter_delete(index)
      @filter_chain.delete(index)
    end

    # (see FilterChain#insert_before)
    def filter_before(index, filter = nil, &block)
      @filter_chain.insert_before(index, filter, &block)
      self
    end

    # (see FilterChain#unshift)
    def filter_prepend(filter = nil, &block)
      @filter_chain.unshift(filter, &block)
      self
    end
    alias filter_unshift filter_prepend

    # Re-open the log adapter if supported. This might be a no-op if the adapter
    # does not support reopening. This method is called by the logger's {Sink}.
    #
    # @return [nil]
    def reopen
      @adapter.reopen
      nil
    rescue Exception => exception
      log_error("reopen failed for adapter #{adapter.inspect}", exception)
      raise unless exception.is_a?(StandardError)
    end

    # Filter, encode and write the given `event` to the configured {#adapter}.
    # This method is called by the logger's {Sink} to write a log event. The
    # given `event` is updated in-place and should not be re-used afterwards
    # anymore.
    #
    # 1. At first, we filter the event with the defined filters in their given
    #    order. If any of the filters returns `false`, the writing will be
    #    aborted. No further filters will be applied and the event will not be
    #    written to the adapter.
    # 2. After the filters, we normalize the `event["message"]` field. While the
    #    filters still had access to the array of {Message} objects for
    #    filtering, we now concatenate the raw messages as a single string to
    #    get the final event. The `event["message"]` field now contains a single
    #    `String` object.
    # 3. We encode the event to a format suitable for the adapter using the
    #    configured {#encoder}.
    # 4. Finally, the encoded event will be passed to the {#adapter} to be sent
    #    to the actual log target, e.g. a file or an external log receiver.
    #
    # @api private
    #
    # @param event [Hash] an event hash
    # @return [Boolean] `true` if the event was written to the adapter, `false`
    #   otherwise
    # @see Sink#write
    def write(event)
      # Silently abort writing if any filter (and thus the whole filter chain)
      # returns `false`.
      return false unless @filter_chain.call(event)

      event[FIELD_MESSAGE] =
        case event[FIELD_MESSAGE]
        when Array
          event[FIELD_MESSAGE].map!(&:to_s).join
        when nil
          ''
        else
          event[FIELD_MESSAGE].to_s
        end

      @adapter.write @encoder.encode(event)
      true
    rescue Exception => exception
      log_error("write failed for adapter #{adapter.inspect}", exception)
      exception.is_a?(StandardError) ? false : raise
    end

    private

    # TODO: use a fallback flow and send formatted logs there
    def log_error(message, exception)
      warn("#{message}: #{exception}")
    end
  end
end

254
spec/rackstash/flow_spec.rb Normal file
View File

@ -0,0 +1,254 @@
# frozen_string_literal: true
# Copyright 2017 Holger Just
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.
require 'spec_helper'
require 'rackstash/flow'
describe Rackstash::Flow do
  let(:adapter) { Rackstash::Adapters::Null.new }
  let(:flow) { described_class.new(adapter) }

  describe '#initialize' do
    it 'creates an adapter' do
      expect(Rackstash::Adapters).to receive(:[]).with(nil).and_call_original
      expect(described_class.new(nil).adapter).to be_a Rackstash::Adapters::Null
    end

    it 'sets the default encoder from the adapter' do
      encoder = adapter.default_encoder
      expect(adapter).to receive(:default_encoder).and_return(encoder)

      expect(flow.encoder).to equal encoder
    end

    it 'allows to set a custom encoder' do
      encoder = Rackstash::Encoders::Raw.new
      flow = described_class.new(adapter, encoder: encoder)

      expect(flow.encoder).to equal encoder
    end

    it 'creates an empty filter_chain by default' do
      expect(flow.filter_chain).to be_a Rackstash::FilterChain
      expect(flow.filter_chain.length).to eql 0
    end

    it 'accepts filters' do
      filter = -> {}
      flow = described_class.new(adapter, filters: [filter])

      expect(flow.filter_chain[0]).to equal filter
    end

    it 'yields the flow if the given block accepts an argument' do
      expect { |b| described_class.new(adapter, &b) }
        .to yield_with_args(instance_of(described_class))
    end

    it 'evals the supplied block if it accepts no arguments' do
      context = nil
      flow = described_class.new(adapter) do
        context = self
      end

      expect(context).to equal flow
    end
  end

  describe '#close' do
    it 'calls adapter#close' do
      expect(adapter).to receive(:close).and_return(true)
      expect(flow.close).to be nil
    end

    it 'rescues any exception thrown by the adapter' do
      expect(adapter).to receive(:close).and_raise('ERROR')
      expect(flow).to receive(:warn).with(/^close failed for adapter/)

      expect(flow.close).to be nil
    end
  end

  describe '#encoder' do
    it 'returns the current encoder' do
      expect(flow.encoder).to respond_to(:encode)
      expect(flow.encoder(nil)).to respond_to(:encode)
    end

    it 'allows to set a new encoder' do
      encoder = Rackstash::Encoders::JSON.new
      expect(flow.encoder(encoder)).to equal encoder

      # The encoder is persisted and is returned afterwards
      expect(flow.encoder).to equal encoder
    end

    it 'rejects invalid encoders' do
      expect { flow.encoder :foo }.to raise_error TypeError
      expect { flow.encoder 23 }.to raise_error TypeError
      expect { flow.encoder true }.to raise_error TypeError
      expect { flow.encoder false }.to raise_error TypeError
      expect { flow.encoder ->{} }.to raise_error TypeError
    end
  end

  describe '#filter_after' do
    before(:each) do
      flow.filter_chain << ->(event) {}
    end

    it 'calls FilterChain#insert_after' do
      expect(flow.filter_chain).to receive(:insert_after).twice.and_call_original

      expect(flow.filter_after(0, ->(event) { event })).to equal flow
      expect(flow.filter_after(0) { |event| event }).to equal flow

      expect(flow.filter_chain.size).to eql 3
    end
  end

  describe '#filter_append' do
    it 'calls FilterChain#append' do
      expect(flow.filter_chain).to receive(:append).twice.and_call_original

      expect(flow.filter_append ->(event) { event }).to equal flow
      expect(flow.filter_append { |event| event }).to equal flow

      expect(flow.filter_chain.size).to eql 2
    end

    it 'can use the #filter alias' do
      expect(flow.method(:filter)).to eql flow.method(:filter_append)
    end
  end

  describe '#filter_delete' do
    before(:each) do
      flow.filter_chain << ->(event) {}
    end

    it 'calls FilterChain#delete' do
      expect(flow.filter_chain).to receive(:delete).once.and_call_original

      expect(flow.filter_delete(0)).to be_a Proc
      expect(flow.filter_chain.size).to eql 0
    end
  end

  describe '#filter_before' do
    before(:each) do
      flow.filter_chain << ->(event) {}
    end

    it 'calls FilterChain#insert_before' do
      expect(flow.filter_chain).to receive(:insert_before).twice.and_call_original

      expect(flow.filter_before(0, ->(event) { event })).to equal flow
      expect(flow.filter_before(0) { |event| event }).to equal flow

      expect(flow.filter_chain.size).to eql 3
    end
  end

  describe '#filter_prepend' do
    it 'calls FilterChain#unshift' do
      expect(flow.filter_chain).to receive(:unshift).twice.and_call_original

      expect(flow.filter_prepend ->(event) { event }).to equal flow
      expect(flow.filter_prepend { |event| event }).to equal flow

      expect(flow.filter_chain.size).to eql 2
    end

    it 'can use the #filter_unshift alias' do
      expect(flow.method(:filter_unshift)).to eql flow.method(:filter_prepend)
    end
  end

  describe '#reopen' do
    it 'calls adapter#reopen' do
      expect(adapter).to receive(:reopen).and_return(true)
      expect(flow.reopen).to be nil
    end

    it 'rescues any exception thrown by the adapter' do
      expect(adapter).to receive(:reopen).and_raise('ERROR')
      expect(flow).to receive(:warn).with(/^reopen failed for adapter/)

      expect(flow.reopen).to be nil
    end
  end

  describe '#write' do
    let(:event) { {} }

    it 'calls the filter_chain' do
      expect(flow.filter_chain).to receive(:call)
      flow.write(event)
    end

    it 'aborts if the filter_chain returns false' do
      expect(flow.filter_chain).to receive(:call).and_return(false)
      expect(flow.encoder).not_to receive(:encode)
      expect(flow.adapter).not_to receive(:write)

      flow.write(event)
    end

    it 'concatenates message array before encoding' do
      event['message'] = ["a\n", "b\n"]
      expect(flow.encoder).to receive(:encode).with('message' => "a\nb\n")

      flow.write(event)
    end

    it 'sets message to an empty string if deleted' do
      event['message'] = nil
      expect(flow.encoder).to receive(:encode).with('message' => '')

      flow.write(event)
    end

    it 'enforces to_s on other messages' do
      foo = String.new('foo')
      event['message'] = foo

      expect(foo).to receive(:to_s).and_call_original
      expect(flow.encoder).to receive(:encode).with('message' => 'foo')

      flow.write(event)
    end

    it 'encodes the event' do
      expect(flow.encoder).to receive(:encode).with(event)
      flow.write(event)
    end

    it 'writes the encoded event to the adapter' do
      expect(flow.encoder).to receive(:encode).and_return 'encoded'
      expect(flow.adapter).to receive(:write).with('encoded').and_call_original

      expect(flow.write(event)).to be true
    end

    it 'rescues any exception thrown by the adapter' do
      expect(flow.adapter).to receive(:write).and_raise('ERROR')
      expect(flow).to receive(:warn).with(/^write failed for adapter/)

      expect(flow.write(event)).to be false
    end
  end
end