mirror of
https://github.com/meineerde/rackstash.git
synced 2025-10-17 14:01:01 +00:00
Move the responsibility for creating the event hash to the Buffer
With this move, we can also optimize the simplest case where there are no fields or tags on the Buffer and no defined default_fields or default_tags on the Sink. In that case, we don't need to merge them, avoiding the creation of several unnecessary objects on Buffer flush.
This commit is contained in:
parent
4438bf42bc
commit
3f8745e32c
@ -258,5 +258,64 @@ module Rackstash
|
||||
time.getutc.iso8601(ISO8601_PRECISION).freeze
|
||||
end
|
||||
end
|
||||
|
||||
# Create an event hash from `self`.
|
||||
#
|
||||
# * We take the buffer's existing fields and deep-merge the provided
|
||||
# `fields` into them. Existing fields on the buffer will always have
|
||||
# precedence here.
|
||||
# * We add the given additional `tags` to the buffer's tags and add them as
|
||||
# a raw array of strings to the `event['tags']` field.
|
||||
# * We add the buffer's array of messages to `event['message']`. This field
|
||||
# now contains an array of {Message} objects.
|
||||
# * We add the buffer's timestamp to the `event['@timestamp']` field as an
|
||||
# ISO 8601 formatted string. The timestamp is always in UTC.
|
||||
# * We add the version of the logstash event format as
|
||||
# `event[@version] = 1`.
|
||||
#
|
||||
# The typical event emitted here looks like this:
|
||||
#
|
||||
# {
|
||||
# "beep" => "boop",
|
||||
# "foo" => ["bar", "baz"],
|
||||
# "tags" => ["request", "controller#action"],
|
||||
# "message" => [
|
||||
# #<Rackstash::Message:0x007f908b4414c0 ...>,
|
||||
# #<Rackstash::Message:0x007f908d14aee0 ...>
|
||||
# ],
|
||||
# "@timestamp" => "2016-10-17T13:37:42.000Z",
|
||||
# "@version" => "1"
|
||||
# }
|
||||
#
|
||||
#
|
||||
# Note that the resulting hash still contains an Array of {Message}s in the
|
||||
# `"message"` field. This allows the {Flow}'s {Filters} to reject or adapt
|
||||
# some messages based on their original attributes, e.g., their severity or
|
||||
# timestamp.
|
||||
#
|
||||
# @param fields [Hash<String => Object>, Proc] additional fields which are
|
||||
# merged with this Buffer's fields in the returned event Hash
|
||||
# @param tags [Array<String>, Proc] additional tags which are
|
||||
#   added to this Buffer's tags in the returned event Hash
|
||||
# @return [Hash] the event expected by the event {Filters}.
|
||||
def to_event(fields: {}, tags: [])
|
||||
if (@fields.nil? || @fields.empty?) && ::Hash === fields && fields.empty?
|
||||
event = {}
|
||||
else
|
||||
event = self.fields.deep_merge(fields, force: false).to_h
|
||||
end
|
||||
|
||||
if (@tags.nil? || @tags.empty?) && ::Array === tags && tags.empty?
|
||||
event[FIELD_TAGS] = []
|
||||
else
|
||||
event[FIELD_TAGS] = self.tags.merge(tags).to_a
|
||||
end
|
||||
|
||||
event[FIELD_MESSAGE] = messages
|
||||
event[FIELD_TIMESTAMP] = timestamp
|
||||
event[FIELD_VERSION] = '1'.freeze
|
||||
|
||||
event
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -118,43 +118,19 @@ module Rackstash
|
||||
# Create an event hash from the given `buffer` and write it to each of the
|
||||
# defined {#flows}.
|
||||
#
|
||||
# First, we transform the given `buffer` to an event hash:
|
||||
# First, we transform the given `buffer` to an event hash using
|
||||
# {Buffer#to_event}. Here, we are merging the {#default_fields} and
|
||||
# {#default_tags} into the event Hash.
|
||||
#
|
||||
# * We deep-merge the {#default_fields} into the `buffer`'s fields and use
|
||||
#   it as the basis for the event hash. Existing fields on the `buffer` will
|
||||
# always have precedence here.
|
||||
# * We add the {#default_tags} to the `buffer`'s tags and add them as a raw
|
||||
# array of strings to the `event['tags']` field.
|
||||
# * We add the `buffer`'s array of messages to `event['message']`. This
|
||||
# field now contains an array of {Message} objects.
|
||||
# * We add the `buffer`'s timestamp to the `event['@timestamp']` field as an
|
||||
# ISO 8601 formatted string. The timestamp is always in UTC.
|
||||
# * We add the version of the logstash event format as
|
||||
# `event[@version] = 1`.
|
||||
#
|
||||
# The typical event emitted here looks like this:
|
||||
#
|
||||
# {
|
||||
# "beep" => "boop",
|
||||
# "foo" => ["bar", "baz"],
|
||||
# "tags" => ["request", "controller#action"],
|
||||
# "message" => [
|
||||
# #<Rackstash::Message:0x007f908b4414c0 ...>,
|
||||
# #<Rackstash::Message:0x007f908d14aee0 ...>
|
||||
# ],
|
||||
# "@timestamp" => "2016-10-17T13:37:42.000Z",
|
||||
# "@version" => "1"
|
||||
# }
|
||||
#
|
||||
# The resulting event hash is written to each defined {Flow}. Since a flow
|
||||
# usually changes the event hash with its filters and encoder, we create a
|
||||
# fresh copy of the hash for each flow.
|
||||
# A fresh copy of the resulting event hash is then written to each defined
|
||||
# {Flow}. This allows each flow to alter the event in any way without
|
||||
# affecting the other flows.
|
||||
#
|
||||
# @param buffer [Buffer] The buffer containing the data to write to the
|
||||
# {#flows}.
|
||||
# @return [Buffer] the given `buffer`
|
||||
def write(buffer)
|
||||
event = event_from_buffer(buffer)
|
||||
event = buffer.to_event(fields: @default_fields, tags: @default_tags)
|
||||
|
||||
# Memoize the current list of flows for the rest of the method to make
|
||||
# sure it doesn't change while we work with it.
|
||||
@ -177,27 +153,6 @@ module Rackstash
|
||||
|
||||
private
|
||||
|
||||
# Create a raw event hash from a Buffer.
|
||||
#
|
||||
# Note that the resulting hash still contains an Array of {Message}s in the
|
||||
# `"message"` field. This allows flow {Filters} to reject or adapt some
|
||||
# messages based on their original attributes, e.g., their severity or
|
||||
# timestamp.
|
||||
#
|
||||
# @see Flow#write
|
||||
#
|
||||
# @param buffer [Buffer] a buffer instance
|
||||
# @return [Hash] the event expected by the event filters.
|
||||
def event_from_buffer(buffer)
|
||||
event = buffer.fields.deep_merge(@default_fields, force: false).to_h
|
||||
event[FIELD_TAGS] = buffer.tags.merge(@default_tags).to_a
|
||||
event[FIELD_MESSAGE] = buffer.messages
|
||||
event[FIELD_TIMESTAMP] = buffer.timestamp
|
||||
event[FIELD_VERSION] = '1'.freeze
|
||||
|
||||
event
|
||||
end
|
||||
|
||||
# Create a deep duplicate of an event hash. It is assumed that the input
|
||||
# event follows the normalized structure as generated by
|
||||
# {Fields::Hash#to_h}.
|
||||
|
||||
@ -390,4 +390,49 @@ describe Rackstash::Buffer do
|
||||
expect(buffer.timestamp).to eql '2016-10-17T07:10:10.000000Z'
|
||||
end
|
||||
end
|
||||
|
||||
describe '#to_event' do
|
||||
it 'does not merge field and tags if empty' do
|
||||
expect(buffer).not_to receive(:fields)
|
||||
expect(buffer).not_to receive(:tags)
|
||||
|
||||
buffer.to_event(fields: {}, tags: [])
|
||||
end
|
||||
|
||||
it 'merges fields and tags as values' do
|
||||
fields = {foo: :bar}
|
||||
tags = ['default_tag']
|
||||
|
||||
expect(buffer.fields).to receive(:deep_merge).with(fields, force: false)
|
||||
expect(buffer.tags).to receive(:merge).with(tags)
|
||||
|
||||
buffer.to_event(fields: fields, tags: tags)
|
||||
end
|
||||
|
||||
it 'merges fields and tags as Procs' do
|
||||
fields = ->{}
|
||||
tags = ->{}
|
||||
|
||||
expect(buffer.fields).to receive(:deep_merge).with(fields, force: false)
|
||||
expect(buffer.tags).to receive(:merge).with(tags)
|
||||
|
||||
buffer.to_event(fields: fields, tags: tags)
|
||||
end
|
||||
|
||||
it 'creates an event hash' do
|
||||
message = double(message: 'Hello World', time: Time.now)
|
||||
allow(message)
|
||||
buffer.add_message(message)
|
||||
buffer.fields[:foo] = 'bar'
|
||||
buffer.tags << 'some_tag'
|
||||
|
||||
expect(buffer.to_event).to match(
|
||||
'foo' => 'bar',
|
||||
'message' => [message],
|
||||
'tags' => ['some_tag'],
|
||||
'@timestamp' => instance_of(String),
|
||||
'@version' => '1'
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -139,9 +139,7 @@ describe Rackstash::Sink do
|
||||
let(:buffer) { Rackstash::Buffer.new(sink) }
|
||||
|
||||
it 'merges default_fields and default_tags' do
|
||||
expect(buffer.fields).to receive(:deep_merge).once
|
||||
expect(buffer.tags).to receive(:merge).once
|
||||
|
||||
expect(buffer).to receive(:to_event).with(fields: {}, tags: [])
|
||||
sink.write(buffer)
|
||||
end
|
||||
|
||||
@ -155,7 +153,9 @@ describe Rackstash::Sink do
|
||||
|
||||
# only the first event is duplicated
|
||||
expect(sink).to receive(:deep_dup_event).with(event_spec).and_call_original.ordered
|
||||
expect(sink).to receive(:deep_dup_event).with(anything).exactly(4).times.and_call_original
|
||||
event_spec.each_value do |arg|
|
||||
expect(sink).to receive(:deep_dup_event).with(arg).and_call_original.ordered
|
||||
end
|
||||
|
||||
# During flush, we create a single event, duplicate it and write each to
|
||||
# each of the flows.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user