diff --git a/lib/rackstash/buffer.rb b/lib/rackstash/buffer.rb index badf193..357632f 100644 --- a/lib/rackstash/buffer.rb +++ b/lib/rackstash/buffer.rb @@ -258,5 +258,64 @@ module Rackstash time.getutc.iso8601(ISO8601_PRECISION).freeze end end + + # Create an event hash from `self`. + # + # * We take the buffer's existing fields and deep-merge the provided + # `fields` into them. Existing fields on the buffer will always have + # precedence here. + # * We add the given additional `tags` to the buffer's tags and add them as + # a raw array of strings to the `event['tags']` field. + # * We add the buffer's array of messages to `event['message']`. This field + # now contains an array of {Message} objects. + # * We add the buffer's timestamp to the `event['@timestamp']` field as an + # ISO 8601 formatted string. The timestamp is always in UTC. + # * We add the version of the logstash event format as + # `event[@version] = 1`. + # + # The typical event emitted here looks like this: + # + # { + # "beep" => "boop", + # "foo" => ["bar", "baz"], + # "tags" => ["request", "controller#action"], + # "message" => [ + # #<Rackstash::Message ...>, + # #<Rackstash::Message ...> + # ], + # "@timestamp" => "2016-10-17T13:37:42.000Z", + # "@version" => "1" + # } + # + # + # Note that the resulting hash still contains an Array of {Message}s in the + # `"message"` field. This allows the {Flow}'s {Filters} to reject or adapt + # some messages based on their original attributes, e.g., their severity or + # timestamp. + # + # @param fields [Hash<#to_s => Object>, Proc] additional fields which are + # merged with this Buffer's fields in the returned event Hash + # @param tags [Array<#to_s>, Proc] additional tags which are + # added to Buffer's tags in the returned event Hash + # @return [Hash] the event expected by the event {Filters}. + def to_event(fields: {}, tags: []) + if (@fields.nil? || @fields.empty?) && ::Hash === fields && fields.empty?
+ event = {} + else + event = self.fields.deep_merge(fields, force: false).to_h + end + + if (@tags.nil? || @tags.empty?) && ::Array === tags && tags.empty? + event[FIELD_TAGS] = [] + else + event[FIELD_TAGS] = self.tags.merge(tags).to_a + end + + event[FIELD_MESSAGE] = messages + event[FIELD_TIMESTAMP] = timestamp + event[FIELD_VERSION] = '1'.freeze + + event + end end end diff --git a/lib/rackstash/sink.rb b/lib/rackstash/sink.rb index cae9360..e539329 100644 --- a/lib/rackstash/sink.rb +++ b/lib/rackstash/sink.rb @@ -118,43 +118,19 @@ module Rackstash # Create an event hash from the given `buffer` and write it to each of the # defined {#flows}. # - # First, we transform the given `buffer` to an event hash: + # First, we transform the given `buffer` to an event hash using + # {Buffer#to_event}. Here, we are merging the {#default_fields} and + # {#default_tags} into the event Hash. # - # * We deep-merge the {#default_fields} into the `buffer`'s fields and use - # it as the basis the the event hash. Existing fields on the `buffer` will - # always have precedence here. - # * We add the {#default_tags} to the `buffer`'s tags and add them as a raw - # array of strings to the `event['tags']` field. - # * We add the `buffer`'s array of messages to `event['message']`. This - # field now contains an array of {Message} objects. - # * We add the `buffer`'s timestamp to the `event['@timestamp]` field as an - # ISO 8601 formatted string. The timestamp is always in UTC. - # * We add the version of the logstash event format as - # `event[@version] = 1`. - # - # The typical event emitted here looks like this: - # - # { - # "beep" => "boop", - # "foo" => ["bar", "baz"], - # "tags" => ["request", "controller#action"], - # "message" => [ - # #, - # # - # ], - # "@timestamp" => "2016-10-17T13:37:42.000Z", - # "@version" => "1" - # } - # - # The resulting event hash is written to each defined {Flow}. 
Since a flow - # usually changes the event hash with its filters and encoder, we create a - # fresh copy of the hash for each flow. + # A fresh copy of the resulting event hash is then written to each defined + # {Flow}. This allows each flow to alter the event in any way without + # affecting the other flows. # # @param buffer [Buffer] The buffer cotnaining the data to write to the # {#flows}. # @return [Buffer] the given `buffer` def write(buffer) - event = event_from_buffer(buffer) + event = buffer.to_event(fields: @default_fields, tags: @default_tags) # Memoize the current list of flows for the rest of the method to make # sure it doesn't change while we work with it. @@ -177,27 +153,6 @@ module Rackstash private - # Create a raw event hash from a Buffer. - # - # Note that the resulting hash still contains an Array of {Message}s in the - # `"message"` field. This allows flow {Filters} to reject or adapt some - # messages based on their original attributes, e.g., their severity or - # timestamp. - # - # @see Flow#write - # - # @param buffer [Buffer] a buffer instance - # @return [Hash] the event expected by the event filters. - def event_from_buffer(buffer) - event = buffer.fields.deep_merge(@default_fields, force: false).to_h - event[FIELD_TAGS] = buffer.tags.merge(@default_tags).to_a - event[FIELD_MESSAGE] = buffer.messages - event[FIELD_TIMESTAMP] = buffer.timestamp - event[FIELD_VERSION] = '1'.freeze - - event - end - # Create a deep duplicate of an event hash. It is assumed that the input # event follows the normalized structure as generated by # {Fields::Hash#to_h}. 
diff --git a/spec/rackstash/buffer_spec.rb b/spec/rackstash/buffer_spec.rb index 71a4e74..228d462 100644 --- a/spec/rackstash/buffer_spec.rb +++ b/spec/rackstash/buffer_spec.rb @@ -390,4 +390,49 @@ describe Rackstash::Buffer do expect(buffer.timestamp).to eql '2016-10-17T07:10:10.000000Z' end end + + describe '#to_event' do + it 'does not merge field and tags if empty' do + expect(buffer).not_to receive(:fields) + expect(buffer).not_to receive(:tags) + + buffer.to_event(fields: {}, tags: []) + end + + it 'merges fields and tags as values' do + fields = {foo: :bar} + tags = ['default_tag'] + + expect(buffer.fields).to receive(:deep_merge).with(fields, force: false) + expect(buffer.tags).to receive(:merge).with(tags) + + buffer.to_event(fields: fields, tags: tags) + end + + it 'merges fields and tags as Procs' do + fields = ->{} + tags = ->{} + + expect(buffer.fields).to receive(:deep_merge).with(fields, force: false) + expect(buffer.tags).to receive(:merge).with(tags) + + buffer.to_event(fields: fields, tags: tags) + end + + it 'creates an event hash' do + message = double(message: 'Hello World', time: Time.now) + allow(message) + buffer.add_message(message) + buffer.fields[:foo] = 'bar' + buffer.tags << 'some_tag' + + expect(buffer.to_event).to match( + 'foo' => 'bar', + 'message' => [message], + 'tags' => ['some_tag'], + '@timestamp' => instance_of(String), + '@version' => '1' + ) + end + end end diff --git a/spec/rackstash/sink_spec.rb b/spec/rackstash/sink_spec.rb index 0e14891..4da33db 100644 --- a/spec/rackstash/sink_spec.rb +++ b/spec/rackstash/sink_spec.rb @@ -139,9 +139,7 @@ describe Rackstash::Sink do let(:buffer) { Rackstash::Buffer.new(sink) } it 'merges default_fields and default_tags' do - expect(buffer.fields).to receive(:deep_merge).once - expect(buffer.tags).to receive(:merge).once - + expect(buffer).to receive(:to_event).with(fields: {}, tags: []) sink.write(buffer) end @@ -155,7 +153,9 @@ describe Rackstash::Sink do # only the first event is 
duplicated expect(sink).to receive(:deep_dup_event).with(event_spec).and_call_original.ordered - expect(sink).to receive(:deep_dup_event).with(anything).exactly(4).times.and_call_original + event_spec.each_value do |arg| + expect(sink).to receive(:deep_dup_event).with(arg).and_call_original.ordered + end # During flush, we create a single event, duplicate it and write each to # each of the flows.