]> git.jsancho.org Git - datasette-connectors.git/commitdiff
Some tests
authorJavier Sancho <jsf@jsancho.org>
Mon, 8 Oct 2018 14:55:04 +0000 (16:55 +0200)
committerJavier Sancho <jsf@jsancho.org>
Mon, 8 Oct 2018 14:55:04 +0000 (16:55 +0200)
setup.py
tests/__init__.py [new file with mode: 0644]
tests/dummy.py [new file with mode: 0644]
tests/fixtures.py [new file with mode: 0644]
tests/test_api.py [new file with mode: 0644]
tests/test_html.py [new file with mode: 0644]

index 4b7d0f4c8e114bcfebeac45a2783b9b85ae6d9eb..0427641afd02a1e64f446658f7acb5cbc3b7ad10 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -23,6 +23,7 @@ setup(
     license='Apache License, Version 2.0',
     packages=['datasette_connectors'],
     install_requires=['datasette==0.25'],
     license='Apache License, Version 2.0',
     packages=['datasette_connectors'],
     install_requires=['datasette==0.25'],
+    tests_require=['pytest', 'aiohttp'],
     entry_points='''
         [console_scripts]
         datasette=datasette_connectors.cli:cli
     entry_points='''
         [console_scripts]
         datasette=datasette_connectors.cli:cli
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/tests/dummy.py b/tests/dummy.py
new file mode 100644 (file)
index 0000000..b4ae1c0
--- /dev/null
@@ -0,0 +1,90 @@
+from datasette_connectors.row import Row
+
+
+_connector_type = 'dummy'
+
+def inspect(path):
+    tables = {}
+    views = []
+
+    for table in ['table1', 'table2']:
+        tables[table] = {
+            'name': table,
+            'columns': ['c1', 'c2', 'c3'],
+            'primary_keys': [],
+            'count': 2,
+            'label_column': None,
+            'hidden': False,
+            'fts_table': None,
+            'foreign_keys': {'incoming': [], 'outgoing': []},
+        }
+
+    return tables, views, _connector_type
+
+
+class Connection:
+    def __init__(self, path):
+        self.path = path
+
+    def execute(self, sql, params=None, truncate=False, page_size=None, max_returned_rows=None):
+        sql = sql.strip()
+
+        rows = []
+        truncated = False
+        description = []
+
+        if sql == 'select c1 from table1':
+            rows = [
+                Row({'c1': 10}),
+                Row({'c1': 20})
+            ]
+            description = (('c1',),)
+        elif sql == 'select rowid, * from table2 order by rowid limit 51':
+            rows = [
+                Row({'rowid': 1, 'c1': 100, 'c2': 120, 'c3': 130}),
+                Row({'rowid': 2, 'c1': 200, 'c2': 220, 'c3': 230})
+            ]
+            description = (('rowid',), ('c1',), ('c2',), ('c3',))
+        elif sql == 'select count(*) from table2':
+            rows = [Row({'count(*)': 2})]
+            description = (('count(*)',),)
+        elif sql == """select distinct rowid from table2 
+                        where rowid is not null
+                        limit 31""":
+            rows = [
+                Row({'rowid': 1}),
+                Row({'rowid': 2})
+            ]
+            description = (('rowid',),)
+        elif sql == """select distinct c1 from table2 
+                        where c1 is not null
+                        limit 31""":
+            rows = [
+                Row({'c1': 100}),
+                Row({'c1': 200})
+            ]
+            description = (('c1',),)
+        elif sql == """select distinct c2 from table2 
+                        where c2 is not null
+                        limit 31""":
+            rows = [
+                Row({'c2': 120}),
+                Row({'c2': 220})
+            ]
+            description = (('c2',),)
+        elif sql == """select distinct c3 from table2 
+                        where c3 is not null
+                        limit 31""":
+            rows = [
+                Row({'c3': 130}),
+                Row({'c3': 230})
+            ]
+            description = (('c3',),)
+        elif sql == 'select sql from sqlite_master where name = :n and type=:t':
+            if params['t'] != 'view':
+                rows = [Row({'sql': 'CREATE TABLE ' + params['n'] + ' (c1, c2, c3)'})]
+                description = (('sql',),)
+        else:
+            raise Exception("Unexpected query: %s" % sql)
+
+        return rows, truncated, description
diff --git a/tests/fixtures.py b/tests/fixtures.py
new file mode 100644 (file)
index 0000000..6b772c6
--- /dev/null
@@ -0,0 +1,31 @@
+from datasette_connectors import monkey; monkey.patch_datasette()
+from datasette_connectors import connectors
+from . import dummy
+connectors.db_connectors['dummy'] = dummy
+
+from datasette.app import Datasette
+import os
+import pytest
+import tempfile
+
+@pytest.fixture(scope='session')
+def app_client(max_returned_rows=None):
+    with tempfile.TemporaryDirectory() as tmpdir:
+        filepath = os.path.join(tmpdir, 'dummy_tables.db')
+        populate_file(filepath)
+        ds = Datasette(
+            [filepath],
+            config={
+                'default_page_size': 50,
+                'max_returned_rows': max_returned_rows or 1000,
+            }
+        )
+        client = ds.app().test_client
+        client.ds = ds
+        yield client
+
+
+def populate_file(filepath):
+    dummyfile = open(filepath, "w")
+    dummyfile.write("This is a dummy file. We need something to force a SQLite error")
+    dummyfile.close()
diff --git a/tests/test_api.py b/tests/test_api.py
new file mode 100644 (file)
index 0000000..63555cd
--- /dev/null
@@ -0,0 +1,151 @@
+from .fixtures import app_client
+from urllib.parse import urlencode
+
+def test_homepage(app_client):
+    _, response = app_client.get('/.json')
+    assert response.status == 200
+    assert response.json.keys() == {'dummy_tables': 0}.keys()
+    d = response.json['dummy_tables']
+    assert d['name'] == 'dummy_tables'
+    assert d['tables_count'] == 2
+
+def test_database_page(app_client):
+    response = app_client.get('/dummy_tables.json', gather_request=False)
+    data = response.json
+    assert 'dummy_tables' == data['database']
+    assert [{
+        'name': 'table1',
+        'columns': ['c1', 'c2', 'c3'],
+        'primary_keys': [],
+        'count': 2,
+        'label_column': None,
+        'hidden': False,
+        'fts_table': None,
+        'foreign_keys': {'incoming': [], 'outgoing': []}
+    }, {
+        'name': 'table2',
+        'columns': ['c1', 'c2', 'c3'],
+        'primary_keys': [],
+        'count': 2,
+        'label_column': None,
+        'hidden': False,
+        'fts_table': None,
+        'foreign_keys': {'incoming': [], 'outgoing': []}
+    }] == data['tables']
+
+def test_custom_sql(app_client):
+    response = app_client.get(
+        '/dummy_tables.json?' + urlencode({
+            'sql': 'select c1 from table1',
+            '_shape': 'objects'
+        }),
+        gather_request=False
+    )
+    data = response.json
+    assert {
+        'sql': 'select c1 from table1',
+        'params': {}
+    } == data['query']
+    assert 2 == len(data['rows'])
+    assert [
+        {'c1': 10},
+        {'c1': 20}
+    ] == data['rows']
+    assert ['c1'] == data['columns']
+    assert 'dummy_tables' == data['database']
+    assert not data['truncated']
+
+def test_invalid_custom_sql(app_client):
+    response = app_client.get(
+        '/dummy_tables.json?sql=.schema',
+        gather_request=False
+    )
+    assert response.status == 400
+    assert response.json['ok'] is False
+    assert 'Statement must be a SELECT' == response.json['error']
+
+def test_table_json(app_client):
+    response = app_client.get(
+        '/dummy_tables/table2.json?_shape=objects',
+        gather_request=False
+    )
+    assert response.status == 200
+    data = response.json
+    assert data['query']['sql'] == 'select rowid, * from table2 order by rowid limit 51'
+    assert data['rows'] == [{
+        'rowid': 1,
+        'c1': 100,
+        'c2': 120,
+        'c3': 130
+    }, {
+        'rowid': 2,
+        'c1': 200,
+        'c2': 220,
+        'c3': 230
+    }]
+
+def test_table_not_exists_json(app_client):
+    assert {
+        'ok': False,
+        'error': 'Table not found: blah',
+        'status': 404,
+        'title': None,
+    } == app_client.get(
+        '/dummy_tables/blah.json', gather_request=False
+    ).json
+
+def test_table_shape_arrays(app_client):
+    response = app_client.get(
+        '/dummy_tables/table2.json?_shape=arrays',
+        gather_request=False
+    )
+    assert [
+        [1, 100, 120, 130],
+        [2, 200, 220, 230],
+    ] == response.json['rows']
+
+def test_table_shape_objects(app_client):
+    response = app_client.get(
+        '/dummy_tables/table2.json?_shape=objects',
+        gather_request=False
+    )
+    assert [{
+        'rowid': 1,
+        'c1': 100,
+        'c2': 120,
+        'c3': 130,
+    }, {
+        'rowid': 2,
+        'c1': 200,
+        'c2': 220,
+        'c3': 230,
+    }] == response.json['rows']
+
+def test_table_shape_array(app_client):
+    response = app_client.get(
+        '/dummy_tables/table2.json?_shape=array',
+        gather_request=False
+    )
+    assert [{
+        'rowid': 1,
+        'c1': 100,
+        'c2': 120,
+        'c3': 130,
+    }, {
+        'rowid': 2,
+        'c1': 200,
+        'c2': 220,
+        'c3': 230,
+    }] == response.json
+
+def test_table_shape_invalid(app_client):
+    response = app_client.get(
+        '/dummy_tables/table2.json?_shape=invalid',
+        gather_request=False
+    )
+    assert {
+        'ok': False,
+        'error': 'Invalid _shape: invalid',
+        'status': 400,
+        'title': None,
+    } == response.json
diff --git a/tests/test_html.py b/tests/test_html.py
new file mode 100644 (file)
index 0000000..e604694
--- /dev/null
@@ -0,0 +1,16 @@
+from .fixtures import app_client
+
+def test_homepage(app_client):
+    response = app_client.get('/', gather_request=False)
+    assert response.status == 200
+    assert 'dummy_tables' in response.text
+
+def test_database_page(app_client):
+    response = app_client.get('/dummy_tables', allow_redirects=False, gather_request=False)
+    assert response.status == 302
+    response = app_client.get('/dummy_tables', gather_request=False)
+    assert 'dummy_tables' in response.text
+
+def test_table(app_client):
+    response = app_client.get('/dummy_tables/table2', gather_request=False)
+    assert response.status == 200