Source code for plasoscaffolder.frontend.controller.sqlite_controller

# -*- coding: utf-8 -*-
"""File representing the Controller for SQLite plugin."""
import os

import click
from plasoscaffolder.bll.mappings import formatter_init_mapping
from plasoscaffolder.bll.mappings import formatter_mapping
from plasoscaffolder.bll.mappings import formatter_test_mapping
from plasoscaffolder.bll.mappings import mapping_helper
from plasoscaffolder.bll.mappings import parser_init_mapping
from plasoscaffolder.bll.mappings import parser_mapping
from plasoscaffolder.bll.mappings import parser_test_mapping
from plasoscaffolder.bll.services import base_sqlite_plugin_helper
from plasoscaffolder.bll.services import sqlite_generator
from plasoscaffolder.bll.services import sqlite_plugin_helper
from plasoscaffolder.bll.services import sqlite_plugin_path_helper
from plasoscaffolder.common import base_output_handler
from plasoscaffolder.common import file_handler
from plasoscaffolder.dal import base_sql_query_execution
from plasoscaffolder.dal import explain_query_plan
from plasoscaffolder.dal import sqlite_database_information
from plasoscaffolder.dal import sqlite_query_execution
from plasoscaffolder.model import sql_query_column_model
from plasoscaffolder.model import sql_query_column_model_data
from plasoscaffolder.model import sql_query_column_model_timestamp
from plasoscaffolder.model import sql_query_model


