license='Apache License, Version 2.0',
packages=['datasette_connectors'],
install_requires=['datasette==0.25'],
+ tests_require=['pytest', 'aiohttp'],
entry_points='''
[console_scripts]
datasette=datasette_connectors.cli:cli
--- /dev/null
+from datasette_connectors.row import Row
+
+
+_connector_type = 'dummy'
+
+def inspect(path):
+ tables = {}
+ views = []
+
+ for table in ['table1', 'table2']:
+ tables[table] = {
+ 'name': table,
+ 'columns': ['c1', 'c2', 'c3'],
+ 'primary_keys': [],
+ 'count': 2,
+ 'label_column': None,
+ 'hidden': False,
+ 'fts_table': None,
+ 'foreign_keys': {'incoming': [], 'outgoing': []},
+ }
+
+ return tables, views, _connector_type
+
+
+class Connection:
+ def __init__(self, path):
+ self.path = path
+
+ def execute(self, sql, params=None, truncate=False, page_size=None, max_returned_rows=None):
+ sql = sql.strip()
+
+ rows = []
+ truncated = False
+ description = []
+
+ if sql == 'select c1 from table1':
+ rows = [
+ Row({'c1': 10}),
+ Row({'c1': 20})
+ ]
+ description = (('c1',),)
+ elif sql == 'select rowid, * from table2 order by rowid limit 51':
+ rows = [
+ Row({'rowid': 1, 'c1': 100, 'c2': 120, 'c3': 130}),
+ Row({'rowid': 2, 'c1': 200, 'c2': 220, 'c3': 230})
+ ]
+ description = (('rowid',), ('c1',), ('c2',), ('c3',))
+ elif sql == 'select count(*) from table2':
+ rows = [Row({'count(*)': 2})]
+ description = (('count(*)',),)
+ elif sql == """select distinct rowid from table2
+ where rowid is not null
+ limit 31""":
+ rows = [
+ Row({'rowid': 1}),
+ Row({'rowid': 2})
+ ]
+ description = (('rowid',),)
+ elif sql == """select distinct c1 from table2
+ where c1 is not null
+ limit 31""":
+ rows = [
+ Row({'c1': 100}),
+ Row({'c1': 200})
+ ]
+ description = (('c1',),)
+ elif sql == """select distinct c2 from table2
+ where c2 is not null
+ limit 31""":
+ rows = [
+ Row({'c2': 120}),
+ Row({'c2': 220})
+ ]
+ description = (('c2',),)
+ elif sql == """select distinct c3 from table2
+ where c3 is not null
+ limit 31""":
+ rows = [
+ Row({'c3': 130}),
+ Row({'c3': 230})
+ ]
+ description = (('c3',),)
+ elif sql == 'select sql from sqlite_master where name = :n and type=:t':
+ if params['t'] != 'view':
+ rows = [Row({'sql': 'CREATE TABLE ' + params['n'] + ' (c1, c2, c3)'})]
+ description = (('sql',),)
+ else:
+ raise Exception("Unexpected query: %s" % sql)
+
+ return rows, truncated, description
--- /dev/null
+from datasette_connectors import monkey; monkey.patch_datasette()
+from datasette_connectors import connectors
+from . import dummy
+connectors.db_connectors['dummy'] = dummy
+
+from datasette.app import Datasette
+import os
+import pytest
+import tempfile
+
+@pytest.fixture(scope='session')
+def app_client(max_returned_rows=None):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ filepath = os.path.join(tmpdir, 'dummy_tables.db')
+ populate_file(filepath)
+ ds = Datasette(
+ [filepath],
+ config={
+ 'default_page_size': 50,
+ 'max_returned_rows': max_returned_rows or 1000,
+ }
+ )
+ client = ds.app().test_client
+ client.ds = ds
+ yield client
+
+
+def populate_file(filepath):
+ dummyfile = open(filepath, "w")
+ dummyfile.write("This is a dummy file. We need something to force a SQLite error")
+ dummyfile.close()
--- /dev/null
+from .fixtures import app_client
+from urllib.parse import urlencode
+
+def test_homepage(app_client):
+ _, response = app_client.get('/.json')
+ assert response.status == 200
+ assert response.json.keys() == {'dummy_tables': 0}.keys()
+ d = response.json['dummy_tables']
+ assert d['name'] == 'dummy_tables'
+ assert d['tables_count'] == 2
+
+def test_database_page(app_client):
+ response = app_client.get('/dummy_tables.json', gather_request=False)
+ data = response.json
+ assert 'dummy_tables' == data['database']
+ assert [{
+ 'name': 'table1',
+ 'columns': ['c1', 'c2', 'c3'],
+ 'primary_keys': [],
+ 'count': 2,
+ 'label_column': None,
+ 'hidden': False,
+ 'fts_table': None,
+ 'foreign_keys': {'incoming': [], 'outgoing': []}
+ }, {
+ 'name': 'table2',
+ 'columns': ['c1', 'c2', 'c3'],
+ 'primary_keys': [],
+ 'count': 2,
+ 'label_column': None,
+ 'hidden': False,
+ 'fts_table': None,
+ 'foreign_keys': {'incoming': [], 'outgoing': []}
+ }] == data['tables']
+
+def test_custom_sql(app_client):
+ response = app_client.get(
+ '/dummy_tables.json?' + urlencode({
+ 'sql': 'select c1 from table1',
+ '_shape': 'objects'
+ }),
+ gather_request=False
+ )
+ data = response.json
+ assert {
+ 'sql': 'select c1 from table1',
+ 'params': {}
+ } == data['query']
+ assert 2 == len(data['rows'])
+ assert [
+ {'c1': 10},
+ {'c1': 20}
+ ] == data['rows']
+ assert ['c1'] == data['columns']
+ assert 'dummy_tables' == data['database']
+ assert not data['truncated']
+
+def test_invalid_custom_sql(app_client):
+ response = app_client.get(
+ '/dummy_tables.json?sql=.schema',
+ gather_request=False
+ )
+ assert response.status == 400
+ assert response.json['ok'] is False
+ assert 'Statement must be a SELECT' == response.json['error']
+
+def test_table_json(app_client):
+ response = app_client.get(
+ '/dummy_tables/table2.json?_shape=objects',
+ gather_request=False
+ )
+ assert response.status == 200
+ data = response.json
+ assert data['query']['sql'] == 'select rowid, * from table2 order by rowid limit 51'
+ assert data['rows'] == [{
+ 'rowid': 1,
+ 'c1': 100,
+ 'c2': 120,
+ 'c3': 130
+ }, {
+ 'rowid': 2,
+ 'c1': 200,
+ 'c2': 220,
+ 'c3': 230
+ }]
+
+def test_table_not_exists_json(app_client):
+ assert {
+ 'ok': False,
+ 'error': 'Table not found: blah',
+ 'status': 404,
+ 'title': None,
+ } == app_client.get(
+ '/dummy_tables/blah.json', gather_request=False
+ ).json
+
+def test_table_shape_arrays(app_client):
+ response = app_client.get(
+ '/dummy_tables/table2.json?_shape=arrays',
+ gather_request=False
+ )
+ assert [
+ [1, 100, 120, 130],
+ [2, 200, 220, 230],
+ ] == response.json['rows']
+
+def test_table_shape_objects(app_client):
+ response = app_client.get(
+ '/dummy_tables/table2.json?_shape=objects',
+ gather_request=False
+ )
+ assert [{
+ 'rowid': 1,
+ 'c1': 100,
+ 'c2': 120,
+ 'c3': 130,
+ }, {
+ 'rowid': 2,
+ 'c1': 200,
+ 'c2': 220,
+ 'c3': 230,
+ }] == response.json['rows']
+
+def test_table_shape_array(app_client):
+ response = app_client.get(
+ '/dummy_tables/table2.json?_shape=array',
+ gather_request=False
+ )
+ assert [{
+ 'rowid': 1,
+ 'c1': 100,
+ 'c2': 120,
+ 'c3': 130,
+ }, {
+ 'rowid': 2,
+ 'c1': 200,
+ 'c2': 220,
+ 'c3': 230,
+ }] == response.json
+
+def test_table_shape_invalid(app_client):
+ response = app_client.get(
+ '/dummy_tables/table2.json?_shape=invalid',
+ gather_request=False
+ )
+ assert {
+ 'ok': False,
+ 'error': 'Invalid _shape: invalid',
+ 'status': 400,
+ 'title': None,
+ } == response.json
--- /dev/null
+from .fixtures import app_client
+
+def test_homepage(app_client):
+ response = app_client.get('/', gather_request=False)
+ assert response.status == 200
+ assert 'dummy_tables' in response.text
+
+def test_database_page(app_client):
+ response = app_client.get('/dummy_tables', allow_redirects=False, gather_request=False)
+ assert response.status == 302
+ response = app_client.get('/dummy_tables', gather_request=False)
+ assert 'dummy_tables' in response.text
+
+def test_table(app_client):
+ response = app_client.get('/dummy_tables/table2', gather_request=False)
+ assert response.status == 200