Module errgrep.line_timestamper
Expand source code
import datetime
import dateutil.parser
import functools
import typing
from .lra_dict import LeastRecentlyAddedDict
# Upper bound on how many space-separated tokens a "type 1" (date time STUFF)
# timestamp may span. Parsing tries this maximum and works down to the token
# count that actually parses; that count is then remembered for later lines.
MAX_TYPE_1_LENGTH = 10
class LineTimestamper:
    '''
    An object used to grab a timestamp from a given line.

    Three line formats are understood, each handled by its own
    _coerce_datetime_from_line_<N> method:
        0: "date time, STUFF"  (timestamp is everything before the first comma)
        1: "date time STUFF"   (timestamp is the first few space-separated tokens)
        2: "[ seconds ] STUFF" (a number in leading square brackets, as in dmesg)

    The index of the format that last parsed a line is remembered in
    prefered_datetime_coerce_index and is tried first on later lines.
    '''

    # Class attribute, so it is shared by every LineTimestamper instance:
    # maps a stripped line to the datetime that was parsed from it.
    # NOTE(review): the 50 is presumably the maximum number of entries kept -- confirm in lra_dict.
    line_to_datetime_cache = LeastRecentlyAddedDict(50)

    # Number of _coerce_datetime_from_line_<N> methods defined below.
    # change this number if we add a coercion mechanism
    _COERCION_FUNCTION_COUNT = 3

    def __init__(self, allow_timestamp_format_changes:bool=False):
        '''
        allow_timestamp_format_changes: if True, a line that the preferred format
            cannot parse is retried with the other formats. If False, only the
            first format that ever succeeded is used from then on.
        '''
        self.prefered_datetime_coerce_index = None
        self.allow_timestamp_format_changes = allow_timestamp_format_changes

        # for type 1 only: the token count to try first (the one that parsed last time)
        self._last_type_1_length = MAX_TYPE_1_LENGTH

    # NOTE(review): lru_cache on a method makes self part of the cache key, so cached
    # entries keep the instance referenced until they are evicted. It is kept here
    # because it also remembers lines that failed to parse, which are slow to retry.
    @functools.lru_cache(maxsize=1024)
    def _coerce_datetime_from_line_0(self, line:str) -> typing.Optional[datetime.datetime]:
        '''
        Parse a line of the form "date time, STUFF".
        Returns the parsed datetime, or None if the text before the first comma is not a date.
        '''
        try:
            return dateutil.parser.parse(line.split(',')[0])
        except Exception:
            # Best effort: any parse failure just means "not this format".
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit still propagate.
            return None

    @functools.lru_cache(maxsize=1024)
    def _coerce_datetime_from_line_1(self, line:str) -> typing.Optional[datetime.datetime]:
        '''
        Parse a line of the form "date time STUFF".
        Works with timestamps spanning up to MAX_TYPE_1_LENGTH space-separated tokens.

        The token count that worked last time is tried first. Otherwise every count
        from MAX_TYPE_1_LENGTH down to 1 is tried and the first one that parses is
        remembered in self._last_type_1_length.
        Returns the parsed datetime, or None if no leading run of tokens is a date.
        '''
        tokens = line.split(' ')
        token_count = len(tokens)
        tried = set()

        def parse_prefix(length):
            # Lengths past the end of the line all select the same whole-line prefix,
            # so a prefix that already failed is not handed to the (slow) parser again.
            effective_length = min(length, token_count)
            if effective_length in tried:
                return None
            tried.add(effective_length)
            try:
                return dateutil.parser.parse(' '.join(tokens[:effective_length]))
            except Exception:
                # Best effort: a failure only means this prefix is not a date.
                return None

        result = parse_prefix(self._last_type_1_length)
        if result is not None:
            return result

        # The remembered length is already in `tried`, so it is skipped here.
        for length in range(MAX_TYPE_1_LENGTH, 0, -1):
            result = parse_prefix(length)
            if result is not None:
                self._last_type_1_length = length
                return result
        return None

    @functools.lru_cache(maxsize=1024)
    def _coerce_datetime_from_line_2(self, line:str) -> typing.Optional[datetime.datetime]:
        '''
        Parse a line of the form "[ seconds_since_boot ] STUFF", used by things like dmesg.

        The bracketed number is handed to datetime.datetime.fromtimestamp as-is.
        Returns None if the line does not start with a bracketed number, or if the
        number cannot be represented as a timestamp.
        '''
        if not (line.startswith('[') and ']' in line):
            return None
        try:
            seconds = float(line.split('[', 1)[1].split(']', 1)[0])
            # float() accepts text such as "nan", "inf" or "1e30" that fromtimestamp
            # rejects, so the conversion has to sit inside the try as well.
            return datetime.datetime.fromtimestamp(seconds)
        except (ValueError, OverflowError, OSError):
            return None

    def _get_datetime_from_line_coercion_function(self, idx:int):
        '''
        Returns the bound _coerce_datetime_from_line_<idx> method.
        Raises AttributeError if there is no coercion method with that index.
        '''
        # A getattr is cheap, so no cache is used here; caching would only pin
        # the instance inside the cache.
        return getattr(self, f'_coerce_datetime_from_line_{idx}')

    def coerce_datetime_from_line(self, line:str) -> typing.Optional[datetime.datetime]:
        '''
        Returns a datetime.datetime if a timestamp can be parsed from the given line.
        Otherwise returns None.
        '''
        line = line.strip()

        # coercion is slow... cache recent things
        if line in self.line_to_datetime_cache:
            return self.line_to_datetime_cache[line]

        # the format that worked before gets the first attempt
        if self.prefered_datetime_coerce_index is not None:
            func = self._get_datetime_from_line_coercion_function(self.prefered_datetime_coerce_index)
            result = func(line)
            if result is not None:
                self.line_to_datetime_cache[line] = result
                return result

        # The other formats are only tried while no format has been chosen yet,
        # or when the caller explicitly allowed the format to change.
        if self.allow_timestamp_format_changes or self.prefered_datetime_coerce_index is None:
            for idx in range(self._COERCION_FUNCTION_COUNT):
                if idx != self.prefered_datetime_coerce_index:
                    func = self._get_datetime_from_line_coercion_function(idx)
                    result = func(line)
                    if result is not None:
                        self.prefered_datetime_coerce_index = idx
                        self.line_to_datetime_cache[line] = result
                        return result
        return None
