From 720406b318a2b6b6365bdac57977bb0eedb40a2b Mon Sep 17 00:00:00 2001
From: Holger Just
Date: Thu, 20 Jul 2017 00:00:48 +0200
Subject: [PATCH] Implement the full Sink class

A single Sink is tied to a single Logger. It is responsible for:

* Creating a log event from a Buffer on #write and sending it to each of
  the flows independently.
* Forwarding all actions to all of the defined Flows.

The Sink provides access to all configured data of the Logger which is
used for persisting the Buffers.
---
 lib/rackstash/buffer.rb       |   2 +-
 lib/rackstash/sink.rb         | 223 +++++++++++++++++++++++++++++++++-
 spec/rackstash/buffer_spec.rb |   6 +-
 spec/rackstash/logger_spec.rb |   2 +-
 spec/rackstash/sink_spec.rb   | 161 +++++++++++++++++++++---
 5 files changed, 366 insertions(+), 28 deletions(-)

diff --git a/lib/rackstash/buffer.rb b/lib/rackstash/buffer.rb
index aa5644c..280f242 100644
--- a/lib/rackstash/buffer.rb
+++ b/lib/rackstash/buffer.rb
@@ -152,7 +152,7 @@ module Rackstash
     def flush
       return unless pending?
 
-      @sink.flush(self)
+      @sink.write(self)
       self
     end
 
diff --git a/lib/rackstash/sink.rb b/lib/rackstash/sink.rb
index 2180a18..c543ee5 100644
--- a/lib/rackstash/sink.rb
+++ b/lib/rackstash/sink.rb
@@ -5,17 +5,228 @@
 # This software may be modified and distributed under the terms
 # of the MIT license. See the LICENSE.txt file for details.
 
+require 'rackstash/flows'
+
 module Rackstash
   class Sink
-    attr_reader :targets
+    # @return [Hash, Proc] the default fields which get merged into each buffer
+    #   on {#write}.
+    attr_reader :default_fields
 
-    def initialize(targets)
-      @targets = targets.respond_to?(:to_ary) ? targets.to_ary : [targets]
+    # @return [Array<#to_s, Proc>, Proc] the default tags which get merged into
+    #   each buffer on flush. It can either be an Array of Strings or a Proc
+    #   which returns an Array. Each element of the array can also be a Proc
+    #   which gets resolved on {#write}.
+    attr_reader :default_tags
+
+    # @return [Flows] the {Flows} defined for this sink
+    attr_reader :flows
+
+    # @param flows [Array, Flow, Adapters::Adapter, Object]
+    #   an array of {Flow}s or a single {Flow}, respectively an object which
+    #   can be used as a {Flow}'s adapter. See {Flow#initialize}.
+    def initialize(flows)
+      @flows = Rackstash::Flows.new(flows)
+
+      @default_fields = {}
+      @default_tags = []
     end
 
-    def flush(buffer)
-      @targets.each do |target|
-        target.flush(buffer)
+    # Set the default fields to the given value.
+    #
+    # These fields get merged into each buffer on {#write}. It can either be a
+    # `Hash` or a `Proc` which in turn returns a `Hash` on call. The `Hash` can
+    # be nested arbitrarily deep.
+    #
+    # Each value can also be a Proc which gets evaluated on {#write}. That way,
+    # you can set lazy-evaluated values.
+    #
+    # @example The following calls show equivalent results
+    #
+    #   sink.default_fields = {'beep' => 'boop'}
+    #
+    #   sink.default_fields = lambda {
+    #     {
+    #       'beep' => 'boop'
+    #     }
+    #   }
+    #
+    #   sink.default_fields = {
+    #     'beep' => -> { 'boop' }
+    #   }
+    #
+    # @param fields [#to_hash, Proc] The default fields to be merged into each
+    #   buffer on {#write}
+    # @raise [TypeError] if `fields` is neither a Proc nor can be converted to a
+    #   Hash
+    # @return [Hash, Proc] the given `fields`
+    def default_fields=(fields)
+      fields = fields.to_hash if fields.respond_to?(:to_hash)
+      unless fields.is_a?(Hash) || fields.is_a?(Proc)
+        raise TypeError, 'default_fields must be a Hash or Proc'
+      end
+
+      @default_fields = fields
+    end
+
+    # Set the default tags to the given value.
+    #
+    # These tags get merged into each buffer on {#write}. It can either be an
+    # `Array` of strings or a `Proc` which in turn returns an `Array` of
+    # strings on call. The array's values can also be procs which should
+    # return a String on call. All procs are evaluated on {#write}. That way,
+    # you can set lazy-evaluated values.
+    #
+    # @example The following calls show equivalent results
+    #
+    #   sink.default_tags = ['important', 'request']
+    #
+    #   sink.default_tags = lambda {
+    #     ['important', 'request']
+    #   }
+    #
+    #   sink.default_tags = [
+    #     'important', -> { 'request' }
+    #   ]
+    #
+    # @param tags [#to_ary, Proc] The default tags to be merged into each
+    #   buffer on {#write}
+    # @raise [TypeError] if `tags` is neither a Proc nor can be converted to an
+    #   Array
+    # @return [Array, Proc] the given `tags`
+    def default_tags=(tags)
+      tags = tags.to_ary if tags.respond_to?(:to_ary)
+      unless tags.is_a?(Array) || tags.is_a?(Proc)
+        raise TypeError, 'default_tags must be an Array or Proc'
+      end
+
+      @default_tags = tags
+    end
+
+    # Close the log adapter for each configured {Flow}. This might be a no-op
+    # depending on each flow's adapter.
+    #
+    # @return [nil]
+    def close
+      @flows.each(&:close)
+      nil
+    end
+
+    # Close and re-open the log adapter for each configured {Flow}. This might
+    # be a no-op depending on each flow's adapter.
+    #
+    # @return [nil]
+    def reopen
+      @flows.each(&:reopen)
+      nil
+    end
+
+    # Create an event hash from the given `buffer` and write it to each of the
+    # defined {#flows}.
+    #
+    # First, we transform the given `buffer` to an event hash:
+    #
+    # * We deep-merge the {#default_fields} into the `buffer`'s fields and use
+    #   the result as the basis of the event hash. Existing fields on the
+    #   `buffer` always have precedence here.
+    # * We merge the {#default_tags} into the `buffer`'s tags and add them as
+    #   a raw array of strings to the `event['tags']` field.
+    # * We add the `buffer`'s array of messages to `event['message']`. This
+    #   field now contains an array of {Message} objects.
+    # * We add the `buffer`'s timestamp to the `event['@timestamp']` field as
+    #   an ISO 8601 formatted string. The timestamp is always in UTC.
+    # * We add the version of the logstash event format as
+    #   `event['@version'] = '1'`.
+    #
+    # The typical event emitted here looks like this:
+    #
+    #     {
+    #       "beep" => "boop",
+    #       "foo" => ["bar", "baz"],
+    #       "tags" => ["request", "controller#action"],
+    #       "message" => [
+    #         #<Rackstash::Message ...>,
+    #         #<Rackstash::Message ...>
+    #       ],
+    #       "@timestamp" => "2016-10-17T13:37:42.000Z",
+    #       "@version" => "1"
+    #     }
+    #
+    # The resulting event hash is written to each defined {Flow}. Since a flow
+    # usually changes the event hash with its filters and encoder, we create a
+    # fresh copy of the hash for each flow.
+    #
+    # @param buffer [Buffer] The buffer containing the data to write to the
+    #   {#flows}.
+    # @return [Buffer] the given `buffer`
+    def write(buffer)
+      event = event_from_buffer(buffer)
+
+      # Memoize the current list of flows for the rest of the method to make
+      # sure it doesn't change while we work with it.
+      flows = @flows.to_a
+      flows_size = flows.size
+
+      flows.each_with_index do |flow, index|
+        # If we have more than one flow, we provide a fresh copy of the event
+        # to each flow. The flow's filter and codec can then mutate the event
+        # however they please without affecting later flows. We don't need to
+        # duplicate the event for the last flow since it won't be re-used
+        # after that anymore.
+        current_event = (index == flows_size - 1) ? event : deep_dup_event(event)
+
+        flow.write(current_event)
+      end
+
+      buffer
+    end
+
+    private
+
+    # Create a raw event hash from a Buffer.
+    #
+    # Note that the resulting hash still contains an Array of {Message}s in
+    # the `"message"` field. This allows flow {Filters} to reject or adapt
+    # some messages based on their original attributes, e.g., their severity
+    # or timestamp.
+    #
+    # @see Flow#write
+    #
+    # @param buffer [Buffer] a buffer instance
+    # @return [Hash] the event hash expected by the flows' filters
+    def event_from_buffer(buffer)
+      event = buffer.fields.deep_merge(@default_fields, force: false).to_h
+      event[FIELD_TAGS] = buffer.tags.merge(@default_tags).to_a
+      event[FIELD_MESSAGE] = buffer.messages
+      event[FIELD_TIMESTAMP] = buffer.timestamp
+      event[FIELD_VERSION] = '1'.freeze
+
+      event
+    end
+
+    # Create a deep duplicate of an event hash. It is assumed that the input
+    # event follows the normalized structure as generated by
+    # {Fields::Hash#to_h}.
+    #
+    # @param obj [Object] an object to duplicate. When initially called, this
+    #   is expected to be an event hash
+    # @return [Object] a deep copy of the given `obj` if it was an `Array` or
+    #   `Hash`, the original `obj` otherwise.
+    def deep_dup_event(obj)
+      case obj
+      when Hash
+        hash = obj.dup
+        obj.each_pair do |key, value|
+          # {Rackstash::Fields::Hash} already guarantees that keys are always
+          # frozen Strings. We don't need to dup them.
+          hash[key] = deep_dup_event(value)
+        end
+        hash
+      when Array
+        obj.map { |value| deep_dup_event(value) }
+      else
+        # All leaf-values in the event are either frozen or not freezable /
+        # dupable. They can be used as is. See {AbstractCollection#normalize}
+        obj
       end
     end
   end
 end
diff --git a/spec/rackstash/buffer_spec.rb b/spec/rackstash/buffer_spec.rb
index f92c3aa..d00d366 100644
--- a/spec/rackstash/buffer_spec.rb
+++ b/spec/rackstash/buffer_spec.rb
@@ -148,11 +148,11 @@ describe Rackstash::Buffer do
       buffer.add_message double(message: 'Hello World!', time: Time.now)
 
       # We might call Buffer#flush during the following tests
-      allow(sink).to receive(:flush).with(buffer).once
+      allow(sink).to receive(:write).with(buffer).once
     end
 
     it 'flushes the buffer to the sink' do
-      expect(sink).to receive(:flush).with(buffer).once
+      expect(sink).to receive(:write).with(buffer).once
       buffer.flush
     end
 
@@ -169,7 +169,7 @@ describe Rackstash::Buffer do
     context 'when not pending?' do
       it 'does not flush the buffer to the sink' do
-        expect(sink).not_to receive(:flush)
+        expect(sink).not_to receive(:write)
 
         buffer.flush
       end
 
diff --git a/spec/rackstash/logger_spec.rb b/spec/rackstash/logger_spec.rb
index 30cff2d..ccadcdf 100644
--- a/spec/rackstash/logger_spec.rb
+++ b/spec/rackstash/logger_spec.rb
@@ -388,7 +388,7 @@ describe Rackstash::Logger do
     end
 
     it 'buffers multiple messages' do
-      expect(logger.sink).to receive(:flush).once
+      expect(logger.sink).to receive(:write).once
 
       logger.with_buffer do
        logger.add 1, 'Hello World'
diff --git a/spec/rackstash/sink_spec.rb b/spec/rackstash/sink_spec.rb
index 6e56a3d..0e14891 100644
--- a/spec/rackstash/sink_spec.rb
+++ b/spec/rackstash/sink_spec.rb
@@ -9,31 +9,158 @@
 require 'spec_helper'
 
 require 'rackstash/sink'
 
-describe Rackstash::Sink do
-  let(:targets) { [] }
-  let(:sink) { described_class.new(targets) }
+require 'rackstash/buffer'
+require 'rackstash/flows'
+require 'rackstash/flow'
 
-  describe '#initialize' do
-    it 'accepts an array with targets' do
-      expect(targets).to receive(:to_ary).once.and_call_original
-      expect(sink.targets).to equal targets
+describe Rackstash::Sink do
+  def a_flow
+    flow = instance_double('Rackstash::Flow')
+    allow(flow).to receive(:is_a?).with(Rackstash::Flow).and_return(true)
+    flow
+  end
+
+  let(:flow) { a_flow }
+  let(:sink) { described_class.new(flow) }
+
+  describe '#initialize' do
+    # We deliberately use the real Rackstash::Flows class here to serve as an
+    # integration test
+    it 'wraps a single flow in a flows list' do
+      expect(Rackstash::Flows).to receive(:new).with(flow)
+        .and_call_original
+
+      sink = described_class.new(flow)
+      expect(sink.flows).to be_a Rackstash::Flows
+      expect(sink.flows.to_a).to eql [flow]
     end
 
-    it 'wraps a single target into an array' do
-      target = Object.new
-      expect(described_class.new(target).targets).to eql [target]
+    it 'wraps multiple flows in a flows list' do
+      flows = [a_flow, a_flow]
+
+      expect(Rackstash::Flows).to receive(:new).with(flows)
+        .and_call_original
+      sink = described_class.new(flows)
+
+      expect(sink.flows).to be_a Rackstash::Flows
+      expect(sink.flows.to_a).to eql flows
     end
   end
 
-  describe '#flush' do
-    it 'flushes the buffer to all targets' do
-      buffer = double('buffer')
+  describe '#default_fields' do
+    it 'can set a proc' do
+      a_proc = proc { nil }
+      expect(a_proc).not_to receive(:call)
 
-      target = double('target')
-      targets << target
+      sink.default_fields = a_proc
+      expect(sink.default_fields).to equal a_proc
+    end
 
-      expect(target).to receive(:flush).with(buffer)
-      sink.flush(buffer)
+    it 'can set a Hash' do
+      hash = { foo: :bar }
+      sink.default_fields = hash
+
+      expect(sink.default_fields).to equal hash
+    end
+
+    it 'can set a Hash-like object' do
+      hash_alike = double('hash')
+      expect(hash_alike).to receive(:to_hash).and_return(foo: :bar)
+
+      sink.default_fields = hash_alike
+      expect(sink.default_fields).to eql(foo: :bar)
+      expect(sink.default_fields).not_to equal hash_alike
+    end
+
+    it 'refuses invalid fields' do
+      expect { sink.default_fields = nil }.to raise_error TypeError
+      expect { sink.default_fields = 42 }.to raise_error TypeError
+      expect { sink.default_fields = ['foo'] }.to raise_error TypeError
+    end
+  end
+
+  describe '#default_tags' do
+    it 'can set a proc' do
+      tags = proc { nil }
+      expect(tags).not_to receive(:call)
+
+      sink.default_tags = tags
+      expect(sink.default_tags).to equal tags
+    end
+
+    it 'can set an Array' do
+      array = [:foo, 'bar']
+      sink.default_tags = array
+
+      expect(sink.default_tags).to equal array
+    end
+
+    it 'can set an Array-like object' do
+      array_alike = double('array')
+      expect(array_alike).to receive(:to_ary).and_return([:foo])
+
+      sink.default_tags = array_alike
+      expect(sink.default_tags).to eql [:foo]
+      expect(sink.default_tags).not_to equal array_alike
+    end
+
+    it 'refuses invalid tags' do
+      expect { sink.default_tags = nil }.to raise_error TypeError
+      expect { sink.default_tags = 42 }.to raise_error TypeError
+      expect { sink.default_tags = { foo: :bar } }.to raise_error TypeError
+    end
+  end
+
+  describe '#close' do
+    let(:flow) { [a_flow, a_flow] }
+
+    it 'calls close on all flows' do
+      expect(flow).to all receive(:close)
+      expect(sink.close).to be_nil
+    end
+  end
+
+  describe '#reopen' do
+    let(:flow) { [a_flow, a_flow] }
+
+    it 'calls reopen on all flows' do
+      expect(flow).to all receive(:reopen)
+      expect(sink.reopen).to be_nil
+    end
+  end
+
+  describe '#write' do
+    let(:flows) {
+      [a_flow, a_flow].each do |flow|
+        allow(flow).to receive(:write)
+      end
+    }
+    let(:sink) { described_class.new(flows) }
+    let(:buffer) { Rackstash::Buffer.new(sink) }
+
+    it 'merges default_fields and default_tags' do
+      expect(buffer.fields).to receive(:deep_merge).once
+      expect(buffer.tags).to receive(:merge).once
+
+      sink.write(buffer)
+    end
+
+    it 'flushes the buffer to all flows' do
+      event_spec = {
+        'message' => [],
+        'tags' => [],
+        '@timestamp' => instance_of(String),
+        '@version' => '1'
+      }
+
+      # the event is only duplicated for the first flow
+      expect(sink).to receive(:deep_dup_event).with(event_spec).and_call_original.ordered
+      expect(sink).to receive(:deep_dup_event).with(anything).exactly(4).times.and_call_original
+
+      # During #write, we create a single event, duplicate it, and write one
+      # copy to each of the flows.
+      expect(flows).to all receive(:write).with(event_spec)
+      sink.write(buffer)
     end
   end
 end
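
A minimal usage sketch of the Sink API introduced above (illustrative only: STDOUT standing in as a flow's adapter and the default field/tag values are assumptions, not taken from the patch; see Flow#initialize for what Sink#initialize actually accepts):

    require 'rackstash/sink'
    require 'rackstash/buffer'

    # Sink#initialize accepts a Flow, an array of Flows, or any object that
    # can be used as a Flow's adapter. Using STDOUT here is an assumption.
    sink = Rackstash::Sink.new(STDOUT)

    # Defaults merged into every event on Sink#write; Procs are resolved lazily.
    sink.default_fields = { 'service' => 'my_app', 'pid' => -> { Process.pid } }
    sink.default_tags   = ['rackstash', -> { 'request' }]

    # Sink#write turns a Buffer into an event hash (fields, tags, messages,
    # @timestamp, @version) and hands a fresh copy of it to each flow.
    buffer = Rackstash::Buffer.new(sink)
    sink.write(buffer)

    # Close the flows' adapters when done (may be a no-op for some adapters).
    sink.close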