require 'optparse' require 'logger' module Tailscript class NewTail def initialize(paths) @paths = paths @tails = {} @pos_file = $options[:pos_file] @read_from_head = $options[:read_from_head] @pf = nil @pf_file = nil level = 'info' level = $options[:log_level] if $options.has_key?(:log_level) # trace is not supported, let fallback to debug level = 'debug' if level == 'trace' @log = Logger.new(STDERR) @log.level = level @log.formatter = proc do |severity, time, progname, msg| "#{severity} #{msg}\n" end @log.info "Received paths from sudo tail plugin : #{paths}, log_level=#{level}" end attr_reader :paths def file_exists(path) if File.exist?(path) @log.info "Following tail of #{path}" return path else @log.warn "#{path} does not exist. Cannot tail the file." return nil end end def expand_paths() arr_paths = @paths.split(',').map {|path| path.strip } date = Time.now expanded_paths = [] arr_paths.each { |path| path = date.strftime(path) if path.include?('*') Dir.glob(path).select { |p| begin is_file = !File.directory?(p) if File.readable?(p) && is_file @log.info "Following tail of #{p}" expanded_paths << p elsif !File.readable?(p) @log.warn "#{p} is excluded since it's unreadable or doesn't have proper permissions." else @log.warn "#{p} is a directory and thus cannot be tailed" end rescue Errno::ENOENT @log.debug("#{p} is missing after refreshing file list") end } else file = file_exists(path) if !file.nil? if File.readable?(path) && !File.directory?(path) expanded_paths << file elsif !File.readable?(path) @log.warn "#{path} is excluded since it's unreadable or doesn't have proper permissions." else @log.warn "#{path} is a directory and thus cannot be tailed" end end end } return expanded_paths end def start paths = expand_paths() start_watchers(paths) unless paths.empty? end def shutdown @pf_file.close if @pf_file end def setup_watcher(path, pe) tw = TailWatcher.new(path, pe, @read_from_head, @log, &method(:receive_lines)) tw.on_notify tw end def start_watchers(paths) if @pos_file @pf_file = File.open(@pos_file, File::RDWR|File::CREAT) @pf_file.sync = true @pf = PositionFile.parse(@pf_file, @log) end paths.each { |path| pe = nil if @pf pe = @pf[path] #pe is FilePositionEntry instance if @read_from_head && pe.read_inode.zero? begin pe.update(File::Stat.new(path).ino, 0) rescue Errno::ENOENT @log.warn "#{path} not found. Continuing without tailing it." end end end @tails[path] = setup_watcher(path, pe) } end def receive_lines(lines, tail_watcher) unless lines.empty? puts lines end return true end class TailWatcher def initialize(path, pe, read_from_head, log, &receive_lines) @path = path @pe = pe || MemoryPositionEntry.new @read_from_head = read_from_head @log = log @receive_lines = receive_lines @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate)) @io_handler = nil end attr_reader :path def wrap_receive_lines(lines) @receive_lines.call(lines, self) end def on_notify @rotate_handler.on_notify if @rotate_handler return unless @io_handler @io_handler.on_notify end def on_rotate(io) if io # first time stat = io.stat fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if @read_from_head pos = 0 @pe.update(inode, pos) elsif inode == last_inode # rotated file has the same inode number as the pos_file. # seek to the saved position pos = @pe.read_pos elsif last_inode != 0 # read data from the head of the rotated file. pos = 0 @pe.update(inode, pos) else # this is the first MemoryPositionEntry for the first time fluentd started. # seeks to the end of the file to know where to start tailing pos = fsize @pe.update(inode, pos) end io.seek(pos) @io_handler = IOHandler.new(io, @pe, @log, &method(:wrap_receive_lines)) else @io_handler = NullIOHandler.new end end class IOHandler def initialize(io, pe, log, &receive_lines) @log = log @io = io @pe = pe @log = log @read_lines_limit = 1000 @receive_lines = receive_lines @buffer = ''.force_encoding('ASCII-8BIT') @iobuf = ''.force_encoding('ASCII-8BIT') @lines = [] @SEPARATOR = -"\n" end attr_reader :io def on_notify @log.debug "Seeking to read file - #{@io.path} from #{@io.pos} position and file size is #{@io.stat.size}" begin read_more = false if @lines.empty? begin while true if @buffer.empty? @io.readpartial(2048, @buffer) else @buffer << @io.readpartial(2048, @iobuf) end while idx = @buffer.index(@SEPARATOR) @lines << @buffer.slice!(0, idx + 1) end if @lines.size >= @read_lines_limit # not to use too much memory in case the file is very large read_more = true break end end rescue EOFError end end unless @lines.empty? if @receive_lines.call(@lines) @pe.update_pos(@io.pos - @buffer.bytesize) @lines.clear else read_more = false end end end while read_more rescue @log.error "#{$!.to_s}" close end def close @io.close unless @io.closed? end end class NullIOHandler def initialize end def io end def on_notify end def close end end class RotateHandler def initialize(path, log, &on_rotate) @path = path @inode = nil @fsize = -1 # first @on_rotate = on_rotate @log = log end def on_notify begin stat = File.stat(@path) #returns a File::Stat object for the file named @path inode = stat.ino fsize = stat.size rescue Errno::ENOENT # moved or deleted inode = nil fsize = 0 end begin if @inode != inode || fsize < @fsize # rotated or truncated begin io = File.open(@path) rescue Errno::ENOENT end @on_rotate.call(io) end @inode = inode @fsize = fsize end rescue @log.error "#{$!.to_s}" end end end class PositionFile UNWATCHED_POSITION = 0xffffffffffffffff def initialize(file, file_mutex, map, last_pos) @file = file @file_mutex = file_mutex @map = map @last_pos = last_pos end def [](path) if m = @map[path] return m end @file_mutex.synchronize { @file.pos = @last_pos @file.write "#{path}\t0000000000000000\t0000000000000000\n" seek = @last_pos + path.bytesize + 1 @last_pos = @file.pos @map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0) } end def self.parse(file, log) @log = log compact(file) file_mutex = Mutex.new map = {} file.pos = 0 file.each_line {|line| m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) unless m @log.warn "Unparsable line in pos_file: #{line}" next end path = m[1] pos = m[2].to_i(16) ino = m[3].to_i(16) seek = file.pos - line.bytesize + path.bytesize + 1 map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, ino) } new(file, file_mutex, map, file.pos) end # Clean up unwatched file entries def self.compact(file) file.pos = 0 existent_entries = file.each_line.map { |line| m = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(line) unless m @log.warn "Unparsable line in pos_file: #{line}" next end path = m[1] pos = m[2].to_i(16) ino = m[3].to_i(16) # 32bit inode converted to 64bit at this phase pos == UNWATCHED_POSITION ? nil : ("%s\t%016x\t%016x\n" % [path, pos, ino]) }.compact file.pos = 0 file.truncate(0) file.write(existent_entries.join) end end # pos inode # ffffffffffffffff\tffffffffffffffff\n class FilePositionEntry POS_SIZE = 16 INO_OFFSET = 17 INO_SIZE = 16 LN_OFFSET = 33 SIZE = 34 def initialize(file, file_mutex, seek, pos, inode) @file = file @file_mutex = file_mutex @seek = seek @pos = pos @inode = inode end def update(ino, pos) @file_mutex.synchronize { @file.pos = @seek @file.write "%016x\t%016x" % [pos, ino] } @pos = pos @inode = ino end def update_pos(pos) @file_mutex.synchronize { @file.pos = @seek @file.write "%016x" % pos } @pos = pos end def read_inode @inode end def read_pos @pos end end class MemoryPositionEntry def initialize @pos = 0 @inode = 0 end def update(ino, pos) @inode = ino @pos = pos end def update_pos(pos) @pos = pos end def read_pos @pos end def read_inode @inode end end end end if __FILE__ == $0 $options = {:read_from_head => false} OptionParser.new do |opts| opts.on("-p", "--posfile [POSFILE]") do |p| $options[:pos_file] = p end opts.on("-h", "--[no-]readfromhead") do |h| $options[:read_from_head] = h end opts.on("--log_level [LOG_LEVEL]") do |level| $options[:log_level] = level end end.parse! begin a = Tailscript::NewTail.new(ARGV[0]) a.start a.shutdown rescue => e log = Logger.new(STDERR) log.formatter = proc do |severity, time, progname, msg| "#{severity} #{msg}\n" end log.error "Tailfilereader crashed due to an unexpected exit --- #{e.message} #{e.backtrace.inspect}" end end