mirror of
https://github.com/meineerde/rackstash.git
synced 2025-10-17 14:01:01 +00:00
Introduce auto_flushing Flows to simplify the buffering behavior of Buffers
Instead of defining the specific buffering behavior on a Buffer, we can now mark individual flows as auto_flushing or now. An auto_flushing Flow with a buffering Buffer behaves the same as a Buffer with `buffering: :data` would before. This allows us to simplify the buffering logic on the Buffer. Also, we can now use "normal" flows and auto_flushing flows on the same logger in parallel. Each of them behaves as expected with the same unchanged logger code. It is thus easier to define behavior for a development or production environment of an app since the necessary changes can all be defined on the logger itself (through the defined flows) without having to adapt the code which creates suitable Buffers with the Logger#with_buffer method in any way.
This commit is contained in:
parent
88d50afbf9
commit
6b4f009107
@ -1,6 +1,6 @@
|
||||
# frozen_string_literal: true
|
||||
#
|
||||
# Copyright 2017 Holger Just
|
||||
# Copyright 2017 - 2018 Holger Just
|
||||
#
|
||||
# This software may be modified and distributed under the terms
|
||||
# of the MIT license. See the LICENSE.txt file for details.
|
||||
@ -57,53 +57,23 @@ module Rackstash
|
||||
# for transforming, encoding, and persisting the log events.
|
||||
attr_reader :flows
|
||||
|
||||
# Returns a Symbol describing the buffering behavior of the current buffer.
|
||||
# This value can be set in {#initialize}.
|
||||
#
|
||||
# When set to `true` or `:full` this buffer is buffering all its messages
|
||||
# and stored data. it will never automatically flush anything to the flows
|
||||
# but when explicitly calling {#flush}.
|
||||
#
|
||||
# When set to `:data` or `:none`, the buffer automatically flushes all
|
||||
# messages and data when adding a new message or, with {#allow_silent?}
|
||||
# being `true`, also when adding fields. After each automatic {#flush}, all
|
||||
# {#messages} and the {#timestamp} are cleared from the buffer. If
|
||||
# {#buffering} is set to `:none`, we also clear the stored {#fields} and
|
||||
# {#tags} in addition to the other data during an automatic flush.
|
||||
# If {#buffering} is set to `:data`, all stored data except the {#messages}
|
||||
# and the {#timestamp} are retained after an auto flush.
|
||||
#
|
||||
# @return [Symbol] the buffering behavior
|
||||
attr_reader :buffering
|
||||
|
||||
# @param flows [Flows] a list of {Flow} objects where this buffer eventually
|
||||
# writes to
|
||||
# @param buffering [Symbol, Boolean] defines the buffering behavior of the
|
||||
# buffer. When set to `true` or `:full`, we buffer all data and never
|
||||
# automatically flush. When set to `:data`, we auto flush on adding new
|
||||
# data and clear all messages afterwards. When set to `:none` or `false`
|
||||
# we auto flush as above but clear all data from the buffer afterwards.
|
||||
# @param buffering [Boolean] defines the buffering behavior of the buffer.
|
||||
# When set to `true` we buffer all stored data which can be flushed to the
|
||||
# {#flows} manually. In this mode, we still automatically flush newly
|
||||
# added data to interested flows directly after adding it. When set to
|
||||
# `false` we automatically flush to all flows as above but we will clear
|
||||
# all stored data from the buffer afterwards.
|
||||
# See {#buffering} for details.
|
||||
# @param allow_silent [Boolean] When set to `true` the data in this buffer
|
||||
# will be flushed to the flows, even if there were just added fields or
|
||||
# tags without any logged messages. If this is `false` and there were no
|
||||
# messages logged with {#add_message}, the buffer will not be flushed but
|
||||
# will be silently dropped.
|
||||
def initialize(flows, buffering: :full, allow_silent: true)
|
||||
def initialize(flows, buffering: true, allow_silent: true)
|
||||
@flows = flows
|
||||
|
||||
@buffering =
|
||||
case buffering
|
||||
when :full, true
|
||||
:full
|
||||
when :data
|
||||
:data
|
||||
when :none, false
|
||||
:none
|
||||
else
|
||||
raise TypeError, "Unknown buffering argument given: #{buffering.inspect}"
|
||||
end
|
||||
|
||||
@buffering = !!buffering
|
||||
@allow_silent = !!allow_silent
|
||||
|
||||
# initialize the internal data structures for fields, tags, ...
|
||||
@ -124,7 +94,8 @@ module Rackstash
|
||||
# older exceptions in the current buffer. Only by the `force` argument to
|
||||
# `false`, we will preserve existing exceptions.
|
||||
#
|
||||
# @param exception [Exception] an Exception object as catched by `rescue`
|
||||
# @param exception [Exception] an Exception object as caught by a
|
||||
# `begin` ... `rescue` block.
|
||||
# @param force [Boolean] set to `false` to preserve the details of an
|
||||
# existing exception in the current buffer's fields, set to `true` to
|
||||
# overwrite them.
|
||||
@ -132,7 +103,7 @@ module Rackstash
|
||||
def add_exception(exception, force: true)
|
||||
return exception unless force || fields[FIELD_ERROR].nil?
|
||||
|
||||
fields.merge!(
|
||||
add_fields(
|
||||
FIELD_ERROR => exception.class.name,
|
||||
FIELD_ERROR_MESSAGE => exception.message,
|
||||
FIELD_ERROR_TRACE => (exception.backtrace || []).join("\n")
|
||||
@ -179,8 +150,7 @@ module Rackstash
|
||||
def add_message(message)
|
||||
timestamp(message.time)
|
||||
@messages << message
|
||||
|
||||
auto_flush
|
||||
auto_flush(message)
|
||||
|
||||
message
|
||||
end
|
||||
@ -197,21 +167,35 @@ module Rackstash
|
||||
@allow_silent
|
||||
end
|
||||
|
||||
# The buffering behavior of the current buffer. This value can be set in
|
||||
# {#initialize}.
|
||||
#
|
||||
# When set to `true` this buffer is buffering all its messages and stored
|
||||
# data. When explicitly calling {#flush}, all the stored data is flushed to
|
||||
# all {#flows}. To interested flows, we will also automatically flush newly
|
||||
# added messages along with the stored fields and tags after adding a
|
||||
# message. If {#allow_silent?} is `true`, we also do this when adding fields
|
||||
# with {#add_fields} and {#add_exception}.
|
||||
#
|
||||
# If {#buffering} is set to `false`, we will automatically flush the buffer
|
||||
# the same way as before, this time to all buffers however. In addition, we
|
||||
# will also clear the all stored stored {#fields}, {#tags}, the {#messages}
|
||||
# and the {#timestamp}.
|
||||
#
|
||||
# @return [Boolean] the buffering behavior
|
||||
def buffering?
|
||||
@buffering
|
||||
end
|
||||
|
||||
# Clear the current buffer from all stored data, just as it was right after
|
||||
# inititialization.
|
||||
#
|
||||
# @param everything [Boolean] When set to `true`, we clear {#messages},
|
||||
# {#fields}, {#tags} and the {#timestamp}. When set to `false`, we only
|
||||
# clear the {#messages} and the {#timestamp} but retain he other data.
|
||||
# @return [self]
|
||||
def clear(everything = true)
|
||||
def clear
|
||||
@messages = []
|
||||
@timestamp = nil
|
||||
|
||||
if everything
|
||||
@fields = nil
|
||||
@tags = nil
|
||||
end
|
||||
@fields = nil
|
||||
@tags = nil
|
||||
|
||||
self
|
||||
end
|
||||
@ -230,16 +214,16 @@ module Rackstash
|
||||
def flush
|
||||
return unless pending?
|
||||
|
||||
@flows.write(self)
|
||||
@flows.flush(event)
|
||||
self
|
||||
end
|
||||
|
||||
# Return all logged messages on the current buffer.
|
||||
#
|
||||
# @return [Array<Message>] the list of messages of the curent buffer
|
||||
# @note You can not add messsages to the buffer by modifying this array.
|
||||
# @return [Array<Message>] the list of messages of the current buffer
|
||||
# @note You can not add messages to the buffer by modifying this array.
|
||||
# Instead, use {#add_message} to add new messages or add filters to the
|
||||
# responsible {Flow} to remove or change messages.
|
||||
# responsible {Flow} to remove or change already added messages.
|
||||
def messages
|
||||
@messages.dup
|
||||
end
|
||||
@ -344,7 +328,7 @@ module Rackstash
|
||||
# All hashes (including nested hashes) use `String` keys.
|
||||
#
|
||||
# @return [Hash] the event expected by the event {Filter}s.
|
||||
def to_h
|
||||
def event
|
||||
event = fields.to_h
|
||||
event[FIELD_TAGS] = tags.to_a
|
||||
event[FIELD_MESSAGE] = messages
|
||||
@ -352,27 +336,74 @@ module Rackstash
|
||||
|
||||
event
|
||||
end
|
||||
alias to_h event
|
||||
alias as_json event
|
||||
|
||||
private
|
||||
|
||||
# Non-buffering buffers, i.e., those with `buffering: false`, flush
|
||||
# themselves to the defined flows whenever there is something logged to it.
|
||||
# That way, such a buffer acts like a regular old Logger would: it just
|
||||
# flushes a logged message to its log device as soon as it is logged.
|
||||
# Write the data contained in this Buffer to interested {Flow} objects.
|
||||
#
|
||||
# By calling `auto_flush`, the current buffer is flushed and cleared if
|
||||
# necessary.
|
||||
def auto_flush
|
||||
case @buffering
|
||||
when :full
|
||||
return
|
||||
when :data
|
||||
flush
|
||||
clear(false)
|
||||
when :none
|
||||
flush
|
||||
clear(true)
|
||||
# This method is called after adding new data to the Buffer. Here, we write
|
||||
# the newly added data to the flows, depending on their type:
|
||||
#
|
||||
# Flows with enabled `auto_flush?` will receive an event Hash containing all
|
||||
# of the current Buffer's fields and tags but only the single currently
|
||||
# logged message (if any). This happens regardless of whether the current
|
||||
# Buffer is {#buffering?} or not.
|
||||
#
|
||||
# In addition to that, if the current Buffer is not {buffering?}, we write
|
||||
# pending data to "normal" flows and {#clear} the Buffer afterwards. Such a
|
||||
# buffer thus acts like a regular old Logger would: it just flushes a logged
|
||||
# message to its log device as soon as it is added.
|
||||
#
|
||||
# Buffering Buffers are not automatically flushed to "normal" flows here.
|
||||
# They need to be explicitly flushed with {Buffer#flush} in order for their
|
||||
# buffered data to be written to the normal flows.
|
||||
#
|
||||
# @param message [Message, nil] The currently logged message which is added
|
||||
# to the {#auto_event}. If kept empty (i.e. with `nil`), we do not
|
||||
# add any messages.
|
||||
# @return [void]
|
||||
def auto_flush(message = nil)
|
||||
# Write the auto_event with the current message (if any) to the
|
||||
# auto_flushing Flows
|
||||
flows.auto_flush { auto_event(message) }
|
||||
|
||||
if !buffering? && pending?
|
||||
flows.flush { event }
|
||||
clear
|
||||
end
|
||||
end
|
||||
|
||||
# Creates the automatically flushed event hash. It is similar to the one
|
||||
# created by {#event} but only uses the passed `message` instead of all
|
||||
# {#messages} and uses either the `message`'s timestamp or the current time
|
||||
# but never the current Buffer's timestamp.
|
||||
#
|
||||
# This event is used to represent an intermediate state of a Buffer which
|
||||
# can be flushed to interested flows early.
|
||||
#
|
||||
# @param message [Message, nil] The currently logged message which is added
|
||||
# to the auto_event Hash. If kept empty (i.e. with `nil`), we do not
|
||||
# add any messages.
|
||||
# @return [Hash] the event Hash for the currently added data.
|
||||
# @see #event
|
||||
def auto_event(message = nil)
|
||||
event = fields.to_h
|
||||
event[FIELD_TAGS] = tags.to_a
|
||||
|
||||
if message
|
||||
event[FIELD_MESSAGE] = [message]
|
||||
|
||||
time = message.time
|
||||
time = time.getutc.freeze unless time.utc? && time.frozen?
|
||||
event[FIELD_TIMESTAMP] = time
|
||||
else
|
||||
event[FIELD_MESSAGE] = []
|
||||
event[FIELD_TIMESTAMP] = Time.now.utc.freeze
|
||||
end
|
||||
|
||||
event
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -104,7 +104,7 @@ module Rackstash
|
||||
# the writing of an individual event. Any other return value of filters is
|
||||
# ignored.
|
||||
#
|
||||
# @param event [Hash] an event hash, see {Buffer#to_event} for details
|
||||
# @param event [Hash] an event hash, see {Buffer#event} for details
|
||||
# @return [Hash, false] the filtered event or `false` if any of the
|
||||
# filters returned `false`
|
||||
def call(event)
|
||||
|
||||
@ -60,8 +60,8 @@ module Rackstash
|
||||
# # Write an event. This is normally done by a Rackstash::Buffer
|
||||
# flow.write(an_event)
|
||||
#
|
||||
# The event which eventually gets written to the flow is created from a Buffer
|
||||
# with {Buffer#to_event}.
|
||||
# The event which eventually gets written to the flow is usually created from
|
||||
# a {Buffer} with its pending data.
|
||||
class Flow
|
||||
# @return [Adapter::Adapter] the log adapter
|
||||
attr_reader :adapter
|
||||
@ -83,11 +83,18 @@ module Rackstash
|
||||
# default, we swallow errors after having logging them to not cause
|
||||
# additional issues to the production application just because the logger
|
||||
# doesn't work.
|
||||
# @param auto_flush [Bool] set to `true` to write added fields or messages
|
||||
# added to a {Buffer} to this flow immediately. With each write, this flow
|
||||
# will then receive all current fields of the {Buffer} but only the
|
||||
# currently added message (if any). When set to `false`, the flow will
|
||||
# receive the full event with all fields and messages of the Buffer after
|
||||
# an explicit call to {Buffer#flush} for a buffering Buffer or after each
|
||||
# added message or fields for a non-bufering Buffer.
|
||||
# @yieldparam flow [self] if the given block accepts an argument, we yield
|
||||
# `self` as a parameter, else, the block is directly executed in the
|
||||
# context of `self`.
|
||||
def initialize(adapter, encoder: nil, filters: [],
|
||||
error_flow: nil, raise_on_error: false,
|
||||
error_flow: nil, raise_on_error: false, auto_flush: false,
|
||||
&block
|
||||
)
|
||||
@adapter = Rackstash::Adapter[adapter]
|
||||
@ -95,6 +102,7 @@ module Rackstash
|
||||
@filter_chain = Rackstash::FilterChain.new(filters)
|
||||
self.error_flow = error_flow
|
||||
self.raise_on_error = raise_on_error
|
||||
self.auto_flush = auto_flush
|
||||
|
||||
if block_given?
|
||||
if block.arity == 0
|
||||
@ -105,6 +113,37 @@ module Rackstash
|
||||
end
|
||||
end
|
||||
|
||||
# Get or set the `auto_flush` setting. If set to `true`, new messages and
|
||||
# fields added to a {Buffer} will be written directly to this flow,
|
||||
# regardless of the buffering setting of the {Buffer}. This can be useful
|
||||
# during development or testing of an application where the developer might
|
||||
# want to directly watch the low-cardinality log as the messages are logged.
|
||||
#
|
||||
# If set to `false` (the default), buffering Buffers will only be written
|
||||
# after explicitly calling {Buffer#flush} on them.
|
||||
#
|
||||
# @param bool [Bool, nil] the value to set. If omitted, we return the
|
||||
# current setting.
|
||||
# @return [Bool] the updated or current `auto_flush` setting
|
||||
# @see #auto_flush=
|
||||
def auto_flush(bool = nil)
|
||||
self.auto_flush = bool unless bool.nil?
|
||||
auto_flush?
|
||||
end
|
||||
|
||||
# @return [Bool] the current value of the `auto_flush` setting.
|
||||
# @see #auto_flush
|
||||
def auto_flush?
|
||||
@auto_flush
|
||||
end
|
||||
|
||||
# @param bool [Bool] `true` to cause buffering Buffers to write their added
|
||||
# messages and fields to the flow as soon as they are logged, `false` to
|
||||
# write the whole event only on an explicit call to {Buffer#flush}.
|
||||
def auto_flush=(bool)
|
||||
@auto_flush = !!bool
|
||||
end
|
||||
|
||||
# Close the log adapter if supported. This might be a no-op if the adapter
|
||||
# does not support closing.
|
||||
#
|
||||
|
||||
@ -164,33 +164,49 @@ module Rackstash
|
||||
nil
|
||||
end
|
||||
|
||||
# Write an event `Hash` to each of the defined flows. The event is usually
|
||||
# created from {Buffer#to_event}.
|
||||
# Write an event `Hash` to each of the defined normal (that is: non
|
||||
# `auto_flush`'ing) flows. The event is usually created from {Buffer#event}.
|
||||
#
|
||||
# We write a fresh deep-copy of the event hash to each defined {Flow}.
|
||||
# This allows each flow to alter the event in any way without affecting the
|
||||
# others.
|
||||
#
|
||||
# @param event [Hash] an event `Hash`
|
||||
# @return [Hash] the given `event`
|
||||
def write(event)
|
||||
# Memoize the current list of flows for the rest of the method to make
|
||||
# sure it doesn't change while we work with it.
|
||||
flows = to_a
|
||||
flows_size = flows.size
|
||||
# @param event [Hash, nil] the event `Hash`. See {Buffer#event} for details
|
||||
# @yield [flow] If `event` is `nil`, we call the goven block and use its
|
||||
# return value as the event. The block is expected to return an event
|
||||
# `Hash`.
|
||||
# @return [Hash, nil] the flushed event or `nil` if nothing was flushed
|
||||
def flush(event = nil)
|
||||
flows = to_a.delete_if(&:auto_flush?)
|
||||
return unless flows.any?
|
||||
|
||||
event = event.to_h
|
||||
flows.each_with_index do |flow, index|
|
||||
# If we have more than one flow, we provide a fresh copy of the event
|
||||
# to each flow. The flow's filters and encoder can then mutate the event
|
||||
# however it pleases without affecting later flows. We don't need to
|
||||
# duplicate the event for the last flow since it won't be re-used
|
||||
# after that anymore.
|
||||
current_event = (index == flows_size - 1) ? event : deep_dup_event(event)
|
||||
flow.write(current_event)
|
||||
end
|
||||
event ||= yield if block_given?
|
||||
return unless event
|
||||
|
||||
event
|
||||
write_to(flows, event.to_h)
|
||||
end
|
||||
|
||||
# Write an event `Hash` to each of the defined `auto_flush`'ing flows.
|
||||
# The event is usually created from {Buffer#auto_event} as the buffer is
|
||||
# automatically flushed after a message or fields were logged to it.
|
||||
#
|
||||
# We write a fresh deep-copy of the event hash to each defined {Flow}.
|
||||
# This allows each flow to alter the event in any way without affecting the
|
||||
# others.
|
||||
#
|
||||
# @param event [Hash, nil] the event `Hash`. See {Buffer#event} for details
|
||||
# @yield [flow] If `event` is `nil`, we call the goven block and use its
|
||||
# return value as the event. The block is expected to return an event
|
||||
# `Hash`.
|
||||
# @return [Hash, nil] the flushed event or `nil` if nothing was flushed
|
||||
def auto_flush(event = nil)
|
||||
flows = to_a.select!(&:auto_flush?)
|
||||
return unless flows.any?
|
||||
|
||||
event ||= yield if block_given?
|
||||
return unless event
|
||||
|
||||
write_to(flows, event.to_h)
|
||||
end
|
||||
|
||||
# @return [Array<Flow>] an array of all flow elements without any `nil`
|
||||
@ -241,5 +257,19 @@ module Rackstash
|
||||
obj
|
||||
end
|
||||
end
|
||||
|
||||
def write_to(flows, event)
|
||||
flows.each_with_index do |flow, index|
|
||||
# If we have more than one flow, we provide a fresh copy of the event
|
||||
# to each flow. The flow's filters and encoder can then mutate the event
|
||||
# however it pleases without affecting later flows. We don't need to
|
||||
# duplicate the event for the last flow since it won't be re-used
|
||||
# after that anymore.
|
||||
current_event = (index == flows.size - 1) ? event : deep_dup_event(event)
|
||||
flow.write(current_event)
|
||||
end
|
||||
|
||||
event
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -11,7 +11,14 @@ require 'rackstash/buffer'
|
||||
|
||||
RSpec.describe Rackstash::Buffer do
|
||||
let(:buffer_options) { {} }
|
||||
let(:flows) { instance_double(Rackstash::Flows) }
|
||||
|
||||
let(:flows) {
|
||||
instance_double(Rackstash::Flows).tap do |flows|
|
||||
allow(flows).to receive(:flush)
|
||||
allow(flows).to receive(:auto_flush)
|
||||
end
|
||||
}
|
||||
|
||||
let(:buffer) { described_class.new(flows, **buffer_options) }
|
||||
|
||||
describe '#allow_silent?' do
|
||||
@ -111,6 +118,11 @@ RSpec.describe Rackstash::Buffer do
|
||||
buffer.add_fields(key: 'value')
|
||||
expect(buffer.pending?).to be true
|
||||
end
|
||||
|
||||
it 'calls auto_flush' do
|
||||
expect(flows).to receive(:auto_flush)
|
||||
buffer.add_fields(key: 'value')
|
||||
end
|
||||
end
|
||||
|
||||
context 'when not allow_silent?' do
|
||||
@ -122,57 +134,49 @@ RSpec.describe Rackstash::Buffer do
|
||||
buffer.add_fields(key: 'value')
|
||||
expect(buffer.pending?).to be false
|
||||
end
|
||||
|
||||
it 'calls auto_flush' do
|
||||
expect(flows).to receive(:auto_flush)
|
||||
buffer.add_fields(key: 'value')
|
||||
end
|
||||
end
|
||||
|
||||
context 'with buffering: :full' do
|
||||
context 'with buffering: true' do
|
||||
before do
|
||||
buffer_options[:buffering] = :full
|
||||
buffer_options[:buffering] = true
|
||||
end
|
||||
|
||||
it 'does not call #flush' do
|
||||
expect(buffer).not_to receive(:flush)
|
||||
it 'does not flush the buffer' do
|
||||
expect(flows).not_to receive(:flush)
|
||||
# We always auto_flush buffers to send the newly added fields to
|
||||
# interested flows
|
||||
expect(flows).to receive(:auto_flush)
|
||||
|
||||
buffer.add_fields(key: 'value')
|
||||
end
|
||||
|
||||
it 'does not call #clear' do
|
||||
it 'does not clear the buffer' do
|
||||
expect(buffer).not_to receive(:clear)
|
||||
buffer.add_fields(key: 'value')
|
||||
|
||||
expect(buffer.fields['key']).to eql 'value'
|
||||
end
|
||||
end
|
||||
|
||||
context 'with buffering: :data' do
|
||||
before do
|
||||
buffer_options[:buffering] = :data
|
||||
end
|
||||
|
||||
it 'calls #flush' do
|
||||
expect(buffer).to receive(:flush)
|
||||
buffer.add_fields(key: 'value')
|
||||
end
|
||||
|
||||
it 'clears only messages' do
|
||||
allow(buffer).to receive(:flush).once
|
||||
buffer.add_fields(key: 'value')
|
||||
|
||||
expect(buffer.fields).to_not be_empty
|
||||
expect(buffer.pending?).to be true
|
||||
end
|
||||
end
|
||||
|
||||
context 'with buffering: :none' do
|
||||
context 'with buffering: false' do
|
||||
before do
|
||||
buffer_options[:buffering] = :none
|
||||
buffer_options[:buffering] = false
|
||||
end
|
||||
|
||||
it 'calls #flush' do
|
||||
expect(buffer).to receive(:flush)
|
||||
it 'flushes the buffer' do
|
||||
expect(flows).to receive(:flush)
|
||||
expect(flows).to receive(:auto_flush)
|
||||
|
||||
buffer.add_fields(key: 'value')
|
||||
end
|
||||
|
||||
it 'clears the whole buffer' do
|
||||
allow(buffer).to receive(:flush).once
|
||||
it 'clears the buffer' do
|
||||
buffer.add_fields(key: 'value')
|
||||
|
||||
expect(buffer.fields).to be_empty
|
||||
@ -202,13 +206,13 @@ RSpec.describe Rackstash::Buffer do
|
||||
expect(buffer.timestamp).to eql time.getutc
|
||||
end
|
||||
|
||||
context 'with buffering: :full' do
|
||||
context 'with buffering: true' do
|
||||
before do
|
||||
buffer_options[:buffering] = :full
|
||||
buffer_options[:buffering] = true
|
||||
end
|
||||
|
||||
it 'does not call #flush' do
|
||||
expect(buffer).not_to receive(:flush)
|
||||
it 'does not flush the buffer' do
|
||||
expect(flows).not_to receive(:flush)
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
end
|
||||
|
||||
@ -218,37 +222,17 @@ RSpec.describe Rackstash::Buffer do
|
||||
end
|
||||
end
|
||||
|
||||
context 'with buffering: :none' do
|
||||
context 'with buffering: false' do
|
||||
before do
|
||||
buffer_options[:buffering] = :data
|
||||
buffer_options[:buffering] = false
|
||||
end
|
||||
|
||||
it 'calls #flush' do
|
||||
expect(buffer).to receive(:flush)
|
||||
it 'flushes the buffer' do
|
||||
expect(flows).to receive(:flush)
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
end
|
||||
|
||||
it 'clears messages' do
|
||||
allow(buffer).to receive(:flush)
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
|
||||
expect(buffer.messages.count).to eql 0
|
||||
expect(buffer.pending?).to be false
|
||||
end
|
||||
end
|
||||
|
||||
context 'with buffering: :none' do
|
||||
before do
|
||||
buffer_options[:buffering] = :none
|
||||
end
|
||||
|
||||
it 'calls #flush' do
|
||||
expect(buffer).to receive(:flush)
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
end
|
||||
|
||||
it 'clears messages' do
|
||||
allow(buffer).to receive(:flush)
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
|
||||
expect(buffer.messages.count).to eql 0
|
||||
@ -258,31 +242,18 @@ RSpec.describe Rackstash::Buffer do
|
||||
end
|
||||
|
||||
describe '#buffering' do
|
||||
it 'defaults to :full' do
|
||||
expect(buffer.buffering).to eql :full
|
||||
it 'defaults to true' do
|
||||
expect(buffer.buffering?).to eql true
|
||||
end
|
||||
|
||||
it 'can be set to :full' do
|
||||
expect(described_class.new(flows, buffering: true).buffering).to eql :full
|
||||
expect(described_class.new(flows, buffering: :full).buffering).to eql :full
|
||||
it 'can be set to true' do
|
||||
expect(described_class.new(flows, buffering: true).buffering?).to be true
|
||||
expect(described_class.new(flows, buffering: 'whatever').buffering?).to be true
|
||||
end
|
||||
|
||||
it 'can be set to :data' do
|
||||
expect(described_class.new(flows, buffering: :data).buffering).to eql :data
|
||||
end
|
||||
|
||||
it 'can be set to :none' do
|
||||
expect(described_class.new(flows, buffering: false).buffering).to eql :none
|
||||
expect(described_class.new(flows, buffering: :none).buffering).to eql :none
|
||||
end
|
||||
|
||||
it 'does not allow other values' do
|
||||
expect { described_class.new(flows, buffering: nil) }
|
||||
.to raise_error(TypeError)
|
||||
expect { described_class.new(flows, buffering: :invalid) }
|
||||
.to raise_error(TypeError)
|
||||
expect { described_class.new(flows, buffering: 'full') }
|
||||
.to raise_error(TypeError)
|
||||
it 'can be set to false' do
|
||||
expect(described_class.new(flows, buffering: false).buffering?).to be false
|
||||
expect(described_class.new(flows, buffering: nil).buffering?).to be false
|
||||
end
|
||||
end
|
||||
|
||||
@ -346,13 +317,13 @@ RSpec.describe Rackstash::Buffer do
|
||||
end
|
||||
|
||||
context 'when pending?' do
|
||||
let(:time) { Time.now }
|
||||
let(:time) { Time.parse('2016-10-17 15:37:00 +02:00') }
|
||||
let(:message) { double(message: 'Hello World!', time: time) }
|
||||
let(:event) {
|
||||
{
|
||||
'message' => [message],
|
||||
'tags' => [],
|
||||
'@timestamp' => Time.now
|
||||
'@timestamp' => time
|
||||
}
|
||||
}
|
||||
|
||||
@ -360,11 +331,11 @@ RSpec.describe Rackstash::Buffer do
|
||||
buffer.add_message(message)
|
||||
|
||||
# We might call Buffer#flush during the following tests
|
||||
allow(flows).to receive(:write).with(buffer).once
|
||||
allow(flows).to receive(:flush).with(event).once
|
||||
end
|
||||
|
||||
it 'flushes the buffer to the flows' do
|
||||
expect(flows).to receive(:write).with(buffer).once
|
||||
expect(flows).to receive(:flush).with(event).once
|
||||
buffer.flush
|
||||
end
|
||||
|
||||
@ -381,7 +352,7 @@ RSpec.describe Rackstash::Buffer do
|
||||
|
||||
context 'when not pending?' do
|
||||
it 'does not flushes the buffer to the flows' do
|
||||
expect(flows).not_to receive(:write)
|
||||
expect(flows).not_to receive(:flush)
|
||||
buffer.flush
|
||||
end
|
||||
|
||||
@ -562,7 +533,7 @@ RSpec.describe Rackstash::Buffer do
|
||||
end
|
||||
end
|
||||
|
||||
describe '#to_h' do
|
||||
describe '#event' do
|
||||
it 'creates an event hash' do
|
||||
message = double(message: 'Hello World', time: Time.now)
|
||||
allow(message)
|
||||
@ -570,7 +541,7 @@ RSpec.describe Rackstash::Buffer do
|
||||
buffer.fields[:foo] = 'bar'
|
||||
buffer.tags << 'some_tag'
|
||||
|
||||
expect(buffer.to_h).to match(
|
||||
expect(buffer.event).to match(
|
||||
'foo' => 'bar',
|
||||
'message' => [message],
|
||||
'tags' => ['some_tag'],
|
||||
|
||||
@ -48,12 +48,12 @@ RSpec.describe Rackstash::BufferStack do
|
||||
|
||||
it 'pushes a buffering buffer by default' do
|
||||
stack.push
|
||||
expect(stack.current.buffering).to eql :full
|
||||
expect(stack.current.buffering?).to eql true
|
||||
end
|
||||
|
||||
it 'allows to set options on the new buffer' do
|
||||
stack.push(buffering: :data)
|
||||
expect(stack.current.buffering).to eql :data
|
||||
stack.push(buffering: false)
|
||||
expect(stack.current.buffering?).to eql false
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@ -13,9 +13,11 @@ require 'rackstash/flow'
|
||||
RSpec.describe Rackstash::Flows do
|
||||
let(:flows) { described_class.new }
|
||||
|
||||
def a_flow
|
||||
def a_flow(auto_flush: false)
|
||||
flow = instance_double('Rackstash::Flow')
|
||||
allow(flow).to receive(:is_a?).with(Rackstash::Flow).and_return(true)
|
||||
allow(flow).to receive(:auto_flush?).and_return(auto_flush)
|
||||
|
||||
flow
|
||||
end
|
||||
|
||||
@ -323,8 +325,8 @@ RSpec.describe Rackstash::Flows do
|
||||
end
|
||||
end
|
||||
|
||||
describe '#write' do
|
||||
it 'flushes the buffer to all flows' do
|
||||
describe '#flush' do
|
||||
it 'writes the event to the flows' do
|
||||
event_spec = {
|
||||
'foo' => 'bar',
|
||||
'tags' => [],
|
||||
@ -344,7 +346,75 @@ RSpec.describe Rackstash::Flows do
|
||||
|
||||
# During flush, we create a single event, duplicate it and write each to
|
||||
# each of the flows.
|
||||
flows.write('foo' => 'bar', 'tags' => [], '@timestamp' => Time.now.utc)
|
||||
flows.flush('foo' => 'bar', 'tags' => [], '@timestamp' => Time.now.utc)
|
||||
end
|
||||
|
||||
it 'writes only to normal flows' do
|
||||
normal_flow = a_flow(auto_flush: false)
|
||||
expect(normal_flow).to receive(:write).with('foo' => 'bar')
|
||||
flows << normal_flow
|
||||
|
||||
auto_flush_flow = a_flow(auto_flush: true)
|
||||
expect(auto_flush_flow).not_to receive(:write)
|
||||
flows << auto_flush_flow
|
||||
|
||||
flows.flush('foo' => 'bar')
|
||||
end
|
||||
|
||||
it 'accepts a block' do
|
||||
normal_flow = a_flow(auto_flush: false)
|
||||
expect(normal_flow).to receive(:write).with('foo' => 'bar')
|
||||
flows << normal_flow
|
||||
|
||||
auto_flush_flow = a_flow(auto_flush: true)
|
||||
expect(auto_flush_flow).not_to receive(:write)
|
||||
flows << auto_flush_flow
|
||||
|
||||
flows.flush { { 'foo' => 'bar'} }
|
||||
end
|
||||
|
||||
it 'does nothing if there is no normal flow' do
|
||||
auto_flush_flow = a_flow(auto_flush: true)
|
||||
expect(auto_flush_flow).not_to receive(:write)
|
||||
flows << auto_flush_flow
|
||||
|
||||
expect(flows.flush('foo' => 'bar')).to be_nil
|
||||
expect { |b| flows.flush(&b) }.not_to yield_control
|
||||
end
|
||||
end
|
||||
|
||||
describe '#auto_flush' do
|
||||
it 'writes only to auto_flush flows' do
|
||||
normal_flow = a_flow(auto_flush: false)
|
||||
expect(normal_flow).not_to receive(:write)
|
||||
flows << normal_flow
|
||||
|
||||
auto_flush_flow = a_flow(auto_flush: true)
|
||||
expect(auto_flush_flow).to receive(:write).with('foo' => 'bar')
|
||||
flows << auto_flush_flow
|
||||
|
||||
flows.auto_flush('foo' => 'bar')
|
||||
end
|
||||
|
||||
it 'accepts a block' do
|
||||
normal_flow = a_flow(auto_flush: false)
|
||||
expect(normal_flow).not_to receive(:write)
|
||||
flows << normal_flow
|
||||
|
||||
auto_flush_flow = a_flow(auto_flush: true)
|
||||
expect(auto_flush_flow).to receive(:write).with('foo' => 'bar')
|
||||
flows << auto_flush_flow
|
||||
|
||||
flows.auto_flush { { 'foo' => 'bar'} }
|
||||
end
|
||||
|
||||
it 'does nothing if there is no auto_flushing flow' do
|
||||
normal_flow = a_flow(auto_flush: false)
|
||||
expect(normal_flow).not_to receive(:write)
|
||||
flows << normal_flow
|
||||
|
||||
expect(flows.auto_flush('foo' => 'bar')).to be_nil
|
||||
expect { |b| flows.auto_flush(&b) }.not_to yield_control
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -531,7 +531,7 @@ RSpec.describe Rackstash::Logger do
|
||||
end
|
||||
|
||||
it 'buffers multiple messages' do
|
||||
expect(logger.flows).to receive(:write).once
|
||||
expect(logger.flows).to receive(:flush).once
|
||||
|
||||
logger.with_buffer do
|
||||
logger.add 1, 'Hello World'
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user