1
0
mirror of https://github.com/meineerde/rackstash.git synced 2026-02-01 01:37:12 +00:00
Holger Just aa37d47b8f Singularize the Adapter and Encoder modules
Since we are using single objects from these namespaces, they are much
more suitable to be named in singular than in plural.
2017-10-20 22:27:50 +02:00

243 lines
7.3 KiB
Ruby

# frozen_string_literal: true
#
# Copyright 2017 Holger Just
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE.txt file for details.
require 'concurrent'
require 'rackstash/flow'
module Rackstash
# The `Flows` class provides a thread-safe list of {Flow} objects which are
# used to write a single log event of a {Buffer} to multiple flows. Each
# {Logger} object has an associated Flows object to define the logger's flows.
class Flows
# @param flows [::Array<Flow, Adapter::Adapter, Object>] the {Flow} objects
# which should be part of the list. If any of the arguments is not a
# {Flow} already, we assume it is an adapter and create a new {Flow} for
# it.
def initialize(*flows)
@flows = Concurrent::Array.new
flows.each do |flow|
add(flow)
end
end
# Add a new flow at the end of the list.
#
# @param flow [Flow, Adapter::Adapter, Object] The flow to add to the end
# of the list. If the argument is not a {Flow}, we assume it is an adapter
# and create a new {Flow} with it.
# @return [self]
def <<(flow)
flow = Flow.new(flow) unless flow.is_a?(Flow)
@flows << flow
self
end
alias add <<
# Retrieve an existing flow from a given `index`
#
# @param index [Integer] the index in the list where we fetch the flow
# @return [Flow, nil] the flow at `index` or `nil` if no flow could be found
def [](index)
@flows[index]
end
# Set a flow at a given index. If the argument is not a {Flow}, we assume it
# is an adapter and create a new {Flow} for it.
#
# @param index [Integer] the index in the list where we set the flow
# @param flow [Flow, Adapter::Adapter, Object] The flow to add at `index`.
# If the argument is not a {Flow}, we assume it is an adapter and create
# a new {Flow} with it.
# @return [void]
def []=(index, flow)
flow = Flow.new(flow) unless flow.is_a?(Flow)
@flows[index] = flow
flow
end
# Calls the given block once for each flow in `self`, passing that flow as
# a parameter. We only yield non-nil elements. Concurrent changes to `self`
# do not affect the running enumeration.
#
# An `Enumerator` is returned if no block is given.
#
# @yield [flow] calls the given block once for each flow
# @yieldparam flow [Flow] the yielded flow
# @return [Enumerator, self] `self` if a block was given or an `Enumerator`
# if no block was given.
def each
return enum_for(__method__) unless block_given?
to_a.each do |flow|
yield flow
end
self
end
# @return [Boolean] `true` if `self` contains no elements, `false` otherwise
def empty?
@flows.empty?
end
# @overload first
# @return [Flow, nil] the first flow. If the list is empty, `nil` is
# returned.
#
# @overload first(n)
# @param n [Integer] the number of flows to return
# @return [Array<Flow>] the first `n` flows. If the list is empty, an
# empty array is returned.
#
# @return [Flow, Array<Flow>, nil]
# @see #last #last for the opposite effect.
def first(n = UNDEFINED)
if UNDEFINED.equal? n
@flows.first
else
@flows.first(n)
end
end
# Prevents further modifications to the flows list. A `RuntimeError` will be
# raised if you attempt to add, delete, or overwrite flows in the list.
# There is no way to unfreeze a frozen object.
#
# @return [self]
def freeze
@flows.freeze
super
end
# @return [String] a string representation of `self`
def inspect
id_str = Object.instance_method(:to_s).bind(self).call[2..-2]
"#<#{id_str} #{self}>"
end
# @overload last
# @return [Flow, nil] the last flow. If the list is empty, `nil` is
# returned.
#
# @overload last(n)
# @param n [Integer] the number of flows to return
# @return [Array<Flow>] the last `n` flows. If the list is empty, an empty
# array is returned.
#
# @return [Flow, Array<Flow>, nil]
# @see #first #first for the opposite effect.
def last(n = UNDEFINED)
if UNDEFINED.equal? n
@flows.last
else
@flows.last(n)
end
end
# @return [Integer] the number of elements in `self`. May be zero.
def length
@flows.length
end
alias size length
# Close the log adapter for each configured {Flow}. This might be a no-op
# depending on each flow's adapter.
#
# @return [nil]
def close
each(&:close)
nil
end
# Close and re-open the log adapter for each configured {Flow}. This might
# be a no-op depending on each flow's adapter.
#
# @return [nil]
def reopen
each(&:reopen)
nil
end
# Write an event `Hash` to each of the defined flows. The event is usually
# created from {Buffer#to_event}.
#
# We write a fresh deep-copy of the event hash to each defined {Flow}.
# This allows each flow to alter the event in any way without affecting the
# others.
#
# @param event [Hash] an event `Hash`
# @return [Hash] the given `event`
def write(event)
# Memoize the current list of flows for the rest of the method to make
# sure it doesn't change while we work with it.
flows = to_a
flows_size = flows.size
flows.each_with_index do |flow, index|
# If we have more than one flow, we provide a fresh copy of the event
# to each flow. The flow's filters and encoder can then mutate the event
# however it pleases without affecting later flows. We don't need to
# duplicate the event for the last flow since it won't be re-used
# after that anymore.
current_event = (index == flows_size - 1) ? event : deep_dup_event(event)
flow.write(current_event)
end
event
end
# @return [Array<Flow>] an array of all flow elements without any `nil`
# values
def to_ary
@flows.to_a.tap(&:compact!)
end
alias to_a to_ary
# @return [String] an Array-compatible string representation of `self`
def to_s
@flows.to_s
end
protected
attr_reader :flows
private
def initialize_copy(other)
@flows = other.flows.dup
super
end
# Create a deep duplicate of an event hash. It is assumed that the input
# event follows the normalized structure as generated by
# {Fields::Hash#to_h}.
#
# @param obj [Object] an object to duplicate. When initially called, this is
# expected to be an event hash
# @return [Object] a deep copy of the given `obj` if it was an `Array` or
# `Hash`, the original `obj` otherwise.
def deep_dup_event(obj)
case obj
when Hash
hash = obj.dup
obj.each_pair do |key, value|
# {Rackstash::Fields::Hash} already guarantees that keys are always
# frozen Strings. We don't need to dup them.
hash[key] = deep_dup_event(value)
end
when Array
obj.map { |value| deep_dup_event(value) }
else
# All leaf-values in the event are either frozen or not freezable /
# dupable. They can be used as is. See {AbstractCollection#normalize}
obj
end
end
end
end