1
0
mirror of https://github.com/meineerde/rackstash.git synced 2025-10-17 14:01:01 +00:00

Add FilterChain to wrap and filters on event hashes

This commit is contained in:
Holger Just 2017-07-18 16:49:04 +02:00
parent a4a6b248cf
commit 466c377565
2 changed files with 724 additions and 0 deletions

View File

@ -0,0 +1,281 @@
# frozen_string_literal: true
# Copyright 2017 Holger Just
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.
require 'monitor'
module Rackstash
# The FilterChain contains a list of event filters which are used by a {Flow}
# to mutate an event before it is envoded and sent to the adapter for writing.
#
# A filter is any object responding to `call`, e.g. one of the {Filters} or a
# `Proc` object. When running the filters, we call each filter iin turn,
# passing the event. The filter can change the event in any way desired but
# should make sure that it preserves the basic structure of the event:
#
# * Only use basic objects: `Hash`, `Array`, `String`, `Integer`, `Float`.
# * Hash keys should always be strings
#
# Objects of this class are thread-save. Each method call is locked against
# `self`.
class FilterChain
include MonitorMixin
def initialize(filters = [])
mon_initialize
@filters = []
Array(filters).each do |filter|
append(filter)
end
end
# Get an existing filter at `index`.
#
# @param index [Integer, Class, String, Object] The existing filter to
# fetch. It can be described in different ways: When given an `Integer`,
# we expect it to be the index number; when given a `Class`, we try to
# find the first filter being of that type; when given a `String`, we try
# to find the first filter being of a type named like that; when given any
# other object, we assume it is a filter and search for that.
# @return [#call, nil] The existing filter or `nil` if no existing filter
# could be found for `index`
def [](index)
synchronize do
index = index_at(index)
index ? @filters[index] : nil
end
end
# Set the new filter at the given index or at the end of the chain if the
# value at `index` could not be found.
#
# @param index [Integer, Class, String, Object] The existing filter which
# should be overwritten with `filter`. It can be described in different
# ways: When given an `Integer`, we expect it to be the index number; when
# given a `Class`, we try to find the first filter being of that type;
# when given a `String`, we try to find the first filter being of a type
# named like that; when given any other object, we assume it is a filter
# and search for that.
# @param filter [#call, nil] the filter to set at `index`
# @return [#call] the given `filter`
def []=(index, filter)
raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)
synchronize do
id = index_at(index)
unless id && (0..@filters.size).include?(id)
raise ArgumentError, "Cannot insert at index #{index.inspect}"
end
@filters[id] = filter
end
filter
end
# Adds a new filter at the end of the filter chain. You can either give a
# callable object (e.g. a `Proc` or one of the {Filters}) or specify the
# filter with a given block.
#
# @param filter [#call, nil] a filter to add. If given, this value will
# take precedence to the block. If `nil`, we expect a block to be given
# which we will then take as the filter.
# @raise [TypeError] if the given filter is not callable
# @return [self]
def append(filter = nil, &block)
filter ||= block
raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)
synchronize do
@filters.push filter
end
self
end
alias << append
# Filter the given event by calling each defined filter with it.
#
# Each filter will be called with the current event and can manipulate it
# in any way. If any of the filters returns `false`, no further filter will
# be applied and we also return `false`. This behavior can be used by
# filters to cancel the writing of an individual event.
#
# @param event [Hash] an event hash, see {Sink#write} for details
# @return [Hash, false] the filtered event or `false` if any of the
# filters returned `false`
def call(event)
each do |filter|
result = filter.call(event)
return false if result == false
end
event
end
# Delete an existing filter from the filter chain.
#
# @param index [Integer, Class, String, Object] The existing filter to
# delete. It can be described in different ways: When given an `Integer`,
# we expect it to be the index number; when given a `Class`, we try to
# find the first filter being of that type; when given a `String`, we try
# to find the first filter being of a type named like that; when given any
# other object, we assume it is a filter and search for that.
# @raise [ArgumentError] if the existing filter could not be found
# @return [#call, nil] the deleted filter or `nil` if no filter for `index`
# could be found
def delete(index)
synchronize do
index = index_at(index)
@filters.delete_at(index) if index
end
end
# Calls the given block once for each filter in `self`, passing that filter
# as a parameter. Concurrent changes to `self` do not affect the running
# enumeration.
#
# An `Enumerator` is returned if no block is given.
#
# @yield [filter] calls the given block once for each filter
# @yieldparam filter [#call] the yielded filter
# @return [Enumerator, self] Returns `self` if a block was given or an
# `Enumerator` if no block was given.
def each
return enum_for(__method__) unless block_given?
synchronize { @filters.dup }.each do |filter|
yield filter
end
self
end
# Returns the index of the first filter in `self` matching
#
# @param filter [Integer, Class, String, Object] The existing filter to
# find. It can be described in different ways: When given an `Integer`,
# we expect it to be the index number; when given a `Class`, we try to
# find the first filter being of that type; when given a `String`, we try
# to find the first filter being of a type named like that; when given any
# other object, we assume it is a filter and search for that.
# @return [Integer, nil] The index of the existing filter or `nil` if no
# filter could be found for `index`
def index(filter)
synchronize { index_at(filter) }
end
# Insert a new filter after an existing filter in the filter chain.
#
# @param index [Integer, Class, String, Object] The existing filter after
# which the new one should be inserted. It can be described in different
# ways: When given an `Integer`, we expect it to be the index number; when
# given a `Class`, we try to find the first filter being of that type;
# when given a `String`, we try to find the first filter being of a type
# named like that; when given any other object, we assume it is a filter
# and search for that.
# @param filter [#call, nil] a filter to insert. If given, this value will
# take precedence to the block. If `nil`, we expect a block to be given
# which we will then take as the filter.
# @raise [TypeError] if the given filter is not callable
# @raise [ArgumentError] if the existing filter could not be found
# @return [self]
def insert_after(index, filter = nil, &block)
filter ||= block
raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)
synchronize do
id = index_at(index)
unless id && (0...@filters.size).include?(id)
raise ArgumentError, "No such filter to insert after: #{index.inspect}"
end
@filters.insert(id + 1, filter)
end
self
end
# Insert a new filter before an existing filter in the filter chain.
#
# @param index [Integer, Class, String, Object] The existing filter before
# which the new one should be inserted. It can be described in different
# ways: When given an `Integer`, we expect it to be the index number; when
# given a `Class`, we try to find the first filter being of that type;
# when given a `String`, we try to find the first filter being of a type
# named like that; when given any other object, we assume it is a filter
# and search for that.
# @param filter [#call, nil] a filter to insert. If given, this value will
# take precedence to the block. If `nil`, we expect a block to be given
# which we will then take as the filter.
# @raise [TypeError] if the given filter is not callable
# @raise [ArgumentError] if the existing filter could not be found
# @return [self]
def insert_before(index, filter = nil, &block)
filter ||= block
raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)
synchronize do
id = index_at(index)
unless id && (0...@filters.size).include?(id)
raise ArgumentError, "No such filter to insert before: #{index.inspect}"
end
@filters.insert(id, filter)
end
self
end
alias insert insert_before
# @return [String] a string representation of `self`
def inspect
id_str = Object.instance_method(:to_s).bind(self).call[2..-2]
"#<#{id_str} #{self}>"
end
# @return [Integer] the number of elements in `self`. May be zero.
def length
synchronize { @filters.length }
end
alias count length
alias size length
# Prepends a new filter at the beginning of the filter chain. You can either
# give a callable object (e.g. a `Proc` or one of the {Filters}) or specify
# the filter with a given block.
#
# @param filter [#call, nil] a filter to prepend. If given, this value will
# take precedence to the block. If `nil`, we expect a block to be given
# which we will then take as the filter.
# @raise [TypeError] if the given filter is not callable
# @return [self]
def unshift(filter = nil, &block)
filter ||= block
raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)
synchronize do
@filters.unshift filter
end
self
end
# @return [String] an Array-compatible string representation of `self`
def to_s
@filters.to_s
end
private
def index_at(index)
case index
when Integer, ->(o) { o.respond_to?(:to_int) }
index.to_int
when Class
@filters.index { |filter| filter.is_a?(index) }
when String, ->(o) { o.respond_to?(:to_str) }
index = index.to_str
@filters.index { |filter| filter.class.ancestors.map(&:name).include?(index) }
else
@filters.index { |filter| filter == index }
end
end
end
end

