# frozen_string_literal: true

# ---- patch file: lib/rackstash/filter_chain.rb (new file) ----

# Copyright 2017 Holger Just
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.

require 'monitor'

module Rackstash
  # The FilterChain contains a list of event filters which are used by a {Flow}
  # to mutate an event before it is encoded and sent to the adapter for writing.
  #
  # A filter is any object responding to `call`, e.g. one of the {Filters} or a
  # `Proc` object. When running the filters, we call each filter in turn,
  # passing the event. The filter can change the event in any way desired but
  # should make sure that it preserves the basic structure of the event:
  #
  # * Only use basic objects: `Hash`, `Array`, `String`, `Integer`, `Float`.
  # * Hash keys should always be strings
  #
  # Objects of this class are thread-safe. Each method call which reads or
  # changes the list of filters is locked against `self`.
  class FilterChain
    include MonitorMixin

    # @param filters [Array<#call>, #call, nil] the initial filters. A single
    #   callable object is taken as one filter; `nil` results in an empty
    #   chain.
    # @raise [TypeError] if any of the given filters is not callable
    def initialize(filters = [])
      mon_initialize

      @filters = []

      # A single callable is always exactly one filter. Without this guard,
      # `Array()` would call `to_a` on filters which happen to implement it
      # (e.g. a Struct-based filter) and replace the filter by its members.
      filters = [filters] if filters.respond_to?(:call)
      Array(filters).each { |filter| append(filter) }
    end

    # Get an existing filter at `index`.
    #
    # @param index [Integer, Class, String, Object] The existing filter to
    #   fetch. It can be described in different ways: When given an `Integer`,
    #   we expect it to be the index number; when given a `Class`, we try to
    #   find the first filter being of that type; when given a `String`, we try
    #   to find the first filter being of a type named like that; when given any
    #   other object, we assume it is a filter and search for that.
    # @return [#call, nil] The existing filter or `nil` if no existing filter
    #   could be found for `index`
    def [](index)
      synchronize do
        id = index_at(index)
        id ? @filters[id] : nil
      end
    end

    # Overwrite the existing filter described by `index` with a new filter.
    # An `Integer` index equal to the current length of the chain appends the
    # filter at the end.
    #
    # @param index [Integer, Class, String, Object] The existing filter which
    #   should be overwritten with `filter`. It can be described in different
    #   ways: When given an `Integer`, we expect it to be the index number; when
    #   given a `Class`, we try to find the first filter being of that type;
    #   when given a `String`, we try to find the first filter being of a type
    #   named like that; when given any other object, we assume it is a filter
    #   and search for that.
    # @param filter [#call] the filter to set at `index`
    # @raise [TypeError] if the given filter is not callable
    # @raise [ArgumentError] if no existing filter could be found for `index`
    #   or if the resolved index is outside of `0..length`
    # @return [#call] the given `filter`
    def []=(index, filter)
      raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)

      synchronize do
        id = index_at(index)
        # The inclusive upper bound allows appending directly after the last
        # filter; negative indexes are rejected.
        unless id && (0..@filters.size).include?(id)
          raise ArgumentError, "Cannot insert at index #{index.inspect}"
        end

        @filters[id] = filter
      end
      filter
    end

    # Adds a new filter at the end of the filter chain. You can either give a
    # callable object (e.g. a `Proc` or one of the {Filters}) or specify the
    # filter with a given block.
    #
    # @param filter [#call, nil] a filter to add. If given, this value will
    #   take precedence to the block. If `nil`, we expect a block to be given
    #   which we will then take as the filter.
    # @raise [TypeError] if the given filter is not callable
    # @return [self]
    def append(filter = nil, &block)
      filter ||= block
      raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)

      synchronize { @filters.push(filter) }
      self
    end
    alias << append

    # Filter the given event by calling each defined filter with it.
    #
    # Each filter will be called with the current event and can manipulate it
    # in any way. If any of the filters returns `false`, no further filter will
    # be applied and we also return `false`. This behavior can be used by
    # filters to cancel the writing of an individual event.
    #
    # @param event [Hash] an event hash, see {Sink#write} for details
    # @return [Hash, false] the filtered event or `false` if any of the
    #   filters returned `false`
    def call(event)
      each do |filter|
        # Only an explicit `false` aborts the chain; `nil` or any other return
        # value lets the next filter run.
        return false if filter.call(event) == false
      end
      event
    end

    # Delete an existing filter from the filter chain.
    #
    # @param index [Integer, Class, String, Object] The existing filter to
    #   delete. It can be described in different ways: When given an `Integer`,
    #   we expect it to be the index number; when given a `Class`, we try to
    #   find the first filter being of that type; when given a `String`, we try
    #   to find the first filter being of a type named like that; when given any
    #   other object, we assume it is a filter and search for that.
    # @return [#call, nil] the deleted filter or `nil` if no filter for `index`
    #   could be found
    def delete(index)
      synchronize do
        id = index_at(index)
        @filters.delete_at(id) if id
      end
    end

    # Calls the given block once for each filter in `self`, passing that filter
    # as a parameter. Concurrent changes to `self` do not affect the running
    # enumeration.
    #
    # An `Enumerator` is returned if no block is given.
    #
    # @yield [filter] calls the given block once for each filter
    # @yieldparam filter [#call] the yielded filter
    # @return [Enumerator, self] Returns `self` if a block was given or an
    #   `Enumerator` if no block was given.
    def each
      return enum_for(__method__) unless block_given?

      # Iterate over a snapshot taken under the lock. The block itself runs
      # outside of the lock and may thus safely change the chain.
      snapshot = synchronize { @filters.dup }
      snapshot.each { |filter| yield filter }
      self
    end

    # Returns the index of the first filter in `self` matching the given
    # description.
    #
    # @param filter [Integer, Class, String, Object] The existing filter to
    #   find. It can be described in different ways: When given an `Integer`,
    #   we expect it to be the index number (which is returned as is, without
    #   checking that a filter exists at this position); when given a `Class`,
    #   we try to find the first filter being of that type; when given a
    #   `String`, we try to find the first filter being of a type named like
    #   that; when given any other object, we assume it is a filter and search
    #   for that.
    # @return [Integer, nil] The index of the existing filter or `nil` if no
    #   filter could be found for `filter`
    def index(filter)
      synchronize { index_at(filter) }
    end

    # Insert a new filter after an existing filter in the filter chain.
    #
    # @param index [Integer, Class, String, Object] The existing filter after
    #   which the new one should be inserted. It can be described in different
    #   ways: When given an `Integer`, we expect it to be the index number; when
    #   given a `Class`, we try to find the first filter being of that type;
    #   when given a `String`, we try to find the first filter being of a type
    #   named like that; when given any other object, we assume it is a filter
    #   and search for that.
    # @param filter [#call, nil] a filter to insert. If given, this value will
    #   take precedence to the block. If `nil`, we expect a block to be given
    #   which we will then take as the filter.
    # @raise [TypeError] if the given filter is not callable
    # @raise [ArgumentError] if the existing filter could not be found
    # @return [self]
    def insert_after(index, filter = nil, &block)
      filter ||= block
      raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)

      synchronize do
        id = index_at(index)
        unless id && (0...@filters.size).include?(id)
          raise ArgumentError, "No such filter to insert after: #{index.inspect}"
        end

        @filters.insert(id + 1, filter)
      end
      self
    end

    # Insert a new filter before an existing filter in the filter chain.
    #
    # @param index [Integer, Class, String, Object] The existing filter before
    #   which the new one should be inserted. It can be described in different
    #   ways: When given an `Integer`, we expect it to be the index number; when
    #   given a `Class`, we try to find the first filter being of that type;
    #   when given a `String`, we try to find the first filter being of a type
    #   named like that; when given any other object, we assume it is a filter
    #   and search for that.
    # @param filter [#call, nil] a filter to insert. If given, this value will
    #   take precedence to the block. If `nil`, we expect a block to be given
    #   which we will then take as the filter.
    # @raise [TypeError] if the given filter is not callable
    # @raise [ArgumentError] if the existing filter could not be found
    # @return [self]
    def insert_before(index, filter = nil, &block)
      filter ||= block
      raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)

      synchronize do
        id = index_at(index)
        unless id && (0...@filters.size).include?(id)
          raise ArgumentError, "No such filter to insert before: #{index.inspect}"
        end

        @filters.insert(id, filter)
      end
      self
    end
    alias insert insert_before

    # @return [String] a string representation of `self`
    def inspect
      # `Object#to_s` yields "#<ClassName:0x...>"; strip the leading "#<" and
      # the trailing ">" to get the bare "ClassName:0x..." identifier.
      id_str = Object.instance_method(:to_s).bind(self).call[2..-2]
      "#<#{id_str} #{self}>"
    end

    # @return [Integer] the number of elements in `self`. May be zero.
    def length
      synchronize { @filters.length }
    end
    alias count length
    alias size length

    # Prepends a new filter at the beginning of the filter chain. You can either
    # give a callable object (e.g. a `Proc` or one of the {Filters}) or specify
    # the filter with a given block.
    #
    # @param filter [#call, nil] a filter to prepend. If given, this value will
    #   take precedence to the block. If `nil`, we expect a block to be given
    #   which we will then take as the filter.
    # @raise [TypeError] if the given filter is not callable
    # @return [self]
    def unshift(filter = nil, &block)
      filter ||= block
      raise TypeError, 'must provide a filter' unless filter.respond_to?(:call)

      synchronize { @filters.unshift(filter) }
      self
    end

    # @return [String] an Array-compatible string representation of `self`
    def to_s
      # Locked like every other reader so that we never render the list while
      # another thread is changing it.
      synchronize { @filters.to_s }
    end

    private

    # Resolve a filter description to a numeric index into `@filters`.
    # Callers are expected to hold the lock.
    #
    # @param index [Integer, Class, String, Object] an integer(-like) value is
    #   returned as is (without any bounds check); a `Class` matches the first
    #   filter being an instance of it; a string(-like) value matches the first
    #   filter whose class or one of its ancestors has that name; any other
    #   object is compared to the filters with `==`.
    # @return [Integer, nil] the resolved index or `nil` if nothing matched
    def index_at(index)
      case index
      when Integer, ->(o) { o.respond_to?(:to_int) }
        index.to_int
      when Class
        @filters.index { |filter| filter.is_a?(index) }
      when String, ->(o) { o.respond_to?(:to_str) }
        name = index.to_str
        @filters.index do |filter|
          filter.class.ancestors.any? { |ancestor| ancestor.name == name }
        end
      else
        @filters.index { |filter| filter == index }
      end
    end
  end
