From 6b4f009107d0e33d19af5103a1fc2e2c6e7a71a7 Mon Sep 17 00:00:00 2001 From: Holger Just Date: Tue, 10 Apr 2018 22:34:18 +0200 Subject: [PATCH] Introduce auto_flushing Flows to simplify the buffering behavior of Buffers Instead of defining the specific buffering behavior on a Buffer, we can now mark individual flows as auto_flushing or now. An auto_flushing Flow with a buffering Buffer behaves the same as a Buffer with `buffering: :data` would before. This allows us to simplify the buffering logic on the Buffer. Also, we can now use "normal" flows and auto_flushing flows on the same logger in parallel. Each of them behaves as expected with the same unchanged logger code. It is thus easier to define behavior for a development or production environment of an app since the necessary changes can all be defined on the logger itself (through the defined flows) without having to adapt the code which creates suitable Buffers with the Logger#with_buffer method in any way. --- lib/rackstash/buffer.rb | 177 ++++++++++++++++------------ lib/rackstash/filter_chain.rb | 2 +- lib/rackstash/flow.rb | 45 ++++++- lib/rackstash/flows.rb | 70 +++++++---- spec/rackstash/buffer_spec.rb | 143 +++++++++------------- spec/rackstash/buffer_stack_spec.rb | 6 +- spec/rackstash/flows_spec.rb | 78 +++++++++++- spec/rackstash/logger_spec.rb | 2 +- 8 files changed, 332 insertions(+), 191 deletions(-) diff --git a/lib/rackstash/buffer.rb b/lib/rackstash/buffer.rb index b5d4c7e..ef55867 100644 --- a/lib/rackstash/buffer.rb +++ b/lib/rackstash/buffer.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true # -# Copyright 2017 Holger Just +# Copyright 2017 - 2018 Holger Just # # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE.txt file for details. @@ -57,53 +57,23 @@ module Rackstash # for transforming, encoding, and persisting the log events. attr_reader :flows - # Returns a Symbol describing the buffering behavior of the current buffer. - # This value can be set in {#initialize}. - # - # When set to `true` or `:full` this buffer is buffering all its messages - # and stored data. it will never automatically flush anything to the flows - # but when explicitly calling {#flush}. - # - # When set to `:data` or `:none`, the buffer automatically flushes all - # messages and data when adding a new message or, with {#allow_silent?} - # being `true`, also when adding fields. After each automatic {#flush}, all - # {#messages} and the {#timestamp} are cleared from the buffer. If - # {#buffering} is set to `:none`, we also clear the stored {#fields} and - # {#tags} in addition to the other data during an automatic flush. - # If {#buffering} is set to `:data`, all stored data except the {#messages} - # and the {#timestamp} are retained after an auto flush. - # - # @return [Symbol] the buffering behavior - attr_reader :buffering - # @param flows [Flows] a list of {Flow} objects where this buffer eventually # writes to - # @param buffering [Symbol, Boolean] defines the buffering behavior of the - # buffer. When set to `true` or `:full`, we buffer all data and never - # automatically flush. When set to `:data`, we auto flush on adding new - # data and clear all messages afterwards. When set to `:none` or `false` - # we auto flush as above but clear all data from the buffer afterwards. + # @param buffering [Boolean] defines the buffering behavior of the buffer. + # When set to `true` we buffer all stored data which can be flushed to the + # {#flows} manually. In this mode, we still automatically flush newly + # added data to interested flows directly after adding it. When set to + # `false` we automatically flush to all flows as above but we will clear + # all stored data from the buffer afterwards. # See {#buffering} for details. # @param allow_silent [Boolean] When set to `true` the data in this buffer # will be flushed to the flows, even if there were just added fields or # tags without any logged messages. If this is `false` and there were no # messages logged with {#add_message}, the buffer will not be flushed but # will be silently dropped. - def initialize(flows, buffering: :full, allow_silent: true) + def initialize(flows, buffering: true, allow_silent: true) @flows = flows - - @buffering = - case buffering - when :full, true - :full - when :data - :data - when :none, false - :none - else - raise TypeError, "Unknown buffering argument given: #{buffering.inspect}" - end - + @buffering = !!buffering @allow_silent = !!allow_silent # initialize the internal data structures for fields, tags, ... @@ -124,7 +94,8 @@ module Rackstash # older exceptions in the current buffer. Only by the `force` argument to # `false`, we will preserve existing exceptions. # - # @param exception [Exception] an Exception object as catched by `rescue` + # @param exception [Exception] an Exception object as caught by a + # `begin` ... `rescue` block. # @param force [Boolean] set to `false` to preserve the details of an # existing exception in the current buffer's fields, set to `true` to # overwrite them. @@ -132,7 +103,7 @@ module Rackstash def add_exception(exception, force: true) return exception unless force || fields[FIELD_ERROR].nil? - fields.merge!( + add_fields( FIELD_ERROR => exception.class.name, FIELD_ERROR_MESSAGE => exception.message, FIELD_ERROR_TRACE => (exception.backtrace || []).join("\n") @@ -179,8 +150,7 @@ module Rackstash def add_message(message) timestamp(message.time) @messages << message - - auto_flush + auto_flush(message) message end @@ -197,21 +167,35 @@ module Rackstash @allow_silent end + # The buffering behavior of the current buffer. This value can be set in + # {#initialize}. + # + # When set to `true` this buffer is buffering all its messages and stored + # data. When explicitly calling {#flush}, all the stored data is flushed to + # all {#flows}. To interested flows, we will also automatically flush newly + # added messages along with the stored fields and tags after adding a + # message. If {#allow_silent?} is `true`, we also do this when adding fields + # with {#add_fields} and {#add_exception}. + # + # If {#buffering} is set to `false`, we will automatically flush the buffer + # the same way as before, this time to all buffers however. In addition, we + # will also clear the all stored stored {#fields}, {#tags}, the {#messages} + # and the {#timestamp}. + # + # @return [Boolean] the buffering behavior + def buffering? + @buffering + end + # Clear the current buffer from all stored data, just as it was right after # inititialization. # - # @param everything [Boolean] When set to `true`, we clear {#messages}, - # {#fields}, {#tags} and the {#timestamp}. When set to `false`, we only - # clear the {#messages} and the {#timestamp} but retain he other data. # @return [self] - def clear(everything = true) + def clear @messages = [] @timestamp = nil - - if everything - @fields = nil - @tags = nil - end + @fields = nil + @tags = nil self end @@ -230,16 +214,16 @@ module Rackstash def flush return unless pending? - @flows.write(self) + @flows.flush(event) self end # Return all logged messages on the current buffer. # - # @return [Array] the list of messages of the curent buffer - # @note You can not add messsages to the buffer by modifying this array. + # @return [Array] the list of messages of the current buffer + # @note You can not add messages to the buffer by modifying this array. # Instead, use {#add_message} to add new messages or add filters to the - # responsible {Flow} to remove or change messages. + # responsible {Flow} to remove or change already added messages. def messages @messages.dup end @@ -344,7 +328,7 @@ module Rackstash # All hashes (including nested hashes) use `String` keys. # # @return [Hash] the event expected by the event {Filter}s. - def to_h + def event event = fields.to_h event[FIELD_TAGS] = tags.to_a event[FIELD_MESSAGE] = messages @@ -352,27 +336,74 @@ module Rackstash event end + alias to_h event + alias as_json event private - # Non-buffering buffers, i.e., those with `buffering: false`, flush - # themselves to the defined flows whenever there is something logged to it. - # That way, such a buffer acts like a regular old Logger would: it just - # flushes a logged message to its log device as soon as it is logged. + # Write the data contained in this Buffer to interested {Flow} objects. # - # By calling `auto_flush`, the current buffer is flushed and cleared if - # necessary. - def auto_flush - case @buffering - when :full - return - when :data - flush - clear(false) - when :none - flush - clear(true) + # This method is called after adding new data to the Buffer. Here, we write + # the newly added data to the flows, depending on their type: + # + # Flows with enabled `auto_flush?` will receive an event Hash containing all + # of the current Buffer's fields and tags but only the single currently + # logged message (if any). This happens regardless of whether the current + # Buffer is {#buffering?} or not. + # + # In addition to that, if the current Buffer is not {buffering?}, we write + # pending data to "normal" flows and {#clear} the Buffer afterwards. Such a + # buffer thus acts like a regular old Logger would: it just flushes a logged + # message to its log device as soon as it is added. + # + # Buffering Buffers are not automatically flushed to "normal" flows here. + # They need to be explicitly flushed with {Buffer#flush} in order for their + # buffered data to be written to the normal flows. + # + # @param message [Message, nil] The currently logged message which is added + # to the {#auto_event}. If kept empty (i.e. with `nil`), we do not + # add any messages. + # @return [void] + def auto_flush(message = nil) + # Write the auto_event with the current message (if any) to the + # auto_flushing Flows + flows.auto_flush { auto_event(message) } + + if !buffering? && pending? + flows.flush { event } + clear end end + + # Creates the automatically flushed event hash. It is similar to the one + # created by {#event} but only uses the passed `message` instead of all + # {#messages} and uses either the `message`'s timestamp or the current time + # but never the current Buffer's timestamp. + # + # This event is used to represent an intermediate state of a Buffer which + # can be flushed to interested flows early. + # + # @param message [Message, nil] The currently logged message which is added + # to the auto_event Hash. If kept empty (i.e. with `nil`), we do not + # add any messages. + # @return [Hash] the event Hash for the currently added data. + # @see #event + def auto_event(message = nil) + event = fields.to_h + event[FIELD_TAGS] = tags.to_a + + if message + event[FIELD_MESSAGE] = [message] + + time = message.time + time = time.getutc.freeze unless time.utc? && time.frozen? + event[FIELD_TIMESTAMP] = time + else + event[FIELD_MESSAGE] = [] + event[FIELD_TIMESTAMP] = Time.now.utc.freeze + end + + event + end end end diff --git a/lib/rackstash/filter_chain.rb b/lib/rackstash/filter_chain.rb index 5f39194..47689ac 100644 --- a/lib/rackstash/filter_chain.rb +++ b/lib/rackstash/filter_chain.rb @@ -104,7 +104,7 @@ module Rackstash # the writing of an individual event. Any other return value of filters is # ignored. # - # @param event [Hash] an event hash, see {Buffer#to_event} for details + # @param event [Hash] an event hash, see {Buffer#event} for details # @return [Hash, false] the filtered event or `false` if any of the # filters returned `false` def call(event) diff --git a/lib/rackstash/flow.rb b/lib/rackstash/flow.rb index 8f9196c..9271122 100644 --- a/lib/rackstash/flow.rb +++ b/lib/rackstash/flow.rb @@ -60,8 +60,8 @@ module Rackstash # # Write an event. This is normally done by a Rackstash::Buffer # flow.write(an_event) # - # The event which eventually gets written to the flow is created from a Buffer - # with {Buffer#to_event}. + # The event which eventually gets written to the flow is usually created from + # a {Buffer} with its pending data. class Flow # @return [Adapter::Adapter] the log adapter attr_reader :adapter @@ -83,11 +83,18 @@ module Rackstash # default, we swallow errors after having logging them to not cause # additional issues to the production application just because the logger # doesn't work. + # @param auto_flush [Bool] set to `true` to write added fields or messages + # added to a {Buffer} to this flow immediately. With each write, this flow + # will then receive all current fields of the {Buffer} but only the + # currently added message (if any). When set to `false`, the flow will + # receive the full event with all fields and messages of the Buffer after + # an explicit call to {Buffer#flush} for a buffering Buffer or after each + # added message or fields for a non-bufering Buffer. # @yieldparam flow [self] if the given block accepts an argument, we yield # `self` as a parameter, else, the block is directly executed in the # context of `self`. def initialize(adapter, encoder: nil, filters: [], - error_flow: nil, raise_on_error: false, + error_flow: nil, raise_on_error: false, auto_flush: false, &block ) @adapter = Rackstash::Adapter[adapter] @@ -95,6 +102,7 @@ module Rackstash @filter_chain = Rackstash::FilterChain.new(filters) self.error_flow = error_flow self.raise_on_error = raise_on_error + self.auto_flush = auto_flush if block_given? if block.arity == 0 @@ -105,6 +113,37 @@ module Rackstash end end + # Get or set the `auto_flush` setting. If set to `true`, new messages and + # fields added to a {Buffer} will be written directly to this flow, + # regardless of the buffering setting of the {Buffer}. This can be useful + # during development or testing of an application where the developer might + # want to directly watch the low-cardinality log as the messages are logged. + # + # If set to `false` (the default), buffering Buffers will only be written + # after explicitly calling {Buffer#flush} on them. + # + # @param bool [Bool, nil] the value to set. If omitted, we return the + # current setting. + # @return [Bool] the updated or current `auto_flush` setting + # @see #auto_flush= + def auto_flush(bool = nil) + self.auto_flush = bool unless bool.nil? + auto_flush? + end + + # @return [Bool] the current value of the `auto_flush` setting. + # @see #auto_flush + def auto_flush? + @auto_flush + end + + # @param bool [Bool] `true` to cause buffering Buffers to write their added + # messages and fields to the flow as soon as they are logged, `false` to + # write the whole event only on an explicit call to {Buffer#flush}. + def auto_flush=(bool) + @auto_flush = !!bool + end + # Close the log adapter if supported. This might be a no-op if the adapter # does not support closing. # diff --git a/lib/rackstash/flows.rb b/lib/rackstash/flows.rb index 054c5a5..3c37ec5 100644 --- a/lib/rackstash/flows.rb +++ b/lib/rackstash/flows.rb @@ -164,33 +164,49 @@ module Rackstash nil end - # Write an event `Hash` to each of the defined flows. The event is usually - # created from {Buffer#to_event}. + # Write an event `Hash` to each of the defined normal (that is: non + # `auto_flush`'ing) flows. The event is usually created from {Buffer#event}. # # We write a fresh deep-copy of the event hash to each defined {Flow}. # This allows each flow to alter the event in any way without affecting the # others. # - # @param event [Hash] an event `Hash` - # @return [Hash] the given `event` - def write(event) - # Memoize the current list of flows for the rest of the method to make - # sure it doesn't change while we work with it. - flows = to_a - flows_size = flows.size + # @param event [Hash, nil] the event `Hash`. See {Buffer#event} for details + # @yield [flow] If `event` is `nil`, we call the goven block and use its + # return value as the event. The block is expected to return an event + # `Hash`. + # @return [Hash, nil] the flushed event or `nil` if nothing was flushed + def flush(event = nil) + flows = to_a.delete_if(&:auto_flush?) + return unless flows.any? - event = event.to_h - flows.each_with_index do |flow, index| - # If we have more than one flow, we provide a fresh copy of the event - # to each flow. The flow's filters and encoder can then mutate the event - # however it pleases without affecting later flows. We don't need to - # duplicate the event for the last flow since it won't be re-used - # after that anymore. - current_event = (index == flows_size - 1) ? event : deep_dup_event(event) - flow.write(current_event) - end + event ||= yield if block_given? + return unless event - event + write_to(flows, event.to_h) + end + + # Write an event `Hash` to each of the defined `auto_flush`'ing flows. + # The event is usually created from {Buffer#auto_event} as the buffer is + # automatically flushed after a message or fields were logged to it. + # + # We write a fresh deep-copy of the event hash to each defined {Flow}. + # This allows each flow to alter the event in any way without affecting the + # others. + # + # @param event [Hash, nil] the event `Hash`. See {Buffer#event} for details + # @yield [flow] If `event` is `nil`, we call the goven block and use its + # return value as the event. The block is expected to return an event + # `Hash`. + # @return [Hash, nil] the flushed event or `nil` if nothing was flushed + def auto_flush(event = nil) + flows = to_a.select!(&:auto_flush?) + return unless flows.any? + + event ||= yield if block_given? + return unless event + + write_to(flows, event.to_h) end # @return [Array] an array of all flow elements without any `nil` @@ -241,5 +257,19 @@ module Rackstash obj end end + + def write_to(flows, event) + flows.each_with_index do |flow, index| + # If we have more than one flow, we provide a fresh copy of the event + # to each flow. The flow's filters and encoder can then mutate the event + # however it pleases without affecting later flows. We don't need to + # duplicate the event for the last flow since it won't be re-used + # after that anymore. + current_event = (index == flows.size - 1) ? event : deep_dup_event(event) + flow.write(current_event) + end + + event + end end end diff --git a/spec/rackstash/buffer_spec.rb b/spec/rackstash/buffer_spec.rb index 5fcd1ae..0270223 100644 --- a/spec/rackstash/buffer_spec.rb +++ b/spec/rackstash/buffer_spec.rb @@ -11,7 +11,14 @@ require 'rackstash/buffer' RSpec.describe Rackstash::Buffer do let(:buffer_options) { {} } - let(:flows) { instance_double(Rackstash::Flows) } + + let(:flows) { + instance_double(Rackstash::Flows).tap do |flows| + allow(flows).to receive(:flush) + allow(flows).to receive(:auto_flush) + end + } + let(:buffer) { described_class.new(flows, **buffer_options) } describe '#allow_silent?' do @@ -111,6 +118,11 @@ RSpec.describe Rackstash::Buffer do buffer.add_fields(key: 'value') expect(buffer.pending?).to be true end + + it 'calls auto_flush' do + expect(flows).to receive(:auto_flush) + buffer.add_fields(key: 'value') + end end context 'when not allow_silent?' do @@ -122,57 +134,49 @@ RSpec.describe Rackstash::Buffer do buffer.add_fields(key: 'value') expect(buffer.pending?).to be false end + + it 'calls auto_flush' do + expect(flows).to receive(:auto_flush) + buffer.add_fields(key: 'value') + end end - context 'with buffering: :full' do + context 'with buffering: true' do before do - buffer_options[:buffering] = :full + buffer_options[:buffering] = true end - it 'does not call #flush' do - expect(buffer).not_to receive(:flush) + it 'does not flush the buffer' do + expect(flows).not_to receive(:flush) + # We always auto_flush buffers to send the newly added fields to + # interested flows + expect(flows).to receive(:auto_flush) + buffer.add_fields(key: 'value') end - it 'does not call #clear' do + it 'does not clear the buffer' do expect(buffer).not_to receive(:clear) buffer.add_fields(key: 'value') expect(buffer.fields['key']).to eql 'value' - end - end - - context 'with buffering: :data' do - before do - buffer_options[:buffering] = :data - end - - it 'calls #flush' do - expect(buffer).to receive(:flush) - buffer.add_fields(key: 'value') - end - - it 'clears only messages' do - allow(buffer).to receive(:flush).once - buffer.add_fields(key: 'value') - - expect(buffer.fields).to_not be_empty expect(buffer.pending?).to be true end end - context 'with buffering: :none' do + context 'with buffering: false' do before do - buffer_options[:buffering] = :none + buffer_options[:buffering] = false end - it 'calls #flush' do - expect(buffer).to receive(:flush) + it 'flushes the buffer' do + expect(flows).to receive(:flush) + expect(flows).to receive(:auto_flush) + buffer.add_fields(key: 'value') end - it 'clears the whole buffer' do - allow(buffer).to receive(:flush).once + it 'clears the buffer' do buffer.add_fields(key: 'value') expect(buffer.fields).to be_empty @@ -202,13 +206,13 @@ RSpec.describe Rackstash::Buffer do expect(buffer.timestamp).to eql time.getutc end - context 'with buffering: :full' do + context 'with buffering: true' do before do - buffer_options[:buffering] = :full + buffer_options[:buffering] = true end - it 'does not call #flush' do - expect(buffer).not_to receive(:flush) + it 'does not flush the buffer' do + expect(flows).not_to receive(:flush) buffer.add_message double(message: 'Hello World!', time: Time.now) end @@ -218,37 +222,17 @@ RSpec.describe Rackstash::Buffer do end end - context 'with buffering: :none' do + context 'with buffering: false' do before do - buffer_options[:buffering] = :data + buffer_options[:buffering] = false end - it 'calls #flush' do - expect(buffer).to receive(:flush) + it 'flushes the buffer' do + expect(flows).to receive(:flush) buffer.add_message double(message: 'Hello World!', time: Time.now) end it 'clears messages' do - allow(buffer).to receive(:flush) - buffer.add_message double(message: 'Hello World!', time: Time.now) - - expect(buffer.messages.count).to eql 0 - expect(buffer.pending?).to be false - end - end - - context 'with buffering: :none' do - before do - buffer_options[:buffering] = :none - end - - it 'calls #flush' do - expect(buffer).to receive(:flush) - buffer.add_message double(message: 'Hello World!', time: Time.now) - end - - it 'clears messages' do - allow(buffer).to receive(:flush) buffer.add_message double(message: 'Hello World!', time: Time.now) expect(buffer.messages.count).to eql 0 @@ -258,31 +242,18 @@ RSpec.describe Rackstash::Buffer do end describe '#buffering' do - it 'defaults to :full' do - expect(buffer.buffering).to eql :full + it 'defaults to true' do + expect(buffer.buffering?).to eql true end - it 'can be set to :full' do - expect(described_class.new(flows, buffering: true).buffering).to eql :full - expect(described_class.new(flows, buffering: :full).buffering).to eql :full + it 'can be set to true' do + expect(described_class.new(flows, buffering: true).buffering?).to be true + expect(described_class.new(flows, buffering: 'whatever').buffering?).to be true end - it 'can be set to :data' do - expect(described_class.new(flows, buffering: :data).buffering).to eql :data - end - - it 'can be set to :none' do - expect(described_class.new(flows, buffering: false).buffering).to eql :none - expect(described_class.new(flows, buffering: :none).buffering).to eql :none - end - - it 'does not allow other values' do - expect { described_class.new(flows, buffering: nil) } - .to raise_error(TypeError) - expect { described_class.new(flows, buffering: :invalid) } - .to raise_error(TypeError) - expect { described_class.new(flows, buffering: 'full') } - .to raise_error(TypeError) + it 'can be set to false' do + expect(described_class.new(flows, buffering: false).buffering?).to be false + expect(described_class.new(flows, buffering: nil).buffering?).to be false end end @@ -346,13 +317,13 @@ RSpec.describe Rackstash::Buffer do end context 'when pending?' do - let(:time) { Time.now } + let(:time) { Time.parse('2016-10-17 15:37:00 +02:00') } let(:message) { double(message: 'Hello World!', time: time) } let(:event) { { 'message' => [message], 'tags' => [], - '@timestamp' => Time.now + '@timestamp' => time } } @@ -360,11 +331,11 @@ RSpec.describe Rackstash::Buffer do buffer.add_message(message) # We might call Buffer#flush during the following tests - allow(flows).to receive(:write).with(buffer).once + allow(flows).to receive(:flush).with(event).once end it 'flushes the buffer to the flows' do - expect(flows).to receive(:write).with(buffer).once + expect(flows).to receive(:flush).with(event).once buffer.flush end @@ -381,7 +352,7 @@ RSpec.describe Rackstash::Buffer do context 'when not pending?' do it 'does not flushes the buffer to the flows' do - expect(flows).not_to receive(:write) + expect(flows).not_to receive(:flush) buffer.flush end @@ -562,7 +533,7 @@ RSpec.describe Rackstash::Buffer do end end - describe '#to_h' do + describe '#event' do it 'creates an event hash' do message = double(message: 'Hello World', time: Time.now) allow(message) @@ -570,7 +541,7 @@ RSpec.describe Rackstash::Buffer do buffer.fields[:foo] = 'bar' buffer.tags << 'some_tag' - expect(buffer.to_h).to match( + expect(buffer.event).to match( 'foo' => 'bar', 'message' => [message], 'tags' => ['some_tag'], diff --git a/spec/rackstash/buffer_stack_spec.rb b/spec/rackstash/buffer_stack_spec.rb index 82edbed..7e956c7 100644 --- a/spec/rackstash/buffer_stack_spec.rb +++ b/spec/rackstash/buffer_stack_spec.rb @@ -48,12 +48,12 @@ RSpec.describe Rackstash::BufferStack do it 'pushes a buffering buffer by default' do stack.push - expect(stack.current.buffering).to eql :full + expect(stack.current.buffering?).to eql true end it 'allows to set options on the new buffer' do - stack.push(buffering: :data) - expect(stack.current.buffering).to eql :data + stack.push(buffering: false) + expect(stack.current.buffering?).to eql false end end diff --git a/spec/rackstash/flows_spec.rb b/spec/rackstash/flows_spec.rb index 9632ffa..a97f4e0 100644 --- a/spec/rackstash/flows_spec.rb +++ b/spec/rackstash/flows_spec.rb @@ -13,9 +13,11 @@ require 'rackstash/flow' RSpec.describe Rackstash::Flows do let(:flows) { described_class.new } - def a_flow + def a_flow(auto_flush: false) flow = instance_double('Rackstash::Flow') allow(flow).to receive(:is_a?).with(Rackstash::Flow).and_return(true) + allow(flow).to receive(:auto_flush?).and_return(auto_flush) + flow end @@ -323,8 +325,8 @@ RSpec.describe Rackstash::Flows do end end - describe '#write' do - it 'flushes the buffer to all flows' do + describe '#flush' do + it 'writes the event to the flows' do event_spec = { 'foo' => 'bar', 'tags' => [], @@ -344,7 +346,75 @@ RSpec.describe Rackstash::Flows do # During flush, we create a single event, duplicate it and write each to # each of the flows. - flows.write('foo' => 'bar', 'tags' => [], '@timestamp' => Time.now.utc) + flows.flush('foo' => 'bar', 'tags' => [], '@timestamp' => Time.now.utc) + end + + it 'writes only to normal flows' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).to receive(:write).with('foo' => 'bar') + flows << normal_flow + + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).not_to receive(:write) + flows << auto_flush_flow + + flows.flush('foo' => 'bar') + end + + it 'accepts a block' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).to receive(:write).with('foo' => 'bar') + flows << normal_flow + + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).not_to receive(:write) + flows << auto_flush_flow + + flows.flush { { 'foo' => 'bar'} } + end + + it 'does nothing if there is no normal flow' do + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).not_to receive(:write) + flows << auto_flush_flow + + expect(flows.flush('foo' => 'bar')).to be_nil + expect { |b| flows.flush(&b) }.not_to yield_control + end + end + + describe '#auto_flush' do + it 'writes only to auto_flush flows' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).not_to receive(:write) + flows << normal_flow + + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).to receive(:write).with('foo' => 'bar') + flows << auto_flush_flow + + flows.auto_flush('foo' => 'bar') + end + + it 'accepts a block' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).not_to receive(:write) + flows << normal_flow + + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).to receive(:write).with('foo' => 'bar') + flows << auto_flush_flow + + flows.auto_flush { { 'foo' => 'bar'} } + end + + it 'does nothing if there is no auto_flushing flow' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).not_to receive(:write) + flows << normal_flow + + expect(flows.auto_flush('foo' => 'bar')).to be_nil + expect { |b| flows.auto_flush(&b) }.not_to yield_control end end end diff --git a/spec/rackstash/logger_spec.rb b/spec/rackstash/logger_spec.rb index 36643ac..adffcc4 100644 --- a/spec/rackstash/logger_spec.rb +++ b/spec/rackstash/logger_spec.rb @@ -531,7 +531,7 @@ RSpec.describe Rackstash::Logger do end it 'buffers multiple messages' do - expect(logger.flows).to receive(:write).once + expect(logger.flows).to receive(:flush).once logger.with_buffer do logger.add 1, 'Hello World'