
Remove Rackstash::Sink

The Sink was a vehicle to transport some shared state between the logger
instance and the buffers, most notably the default fields and default
tags.

It turns out, however, that error handling during the merging of the
default_fields and default_tags is non-trivial: at that point, the
buffer is in some kind of limbo. Users won't write to it anymore (and
thus don't expect exceptions from it), while the error handling of the
individual flows is not yet reached. Since users can specify procs in
default_fields, these can still raise for whatever user-defined reason.
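
To illustrate the problem, a proc-valued default field is only
evaluated once the defaults are merged into the event during a flush,
which may happen long after the user's own log calls. A hypothetical
sketch using the pre-commit API (the log path and field are made up):

    require 'rackstash/logger'

    logger = Rackstash::Logger.new('log/app.log')
    logger.default_fields = {
      # Only evaluated while the buffer is flushed, not when logging.
      'request_id' => -> { Thread.current[:request_id] or raise 'no request id set' }
    }

    logger.info 'Hello World'

When the buffer is eventually flushed, the proc raises while the
defaults are merged into the event: the user has already stopped
writing to the buffer, but the per-flow error handling in Flow#write
has not been reached yet.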

Luckily, the insertion of default fields and default tags can easily be
done by a filter later anyway, under the protection of the flow's error
handling in Flow#write. This allows us to remove the whole concept of
the sink and simply pass the Flows object to the Buffer.
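
A minimal sketch of such a filter under the new model. The `filter`
registration call on the flow is an assumption for illustration (only
filter_unshift/filter_prepend appear in this diff), and the field and
tag values are made up:

    require 'socket'
    require 'rackstash/logger'

    add_defaults = lambda do |event|
      event['host'] ||= Socket.gethostname   # an example default field
      event['tags'] |= ['rails']              # an example default tag
      event                                   # returning false would drop the event
    end

    logger = Rackstash::Logger.new('log/app.log') do
      filter add_defaults                      # assumed registration call
    end

Since the filter only runs inside Flow#write, anything it raises is
caught by that flow's error handling instead of surfacing from the
buffer's limbo state described above.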

Not having to merge default_fields during event creation significantly
simplifies Buffer#to_event, which was rather ugly to begin with but has
now turned out quite beautifully.
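
For reference, the event produced by the simplified Buffer#to_event now
contains only the buffer's own data. The values below are illustrative;
the field names follow the constants used in the diff:

    event = {
      'beep'       => 'boop',           # the buffer's own fields, nothing merged in
      'tags'       => ['request'],      # the buffer's own tags as plain Strings
      'message'    => [],               # the Array of Rackstash::Message objects
      '@timestamp' => Time.now.getutc   # a Time instance in UTC
    }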
Holger Just 2017-10-10 01:49:55 +02:00
parent 72dae8e222
commit 3c27f2ae70
12 changed files with 233 additions and 551 deletions

View File

@ -17,9 +17,8 @@ module Rackstash
# Each time, a message is logged or a field or tag is set to a {Logger}, it
# is set on a Buffer. Each Buffer belongs to exactly one {BufferStack} (and
# thus in turn to exactly one {Logger}) which creates it and controls its
# complete life cycle. The data a buffer holds can be exported via a {Sink}
# and passed on to one or more {Flow}s which send the data to an external
# log receiver.
# complete life cycle. The data a buffer holds can be written to one or more
# {Flow}s which send the data to an external log receiver.
#
# Most methods of the Buffer are directly exposed to the user-accessible
# {Logger}. The Buffer class itself is considered private and should not be
@ -28,18 +27,17 @@ module Rackstash
# by exposing a Buffer to each thread as the "current Buffer".
#
# Buffers can be buffering or non-buffering. While this doesn't affect the
# behavior of the Buffer itself, it affects when the Buffer is flushed to a
# {Sink} and what happens to the data stored in the Buffer after that.
# behavior of the Buffer itself, it affects when the Buffer is flushed to the
# flows and what happens to the data stored in the Buffer after that.
#
# Generally, a non-buffering Buffer will be flushed to the sink after each
# logged message. This thus mostly resembles the way traditional loggers work
# in Ruby. A buffering Buffer however holds log messages for a longer time,
# e.g., for the duration of a web request. Only after the request finished
# all log messages and stored fields for this request will be flushed to the
# {Sink} as a single log event.
# Generally, a non-buffering Buffer will be flushed after each logged message.
# This thus mostly resembles the way traditional loggers work in Ruby. A
# buffering Buffer however holds log messages for a longer time, e.g., for the
# duration of a web request. Only after the request finished, all log messages
# and stored fields for this request will be flushed as a single log event.
#
# While the fields structure of a Buffer is geared towards the format used by
# Logstash, it can be adaptd in many ways suited for a specific log target.
# While the field structure of a Buffer is geared towards the format used by
# Logstash, it can be adapted in many ways suited for a specific log target.
#
# @note The Buffer class is designed to be created and used by its responsible
# {BufferStack} object only and is not intended used from multiple Threads
@ -55,19 +53,22 @@ module Rackstash
FIELD_VERSION, # the version of the Logstash JSON schema. Usually "1"
].freeze
# @return [Sink] the log {Sink} where the buffer is eventually flushed to
attr_reader :sink
# @return [Flows] the list of defined {Flow} objects which are responsible
# for transforming, encoding, and persisting the log events.
attr_reader :flows
# @param flows [Flows] a list of {Flow} objects where this buffer eventually
# writes to
# @param buffering [Boolean] When set to `true`, this buffer is considered
# to be buffering data. When buffering, logged messages will not be
# flushed immediately but only with an explicit call to {#flush}.
# @param allow_silent [Boolean] When set to `true` the data in this buffer
# will be flushed to the sink, even if there
# were just added fields or tags without any logged messages. If this is
# `false` and there were no messages logged with {#add_message}, the
# buffer will not be flushed to the sink but will be silently dropped.
def initialize(sink, buffering: true, allow_silent: true)
@sink = sink
# will be flushed to the flows, even if there were just added fields or
# tags without any logged messages. If this is `false` and there were no
# messages logged with {#add_message}, the buffer will not be flushed but
# will be silently dropped.
def initialize(flows, buffering: true, allow_silent: true)
@flows = flows
@buffering = !!buffering
@allow_silent = !!allow_silent
@ -151,11 +152,11 @@ module Rackstash
end
# When set to `true` in {#initialize}, the data in this buffer will be
# flushed to the sink, even if there were just added fields or tags but no
# messages.
# flushed to the {#flows}, even if there were just added fields or tags but
# no messages.
#
# If this is `false` and there were no messages logged with {#add_message},
# the buffer will not be flushed to the sink but will be silently dropped.
# the buffer will not be flushed to the flows but will be silently dropped.
#
# @return [Boolean]
def allow_silent?
@ -191,7 +192,7 @@ module Rackstash
@fields ||= Rackstash::Fields::Hash.new(forbidden_keys: FORBIDDEN_FIELDS)
end
# Flush the current buffer to the log {#sink} if it is pending.
# Flush the current buffer to the {#flows} if it is pending.
#
# After the flush, the existing buffer should not be used anymore. You
# should either call {#clear} to remove all volatile data or create a new
@ -202,7 +203,7 @@ module Rackstash
def flush
return unless pending?
@sink.write(self)
@flows.write(self.to_event)
self
end
@ -217,7 +218,7 @@ module Rackstash
end
# This flag denotes whether the current buffer holds flushable data. By
# default, a new buffer is not pending and will not be flushed to the sink.
# default, a new buffer is not pending and will not be flushed.
# Each time there is a new message logged, this is set to `true` for the
# buffer. For changes of tags or fields or when setting the {#timestamp},
# the `pending?` flag is only flipped to `true` if {#allow_silent?} is set
@ -283,15 +284,13 @@ module Rackstash
# Create an event hash from `self`.
#
# * We take the buffer's existing fields and deep-merge the provided
# `fields` into them. Existing fields on the buffer will always have
# precedence here.
# * We add the given additional `tags` to the buffer's tags and add them as
# a raw array of strings to the `event['tags']` field.
# * We add the buffer's array of messages to `event['message']`. This field
# now contains an array of {Message} objects.
# * We add the buffer's timestamp to the `event['@timestamp]` field as an
# ISO 8601 formatted string. The timestamp is always in UTC.
# * It contains the all of the current buffer's logged fields
# * We add the buffer's tags and add them as an array of strings to the
# `event['tags']` field.
# * We add the buffer's list of messages to `event['message']`. This field
# thus contains an array of {Message} objects.
# * We add the buffer's timestamp to the `event['@timestamp]` as a `Time`
# object in UTC.
#
# The typical event emitted here looks like this:
#
@ -318,24 +317,10 @@ module Rackstash
# are either `Hash`, `Array`, frozen `String`, `Integer` or `Float` objects.
# All hashes (including nested hashes) use `String` keys.
#
# @param fields [Hash<String => Object>, Proc] additional fields which are
# merged with this Buffer's fields in the returned event Hash
# @param tags [Array<String>, Proc] additional tags which are merged
# added to Buffer's tags in the returned event Hash
# @return [Hash] the event expected by the event {Filters}.
def to_event(fields: {}, tags: [])
if (@fields.nil? || @fields.empty?) && ::Hash === fields && fields.empty?
event = {}
else
event = self.fields.deep_merge(fields, force: false).to_h
end
if (@tags.nil? || @tags.empty?) && ::Array === tags && tags.empty?
event[FIELD_TAGS] = []
else
event[FIELD_TAGS] = self.tags.merge(tags).to_a
end
def to_event
event = fields.to_h
event[FIELD_TAGS] = tags.to_a
event[FIELD_MESSAGE] = messages
event[FIELD_TIMESTAMP] = timestamp
@ -344,14 +329,13 @@ module Rackstash
private
# Non buffering buffers, i.e., those with `buffering: false`, flush
# themselves to the sink whenever there is something logged to it. That way,
# such a buffer acts like a regular old Logger would: it just flushes a
# logged message to its log device as soon as it is logged.
# Non-buffering buffers, i.e., those with `buffering: false`, flush
# themselves to the defined flows whenever there is something logged to it.
# That way, such a buffer acts like a regular old Logger would: it just
# flushes a logged message to its log device as soon as it is logged.
#
# By calling `auto_flush`, the current buffer is flushed and cleared
# Flush and clear the current buffer if necessary.
# By calling `auto_flush`, the current buffer is flushed and cleared if
# necessary.
def auto_flush
return if buffering?

View File

@ -13,11 +13,12 @@ module Rackstash
# is used by exactly one {Logger}. The responsible {Logger} ensures that each
# BufferStack is only accessed from a single thread.
class BufferStack
# @return [Sink] the log sink where the buffers are eventually flushed to
attr_reader :sink
# @return [Flows] the list of defined {Flow} objects which are responsible
# for transforming, encoding, and persisting the log events.
attr_reader :flows
def initialize(sink)
@sink = sink
def initialize(flows)
@flows = flows
@stack = []
end
@ -27,7 +28,7 @@ module Rackstash
#
# @return [Buffer]
def current
@stack.last || Buffer.new(@sink, buffering: false).tap do |buffer|
@stack.last || Buffer.new(@flows, buffering: false).tap do |buffer|
@stack.push buffer
end
end
@ -43,7 +44,7 @@ module Rackstash
# {Buffer}. See {Buffer#initialize} for allowed values.
# @return [Buffer] the newly created buffer
def push(buffer_args = {})
buffer = Buffer.new(sink, buffer_args)
buffer = Buffer.new(@flows, buffer_args)
@stack.push buffer
buffer

View File

@ -104,7 +104,7 @@ module Rackstash
# the writing of an individual event. Any other return value of filters is
# ignored.
#
# @param event [Hash] an event hash, see {Sink#write} for details
# @param event [Hash] an event hash, see {Buffer#to_event} for details
# @return [Hash, false] the filtered event or `false` if any of the
# filters returned `false`
def call(event)

View File

@ -14,7 +14,7 @@ module Rackstash
# A Flow is responsible for taking a raw log event (originally corresponding
# to a single {Buffer}), transforming it and finally sending it to an adapte
# for persistence. A Flow instance is normally tied to a {Flows} list which in
# turn belongs to a log {Sink}.
# turn belongs to a {Logger}.
#
# In order to transform and persist log events, a Flow uses several
# components:
@ -54,11 +54,11 @@ module Rackstash
# end
# end
#
# # Write an event. This is normally done by the responsible Rackstash::Sink
# # Write an event. This is normally done by a Rackstash::Buffer
# flow.write(an_event)
#
# The event which eventually gets written to the flow is created by the {Sink}
# of a {Logger}.
# The event which eventually gets written to the flow is created from a Buffer
# with {Buffer#to_event}.
class Flow
# @return [Adapters::Adapter] the log adapter
attr_reader :adapter
@ -92,7 +92,7 @@ module Rackstash
end
# Close the log adapter if supported. This might be a no-op if the adapter
# does not support closing. This method is called by the logger's {Sink}.
# does not support closing.
#
# @return [nil]
def close!
@ -205,7 +205,7 @@ module Rackstash
alias filter_prepend filter_unshift
# Re-open the log adapter if supported. This might be a no-op if the adapter
# does not support reopening. This method is called by the logger's {Sink}.
# does not support reopening.
#
# @return [nil]
def reopen!
@ -226,23 +226,21 @@ module Rackstash
end
# Filter, encode and write the given `event` to the configured {#adapter}.
# This method is called by the logger's {Sink} to write a log event. The
# given `event` is updated in-place and should not be re-used afterwards
# anymore.
# The given `event` is updated in-place by the filters and encoder of the
# flow and should not be re-used afterwards anymore.
#
# 1. At first, we filter the event with the defined filters in their given
# order. If any of the filters returns `false`, the writing will be
# 1. At first, we filter the event with the defined {#filter_chain} in their
# given order. If any of the filters returns `false`, the writing will be
# aborted. No further filters will be applied and the event will not be
# written to the adapter. See {FilterChain#call} for details.
# 2. We encode the event to a format suitable for the adapter using the
# configured {#encoder}.
# 3. Finally, the encoded event will be passed to the {#adapter} to be send
# 3. Finally, the encoded event will be passed to the {#adapter} to be sent
# to the actual log target, e.g. a file or an external log receiver.
#
# @param event [Hash] an event hash
# @return [Boolean] `true` if the event was written to the adapter, `false`
# otherwise
# @see Sink#write
def write!(event)
# Silently abort writing if any filter (and thus the while filter chain)
# returns `false`.

View File

@ -11,7 +11,8 @@ require 'rackstash/flow'
module Rackstash
# The `Flows` class provides a thread-safe list of {Flow} objects which are
# used to dispatch a single log events to multiple flows from the {Sink}.
# used to write a single log event of a {Buffer} to multiple flows. Each
# {Logger} object has an associated Flows object to define the logger's flows.
class Flows
# @param flows [::Array<Flow, Adapters::Adapter, Object>] the {Flow} objects
# which should be part of the list. If any of the arguments is not a
@ -133,6 +134,52 @@ module Rackstash
end
alias size length
# Close the log adapter for each configured {Flow}. This might be a no-op
# depending on each flow's adapter.
#
# @return [nil]
def close
each(&:close)
nil
end
# Close and re-open the log adapter for each configured {Flow}. This might
# be a no-op depending on each flow's adapter.
#
# @return [nil]
def reopen
each(&:reopen)
nil
end
# Write an event `Hash` to each of the defined flows. The event is usually
# created from {Buffer#to_event}.
#
# We write a fresh deep-copy of the event hash to each defined {Flow}.
# This allows each flow to alter the event in any way without affecting the
# others.
#
# @param event [Hash] an event `Hash`
# @return [Hash] the given `event`
def write(event)
# Memoize the current list of flows for the rest of the method to make
# sure it doesn't change while we work with it.
flows = to_a
flows_size = flows.size
flows.each_with_index do |flow, index|
# If we have more than one flow, we provide a fresh copy of the event
# to each flow. The flow's filters and encoder can then mutate the event
# however it pleases without affecting later flows. We don't need to
# duplicate the event for the last flow since it won't be re-used
# after that anymore.
current_event = (index == flows_size - 1) ? event : deep_dup_event(event)
flow.write(current_event)
end
event
end
# @return [Array<Flow>] an array of all flow elements without any `nil`
# values
def to_ary
@ -144,5 +191,33 @@ module Rackstash
def to_s
@flows.to_s
end
private
# Create a deep duplicate of an event hash. It is assumed that the input
# event follows the normalized structure as generated by
# {Fields::Hash#to_h}.
#
# @param obj [Object] an object to duplicate. When initially called, this is
# expected to be an event hash
# @return [Object] a deep copy of the given `obj` if it was an `Array` or
# `Hash`, the original `obj` otherwise.
def deep_dup_event(obj)
case obj
when Hash
hash = obj.dup
obj.each_pair do |key, value|
# {Rackstash::Fields::Hash} already guarantees that keys are always
# frozen Strings. We don't need to dup them.
hash[key] = deep_dup_event(value)
end
when Array
obj.map { |value| deep_dup_event(value) }
else
# All leaf-values in the event are either frozen or not freezable /
# dupable. They can be used as is. See {AbstractCollection#normalize}
obj
end
end
end
end

View File

@ -10,7 +10,7 @@ require 'concurrent'
require 'rackstash/buffer_stack'
require 'rackstash/formatter'
require 'rackstash/message'
require 'rackstash/sink'
require 'rackstash/flows'
module Rackstash
# The Logger is the main entry point for Rackstash. It provides an interface
@ -43,9 +43,9 @@ module Rackstash
# By default we use {PROGNAME}.
attr_accessor :progname
# @return [Sink] the log {Sink} which flushes a {Buffer} to one or more
# external log targets like a file, a socket, ...
attr_reader :sink
# @return [Flows] the list of defined {Flow} objects which are responsible
# for transforming, encoding, and persisting the log events.
attr_reader :flows
# Create a new Logger instance.
#
@ -109,12 +109,12 @@ module Rackstash
def initialize(*flows, level: DEBUG, progname: PROGNAME, formatter: Formatter.new, &block)
@buffer_stack = Concurrent::ThreadLocalVar.new
@sink = Rackstash::Sink.new(*flows)
@flows = Rackstash::Flows.new(*flows)
self.level = level
self.progname = progname
self.formatter = formatter
if block_given? && (flow = @sink.flows.last)
if block_given? && (flow = @flows.last)
if block.arity == 0
flow.instance_eval(&block)
else
@ -123,9 +123,9 @@ module Rackstash
end
end
# Add a message to the current buffer without any further formatting. If
# the current {Buffer} is bufering, the message will just be added. Else,
# it will be flushed to the {#sink} directly.
# Add a message to the current {Buffer} without any further formatting. If
# the current buffer is bufering, the message will just be added. Else,
# it will be flushed to the configured {#flows} directly.
#
# @param msg [Object]
# @return [String] the passed `msg`
@ -161,6 +161,11 @@ module Rackstash
buffer.fields[key] = value
end
# (see Flows#close)
def close
@flows.close
end
# Set the base log level as either one of the {SEVERITIES} or a
# String/Symbol describing the level. When logging a message, it will only
# be added if its log level is at or above the base level defined here
@ -171,45 +176,14 @@ module Rackstash
@level = Rackstash.severity(severity)
end
# (see Sink#close)
def close
@sink.close
end
# (see Sink#default_fields)
def default_fields
@sink.default_fields
end
# (see Sink#default_fields=)
def default_fields=(fields)
@sink.default_fields = fields
end
# (see Sink#default_tags)
def default_tags
@sink.default_tags
end
# (see Sink#default_tags=)
def default_tags=(tags)
@sink.default_tags = tags
end
# (see Buffer#fields)
def fields
buffer.fields
end
# (see Sink#flows)
# @!attribute [r] flows
def flows
@sink.flows
end
# (see Sink#reopen)
# (see Flows#reopen)
def reopen
@sink.reopen
@flows.reopen
end
# (see Buffer#tag)
@ -486,7 +460,7 @@ module Rackstash
private
def buffer_stack
@buffer_stack.value ||= BufferStack.new(@sink)
@buffer_stack.value ||= BufferStack.new(@flows)
end
def buffer

View File

@ -1,182 +0,0 @@
# frozen_string_literal: true
#
# Copyright 2017 Holger Just
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.
require 'rackstash/flows'
module Rackstash
class Sink
# @return [Flows] the defined {Flows} which are responsible for
# transforming, encoding, and persisting an event Hash.
attr_reader :flows
# @param flows [Array<Flow, Object>, Flow, Adapters::Adapter, Object]
# an array of {Flow}s or a single {Flow}, respectivly object which can be
# used as a {Flow}'s adapter. See {Flow#initialize}.
def initialize(*flows)
@flows = Rackstash::Flows.new(*flows)
@default_fields = {}
@default_tags = []
end
# @return [Hash, Proc] the default fields which get deep merged into the
# created event Hash when flushing a {Buffer}.
def default_fields
@default_fields
end
# The default fields get deep merged into the created event hash when
# flushing a {Buffer}. They can be given either as a `Hash` or a `Proc`
# which in turn returns a `Hash` on `call`. The `Hash` can be nested
# arbitrarily deep.
#
# Each Hash value can again optionally be a Proc which in turn is expected
# to return a field value on `call`. You can set nested Hashes or Arrays and
# define nested Procs which in turn are called recursively when flushing a
# {Buffer}. That way, you can set lazy-evaluated values.
#
# @example
# # All three values set the same default fields
# sink.default_fields = {'beep' => 'boop'}
# sink.default_fields = -> { { 'beep' => 'boop' } }
# sink.default_fields = { 'beep' => -> { 'boop' } }
#
# @param fields [#to_hash, Proc] The default fields to be merged into the
# event Hash when flushing a {Buffer}.
# @raise [TypeError] if `fields` is neither a Proc nor can be converted to a
# Hash
# @return [Hash, Proc] the given `fields`
def default_fields=(fields)
fields = fields.to_hash if fields.respond_to?(:to_hash)
unless fields.is_a?(Hash) || fields.is_a?(Proc)
raise TypeError, 'default_fields must be a Hash or Proc'
end
@default_fields = fields
end
# @return [Array<#to_s, Proc>, Proc] the default tags are added to the
# `"@tags"` field of the created event Hash when flushing a {Buffer}. They
# can be given either as an `Array` of `String`s or a `Proc` which in turn
# returns an `Array` of `String`s on `call`.
def default_tags
@default_tags
end
# The default tags are added to the `"@tags"` field of the created event
# Hash when flushing a {Buffer}. They can be given either as an `Array` of
# `String`s or a `Proc` which in turn returns an `Array` of `String`s on
# `call`.
#
# Each value of the Array can again optionally be a Proc which in turn is
# expected to return a String on `call`. All the (potentially nested) procs
# are called recursively when flushing a {Buffer}. That way, you can set
# lazy-evaluated values.
#
# @example
# # All three values set the same default tags
# sink.default_tags = ['important', 'request']
# sink.default_tags = -> { ['important', 'request'] }
# sink.default_tags = [ 'important', -> { 'request' } }
#
# @param tags [#to_ary, Proc] The default tags to be merged into the event
# Hash's `"@tags"` field when flushing a {Buffer}
# @raise [TypeError] if `tags` is neither a Proc nor can be converted to an
# Array
# @return [Array, Proc] the given `tags`
def default_tags=(tags)
tags = tags.to_ary if tags.respond_to?(:to_ary)
unless tags.is_a?(Array) || tags.is_a?(Proc)
raise TypeError, 'default_tags must be an Array or Proc'
end
@default_tags = tags
end
# Close the log adapter for each configured {Flow}. This might be a no-op
# depending on each flow's adapter.
#
# @return [nil]
def close
@flows.each(&:close)
nil
end
# Close and re-open the log adapter for each configured {Flow}. This might
# be a no-op depending on each flow's adapter.
#
# @return [nil]
def reopen
@flows.each(&:reopen)
nil
end
# Create an event hash from the given `buffer` and write it to each of the
# defined {#flows}.
#
# First, we transform the given `buffer` to an event hash using
# {Buffer#to_event}. Here, we are merging the {#default_fields} and
# {#default_tags} into the event Hash.
#
# A fresh copy of the resulting event hash is then written to each defined
# {Flow}. This allows each flow to alter the event in any way without
# affecting the other flows.
#
# @param buffer [Buffer] The buffer cotnaining the data to write to the
# {#flows}.
# @return [Buffer] the given `buffer`
def write(buffer)
event = buffer.to_event(fields: @default_fields, tags: @default_tags)
# Memoize the current list of flows for the rest of the method to make
# sure it doesn't change while we work with it.
flows = @flows.to_a
flows_size = flows.size
flows.each_with_index do |flow, index|
# If we have more than one flow, we provide a fresh copy of the event
# to each flow. The flow's filter and codec can then mutate the event
# however it pleases without affecting later flows. We don't need to
# duplicate the event for the last flow since it won't be re-used
# after that anymore.
current_event = (index == flows_size - 1) ? event : deep_dup_event(event)
flow.write(current_event)
end
buffer
end
private
# Create a deep duplicate of an event hash. It is assumed that the input
# event follows the normalized structure as generated by
# {Fields::Hash#to_h}.
#
# @param obj [Object] an object to duplicate. When initially called, this is
# expected to be an event hash
# @return [Object] a deep copy of the given `obj` if it was an `Array` or
# `Hash`, the original `obj` otherwise.
def deep_dup_event(obj)
case obj
when Hash
hash = obj.dup
obj.each_pair do |key, value|
# {Rackstash::Fields::Hash} already guarantees that keys are always
# frozen Strings. We don't need to dup them.
hash[key] = deep_dup_event(value)
end
when Array
obj.map { |value| deep_dup_event(value) }
else
# All leaf-values in the event are either frozen or not freezable /
# dupable. They can be used as is. See {AbstractCollection#normalize}
obj
end
end
end
end

View File

@ -11,8 +11,8 @@ require 'rackstash/buffer'
describe Rackstash::Buffer do
let(:buffer_options) { {} }
let(:sink) { instance_double(Rackstash::Sink) }
let(:buffer) { described_class.new(sink, **buffer_options) }
let(:flows) { instance_double(Rackstash::Flows) }
let(:buffer) { described_class.new(flows, **buffer_options) }
describe '#allow_silent?' do
it 'defaults to true' do
@ -290,15 +290,26 @@ describe Rackstash::Buffer do
end
context 'when pending?' do
let(:time) { Time.now }
let(:message) { double(message: 'Hello World!', time: time) }
let(:event) {
{
'message' => [message],
'tags' => [],
'@timestamp' => Time.now
}
}
before do
buffer.add_message double(message: 'Hello World!', time: Time.now)
buffer.add_message(message)
# We might call Buffer#flush during the following tests
allow(sink).to receive(:write).with(buffer).once
allow(buffer).to receive(:to_event).and_return(event)
allow(flows).to receive(:write).with(event).once
end
it 'flushes the buffer to the sink' do
expect(sink).to receive(:write).with(buffer).once
it 'flushes the buffer to the flows' do
expect(flows).to receive(:write).with(event).once
buffer.flush
end
@ -314,8 +325,8 @@ describe Rackstash::Buffer do
end
context 'when not pending?' do
it 'does not flushes the buffer to the sink' do
expect(sink).not_to receive(:write)
it 'does not flushes the buffer to the flows' do
expect(flows).not_to receive(:write)
buffer.flush
end
@ -497,33 +508,6 @@ describe Rackstash::Buffer do
end
describe '#to_event' do
it 'does not merge field and tags if empty' do
expect(buffer).not_to receive(:fields)
expect(buffer).not_to receive(:tags)
buffer.to_event(fields: {}, tags: [])
end
it 'merges fields and tags as values' do
fields = { foo: :bar }
tags = ['default_tag']
expect(buffer.fields).to receive(:deep_merge).with(fields, force: false)
expect(buffer.tags).to receive(:merge).with(tags)
buffer.to_event(fields: fields, tags: tags)
end
it 'merges fields and tags as Procs' do
fields = -> {}
tags = -> {}
expect(buffer.fields).to receive(:deep_merge).with(fields, force: false)
expect(buffer.tags).to receive(:merge).with(tags)
buffer.to_event(fields: fields, tags: tags)
end
it 'creates an event hash' do
message = double(message: 'Hello World', time: Time.now)
allow(message)

View File

@ -10,13 +10,13 @@ require 'spec_helper'
require 'rackstash/buffer_stack'
describe Rackstash::BufferStack do
let(:sink) { instance_double(Rackstash::Sink) }
let(:stack) { described_class.new(sink) }
let(:flows) { instance_double(Rackstash::Flows) }
let(:stack) { described_class.new(flows) }
describe '#current' do
it 'initializes a buffer' do
expect(stack.current).to be_a Rackstash::Buffer
expect(stack.current.sink).to equal sink
expect(stack.current.flows).to equal flows
end
it 'repeatedly returns the same buffer' do

View File

@ -111,6 +111,28 @@ describe Rackstash::Flows do
end
end
describe '#close' do
it 'calls close on all flows' do
[a_flow, a_flow].each do |flow|
expect(flow).to receive(:close)
flows << flow
end
expect(flows.close).to be_nil
end
end
describe '#reopen' do
it 'calls reopen on all flows' do
[a_flow, a_flow].each do |flow|
expect(flow).to receive(:reopen)
flows << flow
end
expect(flows.reopen).to be_nil
end
end
describe '#each' do
it 'yield each flow' do
flow1 = a_flow
@ -257,4 +279,29 @@ describe Rackstash::Flows do
expect(flows.to_s).to eql flows.to_a.to_s
end
end
describe '#write' do
it 'flushes the buffer to all flows' do
event_spec = {
'foo' => 'bar',
'tags' => [],
'@timestamp' => instance_of(Time)
}
[a_flow, a_flow].each do |flow|
expect(flow).to receive(:write).with(event_spec)
flows << flow
end
# only the first event is duplicated
expect(flows).to receive(:deep_dup_event).with(event_spec).and_call_original.ordered
event_spec.each_value do |arg|
expect(flows).to receive(:deep_dup_event).with(arg).and_call_original.ordered
end
# During flush, we create a single event, duplicate it and write each to
# each of the flows.
flows.write('foo' => 'bar', 'tags' => [], '@timestamp' => Time.now.utc)
end
end
end

View File

@ -15,7 +15,7 @@ describe Rackstash::Logger do
describe '#initialize' do
it 'accepts flows' do
expect(Rackstash::Sink).to receive(:new).with('output.log')
expect(Rackstash::Flows).to receive(:new).with('output.log')
described_class.new('output.log')
end
@ -102,44 +102,15 @@ describe Rackstash::Logger do
end
describe '#close' do
it 'forwards to the sink' do
expect(logger.sink).to receive(:close)
it 'forwards to the flows' do
expect(logger.flows).to receive(:close)
logger.close
end
end
describe '#default_fields' do
it 'forwards to the sink' do
expect(logger.sink).to receive(:default_fields)
logger.default_fields
end
end
describe '#default_fields=' do
it 'forwards to the sink' do
expect(logger.sink).to receive(:default_fields=).with('key' => 'value')
logger.default_fields = { 'key' => 'value' }
end
end
describe '#default_tags' do
it 'forwards to the sink' do
expect(logger.sink).to receive(:default_tags)
logger.default_tags
end
end
describe '#default_tags=' do
it 'forwards to the sink' do
expect(logger.sink).to receive(:default_tags=).with(['tag'])
logger.default_tags = ['tag']
end
end
describe '#flows' do
it 'forwards to the sink' do
expect(logger.sink).to receive(:flows)
logger.flows
it 'is a Rackstash::Flows' do
expect(logger.flows).to be_instance_of Rackstash::Flows
end
end
@ -195,18 +166,12 @@ describe Rackstash::Logger do
end
describe '#reopen' do
it 'forwards to the sink' do
expect(logger.sink).to receive(:reopen)
it 'forwards to the flows' do
expect(logger.flows).to receive(:reopen)
logger.reopen
end
end
describe '#sink' do
it 'returns the created sink' do
expect(logger.sink).to be_a Rackstash::Sink
end
end
describe '#fields' do
it 'gets the current buffer\'s fields' do
buffer = instance_double('Rackstash::Buffer')
@ -562,7 +527,7 @@ describe Rackstash::Logger do
end
it 'buffers multiple messages' do
expect(logger.sink).to receive(:write).once
expect(logger.flows).to receive(:write).once
logger.with_buffer do
logger.add 1, 'Hello World'

View File

@ -1,164 +0,0 @@
# frozen_string_literal: true
#
# Copyright 2017 Holger Just
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.
require 'spec_helper'
require 'rackstash/sink'
require 'rackstash/buffer'
require 'rackstash/flows'
require 'rackstash/flow'
describe Rackstash::Sink do
def a_flow
flow = instance_double('Rackstash::Flow')
allow(flow).to receive(:is_a?).with(Rackstash::Flow).and_return(true)
flow
end
let(:flows) { [a_flow] }
let(:sink) { described_class.new(*flows) }
describe 'initialize' do
# We deliberately use the real Rackstash::Flows class here to server as an
# integration test
it 'wraps a single flow in a flows list' do
expect(Rackstash::Flows).to receive(:new).with(*flows)
.and_call_original
sink = described_class.new(*flows)
expect(sink.flows).to be_a Rackstash::Flows
expect(sink.flows.to_a).to eql flows
end
it 'wraps multiple flows in a flows list' do
flows = [a_flow, a_flow]
expect(Rackstash::Flows).to receive(:new).with(*flows)
.and_call_original
sink = described_class.new(*flows)
expect(sink.flows).to be_a Rackstash::Flows
expect(sink.flows.to_a).to eql flows
end
end
describe '#default_fields' do
it 'can set a proc' do
a_proc = proc { nil }
expect(a_proc).not_to receive(:call)
sink.default_fields = a_proc
expect(sink.default_fields).to equal a_proc
end
it 'can set a Hash' do
hash = { foo: :bar }
sink.default_fields = hash
expect(sink.default_fields).to equal hash
end
it 'can set a Hash-like object' do
hash_alike = double('hash')
expect(hash_alike).to receive(:to_hash).and_return(foo: :bar)
sink.default_fields = hash_alike
expect(sink.default_fields).to eql(foo: :bar)
expect(sink.default_fields).not_to equal hash_alike
end
it 'refuses invalid fields' do
expect { sink.default_fields = nil }.to raise_error TypeError
expect { sink.default_fields = 42 }.to raise_error TypeError
expect { sink.default_fields = ['foo'] }.to raise_error TypeError
end
end
describe '#default_tags' do
it 'can set a proc' do
tags = proc { nil }
expect(tags).not_to receive(:call)
sink.default_tags = tags
expect(sink.default_tags).to equal tags
end
it 'can set an Array' do
array = [:foo, 'bar']
sink.default_tags = array
expect(sink.default_tags).to equal array
end
it 'can set an Array-like object' do
array_alike = double('array')
expect(array_alike).to receive(:to_ary).and_return([:foo])
sink.default_tags = array_alike
expect(sink.default_tags).to eql [:foo]
expect(sink.default_tags).not_to equal array_alike
end
it 'refuses invalid fields' do
expect { sink.default_tags = nil }.to raise_error TypeError
expect { sink.default_tags = 42 }.to raise_error TypeError
expect { sink.default_tags = { foo: :bar } }.to raise_error TypeError
end
end
describe '#close' do
let(:flows) { [a_flow, a_flow] }
it 'calls close on all flows' do
expect(flows).to all receive(:close)
expect(sink.close).to be_nil
end
end
describe '#reopen' do
let(:flows) { [a_flow, a_flow] }
it 'calls reopen on all flows' do
expect(flows).to all receive(:reopen)
expect(sink.reopen).to be_nil
end
end
describe '#write' do
let(:flows) {
[a_flow, a_flow].each do |flow|
allow(flow).to receive(:write)
end
}
let(:buffer) { Rackstash::Buffer.new(sink) }
it 'merges default_fields and default_tags' do
expect(buffer).to receive(:to_event).with(fields: {}, tags: [])
sink.write(buffer)
end
it 'flushes the buffer to all flows' do
event_spec = {
'message' => [],
'tags' => [],
'@timestamp' => instance_of(Time)
}
# only the first event is duplicated
expect(sink).to receive(:deep_dup_event).with(event_spec).and_call_original.ordered
event_spec.each_value do |arg|
expect(sink).to receive(:deep_dup_event).with(arg).and_call_original.ordered
end
# During flush, we create a single event, duplicate it and write each to
# each of the flows.
expect(flows).to all receive(:write).with(event_spec)
sink.write(buffer)
end
end
end