- def inspect(self):
- " Inspect the database and return a dictionary of table metadata "
- if self._inspect:
- return self._inspect
-
- _inspect = {}
- files = self.files
-
- for filename in files:
- self.files = (filename,)
- path = Path(filename)
- name = path.stem
- if name in _inspect:
- raise Exception("Multiple files with the same stem %s" % name)
- try:
- _inspect[name] = self.original_inspect()[name]
- except sqlite3.DatabaseError:
- tables, views, dbtype = connectors.inspect(path)
- _inspect[name] = {
- "hash": inspect_hash(path),
- "file": str(path),
- "dbtype": dbtype,
- "tables": tables,
- "views": views,
- }
-
- self.files = files
- self._inspect = _inspect
- return self._inspect
-
- datasette.app.Datasette.original_inspect = datasette.app.Datasette.inspect
- datasette.app.Datasette.inspect = inspect
-
-
- async def execute(self, db_name, sql, params=None, truncate=False, custom_time_limit=None, page_size=None):
- """Executes sql against db_name in a thread"""
- page_size = page_size or self.page_size
-
- def is_sqlite3_conn():
- conn = getattr(connections, db_name, None)
- if not conn:
- info = self.inspect()[db_name]
- return info.get('dbtype', 'sqlite3') == 'sqlite3'
- else:
- return isinstance(conn, sqlite3.Connection)
+ async def table_columns(self, table):
+ try:
+ return await self.original_table_columns(table)
+ except sqlite3.DatabaseError:
+ return ConnectorList.table_columns(self.path, table)
+
+ Database.original_table_columns = Database.table_columns
+ Database.table_columns = table_columns
+
+
+ async def primary_keys(self, table):
+ try:
+ return await self.original_primary_keys(table)
+ except sqlite3.DatabaseError:
+ return ConnectorList.primary_keys(self.path, table)
+
+ Database.original_primary_keys = Database.primary_keys
+ Database.primary_keys = primary_keys
+