
Implement the full Sink class

A single Sink is tied to a single Logger. It is responsible for:

* Creating a log event from a Buffer on #write and sending it to each of
  the flows independently.
* Forwarding all actions to all of the defined Flows.

The Sink provides access to all of the Logger's configured data that is
used for persisting the Buffers.
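
As a rough sketch of that interaction (illustrative only; the flow objects are assumed to be configured elsewhere, along the lines of the Sink implementation below):

sink   = Rackstash::Sink.new(flows)   # a single Flow, an adapter object, or an array of Flows
buffer = Rackstash::Buffer.new(sink)
# ... the Logger collects messages, fields and tags on the buffer ...
buffer.flush                          # Buffer#flush now delegates to sink.write(buffer)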
Holger Just 2017-07-20 00:00:48 +02:00
parent 54a6f5d160
commit 720406b318
5 changed files with 366 additions and 28 deletions

View File

@@ -152,7 +152,7 @@ module Rackstash
def flush
return unless pending?
@sink.flush(self)
@sink.write(self)
self
end

View File

@@ -5,17 +5,228 @@
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.
require 'rackstash/flows'
module Rackstash
class Sink
attr_reader :targets
# @return [Hash, Proc] the default fields which get merged into each buffer
# on {#write}.
attr_reader :default_fields
def initialize(targets)
@targets = targets.respond_to?(:to_ary) ? targets.to_ary : [targets]
# @return [Array<#to_s, Proc>, Proc] the default tags which get merged into
# each buffer on {#write}. It can either be an Array of Strings or a Proc
# which returns an Array. Each element of the array can also be a Proc
# which gets resolved on {#write}.
attr_reader :default_tags
# @return [Flows] The {Flows} defined for this sink
attr_reader :flows
# @param flows [Array<Flow, Object>, Flow, Adapters::Adapter, Object]
# an array of {Flow}s, a single {Flow}, or an object which can be used as
# a {Flow}'s adapter. See {Flow#initialize}.
def initialize(flows)
@flows = Rackstash::Flows.new(flows)
@default_fields = {}
@default_tags = []
end
def flush(buffer)
@targets.each do |target|
target.flush(buffer)
# Set the default fields to the given value.
#
# These fields get merged into each buffer on {#write}. It can either be a
# `Hash` or a `Proc` which in turn returns a `Hash` on call. The `Hash` can
# be nested arbitrarily deep.
#
# Each value can also be a Proc which gets evaluated on {#write}. That way,
# you can set lazy-evaluated values.
#
# @example The following calls show equivalent results
#
# sink.default_fields = {'beep' => 'boop'}
#
# sink.default_fields = lambda {
# {
# 'beep' => 'boop'
# }
# }
#
# sink.default_fields = {
# 'beep' => -> { 'boop' }
# }
#
# @param fields [#to_hash, Proc] The default fields to be merged into each
# buffer on {#write}
# @raise [TypeError] if `fields` is neither a Proc nor can be converted to a
# Hash
# @return [Hash, Proc] the given `fields`
def default_fields=(fields)
fields = fields.to_hash if fields.respond_to?(:to_hash)
unless fields.is_a?(Hash) || fields.is_a?(Proc)
raise TypeError, 'default_fields must be a Hash or Proc'
end
@default_fields = fields
end
# Set the default tags to the given value.
#
# These tags get merged into each buffer on {#write}. It can either be an
# `Array` of strings or a `Proc` which in turn returns an `Array` of strings
# on call. The array's values can also be procs which should return a String
# on call. All procs are evaluated on {#write}. That way, you can set
# lazy-evaluated values.
#
# @example The following calls show equivalent results
#
# sink.default_tags = ['important', 'request']
#
# sink.default_tags = lambda {
# ['important', 'request']
# }
#
# sink.default_tags = [
# 'important', -> { 'request' }
# ]
#
# @param tags [#to_ary, Proc] The default tags to be merged into each
# buffer on {#write}
# @raise [TypeError] if `tags` is neither a Proc nor can be converted to an
# Array
# @return [Array, Proc] the given `tags`
def default_tags=(tags)
tags = tags.to_ary if tags.respond_to?(:to_ary)
unless tags.is_a?(Array) || tags.is_a?(Proc)
raise TypeError, 'default_tags must be an Array or Proc'
end
@default_tags = tags
end
# Close the log adapter for each configured {Flow}. This might be a no-op
# depending on each flow's adapter.
#
# @return [nil]
def close
@flows.each(&:close)
nil
end
# Close and re-open the log adapter for each configured {Flow}. This might
# be a no-op depending on each flow's adapter.
#
# @return [nil]
def reopen
@flows.each(&:reopen)
nil
end
# Create an event hash from the given `buffer` and write it to each of the
# defined {#flows}.
#
# First, we transform the given `buffer` to an event hash:
#
# * We deep-merge the {#default_fields} into the `buffer`'s fields and use
# it as the basis of the event hash. Existing fields on the `buffer` will
# always have precedence here.
# * We merge the {#default_tags} into the `buffer`'s tags and add them as a
# raw array of strings to the `event['tags']` field.
# * We add the `buffer`'s array of messages to `event['message']`. This
# field now contains an array of {Message} objects.
# * We add the `buffer`'s timestamp to the `event['@timestamp']` field as an
# ISO 8601 formatted string. The timestamp is always in UTC.
# * We add the version of the logstash event format as
# `event['@version'] = '1'`.
#
# The typical event emitted here looks like this:
#
# {
# "beep" => "boop",
# "foo" => ["bar", "baz"],
# "tags" => ["request", "controller#action"],
# "message" => [
# #<Rackstash::Message:0x007f908b4414c0 ...>,
# #<Rackstash::Message:0x007f908d14aee0 ...>
# ],
# "@timestamp" => "2016-10-17T13:37:42.000Z",
# "@version" => "1"
# }
#
# The resulting event hash is written to each defined {Flow}. Since a flow
# usually changes the event hash with its filters and encoder, we create a
# fresh copy of the hash for each flow.
#
# @param buffer [Buffer] The buffer containing the data to write to the
# {#flows}.
# @return [Buffer] the given `buffer`
def write(buffer)
event = event_from_buffer(buffer)
# Take a snapshot of the current list of flows for the rest of the method
# to make sure it doesn't change while we work with it.
flows = @flows.to_a
flows_size = flows.size
flows.each_with_index do |flow, index|
# If we have more than one flow, we provide a fresh copy of the event
# to each flow. The flow's filter and codec can then mutate the event
# however it pleases without affecting later flows. We don't need to
# duplicate the event for the last flow since it won't be re-used
# after that anymore.
current_event = (index == flows_size - 1) ? event : deep_dup_event(event)
flow.write(current_event)
end
buffer
end
private
# Create a raw event hash from a Buffer.
#
# Note that the resulting hash still contains an Array of {Message}s in the
# `"message"` field. This allows flow {Filters} to reject or adapt some
# messages based on their original attributes, e.g., their severity or
# timestamp.
#
# @see Flow#write
#
# @param buffer [Buffer] a buffer instance
# @return [Hash] the event expected by the event filters.
def event_from_buffer(buffer)
event = buffer.fields.deep_merge(@default_fields, force: false).to_h
event[FIELD_TAGS] = buffer.tags.merge(@default_tags).to_a
event[FIELD_MESSAGE] = buffer.messages
event[FIELD_TIMESTAMP] = buffer.timestamp
event[FIELD_VERSION] = '1'.freeze
event
end
# Create a deep duplicate of an event hash. It is assumed that the input
# event follows the normalized structure as generated by
# {Fields::Hash#to_h}.
#
# @param obj [Object] an object to duplicate. When initially called, this is
# expected to be an event hash
# @return [Object] a deep copy of the given `obj` if it was an `Array` or
# `Hash`, the original `obj` otherwise.
def deep_dup_event(obj)
case obj
when Hash
hash = obj.dup
obj.each_pair do |key, value|
# {Rackstash::Fields::Hash} already guarantees that keys are always
# frozen Strings. We don't need to dup them.
hash[key] = deep_dup_event(value)
end
hash
when Array
obj.map { |value| deep_dup_event(value) }
else
# All leaf-values in the event are either frozen or not freezable /
# dupable. They can be used as is. See {AbstractCollection#normalize}
obj
end
end
end
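
Putting the pieces above together, a typical usage could look roughly like this (a hedged sketch: the flow object, field names and tag names are illustrative, and the lazy Proc values are resolved on #write as described in the docstrings):

sink = Rackstash::Sink.new(flow)
sink.default_fields = { 'beep' => -> { 'boop' } }      # lazy value, resolved on #write
sink.default_tags   = ['request', -> { 'important' }]

buffer = Rackstash::Buffer.new(sink)
# ... messages, fields and tags are added to the buffer ...
sink.write(buffer)
# Each flow receives its own copy of an event along the lines of:
#   {
#     'beep'       => 'boop',
#     'tags'       => ['request', 'important'],
#     'message'    => [#<Rackstash::Message ...>],
#     '@timestamp' => '2016-10-17T13:37:42.000Z',
#     '@version'   => '1'
#   }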

View File

@@ -148,11 +148,11 @@ describe Rackstash::Buffer do
buffer.add_message double(message: 'Hello World!', time: Time.now)
# We might call Buffer#flush during the following tests
allow(sink).to receive(:flush).with(buffer).once
allow(sink).to receive(:write).with(buffer).once
end
it 'flushes the buffer to the sink' do
expect(sink).to receive(:flush).with(buffer).once
expect(sink).to receive(:write).with(buffer).once
buffer.flush
end
@@ -169,7 +169,7 @@ describe Rackstash::Buffer do
context 'when not pending?' do
it 'does not flush the buffer to the sink' do
expect(sink).not_to receive(:flush)
expect(sink).not_to receive(:write)
buffer.flush
end
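
Put differently, flushing a buffer that is not pending is a no-op for the sink. A minimal sketch (assuming a buffer without any messages does not count as pending):

buffer = Rackstash::Buffer.new(sink)
buffer.flush   # returns nil without calling sink.write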

View File

@@ -388,7 +388,7 @@ describe Rackstash::Logger do
end
it 'buffers multiple messages' do
expect(logger.sink).to receive(:flush).once
expect(logger.sink).to receive(:write).once
logger.with_buffer do
logger.add 1, 'Hello World'
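
The expectation above pins down the batching behaviour: every #add inside Logger#with_buffer lands in the same buffer, and the sink sees exactly one #write when the block ends. A hedged usage sketch (the second message is purely illustrative):

logger.with_buffer do
  logger.add 1, 'Hello World'
  logger.add 1, 'Another message'
end
# => sink.write is called exactly once for the whole block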

View File

@@ -9,31 +9,158 @@ require 'spec_helper'
require 'rackstash/sink'
require 'rackstash/buffer'
require 'rackstash/flows'
require 'rackstash/flow'
describe Rackstash::Sink do
let(:targets) { [] }
let(:sink) { described_class.new(targets) }
describe '#initialize' do
it 'accepts an array with targets' do
expect(targets).to receive(:to_ary).once.and_call_original
expect(sink.targets).to equal targets
def a_flow
flow = instance_double('Rackstash::Flow')
allow(flow).to receive(:is_a?).with(Rackstash::Flow).and_return(true)
flow
end
it 'wraps a single target into an array' do
target = Object.new
expect(described_class.new(target).targets).to eql [target]
let(:flow) { a_flow }
let(:sink) { described_class.new(flow) }
describe 'initialize' do
# We deliberately use the real Rackstash::Flows class here to serve as an
# integration test
it 'wraps a single flow in a flows list' do
expect(Rackstash::Flows).to receive(:new).with(flow)
.and_call_original
sink = described_class.new(flow)
expect(sink.flows).to be_a Rackstash::Flows
expect(sink.flows.to_a).to eql [flow]
end
it 'wraps multiple flows in a flows list' do
flows = [a_flow, a_flow]
expect(Rackstash::Flows).to receive(:new).with(flows)
.and_call_original
sink = described_class.new(flows)
expect(sink.flows).to be_a Rackstash::Flows
expect(sink.flows.to_a).to eql flows
end
end
describe '#flush' do
it 'flushes the buffer to all targets' do
buffer = double('buffer')
describe '#default_fields' do
it 'can set a proc' do
a_proc = proc { nil }
expect(a_proc).not_to receive(:call)
target = double('target')
targets << target
sink.default_fields = a_proc
expect(sink.default_fields).to equal a_proc
end
expect(target).to receive(:flush).with(buffer)
sink.flush(buffer)
it 'can set a Hash' do
hash = { foo: :bar }
sink.default_fields = hash
expect(sink.default_fields).to equal hash
end
it 'can set a Hash-like object' do
hash_alike = double('hash')
expect(hash_alike).to receive(:to_hash).and_return(foo: :bar)
sink.default_fields = hash_alike
expect(sink.default_fields).to eql(foo: :bar)
expect(sink.default_fields).not_to equal hash_alike
end
it 'refuses invalid fields' do
expect { sink.default_fields = nil }.to raise_error TypeError
expect { sink.default_fields = 42 }.to raise_error TypeError
expect { sink.default_fields = ['foo'] }.to raise_error TypeError
end
end
describe '#default_tags' do
it 'can set a proc' do
tags = proc { nil }
expect(tags).not_to receive(:call)
sink.default_tags = tags
expect(sink.default_tags).to equal tags
end
it 'can set an Array' do
array = [:foo, 'bar']
sink.default_tags = array
expect(sink.default_tags).to equal array
end
it 'can set an Array-like object' do
array_alike = double('array')
expect(array_alike).to receive(:to_ary).and_return([:foo])
sink.default_tags = array_alike
expect(sink.default_tags).to eql [:foo]
expect(sink.default_tags).not_to equal array_alike
end
it 'refuses invalid tags' do
expect { sink.default_tags = nil }.to raise_error TypeError
expect { sink.default_tags = 42 }.to raise_error TypeError
expect { sink.default_tags = { foo: :bar } }.to raise_error TypeError
end
end
describe '#close' do
let(:flow) { [a_flow, a_flow] }
it 'calls close on all flows' do
expect(flow).to all receive(:close)
expect(sink.close).to be_nil
end
end
describe '#reopen' do
let(:flow) { [a_flow, a_flow] }
it 'calls reopen on all flows' do
expect(flow).to all receive(:reopen)
expect(sink.reopen).to be_nil
end
end
describe '#write' do
let(:flows) {
[a_flow, a_flow].each do |flow|
allow(flow).to receive(:write)
end
}
let(:sink) { described_class.new(flows) }
let(:buffer) { Rackstash::Buffer.new(sink) }
it 'merges default_fields and default_tags' do
expect(buffer.fields).to receive(:deep_merge).once
expect(buffer.tags).to receive(:merge).once
sink.write(buffer)
end
it 'flushes the buffer to all flows' do
event_spec = {
'message' => [],
'tags' => [],
'@timestamp' => instance_of(String),
'@version' => '1'
}
# only the event for the first flow is deep-duplicated; the last flow receives the original
expect(sink).to receive(:deep_dup_event).with(event_spec).and_call_original.ordered
expect(sink).to receive(:deep_dup_event).with(anything).exactly(4).times.and_call_original
# During #write, we create a single event, duplicate it for all but the
# last flow, and write one copy to each of the flows.
expect(flows).to all receive(:write).with(event_spec)
sink.write(buffer)
end
end
end
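
The per-flow duplication asserted here can also be observed directly. A small sketch (it reaches the private helper via send purely for illustration):

event = { 'tags' => ['request'], 'message' => [] }
copy  = sink.send(:deep_dup_event, event)
copy['tags'] << 'mutated-by-a-filter'
event['tags']   # => ['request'], the original event is unaffected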