diff --git a/lib/rackstash/buffer.rb b/lib/rackstash/buffer.rb index b5d4c7e..ef55867 100644 --- a/lib/rackstash/buffer.rb +++ b/lib/rackstash/buffer.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true # -# Copyright 2017 Holger Just +# Copyright 2017 - 2018 Holger Just # # This software may be modified and distributed under the terms # of the MIT license. See the LICENSE.txt file for details. @@ -57,53 +57,23 @@ module Rackstash # for transforming, encoding, and persisting the log events. attr_reader :flows - # Returns a Symbol describing the buffering behavior of the current buffer. - # This value can be set in {#initialize}. - # - # When set to `true` or `:full` this buffer is buffering all its messages - # and stored data. it will never automatically flush anything to the flows - # but when explicitly calling {#flush}. - # - # When set to `:data` or `:none`, the buffer automatically flushes all - # messages and data when adding a new message or, with {#allow_silent?} - # being `true`, also when adding fields. After each automatic {#flush}, all - # {#messages} and the {#timestamp} are cleared from the buffer. If - # {#buffering} is set to `:none`, we also clear the stored {#fields} and - # {#tags} in addition to the other data during an automatic flush. - # If {#buffering} is set to `:data`, all stored data except the {#messages} - # and the {#timestamp} are retained after an auto flush. - # - # @return [Symbol] the buffering behavior - attr_reader :buffering - # @param flows [Flows] a list of {Flow} objects where this buffer eventually # writes to - # @param buffering [Symbol, Boolean] defines the buffering behavior of the - # buffer. When set to `true` or `:full`, we buffer all data and never - # automatically flush. When set to `:data`, we auto flush on adding new - # data and clear all messages afterwards. When set to `:none` or `false` - # we auto flush as above but clear all data from the buffer afterwards. + # @param buffering [Boolean] defines the buffering behavior of the buffer. + # When set to `true` we buffer all stored data which can be flushed to the + # {#flows} manually. In this mode, we still automatically flush newly + # added data to interested flows directly after adding it. When set to + # `false` we automatically flush to all flows as above but we will clear + # all stored data from the buffer afterwards. # See {#buffering} for details. # @param allow_silent [Boolean] When set to `true` the data in this buffer # will be flushed to the flows, even if there were just added fields or # tags without any logged messages. If this is `false` and there were no # messages logged with {#add_message}, the buffer will not be flushed but # will be silently dropped. - def initialize(flows, buffering: :full, allow_silent: true) + def initialize(flows, buffering: true, allow_silent: true) @flows = flows - - @buffering = - case buffering - when :full, true - :full - when :data - :data - when :none, false - :none - else - raise TypeError, "Unknown buffering argument given: #{buffering.inspect}" - end - + @buffering = !!buffering @allow_silent = !!allow_silent # initialize the internal data structures for fields, tags, ... @@ -124,7 +94,8 @@ module Rackstash # older exceptions in the current buffer. Only by the `force` argument to # `false`, we will preserve existing exceptions. # - # @param exception [Exception] an Exception object as catched by `rescue` + # @param exception [Exception] an Exception object as caught by a + # `begin` ... `rescue` block. # @param force [Boolean] set to `false` to preserve the details of an # existing exception in the current buffer's fields, set to `true` to # overwrite them. @@ -132,7 +103,7 @@ module Rackstash def add_exception(exception, force: true) return exception unless force || fields[FIELD_ERROR].nil? - fields.merge!( + add_fields( FIELD_ERROR => exception.class.name, FIELD_ERROR_MESSAGE => exception.message, FIELD_ERROR_TRACE => (exception.backtrace || []).join("\n") @@ -179,8 +150,7 @@ module Rackstash def add_message(message) timestamp(message.time) @messages << message - - auto_flush + auto_flush(message) message end @@ -197,21 +167,35 @@ module Rackstash @allow_silent end + # The buffering behavior of the current buffer. This value can be set in + # {#initialize}. + # + # When set to `true` this buffer is buffering all its messages and stored + # data. When explicitly calling {#flush}, all the stored data is flushed to + # all {#flows}. To interested flows, we will also automatically flush newly + # added messages along with the stored fields and tags after adding a + # message. If {#allow_silent?} is `true`, we also do this when adding fields + # with {#add_fields} and {#add_exception}. + # + # If {#buffering} is set to `false`, we will automatically flush the buffer + # the same way as before, this time to all buffers however. In addition, we + # will also clear the all stored stored {#fields}, {#tags}, the {#messages} + # and the {#timestamp}. + # + # @return [Boolean] the buffering behavior + def buffering? + @buffering + end + # Clear the current buffer from all stored data, just as it was right after # inititialization. # - # @param everything [Boolean] When set to `true`, we clear {#messages}, - # {#fields}, {#tags} and the {#timestamp}. When set to `false`, we only - # clear the {#messages} and the {#timestamp} but retain he other data. # @return [self] - def clear(everything = true) + def clear @messages = [] @timestamp = nil - - if everything - @fields = nil - @tags = nil - end + @fields = nil + @tags = nil self end @@ -230,16 +214,16 @@ module Rackstash def flush return unless pending? - @flows.write(self) + @flows.flush(event) self end # Return all logged messages on the current buffer. # - # @return [Array] the list of messages of the curent buffer - # @note You can not add messsages to the buffer by modifying this array. + # @return [Array] the list of messages of the current buffer + # @note You can not add messages to the buffer by modifying this array. # Instead, use {#add_message} to add new messages or add filters to the - # responsible {Flow} to remove or change messages. + # responsible {Flow} to remove or change already added messages. def messages @messages.dup end @@ -344,7 +328,7 @@ module Rackstash # All hashes (including nested hashes) use `String` keys. # # @return [Hash] the event expected by the event {Filter}s. - def to_h + def event event = fields.to_h event[FIELD_TAGS] = tags.to_a event[FIELD_MESSAGE] = messages @@ -352,27 +336,74 @@ module Rackstash event end + alias to_h event + alias as_json event private - # Non-buffering buffers, i.e., those with `buffering: false`, flush - # themselves to the defined flows whenever there is something logged to it. - # That way, such a buffer acts like a regular old Logger would: it just - # flushes a logged message to its log device as soon as it is logged. + # Write the data contained in this Buffer to interested {Flow} objects. # - # By calling `auto_flush`, the current buffer is flushed and cleared if - # necessary. - def auto_flush - case @buffering - when :full - return - when :data - flush - clear(false) - when :none - flush - clear(true) + # This method is called after adding new data to the Buffer. Here, we write + # the newly added data to the flows, depending on their type: + # + # Flows with enabled `auto_flush?` will receive an event Hash containing all + # of the current Buffer's fields and tags but only the single currently + # logged message (if any). This happens regardless of whether the current + # Buffer is {#buffering?} or not. + # + # In addition to that, if the current Buffer is not {buffering?}, we write + # pending data to "normal" flows and {#clear} the Buffer afterwards. Such a + # buffer thus acts like a regular old Logger would: it just flushes a logged + # message to its log device as soon as it is added. + # + # Buffering Buffers are not automatically flushed to "normal" flows here. + # They need to be explicitly flushed with {Buffer#flush} in order for their + # buffered data to be written to the normal flows. + # + # @param message [Message, nil] The currently logged message which is added + # to the {#auto_event}. If kept empty (i.e. with `nil`), we do not + # add any messages. + # @return [void] + def auto_flush(message = nil) + # Write the auto_event with the current message (if any) to the + # auto_flushing Flows + flows.auto_flush { auto_event(message) } + + if !buffering? && pending? + flows.flush { event } + clear end end + + # Creates the automatically flushed event hash. It is similar to the one + # created by {#event} but only uses the passed `message` instead of all + # {#messages} and uses either the `message`'s timestamp or the current time + # but never the current Buffer's timestamp. + # + # This event is used to represent an intermediate state of a Buffer which + # can be flushed to interested flows early. + # + # @param message [Message, nil] The currently logged message which is added + # to the auto_event Hash. If kept empty (i.e. with `nil`), we do not + # add any messages. + # @return [Hash] the event Hash for the currently added data. + # @see #event + def auto_event(message = nil) + event = fields.to_h + event[FIELD_TAGS] = tags.to_a + + if message + event[FIELD_MESSAGE] = [message] + + time = message.time + time = time.getutc.freeze unless time.utc? && time.frozen? + event[FIELD_TIMESTAMP] = time + else + event[FIELD_MESSAGE] = [] + event[FIELD_TIMESTAMP] = Time.now.utc.freeze + end + + event + end end end diff --git a/lib/rackstash/filter_chain.rb b/lib/rackstash/filter_chain.rb index 5f39194..47689ac 100644 --- a/lib/rackstash/filter_chain.rb +++ b/lib/rackstash/filter_chain.rb @@ -104,7 +104,7 @@ module Rackstash # the writing of an individual event. Any other return value of filters is # ignored. # - # @param event [Hash] an event hash, see {Buffer#to_event} for details + # @param event [Hash] an event hash, see {Buffer#event} for details # @return [Hash, false] the filtered event or `false` if any of the # filters returned `false` def call(event) diff --git a/lib/rackstash/flow.rb b/lib/rackstash/flow.rb index 8f9196c..9271122 100644 --- a/lib/rackstash/flow.rb +++ b/lib/rackstash/flow.rb @@ -60,8 +60,8 @@ module Rackstash # # Write an event. This is normally done by a Rackstash::Buffer # flow.write(an_event) # - # The event which eventually gets written to the flow is created from a Buffer - # with {Buffer#to_event}. + # The event which eventually gets written to the flow is usually created from + # a {Buffer} with its pending data. class Flow # @return [Adapter::Adapter] the log adapter attr_reader :adapter @@ -83,11 +83,18 @@ module Rackstash # default, we swallow errors after having logging them to not cause # additional issues to the production application just because the logger # doesn't work. + # @param auto_flush [Bool] set to `true` to write added fields or messages + # added to a {Buffer} to this flow immediately. With each write, this flow + # will then receive all current fields of the {Buffer} but only the + # currently added message (if any). When set to `false`, the flow will + # receive the full event with all fields and messages of the Buffer after + # an explicit call to {Buffer#flush} for a buffering Buffer or after each + # added message or fields for a non-bufering Buffer. # @yieldparam flow [self] if the given block accepts an argument, we yield # `self` as a parameter, else, the block is directly executed in the # context of `self`. def initialize(adapter, encoder: nil, filters: [], - error_flow: nil, raise_on_error: false, + error_flow: nil, raise_on_error: false, auto_flush: false, &block ) @adapter = Rackstash::Adapter[adapter] @@ -95,6 +102,7 @@ module Rackstash @filter_chain = Rackstash::FilterChain.new(filters) self.error_flow = error_flow self.raise_on_error = raise_on_error + self.auto_flush = auto_flush if block_given? if block.arity == 0 @@ -105,6 +113,37 @@ module Rackstash end end + # Get or set the `auto_flush` setting. If set to `true`, new messages and + # fields added to a {Buffer} will be written directly to this flow, + # regardless of the buffering setting of the {Buffer}. This can be useful + # during development or testing of an application where the developer might + # want to directly watch the low-cardinality log as the messages are logged. + # + # If set to `false` (the default), buffering Buffers will only be written + # after explicitly calling {Buffer#flush} on them. + # + # @param bool [Bool, nil] the value to set. If omitted, we return the + # current setting. + # @return [Bool] the updated or current `auto_flush` setting + # @see #auto_flush= + def auto_flush(bool = nil) + self.auto_flush = bool unless bool.nil? + auto_flush? + end + + # @return [Bool] the current value of the `auto_flush` setting. + # @see #auto_flush + def auto_flush? + @auto_flush + end + + # @param bool [Bool] `true` to cause buffering Buffers to write their added + # messages and fields to the flow as soon as they are logged, `false` to + # write the whole event only on an explicit call to {Buffer#flush}. + def auto_flush=(bool) + @auto_flush = !!bool + end + # Close the log adapter if supported. This might be a no-op if the adapter # does not support closing. # diff --git a/lib/rackstash/flows.rb b/lib/rackstash/flows.rb index 054c5a5..3c37ec5 100644 --- a/lib/rackstash/flows.rb +++ b/lib/rackstash/flows.rb @@ -164,33 +164,49 @@ module Rackstash nil end - # Write an event `Hash` to each of the defined flows. The event is usually - # created from {Buffer#to_event}. + # Write an event `Hash` to each of the defined normal (that is: non + # `auto_flush`'ing) flows. The event is usually created from {Buffer#event}. # # We write a fresh deep-copy of the event hash to each defined {Flow}. # This allows each flow to alter the event in any way without affecting the # others. # - # @param event [Hash] an event `Hash` - # @return [Hash] the given `event` - def write(event) - # Memoize the current list of flows for the rest of the method to make - # sure it doesn't change while we work with it. - flows = to_a - flows_size = flows.size + # @param event [Hash, nil] the event `Hash`. See {Buffer#event} for details + # @yield [flow] If `event` is `nil`, we call the goven block and use its + # return value as the event. The block is expected to return an event + # `Hash`. + # @return [Hash, nil] the flushed event or `nil` if nothing was flushed + def flush(event = nil) + flows = to_a.delete_if(&:auto_flush?) + return unless flows.any? - event = event.to_h - flows.each_with_index do |flow, index| - # If we have more than one flow, we provide a fresh copy of the event - # to each flow. The flow's filters and encoder can then mutate the event - # however it pleases without affecting later flows. We don't need to - # duplicate the event for the last flow since it won't be re-used - # after that anymore. - current_event = (index == flows_size - 1) ? event : deep_dup_event(event) - flow.write(current_event) - end + event ||= yield if block_given? + return unless event - event + write_to(flows, event.to_h) + end + + # Write an event `Hash` to each of the defined `auto_flush`'ing flows. + # The event is usually created from {Buffer#auto_event} as the buffer is + # automatically flushed after a message or fields were logged to it. + # + # We write a fresh deep-copy of the event hash to each defined {Flow}. + # This allows each flow to alter the event in any way without affecting the + # others. + # + # @param event [Hash, nil] the event `Hash`. See {Buffer#event} for details + # @yield [flow] If `event` is `nil`, we call the goven block and use its + # return value as the event. The block is expected to return an event + # `Hash`. + # @return [Hash, nil] the flushed event or `nil` if nothing was flushed + def auto_flush(event = nil) + flows = to_a.select!(&:auto_flush?) + return unless flows.any? + + event ||= yield if block_given? + return unless event + + write_to(flows, event.to_h) end # @return [Array] an array of all flow elements without any `nil` @@ -241,5 +257,19 @@ module Rackstash obj end end + + def write_to(flows, event) + flows.each_with_index do |flow, index| + # If we have more than one flow, we provide a fresh copy of the event + # to each flow. The flow's filters and encoder can then mutate the event + # however it pleases without affecting later flows. We don't need to + # duplicate the event for the last flow since it won't be re-used + # after that anymore. + current_event = (index == flows.size - 1) ? event : deep_dup_event(event) + flow.write(current_event) + end + + event + end end end diff --git a/spec/rackstash/buffer_spec.rb b/spec/rackstash/buffer_spec.rb index 5fcd1ae..0270223 100644 --- a/spec/rackstash/buffer_spec.rb +++ b/spec/rackstash/buffer_spec.rb @@ -11,7 +11,14 @@ require 'rackstash/buffer' RSpec.describe Rackstash::Buffer do let(:buffer_options) { {} } - let(:flows) { instance_double(Rackstash::Flows) } + + let(:flows) { + instance_double(Rackstash::Flows).tap do |flows| + allow(flows).to receive(:flush) + allow(flows).to receive(:auto_flush) + end + } + let(:buffer) { described_class.new(flows, **buffer_options) } describe '#allow_silent?' do @@ -111,6 +118,11 @@ RSpec.describe Rackstash::Buffer do buffer.add_fields(key: 'value') expect(buffer.pending?).to be true end + + it 'calls auto_flush' do + expect(flows).to receive(:auto_flush) + buffer.add_fields(key: 'value') + end end context 'when not allow_silent?' do @@ -122,57 +134,49 @@ RSpec.describe Rackstash::Buffer do buffer.add_fields(key: 'value') expect(buffer.pending?).to be false end + + it 'calls auto_flush' do + expect(flows).to receive(:auto_flush) + buffer.add_fields(key: 'value') + end end - context 'with buffering: :full' do + context 'with buffering: true' do before do - buffer_options[:buffering] = :full + buffer_options[:buffering] = true end - it 'does not call #flush' do - expect(buffer).not_to receive(:flush) + it 'does not flush the buffer' do + expect(flows).not_to receive(:flush) + # We always auto_flush buffers to send the newly added fields to + # interested flows + expect(flows).to receive(:auto_flush) + buffer.add_fields(key: 'value') end - it 'does not call #clear' do + it 'does not clear the buffer' do expect(buffer).not_to receive(:clear) buffer.add_fields(key: 'value') expect(buffer.fields['key']).to eql 'value' - end - end - - context 'with buffering: :data' do - before do - buffer_options[:buffering] = :data - end - - it 'calls #flush' do - expect(buffer).to receive(:flush) - buffer.add_fields(key: 'value') - end - - it 'clears only messages' do - allow(buffer).to receive(:flush).once - buffer.add_fields(key: 'value') - - expect(buffer.fields).to_not be_empty expect(buffer.pending?).to be true end end - context 'with buffering: :none' do + context 'with buffering: false' do before do - buffer_options[:buffering] = :none + buffer_options[:buffering] = false end - it 'calls #flush' do - expect(buffer).to receive(:flush) + it 'flushes the buffer' do + expect(flows).to receive(:flush) + expect(flows).to receive(:auto_flush) + buffer.add_fields(key: 'value') end - it 'clears the whole buffer' do - allow(buffer).to receive(:flush).once + it 'clears the buffer' do buffer.add_fields(key: 'value') expect(buffer.fields).to be_empty @@ -202,13 +206,13 @@ RSpec.describe Rackstash::Buffer do expect(buffer.timestamp).to eql time.getutc end - context 'with buffering: :full' do + context 'with buffering: true' do before do - buffer_options[:buffering] = :full + buffer_options[:buffering] = true end - it 'does not call #flush' do - expect(buffer).not_to receive(:flush) + it 'does not flush the buffer' do + expect(flows).not_to receive(:flush) buffer.add_message double(message: 'Hello World!', time: Time.now) end @@ -218,37 +222,17 @@ RSpec.describe Rackstash::Buffer do end end - context 'with buffering: :none' do + context 'with buffering: false' do before do - buffer_options[:buffering] = :data + buffer_options[:buffering] = false end - it 'calls #flush' do - expect(buffer).to receive(:flush) + it 'flushes the buffer' do + expect(flows).to receive(:flush) buffer.add_message double(message: 'Hello World!', time: Time.now) end it 'clears messages' do - allow(buffer).to receive(:flush) - buffer.add_message double(message: 'Hello World!', time: Time.now) - - expect(buffer.messages.count).to eql 0 - expect(buffer.pending?).to be false - end - end - - context 'with buffering: :none' do - before do - buffer_options[:buffering] = :none - end - - it 'calls #flush' do - expect(buffer).to receive(:flush) - buffer.add_message double(message: 'Hello World!', time: Time.now) - end - - it 'clears messages' do - allow(buffer).to receive(:flush) buffer.add_message double(message: 'Hello World!', time: Time.now) expect(buffer.messages.count).to eql 0 @@ -258,31 +242,18 @@ RSpec.describe Rackstash::Buffer do end describe '#buffering' do - it 'defaults to :full' do - expect(buffer.buffering).to eql :full + it 'defaults to true' do + expect(buffer.buffering?).to eql true end - it 'can be set to :full' do - expect(described_class.new(flows, buffering: true).buffering).to eql :full - expect(described_class.new(flows, buffering: :full).buffering).to eql :full + it 'can be set to true' do + expect(described_class.new(flows, buffering: true).buffering?).to be true + expect(described_class.new(flows, buffering: 'whatever').buffering?).to be true end - it 'can be set to :data' do - expect(described_class.new(flows, buffering: :data).buffering).to eql :data - end - - it 'can be set to :none' do - expect(described_class.new(flows, buffering: false).buffering).to eql :none - expect(described_class.new(flows, buffering: :none).buffering).to eql :none - end - - it 'does not allow other values' do - expect { described_class.new(flows, buffering: nil) } - .to raise_error(TypeError) - expect { described_class.new(flows, buffering: :invalid) } - .to raise_error(TypeError) - expect { described_class.new(flows, buffering: 'full') } - .to raise_error(TypeError) + it 'can be set to false' do + expect(described_class.new(flows, buffering: false).buffering?).to be false + expect(described_class.new(flows, buffering: nil).buffering?).to be false end end @@ -346,13 +317,13 @@ RSpec.describe Rackstash::Buffer do end context 'when pending?' do - let(:time) { Time.now } + let(:time) { Time.parse('2016-10-17 15:37:00 +02:00') } let(:message) { double(message: 'Hello World!', time: time) } let(:event) { { 'message' => [message], 'tags' => [], - '@timestamp' => Time.now + '@timestamp' => time } } @@ -360,11 +331,11 @@ RSpec.describe Rackstash::Buffer do buffer.add_message(message) # We might call Buffer#flush during the following tests - allow(flows).to receive(:write).with(buffer).once + allow(flows).to receive(:flush).with(event).once end it 'flushes the buffer to the flows' do - expect(flows).to receive(:write).with(buffer).once + expect(flows).to receive(:flush).with(event).once buffer.flush end @@ -381,7 +352,7 @@ RSpec.describe Rackstash::Buffer do context 'when not pending?' do it 'does not flushes the buffer to the flows' do - expect(flows).not_to receive(:write) + expect(flows).not_to receive(:flush) buffer.flush end @@ -562,7 +533,7 @@ RSpec.describe Rackstash::Buffer do end end - describe '#to_h' do + describe '#event' do it 'creates an event hash' do message = double(message: 'Hello World', time: Time.now) allow(message) @@ -570,7 +541,7 @@ RSpec.describe Rackstash::Buffer do buffer.fields[:foo] = 'bar' buffer.tags << 'some_tag' - expect(buffer.to_h).to match( + expect(buffer.event).to match( 'foo' => 'bar', 'message' => [message], 'tags' => ['some_tag'], diff --git a/spec/rackstash/buffer_stack_spec.rb b/spec/rackstash/buffer_stack_spec.rb index 82edbed..7e956c7 100644 --- a/spec/rackstash/buffer_stack_spec.rb +++ b/spec/rackstash/buffer_stack_spec.rb @@ -48,12 +48,12 @@ RSpec.describe Rackstash::BufferStack do it 'pushes a buffering buffer by default' do stack.push - expect(stack.current.buffering).to eql :full + expect(stack.current.buffering?).to eql true end it 'allows to set options on the new buffer' do - stack.push(buffering: :data) - expect(stack.current.buffering).to eql :data + stack.push(buffering: false) + expect(stack.current.buffering?).to eql false end end diff --git a/spec/rackstash/flows_spec.rb b/spec/rackstash/flows_spec.rb index 9632ffa..a97f4e0 100644 --- a/spec/rackstash/flows_spec.rb +++ b/spec/rackstash/flows_spec.rb @@ -13,9 +13,11 @@ require 'rackstash/flow' RSpec.describe Rackstash::Flows do let(:flows) { described_class.new } - def a_flow + def a_flow(auto_flush: false) flow = instance_double('Rackstash::Flow') allow(flow).to receive(:is_a?).with(Rackstash::Flow).and_return(true) + allow(flow).to receive(:auto_flush?).and_return(auto_flush) + flow end @@ -323,8 +325,8 @@ RSpec.describe Rackstash::Flows do end end - describe '#write' do - it 'flushes the buffer to all flows' do + describe '#flush' do + it 'writes the event to the flows' do event_spec = { 'foo' => 'bar', 'tags' => [], @@ -344,7 +346,75 @@ RSpec.describe Rackstash::Flows do # During flush, we create a single event, duplicate it and write each to # each of the flows. - flows.write('foo' => 'bar', 'tags' => [], '@timestamp' => Time.now.utc) + flows.flush('foo' => 'bar', 'tags' => [], '@timestamp' => Time.now.utc) + end + + it 'writes only to normal flows' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).to receive(:write).with('foo' => 'bar') + flows << normal_flow + + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).not_to receive(:write) + flows << auto_flush_flow + + flows.flush('foo' => 'bar') + end + + it 'accepts a block' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).to receive(:write).with('foo' => 'bar') + flows << normal_flow + + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).not_to receive(:write) + flows << auto_flush_flow + + flows.flush { { 'foo' => 'bar'} } + end + + it 'does nothing if there is no normal flow' do + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).not_to receive(:write) + flows << auto_flush_flow + + expect(flows.flush('foo' => 'bar')).to be_nil + expect { |b| flows.flush(&b) }.not_to yield_control + end + end + + describe '#auto_flush' do + it 'writes only to auto_flush flows' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).not_to receive(:write) + flows << normal_flow + + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).to receive(:write).with('foo' => 'bar') + flows << auto_flush_flow + + flows.auto_flush('foo' => 'bar') + end + + it 'accepts a block' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).not_to receive(:write) + flows << normal_flow + + auto_flush_flow = a_flow(auto_flush: true) + expect(auto_flush_flow).to receive(:write).with('foo' => 'bar') + flows << auto_flush_flow + + flows.auto_flush { { 'foo' => 'bar'} } + end + + it 'does nothing if there is no auto_flushing flow' do + normal_flow = a_flow(auto_flush: false) + expect(normal_flow).not_to receive(:write) + flows << normal_flow + + expect(flows.auto_flush('foo' => 'bar')).to be_nil + expect { |b| flows.auto_flush(&b) }.not_to yield_control end end end diff --git a/spec/rackstash/logger_spec.rb b/spec/rackstash/logger_spec.rb index 36643ac..adffcc4 100644 --- a/spec/rackstash/logger_spec.rb +++ b/spec/rackstash/logger_spec.rb @@ -531,7 +531,7 @@ RSpec.describe Rackstash::Logger do end it 'buffers multiple messages' do - expect(logger.flows).to receive(:write).once + expect(logger.flows).to receive(:flush).once logger.with_buffer do logger.add 1, 'Hello World'