mirror of
https://github.com/meineerde/rackstash.git
synced 2025-12-19 15:01:12 +00:00
Add File adapter to directly write to a local file
This commit is contained in:
parent
e96a0aaf94
commit
a0c92d57f8
@ -47,9 +47,19 @@ module Rackstash
|
|||||||
FIELD_TAGS = 'tags'.freeze
|
FIELD_TAGS = 'tags'.freeze
|
||||||
FIELD_TIMESTAMP = '@timestamp'.freeze
|
FIELD_TIMESTAMP = '@timestamp'.freeze
|
||||||
FIELD_VERSION = '@version'.freeze
|
FIELD_VERSION = '@version'.freeze
|
||||||
|
|
||||||
|
def self.severity_label(severity)
|
||||||
|
if severity.is_a?(Integer)
|
||||||
|
SEVERITY_LABELS[severity] || SEVERITY_LABELS.last
|
||||||
|
else
|
||||||
|
severity = SEVERITY_NAMES.fetch(severity.to_s.downcase, UNKNOWN)
|
||||||
|
SEVERITY_LABELS[severity]
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
require 'rackstash/logger'
|
require 'rackstash/logger'
|
||||||
|
|
||||||
require 'rackstash/adapters/io'
|
|
||||||
require 'rackstash/adapters/callable'
|
require 'rackstash/adapters/callable'
|
||||||
|
require 'rackstash/adapters/file'
|
||||||
|
require 'rackstash/adapters/io'
|
||||||
|
|||||||
173
lib/rackstash/adapters/file.rb
Normal file
173
lib/rackstash/adapters/file.rb
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
# Copyright 2017 Holger Just
|
||||||
|
#
|
||||||
|
# This software may be modified and distributed under the terms
|
||||||
|
# of the MIT license. See the LICENSE.txt file for details.
|
||||||
|
|
||||||
|
require 'fileutils'
|
||||||
|
require 'pathname'
|
||||||
|
require 'thread'
|
||||||
|
|
||||||
|
require 'rackstash/adapters/adapter'
|
||||||
|
|
||||||
|
module Rackstash
|
||||||
|
module Adapters
|
||||||
|
# This log adapter allows to write logs to a file acessible on the local
|
||||||
|
# filesystem. We assume filesystem semantics of the usual local filesystems
|
||||||
|
# used on Linux, macOS, BSDs, or Windows. Here, we can ensure that even
|
||||||
|
# concurrent writes of multiple processes (e.g. multiple worker processes of
|
||||||
|
# an application server) don't produce interleaved log lines.
|
||||||
|
#
|
||||||
|
# When using a remote filesystem (e.g. NFS or most FUSE filesystems but not
|
||||||
|
# for SMB) it might be possible that concurrent log writes to the same file
|
||||||
|
# are interleaved on disk, resulting on probable log corruption. If this is
|
||||||
|
# a concern, you should make sure that only one log adapter of one process
|
||||||
|
# write to a log file at a time or (preferrably) write to a local file
|
||||||
|
# instead.
|
||||||
|
#
|
||||||
|
# When reading the log file, the reader might still see incomplete writes
|
||||||
|
# depending on the OS and filesystem. Since we are only writing complete
|
||||||
|
# lines, it should be safe to continue reading until you observe a newline
|
||||||
|
# (`\n`) character.
|
||||||
|
#
|
||||||
|
# Assuming you are creating the log adapter like this
|
||||||
|
#
|
||||||
|
# Rackstash::Adapters::File.new('/var/log/rackstash/my_app.log')
|
||||||
|
#
|
||||||
|
# you can rotate the file with a config for the the standard
|
||||||
|
# [logrotate](https://github.com/logrotate/logrotate) utility like this:
|
||||||
|
#
|
||||||
|
# /var/log/rackstash/my_app.log {
|
||||||
|
# daily
|
||||||
|
# rotate 30
|
||||||
|
#
|
||||||
|
# # file might be missing if there were no writes that day
|
||||||
|
# missingok
|
||||||
|
# notifempty
|
||||||
|
#
|
||||||
|
# # compress old logfiles but keep the newest rotate file uncompressed
|
||||||
|
# # to still allow writes during rotation
|
||||||
|
# compress
|
||||||
|
# delaycompress
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# Since the {File} adapter automatically reopens the logfile after the
|
||||||
|
# file was moved, you don't need to create the new file there nor should you
|
||||||
|
# use the (potentially destructive) `copytruncate` option of logrotate.
|
||||||
|
class File < Adapter
|
||||||
|
register_for ::String, ::Pathname
|
||||||
|
|
||||||
|
# @return [String] the absolute path to the log file
|
||||||
|
attr_reader :filename
|
||||||
|
|
||||||
|
# Create a new file adapter instance which writes logs to the log file
|
||||||
|
# specified in `filename`.
|
||||||
|
#
|
||||||
|
# We will always resolve `filename` to an absolute path once during
|
||||||
|
# initialization. When passing a relative path, it will be resolved
|
||||||
|
# according to the current working directory. See
|
||||||
|
# [`::File.expand_path`](https://ruby-doc.org/core/File.html#method-c-expand_path)
|
||||||
|
# for details.
|
||||||
|
#
|
||||||
|
# @param filename [String, Pathname] the path to the logfile
|
||||||
|
# @param auto_reopen [Boolean] set to `true` to automatically reopen the
|
||||||
|
# log file (and potentially create a new one) if we detect that the
|
||||||
|
# current log file was moved or deleted, e.g. due to an external
|
||||||
|
# logrotate run
|
||||||
|
def initialize(filename, auto_reopen: true)
|
||||||
|
@filename = ::File.expand_path(filename).freeze
|
||||||
|
@auto_reopen = !!auto_reopen
|
||||||
|
|
||||||
|
@mutex = Mutex.new
|
||||||
|
open_file
|
||||||
|
end
|
||||||
|
|
||||||
|
# @return [Boolean] if `true`, the logfile will be automatically reopened
|
||||||
|
# on write if it is (re-)moved on the filesystem
|
||||||
|
def auto_reopen?
|
||||||
|
@auto_reopen
|
||||||
|
end
|
||||||
|
|
||||||
|
# Write a single log line with a trailing newline character to the open
|
||||||
|
# file. If {#auto_reopen?} is `true`, we will reopen the file object
|
||||||
|
# before the write if we detect that the file was moved, e.g., from an
|
||||||
|
# external logrotate run.
|
||||||
|
#
|
||||||
|
# When writing the log line, ruby uses a single `fwrite(2)` syscall with
|
||||||
|
# `IO#write`. Since we are using unbuffered (sync) IO, concurrent writes
|
||||||
|
# to the file from multiple processes
|
||||||
|
# [are guaranteed](https://stackoverflow.com/a/35256561/421705) to be
|
||||||
|
# serialized by the kernel without overlap.
|
||||||
|
#
|
||||||
|
# @param log [#to_s] the encoded log event
|
||||||
|
# @return [nil]
|
||||||
|
def write_single(log)
|
||||||
|
@mutex.synchronize do
|
||||||
|
auto_reopen
|
||||||
|
@file.write normalize_line(log)
|
||||||
|
end
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
|
||||||
|
# Close the file. After closing, no further writes are possible. Further
|
||||||
|
# attempts to {#write} will result in an exception being thrown.
|
||||||
|
#
|
||||||
|
# We will not automatically reopen a closed file on {#write}. You have to
|
||||||
|
# explicitly call {#reopen} in this case.
|
||||||
|
#
|
||||||
|
# @return [nil]
|
||||||
|
def close
|
||||||
|
@mutex.synchronize do
|
||||||
|
@file.close
|
||||||
|
end
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
|
||||||
|
# Reopen the logfile. We will open the file located at the original
|
||||||
|
# {#filename} or create a new one if it does not exist.
|
||||||
|
#
|
||||||
|
# If the file can not be opened, an exception will be raised.
|
||||||
|
# @return [nil]
|
||||||
|
def reopen
|
||||||
|
@mutex.synchronize do
|
||||||
|
reopen_file
|
||||||
|
end
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
# Reopen the log file if the original filename does not reference the
|
||||||
|
# opened file anymore (e.g. because it was moved or deleted)
|
||||||
|
def auto_reopen
|
||||||
|
return unless @auto_reopen
|
||||||
|
|
||||||
|
return if @file.closed?
|
||||||
|
return if ::File.identical?(@file, @filename)
|
||||||
|
|
||||||
|
reopen_file
|
||||||
|
end
|
||||||
|
|
||||||
|
def open_file
|
||||||
|
unless ::File.exist? ::File.dirname(@filename)
|
||||||
|
FileUtils.mkdir_p ::File.dirname(@filename)
|
||||||
|
end
|
||||||
|
|
||||||
|
file = ::File.open(
|
||||||
|
filename,
|
||||||
|
::File::WRONLY | ::File::APPEND | ::File::CREAT,
|
||||||
|
external_encoding: Encoding::UTF_8
|
||||||
|
)
|
||||||
|
file.binmode
|
||||||
|
file.sync = true
|
||||||
|
|
||||||
|
@file = file
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
|
||||||
|
def reopen_file
|
||||||
|
@file.close rescue nil
|
||||||
|
open_file
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
169
spec/rackstash/adapters/file_spec.rb
Normal file
169
spec/rackstash/adapters/file_spec.rb
Normal file
@ -0,0 +1,169 @@
|
|||||||
|
require 'spec_helper'
|
||||||
|
require 'tempfile'
|
||||||
|
require 'tmpdir'
|
||||||
|
|
||||||
|
require 'rackstash/adapters/file'
|
||||||
|
|
||||||
|
describe Rackstash::Adapters::File do
|
||||||
|
let!(:logfile) { Tempfile.new('') }
|
||||||
|
|
||||||
|
let(:adapter_args) { {} }
|
||||||
|
let(:adapter) { described_class.new(logfile.path, **adapter_args) }
|
||||||
|
|
||||||
|
after(:each) do
|
||||||
|
logfile.close
|
||||||
|
logfile.unlink
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#initialize' do
|
||||||
|
it 'accepts a String' do
|
||||||
|
expect(described_class.new(logfile.path).filename)
|
||||||
|
.to eql(logfile.path)
|
||||||
|
.and be_a String
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'accepts a Pathname' do
|
||||||
|
expect(described_class.new(Pathname.new logfile.path).filename)
|
||||||
|
.to eql(logfile.path)
|
||||||
|
.and be_a String
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'rejects non-IO objects' do
|
||||||
|
expect { described_class.new(nil) }.to raise_error TypeError
|
||||||
|
expect { described_class.new(Object.new) }.to raise_error TypeError
|
||||||
|
expect { described_class.new(23) }.to raise_error TypeError
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'creates the file and leading directories' do
|
||||||
|
Dir.mktmpdir do |base|
|
||||||
|
expect(File.exist? File.join(base, 'dir')).to be false
|
||||||
|
|
||||||
|
adapter = described_class.new File.join(base, 'dir', 'sub', 'test.log')
|
||||||
|
|
||||||
|
expect(adapter.filename).to eql File.join(base, 'dir', 'sub', 'test.log')
|
||||||
|
expect(File.directory? File.join(base, 'dir')).to be true
|
||||||
|
expect(File.file? File.join(base, 'dir', 'sub', 'test.log')).to be true
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '.default_encoder' do
|
||||||
|
it 'returns a JSON encoder' do
|
||||||
|
expect(adapter.default_encoder).to be_instance_of Rackstash::Encoders::JSON
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#close' do
|
||||||
|
it 'closes the IO object' do
|
||||||
|
adapter.close
|
||||||
|
expect { adapter.write('hello') }.to raise_error IOError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#reopen' do
|
||||||
|
it 're-opens a closed file' do
|
||||||
|
adapter.close
|
||||||
|
adapter.reopen
|
||||||
|
|
||||||
|
expect { adapter.write('hello') }.not_to raise_error
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#write_single' do
|
||||||
|
it 'writes the log line to the file' do
|
||||||
|
adapter.write('a log line')
|
||||||
|
expect(logfile.tap(&:rewind).read).to eql "a log line\n"
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'always writes a string' do
|
||||||
|
adapter.write([123, 'hello'])
|
||||||
|
expect(logfile.tap(&:rewind).read).to eql "[123, \"hello\"]\n"
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'appends a trailing newline if necessary' do
|
||||||
|
adapter.write("a full line.\n")
|
||||||
|
expect(logfile.tap(&:rewind).read).to eql "a full line.\n"
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'with auto_reopen: true' do
|
||||||
|
let(:adapter_args) { { auto_reopen: true } }
|
||||||
|
|
||||||
|
it 'reopens the file if moved' do
|
||||||
|
adapter.write('line1')
|
||||||
|
File.rename(logfile.path, "#{logfile.path}.orig")
|
||||||
|
|
||||||
|
adapter.write('line2')
|
||||||
|
|
||||||
|
expect(File.read("#{logfile.path}.orig")).to eql "line1\n"
|
||||||
|
expect(File.read(logfile.path)).to eql "line2\n"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'with auto_reopen: false' do
|
||||||
|
let(:adapter_args) { { auto_reopen: false } }
|
||||||
|
|
||||||
|
it 'does not reopen the logfile automatically' do
|
||||||
|
adapter.write('line1')
|
||||||
|
File.rename(logfile.path, "#{logfile.path}.orig")
|
||||||
|
|
||||||
|
adapter.write('line2')
|
||||||
|
|
||||||
|
expect(File.read("#{logfile.path}.orig")).to eql "line1\nline2\n"
|
||||||
|
expect(File.exist?(logfile.path)).to be false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'with concurrent processes' do
|
||||||
|
let(:workers) { 20 }
|
||||||
|
let(:lines_per_worker) { 50 }
|
||||||
|
let(:line_length) { 4096 }
|
||||||
|
|
||||||
|
def run_worker(worker_id)
|
||||||
|
filler = (worker_id + 65).chr
|
||||||
|
line = filler * line_length
|
||||||
|
|
||||||
|
adapter = described_class.new(logfile.path)
|
||||||
|
lines_per_worker.times do
|
||||||
|
adapter.write(line)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# This test was adapted from
|
||||||
|
# http://www.notthewizard.com/2014/06/17/are-files-appends-really-atomic/
|
||||||
|
it 'writes atomic log lines' do
|
||||||
|
if Concurrent.on_cruby?
|
||||||
|
worker_processes = workers.times.map { |worker_id|
|
||||||
|
Process.fork do
|
||||||
|
run_worker worker_id
|
||||||
|
end
|
||||||
|
}
|
||||||
|
worker_processes.each do |pid|
|
||||||
|
Process.wait(pid)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
worker_threads = workers.times.map { |worker_id|
|
||||||
|
Thread.new do
|
||||||
|
run_worker worker_id
|
||||||
|
end
|
||||||
|
}
|
||||||
|
worker_threads.each do |thread|
|
||||||
|
thread.join
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Resulting file size is exactly as expected, i.e. no dropped logs
|
||||||
|
# Each line as a trailing newline character.
|
||||||
|
expect(logfile.size).to eql workers * lines_per_worker * (line_length + 1)
|
||||||
|
|
||||||
|
# All lines are written without any overlap
|
||||||
|
expect(File.new(logfile.path).each_line).to all match(/\A(.)\1{#{line_length - 1}}\n\z/)
|
||||||
|
|
||||||
|
# Ensure that not all lines are written sequentially by the same worker,
|
||||||
|
# i.e. there were concurrent writes by multiple workers.
|
||||||
|
expect(
|
||||||
|
File.new(logfile.path).each_line.each_cons(2).count { |l1, l2| l1.to_s[0] == l2.to_s[0] }
|
||||||
|
).to be > workers
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
Loading…
x
Reference in New Issue
Block a user