mirror of
https://github.com/meineerde/rackstash.git
synced 2025-12-20 15:21:12 +00:00
Pass the raw Buffer timestamp through the event Hash
This allows filters and finally the encoder to use this without having to first convert the timestamp (back) to a more suitable format.
This commit is contained in:
parent
99728842aa
commit
3078bccafd
@ -274,8 +274,9 @@ module Rackstash
|
|||||||
# @return [String] an ISO 8601 formatted UTC timestamp.
|
# @return [String] an ISO 8601 formatted UTC timestamp.
|
||||||
def timestamp(time = nil)
|
def timestamp(time = nil)
|
||||||
@timestamp ||= begin
|
@timestamp ||= begin
|
||||||
time ||= Time.now
|
time ||= Time.now.utc.freeze
|
||||||
time.getutc.iso8601(ISO8601_PRECISION).freeze
|
time = time.getutc.freeze unless time.utc? && time.frozen?
|
||||||
|
time
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -301,14 +302,20 @@ module Rackstash
|
|||||||
# #<Rackstash::Message:0x007f908b4414c0 ...>,
|
# #<Rackstash::Message:0x007f908b4414c0 ...>,
|
||||||
# #<Rackstash::Message:0x007f908d14aee0 ...>
|
# #<Rackstash::Message:0x007f908d14aee0 ...>
|
||||||
# ],
|
# ],
|
||||||
# "@timestamp" => "2016-10-17T13:37:42.000Z"
|
# "@timestamp" => 2016-10-17 13:37:42 UTC
|
||||||
# }
|
# }
|
||||||
#
|
#
|
||||||
#
|
|
||||||
# Note that the resulting hash still contains an Array of {Message}s in the
|
# Note that the resulting hash still contains an Array of {Message}s in the
|
||||||
# `"message"` field. This allows the {Flow}'s' {Filters} to reject or adapt
|
# `"message"` field and a `Time` object in the '@timestamp' field. This
|
||||||
# some messages based on their original attributes, e.g., their severity or
|
# allows the {Flow}'s components (usually the {Filters} or the
|
||||||
# timestamp.
|
# {Flow#encoder}) to reject or adapt some messages based on
|
||||||
|
# their original attributes, e.g., their severity or timestamp. It is the
|
||||||
|
# responsibility of the {Flow#encoder} to correctly format the
|
||||||
|
# `"@timestamp"` field.
|
||||||
|
#
|
||||||
|
# All other fields in the event Hash besides `"message"` and `@timestamp"`
|
||||||
|
# are either `Hash`, `Array`, frozen `String`, `Integer` or `Float` objects.
|
||||||
|
# All hashes (including nested hashes) use `String` keys.
|
||||||
#
|
#
|
||||||
# @param fields [Hash<String => Object>, Proc] additional fields which are
|
# @param fields [Hash<String => Object>, Proc] additional fields which are
|
||||||
# merged with this Buffer's fields in the returned event Hash
|
# merged with this Buffer's fields in the returned event Hash
|
||||||
|
|||||||
42
lib/rackstash/encoders/helpers/timestamp.rb
Normal file
42
lib/rackstash/encoders/helpers/timestamp.rb
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
# Copyright 2017 Holger Just
|
||||||
|
#
|
||||||
|
# This software may be modified and distributed under the terms
|
||||||
|
# of the MIT license. See the LICENSE.txt file for details.
|
||||||
|
|
||||||
|
require 'date'
|
||||||
|
require 'time'
|
||||||
|
|
||||||
|
module Rackstash
|
||||||
|
module Encoders
|
||||||
|
module Helpers
|
||||||
|
# Some useful helper methods for {Encoders} which help in normalizing and
|
||||||
|
# handling timestamps in the event Hash, especially the {FIELD_TIMESTAMP}
|
||||||
|
# field.
|
||||||
|
module Timestamp
|
||||||
|
# Normalize the `"@timestamp"` field of the given log event Hash.
|
||||||
|
# Before any filters, only the `"@timestamp"` fueld contains a `Time`
|
||||||
|
# object denoting the timestamp of the log event. To represent this
|
||||||
|
# timestamp in logs, it is formatted as an ISO 8601 string. The
|
||||||
|
# timestamp will always be changed into UTC.
|
||||||
|
#
|
||||||
|
# @param event [Hash] a log event Hash
|
||||||
|
# @param field [String] the name of the timestamp field in the event
|
||||||
|
# hash. By default, we use the `"@timestamp"` field.
|
||||||
|
# @return [Hash] the given event with the `field` key set as an ISO 8601
|
||||||
|
# formatted time string.
|
||||||
|
def normalize_timestamp(event, field: FIELD_TIMESTAMP)
|
||||||
|
time = event[field]
|
||||||
|
|
||||||
|
if time.is_a?(Time) || time.is_a?(DateTime)
|
||||||
|
time = time.to_time
|
||||||
|
time = time.getutc unless time.utc?
|
||||||
|
event[field] = time.iso8601(ISO8601_PRECISION).freeze
|
||||||
|
end
|
||||||
|
|
||||||
|
event
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@ -7,6 +7,7 @@
|
|||||||
require 'json'
|
require 'json'
|
||||||
|
|
||||||
require 'rackstash/encoders/helpers/message'
|
require 'rackstash/encoders/helpers/message'
|
||||||
|
require 'rackstash/encoders/helpers/timestamp'
|
||||||
|
|
||||||
module Rackstash
|
module Rackstash
|
||||||
module Encoders
|
module Encoders
|
||||||
@ -16,11 +17,13 @@ module Rackstash
|
|||||||
# Most {Adapters} default to use this codec.
|
# Most {Adapters} default to use this codec.
|
||||||
class JSON
|
class JSON
|
||||||
include Rackstash::Encoders::Helpers::Message
|
include Rackstash::Encoders::Helpers::Message
|
||||||
|
include Rackstash::Encoders::Helpers::Timestamp
|
||||||
|
|
||||||
# @param event [Hash] a log event as produced by the {Flow}
|
# @param event [Hash] a log event as produced by the {Flow}
|
||||||
# @return [String] the event as a single-line JSON string
|
# @return [String] the event as a single-line JSON string
|
||||||
def encode(event)
|
def encode(event)
|
||||||
normalize_message(event)
|
normalize_message(event)
|
||||||
|
normalize_timestamp(event)
|
||||||
|
|
||||||
::JSON.dump(event)
|
::JSON.dump(event)
|
||||||
end
|
end
|
||||||
|
|||||||
@ -178,7 +178,7 @@ describe Rackstash::Buffer do
|
|||||||
msg = double(message: 'Hello World', time: time)
|
msg = double(message: 'Hello World', time: time)
|
||||||
|
|
||||||
buffer.add_message msg
|
buffer.add_message msg
|
||||||
expect(buffer.timestamp).to eql '2016-10-17T10:37:00.000000Z'
|
expect(buffer.timestamp).to eql time.getutc
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'when buffering?' do
|
context 'when buffering?' do
|
||||||
@ -256,12 +256,12 @@ describe Rackstash::Buffer do
|
|||||||
|
|
||||||
it 'resets the timestamp' do
|
it 'resets the timestamp' do
|
||||||
buffer.timestamp(Time.parse('2016-10-17 15:37:00 +02:00'))
|
buffer.timestamp(Time.parse('2016-10-17 15:37:00 +02:00'))
|
||||||
expect(buffer.timestamp).to eql '2016-10-17T13:37:00.000000Z'
|
expect(buffer.timestamp).to eql Time.utc(2016, 10, 17, 13, 37, 0)
|
||||||
|
|
||||||
buffer.clear
|
buffer.clear
|
||||||
|
|
||||||
expect(Time).to receive(:now).and_call_original
|
expect(Time).to receive(:now).and_call_original
|
||||||
expect(buffer.timestamp).not_to eql '2016-10-17T13:37:00.000000Z'
|
expect(buffer.timestamp).not_to eql Time.utc(2016, 10, 17, 13, 37, 0)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -462,20 +462,22 @@ describe Rackstash::Buffer do
|
|||||||
describe '#timestamp' do
|
describe '#timestamp' do
|
||||||
it 'initializes @timestamp to Time.now.utc' do
|
it 'initializes @timestamp to Time.now.utc' do
|
||||||
now = Time.parse('2016-10-17 13:37:00 +03:00')
|
now = Time.parse('2016-10-17 13:37:00 +03:00')
|
||||||
|
now_utc = Time.utc(2016, 10, 17, 10, 37, 0).freeze
|
||||||
|
|
||||||
expect(Time).to receive(:now).once.and_return(now)
|
expect(Time).to receive(:now).once.and_return(now)
|
||||||
expect(now).to receive(:getutc).once.and_return(now.getutc)
|
expect(now).to receive(:utc).once.and_return(now_utc)
|
||||||
|
|
||||||
expect(buffer.timestamp).to eql '2016-10-17T10:37:00.000000Z'
|
expect(buffer.timestamp).to equal now_utc
|
||||||
expect(buffer.timestamp).to eql '2016-10-17T10:37:00.000000Z'
|
expect(buffer.timestamp).to equal now_utc
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'initializes @timestamp with the passed time' do
|
it 'initializes @timestamp with the passed time' do
|
||||||
now = Time.parse('2016-10-17 13:37:00 +03:00')
|
now = Time.parse('2016-10-17 13:37:00 +03:00')
|
||||||
|
now_utc = Time.utc(2016, 10, 17, 10, 37, 0).freeze
|
||||||
|
|
||||||
expect(Time).not_to receive(:now)
|
expect(Time).not_to receive(:now)
|
||||||
expect(buffer.timestamp(now)).to eql '2016-10-17T10:37:00.000000Z'
|
expect(buffer.timestamp(now)).to eql now_utc
|
||||||
expect(buffer.timestamp).to eql '2016-10-17T10:37:00.000000Z'
|
expect(buffer.timestamp).to eql now_utc
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'does not overwrites an already set timestamp' do
|
it 'does not overwrites an already set timestamp' do
|
||||||
@ -483,13 +485,13 @@ describe Rackstash::Buffer do
|
|||||||
second = Time.parse('2016-10-17 20:20:20 +03:00')
|
second = Time.parse('2016-10-17 20:20:20 +03:00')
|
||||||
|
|
||||||
buffer.timestamp(first)
|
buffer.timestamp(first)
|
||||||
expect(buffer.timestamp).to eql '2016-10-17T07:10:10.000000Z'
|
expect(buffer.timestamp).to eql first.getutc
|
||||||
|
|
||||||
buffer.timestamp
|
buffer.timestamp
|
||||||
expect(buffer.timestamp).to eql '2016-10-17T07:10:10.000000Z'
|
expect(buffer.timestamp).to eql first.getutc
|
||||||
|
|
||||||
buffer.timestamp(second)
|
buffer.timestamp(second)
|
||||||
expect(buffer.timestamp).to eql '2016-10-17T07:10:10.000000Z'
|
expect(buffer.timestamp).to eql first.getutc
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -532,7 +534,7 @@ describe Rackstash::Buffer do
|
|||||||
'foo' => 'bar',
|
'foo' => 'bar',
|
||||||
'message' => [message],
|
'message' => [message],
|
||||||
'tags' => ['some_tag'],
|
'tags' => ['some_tag'],
|
||||||
'@timestamp' => instance_of(String)
|
'@timestamp' => instance_of(Time)
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
51
spec/rackstash/encoders/helpers/timestamp_spec.rb
Normal file
51
spec/rackstash/encoders/helpers/timestamp_spec.rb
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
# Copyright 2017 Holger Just
|
||||||
|
#
|
||||||
|
# This software may be modified and distributed under the terms
|
||||||
|
# of the MIT license. See the LICENSE.txt file for details.
|
||||||
|
|
||||||
|
require 'spec_helper'
|
||||||
|
|
||||||
|
require 'rackstash/encoders/helpers/timestamp'
|
||||||
|
|
||||||
|
describe Rackstash::Encoders::Helpers::Timestamp do
|
||||||
|
let(:helper) { Object.new.extend(described_class) }
|
||||||
|
let(:event) { {} }
|
||||||
|
|
||||||
|
describe '#normalize_timestamp' do
|
||||||
|
it 'formats a Time object' do
|
||||||
|
event['@timestamp'] = Time.parse('2016-10-17 13:37:00 +03:00')
|
||||||
|
expect(helper.normalize_timestamp(event).fetch('@timestamp')).to eql '2016-10-17T10:37:00.000000Z'
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'formats a DateTime object' do
|
||||||
|
event['@timestamp'] = DateTime.parse('2016-10-17 13:37:00 +03:00')
|
||||||
|
expect(helper.normalize_timestamp(event).fetch('@timestamp')).to eql '2016-10-17T10:37:00.000000Z'
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'ignores an unset value' do
|
||||||
|
expect(helper.normalize_timestamp(event)).not_to have_key '@timestamp'
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'ignores unknown values' do
|
||||||
|
event['@timestamp'] = 'string'
|
||||||
|
expect(helper.normalize_timestamp(event).fetch('@timestamp')).to eql 'string'
|
||||||
|
|
||||||
|
event['@timestamp'] = nil
|
||||||
|
expect(helper.normalize_timestamp(event).fetch('@timestamp')).to eql nil
|
||||||
|
|
||||||
|
event['@timestamp'] = 42
|
||||||
|
expect(helper.normalize_timestamp(event).fetch('@timestamp')).to eql 42
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'uses the given field name' do
|
||||||
|
event['@timestamp'] = Time.parse('2016-10-17 13:37:00 +03:00')
|
||||||
|
event['custom'] = Time.parse('2016-10-17 20:42:00 +07:00')
|
||||||
|
|
||||||
|
expect(helper.normalize_timestamp(event, field: 'custom')).to match({
|
||||||
|
'@timestamp' => instance_of(Time),
|
||||||
|
'custom' => '2016-10-17T13:42:00.000000Z'
|
||||||
|
})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
@ -146,7 +146,7 @@ describe Rackstash::Sink do
|
|||||||
event_spec = {
|
event_spec = {
|
||||||
'message' => [],
|
'message' => [],
|
||||||
'tags' => [],
|
'tags' => [],
|
||||||
'@timestamp' => instance_of(String)
|
'@timestamp' => instance_of(Time)
|
||||||
}
|
}
|
||||||
|
|
||||||
# only the first event is duplicated
|
# only the first event is duplicated
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user