-# Announcing datasette-connectors 1.0.0
+# Announcing datasette-connectors 2.0.0
## What's new
-This is the first release of datasette-connectors, derived from the [Datasette](https://github.com/simonw/datasette) fork for supporting [Datasette-Pytables](https://github.com/PyTables/datasette-pytables).
+This is a mayor version of datasette-connectors. There are two main features added. First, [Datasette 0.51.1](https://github.com/simonw/datasette) is used. Second, our API has been improved to a more pythonic style. Details can be found at [dummy example](https://github.com/PyTables/datasette-connectors/blob/master/tests/dummy.py).
## What it is
# Release notes for datasette-connectors
-## Changes from 1.0.0 to 1.0.1
+## Changes from 1.0.0 to 2.0.0
- #XXX version-specific blurb XXX#
+* Compatible with Datasette 0.51.1
+
+* A lot of changes in API adopting a more pythonic style.
## Initial version 1.0.0
+from .connectors import Connector
+from .connection import Connection
+from .cursor import OperationalError
from .monkey import patch_datasette; patch_datasette()
-from .connectors import load; load()
+from .connectors import ConnectorList; ConnectorList.load()
from datasette.cli import cli
--- /dev/null
+from .cursor import Cursor
+
+
+class Connection:
+ def __init__(self, path, connector_class):
+ self.path = path
+ self.connector_class = connector_class
+
+ def execute(self, *args, **kwargs):
+ cursor = Cursor(self)
+ cursor.execute(*args, **kwargs)
+ return cursor
+
+ def cursor(self):
+ return Cursor(self)
+
+ def set_progress_handler(self, handler, n):
+ pass
-import pkg_resources
+from .connection import Connection
+
db_connectors = {}
-def load():
- for entry_point in pkg_resources.iter_entry_points('datasette.connectors'):
- db_connectors[entry_point.name] = entry_point.load()
-
-def inspect(path):
- for connector in db_connectors.values():
- try:
- return connector.inspect(path)
- except:
- pass
- else:
- raise Exception("No database connector found for %s" % path)
-
-def connect(path, dbtype):
- try:
- return db_connectors[dbtype].Connection(path)
- except:
- raise Exception("No database connector found for %s" % path)
+
+class ConnectorList:
+ @staticmethod
+ def load():
+ for entry_point in pkg_resources.iter_entry_points('datasette.connectors'):
+ db_connectors[entry_point.name] = entry_point.load()
+
+ @staticmethod
+ def add_connector(name, connector):
+ db_connectors[name] = connector
+
+ class DatabaseNotSupported(Exception):
+ pass
+
+ @staticmethod
+ def connect(path):
+ for connector in db_connectors.values():
+ try:
+ return connector.connect(path)
+ except:
+ pass
+ else:
+ raise ConnectorList.DatabaseNotSupported
+
+
+class Connector:
+ connector_type = None
+ connection_class = Connection
+
+ @classmethod
+ def connect(cls, path):
+ return cls.connection_class(path, cls)
+
+ def __init__(self, conn):
+ self.conn = conn
+
+ def table_names(self):
+ """
+ Return a list of table names
+ """
+ raise NotImplementedError
+
+ def hidden_table_names(self):
+ raise NotImplementedError
+
+ def detect_spatialite(self):
+ """
+ Return boolean indicating if geometry_columns exists
+ """
+ raise NotImplementedError
+
+ def view_names(self):
+ """
+ Return a list of view names
+ """
+ raise NotImplementedError
+
+ def table_count(self, table_name):
+ """
+ Return an integer with the rows count of the table
+ """
+ raise NotImplementedError
+
+ def table_info(self, table_name):
+ """
+ Return a list of dictionaries with columns description, with format:
+ [
+ {
+ 'idx': 0,
+ 'name': 'column1',
+ 'primary_key': False,
+ },
+ ...
+ ]
+ """
+ raise NotImplementedError
+
+ def detect_fts(self, table_name):
+ """
+ Return boolean indicating if table has a corresponding FTS virtual table
+ """
+ raise NotImplementedError
+
+ def foreign_keys(self, table_name):
+ """
+ Return a list of dictionaries with foreign keys description
+ id, seq, table_name, from_, to_, on_update, on_delete, match
+ """
+ raise NotImplementedError
+
+ def table_exists(self, table_name):
+ """
+ Return boolean indicating if table exists in the database
+ """
+ raise NotImplementedError
+
+ def table_definition(self, table_type, table_name):
+ """
+ Return string with a 'CREATE TABLE' sql definition
+ """
+ raise NotImplementedError
+
+ def indices_definition(self, table_name):
+ """
+ Return a list of strings with 'CREATE INDEX' sql definitions
+ """
+ raise NotImplementedError
+
+ def execute(
+ self,
+ sql,
+ params=None,
+ truncate=False,
+ custom_time_limit=None,
+ page_size=None,
+ log_sql_errors=True,
+ ):
+ raise NotImplementedError
--- /dev/null
+import re
+import sqlite3
+
+from .row import Row
+
+
+class OperationalError(Exception):
+ pass
+
+
+class Cursor:
+ class QueryNotSupported(Exception):
+ pass
+
+ def __init__(self, conn):
+ self.conn = conn
+ self.connector = conn.connector_class(conn)
+ self.rows = []
+ self.description = ()
+
+ def execute(
+ self,
+ sql,
+ params=None,
+ truncate=False,
+ custom_time_limit=None,
+ page_size=None,
+ log_sql_errors=True,
+ ):
+ if params is None:
+ params = {}
+ results = []
+ truncated = False
+ description = ()
+
+ # Normalize sql
+ sql = sql.strip()
+ sql = ' '.join(sql.split())
+
+ if sql == "select name from sqlite_master where type='table'" or \
+ sql == "select name from sqlite_master where type=\"table\"":
+ results = [{'name': name} for name in self.connector.table_names()]
+ elif sql == "select name from sqlite_master where rootpage = 0 and sql like '%VIRTUAL TABLE%USING FTS%'":
+ results = [{'name': name} for name in self.connector.hidden_table_names()]
+ elif sql == 'select 1 from sqlite_master where tbl_name = "geometry_columns"':
+ if self.connector.detect_spatialite():
+ results = [{'1': '1'}]
+ elif sql == "select name from sqlite_master where type='view'":
+ results = [{'name': name} for name in self.connector.view_names()]
+ elif sql.startswith("select count(*) from ["):
+ match = re.search(r'select count\(\*\) from \[(.*)\]', sql)
+ results = [{'count(*)': self.connector.table_count(match.group(1))}]
+ elif sql.startswith("select count(*) from "):
+ match = re.search(r'select count\(\*\) from (.*)', sql)
+ results = [{'count(*)': self.connector.table_count(match.group(1))}]
+ elif sql.startswith("PRAGMA table_info("):
+ match = re.search(r'PRAGMA table_info\(\[?\"?([\d\w\/%]*)\"?\]?\)', sql)
+ results = self.connector.table_info(match.group(1))
+ elif sql.startswith("select name from sqlite_master where rootpage = 0 and ( sql like \'%VIRTUAL TABLE%USING FTS%content="):
+ match = re.search(r'select name from sqlite_master where rootpage = 0 and \( sql like \'%VIRTUAL TABLE%USING FTS%content="(.*)"', sql)
+ if self.connector.detect_fts(match.group(1)):
+ results = [{'name': match.group(1)}]
+ elif sql.startswith("PRAGMA foreign_key_list(["):
+ match = re.search(r'PRAGMA foreign_key_list\(\[(.*)\]\)', sql)
+ results = self.connector.foreign_keys(match.group(1))
+ elif sql == "select 1 from sqlite_master where type='table' and name=?":
+ if self.connector.table_exists(params[0]):
+ results = [{'1': '1'}]
+ elif sql == "select sql from sqlite_master where name = :n and type=:t":
+ if self.connector.table_exists(params['n']):
+ results = [{'sql': self.connector.table_definition(params['t'], params['n'])}]
+ elif sql == "select sql from sqlite_master where tbl_name = :n and type='index' and sql is not null":
+ results = [{'sql': sql} for sql in self.connector.indices_definition(params['n'])]
+ else:
+ try:
+ results, truncated, description = \
+ self.connector.execute(
+ sql,
+ params=params,
+ truncate=truncate,
+ custom_time_limit=custom_time_limit,
+ page_size=page_size,
+ log_sql_errors=log_sql_errors,
+ )
+ except OperationalError as ex:
+ raise sqlite3.OperationalError(*ex.args)
+
+ self.rows = [Row(result) for result in results]
+ self.description = description
+
+ def fetchall(self):
+ return self.rows
+
+ def fetchmany(self, max):
+ return self.rows[:max]
+
+ def __getitem__(self, index):
+ return self.rows[index]
import asyncio
-import datasette
-from datasette.app import connections
-from datasette.inspect import inspect_hash
-from datasette.utils import Results
-from pathlib import Path
+import threading
import sqlite3
-from . import connectors
+import datasette.views.base
+from datasette.tracer import trace
+from datasette.database import Database
+from datasette.database import Results
+
+from .connectors import ConnectorList
+
+connections = threading.local()
def patch_datasette():
Monkey patching for original Datasette
"""
- def inspect(self):
- " Inspect the database and return a dictionary of table metadata "
- if self._inspect:
- return self._inspect
-
- _inspect = {}
- files = self.files
-
- for filename in files:
- self.files = (filename,)
- path = Path(filename)
- name = path.stem
- if name in _inspect:
- raise Exception("Multiple files with the same stem %s" % name)
- try:
- _inspect[name] = self.original_inspect()[name]
- except sqlite3.DatabaseError:
- tables, views, dbtype = connectors.inspect(path)
- _inspect[name] = {
- "hash": inspect_hash(path),
- "file": str(path),
- "dbtype": dbtype,
- "tables": tables,
- "views": views,
- }
-
- self.files = files
- self._inspect = _inspect
- return self._inspect
-
- datasette.app.Datasette.original_inspect = datasette.app.Datasette.inspect
- datasette.app.Datasette.inspect = inspect
+ def connect(self, write=False):
+ try:
+ # Check if it's a sqlite database
+ conn = self.original_connect(write=write)
+ conn.execute("select name from sqlite_master where type='table'")
+ return conn
+ except sqlite3.DatabaseError:
+ conn = ConnectorList.connect(self.path)
+ return conn
+ Database.original_connect = Database.connect
+ Database.connect = connect
- async def execute(self, db_name, sql, params=None, truncate=False, custom_time_limit=None, page_size=None):
- """Executes sql against db_name in a thread"""
- page_size = page_size or self.page_size
- def is_sqlite3_conn():
- conn = getattr(connections, db_name, None)
+ async def execute_fn(self, fn):
+ def in_thread():
+ conn = getattr(connections, self.name, None)
if not conn:
- info = self.inspect()[db_name]
- return info.get('dbtype', 'sqlite3') == 'sqlite3'
- else:
- return isinstance(conn, sqlite3.Connection)
-
- def sql_operation_in_thread():
- conn = getattr(connections, db_name, None)
- if not conn:
- info = self.inspect()[db_name]
- conn = connectors.connect(info['file'], info['dbtype'])
- setattr(connections, db_name, conn)
-
- rows, truncated, description = conn.execute(
- sql,
- params or {},
- truncate=truncate,
- page_size=page_size,
- max_returned_rows=self.max_returned_rows,
- )
- return Results(rows, truncated, description)
-
- if is_sqlite3_conn():
- return await self.original_execute(db_name, sql, params=params, truncate=truncate, custom_time_limit=custom_time_limit, page_size=page_size)
- else:
- return await asyncio.get_event_loop().run_in_executor(
- self.executor, sql_operation_in_thread
- )
-
- datasette.app.Datasette.original_execute = datasette.app.Datasette.execute
- datasette.app.Datasette.execute = execute
+ conn = self.connect()
+ if isinstance(conn, sqlite3.Connection):
+ self.ds._prepare_connection(conn, self.name)
+ setattr(connections, self.name, conn)
+ return fn(conn)
+
+ return await asyncio.get_event_loop().run_in_executor(
+ self.ds.executor, in_thread
+ )
+
+ Database.original_execute_fn = Database.execute_fn
+ Database.execute_fn = execute_fn
long_description=get_long_description(),
long_description_content_type='text/markdown',
author='Javier Sancho',
+ author_email='jsf@jsancho.org',
url='https://github.com/pytables/datasette-connectors',
license='Apache License, Version 2.0',
packages=['datasette_connectors'],
- install_requires=['datasette==0.25'],
- tests_require=['pytest', 'aiohttp'],
+ install_requires=[
+ 'datasette==0.51.1',
+ ],
+ tests_require=[
+ 'pytest',
+ 'aiohttp',
+ 'asgiref',
+ ],
entry_points='''
[console_scripts]
datasette=datasette_connectors.cli:cli
-from datasette_connectors.row import Row
+import datasette_connectors as dc
-_connector_type = 'dummy'
+class DummyConnector(dc.Connector):
+ connector_type = 'dummy'
-def inspect(path):
- tables = {}
- views = []
+ def table_names(self):
+ return ['table1', 'table2']
- for table in ['table1', 'table2']:
- tables[table] = {
- 'name': table,
- 'columns': ['c1', 'c2', 'c3'],
- 'primary_keys': [],
- 'count': 2,
- 'label_column': None,
- 'hidden': False,
- 'fts_table': None,
- 'foreign_keys': {'incoming': [], 'outgoing': []},
- }
+ def hidden_table_names(self):
+ return []
- return tables, views, _connector_type
+ def detect_spatialite(self):
+ return False
+ def view_names(self):
+ return []
-class Connection:
- def __init__(self, path):
- self.path = path
+ def table_count(self, table_name):
+ return 2
- def execute(self, sql, params=None, truncate=False, page_size=None, max_returned_rows=None):
- sql = sql.strip()
+ def table_info(self, table_name):
+ return [
+ {
+ 'cid': 0,
+ 'name': 'c1',
+ 'type': 'integer',
+ 'notnull': False,
+ 'default_value': None,
+ 'is_pk': False,
+ },
+ {
+ 'cid': 1,
+ 'name': 'c2',
+ 'type': 'integer',
+ 'notnull': False,
+ 'default_value': None,
+ 'is_pk': False,
+ },
+ {
+ 'cid': 2,
+ 'name': 'c3',
+ 'type': 'integer',
+ 'notnull': False,
+ 'default_value': None,
+ 'is_pk': False,
+ },
+ ]
- rows = []
+ def detect_fts(self, table_name):
+ return False
+
+ def foreign_keys(self, table_name):
+ return []
+
+ def table_exists(self, table_name):
+ return table_name in ['table1', 'table2']
+
+ def table_definition(self, table_type, table_name):
+ return 'CREATE TABLE ' + table_name + ' (c1, c2, c3)'
+
+ def indices_definition(self, table_name):
+ return []
+
+ def execute(
+ self,
+ sql,
+ params=None,
+ truncate=False,
+ custom_time_limit=None,
+ page_size=None,
+ log_sql_errors=True,
+ ):
+ results = []
truncated = False
- description = []
+ description = ()
if sql == 'select c1 from table1':
- rows = [
- Row({'c1': 10}),
- Row({'c1': 20})
+ results = [
+ {'c1': 10},
+ {'c1': 20},
]
description = (('c1',),)
- elif sql == 'select rowid, * from table2 order by rowid limit 51':
- rows = [
- Row({'rowid': 1, 'c1': 100, 'c2': 120, 'c3': 130}),
- Row({'rowid': 2, 'c1': 200, 'c2': 220, 'c3': 230})
- ]
- description = (('rowid',), ('c1',), ('c2',), ('c3',))
- elif sql == 'select count(*) from table2':
- rows = [Row({'count(*)': 2})]
- description = (('count(*)',),)
- elif sql == """select distinct rowid from table2
- where rowid is not null
- limit 31""":
- rows = [
- Row({'rowid': 1}),
- Row({'rowid': 2})
+ elif sql == 'select c1, c2, c3 from table2 limit 51':
+ results = [
+ {'c1': 100, 'c2': 120, 'c3': 130},
+ {'c1': 200, 'c2': 220, 'c3': 230},
]
- description = (('rowid',),)
- elif sql == """select distinct c1 from table2
- where c1 is not null
- limit 31""":
- rows = [
- Row({'c1': 100}),
- Row({'c1': 200})
+ description = (('c1',), ('c2',), ('c3',))
+ elif sql == "select * from (select c1, c2, c3 from table2 ) limit 0":
+ pass
+ elif sql == "select c1, count(*) as n from ( select c1, c2, c3 from table2 ) where c1 is not null group by c1 limit 31":
+ results = [
+ {'c1': 100, 'n': 1},
+ {'c1': 200, 'n': 1},
]
- description = (('c1',),)
- elif sql == """select distinct c2 from table2
- where c2 is not null
- limit 31""":
- rows = [
- Row({'c2': 120}),
- Row({'c2': 220})
+ description = (('c1',), ('n',))
+ elif sql == "select c2, count(*) as n from ( select c1, c2, c3 from table2 ) where c2 is not null group by c2 limit 31":
+ results = [
+ {'c2': 120, 'n': 1},
+ {'c2': 220, 'n': 1},
]
- description = (('c2',),)
- elif sql == """select distinct c3 from table2
- where c3 is not null
- limit 31""":
- rows = [
- Row({'c3': 130}),
- Row({'c3': 230})
+ description = (('c2',), ('n',))
+ elif sql == "select c3, count(*) as n from ( select c1, c2, c3 from table2 ) where c3 is not null group by c3 limit 31":
+ results = [
+ {'c3': 130, 'n': 1},
+ {'c3': 230, 'n': 1},
]
- description = (('c3',),)
- elif sql == 'select sql from sqlite_master where name = :n and type=:t':
- if params['t'] != 'view':
- rows = [Row({'sql': 'CREATE TABLE ' + params['n'] + ' (c1, c2, c3)'})]
- description = (('sql',),)
+ description = (('c3',), ('n',))
+ elif sql == 'select date(c1) from ( select c1, c2, c3 from table2 ) where c1 glob "????-??-*" limit 100;':
+ pass
+ elif sql == "select c1, c2, c3 from blah limit 51":
+ raise dc.OperationalError("no such table: blah")
else:
- raise Exception("Unexpected query: %s" % sql)
+ raise Exception("Unexpected query:", sql)
- return rows, truncated, description
+ return results, truncated, description
from datasette_connectors import monkey; monkey.patch_datasette()
-from datasette_connectors import connectors
-from . import dummy
-connectors.db_connectors['dummy'] = dummy
+from datasette_connectors.connectors import ConnectorList
+from .dummy import DummyConnector
+ConnectorList.add_connector('dummy', DummyConnector)
from datasette.app import Datasette
+from datasette.utils.testing import TestClient
import os
import pytest
import tempfile
+import contextlib
-@pytest.fixture(scope='session')
-def app_client(max_returned_rows=None):
+
+def populate_file(filepath):
+ dummyfile = open(filepath, "w")
+ dummyfile.write("This is a dummy file. We need something to force a SQLite error")
+ dummyfile.close()
+
+
+@contextlib.contextmanager
+def make_app_client(
+ max_returned_rows=None,
+ config=None,
+ is_immutable=False,
+):
with tempfile.TemporaryDirectory() as tmpdir:
filepath = os.path.join(tmpdir, 'dummy_tables.db')
populate_file(filepath)
+ if is_immutable:
+ files = []
+ immutables = [filepath]
+ else:
+ files = [filepath]
+ immutables = []
+ config = config or {}
+ config.update({
+ 'default_page_size': 50,
+ 'max_returned_rows': max_returned_rows or 1000,
+ })
ds = Datasette(
- [filepath],
- config={
- 'default_page_size': 50,
- 'max_returned_rows': max_returned_rows or 1000,
- }
+ files,
+ immutables=immutables,
+ config=config,
)
- client = ds.app().test_client
+ client = TestClient(ds.app())
client.ds = ds
yield client
-def populate_file(filepath):
- dummyfile = open(filepath, "w")
- dummyfile.write("This is a dummy file. We need something to force a SQLite error")
- dummyfile.close()
+@pytest.fixture(scope='session')
+def app_client():
+ with make_app_client() as client:
+ yield client
+
+
+@pytest.fixture(scope='session')
+def app_client_with_hash():
+ with make_app_client(config={"hash_urls": True}, is_immutable=True) as client:
+ yield client
from urllib.parse import urlencode
def test_homepage(app_client):
- _, response = app_client.get('/.json')
+ response = app_client.get('/.json')
assert response.status == 200
assert response.json.keys() == {'dummy_tables': 0}.keys()
d = response.json['dummy_tables']
assert d['tables_count'] == 2
def test_database_page(app_client):
- response = app_client.get('/dummy_tables.json', gather_request=False)
+ response = app_client.get('/dummy_tables.json')
data = response.json
assert 'dummy_tables' == data['database']
- assert [{
- 'name': 'table1',
- 'columns': ['c1', 'c2', 'c3'],
- 'primary_keys': [],
- 'count': 2,
- 'label_column': None,
- 'hidden': False,
- 'fts_table': None,
- 'foreign_keys': {'incoming': [], 'outgoing': []}
- }, {
- 'name': 'table2',
- 'columns': ['c1', 'c2', 'c3'],
- 'primary_keys': [],
- 'count': 2,
- 'label_column': None,
- 'hidden': False,
- 'fts_table': None,
- 'foreign_keys': {'incoming': [], 'outgoing': []}
- }] == data['tables']
+ assert len(data['tables']) == 2
+ assert data['tables'][0]['count'] == 2
+ assert data['tables'][0]['columns'] == ['c1', 'c2', 'c3']
def test_custom_sql(app_client):
response = app_client.get(
'sql': 'select c1 from table1',
'_shape': 'objects'
}),
- gather_request=False
)
data = response.json
assert {
assert not data['truncated']
def test_invalid_custom_sql(app_client):
- response = app_client.get(
- '/dummy_tables.json?sql=.schema',
- gather_request=False
- )
+ response = app_client.get('/dummy_tables.json?sql=.schema')
assert response.status == 400
assert response.json['ok'] is False
assert 'Statement must be a SELECT' == response.json['error']
def test_table_json(app_client):
- response = app_client.get(
- '/dummy_tables/table2.json?_shape=objects',
- gather_request=False
- )
+ response = app_client.get('/dummy_tables/table2.json?_shape=objects')
assert response.status == 200
data = response.json
- assert data['query']['sql'] == 'select rowid, * from table2 order by rowid limit 51'
- assert data['rows'] == [{
- 'rowid': 1,
- 'c1': 100,
- 'c2': 120,
- 'c3': 130
- }, {
- 'rowid': 2,
- 'c1': 200,
- 'c2': 220,
- 'c3': 230
- }]
+ assert data['query']['sql'] == 'select c1, c2, c3 from table2 limit 51'
+ assert data['rows'] == [
+ {
+ 'c1': 100,
+ 'c2': 120,
+ 'c3': 130,
+ },
+ {
+ 'c1': 200,
+ 'c2': 220,
+ 'c3': 230,
+ }]
def test_table_not_exists_json(app_client):
assert {
'error': 'Table not found: blah',
'status': 404,
'title': None,
- } == app_client.get(
- '/dummy_tables/blah.json', gather_request=False
- ).json
+ } == app_client.get('/dummy_tables/blah.json').json
def test_table_shape_arrays(app_client):
- response = app_client.get(
- '/dummy_tables/table2.json?_shape=arrays',
- gather_request=False
- )
+ response = app_client.get('/dummy_tables/table2.json?_shape=arrays')
assert [
- [1, 100, 120, 130],
- [2, 200, 220, 230],
+ [100, 120, 130],
+ [200, 220, 230],
] == response.json['rows']
def test_table_shape_objects(app_client):
- response = app_client.get(
- '/dummy_tables/table2.json?_shape=objects',
- gather_request=False
- )
- assert [{
- 'rowid': 1,
- 'c1': 100,
- 'c2': 120,
- 'c3': 130,
- }, {
- 'rowid': 2,
- 'c1': 200,
- 'c2': 220,
- 'c3': 230,
- }] == response.json['rows']
+ response = app_client.get('/dummy_tables/table2.json?_shape=objects')
+ assert [
+ {
+ 'c1': 100,
+ 'c2': 120,
+ 'c3': 130,
+ },
+ {
+ 'c1': 200,
+ 'c2': 220,
+ 'c3': 230,
+ },
+ ] == response.json['rows']
def test_table_shape_array(app_client):
- response = app_client.get(
- '/dummy_tables/table2.json?_shape=array',
- gather_request=False
- )
- assert [{
- 'rowid': 1,
- 'c1': 100,
- 'c2': 120,
- 'c3': 130,
- }, {
- 'rowid': 2,
- 'c1': 200,
- 'c2': 220,
- 'c3': 230,
- }] == response.json
+ response = app_client.get('/dummy_tables/table2.json?_shape=array')
+ assert [
+ {
+ 'c1': 100,
+ 'c2': 120,
+ 'c3': 130,
+ },
+ {
+ 'c1': 200,
+ 'c2': 220,
+ 'c3': 230,
+ },
+ ] == response.json
def test_table_shape_invalid(app_client):
- response = app_client.get(
- '/dummy_tables/table2.json?_shape=invalid',
- gather_request=False
- )
+ response = app_client.get('/dummy_tables/table2.json?_shape=invalid')
assert {
'ok': False,
'error': 'Invalid _shape: invalid',
-from .fixtures import app_client
+from .fixtures import app_client, app_client_with_hash
def test_homepage(app_client):
- response = app_client.get('/', gather_request=False)
+ response = app_client.get('/')
assert response.status == 200
assert 'dummy_tables' in response.text
-def test_database_page(app_client):
- response = app_client.get('/dummy_tables', allow_redirects=False, gather_request=False)
+def test_database_page(app_client_with_hash):
+ response = app_client_with_hash.get('/dummy_tables', allow_redirects=False)
assert response.status == 302
- response = app_client.get('/dummy_tables', gather_request=False)
+ response = app_client_with_hash.get('/dummy_tables')
assert 'dummy_tables' in response.text
def test_table(app_client):
- response = app_client.get('/dummy_tables/table2', gather_request=False)
+ response = app_client.get('/dummy_tables/table2')
assert response.status == 200