diff --git a/lib/rackstash.rb b/lib/rackstash.rb
index 4717a23..28987c2 100644
--- a/lib/rackstash.rb
+++ b/lib/rackstash.rb
@@ -47,9 +47,19 @@ module Rackstash
   FIELD_TAGS = 'tags'.freeze
   FIELD_TIMESTAMP = '@timestamp'.freeze
   FIELD_VERSION = '@version'.freeze
+
+  def self.severity_label(severity)
+    if severity.is_a?(Integer)
+      SEVERITY_LABELS[severity] || SEVERITY_LABELS.last
+    else
+      severity = SEVERITY_NAMES.fetch(severity.to_s.downcase, UNKNOWN)
+      SEVERITY_LABELS[severity]
+    end
+  end
 end
 
 require 'rackstash/logger'
-require 'rackstash/adapters/io'
 require 'rackstash/adapters/callable'
+require 'rackstash/adapters/file'
+require 'rackstash/adapters/io'
diff --git a/lib/rackstash/adapters/file.rb b/lib/rackstash/adapters/file.rb
new file mode 100644
index 0000000..ef8bb53
--- /dev/null
+++ b/lib/rackstash/adapters/file.rb
@@ -0,0 +1,173 @@
+# Copyright 2017 Holger Just
+#
+# This software may be modified and distributed under the terms
+# of the MIT license. See the LICENSE.txt file for details.
+
+require 'fileutils'
+require 'pathname'
+require 'thread'
+
+require 'rackstash/adapters/adapter'
+
+module Rackstash
+  module Adapters
+    # This log adapter writes logs to a file accessible on the local
+    # filesystem. We assume the semantics of the usual local filesystems
+    # found on Linux, macOS, BSDs, or Windows. On these, we can ensure that
+    # even concurrent writes of multiple processes (e.g. multiple worker
+    # processes of an application server) don't produce interleaved log
+    # lines.
+    #
+    # When using a remote filesystem (e.g. NFS or most FUSE filesystems, but
+    # not SMB), concurrent log writes to the same file might still be
+    # interleaved on disk, likely resulting in corrupted logs. If this is a
+    # concern, you should make sure that only a single log adapter in a
+    # single process writes to a log file at a time, or (preferably) write
+    # to a local file instead.
+    #
+    # When reading the log file, the reader might still see incomplete writes
+    # depending on the OS and filesystem. Since we only write complete lines,
+    # it should be safe to continue reading until you observe a newline
+    # (`\n`) character.
+    #
+    # Assuming you create the log adapter like this
+    #
+    #     Rackstash::Adapters::File.new('/var/log/rackstash/my_app.log')
+    #
+    # you can rotate the file with a config for the standard
+    # [logrotate](https://github.com/logrotate/logrotate) utility like this:
+    #
+    #     /var/log/rackstash/my_app.log {
+    #       daily
+    #       rotate 30
+    #
+    #       # file might be missing if there were no writes that day
+    #       missingok
+    #       notifempty
+    #
+    #       # compress old logfiles but keep the newest rotated file
+    #       # uncompressed to still allow writes during rotation
+    #       compress
+    #       delaycompress
+    #     }
+    #
+    # Since the {File} adapter automatically reopens the logfile after it was
+    # moved, you don't need to create the new file during rotation, nor
+    # should you use the (potentially destructive) `copytruncate` option of
+    # logrotate.
+    class File < Adapter
+      register_for ::String, ::Pathname
+
+      # @return [String] the absolute path to the log file
+      attr_reader :filename
+
+      # Create a new file adapter instance which writes logs to the log file
+      # specified in `filename`.
+      #
+      # We always resolve `filename` to an absolute path once during
+      # initialization. A relative path is thus resolved against the current
+      # working directory. See
+      # [`::File.expand_path`](https://ruby-doc.org/core/File.html#method-c-expand_path)
+      # for details.
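+      #
+      # @example Relative paths are expanded against the working directory
+      #   # Illustrative values only: the expanded path assumes the process
+      #   # was started from /home/deploy/app.
+      #   adapter = Rackstash::Adapters::File.new('log/my_app.log')
+      #   adapter.filename #=> "/home/deploy/app/log/my_app.log"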
+      #
+      # @param filename [String, Pathname] the path to the logfile
+      # @param auto_reopen [Boolean] set to `true` to automatically reopen
+      #   the log file (and potentially create a new one) if we detect that
+      #   the current log file was moved or deleted, e.g. due to an external
+      #   logrotate run
+      def initialize(filename, auto_reopen: true)
+        @filename = ::File.expand_path(filename).freeze
+        @auto_reopen = !!auto_reopen
+
+        @mutex = Mutex.new
+        open_file
+      end
+
+      # @return [Boolean] if `true`, the logfile will be automatically
+      #   reopened on write if it is (re-)moved on the filesystem
+      def auto_reopen?
+        @auto_reopen
+      end
+
+      # Write a single log line with a trailing newline character to the
+      # open file. If {#auto_reopen?} is `true`, we reopen the file object
+      # before the write if we detect that the file was moved, e.g. by an
+      # external logrotate run.
+      #
+      # When writing the log line, Ruby issues a single `write(2)` syscall
+      # from `IO#write`. Since we use unbuffered (sync) IO on a file opened
+      # in append-only mode, concurrent writes to the file from multiple
+      # processes
+      # [are guaranteed](https://stackoverflow.com/a/35256561/421705) to be
+      # serialized by the kernel without overlap.
+      #
+      # @param log [#to_s] the encoded log event
+      # @return [nil]
+      def write_single(log)
+        @mutex.synchronize do
+          auto_reopen
+          @file.write normalize_line(log)
+        end
+        nil
+      end
+
+      # Close the file. After closing, no further writes are possible;
+      # further attempts to {#write} will result in an exception being
+      # raised.
+      #
+      # We will not automatically reopen a closed file on {#write}. You have
+      # to explicitly call {#reopen} in this case.
+      #
+      # @return [nil]
+      def close
+        @mutex.synchronize do
+          @file.close
+        end
+        nil
+      end
+
+      # Reopen the logfile. We open the file located at the original
+      # {#filename} or create a new one if it does not exist.
+      #
+      # If the file cannot be opened, an exception is raised.
+      #
+      # @return [nil]
+      def reopen
+        @mutex.synchronize do
+          reopen_file
+        end
+        nil
+      end
+
+      private
+
+      # Reopen the log file if the original filename does not reference the
+      # opened file anymore, e.g. because it was moved or deleted
+      def auto_reopen
+        return unless @auto_reopen
+
+        return if @file.closed?
+        return if ::File.identical?(@file, @filename)
+
+        reopen_file
+      end
+
+      def open_file
+        unless ::File.exist? ::File.dirname(@filename)
+          FileUtils.mkdir_p ::File.dirname(@filename)
+        end
+
+        file = ::File.open(
+          @filename,
+          ::File::WRONLY | ::File::APPEND | ::File::CREAT,
+          external_encoding: Encoding::UTF_8
+        )
+        file.binmode
+        file.sync = true
+
+        @file = file
+        nil
+      end
+
+      def reopen_file
+        @file.close rescue nil
+        open_file
+      end
+    end
+  end
+end
diff --git a/spec/rackstash/adapters/file_spec.rb b/spec/rackstash/adapters/file_spec.rb
new file mode 100644
index 0000000..b89fab5
--- /dev/null
+++ b/spec/rackstash/adapters/file_spec.rb
@@ -0,0 +1,169 @@
+require 'spec_helper'
+require 'tempfile'
+require 'tmpdir'
+
+require 'rackstash/adapters/file'
+
+describe Rackstash::Adapters::File do
+  let!(:logfile) { Tempfile.new('') }
+
+  let(:adapter_args) { {} }
+  let(:adapter) { described_class.new(logfile.path, **adapter_args) }
+
+  after(:each) do
+    logfile.close
+    logfile.unlink
+  end
+
+  describe '#initialize' do
+    it 'accepts a String' do
+      expect(described_class.new(logfile.path).filename)
+        .to eql(logfile.path)
+        .and be_a String
+    end
+
+    it 'accepts a Pathname' do
+      expect(described_class.new(Pathname.new logfile.path).filename)
+        .to eql(logfile.path)
+        .and be_a String
+    end
+
+    it 'rejects non-path objects' do
+      expect { described_class.new(nil) }.to raise_error TypeError
+      expect { described_class.new(Object.new) }.to raise_error TypeError
+      expect { described_class.new(23) }.to raise_error TypeError
+    end
+
+    it 'creates the file and leading directories' do
+      Dir.mktmpdir do |base|
+        expect(File.exist? File.join(base, 'dir')).to be false
+
+        adapter = described_class.new File.join(base, 'dir', 'sub', 'test.log')
+
+        expect(adapter.filename).to eql File.join(base, 'dir', 'sub', 'test.log')
+        expect(File.directory? File.join(base, 'dir')).to be true
+        expect(File.file? File.join(base, 'dir', 'sub', 'test.log')).to be true
+      end
+    end
+  end
+
+  describe '#default_encoder' do
+    it 'returns a JSON encoder' do
+      expect(adapter.default_encoder).to be_instance_of Rackstash::Encoders::JSON
+    end
+  end
+
+  describe '#close' do
+    it 'closes the IO object' do
+      adapter.close
+      expect { adapter.write('hello') }.to raise_error IOError
+    end
+  end
+
+  describe '#reopen' do
+    it 're-opens a closed file' do
+      adapter.close
+      adapter.reopen
+
+      expect { adapter.write('hello') }.not_to raise_error
+    end
+  end
+
+  describe '#write_single' do
+    it 'writes the log line to the file' do
+      adapter.write('a log line')
+      expect(logfile.tap(&:rewind).read).to eql "a log line\n"
+    end
+
+    it 'always writes a string' do
+      adapter.write([123, 'hello'])
+      expect(logfile.tap(&:rewind).read).to eql "[123, \"hello\"]\n"
+    end
+
+    it 'appends a trailing newline only if necessary' do
+      adapter.write("a full line.\n")
+      expect(logfile.tap(&:rewind).read).to eql "a full line.\n"
+    end
+
+    context 'with auto_reopen: true' do
+      let(:adapter_args) { { auto_reopen: true } }
+
+      it 'reopens the file if moved' do
+        adapter.write('line1')
+        File.rename(logfile.path, "#{logfile.path}.orig")
+
+        adapter.write('line2')
+
+        expect(File.read("#{logfile.path}.orig")).to eql "line1\n"
+        expect(File.read(logfile.path)).to eql "line2\n"
+      end
+    end
+
+    context 'with auto_reopen: false' do
+      let(:adapter_args) { { auto_reopen: false } }
+
+      it 'does not reopen the logfile automatically' do
+        adapter.write('line1')
+        File.rename(logfile.path, "#{logfile.path}.orig")
+
+        adapter.write('line2')
+
+        expect(File.read("#{logfile.path}.orig")).to eql "line1\nline2\n"
+        expect(File.exist?(logfile.path)).to be false
+      end
+    end
+  end
+
+  context 'with concurrent processes' do
+    let(:workers) { 20 }
+    let(:lines_per_worker) { 50 }
+    let(:line_length) { 4096 }
+
+    def run_worker(worker_id)
+      filler = (worker_id + 65).chr
+      line = filler * line_length
+
+      adapter = described_class.new(logfile.path)
+      lines_per_worker.times do
+        adapter.write(line)
+      end
+    end
+
+    # This test was adapted from
+    # http://www.notthewizard.com/2014/06/17/are-files-appends-really-atomic/
+    it 'writes atomic log lines' do
+      if Concurrent.on_cruby?
+        worker_processes = workers.times.map { |worker_id|
+          Process.fork do
+            run_worker worker_id
+          end
+        }
+        worker_processes.each do |pid|
+          Process.wait(pid)
+        end
+      else
+        worker_threads = workers.times.map { |worker_id|
+          Thread.new do
+            run_worker worker_id
+          end
+        }
+        worker_threads.each do |thread|
+          thread.join
+        end
+      end
+
+      # The resulting file size is exactly as expected, i.e. no dropped logs.
+      # Each line has a trailing newline character.
+      expect(logfile.size).to eql workers * lines_per_worker * (line_length + 1)
+
+      # All lines are written without any overlap
+      expect(File.new(logfile.path).each_line).to all match(/\A(.)\1{#{line_length - 1}}\n\z/)
+
+      # Ensure that not all lines were written sequentially by a single
+      # worker, i.e. there were concurrent writes by multiple workers. With
+      # strictly sequential writes, the writer could change at most
+      # `workers - 1` times between consecutive lines.
+      expect(
+        File.new(logfile.path).each_line.each_cons(2).count { |l1, l2| l1[0] != l2[0] }
+      ).to be > workers
+    end
+  end
+end
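For reference, a quick sketch of the new `Rackstash.severity_label` helper added above. The concrete return values assume `SEVERITY_LABELS`, `SEVERITY_NAMES`, and `UNKNOWN` follow the stdlib `Logger` convention (labels `%w[DEBUG INFO WARN ERROR FATAL ANY]`); those constants are defined outside this diff:

    # Integer severities index into SEVERITY_LABELS; out-of-range values
    # fall back to the last (catch-all) label.
    Rackstash.severity_label(0)   #=> "DEBUG"
    Rackstash.severity_label(42)  #=> "ANY"

    # Everything else is looked up by case-insensitive name; unknown names
    # map to the UNKNOWN severity.
    Rackstash.severity_label(:warn)    #=> "WARN"
    Rackstash.severity_label('Bogus')  #=> "ANY"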
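Similarly, a minimal usage sketch for the new `File` adapter, using the generic `#write` entry point exercised in the specs above; the log path is a placeholder:

    require 'rackstash/adapters/file'

    adapter = Rackstash::Adapters::File.new('/var/log/rackstash/my_app.log')

    # Each write is normalized to a single line with a trailing newline and
    # appended with one sync write, so concurrent writers (even in separate
    # processes) don't interleave within a line.
    adapter.write('application booted')

    # With auto_reopen: false, the adapter keeps writing to a file that was
    # rotated away instead of reopening the original path on the next write.
    pinned = Rackstash::Adapters::File.new('/var/log/rackstash/my_app.log', auto_reopen: false)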