Module errgrep.log_line
Expand source code
import functools
import pathlib
import queue
import re
import sys
import time
import typing
from .line_timestamper import LineTimestamper
from .non_blocking_read_thread import stdin_read_thread
class LogLine:
    '''
    One log statement plus the machinery to walk to the statements after it.

    A statement is either a line in which the line_timestamper found a
    timestamp followed by every subsequent line without one, or (at the very
    start of the input) a run of lines without any timestamp.

    Instances are chained: previous_line points at the statement parsed before
    this one and get_next_log_line() parses, and remembers, the one after it.
    All instances of one chain share the same raw_text_lines list.
    '''

    def __init__(self, raw_text=None, raw_text_lines=None,
                 log_file=None, read_from_stdin=False,
                 previous_line: typing.Optional['LogLine'] = None,
                 line_timestamper: typing.Optional['LineTimestamper'] = None,
                 max_seconds_till_line_split: float = 1,
                 next_line_index: int = 0, allow_timestamp_format_changes: bool = False):
        '''
        Parse the log statement that starts at next_line_index.

        If a - is given as the log_file, will read from stdin (and ignore read_from_stdin).

        Args:
            raw_text: the whole log as one string; it is split into lines here.
            raw_text_lines: the log as an already split list of lines.
            log_file: path of a file to read, or '-' to read from stdin.
            read_from_stdin: also consume lines from stdin once the given
                lines are exhausted. May be used without any other source.
            previous_line: the LogLine parsed before this one, if any.
            line_timestamper: object used to detect timestamps; a new
                LineTimestamper is created when omitted.
            max_seconds_till_line_split: when reading from stdin, how long to
                wait for more input before assuming the statement is complete.
            next_line_index: index in raw_text_lines where this statement starts.
            allow_timestamp_format_changes: forwarded to LineTimestamper when
                one has to be created here.

        Raises:
            ValueError: if more than one of raw_text / raw_text_lines /
                log_file is given, or if no source is given at all.
        '''
        # The message promises "not more or less than one", so reject any
        # combination of two or more sources instead of silently picking one.
        sources_given = sum(1 for source in (raw_text, raw_text_lines, log_file) if source)
        nothing_given = (raw_text is None and log_file is None
                         and raw_text_lines is None and read_from_stdin is False)
        if sources_given > 1 or nothing_given:
            raise ValueError("Please provide either raw_text or log_file or raw_text_lines... "
                             "not more or less than one. Or we can use read_from_stdin without one of the others.")

        # splitlines() is slow on big inputs... try to minimize how often we do it
        self.raw_text_lines = []
        self.read_from_stdin = read_from_stdin
        self.next_line_index = next_line_index

        if raw_text_lines:
            # shared, not copied: every LogLine of a chain indexes into the same list
            self.raw_text_lines = raw_text_lines
        elif raw_text:
            self.raw_text_lines = raw_text.splitlines()
        elif log_file:
            if log_file == '-':
                self.read_from_stdin = True
            else:
                self.raw_text_lines = pathlib.Path(log_file).read_text().splitlines()

        # We can read_from_stdin AFTER raw_text_lines
        if self.read_from_stdin:
            stdin_read_thread.start_if_not_started_yet()

        # when reading from stdin, we wait at most this much time before assuming a log line split
        self.max_seconds_till_line_split = max_seconds_till_line_split

        self.timestamp = None
        self.log_line_lines = []
        self.log_message = ''
        self.previous_line = previous_line
        self.line_timestamper = line_timestamper or LineTimestamper(
            allow_timestamp_format_changes=allow_timestamp_format_changes)
        self._parse()

    def _iter_lines(self):
        '''
        Yield the lines available from next_line_index onwards.

        Lines already in raw_text_lines come first. When reading from stdin,
        newly arrived lines are appended to raw_text_lines and yielded as they
        come in. A yielded None means no input arrived for
        max_seconds_till_line_split seconds: assume that a line break happened.
        '''
        if self.raw_text_lines:
            # index-based so starting deep inside a big log costs nothing
            for idx in range(self.next_line_index, len(self.raw_text_lines), 1):
                yield self.raw_text_lines[idx]

        if self.read_from_stdin:
            break_force_time = time.time() + self.max_seconds_till_line_split
            # NOTE(review): reading stops as soon as the thread reports not
            # alive; confirm stdin_read_thread cannot finish while lines are
            # still waiting in lines_queue, otherwise they are never read.
            while stdin_read_thread.is_alive():
                try:
                    line = stdin_read_thread.lines_queue.get_nowait()
                except queue.Empty:
                    if time.time() > break_force_time:
                        break_force_time = time.time() + self.max_seconds_till_line_split
                        yield None
                    # tiny sleep so polling the queue does not spin a core
                    time.sleep(.0001)
                else:
                    # remember the line so LogLines created later can see it
                    self.raw_text_lines.append(line)
                    break_force_time = time.time() + self.max_seconds_till_line_split
                    yield line

    def _parse(self):
        '''
        Collect the lines of this statement into log_line_lines and set
        timestamp and log_message accordingly.
        '''
        self.log_line_lines = []

        # Key Assumption:
        #  All lines without timestamp are part of this log statement
        for line in self._iter_lines():
            if line is None:
                # force a line break right now... timestamp should be set from earlier on
                break

            timestamp = self.line_timestamper.coerce_datetime_from_line(line)
            if timestamp:
                if self.log_line_lines:
                    # new timestamp means we're done
                    break
                self.timestamp = timestamp
            self.log_line_lines.append(line)

        self.log_message = '\n'.join(self.log_line_lines)

    def get_next_log_line(self) -> typing.Optional['LogLine']:
        '''
        Returns the next LogLine in the log.
        Returns None if there is no more available

        The answer is computed once and remembered on this instance, so
        repeated calls return the very same object and never re-read input.
        '''
        try:
            return self._next_log_line
        except AttributeError:
            pass  # first call on this instance

        next_line = None
        new_next_line_index = self.next_line_index + len(self.log_line_lines)
        if (new_next_line_index < len(self.raw_text_lines)) or \
                (self.read_from_stdin and stdin_read_thread.is_alive()):
            next_line = LogLine(raw_text_lines=self.raw_text_lines,
                                previous_line=self,
                                read_from_stdin=self.read_from_stdin,
                                line_timestamper=self.line_timestamper,
                                # keep the caller's stdin timeout for the whole chain
                                max_seconds_till_line_split=self.max_seconds_till_line_split,
                                next_line_index=new_next_line_index)

        self._next_log_line = next_line
        return next_line

    def iter_log_lines_with_regex(self, regex, ignore_case=True):
        '''
        Goes through all LogLines (starting with this one) checking if the
        message matches the regex. Yields each LogLine that matches.

        Args:
            regex: pattern searched for anywhere in each log_message.
            ignore_case: match case-insensitively when True.
        '''
        regex_c = re.compile(regex, flags=re.IGNORECASE if ignore_case else 0)

        # walk through all lines
        current_line = self
        while current_line is not None:
            # only match / no match matters, so stop at the first hit
            if regex_c.search(current_line.log_message):
                yield current_line
            current_line = current_line.get_next_log_line()
