Source code for plasoscaffolder.bll.services.sqlite_plugin_helper

# -*- coding: utf-8 -*-
# Disable linting until PyCQA/astroid/issues/362 is fixed.
# pylint: skip-file
"""SQLite plugin helper."""
import functools
import os
import re

from plasoscaffolder.bll.services import base_sqlite_plugin_helper
from plasoscaffolder.bll.services import base_sqlite_plugin_path_helper
from plasoscaffolder.dal import base_sql_query_execution
from plasoscaffolder.dal import sql_query_data
from plasoscaffolder.model import sql_query_column_model
from plasoscaffolder.model import sql_query_column_model_data
from plasoscaffolder.model import sql_query_column_model_timestamp
from plasoscaffolder.model import sql_query_model


[docs]class SQLitePluginHelper(base_sqlite_plugin_helper.BaseSQLitePluginHelper): """Class containing helper functions for the SQLite plugin.""" _PLUGIN_NAME_PATTERN = re.compile("[a-z]+((_)[a-z]+)*") _ROW_NAME_PATTERN = re.compile("[A-Z]+([a-zA-Z])*") _COMMA_SEPARATED_PATTERN = re.compile("[a-zA-Z0-9]+((,)[a-zA-Z0-9]+)*") def __init__(self): """Initializes the SQLite plugin helper.""" super().__init__()
[docs] def PluginExists( self, path: str, plugin_name: str, database_suffix: str, path_helper: base_sqlite_plugin_path_helper.BaseSQLitePluginPathHelper(), ) -> bool: """Checks if the plugin already exists. Args: database_suffix: the suffix of the database file path (str): the path of the plaso source plugin_name (str): the Name of the plugin path_helper (BaseSQLitePluginHelper): the SQLite plugin helper Returns: bool: True if the plugin already exists. False if it does not. """ helper = path_helper return (os.path.isfile(helper.formatter_file_path) or os.path.isfile(helper.parser_file_path) or os.path.isfile(helper.formatter_test_file_path) or os.path.isfile(helper.parser_test_file_path) or os.path.isfile(helper.database_path))
[docs] def IsValidPluginName(self, plugin_name: str) -> bool: """Validates the plugin name. Args: plugin_name (str): the plugin name Returns: bool: true if the plugin name is valid """ return self._PLUGIN_NAME_PATTERN.fullmatch(plugin_name)
[docs] def IsValidRowName(self, row_name: str) -> bool: """Validates the row name. Args: row_name (str): the row name Returns: bool: true if the row name is valid """ return self._ROW_NAME_PATTERN.fullmatch(row_name)
[docs] def IsValidCommaSeparatedString(self, text: str) -> bool: """Validates a comma separated string Args: text (str): the string to validate Returns: bool: true if the text is valid """ return self._COMMA_SEPARATED_PATTERN.fullmatch(text)
[docs] def FileExists(self, path: str) -> bool: """Checks if the file exists. Args: path: the plaso folder path Returns: true if the file exists """ return os.path.isfile(path)
[docs] def FolderExists(self, path: str) -> bool: """Checks if folder exists. Args: path: the plaso folder path Returns: true if the folder exists """ return os.path.isdir(path)
[docs] def RunSQLQuery( self, query: str, executor: base_sql_query_execution.BaseSQLQueryExecution ) -> sql_query_data.SQLQueryData: """Validates the SQL query. Args: executor (base_sql_query_execution.SQLQueryExection()) the SQL executor query (str): the SQL query Returns: sql_query_data.SQLQueryData: data returned by executing the query """ return executor.ExecuteReadOnlyQuery(query)
[docs] def GetDistinctColumnsFromSQLQueryData( self, queries: [sql_query_model.SQLQueryModel]) -> [str]: """Get a distinct list of all attributes from multiple queries. Args: queries ([sql_query_model.SQLQueryModel]): an array of multiple SQL query data objects Returns: list[str]: all distinct attributes used in the query """ if len(queries) != 0: list_of_list_of_column_model = [query.columns for query in queries] list_of_column_model = functools.reduce(lambda x, y: x + y, list_of_list_of_column_model) list_of_columns_snake_case = [column.GetColumnAsSnakeCase() for column in list_of_column_model] distinct_columns = sorted(set().union(list_of_columns_snake_case)) return distinct_columns else: return []
[docs] def GetAssumedTimestamps( self, columns: [sql_query_column_model.SQLColumnModel]) -> [str]: """Gets all columns assumed that they are timestamps Args: columns ([sql_query_column_model.SQLColumnModel]): the columns from the SQL query Returns: [str]: the names from the columns assumed they could be a timestamp """ assumed_columns = [column.sql_column for column in columns if 'time' in column.sql_column.lower() or 'date' in column.sql_column.lower()] return assumed_columns
[docs] def GetColumnsAndTimestampColumn( self, columns: [sql_query_column_model.SQLColumnModel], timestamps: [str], data: [str] ) -> ( [sql_query_column_model_data.SQLColumnModelData], [sql_query_column_model_timestamp.SQLColumnModelTimestamp]): """Splits the column list into a list of simple columns and a list for timestamp event columns and adds the data to the simple columns Args: columns ([sql_query_column_model_data.SQLColumnModelData]): the columns from the SQL query timestamps ([str]): the timestamp events data ([str]): the data from the cursor Returns: ([sql_query_column_model_data.SQLColumnModelData], [sql_query_column_model_timestamp.SQLColumnModelTimestamp]): a tuple of columns, the first are the normal columns, the second are the timestamp events """ normal_columns = list() message = {} timestamps_data = {} timestamp_columns = self._GetTimestampColumnsFromColumnsWithoutMessage( columns, timestamps) for column_index in range(0, len(columns)): column = columns[column_index] data_row = 0 if column.sql_column in timestamps: data_row = [timestamp.sql_column for timestamp in timestamp_columns].index(column.sql_column) timestamps_data[column.sql_column] = self._GetDataForTimestamp( data, data_row, column_index) data_row += 1 else: column_data_model, message = self._GetDataForColumnAndAppendMessage( data, column, timestamp_columns, column_index, message) normal_columns.append(column_data_model) timestamp_columns = self._AddMessageAndDataToTimestampColumns( timestamp_columns, message, timestamps_data) return normal_columns, timestamp_columns
def _AddMessageAndDataToTimestampColumns( self, timestamp_columns: [ sql_query_column_model_timestamp.SQLColumnModelTimestamp], message: {str, str}, timestamps_data: {str, str} ) -> [sql_query_column_model_timestamp.SQLColumnModelTimestamp]: """Add Missing Message and data to the timestamp columns Args: timestamp_columns ([ sql_query_column_model_timestamp.SQLColumnModelTimestamp]): the columns to be changed message {str_str}: The message to be added.Dictionary with first part the timestamp name as key and second the message timestamps_data {str,str}: The data to be added. Dictionary with first part the timestamp name as key and second the data Returns: [sql_query_column_model_timestamp.SQLColumnModelTimestamp]: the timestamp columns with additional data and message """ for timestamp in timestamp_columns: timestamp.expected_message = message[timestamp.sql_column] timestamp.timestamp = timestamps_data[timestamp.sql_column] return timestamp_columns def _GetDataForColumnAndAppendMessage( self, data: [str], column: sql_query_column_model_data.SQLColumnModelData, timestamp_columns: [ sql_query_column_model_timestamp.SQLColumnModelTimestamp], column_index: int, message: {str, str} ) -> (sql_query_column_model_data.SQLColumnModelData, {str, str}): """Get the data for the column and append data to the message for the timestamp message. Args: data ([str]): the data to the query. column (sql_query_column_model_data.SQLColumnModelData): the column to get the data for timestamp_columns ([ sql_query_column_model_timestamp.SQLColumnModelTimestamp]): the timestamp columns to get data for each timestamp column_index (int): the index of the column in the data message ({str,str}): the existing message for the timestamp to append the new content Returns: sql_query_column_model_data.SQLColumnModelData, {str, str}: the data model for the column and the passed message with the appended data """ column_data = {} data_row = 0 for timestamp in [timestamp.sql_column for timestamp in timestamp_columns]: data_for_column_and_timestamp = '' if data: data_for_column_and_timestamp = data[data_row][column_index] # if not enough data results it shall take the same data as before if data_row < len(data) - 1: data_row += 1 column_data[timestamp] = data_for_column_and_timestamp message = self._GetTimestampMessageWithAddedColumnData( timestamp, message, column.GetColumnAsDescription(), data_for_column_and_timestamp) column_data_model = sql_query_column_model_data.SQLColumnModelData( sql_column=column.sql_column, sql_column_type=column.sql_column_type, data=column_data) return column_data_model, message def _GetTimestampMessageWithAddedColumnData( self, timestamp: str, message: {str: str}, description: str, data: str) -> {str: str}: """Append Data to the given message for a timestamp Args: timestamp (str): the timestamp to append data to message ({str:str}): the message to be changed description (str): the description to be added for the timestamp data (str): the data to be added for the timestamp Returns: {str:str}: the given message with new data added for the given timestamp """ if timestamp not in message: message[timestamp] = '{0}: {1}'.format(description, data) else: message[timestamp] = '{0} {1}: {2}'.format( message[timestamp], description, data) return message def _GetDataForTimestamp(self, data: [str], data_row: int, timestamp_index: int): """Get the data for the timestamp out of the data array Args: data ([str]): the data array data_row (int): the data row from the data array to get the data from timestamp_index (int): the index in the data row where the data for the timestamp is located Returns: str: the data for the timestamp """ timestamp_data = '' if data: if len(data) > data_row: timestamp_data = data[data_row][timestamp_index] else: timestamp_data = data[len(data) - 1][timestamp_index] return timestamp_data def _GetTimestampColumnsFromColumnsWithoutMessage( self, columns: [sql_query_column_model.SQLColumnModel], timestamps: [str] ) -> [sql_query_column_model_timestamp.SQLColumnModelTimestamp]: """Gets the timestamp columns from the columns and sets the type and the name but not the message. Args: columns ([sql_query_column_model_data.SQLColumnModelData]): the columns from the SQL query timestamps ([str]): the timestamp events Returns: [sql_query_column_model_data.SQLColumnModelData]: the timestamp columns but without the message """ timestamp_columns = list() for column in [column for column in columns if column.sql_column in timestamps]: timestamp_data_model = ( sql_query_column_model_timestamp.SQLColumnModelTimestamp( sql_column_type=column.sql_column_type, sql_column=column.sql_column, expected_message='')) timestamp_columns.append(timestamp_data_model) return timestamp_columns