diff --git a/lib/rackstash/buffer.rb b/lib/rackstash/buffer.rb index 4e3f387..b5d4c7e 100644 --- a/lib/rackstash/buffer.rb +++ b/lib/rackstash/buffer.rb @@ -222,18 +222,15 @@ module Rackstash @fields ||= Rackstash::Fields::Hash.new(forbidden_keys: FORBIDDEN_FIELDS) end - # Flush the current buffer to the {#flows} if it is pending. - # - # After the flush, the existing buffer should not be used anymore. You - # should either call {#clear} to remove all volatile data or create a new - # buffer instance instead. + # Flush the current buffer to the {#flows} if it is pending. All data in the + # buffer will be preserved. You might call {#clear} to start anew. # # @return [self,nil] returns `self` if the buffer was flushed, `nil` # otherwise def flush return unless pending? - @flows.write(self.to_event) + @flows.write(self) self end @@ -347,7 +344,7 @@ module Rackstash # All hashes (including nested hashes) use `String` keys. # # @return [Hash] the event expected by the event {Filter}s. - def to_event + def to_h event = fields.to_h event[FIELD_TAGS] = tags.to_a event[FIELD_MESSAGE] = messages diff --git a/lib/rackstash/flows.rb b/lib/rackstash/flows.rb index 47ecca8..054c5a5 100644 --- a/lib/rackstash/flows.rb +++ b/lib/rackstash/flows.rb @@ -179,6 +179,7 @@ module Rackstash flows = to_a flows_size = flows.size + event = event.to_h flows.each_with_index do |flow, index| # If we have more than one flow, we provide a fresh copy of the event # to each flow. The flow's filters and encoder can then mutate the event diff --git a/spec/rackstash/buffer_spec.rb b/spec/rackstash/buffer_spec.rb index 00324a7..5fcd1ae 100644 --- a/spec/rackstash/buffer_spec.rb +++ b/spec/rackstash/buffer_spec.rb @@ -360,12 +360,11 @@ RSpec.describe Rackstash::Buffer do buffer.add_message(message) # We might call Buffer#flush during the following tests - allow(buffer).to receive(:to_event).and_return(event) - allow(flows).to receive(:write).with(event).once + allow(flows).to receive(:write).with(buffer).once end it 'flushes the buffer to the flows' do - expect(flows).to receive(:write).with(event).once + expect(flows).to receive(:write).with(buffer).once buffer.flush end @@ -563,7 +562,7 @@ RSpec.describe Rackstash::Buffer do end end - describe '#to_event' do + describe '#to_h' do it 'creates an event hash' do message = double(message: 'Hello World', time: Time.now) allow(message) @@ -571,7 +570,7 @@ RSpec.describe Rackstash::Buffer do buffer.fields[:foo] = 'bar' buffer.tags << 'some_tag' - expect(buffer.to_event).to match( + expect(buffer.to_h).to match( 'foo' => 'bar', 'message' => [message], 'tags' => ['some_tag'],