mirror of
https://github.com/meineerde/rackstash.git
synced 2026-01-31 17:27:13 +00:00
Add new data-only buffering mode to Buffer
With this, the Buffer knows three buffering modes: * :full - This is the same as the previous buffering-enabled mode. With this, we buffer everything and never auto-flush * :none - the previous non-buffering mode. We autoflush everytime there is a new messase or explicitly added fields. All stored data is cleared afterwards. * :data - the new mode. It behaves almost like :none with the notable exception that we retain fields and tags after the auto flush. The new mode is especially useful to emulate a regular Logger even when using per-request buffers. With that, you can add fields once to a buffer. Each time a message is added, it will be flushed directly without waiting for the request to be finished. Yet, the flows can still take advantage of all the previously added fields and tags to properly format the emitted log event.
This commit is contained in:
parent
224a87f677
commit
cb95c0b0e4
@ -57,19 +57,52 @@ module Rackstash
|
||||
# for transforming, encoding, and persisting the log events.
|
||||
attr_reader :flows
|
||||
|
||||
# Returns a Symbol describing the buffering behavior of the current buffer.
|
||||
# This value can be set in {#initialize}.
|
||||
#
|
||||
# When set to `true` or `:full` this buffer is buffering all its messages
|
||||
# and stored data. it will never automatically flush anything to the flows
|
||||
# but when explicitly calling {#flush}.
|
||||
#
|
||||
# When set to `:data` or `:none`, the buffer automatically flushes all
|
||||
# messages and data when adding a new message or, with {#allow_silent?}
|
||||
# being `true`, also when adding fields. After each automatic {#flush}, all
|
||||
# {#messages} and the {#timestamp} are cleared from the buffer. If
|
||||
# {#buffering} is set to `:none`, we also clear the stored {#fields} and
|
||||
# {#tags} in addition to the other data during an automatic flush.
|
||||
# If {#buffering} is set to `:data`, all stored data except the {#messages}
|
||||
# and the {#timestamp} are retained after an auto flush.
|
||||
#
|
||||
# @return [Symbol] the buffering behavior
|
||||
attr_reader :buffering
|
||||
|
||||
# @param flows [Flows] a list of {Flow} objects where this buffer eventually
|
||||
# writes to
|
||||
# @param buffering [Boolean] When set to `true`, this buffer is considered
|
||||
# to be buffering data. When buffering, logged messages will not be
|
||||
# flushed immediately but only with an explicit call to {#flush}.
|
||||
# @param buffering [Symbol, Boolean] defines the buffering behavior of the
|
||||
# buffer. When set to `true` or `:full`, we buffer all data and never
|
||||
# automatically flush. When set to `:data`, we auto flush on adding new
|
||||
# data and clear all messages afterwards. When set to `:none` or `false`
|
||||
# we auto flush as above but clear all data from the buffer afterwards.
|
||||
# See {#buffering} for details.
|
||||
# @param allow_silent [Boolean] When set to `true` the data in this buffer
|
||||
# will be flushed to the flows, even if there were just added fields or
|
||||
# tags without any logged messages. If this is `false` and there were no
|
||||
# messages logged with {#add_message}, the buffer will not be flushed but
|
||||
# will be silently dropped.
|
||||
def initialize(flows, buffering: true, allow_silent: true)
|
||||
def initialize(flows, buffering: :full, allow_silent: true)
|
||||
@flows = flows
|
||||
@buffering = !!buffering
|
||||
|
||||
@buffering = case buffering
|
||||
when :full, true
|
||||
:full
|
||||
when :data
|
||||
:data
|
||||
when :none, false
|
||||
:none
|
||||
else
|
||||
raise TypeError, "Unknown buffering argument given: #{buffering.inspect}"
|
||||
end
|
||||
|
||||
@allow_silent = !!allow_silent
|
||||
|
||||
# initialize the internal data structures for fields, tags, ...
|
||||
@ -112,9 +145,9 @@ module Rackstash
|
||||
# The buffer's timestamp will be initialized with the current time if it
|
||||
# wasn't set earlier already.
|
||||
#
|
||||
# If the buffer is not {#buffering?}, it will be {#flush}ed and {#clear}ed
|
||||
# after each added message. All fields, tags, and messages added before as
|
||||
# well as the fields added with this method call will be flushed.
|
||||
# If the buffer is not fully {#buffering}, the buffer will be {#flush}ed to
|
||||
# the flows. Afterwards, all messages will be cleared. New and existing
|
||||
# fields and tags will be cleared only if {#buffering} is set to `:none`.
|
||||
#
|
||||
# @param hash (see Fields::Hash#deep_merge!)
|
||||
# @raise (see Fields::Hash#deep_merge!)
|
||||
@ -135,9 +168,9 @@ module Rackstash
|
||||
# The buffer's timestamp will be initialized with the time of the first
|
||||
# added message if it wasn't set earlier already.
|
||||
#
|
||||
# If the buffer is not {#buffering?}, it will be {#flush}ed and {#clear}ed
|
||||
# after each added message. All fields, tags, and messages added before as
|
||||
# well as the message added with this method call will be flushed.
|
||||
# If the buffer is not fully {#buffering}, the buffer will be {#flush}ed to
|
||||
# the flows. Afterwards, all messages will be cleared. Fields and tags will
|
||||
# be cleared only if {#buffering} is set to `:none`.
|
||||
#
|
||||
# @param message [Message] A {Message} to add to the current message
|
||||
# buffer.
|
||||
@ -163,26 +196,22 @@ module Rackstash
|
||||
@allow_silent
|
||||
end
|
||||
|
||||
# When set to `true` in {#initialize}, this buffer is considered to be
|
||||
# buffering data. When buffering, logged messages will not be flushed
|
||||
# immediately but only with an explicit call to {#flush}.
|
||||
#
|
||||
# @return [Boolean] true if the current buffer is intended to hold buffered
|
||||
# data of multiple log calls
|
||||
def buffering?
|
||||
@buffering
|
||||
end
|
||||
|
||||
# Clear the current buffer from all stored data, just as it was right after
|
||||
# inititialization.
|
||||
#
|
||||
# @param everything [Boolean] When set to `true`, we clear {#messages},
|
||||
# {#fields}, {#tags} and the {#timestamp}. When set to `false`, we only
|
||||
# clear the {#messages} and the {#timestamp} but retain he other data.
|
||||
# @return [self]
|
||||
def clear
|
||||
def clear(everything = true)
|
||||
@messages = []
|
||||
@fields = nil
|
||||
@tags = nil
|
||||
@timestamp = nil
|
||||
|
||||
if everything
|
||||
@fields = nil
|
||||
@tags = nil
|
||||
end
|
||||
|
||||
self
|
||||
end
|
||||
|
||||
@ -337,10 +366,16 @@ module Rackstash
|
||||
# By calling `auto_flush`, the current buffer is flushed and cleared if
|
||||
# necessary.
|
||||
def auto_flush
|
||||
return if buffering?
|
||||
|
||||
flush
|
||||
clear
|
||||
case @buffering
|
||||
when :full
|
||||
return
|
||||
when :data
|
||||
flush
|
||||
clear(false)
|
||||
when :none
|
||||
flush
|
||||
clear(true)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -28,7 +28,7 @@ module Rackstash
|
||||
#
|
||||
# @return [Buffer]
|
||||
def current
|
||||
@stack.last || Buffer.new(@flows, buffering: false).tap do |buffer|
|
||||
@stack.last || Buffer.new(@flows, buffering: :none).tap do |buffer|
|
||||
@stack.push buffer
|
||||
end
|
||||
end
|
||||
|
||||
@ -124,9 +124,9 @@ describe Rackstash::Buffer do
|
||||
end
|
||||
end
|
||||
|
||||
context 'when buffering?' do
|
||||
context 'with buffering: :full' do
|
||||
before do
|
||||
buffer_options[:buffering] = true
|
||||
buffer_options[:buffering] = :full
|
||||
end
|
||||
|
||||
it 'does not call #flush' do
|
||||
@ -137,13 +137,14 @@ describe Rackstash::Buffer do
|
||||
it 'does not call #clear' do
|
||||
expect(buffer).not_to receive(:clear)
|
||||
buffer.add_fields(key: 'value')
|
||||
|
||||
expect(buffer.fields['key']).to eql 'value'
|
||||
end
|
||||
end
|
||||
|
||||
context 'when not buffering?' do
|
||||
context 'with buffering: :data' do
|
||||
before do
|
||||
buffer_options[:buffering] = false
|
||||
buffer_options[:buffering] = :data
|
||||
end
|
||||
|
||||
it 'calls #flush' do
|
||||
@ -151,10 +152,29 @@ describe Rackstash::Buffer do
|
||||
buffer.add_fields(key: 'value')
|
||||
end
|
||||
|
||||
it 'calls #clear' do
|
||||
allow(buffer).to receive(:flush)
|
||||
expect(buffer).to receive(:clear).and_call_original
|
||||
it 'clears only messages' do
|
||||
allow(buffer).to receive(:flush).once
|
||||
buffer.add_fields(key: 'value')
|
||||
|
||||
expect(buffer.fields).to_not be_empty
|
||||
expect(buffer.pending?).to be true
|
||||
end
|
||||
end
|
||||
|
||||
context 'with buffering: :none' do
|
||||
before do
|
||||
buffer_options[:buffering] = :none
|
||||
end
|
||||
|
||||
it 'calls #flush' do
|
||||
expect(buffer).to receive(:flush)
|
||||
buffer.add_fields(key: 'value')
|
||||
end
|
||||
|
||||
it 'clears the whole buffer' do
|
||||
allow(buffer).to receive(:flush).once
|
||||
buffer.add_fields(key: 'value')
|
||||
|
||||
expect(buffer.fields).to be_empty
|
||||
expect(buffer.pending?).to be false
|
||||
end
|
||||
@ -182,9 +202,9 @@ describe Rackstash::Buffer do
|
||||
expect(buffer.timestamp).to eql time.getutc
|
||||
end
|
||||
|
||||
context 'when buffering?' do
|
||||
context 'with buffering: :full' do
|
||||
before do
|
||||
buffer_options[:buffering] = true
|
||||
buffer_options[:buffering] = :full
|
||||
end
|
||||
|
||||
it 'does not call #flush' do
|
||||
@ -192,16 +212,15 @@ describe Rackstash::Buffer do
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
end
|
||||
|
||||
it 'does not call #clear' do
|
||||
expect(buffer).not_to receive(:clear)
|
||||
it 'retains messages' do
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
expect(buffer.messages.count).to eql 1
|
||||
end
|
||||
end
|
||||
|
||||
context 'when not buffering?' do
|
||||
context 'with buffering: :none' do
|
||||
before do
|
||||
buffer_options[:buffering] = false
|
||||
buffer_options[:buffering] = :data
|
||||
end
|
||||
|
||||
it 'calls #flush' do
|
||||
@ -209,24 +228,61 @@ describe Rackstash::Buffer do
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
end
|
||||
|
||||
it 'calls #clear' do
|
||||
it 'clears messages' do
|
||||
allow(buffer).to receive(:flush)
|
||||
expect(buffer).to receive(:clear).and_call_original
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
|
||||
expect(buffer.messages.count).to eql 0
|
||||
expect(buffer.pending?).to be false
|
||||
end
|
||||
end
|
||||
|
||||
context 'with buffering: :none' do
|
||||
before do
|
||||
buffer_options[:buffering] = :none
|
||||
end
|
||||
|
||||
it 'calls #flush' do
|
||||
expect(buffer).to receive(:flush)
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
end
|
||||
|
||||
it 'clears messages' do
|
||||
allow(buffer).to receive(:flush)
|
||||
buffer.add_message double(message: 'Hello World!', time: Time.now)
|
||||
|
||||
expect(buffer.messages.count).to eql 0
|
||||
expect(buffer.pending?).to be false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#buffering?' do
|
||||
it 'defaults to false' do
|
||||
expect(buffer.buffering?).to be true
|
||||
describe '#buffering' do
|
||||
it 'defaults to :full' do
|
||||
expect(buffer.buffering).to eql :full
|
||||
end
|
||||
|
||||
it 'can be overwritten in initialize' do
|
||||
buffer_options[:buffering] = false
|
||||
expect(buffer.buffering?).to be false
|
||||
it 'can be set to :full' do
|
||||
expect(described_class.new(flows, buffering: true).buffering).to eql :full
|
||||
expect(described_class.new(flows, buffering: :full).buffering).to eql :full
|
||||
end
|
||||
|
||||
it 'can be set to :data' do
|
||||
expect(described_class.new(flows, buffering: :data).buffering).to eql :data
|
||||
end
|
||||
|
||||
it 'can be set to :none' do
|
||||
expect(described_class.new(flows, buffering: false).buffering).to eql :none
|
||||
expect(described_class.new(flows, buffering: :none).buffering).to eql :none
|
||||
end
|
||||
|
||||
it 'does not allow other values' do
|
||||
expect { described_class.new(flows, buffering: nil) }
|
||||
.to raise_error(TypeError)
|
||||
expect { described_class.new(flows, buffering: :invalid) }
|
||||
.to raise_error(TypeError)
|
||||
expect { described_class.new(flows, buffering: 'full') }
|
||||
.to raise_error(TypeError)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@ -48,12 +48,12 @@ describe Rackstash::BufferStack do
|
||||
|
||||
it 'pushes a buffering buffer by default' do
|
||||
stack.push
|
||||
expect(stack.current.buffering?).to be true
|
||||
expect(stack.current.buffering).to eql :full
|
||||
end
|
||||
|
||||
it 'allows to set options on the new buffer' do
|
||||
stack.push(buffering: false)
|
||||
expect(stack.current.buffering?).to be false
|
||||
stack.push(buffering: :data)
|
||||
expect(stack.current.buffering).to eql :data
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user