View File

@ -0,0 +1,443 @@
# frozen_string_literal: true
# Copyright 2017 Holger Just
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.
require 'spec_helper'
require 'rackstash/filter_chain'
describe Rackstash::FilterChain do
let(:filter_chain) { described_class.new }
Struct.new('MyFilter') do
def call(event)
event
end
end
def a_filter
Struct::MyFilter.new
end
let(:filter) { a_filter }
describe '#initialize' do
it 'accepts a single filter' do
filter_chain = described_class.new(-> {})
expect(filter_chain.length).to eql 1
end
it 'accepts a list of filters' do
chain = described_class.new([-> {}, -> {}, -> {}])
expect(chain.length).to eql 3
end
end
describe '#[]' do
it 'returns the filter by index' do
filter_chain << filter
expect(filter_chain[0]).to equal filter
expect(filter_chain[-1]).to equal filter
expect(filter_chain[1]).to be_nil
expect(filter_chain[-2]).to be_nil
end
it 'returns the filter by class or ancestor' do
filter_chain << filter
expect(filter_chain[Struct::MyFilter]).to equal filter
expect(filter_chain[Struct]).to equal filter
expect(filter_chain[Integer]).to be_nil
end
it 'returns the filter by class or ancestor name' do
filter_chain << filter
expect(filter_chain['Struct::MyFilter']).to equal filter
expect(filter_chain['Struct']).to equal filter
expect(filter_chain['Integer']).to be_nil
end
it 'returns the filter by equivalence' do
filter_chain << filter
expect(filter_chain[filter]).to equal filter
expect(filter_chain[false]).to be_nil
expect(filter_chain[true]).to be_nil
end
end
describe '#[]=' do
before(:each) do
filter_chain << filter
filter_chain << -> {}
end
let(:new_filter) { -> {} }
it 'sets a filter by index' do
filter_chain[0] = new_filter
expect(filter_chain[0]).to equal new_filter
filter_chain[2] = new_filter
expect(filter_chain[2]).to equal new_filter
end
it 'sets a filter by class or ancestor' do
filter_chain[Proc] = new_filter
expect(filter_chain[1]).to equal new_filter
filter_chain[Struct] = new_filter
expect(filter_chain[0]).to equal new_filter
end
it 'sets a filter by class or ancestor name' do
filter_chain['Proc'] = new_filter
expect(filter_chain[1]).to equal new_filter
filter_chain['Struct'] = new_filter
expect(filter_chain[0]).to equal new_filter
end
it 'sets a filter by equivalence' do
filter_chain[filter] = new_filter
expect(filter_chain[0]).to equal new_filter
end
it 'raises an ArgumentError if the filter was not found' do
expect { filter_chain[false] = new_filter }.to raise_error ArgumentError
expect { filter_chain[nil] = new_filter }.to raise_error ArgumentError
expect { filter_chain['foo'] = new_filter }.to raise_error ArgumentError
expect { filter_chain[Class.new] = new_filter }.to raise_error ArgumentError
expect { filter_chain[34] = new_filter }.to raise_error ArgumentError
end
it 'raises a TypeError if the object is not a filter' do
expect { filter_chain[0] = :foo }.to raise_error TypeError
expect { filter_chain[0] = nil }.to raise_error TypeError
expect { filter_chain[0] = false }.to raise_error TypeError
expect { filter_chain[0] = 42 }.to raise_error TypeError
expect { filter_chain[0] = 'Foo' }.to raise_error TypeError
end
end
describe '#append' do
it 'appends a filter' do
filter_chain.append filter
expect(filter_chain[0]).to eql filter
end
it 'appends a block as the filter' do
filter_chain.append { :foo }
expect(filter_chain[0]).to be_instance_of(Proc)
end
it 'raises a TypeError if the object is not a filter' do
expect { filter_chain.append(:foo) }.to raise_error TypeError
expect { filter_chain.append(nil) }.to raise_error TypeError
expect { filter_chain.append(false) }.to raise_error TypeError
expect { filter_chain.append(42) }.to raise_error TypeError
expect { filter_chain.append('Foo') }.to raise_error TypeError
end
it 'can use #<< alias' do
filter_chain << filter
expect(filter_chain[0]).to eql filter
end
end
describe '#call' do
it 'calls all the filters' do
event = {}
filters = [a_filter, a_filter, a_filter]
filters.each do |filter|
filter_chain << filter
end
expect(filters).to all receive(:call).with(event)
filter_chain.call({})
end
it 'returns the event' do
event = {}
expect(filter_chain.call(event)).to equal event
filter_chain << filter
expect(filter_chain.call(event)).to equal event
end
it 'stops once a filter returns false' do
filters = [a_filter, a_filter, a_filter]
filters.each do |filter|
filter_chain << filter
end
expect(filters[0]).to receive(:call)
expect(filters[1]).to receive(:call).and_return(false)
expect(filters[2]).not_to receive(:call)
expect(filter_chain.call({})).to be false
end
end
describe '#delete' do
before(:each) do
filter_chain << -> {}
filter_chain << filter
end
it 'deletes by index' do
expect(filter_chain.delete(1)).to equal filter
expect(filter_chain.count).to eql 1
end
it 'deletes by class' do
expect(filter_chain.delete(Struct)).to equal filter
expect(filter_chain.count).to eql 1
end
it 'deletes by class name' do
expect(filter_chain.delete('Struct')).to equal filter
expect(filter_chain.count).to eql 1
end
it 'deletes by reference' do
expect(filter_chain.delete(filter)).to equal filter
expect(filter_chain.count).to eql 1
end
it 'returns nil if the filter was not found' do
expect(filter_chain.delete(nil)).to be_nil
expect(filter_chain.delete(true)).to be_nil
expect(filter_chain.delete(false)).to be_nil
expect(filter_chain.delete('Blar')).to be_nil
expect(filter_chain.delete(Object.new)).to be_nil
expect(filter_chain.delete(Class.new)).to be_nil
# at the end, all filters are still present
expect(filter_chain.count).to eql 2
end
end
describe '#each' do
it 'yields each filter' do
filter_chain << -> {}
filter_chain << filter
expect { |b| filter_chain.each(&b) }
.to yield_successive_args(instance_of(Proc), filter)
end
it 'returns the filter chain if a block was provided' do
filter_chain << -> {}
expect(filter_chain.each {}).to equal filter_chain
end
it 'returns an Enumerator if no block was provided' do
filter_chain << -> {}
expect(filter_chain.each).to be_instance_of Enumerator
end
it 'operators on a copy of the internal data' do
yielded = 0
filter_chain << -> {}
filter_chain.each do |flow|
yielded += 1
filter_chain[1] = flow
end
expect(yielded).to eql 1
end
end
describe '#index' do
it 'finds the filter index' do
filter_chain << filter
expect(filter_chain.index(0)).to eql 0
expect(filter_chain.index(Struct)).to eql 0
expect(filter_chain.index('Struct')).to eql 0
expect(filter_chain.index(filter)).to eql 0
end
it 'returns nil if the filter was not found' do
expect(filter_chain.index(nil)).to be_nil
expect(filter_chain.index(true)).to be_nil
expect(filter_chain.index(false)).to be_nil
expect(filter_chain.index('Blar')).to be_nil
expect(filter_chain.index(Object.new)).to be_nil
expect(filter_chain.index(Class.new)).to be_nil
end
end
describe '#insert_before' do
before(:each) do
filter_chain << -> {}
filter_chain << filter
end
let(:inserted) { -> {} }
it 'inserts before index' do
expect(filter_chain.insert_before(1, inserted)).to equal filter_chain
expect(filter_chain[1]).to equal inserted
expect(filter_chain[2]).to equal filter
end
it 'inserts before class' do
expect(filter_chain.insert_before(Struct, inserted)).to equal filter_chain
expect(filter_chain[1]).to equal inserted
expect(filter_chain[2]).to equal filter
end
it 'inserts before class name' do
expect(filter_chain.insert_before('Struct', inserted)).to equal filter_chain
expect(filter_chain[1]).to equal inserted
expect(filter_chain[2]).to equal filter
end
it 'inserts before reference' do
expect(filter_chain.insert_before(filter, inserted)).to equal filter_chain
expect(filter_chain[1]).to equal inserted
expect(filter_chain[2]).to equal filter
end
it 'raises ArgumentError if the filter was not found' do
expect { filter_chain.insert_before(nil, inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_before(true, inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_before(false, inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_before('Blar', inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_before(Object.new, inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_before(Class.new, inserted) }.to raise_error ArgumentError
end
it 'raises a TypeError if the object is not a filter' do
expect { filter_chain.insert_before(1, :foo) }.to raise_error TypeError
expect { filter_chain.insert_before(1, nil) }.to raise_error TypeError
expect { filter_chain.insert_before(1, false) }.to raise_error TypeError
expect { filter_chain.insert_before(1, 42) }.to raise_error TypeError
expect { filter_chain.insert_before(1, 'Foo') }.to raise_error TypeError
end
end
describe '#insert_after' do
before(:each) do
filter_chain << filter
filter_chain << -> {}
end
let(:inserted) { -> {} }
it 'inserts after index' do
expect(filter_chain.insert_after(0, inserted)).to equal filter_chain
expect(filter_chain[0]).to equal filter
expect(filter_chain[1]).to equal inserted
expect(filter_chain[2]).to be_instance_of(Proc)
end
it 'inserts after class' do
expect(filter_chain.insert_after(Struct, inserted)).to equal filter_chain
expect(filter_chain[0]).to equal filter
expect(filter_chain[1]).to equal inserted
expect(filter_chain[2]).to be_instance_of(Proc)
end
it 'inserts after class name' do
expect(filter_chain.insert_after('Struct', inserted)).to equal filter_chain
expect(filter_chain[0]).to equal filter
expect(filter_chain[1]).to equal inserted
expect(filter_chain[2]).to be_instance_of(Proc)
end
it 'inserts after reference' do
expect(filter_chain.insert_after(filter, inserted)).to equal filter_chain
expect(filter_chain[0]).to equal filter
expect(filter_chain[1]).to equal inserted
expect(filter_chain[2]).to be_instance_of(Proc)
end
it 'raises ArgumentError if the filter was not found' do
expect { filter_chain.insert_after(nil, inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_after(true, inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_after(false, inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_after('Blar', inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_after(Object.new, inserted) }.to raise_error ArgumentError
expect { filter_chain.insert_after(Class.new, inserted) }.to raise_error ArgumentError
end
it 'raises a TypeError if the object is not a filter' do
expect { filter_chain.insert_after(1, :foo) }.to raise_error TypeError
expect { filter_chain.insert_after(1, nil) }.to raise_error TypeError
expect { filter_chain.insert_after(1, false) }.to raise_error TypeError
expect { filter_chain.insert_after(1, 42) }.to raise_error TypeError
expect { filter_chain.insert_after(1, 'Foo') }.to raise_error TypeError
end
end
describe '#inspect' do
it 'formats the object' do
expect(filter_chain).to receive(:to_s).and_return('["<filter>"]')
expect(filter_chain.inspect).to(
match %r{\A#<Rackstash::FilterChain:0x[a-f0-9]+ \["<filter>"\]>\z}
)
end
end
describe '#length' do
it 'returns the number of flows' do
expect { filter_chain << -> {} }
.to change { filter_chain.length }.from(0).to(1)
end
it 'can use size alias' do
expect { filter_chain << -> {} }
.to change { filter_chain.size }.from(0).to(1)
end
it 'can use count alias' do
expect { filter_chain << -> {} }
.to change { filter_chain.count }.from(0).to(1)
end
end
describe '#unshift' do
before(:each) do
filter_chain << -> {}
end
it 'prepends a filter' do
filter_chain.unshift filter
expect(filter_chain[0]).to eql filter
expect(filter_chain.size).to eql 2
end
it 'prepends a block as the filter' do
filter_chain.unshift { :foo }
expect(filter_chain[0]).to be_instance_of(Proc)
expect(filter_chain.size).to eql 2
end
it 'raises a TypeError if the object is not a filter' do
expect { filter_chain.unshift(:foo) }.to raise_error TypeError
expect { filter_chain.unshift(nil) }.to raise_error TypeError
expect { filter_chain.unshift(false) }.to raise_error TypeError
expect { filter_chain.unshift(42) }.to raise_error TypeError
expect { filter_chain.unshift('Foo') }.to raise_error TypeError
end
end
describe '#to_s' do
it 'returns the array representation' do
filter_chain << -> {}
expect(filter_chain.to_s).to eql filter_chain.each.to_a.to_s
end
end
end