Classes
class LineTimestamper (allow_timestamp_format_changes: bool = False)
-
An object used to grab a timestamp from a given line
Expand source code
class LineTimestamper:
    '''
    An object used to grab a timestamp from a given line.

    Three line formats are understood, each handled by its own
    _coerce_datetime_from_line_<N> method:
        0: "date time, STUFF"  (timestamp is everything before the first comma)
        1: "date time STUFF"   (timestamp is the first few space-separated tokens)
        2: "[ seconds ] STUFF" (a number in leading square brackets, as in dmesg)

    The index of the format that last parsed a line is remembered in
    prefered_datetime_coerce_index and is tried first on later lines.
    '''

    # Class attribute, so it is shared by every LineTimestamper instance:
    # maps a stripped line to the datetime that was parsed from it.
    # NOTE(review): the 50 is presumably the maximum number of entries kept -- confirm in lra_dict.
    line_to_datetime_cache = LeastRecentlyAddedDict(50)

    # Number of _coerce_datetime_from_line_<N> methods defined below.
    # change this number if we add a coercion mechanism
    _COERCION_FUNCTION_COUNT = 3

    def __init__(self, allow_timestamp_format_changes:bool=False):
        '''
        allow_timestamp_format_changes: if True, a line that the preferred format
            cannot parse is retried with the other formats. If False, only the
            first format that ever succeeded is used from then on.
        '''
        self.prefered_datetime_coerce_index = None
        self.allow_timestamp_format_changes = allow_timestamp_format_changes

        # for type 1 only: the token count to try first (the one that parsed last time)
        self._last_type_1_length = MAX_TYPE_1_LENGTH

    # NOTE(review): lru_cache on a method makes self part of the cache key, so cached
    # entries keep the instance referenced until they are evicted. It is kept here
    # because it also remembers lines that failed to parse, which are slow to retry.
    @functools.lru_cache(maxsize=1024)
    def _coerce_datetime_from_line_0(self, line:str) -> typing.Optional[datetime.datetime]:
        '''
        Parse a line of the form "date time, STUFF".
        Returns the parsed datetime, or None if the text before the first comma is not a date.
        '''
        try:
            return dateutil.parser.parse(line.split(',')[0])
        except Exception:
            # Best effort: any parse failure just means "not this format".
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit still propagate.
            return None

    @functools.lru_cache(maxsize=1024)
    def _coerce_datetime_from_line_1(self, line:str) -> typing.Optional[datetime.datetime]:
        '''
        Parse a line of the form "date time STUFF".
        Works with timestamps spanning up to MAX_TYPE_1_LENGTH space-separated tokens.

        The token count that worked last time is tried first. Otherwise every count
        from MAX_TYPE_1_LENGTH down to 1 is tried and the first one that parses is
        remembered in self._last_type_1_length.
        Returns the parsed datetime, or None if no leading run of tokens is a date.
        '''
        tokens = line.split(' ')
        token_count = len(tokens)
        tried = set()

        def parse_prefix(length):
            # Lengths past the end of the line all select the same whole-line prefix,
            # so a prefix that already failed is not handed to the (slow) parser again.
            effective_length = min(length, token_count)
            if effective_length in tried:
                return None
            tried.add(effective_length)
            try:
                return dateutil.parser.parse(' '.join(tokens[:effective_length]))
            except Exception:
                # Best effort: a failure only means this prefix is not a date.
                return None

        result = parse_prefix(self._last_type_1_length)
        if result is not None:
            return result

        # The remembered length is already in `tried`, so it is skipped here.
        for length in range(MAX_TYPE_1_LENGTH, 0, -1):
            result = parse_prefix(length)
            if result is not None:
                self._last_type_1_length = length
                return result
        return None

    @functools.lru_cache(maxsize=1024)
    def _coerce_datetime_from_line_2(self, line:str) -> typing.Optional[datetime.datetime]:
        '''
        Parse a line of the form "[ seconds_since_boot ] STUFF", used by things like dmesg.

        The bracketed number is handed to datetime.datetime.fromtimestamp as-is.
        Returns None if the line does not start with a bracketed number, or if the
        number cannot be represented as a timestamp.
        '''
        if not (line.startswith('[') and ']' in line):
            return None
        try:
            seconds = float(line.split('[', 1)[1].split(']', 1)[0])
            # float() accepts text such as "nan", "inf" or "1e30" that fromtimestamp
            # rejects, so the conversion has to sit inside the try as well.
            return datetime.datetime.fromtimestamp(seconds)
        except (ValueError, OverflowError, OSError):
            return None

    def _get_datetime_from_line_coercion_function(self, idx:int):
        '''
        Returns the bound _coerce_datetime_from_line_<idx> method.
        Raises AttributeError if there is no coercion method with that index.
        '''
        # A getattr is cheap, so no cache is used here; caching would only pin
        # the instance inside the cache.
        return getattr(self, f'_coerce_datetime_from_line_{idx}')

    def coerce_datetime_from_line(self, line:str) -> typing.Optional[datetime.datetime]:
        '''
        Returns a datetime.datetime if a timestamp can be parsed from the given line.
        Otherwise returns None.
        '''
        line = line.strip()

        # coercion is slow... cache recent things
        if line in self.line_to_datetime_cache:
            return self.line_to_datetime_cache[line]

        # the format that worked before gets the first attempt
        if self.prefered_datetime_coerce_index is not None:
            func = self._get_datetime_from_line_coercion_function(self.prefered_datetime_coerce_index)
            result = func(line)
            if result is not None:
                self.line_to_datetime_cache[line] = result
                return result

        # The other formats are only tried while no format has been chosen yet,
        # or when the caller explicitly allowed the format to change.
        if self.allow_timestamp_format_changes or self.prefered_datetime_coerce_index is None:
            for idx in range(self._COERCION_FUNCTION_COUNT):
                if idx != self.prefered_datetime_coerce_index:
                    func = self._get_datetime_from_line_coercion_function(idx)
                    result = func(line)
                    if result is not None:
                        self.prefered_datetime_coerce_index = idx
                        self.line_to_datetime_cache[line] = result
                        return result
        return None