end

# ---- patch file: spec/rackstash/filter_chain_spec.rb (new file) ----
# frozen_string_literal: true
#
# Copyright 2017 Holger Just
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.
require 'spec_helper'

require 'rackstash/filter_chain'

# Behavior of Rackstash::FilterChain: construction, lookup of filters by
# index / class / class name / reference, mutation of the chain and running
# the filters against an event.
describe Rackstash::FilterChain do
  let(:filter_chain) { described_class.new }

  # A minimal named filter class. Being a Struct, it allows the examples below
  # to look filters up by an ancestor class (`Struct`) and by class name.
  Struct.new('MyFilter') do
    def call(event)
      event
    end
  end

  # @return [Struct::MyFilter] a fresh filter instance on each call
  def a_filter
    Struct::MyFilter.new
  end

  let(:filter) { a_filter }

  describe '#initialize' do
    it 'accepts a single filter' do
      filter_chain = described_class.new(-> {})
      expect(filter_chain.length).to eql 1
    end

    it 'accepts a list of filters' do
      chain = described_class.new([-> {}, -> {}, -> {}])
      expect(chain.length).to eql 3
    end
  end

  describe '#[]' do
    it 'returns the filter by index' do
      filter_chain << filter

      expect(filter_chain[0]).to equal filter
      expect(filter_chain[-1]).to equal filter

      expect(filter_chain[1]).to be_nil
      expect(filter_chain[-2]).to be_nil
    end

    it 'returns the filter by class or ancestor' do
      filter_chain << filter

      expect(filter_chain[Struct::MyFilter]).to equal filter
      expect(filter_chain[Struct]).to equal filter
      expect(filter_chain[Integer]).to be_nil
    end

    it 'returns the filter by class or ancestor name' do
      filter_chain << filter

      expect(filter_chain['Struct::MyFilter']).to equal filter
      expect(filter_chain['Struct']).to equal filter
      expect(filter_chain['Integer']).to be_nil
    end

    it 'returns the filter by equivalence' do
      filter_chain << filter

      expect(filter_chain[filter]).to equal filter
      expect(filter_chain[false]).to be_nil
      expect(filter_chain[true]).to be_nil
    end
  end

  describe '#[]=' do
    before(:each) do
      filter_chain << filter
      filter_chain << -> {}
    end

    let(:new_filter) { -> {} }

    it 'sets a filter by index' do
      filter_chain[0] = new_filter
      expect(filter_chain[0]).to equal new_filter

      # An index equal to the current length appends the filter.
      filter_chain[2] = new_filter
      expect(filter_chain[2]).to equal new_filter
    end

    it 'sets a filter by class or ancestor' do
      filter_chain[Proc] = new_filter
      expect(filter_chain[1]).to equal new_filter

      filter_chain[Struct] = new_filter
      expect(filter_chain[0]).to equal new_filter
    end

    it 'sets a filter by class or ancestor name' do
      filter_chain['Proc'] = new_filter
      expect(filter_chain[1]).to equal new_filter

      filter_chain['Struct'] = new_filter
      expect(filter_chain[0]).to equal new_filter
    end

    it 'sets a filter by equivalence' do
      filter_chain[filter] = new_filter
      expect(filter_chain[0]).to equal new_filter
    end

    it 'raises an ArgumentError if the filter was not found' do
      expect { filter_chain[false] = new_filter }.to raise_error ArgumentError
      expect { filter_chain[nil] = new_filter }.to raise_error ArgumentError
      expect { filter_chain['foo'] = new_filter }.to raise_error ArgumentError
      expect { filter_chain[Class.new] = new_filter }.to raise_error ArgumentError
      expect { filter_chain[34] = new_filter }.to raise_error ArgumentError
    end

    it 'raises a TypeError if the object is not a filter' do
      expect { filter_chain[0] = :foo }.to raise_error TypeError
      expect { filter_chain[0] = nil }.to raise_error TypeError
      expect { filter_chain[0] = false }.to raise_error TypeError
      expect { filter_chain[0] = 42 }.to raise_error TypeError
      expect { filter_chain[0] = 'Foo' }.to raise_error TypeError
    end
  end

  describe '#append' do
    it 'appends a filter' do
      filter_chain.append filter
      expect(filter_chain[0]).to eql filter
    end

    it 'appends a block as the filter' do
      filter_chain.append { :foo }
      expect(filter_chain[0]).to be_instance_of(Proc)
    end

    it 'raises a TypeError if the object is not a filter' do
      expect { filter_chain.append(:foo) }.to raise_error TypeError
      expect { filter_chain.append(nil) }.to raise_error TypeError
      expect { filter_chain.append(false) }.to raise_error TypeError
      expect { filter_chain.append(42) }.to raise_error TypeError
      expect { filter_chain.append('Foo') }.to raise_error TypeError
    end

    it 'can use #<< alias' do
      filter_chain << filter
      expect(filter_chain[0]).to eql filter
    end
  end

  describe '#call' do
    it 'calls all the filters' do
      event = {}
      filters = [a_filter, a_filter, a_filter]
      filters.each do |filter|
        filter_chain << filter
      end

      expect(filters).to all receive(:call).with(event)
      # Pass the very same event object the expectation was set up with.
      filter_chain.call(event)
    end

    it 'returns the event' do
      event = {}

      expect(filter_chain.call(event)).to equal event

      filter_chain << filter
      expect(filter_chain.call(event)).to equal event
    end

    it 'stops once a filter returns false' do
      filters = [a_filter, a_filter, a_filter]
      filters.each do |filter|
        filter_chain << filter
      end

      expect(filters[0]).to receive(:call)
      expect(filters[1]).to receive(:call).and_return(false)
      expect(filters[2]).not_to receive(:call)

      expect(filter_chain.call({})).to be false
    end
  end

  describe '#delete' do
    before(:each) do
      filter_chain << -> {}
      filter_chain << filter
    end

    it 'deletes by index' do
      expect(filter_chain.delete(1)).to equal filter
      expect(filter_chain.count).to eql 1
    end

    it 'deletes by class' do
      expect(filter_chain.delete(Struct)).to equal filter
      expect(filter_chain.count).to eql 1
    end

    it 'deletes by class name' do
      expect(filter_chain.delete('Struct')).to equal filter
      expect(filter_chain.count).to eql 1
    end

    it 'deletes by reference' do
      expect(filter_chain.delete(filter)).to equal filter
      expect(filter_chain.count).to eql 1
    end

    it 'returns nil if the filter was not found' do
      expect(filter_chain.delete(nil)).to be_nil
      expect(filter_chain.delete(true)).to be_nil
      expect(filter_chain.delete(false)).to be_nil
      expect(filter_chain.delete('Blar')).to be_nil
      expect(filter_chain.delete(Object.new)).to be_nil
      expect(filter_chain.delete(Class.new)).to be_nil

      # at the end, all filters are still present
      expect(filter_chain.count).to eql 2
    end
  end

  describe '#each' do
    it 'yields each filter' do
      filter_chain << -> {}
      filter_chain << filter

      expect { |b| filter_chain.each(&b) }
        .to yield_successive_args(instance_of(Proc), filter)
    end

    it 'returns the filter chain if a block was provided' do
      filter_chain << -> {}
      expect(filter_chain.each {}).to equal filter_chain
    end

    it 'returns an Enumerator if no block was provided' do
      filter_chain << -> {}
      expect(filter_chain.each).to be_instance_of Enumerator
    end

    it 'operates on a copy of the internal data' do
      yielded = 0
      filter_chain << -> {}

      # Appending during the iteration must not extend the running enumeration.
      filter_chain.each do |filter|
        yielded += 1
        filter_chain[1] = filter
      end

      expect(yielded).to eql 1
    end
  end

  describe '#index' do
    it 'finds the filter index' do
      filter_chain << filter

      expect(filter_chain.index(0)).to eql 0
      expect(filter_chain.index(Struct)).to eql 0
      expect(filter_chain.index('Struct')).to eql 0
      expect(filter_chain.index(filter)).to eql 0
    end

    it 'returns nil if the filter was not found' do
      expect(filter_chain.index(nil)).to be_nil
      expect(filter_chain.index(true)).to be_nil
      expect(filter_chain.index(false)).to be_nil
      expect(filter_chain.index('Blar')).to be_nil
      expect(filter_chain.index(Object.new)).to be_nil
      expect(filter_chain.index(Class.new)).to be_nil
    end
  end

  describe '#insert_before' do
    before(:each) do
      filter_chain << -> {}
      filter_chain << filter
    end

    let(:inserted) { -> {} }

    it 'inserts before index' do
      expect(filter_chain.insert_before(1, inserted)).to equal filter_chain
      expect(filter_chain[1]).to equal inserted
      expect(filter_chain[2]).to equal filter
    end

    it 'inserts before class' do
      expect(filter_chain.insert_before(Struct, inserted)).to equal filter_chain
      expect(filter_chain[1]).to equal inserted
      expect(filter_chain[2]).to equal filter
    end

    it 'inserts before class name' do
      expect(filter_chain.insert_before('Struct', inserted)).to equal filter_chain
      expect(filter_chain[1]).to equal inserted
      expect(filter_chain[2]).to equal filter
    end

    it 'inserts before reference' do
      expect(filter_chain.insert_before(filter, inserted)).to equal filter_chain
      expect(filter_chain[1]).to equal inserted
      expect(filter_chain[2]).to equal filter
    end

    it 'raises ArgumentError if the filter was not found' do
      expect { filter_chain.insert_before(nil, inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_before(true, inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_before(false, inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_before('Blar', inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_before(Object.new, inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_before(Class.new, inserted) }.to raise_error ArgumentError
    end

    it 'raises a TypeError if the object is not a filter' do
      expect { filter_chain.insert_before(1, :foo) }.to raise_error TypeError
      expect { filter_chain.insert_before(1, nil) }.to raise_error TypeError
      expect { filter_chain.insert_before(1, false) }.to raise_error TypeError
      expect { filter_chain.insert_before(1, 42) }.to raise_error TypeError
      expect { filter_chain.insert_before(1, 'Foo') }.to raise_error TypeError
    end
  end

  describe '#insert_after' do
    before(:each) do
      filter_chain << filter
      filter_chain << -> {}
    end

    let(:inserted) { -> {} }

    it 'inserts after index' do
      expect(filter_chain.insert_after(0, inserted)).to equal filter_chain
      expect(filter_chain[0]).to equal filter
      expect(filter_chain[1]).to equal inserted
      expect(filter_chain[2]).to be_instance_of(Proc)
    end

    it 'inserts after class' do
      expect(filter_chain.insert_after(Struct, inserted)).to equal filter_chain
      expect(filter_chain[0]).to equal filter
      expect(filter_chain[1]).to equal inserted
      expect(filter_chain[2]).to be_instance_of(Proc)
    end

    it 'inserts after class name' do
      expect(filter_chain.insert_after('Struct', inserted)).to equal filter_chain
      expect(filter_chain[0]).to equal filter
      expect(filter_chain[1]).to equal inserted
      expect(filter_chain[2]).to be_instance_of(Proc)
    end

    it 'inserts after reference' do
      expect(filter_chain.insert_after(filter, inserted)).to equal filter_chain
      expect(filter_chain[0]).to equal filter
      expect(filter_chain[1]).to equal inserted
      expect(filter_chain[2]).to be_instance_of(Proc)
    end

    it 'raises ArgumentError if the filter was not found' do
      expect { filter_chain.insert_after(nil, inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_after(true, inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_after(false, inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_after('Blar', inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_after(Object.new, inserted) }.to raise_error ArgumentError
      expect { filter_chain.insert_after(Class.new, inserted) }.to raise_error ArgumentError
    end

    it 'raises a TypeError if the object is not a filter' do
      expect { filter_chain.insert_after(1, :foo) }.to raise_error TypeError
      expect { filter_chain.insert_after(1, nil) }.to raise_error TypeError
      expect { filter_chain.insert_after(1, false) }.to raise_error TypeError
      expect { filter_chain.insert_after(1, 42) }.to raise_error TypeError
      expect { filter_chain.insert_after(1, 'Foo') }.to raise_error TypeError
    end
  end

  describe '#inspect' do
    it 'formats the object' do
      # Stub #to_s so that only the "#<Class:0x... ...>" wrapper built by
      # #inspect is checked here.
      expect(filter_chain).to receive(:to_s).and_return('["<filter>"]')
      expect(filter_chain.inspect).to(
        match %r{\A#<Rackstash::FilterChain:0x[a-f0-9]+ \["<filter>"\]>\z}
      )
    end
  end

  describe '#length' do
    it 'returns the number of filters' do
      expect { filter_chain << -> {} }
        .to change { filter_chain.length }.from(0).to(1)
    end

    it 'can use size alias' do
      expect { filter_chain << -> {} }
        .to change { filter_chain.size }.from(0).to(1)
    end

    it 'can use count alias' do
      expect { filter_chain << -> {} }
        .to change { filter_chain.count }.from(0).to(1)
    end
  end

  describe '#unshift' do
    before(:each) do
      filter_chain << -> {}
    end

    it 'prepends a filter' do
      filter_chain.unshift filter
      expect(filter_chain[0]).to eql filter
      expect(filter_chain.size).to eql 2
    end

    it 'prepends a block as the filter' do
      filter_chain.unshift { :foo }
      expect(filter_chain[0]).to be_instance_of(Proc)
      expect(filter_chain.size).to eql 2
    end

    it 'raises a TypeError if the object is not a filter' do
      expect { filter_chain.unshift(:foo) }.to raise_error TypeError
      expect { filter_chain.unshift(nil) }.to raise_error TypeError
      expect { filter_chain.unshift(false) }.to raise_error TypeError
      expect { filter_chain.unshift(42) }.to raise_error TypeError
      expect { filter_chain.unshift('Foo') }.to raise_error TypeError
    end
  end

  describe '#to_s' do
    it 'returns the array representation' do
      filter_chain << -> {}

      expect(filter_chain.to_s).to eql filter_chain.each.to_a.to_s
    end
  end
end