7 class OperationalError(Exception):
12 class QueryNotSupported(Exception):
15 def __init__(self, conn):
17 self.connector = conn.connector_class(conn)
26 custom_time_limit=None,
38 sql = ' '.join(sql.split())
40 if sql == "select name from sqlite_master where type='table'" or \
41 sql == "select name from sqlite_master where type=\"table\"":
42 results = [{'name': name} for name in self.connector.table_names()]
43 elif sql == "select name from sqlite_master where rootpage = 0 and sql like '%VIRTUAL TABLE%USING FTS%'":
44 results = [{'name': name} for name in self.connector.hidden_table_names()]
45 elif sql == 'select 1 from sqlite_master where tbl_name = "geometry_columns"':
46 if self.connector.detect_spatialite():
47 results = [{'1': '1'}]
48 elif sql == "select name from sqlite_master where type='view'":
49 results = [{'name': name} for name in self.connector.view_names()]
50 elif sql.startswith("select count(*) from ["):
51 match = re.search(r'select count\(\*\) from \[(.*)\]', sql)
52 results = [{'count(*)': self.connector.table_count(match.group(1))}]
53 elif sql.startswith("select count(*) from "):
54 match = re.search(r'select count\(\*\) from (.*)', sql)
55 results = [{'count(*)': self.connector.table_count(match.group(1))}]
56 elif sql.startswith("PRAGMA table_info("):
57 match = re.search(r'PRAGMA table_info\(\[?\"?([\d\w\/%]*)\"?\]?\)', sql)
58 results = self.connector.table_info(match.group(1))
59 elif sql.startswith("select name from sqlite_master where rootpage = 0 and ( sql like \'%VIRTUAL TABLE%USING FTS%content="):
60 match = re.search(r'select name from sqlite_master where rootpage = 0 and \( sql like \'%VIRTUAL TABLE%USING FTS%content="(.*)"', sql)
61 if self.connector.detect_fts(match.group(1)):
62 results = [{'name': match.group(1)}]
63 elif sql.startswith("PRAGMA foreign_key_list(["):
64 match = re.search(r'PRAGMA foreign_key_list\(\[(.*)\]\)', sql)
65 results = self.connector.foreign_keys(match.group(1))
66 elif sql == "select 1 from sqlite_master where type='table' and name=?":
67 if self.connector.table_exists(params[0]):
68 results = [{'1': '1'}]
69 elif sql == "select sql from sqlite_master where name = :n and type=:t":
70 if self.connector.table_exists(params['n']):
71 results = [{'sql': self.connector.table_definition(params['t'], params['n'])}]
72 elif sql == "select sql from sqlite_master where tbl_name = :n and type='index' and sql is not null":
73 results = [{'sql': sql} for sql in self.connector.indices_definition(params['n'])]
76 results, truncated, description = \
77 self.connector.execute(
81 custom_time_limit=custom_time_limit,
83 log_sql_errors=log_sql_errors,
85 except OperationalError as ex:
86 raise sqlite3.OperationalError(*ex.args)
88 self.rows = [Row(result) for result in results]
89 self.description = description
94 def fetchmany(self, max):
95 return self.rows[:max]
97 def __getitem__(self, index):
98 return self.rows[index]