Class variables
var line_to_datetime_cache
Methods
def coerce_datetime_from_line(self, line: str) ‑> Union[datetime.datetime, NoneType]
-
Returns a datetime.datetime if a timestamp can be parsed from the given line. Otherwise returns None.
Expand source code
def coerce_datetime_from_line(self, line:str) -> typing.Optional[datetime.datetime]:
    '''
    Try to pull a timestamp out of the given line.
    Gives back a datetime.datetime on success, or None when nothing could be parsed.
    '''
    stripped = line.strip()
    recent = self.line_to_datetime_cache

    # coercion is slow, so reuse the answer for a line that was seen recently
    if stripped in recent:
        return recent[stripped]

    preferred = self.prefered_datetime_coerce_index
    if preferred is not None:
        # the format that worked before gets the first attempt
        parsed = self._get_datetime_from_line_coercion_function(preferred)(stripped)
        if parsed:
            recent[stripped] = parsed
            return parsed
        # a format is already chosen; only look further if it is allowed to change
        if not self.allow_timestamp_format_changes:
            return None

    # change this number if we add a coercion mechanism
    for candidate in range(3):
        if candidate == preferred:
            continue
        parsed = self._get_datetime_from_line_coercion_function(candidate)(stripped)
        if parsed:
            self.prefered_datetime_coerce_index = candidate
            recent[stripped] = parsed
            return parsed
    return None