Classes
class LogLine (raw_text=None, raw_text_lines=None, log_file=None, read_from_stdin=False, previous_line: Union[~LogLine, NoneType] = None, line_timestamper: Union[LineTimestamper, NoneType] = None, max_seconds_till_line_split: float = 1, next_line_index: int = 0, allow_timestamp_format_changes: bool = False)
-
If '-' is given as the log_file, input is read from stdin (and read_from_stdin is ignored).
Expand source code
class LogLine:
    '''
    One log statement plus the machinery to walk to the statements after it.

    A statement is either a line in which the line_timestamper found a
    timestamp followed by every subsequent line without one, or (at the very
    start of the input) a run of lines without any timestamp.

    Instances are chained: previous_line points at the statement parsed before
    this one and get_next_log_line() parses, and remembers, the one after it.
    All instances of one chain share the same raw_text_lines list.
    '''

    def __init__(self, raw_text=None, raw_text_lines=None,
                 log_file=None, read_from_stdin=False,
                 previous_line: typing.Optional['LogLine'] = None,
                 line_timestamper: typing.Optional['LineTimestamper'] = None,
                 max_seconds_till_line_split: float = 1,
                 next_line_index: int = 0, allow_timestamp_format_changes: bool = False):
        '''
        Parse the log statement that starts at next_line_index.

        If a - is given as the log_file, will read from stdin (and ignore read_from_stdin).

        Args:
            raw_text: the whole log as one string; it is split into lines here.
            raw_text_lines: the log as an already split list of lines.
            log_file: path of a file to read, or '-' to read from stdin.
            read_from_stdin: also consume lines from stdin once the given
                lines are exhausted. May be used without any other source.
            previous_line: the LogLine parsed before this one, if any.
            line_timestamper: object used to detect timestamps; a new
                LineTimestamper is created when omitted.
            max_seconds_till_line_split: when reading from stdin, how long to
                wait for more input before assuming the statement is complete.
            next_line_index: index in raw_text_lines where this statement starts.
            allow_timestamp_format_changes: forwarded to LineTimestamper when
                one has to be created here.

        Raises:
            ValueError: if more than one of raw_text / raw_text_lines /
                log_file is given, or if no source is given at all.
        '''
        # The message promises "not more or less than one", so reject any
        # combination of two or more sources instead of silently picking one.
        sources_given = sum(1 for source in (raw_text, raw_text_lines, log_file) if source)
        nothing_given = (raw_text is None and log_file is None
                         and raw_text_lines is None and read_from_stdin is False)
        if sources_given > 1 or nothing_given:
            raise ValueError("Please provide either raw_text or log_file or raw_text_lines... "
                             "not more or less than one. Or we can use read_from_stdin without one of the others.")

        # splitlines() is slow on big inputs... try to minimize how often we do it
        self.raw_text_lines = []
        self.read_from_stdin = read_from_stdin
        self.next_line_index = next_line_index

        if raw_text_lines:
            # shared, not copied: every LogLine of a chain indexes into the same list
            self.raw_text_lines = raw_text_lines
        elif raw_text:
            self.raw_text_lines = raw_text.splitlines()
        elif log_file:
            if log_file == '-':
                self.read_from_stdin = True
            else:
                self.raw_text_lines = pathlib.Path(log_file).read_text().splitlines()

        # We can read_from_stdin AFTER raw_text_lines
        if self.read_from_stdin:
            stdin_read_thread.start_if_not_started_yet()

        # when reading from stdin, we wait at most this much time before assuming a log line split
        self.max_seconds_till_line_split = max_seconds_till_line_split

        self.timestamp = None
        self.log_line_lines = []
        self.log_message = ''
        self.previous_line = previous_line
        self.line_timestamper = line_timestamper or LineTimestamper(
            allow_timestamp_format_changes=allow_timestamp_format_changes)
        self._parse()

    def _iter_lines(self):
        '''
        Yield the lines available from next_line_index onwards.

        Lines already in raw_text_lines come first. When reading from stdin,
        newly arrived lines are appended to raw_text_lines and yielded as they
        come in. A yielded None means no input arrived for
        max_seconds_till_line_split seconds: assume that a line break happened.
        '''
        if self.raw_text_lines:
            # index-based so starting deep inside a big log costs nothing
            for idx in range(self.next_line_index, len(self.raw_text_lines), 1):
                yield self.raw_text_lines[idx]

        if self.read_from_stdin:
            break_force_time = time.time() + self.max_seconds_till_line_split
            # NOTE(review): reading stops as soon as the thread reports not
            # alive; confirm stdin_read_thread cannot finish while lines are
            # still waiting in lines_queue, otherwise they are never read.
            while stdin_read_thread.is_alive():
                try:
                    line = stdin_read_thread.lines_queue.get_nowait()
                except queue.Empty:
                    if time.time() > break_force_time:
                        break_force_time = time.time() + self.max_seconds_till_line_split
                        yield None
                    # tiny sleep so polling the queue does not spin a core
                    time.sleep(.0001)
                else:
                    # remember the line so LogLines created later can see it
                    self.raw_text_lines.append(line)
                    break_force_time = time.time() + self.max_seconds_till_line_split
                    yield line

    def _parse(self):
        '''
        Collect the lines of this statement into log_line_lines and set
        timestamp and log_message accordingly.
        '''
        self.log_line_lines = []

        # Key Assumption:
        #  All lines without timestamp are part of this log statement
        for line in self._iter_lines():
            if line is None:
                # force a line break right now... timestamp should be set from earlier on
                break

            timestamp = self.line_timestamper.coerce_datetime_from_line(line)
            if timestamp:
                if self.log_line_lines:
                    # new timestamp means we're done
                    break
                self.timestamp = timestamp
            self.log_line_lines.append(line)

        self.log_message = '\n'.join(self.log_line_lines)

    def get_next_log_line(self) -> typing.Optional['LogLine']:
        '''
        Returns the next LogLine in the log.
        Returns None if there is no more available

        The answer is computed once and remembered on this instance, so
        repeated calls return the very same object and never re-read input.
        '''
        try:
            return self._next_log_line
        except AttributeError:
            pass  # first call on this instance

        next_line = None
        new_next_line_index = self.next_line_index + len(self.log_line_lines)
        if (new_next_line_index < len(self.raw_text_lines)) or \
                (self.read_from_stdin and stdin_read_thread.is_alive()):
            next_line = LogLine(raw_text_lines=self.raw_text_lines,
                                previous_line=self,
                                read_from_stdin=self.read_from_stdin,
                                line_timestamper=self.line_timestamper,
                                # keep the caller's stdin timeout for the whole chain
                                max_seconds_till_line_split=self.max_seconds_till_line_split,
                                next_line_index=new_next_line_index)

        self._next_log_line = next_line
        return next_line

    def iter_log_lines_with_regex(self, regex, ignore_case=True):
        '''
        Goes through all LogLines (starting with this one) checking if the
        message matches the regex. Yields each LogLine that matches.

        Args:
            regex: pattern searched for anywhere in each log_message.
            ignore_case: match case-insensitively when True.
        '''
        regex_c = re.compile(regex, flags=re.IGNORECASE if ignore_case else 0)

        # walk through all lines
        current_line = self
        while current_line is not None:
            # only match / no match matters, so stop at the first hit
            if regex_c.search(current_line.log_message):
                yield current_line
            current_line = current_line.get_next_log_line()
