Source code for tests.dal.test_sqlite_query_execution

# !/usr/bin/python
# -*- coding: utf-8 -*-
"""test class"""
import os
import unittest

from plasoscaffolder.dal import sqlite_query_execution
from tests.test_helper import path_helper


[docs]class SQLiteQueryExecutionTest(unittest.TestCase): """test the SQLite Query execution test"""
[docs] def setUp(self): database_path = path_helper.TestDatabasePath() file_path = os.path.join(database_path, 'twitter_ios.db') self.execute = sqlite_query_execution.SQLiteQueryExecution(file_path) self.execute.TryToConnect() file_path_types = os.path.join(database_path, 'test_database_types.db') self.execute_types = sqlite_query_execution.SQLiteQueryExecution( file_path_types) self.execute_types.TryToConnect() file_path_names = os.path.join(database_path, 'test_database_names.db') self.execute_names = sqlite_query_execution.SQLiteQueryExecution( file_path_names) self.execute_names.TryToConnect()
[docs] def testTryToConnect(self): """try to connect without error""" database_path = path_helper.TestDatabasePath() file_path = os.path.join(database_path, 'twitter_ios.db') execute = sqlite_query_execution.SQLiteQueryExecution(file_path) connected = execute.TryToConnect() self.assertTrue(connected)
[docs] def testTryToConnectWithError(self): """try to connect 2 times resulting in a error""" database_path = path_helper.TestDatabasePath() file_path = os.path.join(database_path, 'twitter_ios_error.db') execute = sqlite_query_execution.SQLiteQueryExecution(file_path) connected = execute.TryToConnect() self.assertFalse(connected)
[docs] def testRollbackWorks(self): """testing if the rollback works""" query_select_all_users = 'SELECT * FROM Users' query_drop_table = 'DROP TABLE Users' result_users_before = self.execute.ExecuteQuery(query_select_all_users) result_drop_table = self.execute.ExecuteQuery(query_drop_table) result_users_after = self.execute.ExecuteQuery(query_select_all_users) self.assertEqual(len(result_users_before.data), len(result_users_after.data)) self.assertTrue(len(result_users_after.data) is 25) self.assertTrue(len(result_users_before.data) is 25) self.assertFalse(result_drop_table.has_error) self.assertIsNone(result_drop_table.error_message)
[docs] def testMultipleTestAfterOneAnother(self): """test two querys after another to test the connection is still open""" query_simple = ('SELECT createdDate, updatedAt, screenName, ' 'Name, profileImageUrl,' 'location, description, url, following, followersCount, ' 'followingCount' ' FROM Users ORDER BY createdDate') result_simple = self.execute.ExecuteQuery(query_simple) query_join = ('SELECT Statuses.date AS date, Statuses.text AS text,' ' Statuses.userId AS user_id, Users.Name AS Name, ' 'Statuses.retweetCount AS ' 'retweetCount, Statuses.favoriteCount AS favoriteCount, ' 'Statuses.favorited AS favorited, Statuses.updatedAt AS ' 'updatedAt ' 'FROM Statuses LEFT join Users ON Statuses.userId = Users.id ' 'ORDER BY date') result_join = self.execute.ExecuteQuery(query_join) self.assertIsNone(result_join.error_message) self.assertIsNone(result_simple.error_message) self.assertFalse(result_simple.has_error) self.assertFalse(result_join.has_error) self.assertEqual(len(result_join.data), 67) self.assertEqual(len(result_simple.data), 25)
[docs] def testQueryErrorNoSuchColumn(self): """test two querys after another to test the connection is still open""" query = 'SELECT createdDates FROM Users' result = self.execute.ExecuteQuery(query) expected_error = 'Error: no such column: createdDates' self.assertTrue(result.has_error) self.assertIsNone(result.data) self.assertEqual(str(result.error_message), expected_error)
[docs] def testQueryErrorNoSuchTable(self): """test two querys after another to test the connection is still open""" query = 'SELECT createdDate FROM Userss' result = self.execute.ExecuteQuery(query) expected_error = 'Error: no such table: Userss' self.assertTrue(result.has_error) self.assertIsNone(result.data) self.assertEqual(str(result.error_message), expected_error)
[docs] def testQueryWarning(self): """test two querys after another to test the connection is still open""" query = 'SELECT id from users;Select id from users' result = self.execute.ExecuteQuery(query) expected_error = 'Warning: You can only execute one statement at a time.' self.assertTrue(result.has_error) self.assertIsNone(result.data) self.assertEqual(str(result.error_message), expected_error)
[docs] def testExecuteQueryDetailedSimple(self): """test the execution of a simple Query""" query = ('SELECT createdDate, updatedAt, screenName, Name, profileImageUrl,' 'location, description, url, following, followersCount, ' 'followingCount' ' FROM Users ORDER BY createdDate') result = self.execute.ExecuteQueryDetailed(query) expected_data = self._ReadFromFileRelative('expected_simple_query_data') self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data)) self.assertEqual(result.columns[0].sql_column, 'createdDate') self.assertEqual(result.columns[1].sql_column, 'updatedAt') self.assertEqual(result.columns[2].sql_column, 'screenName') self.assertEqual(result.columns[3].sql_column, 'name') self.assertEqual(result.columns[4].sql_column, 'profileImageUrl') self.assertEqual(result.columns[5].sql_column, 'location') self.assertEqual(result.columns[6].sql_column, 'description') self.assertEqual(result.columns[7].sql_column, 'url') self.assertEqual(result.columns[8].sql_column, 'following') self.assertEqual(result.columns[9].sql_column, 'followersCount') self.assertEqual(result.columns[10].sql_column, 'followingCount') self.assertEqual(result.columns[0].GetColumnTypeAsName(), 'float') self.assertEqual(result.columns[1].GetColumnTypeAsName(), 'float') self.assertEqual(result.columns[2].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[3].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[4].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[5].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[6].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[7].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[8].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[9].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[10].GetColumnTypeAsName(), 'int')
[docs] def testExecuteQueryDetailedWithOneDuplicateColumnNames(self): """test the execution of a simple Query with one duplicate column name""" query = ('SELECT t1.a, t1.b, t2.a from t1 join t2') result = self.execute_names.ExecuteQueryDetailed(query) expected_error_message = 'Please use an alias (AS) for those column ' \ 'names: a' self.assertTrue(result.has_error) self.assertEqual(result.error_message, expected_error_message)
[docs] def testExecuteQueryDetailedWithTwoDuplicateColumnNames(self): """test the execution of a simple Query with two duplicate column names""" query = ('SELECT t1.a, t1.b, t2.a, t2.b from t1 join t2') result = self.execute_names.ExecuteQueryDetailed(query) expected_error_message = 'Please use an alias (AS) for those column ' \ 'names: a b' self.assertTrue(result.has_error) self.assertEqual(result.error_message, expected_error_message)
[docs] def testExecuteQueryDetailedWithJoin(self): """test the execution of a more complex Query""" query = ('SELECT Statuses.date AS date, Statuses.text AS text,' ' Statuses.userId AS user_id, Users.Name AS Name, ' 'Statuses.retweetCount AS ' 'retweetCount, Statuses.favoriteCount AS favoriteCount, ' 'Statuses.favorited AS favorited, Statuses.updatedAt AS updatedAt ' 'FROM Statuses LEFT join Users ON Statuses.userId = Users.id ' 'ORDER BY date') result = self.execute.ExecuteQueryDetailed(query) expected_data = self._ReadFromFileRelative('expected_join_query_data') self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data)) self.assertEqual(result.columns[0].sql_column, 'date') self.assertEqual(result.columns[1].sql_column, 'text') self.assertEqual(result.columns[2].sql_column, 'user_id') self.assertEqual(result.columns[3].sql_column, 'Name') self.assertEqual(result.columns[4].sql_column, 'retweetCount') self.assertEqual(result.columns[5].sql_column, 'favoriteCount') self.assertEqual(result.columns[6].sql_column, 'favorited') self.assertEqual(result.columns[7].sql_column, 'updatedAt') self.assertEqual(result.columns[0].GetColumnTypeAsName(), 'float') self.assertEqual(result.columns[1].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[2].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[3].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[4].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[5].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[6].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[7].GetColumnTypeAsName(), 'float')
[docs] def testExecuteQueryDetailedWithSpecialCharacters(self): """test the execution of a more complex Query""" query = ('SELECT [AS].[id] as [AS], [AS].name as "name" from Users AS [AS]') result = self.execute.ExecuteQueryDetailed(query) expected_error_message = ('Warning: Don\'t use any characters beside ' 'a-z A-Z 0-9 . ; , * = _') self.assertTrue(result.has_error) self.assertEqual(result.error_message, expected_error_message) self.assertEqual(result.error_message, expected_error_message)
[docs] def testExecuteQueryDetailedWithAliasWithUnderscore(self): """test the execution of a more complex Query""" query = ('SELECT id as the_id, name as the_name from users') result = self.execute.ExecuteQueryDetailed(query) expected_data = self._ReadFromFileRelative('expected_id_name_data') self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data))
[docs] def testExecuteQueryDetailedWithAliasWithUnderscore2(self): """test the execution of a more complex Query""" query = ('SELECT users.id as the_id, name as the_name from users') result = self.execute.ExecuteQueryDetailed(query) expected_data = self._ReadFromFileRelative('expected_id_name_data') self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data))
[docs] def testExecuteQueryDetailedWithAliasWithUnderscore3(self): """test the execution of a more complex Query""" query = ('SELECT users.id as the_id, name as the_name ' 'from users join statuses') result = self.execute.ExecuteQueryDetailed(query) expected_data = self._ReadFromFileRelative('expected_id_name_join_data') self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data))
[docs] def testExecuteQueryDetailedWithJoinAndAliasWithUnderscore(self): """test the execution of a more complex Query""" query = ('SELECT users.id as the_id, users.name as the_name ' 'from users join statuses') result = self.execute.ExecuteQueryDetailed(query) expected_data = self._ReadFromFileRelative('expected_id_name_join_data') self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data))
[docs] def testExecuteQueryDetailedWithAliasForTable(self): """test the execution of a more complex Query""" query = ('SELECT x.id, x.name from users as x') result = self.execute.ExecuteQueryDetailed(query) expected_error_message = ('Warning: Don\'t use any alias for a table name') self.assertTrue(result.has_error) self.assertEqual(result.error_message, expected_error_message)
[docs] def testExecuteQueryReadOnlyWithAliasForMultipleTable(self): """test the execution of a more complex Query""" query = ('SELECT x.id, x.name from users as x join statuses') result = self.execute.ExecuteQueryDetailed(query) expected_error_message = ('Warning: Don\'t use any alias for a table name') self.assertTrue(result.has_error) self.assertEqual(result.error_message, expected_error_message)
[docs] def testExecuteQueryDetailedWithJoinAndAliasForTable(self): """test the execution of a more complex Query""" query = ('SELECT x.id as userid, x.name, y.id as statusid ' 'from users as x join statuses as y') result = self.execute.ExecuteQueryDetailed(query) expected_error_message = ('Warning: Don\'t use any alias for a table name') self.assertTrue(result.has_error) self.assertEqual(result.error_message, expected_error_message)
[docs] def testExecuteQueryDetailedWithJoinAndNoTable(self): """test the execution of a more complex Query""" query = ('SELECT users.id, name from users join statuses') result = self.execute.ExecuteQueryDetailed(query) expected_data = self._ReadFromFileRelative('expected_id_name_join_data') self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data))
[docs] def testExecuteReadOnlyQueryWithSelect(self): """test execute read only with a simple select query""" query = 'SELECT id from users where id==2220776716' result = self.execute.ExecuteReadOnlyQuery(query) expected = '[(2220776716,)]' self.assertFalse(result.has_error) self.assertEqual(str(result.data), expected) self.assertIsNone(result.error_message) self.assertEqual(result.columns[0].sql_column, 'id') self.assertEqual(result.columns[0].GetColumnTypeAsName(), 'int')
[docs] def testExecuteQueryDetailedSimpleNoData(self): """test the execution of a simple Query""" query = ('SELECT * From nodata') result = self.execute_types.ExecuteQueryDetailed(query) expected_data = '[]' self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data)) self.assertEqual(result.columns[0].sql_column, 'intval') self.assertEqual(result.columns[1].sql_column, 'integerval') self.assertEqual(result.columns[2].sql_column, 'tinyintval') self.assertEqual(result.columns[3].sql_column, 'smallintval') self.assertEqual(result.columns[4].sql_column, 'mediuintval') self.assertEqual(result.columns[5].sql_column, 'bigintval') self.assertEqual(result.columns[6].sql_column, 'unsignedbigintval') self.assertEqual(result.columns[7].sql_column, 'int2val') self.assertEqual(result.columns[8].sql_column, 'int8val') self.assertEqual(result.columns[9].sql_column, 'characterval') self.assertEqual(result.columns[10].sql_column, 'varcharval') self.assertEqual(result.columns[11].sql_column, 'varyingcharacterval') self.assertEqual(result.columns[12].sql_column, 'ncharval') self.assertEqual(result.columns[13].sql_column, 'nativecharacterval') self.assertEqual(result.columns[14].sql_column, 'nvarcharval') self.assertEqual(result.columns[15].sql_column, 'textval') self.assertEqual(result.columns[16].sql_column, 'clobval') self.assertEqual(result.columns[17].sql_column, 'blobval') self.assertEqual(result.columns[18].sql_column, 'realval') self.assertEqual(result.columns[19].sql_column, 'doubleval') self.assertEqual(result.columns[20].sql_column, 'doubleprecisionval') self.assertEqual(result.columns[21].sql_column, 'floatval') self.assertEqual(result.columns[22].sql_column, 'numericval') self.assertEqual(result.columns[23].sql_column, 'decimalval') self.assertEqual(result.columns[24].sql_column, 'booleanval') self.assertEqual(result.columns[25].sql_column, 'dateval') self.assertEqual(result.columns[26].sql_column, 'datetimeval') self.assertEqual(result.columns[0].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[1].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[2].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[3].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[4].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[5].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[6].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[7].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[8].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[9].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[10].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[12].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[13].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[14].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[15].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[16].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[17].GetColumnTypeAsName(), 'bytes') self.assertEqual(result.columns[18].GetColumnTypeAsName(), 'float') self.assertEqual(result.columns[19].GetColumnTypeAsName(), 'float') self.assertEqual(result.columns[20].GetColumnTypeAsName(), 'float') self.assertEqual(result.columns[21].GetColumnTypeAsName(), 'float') self.assertEqual(result.columns[22].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[23].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[24].GetColumnTypeAsName(), 'bool') self.assertEqual(result.columns[25].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[26].GetColumnTypeAsName(), 'int')
[docs] def testExecuteQueryDetailedJoinNoData(self): """test the execution of a join Query with no data""" query = ('SELECT t1.a as a, t2.a as a2, t2.c, t1.b, t2.b as b2 ' 'from t1 join t2') result = self.execute_names.ExecuteQueryDetailed(query) expected_data = '[]' self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data)) self.assertEqual(result.columns[0].sql_column, 'a') self.assertEqual(result.columns[1].sql_column, 'a2') self.assertEqual(result.columns[2].sql_column, 'c') self.assertEqual(result.columns[3].sql_column, 'b') self.assertEqual(result.columns[4].sql_column, 'b2') self.assertEqual(result.columns[0].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[1].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[2].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[3].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[4].GetColumnTypeAsName(), 'str')
[docs] def testExecuteQueryDetailedJoinNoDataNoSpace(self): """test the execution of a join Query with no data""" query = ('SELECT t1.a as a,t2.a as a2,t2.c, t1.b,t2.b as b2 ' 'from t1 join t2') result = self.execute_names.ExecuteQueryDetailed(query) expected_data = '[]' self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data)) self.assertEqual(result.columns[0].sql_column, 'a') self.assertEqual(result.columns[1].sql_column, 'a2') self.assertEqual(result.columns[2].sql_column, 'c') self.assertEqual(result.columns[3].sql_column, 'b') self.assertEqual(result.columns[4].sql_column, 'b2') self.assertEqual(result.columns[0].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[1].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[2].GetColumnTypeAsName(), 'str') self.assertEqual(result.columns[3].GetColumnTypeAsName(), 'int') self.assertEqual(result.columns[4].GetColumnTypeAsName(), 'str')
[docs] def testExecuteQuerySimple(self): """test the execution of a simple Query""" query = ('SELECT createdDate, updatedAt, screenName, Name, profileImageUrl,' 'location, description, url, following, followersCount, ' 'followingCount' ' FROM Users ORDER BY createdDate') result = self.execute.ExecuteQuery(query) expected_data = self._ReadFromFileRelative('expected_simple_query_data') self.assertIsNone(result.error_message) self.assertFalse(result.has_error) self.assertEqual(expected_data, str(result.data)) self.assertIsNone(result.columns)
[docs] def testExecuteReadOnlyQueryWithErrorBecauseOfDrop(self): """test execute read only with a drop query""" query = 'DROP table users' result = self.execute.ExecuteReadOnlyQuery(query) expected_error = 'Query has to be a single SELECT query.' self.assertTrue(result.has_error) self.assertEqual(str(result.error_message), expected_error) self.assertIsNone(result.data) self.assertIsNone(result.columns)
[docs] def testExecuteReadOnlyQueryWithErrorBecauseOfAlter(self): """test execute read only with a alter rename query""" query = 'Alter table users rename to users2' result = self.execute.ExecuteReadOnlyQuery(query) expected_error = 'Query has to be a single SELECT query.' self.assertTrue(result.has_error) self.assertEqual(str(result.error_message), expected_error) self.assertIsNone(result.data) self.assertIsNone(result.columns)
[docs] def testExecuteReadOnlyQueryWithWarning(self): """test execute read only with two queries at the same time""" query = 'SELECT id from users;SELECT id from users;' result = self.execute.ExecuteReadOnlyQuery(query) expected_error = 'Warning: You can only execute one statement at a time.' self.assertTrue(result.has_error) self.assertEqual(str(result.error_message), expected_error) self.assertIsNone(result.data) self.assertIsNone(result.columns)
def _ReadFromFileRelative(self, path: str): """Read from file with relative path Args: path (str): the relative file path Returns: str: content of the file""" current_dir = os.path.dirname(__file__) abs_file_path = os.path.join(current_dir, path) with open(abs_file_path, 'r', encoding='utf-8') as f: return f.read()
if __name__ == '__main__': unittest.main()