[docs]class SQLiteController(object): """Class representing the SQLite Controller.""" _NUMBER_OF_SQLITE_OUTPUT_EXAMPLES = 3 def __init__(self, output_handler: base_output_handler.BaseOutputHandler(), plugin_helper: base_sqlite_plugin_helper.BaseSQLitePluginHelper()): """Initializes the SQLite Controller. Args: output_handler (base_output_handler.BaseOutputHandler): the handler for the output plugin_helper (base_sqlite_plugin_helper.BaseSQLitePluginHelper): the helper for the SQLite plugin """ super(SQLiteController, self).__init__() self._path = None self._name = None self._testfile = None self._sql_query = [] self._plugin_helper = plugin_helper self._output_handler = output_handler self._query_execution = None
[docs] def SourcePath(self, unused_ctx: click.core.Context, unused_param: click.core.Option, value: str) -> str: """Saving the source path. Args: unused_ctx (click.core.Context): the click context (automatically given via callback) unused_param (click.core.Option): the click command (automatically given via callback) value (str): the source path (automatically given via callback) Returns: str: the source path representing the same as value """ while not self._plugin_helper.FolderExists(value): value = self._output_handler.PromptError( 'Folder does not exists. Enter correct one') self._path = value return value
[docs] def PluginName(self, unused_ctx: click.core.Context, unused_param: click.core.Option, value: str) -> str: """Saving the plugin name. Args: unused_ctx (click.core.Context): the click context (automatically given via callback) unused_param (click.core.Option): the click command (automatically given via callback) value (str): the source path (automatically given via callback) Returns: str: the plugin name representing the same as value """ value = self._ValidatePluginName(value) while self._plugin_helper.PluginExists( self._path, value, '', sqlite_plugin_path_helper.SQLitePluginPathHelper( self._path, value, '')): value = self._output_handler.PromptError( 'Plugin exists. Choose new Name') value = self._ValidatePluginName(value) self._name = value return value
[docs] def TestPath(self, unused_ctx: click.core.Context, unused_param: click.core.Option, value: str) -> str: """Saving the path to the test file. Args: unused_ctx (click.core.Context): the click context (automatically given via callback) unused_param (click.core.Option): the click command (automatically given via callback) value (str): the source path (automatically given via callback) Returns: str: the test file path representing the same as the value """ no_database_file = True while no_database_file: while not self._plugin_helper.FileExists(value): value = self._output_handler.PromptError( 'File does not exists. Choose another.') if not self._IsDatabaseFile(value): value = self._output_handler.PromptError( 'Unable to open the database file. Choose another.') else: no_database_file = False self._testfile = value return value
def _IsDatabaseFile(self, path: str) -> bool: """Try to open the database File. Args: path (str): the database file path Returns: bool: if the file can be opened and is a database file """ execution = sqlite_query_execution.SQLiteQueryExecution(path) if execution.TryToConnect(): self._query_execution = execution return True return False
[docs] def SQLQuery(self, unused_ctx: click.core.Context, unused_param: click.core.Option, value: str) -> [sql_query_model.SQLQueryModel]: """The SQL Query of the plugin. Args: unused_ctx (click.core.Context): the click context (automatically given via callback) unused_param (click.core.Option): the click command (automatically given via callback) value (str): the SQL Query (automatically given via callback) Returns: [sql_query_model.SQLQueryModel]: a list of SQL Query models """ verbose = value add_more_queries = True sql_query_list = [] while add_more_queries: if len(sql_query_list) > 0: sql_query = self._output_handler.PromptInfo( text='Please write your SQL script for the plugin [\'abort\' to ' 'continue]') if sql_query == 'abort': add_more_queries = False else: sql_query = self._output_handler.PromptInfo( text='Please write your SQL script for the plugin') if add_more_queries: query_model = self._CreateSQLQueryModelWithUserInput( sql_query, verbose, self._query_execution) if query_model is not None: sql_query_list.append(query_model) add_more_queries = self._output_handler.Confirm( text='Do you want to add another Query?', abort=False, default=True) self._sql_query = sql_query_list return sql_query_list
def _CreateSQLQueryModelWithUserInput( self, query: str, with_examples: bool, query_execution: base_sql_query_execution.BaseSQLQueryExecution() ) -> sql_query_model.SQLQueryModel: """Asks the user information about the SQL Query. Args: query (str): the SQL Query with_examples (bool): if the user wants examples for the given Query Returns: sql_query_model.SQLQueryModel: a SQL Query model """ query_data = self._plugin_helper.RunSQLQuery(query, query_execution) query_plan = explain_query_plan.ExplainQueryPlan(query_execution) if query_data.has_error: self._output_handler.PrintError(str(query_data.error_message)) return None else: if with_examples: length = len(query_data.data) if length == 0: self._output_handler.PrintInfo('Your query does not return anything.') else: self._output_handler.PrintInfo( 'Your query output could look like this.') self._output_handler.PrintInfo( str(list(map(lambda x: x.sql_column, query_data.columns)))) amount = min(self._NUMBER_OF_SQLITE_OUTPUT_EXAMPLES, length) for index in range(0, amount): self._output_handler.PrintInfo(str(query_data.data[index])) add_query = self._output_handler.Confirm( 'Do you want to add this query?', abort=False, default=True) if not add_query: return None else: self._output_handler.PrintError('The SQL query was ok.') locked_tables = query_plan.GetLockedTables(query) name = ''.join(locked_tables).capitalize() question_parse = 'Do you want to name the query parse row: {0} ?'.format( name) add_recommended_name = self._output_handler.Confirm( question_parse, abort=False, default=True) if not add_recommended_name: question_event = 'What row does the SQL Query parse?' initial_name = self._output_handler.PromptInfo(question_event) name = self._ValidateRowName(initial_name) data_column, timestamp_column = self.GetTimestamps( query_data.columns, query_data.data) if query_data.data is not None: amount_events = len(query_data.data) else: amount_events = 0 message = 'Does the event {0} need customizing?'.format(name) needs_customizing = self._output_handler.Confirm( text=message, abort=False, default=False) if needs_customizing: data_column = self.GetCustomizable(data_column) return sql_query_model.SQLQueryModel( query.strip(), name, data_column, timestamp_column, needs_customizing, amount_events)
[docs] def GetTimestamps( self, columns: [sql_query_column_model.SQLColumnModel], data: [str] ) -> ([sql_query_column_model_data.SQLColumnModelData], [sql_query_column_model_timestamp.SQLColumnModelTimestamp]): """Gets the timestamp from the user and the columns. Args: columns ([sql_query_column_model.SQLColumnModel]): the columns from the SQL query. data ([str]): the data from the cursor Returns: [sql_query_column_model_data.SQLColumnModelData], [sql_query_column_model_timestamp.SQLColumnModelTimestamp]: A tuple of columns. The first column represents the normal column for the query. The second column represents the timestamp events. """ timestamps = set() wrong_timestamps = set() assumed_timestamps = self._plugin_helper.GetAssumedTimestamps(columns) for timestamp in assumed_timestamps: question_timestamp = 'Is the column a time event? {0}'.format(timestamp) is_timestamp = self._output_handler.Confirm(question_timestamp, abort=False, default=True) if is_timestamp: timestamps.add(timestamp) add_own_timestamps = True while add_own_timestamps: own_timestamps = self._output_handler.PromptInfo( 'Enter (additional) timestamp events from the query [columnName,' 'aliasName...] or [abort]') if own_timestamps == 'abort': add_own_timestamps = False else: if len(timestamps) == 0: own_timestamps = self._output_handler.PromptInfo( 'At least one timestamp is required, please add a timestamp') own_timestamps = self._ValidateTimestampString(own_timestamps) new_columns = self._GetValidColumnsAndInvalid(columns, own_timestamps) timestamps.update(new_columns[0]) wrong_timestamps.update(new_columns[1]) self._PrintAddedAndFailedColumns(timestamps, wrong_timestamps) add_own_timestamps = self._output_handler.Confirm( 'Do you want to add more timestamps?', abort=False, default=False) return self._plugin_helper.GetColumnsAndTimestampColumn( columns, timestamps, data)
[docs] def GetCustomizable( self, columns: [sql_query_column_model_data.SQLColumnModelData] ) -> [sql_query_column_model_data.SQLColumnModelData]: """Gets the customizable columns from the user Args: columns ([sql_query_column_model_data.SQLColumnModelData]): the columns from the SQL query. Returns: [sql_query_column_model_data.SQLColumnModelData]: the columns from the SQL query an set if the column is customizable """ customizable = set() wrong_customizable = set() add_own_customizable = True while add_own_customizable: own_column = self._output_handler.PromptInfo( 'Enter columns that are customizable [columnName,aliasName...] or ' '[abort]') own_column = self._ValidateColumnString(own_column) if own_column == 'abort': if len(customizable) == 0: own_column = self._output_handler.PromptInfo( 'At least one column is required, please add a column') own_column = self._ValidateColumnString(own_column) else: add_own_customizable = False if add_own_customizable: new_columns = self._GetValidColumnsAndInvalid(columns, own_column) customizable.update(new_columns[0]) wrong_customizable.update(new_columns[1]) self._PrintAddedAndFailedColumns(customizable, wrong_customizable) add_own_customizable = self._output_handler.Confirm( 'Do you want to add more columns that are customizable?', abort=False, default=False) for column in columns: if column.sql_column in customizable: column.customize = True return columns
[docs] def Generate(self, template_path: str, yapf_path: str): """Generating the files. Args: template_path (str): the path to the template directory yapf_path (str): the path to the yapf style file """ self._output_handler.Confirm('Do you want to Generate the files?') database_suffix = os.path.splitext(self._testfile)[1][1:] sqlite_mapping_helper = mapping_helper.MappingHelper( template_path, yapf_path) generator = sqlite_generator.SQLiteGenerator( self._path, self._name, self._testfile, self._sql_query, self._output_handler, sqlite_plugin_helper.SQLitePluginHelper(), sqlite_plugin_path_helper.SQLitePluginPathHelper( self._path, self._name, database_suffix)) generator.GenerateSQLitePlugin( template_path, file_handler.FileHandler(), formatter_init_mapping.FormatterInitMapping(sqlite_mapping_helper), parser_init_mapping.ParserInitMapping(sqlite_mapping_helper), parser_mapping.ParserMapper(sqlite_mapping_helper), formatter_mapping.FormatterMapper(sqlite_mapping_helper), parser_test_mapping.ParserTestMapper(sqlite_mapping_helper), formatter_test_mapping.FormatterTestMapper(sqlite_mapping_helper), sqlite_mapping_helper, sqlite_database_information.SQLiteDatabaseInformation( self._query_execution))
def _ValidatePluginName(self, plugin_name: str) -> str: """Validate plugin name and prompt until name is valid. Args: plugin_name (str): the name of the plugin Returns: str: a valid plugin name """ while not self._plugin_helper.IsValidPluginName(plugin_name): plugin_name = self._output_handler.PromptError( 'Plugin is not in a valid format. Choose new Name [' 'plugin_name_...]') return plugin_name def _ValidateRowName(self, row_name: str) -> str: """Validate row name and prompt until name is valid. Args: row_name (str): the name of the row Returns: str: a valid row name """ while not self._plugin_helper.IsValidRowName(row_name): row_name = self._output_handler.PromptError( 'Row name is not in a valid format. Choose new Name [RowName...]') return row_name def _ValidateTimestampString(self, timestamp_string: str) -> str: """Validate the timestamp string and prompt until valid Args: timestamp_string (str): the string with the timestamps Returns: str: a comma separated string with timestamps """ while not self._plugin_helper.IsValidCommaSeparatedString(timestamp_string): timestamp_string = self._output_handler.PromptError( 'Timestamps are not in valid format. Reenter them correctly [name,' 'name...]') return timestamp_string def _ValidateColumnString(self, column_string: str) -> str: """Validate the timestamp string and prompt until valid Args: column_string (str): the string with the column names Returns: str: a comma separated string with column names """ while not self._plugin_helper.IsValidCommaSeparatedString(column_string): column_string = self._output_handler.PromptError( 'Column names are not in valid format. Reenter them correctly [name,' 'name...]') return column_string def _PrintAddedAndFailedColumns(self, added: [str], failed: [str]): """Prints the user which columns failed and which are added Args: added ([str]): the added columns failed ([str]): the failed columns """ added_string = 'Added: {0}'.format(','.join(sorted(added))) failed_string = 'Failed: {0}'.format(','.join(sorted(failed))) self._output_handler.PrintInfo(added_string) self._output_handler.PrintInfo(failed_string) def _GetValidColumnsAndInvalid( self, columns: [sql_query_column_model.SQLColumnModel], user_input: str ) -> ([str], [str]): """ Args: columns ([sql_query_column_model.SQLColumnModel]): the columns from the SQL query. user_input: a comma separated list of column names Returns: [str], [str]: the columns for the right column names and the wrong column names as the second part of the tuple """ right_columns = set() wrong_columns = set() for column in user_input.split(','): column_names = [column.sql_column for column in columns] if column in column_names: right_columns.add(column) else: wrong_columns.add(column) return right_columns, wrong_columns