Methods
def get_next_log_line(self) ‑> Union[~LogLine, NoneType]
-
Returns the next LogLine in the log. Returns None if there are no more available.
Expand source code
def get_next_log_line(self) -> typing.Optional['LogLine']:
    '''
    Returns the next LogLine in the log.
    Returns None if there is no more available

    The answer is computed once and remembered on this instance (instead of in
    a shared lru_cache that would keep every instance alive), so repeated
    calls return the very same object and never re-read input.
    '''
    try:
        return self._next_log_line
    except AttributeError:
        pass  # first call on this instance

    next_line = None
    new_next_line_index = self.next_line_index + len(self.log_line_lines)
    if (new_next_line_index < len(self.raw_text_lines)) or \
            (self.read_from_stdin and stdin_read_thread.is_alive()):
        next_line = LogLine(raw_text_lines=self.raw_text_lines,
                            previous_line=self,
                            read_from_stdin=self.read_from_stdin,
                            line_timestamper=self.line_timestamper,
                            # keep the caller's stdin timeout for the whole chain
                            max_seconds_till_line_split=self.max_seconds_till_line_split,
                            next_line_index=new_next_line_index)

    self._next_log_line = next_line
    return next_line
def iter_log_lines_with_regex(self, regex, ignore_case=True)
-
Goes through all LogLines checking if the message matches the regex. Yields each LogLine that matches.
Expand source code
def iter_log_lines_with_regex(self, regex, ignore_case=True):
    '''
    Goes through all LogLines (starting with this one) checking if the
    message matches the regex. Yields each LogLine that matches.

    Args:
        regex: pattern searched for anywhere in each log_message.
        ignore_case: match case-insensitively when True.
    '''
    regex_c = re.compile(regex, flags=re.IGNORECASE if ignore_case else 0)

    # walk through all lines
    current_line = self
    while current_line is not None:
        # only match / no match matters, so stop at the first hit
        if regex_c.search(current_line.log_message):
            yield current_line
        current_line = current_line.get_next_log_line()