diff --git a/lib/rackstash/buffer.rb b/lib/rackstash/buffer.rb index 87e4715..09982bb 100644 --- a/lib/rackstash/buffer.rb +++ b/lib/rackstash/buffer.rb @@ -17,9 +17,8 @@ module Rackstash # Each time, a message is logged or a field or tag is set to a {Logger}, it # is set on a Buffer. Each Buffer belongs to exactly one {BufferStack} (and # thus in turn to exactly one {Logger}) which creates it and controls its - # complete life cycle. The data a buffer holds can be exported via a {Sink} - # and passed on to one or more {Flow}s which send the data to an external - # log receiver. + # complete life cycle. The data a buffer holds can be written to one or more + # {Flow}s which send the data to an external log receiver. # # Most methods of the Buffer are directly exposed to the user-accessible # {Logger}. The Buffer class itself is considered private and should not be @@ -28,18 +27,17 @@ module Rackstash # by exposing a Buffer to each thread as the "current Buffer". # # Buffers can be buffering or non-buffering. While this doesn't affect the - # behavior of the Buffer itself, it affects when the Buffer is flushed to a - # {Sink} and what happens to the data stored in the Buffer after that. + # behavior of the Buffer itself, it affects when the Buffer is flushed to the + # flows and what happens to the data stored in the Buffer after that. # - # Generally, a non-buffering Buffer will be flushed to the sink after each - # logged message. This thus mostly resembles the way traditional loggers work - # in Ruby. A buffering Buffer however holds log messages for a longer time, - # e.g., for the duration of a web request. Only after the request finished - # all log messages and stored fields for this request will be flushed to the - # {Sink} as a single log event. + # Generally, a non-buffering Buffer will be flushed after each logged message. + # This thus mostly resembles the way traditional loggers work in Ruby. A + # buffering Buffer however holds log messages for a longer time, e.g., for the + # duration of a web request. Only after the request finished, all log messages + # and stored fields for this request will be flushed as a single log event. # - # While the fields structure of a Buffer is geared towards the format used by - # Logstash, it can be adaptd in many ways suited for a specific log target. + # While the field structure of a Buffer is geared towards the format used by + # Logstash, it can be adapted in many ways suited for a specific log target. # # @note The Buffer class is designed to be created and used by its responsible # {BufferStack} object only and is not intended used from multiple Threads @@ -55,19 +53,22 @@ module Rackstash FIELD_VERSION, # the version of the Logstash JSON schema. Usually "1" ].freeze - # @return [Sink] the log {Sink} where the buffer is eventually flushed to - attr_reader :sink + # @return [Flows] the list of defined {Flow} objects which are responsible + # for transforming, encoding, and persisting the log events. + attr_reader :flows + # @param flows [Flows] a list of {Flow} objects where this buffer eventually + # writes to # @param buffering [Boolean] When set to `true`, this buffer is considered # to be buffering data. When buffering, logged messages will not be # flushed immediately but only with an explicit call to {#flush}. # @param allow_silent [Boolean] When set to `true` the data in this buffer - # will be flushed to the sink, even if there - # were just added fields or tags without any logged messages. If this is - # `false` and there were no messages logged with {#add_message}, the - # buffer will not be flushed to the sink but will be silently dropped. - def initialize(sink, buffering: true, allow_silent: true) - @sink = sink + # will be flushed to the flows, even if there were just added fields or + # tags without any logged messages. If this is `false` and there were no + # messages logged with {#add_message}, the buffer will not be flushed but + # will be silently dropped. + def initialize(flows, buffering: true, allow_silent: true) + @flows = flows @buffering = !!buffering @allow_silent = !!allow_silent @@ -151,11 +152,11 @@ module Rackstash end # When set to `true` in {#initialize}, the data in this buffer will be - # flushed to the sink, even if there were just added fields or tags but no - # messages. + # flushed to the {#flows}, even if there were just added fields or tags but + # no messages. # # If this is `false` and there were no messages logged with {#add_message}, - # the buffer will not be flushed to the sink but will be silently dropped. + # the buffer will not be flushed to the flows but will be silently dropped. # # @return [Boolean] def allow_silent? @@ -191,7 +192,7 @@ module Rackstash @fields ||= Rackstash::Fields::Hash.new(forbidden_keys: FORBIDDEN_FIELDS) end - # Flush the current buffer to the log {#sink} if it is pending. + # Flush the current buffer to the {#flows} if it is pending. # # After the flush, the existing buffer should not be used anymore. You # should either call {#clear} to remove all volatile data or create a new @@ -202,7 +203,7 @@ module Rackstash def flush return unless pending? - @sink.write(self) + @flows.write(self.to_event) self end @@ -217,7 +218,7 @@ module Rackstash end # This flag denotes whether the current buffer holds flushable data. By - # default, a new buffer is not pending and will not be flushed to the sink. + # default, a new buffer is not pending and will not be flushed. # Each time there is a new message logged, this is set to `true` for the # buffer. For changes of tags or fields or when setting the {#timestamp}, # the `pending?` flag is only flipped to `true` if {#allow_silent?} is set @@ -283,15 +284,13 @@ module Rackstash # Create an event hash from `self`. # - # * We take the buffer's existing fields and deep-merge the provided - # `fields` into them. Existing fields on the buffer will always have - # precedence here. - # * We add the given additional `tags` to the buffer's tags and add them as - # a raw array of strings to the `event['tags']` field. - # * We add the buffer's array of messages to `event['message']`. This field - # now contains an array of {Message} objects. - # * We add the buffer's timestamp to the `event['@timestamp]` field as an - # ISO 8601 formatted string. The timestamp is always in UTC. + # * It contains the all of the current buffer's logged fields + # * We add the buffer's tags and add them as an array of strings to the + # `event['tags']` field. + # * We add the buffer's list of messages to `event['message']`. This field + # thus contains an array of {Message} objects. + # * We add the buffer's timestamp to the `event['@timestamp]` as a `Time` + # object in UTC. # # The typical event emitted here looks like this: # @@ -318,24 +317,10 @@ module Rackstash # are either `Hash`, `Array`, frozen `String`, `Integer` or `Float` objects. # All hashes (including nested hashes) use `String` keys. # - # @param fields [Hash Object>, Proc] additional fields which are - # merged with this Buffer's fields in the returned event Hash - # @param tags [Array, Proc] additional tags which are merged - # added to Buffer's tags in the returned event Hash # @return [Hash] the event expected by the event {Filters}. - def to_event(fields: {}, tags: []) - if (@fields.nil? || @fields.empty?) && ::Hash === fields && fields.empty? - event = {} - else - event = self.fields.deep_merge(fields, force: false).to_h - end - - if (@tags.nil? || @tags.empty?) && ::Array === tags && tags.empty? - event[FIELD_TAGS] = [] - else - event[FIELD_TAGS] = self.tags.merge(tags).to_a - end - + def to_event + event = fields.to_h + event[FIELD_TAGS] = tags.to_a event[FIELD_MESSAGE] = messages event[FIELD_TIMESTAMP] = timestamp @@ -344,14 +329,13 @@ module Rackstash private - # Non buffering buffers, i.e., those with `buffering: false`, flush - # themselves to the sink whenever there is something logged to it. That way, - # such a buffer acts like a regular old Logger would: it just flushes a - # logged message to its log device as soon as it is logged. + # Non-buffering buffers, i.e., those with `buffering: false`, flush + # themselves to the defined flows whenever there is something logged to it. + # That way, such a buffer acts like a regular old Logger would: it just + # flushes a logged message to its log device as soon as it is logged. # - # By calling `auto_flush`, the current buffer is flushed and cleared - - # Flush and clear the current buffer if necessary. + # By calling `auto_flush`, the current buffer is flushed and cleared if + # necessary. def auto_flush return if buffering? diff --git a/lib/rackstash/buffer_stack.rb b/lib/rackstash/buffer_stack.rb index 96c31f9..3eebdf7 100644 --- a/lib/rackstash/buffer_stack.rb +++ b/lib/rackstash/buffer_stack.rb @@ -13,11 +13,12 @@ module Rackstash # is used by exactly one {Logger}. The responsible {Logger} ensures that each # BufferStack is only accessed from a single thread. class BufferStack - # @return [Sink] the log sink where the buffers are eventually flushed to - attr_reader :sink + # @return [Flows] the list of defined {Flow} objects which are responsible + # for transforming, encoding, and persisting the log events. + attr_reader :flows - def initialize(sink) - @sink = sink + def initialize(flows) + @flows = flows @stack = [] end @@ -27,7 +28,7 @@ module Rackstash # # @return [Buffer] def current - @stack.last || Buffer.new(@sink, buffering: false).tap do |buffer| + @stack.last || Buffer.new(@flows, buffering: false).tap do |buffer| @stack.push buffer end end @@ -43,7 +44,7 @@ module Rackstash # {Buffer}. See {Buffer#initialize} for allowed values. # @return [Buffer] the newly created buffer def push(buffer_args = {}) - buffer = Buffer.new(sink, buffer_args) + buffer = Buffer.new(@flows, buffer_args) @stack.push buffer buffer diff --git a/lib/rackstash/filter_chain.rb b/lib/rackstash/filter_chain.rb index 420c0f5..257863e 100644 --- a/lib/rackstash/filter_chain.rb +++ b/lib/rackstash/filter_chain.rb @@ -104,7 +104,7 @@ module Rackstash # the writing of an individual event. Any other return value of filters is # ignored. # - # @param event [Hash] an event hash, see {Sink#write} for details + # @param event [Hash] an event hash, see {Buffer#to_event} for details # @return [Hash, false] the filtered event or `false` if any of the # filters returned `false` def call(event) diff --git a/lib/rackstash/flow.rb b/lib/rackstash/flow.rb index 4b01313..0a2901c 100644 --- a/lib/rackstash/flow.rb +++ b/lib/rackstash/flow.rb @@ -14,7 +14,7 @@ module Rackstash # A Flow is responsible for taking a raw log event (originally corresponding # to a single {Buffer}), transforming it and finally sending it to an adapte # for persistence. A Flow instance is normally tied to a {Flows} list which in - # turn belongs to a log {Sink}. + # turn belongs to a {Logger}. # # In order to transform and persist log events, a Flow uses several # components: @@ -54,11 +54,11 @@ module Rackstash # end # end # - # # Write an event. This is normally done by the responsible Rackstash::Sink + # # Write an event. This is normally done by a Rackstash::Buffer # flow.write(an_event) # - # The event which eventually gets written to the flow is created by the {Sink} - # of a {Logger}. + # The event which eventually gets written to the flow is created from a Buffer + # with {Buffer#to_event}. class Flow # @return [Adapters::Adapter] the log adapter attr_reader :adapter @@ -92,7 +92,7 @@ module Rackstash end # Close the log adapter if supported. This might be a no-op if the adapter - # does not support closing. This method is called by the logger's {Sink}. + # does not support closing. # # @return [nil] def close! @@ -205,7 +205,7 @@ module Rackstash alias filter_prepend filter_unshift # Re-open the log adapter if supported. This might be a no-op if the adapter - # does not support reopening. This method is called by the logger's {Sink}. + # does not support reopening. # # @return [nil] def reopen! @@ -226,23 +226,21 @@ module Rackstash end # Filter, encode and write the given `event` to the configured {#adapter}. - # This method is called by the logger's {Sink} to write a log event. The - # given `event` is updated in-place and should not be re-used afterwards - # anymore. + # The given `event` is updated in-place by the filters and encoder of the + # flow and should not be re-used afterwards anymore. # - # 1. At first, we filter the event with the defined filters in their given - # order. If any of the filters returns `false`, the writing will be + # 1. At first, we filter the event with the defined {#filter_chain} in their + # given order. If any of the filters returns `false`, the writing will be # aborted. No further filters will be applied and the event will not be # written to the adapter. See {FilterChain#call} for details. # 2. We encode the event to a format suitable for the adapter using the # configured {#encoder}. - # 3. Finally, the encoded event will be passed to the {#adapter} to be send + # 3. Finally, the encoded event will be passed to the {#adapter} to be sent # to the actual log target, e.g. a file or an external log receiver. # # @param event [Hash] an event hash # @return [Boolean] `true` if the event was written to the adapter, `false` # otherwise - # @see Sink#write def write!(event) # Silently abort writing if any filter (and thus the while filter chain) # returns `false`. diff --git a/lib/rackstash/flows.rb b/lib/rackstash/flows.rb index 5894b33..4c43ef4 100644 --- a/lib/rackstash/flows.rb +++ b/lib/rackstash/flows.rb @@ -11,7 +11,8 @@ require 'rackstash/flow' module Rackstash # The `Flows` class provides a thread-safe list of {Flow} objects which are - # used to dispatch a single log events to multiple flows from the {Sink}. + # used to write a single log event of a {Buffer} to multiple flows. Each + # {Logger} object has an associated Flows object to define the logger's flows. class Flows # @param flows [::Array] the {Flow} objects # which should be part of the list. If any of the arguments is not a @@ -133,6 +134,52 @@ module Rackstash end alias size length + # Close the log adapter for each configured {Flow}. This might be a no-op + # depending on each flow's adapter. + # + # @return [nil] + def close + each(&:close) + nil + end + + # Close and re-open the log adapter for each configured {Flow}. This might + # be a no-op depending on each flow's adapter. + # + # @return [nil] + def reopen + each(&:reopen) + nil + end + + # Write an event `Hash` to each of the defined flows. The event is usually + # created from {Buffer#to_event}. + # + # We write a fresh deep-copy of the event hash to each defined {Flow}. + # This allows each flow to alter the event in any way without affecting the + # others. + # + # @param event [Hash] an event `Hash` + # @return [Hash] the given `event` + def write(event) + # Memoize the current list of flows for the rest of the method to make + # sure it doesn't change while we work with it. + flows = to_a + flows_size = flows.size + + flows.each_with_index do |flow, index| + # If we have more than one flow, we provide a fresh copy of the event + # to each flow. The flow's filters and encoder can then mutate the event + # however it pleases without affecting later flows. We don't need to + # duplicate the event for the last flow since it won't be re-used + # after that anymore. + current_event = (index == flows_size - 1) ? event : deep_dup_event(event) + flow.write(current_event) + end + + event + end + # @return [Array] an array of all flow elements without any `nil` # values def to_ary @@ -144,5 +191,33 @@ module Rackstash def to_s @flows.to_s end + + private + + # Create a deep duplicate of an event hash. It is assumed that the input + # event follows the normalized structure as generated by + # {Fields::Hash#to_h}. + # + # @param obj [Object] an object to duplicate. When initially called, this is + # expected to be an event hash + # @return [Object] a deep copy of the given `obj` if it was an `Array` or + # `Hash`, the original `obj` otherwise. + def deep_dup_event(obj) + case obj + when Hash + hash = obj.dup + obj.each_pair do |key, value| + # {Rackstash::Fields::Hash} already guarantees that keys are always + # frozen Strings. We don't need to dup them. + hash[key] = deep_dup_event(value) + end + when Array + obj.map { |value| deep_dup_event(value) } + else + # All leaf-values in the event are either frozen or not freezable / + # dupable. They can be used as is. See {AbstractCollection#normalize} + obj + end + end end end diff --git a/lib/rackstash/logger.rb b/lib/rackstash/logger.rb index eea2c7c..a2a2f42 100644 --- a/lib/rackstash/logger.rb +++ b/lib/rackstash/logger.rb @@ -10,7 +10,7 @@ require 'concurrent' require 'rackstash/buffer_stack' require 'rackstash/formatter' require 'rackstash/message' -require 'rackstash/sink' +require 'rackstash/flows' module Rackstash # The Logger is the main entry point for Rackstash. It provides an interface @@ -43,9 +43,9 @@ module Rackstash # By default we use {PROGNAME}. attr_accessor :progname - # @return [Sink] the log {Sink} which flushes a {Buffer} to one or more - # external log targets like a file, a socket, ... - attr_reader :sink + # @return [Flows] the list of defined {Flow} objects which are responsible + # for transforming, encoding, and persisting the log events. + attr_reader :flows # Create a new Logger instance. # @@ -109,12 +109,12 @@ module Rackstash def initialize(*flows, level: DEBUG, progname: PROGNAME, formatter: Formatter.new, &block) @buffer_stack = Concurrent::ThreadLocalVar.new - @sink = Rackstash::Sink.new(*flows) + @flows = Rackstash::Flows.new(*flows) self.level = level self.progname = progname self.formatter = formatter - if block_given? && (flow = @sink.flows.last) + if block_given? && (flow = @flows.last) if block.arity == 0 flow.instance_eval(&block) else @@ -123,9 +123,9 @@ module Rackstash end end - # Add a message to the current buffer without any further formatting. If - # the current {Buffer} is bufering, the message will just be added. Else, - # it will be flushed to the {#sink} directly. + # Add a message to the current {Buffer} without any further formatting. If + # the current buffer is bufering, the message will just be added. Else, + # it will be flushed to the configured {#flows} directly. # # @param msg [Object] # @return [String] the passed `msg` @@ -161,6 +161,11 @@ module Rackstash buffer.fields[key] = value end + # (see Flows#close) + def close + @flows.close + end + # Set the base log level as either one of the {SEVERITIES} or a # String/Symbol describing the level. When logging a message, it will only # be added if its log level is at or above the base level defined here @@ -171,45 +176,14 @@ module Rackstash @level = Rackstash.severity(severity) end - # (see Sink#close) - def close - @sink.close - end - - # (see Sink#default_fields) - def default_fields - @sink.default_fields - end - - # (see Sink#default_fields=) - def default_fields=(fields) - @sink.default_fields = fields - end - - # (see Sink#default_tags) - def default_tags - @sink.default_tags - end - - # (see Sink#default_tags=) - def default_tags=(tags) - @sink.default_tags = tags - end - # (see Buffer#fields) def fields buffer.fields end - # (see Sink#flows) - # @!attribute [r] flows - def flows - @sink.flows - end - - # (see Sink#reopen) + # (see Flows#reopen) def reopen - @sink.reopen + @flows.reopen end # (see Buffer#tag) @@ -486,7 +460,7 @@ module Rackstash private def buffer_stack - @buffer_stack.value ||= BufferStack.new(@sink) + @buffer_stack.value ||= BufferStack.new(@flows) end def buffer diff --git a/lib/rackstash/sink.rb b/lib/rackstash/sink.rb deleted file mode 100644 index d8dc6d6..0000000 --- a/lib/rackstash/sink.rb +++ /dev/null @@ -1,182 +0,0 @@ -# frozen_string_literal: true -# -# Copyright 2017 Holger Just -# -# This software may be modified and distributed under the terms -# of the MIT license. See the LICENSE.txt file for details. - -require 'rackstash/flows' - -module Rackstash - class Sink - # @return [Flows] the defined {Flows} which are responsible for - # transforming, encoding, and persisting an event Hash. - attr_reader :flows - - # @param flows [Array, Flow, Adapters::Adapter, Object] - # an array of {Flow}s or a single {Flow}, respectivly object which can be - # used as a {Flow}'s adapter. See {Flow#initialize}. - def initialize(*flows) - @flows = Rackstash::Flows.new(*flows) - - @default_fields = {} - @default_tags = [] - end - - # @return [Hash, Proc] the default fields which get deep merged into the - # created event Hash when flushing a {Buffer}. - def default_fields - @default_fields - end - - # The default fields get deep merged into the created event hash when - # flushing a {Buffer}. They can be given either as a `Hash` or a `Proc` - # which in turn returns a `Hash` on `call`. The `Hash` can be nested - # arbitrarily deep. - # - # Each Hash value can again optionally be a Proc which in turn is expected - # to return a field value on `call`. You can set nested Hashes or Arrays and - # define nested Procs which in turn are called recursively when flushing a - # {Buffer}. That way, you can set lazy-evaluated values. - # - # @example - # # All three values set the same default fields - # sink.default_fields = {'beep' => 'boop'} - # sink.default_fields = -> { { 'beep' => 'boop' } } - # sink.default_fields = { 'beep' => -> { 'boop' } } - # - # @param fields [#to_hash, Proc] The default fields to be merged into the - # event Hash when flushing a {Buffer}. - # @raise [TypeError] if `fields` is neither a Proc nor can be converted to a - # Hash - # @return [Hash, Proc] the given `fields` - def default_fields=(fields) - fields = fields.to_hash if fields.respond_to?(:to_hash) - unless fields.is_a?(Hash) || fields.is_a?(Proc) - raise TypeError, 'default_fields must be a Hash or Proc' - end - - @default_fields = fields - end - - # @return [Array<#to_s, Proc>, Proc] the default tags are added to the - # `"@tags"` field of the created event Hash when flushing a {Buffer}. They - # can be given either as an `Array` of `String`s or a `Proc` which in turn - # returns an `Array` of `String`s on `call`. - def default_tags - @default_tags - end - - # The default tags are added to the `"@tags"` field of the created event - # Hash when flushing a {Buffer}. They can be given either as an `Array` of - # `String`s or a `Proc` which in turn returns an `Array` of `String`s on - # `call`. - # - # Each value of the Array can again optionally be a Proc which in turn is - # expected to return a String on `call`. All the (potentially nested) procs - # are called recursively when flushing a {Buffer}. That way, you can set - # lazy-evaluated values. - # - # @example - # # All three values set the same default tags - # sink.default_tags = ['important', 'request'] - # sink.default_tags = -> { ['important', 'request'] } - # sink.default_tags = [ 'important', -> { 'request' } } - # - # @param tags [#to_ary, Proc] The default tags to be merged into the event - # Hash's `"@tags"` field when flushing a {Buffer} - # @raise [TypeError] if `tags` is neither a Proc nor can be converted to an - # Array - # @return [Array, Proc] the given `tags` - def default_tags=(tags) - tags = tags.to_ary if tags.respond_to?(:to_ary) - unless tags.is_a?(Array) || tags.is_a?(Proc) - raise TypeError, 'default_tags must be an Array or Proc' - end - - @default_tags = tags - end - - # Close the log adapter for each configured {Flow}. This might be a no-op - # depending on each flow's adapter. - # - # @return [nil] - def close - @flows.each(&:close) - nil - end - - # Close and re-open the log adapter for each configured {Flow}. This might - # be a no-op depending on each flow's adapter. - # - # @return [nil] - def reopen - @flows.each(&:reopen) - nil - end - - # Create an event hash from the given `buffer` and write it to each of the - # defined {#flows}. - # - # First, we transform the given `buffer` to an event hash using - # {Buffer#to_event}. Here, we are merging the {#default_fields} and - # {#default_tags} into the event Hash. - # - # A fresh copy of the resulting event hash is then written to each defined - # {Flow}. This allows each flow to alter the event in any way without - # affecting the other flows. - # - # @param buffer [Buffer] The buffer cotnaining the data to write to the - # {#flows}. - # @return [Buffer] the given `buffer` - def write(buffer) - event = buffer.to_event(fields: @default_fields, tags: @default_tags) - - # Memoize the current list of flows for the rest of the method to make - # sure it doesn't change while we work with it. - flows = @flows.to_a - flows_size = flows.size - - flows.each_with_index do |flow, index| - # If we have more than one flow, we provide a fresh copy of the event - # to each flow. The flow's filter and codec can then mutate the event - # however it pleases without affecting later flows. We don't need to - # duplicate the event for the last flow since it won't be re-used - # after that anymore. - current_event = (index == flows_size - 1) ? event : deep_dup_event(event) - - flow.write(current_event) - end - - buffer - end - - private - - # Create a deep duplicate of an event hash. It is assumed that the input - # event follows the normalized structure as generated by - # {Fields::Hash#to_h}. - # - # @param obj [Object] an object to duplicate. When initially called, this is - # expected to be an event hash - # @return [Object] a deep copy of the given `obj` if it was an `Array` or - # `Hash`, the original `obj` otherwise. - def deep_dup_event(obj) - case obj - when Hash - hash = obj.dup - obj.each_pair do |key, value| - # {Rackstash::Fields::Hash} already guarantees that keys are always - # frozen Strings. We don't need to dup them. - hash[key] = deep_dup_event(value) - end - when Array - obj.map { |value| deep_dup_event(value) } - else - # All leaf-values in the event are either frozen or not freezable / - # dupable. They can be used as is. See {AbstractCollection#normalize} - obj - end - end - end -end diff --git a/spec/rackstash/buffer_spec.rb b/spec/rackstash/buffer_spec.rb index 3aeb2d6..d2fb1ee 100644 --- a/spec/rackstash/buffer_spec.rb +++ b/spec/rackstash/buffer_spec.rb @@ -11,8 +11,8 @@ require 'rackstash/buffer' describe Rackstash::Buffer do let(:buffer_options) { {} } - let(:sink) { instance_double(Rackstash::Sink) } - let(:buffer) { described_class.new(sink, **buffer_options) } + let(:flows) { instance_double(Rackstash::Flows) } + let(:buffer) { described_class.new(flows, **buffer_options) } describe '#allow_silent?' do it 'defaults to true' do @@ -290,15 +290,26 @@ describe Rackstash::Buffer do end context 'when pending?' do + let(:time) { Time.now } + let(:message) { double(message: 'Hello World!', time: time) } + let(:event) { + { + 'message' => [message], + 'tags' => [], + '@timestamp' => Time.now + } + } + before do - buffer.add_message double(message: 'Hello World!', time: Time.now) + buffer.add_message(message) # We might call Buffer#flush during the following tests - allow(sink).to receive(:write).with(buffer).once + allow(buffer).to receive(:to_event).and_return(event) + allow(flows).to receive(:write).with(event).once end - it 'flushes the buffer to the sink' do - expect(sink).to receive(:write).with(buffer).once + it 'flushes the buffer to the flows' do + expect(flows).to receive(:write).with(event).once buffer.flush end @@ -314,8 +325,8 @@ describe Rackstash::Buffer do end context 'when not pending?' do - it 'does not flushes the buffer to the sink' do - expect(sink).not_to receive(:write) + it 'does not flushes the buffer to the flows' do + expect(flows).not_to receive(:write) buffer.flush end @@ -497,33 +508,6 @@ describe Rackstash::Buffer do end describe '#to_event' do - it 'does not merge field and tags if empty' do - expect(buffer).not_to receive(:fields) - expect(buffer).not_to receive(:tags) - - buffer.to_event(fields: {}, tags: []) - end - - it 'merges fields and tags as values' do - fields = { foo: :bar } - tags = ['default_tag'] - - expect(buffer.fields).to receive(:deep_merge).with(fields, force: false) - expect(buffer.tags).to receive(:merge).with(tags) - - buffer.to_event(fields: fields, tags: tags) - end - - it 'merges fields and tags as Procs' do - fields = -> {} - tags = -> {} - - expect(buffer.fields).to receive(:deep_merge).with(fields, force: false) - expect(buffer.tags).to receive(:merge).with(tags) - - buffer.to_event(fields: fields, tags: tags) - end - it 'creates an event hash' do message = double(message: 'Hello World', time: Time.now) allow(message) diff --git a/spec/rackstash/buffer_stack_spec.rb b/spec/rackstash/buffer_stack_spec.rb index 11d4c28..58c2358 100644 --- a/spec/rackstash/buffer_stack_spec.rb +++ b/spec/rackstash/buffer_stack_spec.rb @@ -10,13 +10,13 @@ require 'spec_helper' require 'rackstash/buffer_stack' describe Rackstash::BufferStack do - let(:sink) { instance_double(Rackstash::Sink) } - let(:stack) { described_class.new(sink) } + let(:flows) { instance_double(Rackstash::Flows) } + let(:stack) { described_class.new(flows) } describe '#current' do it 'initializes a buffer' do expect(stack.current).to be_a Rackstash::Buffer - expect(stack.current.sink).to equal sink + expect(stack.current.flows).to equal flows end it 'repeatedly returns the same buffer' do diff --git a/spec/rackstash/flows_spec.rb b/spec/rackstash/flows_spec.rb index 0fd77d5..9511d4d 100644 --- a/spec/rackstash/flows_spec.rb +++ b/spec/rackstash/flows_spec.rb @@ -111,6 +111,28 @@ describe Rackstash::Flows do end end + describe '#close' do + it 'calls close on all flows' do + [a_flow, a_flow].each do |flow| + expect(flow).to receive(:close) + flows << flow + end + + expect(flows.close).to be_nil + end + end + + describe '#reopen' do + it 'calls reopen on all flows' do + [a_flow, a_flow].each do |flow| + expect(flow).to receive(:reopen) + flows << flow + end + + expect(flows.reopen).to be_nil + end + end + describe '#each' do it 'yield each flow' do flow1 = a_flow @@ -257,4 +279,29 @@ describe Rackstash::Flows do expect(flows.to_s).to eql flows.to_a.to_s end end + + describe '#write' do + it 'flushes the buffer to all flows' do + event_spec = { + 'foo' => 'bar', + 'tags' => [], + '@timestamp' => instance_of(Time) + } + + [a_flow, a_flow].each do |flow| + expect(flow).to receive(:write).with(event_spec) + flows << flow + end + + # only the first event is duplicated + expect(flows).to receive(:deep_dup_event).with(event_spec).and_call_original.ordered + event_spec.each_value do |arg| + expect(flows).to receive(:deep_dup_event).with(arg).and_call_original.ordered + end + + # During flush, we create a single event, duplicate it and write each to + # each of the flows. + flows.write('foo' => 'bar', 'tags' => [], '@timestamp' => Time.now.utc) + end + end end diff --git a/spec/rackstash/logger_spec.rb b/spec/rackstash/logger_spec.rb index 34b0c58..6ffa106 100644 --- a/spec/rackstash/logger_spec.rb +++ b/spec/rackstash/logger_spec.rb @@ -15,7 +15,7 @@ describe Rackstash::Logger do describe '#initialize' do it 'accepts flows' do - expect(Rackstash::Sink).to receive(:new).with('output.log') + expect(Rackstash::Flows).to receive(:new).with('output.log') described_class.new('output.log') end @@ -102,44 +102,15 @@ describe Rackstash::Logger do end describe '#close' do - it 'forwards to the sink' do - expect(logger.sink).to receive(:close) + it 'forwards to the flows' do + expect(logger.flows).to receive(:close) logger.close end end - describe '#default_fields' do - it 'forwards to the sink' do - expect(logger.sink).to receive(:default_fields) - logger.default_fields - end - end - - describe '#default_fields=' do - it 'forwards to the sink' do - expect(logger.sink).to receive(:default_fields=).with('key' => 'value') - logger.default_fields = { 'key' => 'value' } - end - end - - describe '#default_tags' do - it 'forwards to the sink' do - expect(logger.sink).to receive(:default_tags) - logger.default_tags - end - end - - describe '#default_tags=' do - it 'forwards to the sink' do - expect(logger.sink).to receive(:default_tags=).with(['tag']) - logger.default_tags = ['tag'] - end - end - describe '#flows' do - it 'forwards to the sink' do - expect(logger.sink).to receive(:flows) - logger.flows + it 'is a Rackstash::Flows' do + expect(logger.flows).to be_instance_of Rackstash::Flows end end @@ -195,18 +166,12 @@ describe Rackstash::Logger do end describe '#reopen' do - it 'forwards to the sink' do - expect(logger.sink).to receive(:reopen) + it 'forwards to the flows' do + expect(logger.flows).to receive(:reopen) logger.reopen end end - describe '#sink' do - it 'returns the created sink' do - expect(logger.sink).to be_a Rackstash::Sink - end - end - describe '#fields' do it 'gets the current buffer\'s fields' do buffer = instance_double('Rackstash::Buffer') @@ -562,7 +527,7 @@ describe Rackstash::Logger do end it 'buffers multiple messages' do - expect(logger.sink).to receive(:write).once + expect(logger.flows).to receive(:write).once logger.with_buffer do logger.add 1, 'Hello World' diff --git a/spec/rackstash/sink_spec.rb b/spec/rackstash/sink_spec.rb deleted file mode 100644 index 107f12f..0000000 --- a/spec/rackstash/sink_spec.rb +++ /dev/null @@ -1,164 +0,0 @@ -# frozen_string_literal: true -# -# Copyright 2017 Holger Just -# -# This software may be modified and distributed under the terms -# of the MIT license. See the LICENSE.txt file for details. - -require 'spec_helper' - -require 'rackstash/sink' - -require 'rackstash/buffer' -require 'rackstash/flows' -require 'rackstash/flow' - -describe Rackstash::Sink do - def a_flow - flow = instance_double('Rackstash::Flow') - allow(flow).to receive(:is_a?).with(Rackstash::Flow).and_return(true) - flow - end - - let(:flows) { [a_flow] } - let(:sink) { described_class.new(*flows) } - - describe 'initialize' do - # We deliberately use the real Rackstash::Flows class here to server as an - # integration test - it 'wraps a single flow in a flows list' do - expect(Rackstash::Flows).to receive(:new).with(*flows) - .and_call_original - - sink = described_class.new(*flows) - expect(sink.flows).to be_a Rackstash::Flows - expect(sink.flows.to_a).to eql flows - end - - it 'wraps multiple flows in a flows list' do - flows = [a_flow, a_flow] - - expect(Rackstash::Flows).to receive(:new).with(*flows) - .and_call_original - sink = described_class.new(*flows) - - expect(sink.flows).to be_a Rackstash::Flows - expect(sink.flows.to_a).to eql flows - end - end - - describe '#default_fields' do - it 'can set a proc' do - a_proc = proc { nil } - expect(a_proc).not_to receive(:call) - - sink.default_fields = a_proc - expect(sink.default_fields).to equal a_proc - end - - it 'can set a Hash' do - hash = { foo: :bar } - sink.default_fields = hash - - expect(sink.default_fields).to equal hash - end - - it 'can set a Hash-like object' do - hash_alike = double('hash') - expect(hash_alike).to receive(:to_hash).and_return(foo: :bar) - - sink.default_fields = hash_alike - expect(sink.default_fields).to eql(foo: :bar) - expect(sink.default_fields).not_to equal hash_alike - end - - it 'refuses invalid fields' do - expect { sink.default_fields = nil }.to raise_error TypeError - expect { sink.default_fields = 42 }.to raise_error TypeError - expect { sink.default_fields = ['foo'] }.to raise_error TypeError - end - end - - describe '#default_tags' do - it 'can set a proc' do - tags = proc { nil } - expect(tags).not_to receive(:call) - - sink.default_tags = tags - expect(sink.default_tags).to equal tags - end - - it 'can set an Array' do - array = [:foo, 'bar'] - sink.default_tags = array - - expect(sink.default_tags).to equal array - end - - it 'can set an Array-like object' do - array_alike = double('array') - expect(array_alike).to receive(:to_ary).and_return([:foo]) - - sink.default_tags = array_alike - expect(sink.default_tags).to eql [:foo] - expect(sink.default_tags).not_to equal array_alike - end - - it 'refuses invalid fields' do - expect { sink.default_tags = nil }.to raise_error TypeError - expect { sink.default_tags = 42 }.to raise_error TypeError - expect { sink.default_tags = { foo: :bar } }.to raise_error TypeError - end - end - - describe '#close' do - let(:flows) { [a_flow, a_flow] } - - it 'calls close on all flows' do - expect(flows).to all receive(:close) - expect(sink.close).to be_nil - end - end - - describe '#reopen' do - let(:flows) { [a_flow, a_flow] } - - it 'calls reopen on all flows' do - expect(flows).to all receive(:reopen) - expect(sink.reopen).to be_nil - end - end - - describe '#write' do - let(:flows) { - [a_flow, a_flow].each do |flow| - allow(flow).to receive(:write) - end - } - let(:buffer) { Rackstash::Buffer.new(sink) } - - it 'merges default_fields and default_tags' do - expect(buffer).to receive(:to_event).with(fields: {}, tags: []) - sink.write(buffer) - end - - it 'flushes the buffer to all flows' do - event_spec = { - 'message' => [], - 'tags' => [], - '@timestamp' => instance_of(Time) - } - - # only the first event is duplicated - expect(sink).to receive(:deep_dup_event).with(event_spec).and_call_original.ordered - event_spec.each_value do |arg| - expect(sink).to receive(:deep_dup_event).with(arg).and_call_original.ordered - end - - # During flush, we create a single event, duplicate it and write each to - # each of the flows. - expect(flows).to all receive(:write).with(event_spec) - sink.write(buffer) - end - end -end