97e07dc560cb4202f927c63b33b8990753c2ca0e
[datasette-pytables.git] / tests / test_api.py
1 from .fixtures import app_client
2 import pytest
3 from urllib.parse import urlencode
4
5 pytest.fixture(scope='module')(app_client)
6
7 def test_homepage(app_client):
8     _, response = app_client.get('/.json')
9     assert response.status == 200
10     assert response.json.keys() == {'test_tables': 0}.keys()
11     d = response.json['test_tables']
12     assert d['name'] == 'test_tables'
13     assert d['tables_count'] == 4
14
15 def test_database_page(app_client):
16     response = app_client.get('/test_tables.json', gather_request=False)
17     data = response.json
18     assert 'test_tables' == data['database']
19     assert [{
20         'name': '/array1',
21         'columns': [],
22         'primary_keys': [],
23         'count': 2,
24         'label_column': None,
25         'hidden': False,
26         'fts_table': None,
27         'foreign_keys': {'incoming': [], 'outgoing': []}
28     }, {
29         'name': '/group1/array2',
30         'columns': [],
31         'primary_keys': [],
32         'count': 10000,
33         'label_column': None,
34         'hidden': False,
35         'fts_table': None,
36         'foreign_keys': {'incoming': [], 'outgoing': []}
37     }, {
38         'name': '/group1/table1',
39         'columns': ['identity', 'idnumber', 'speed'],
40         'primary_keys': [],
41         'count': 10000,
42         'label_column': None,
43         'hidden': False,
44         'fts_table': None,
45         'foreign_keys': {'incoming': [], 'outgoing': []}
46     }, {
47         'name': '/group2/table2',
48         'columns': ['identity', 'idnumber', 'speed'],
49         'primary_keys': [],
50         'count': 10000,
51         'label_column': None,
52         'hidden': False,
53         'fts_table': None,
54         'foreign_keys': {'incoming': [], 'outgoing': []}
55     }] == data['tables']
56
57 def test_custom_sql(app_client):
58     response = app_client.get(
59         '/test_tables.json?' + urlencode({
60             'sql': 'select identity from [/group1/table1]',
61             '_shape': 'objects'
62         }),
63         gather_request=False
64     )
65     data = response.json
66     assert {
67         'sql': 'select identity from [/group1/table1]',
68         'params': {}
69     } == data['query']
70     assert 1000 == len(data['rows'])
71     assert [
72         {'identity': 'This is particle:  0'},
73         {'identity': 'This is particle:  1'},
74         {'identity': 'This is particle:  2'},
75         {'identity': 'This is particle:  3'}
76     ] == data['rows'][:4]
77     assert ['identity'] == data['columns']
78     assert 'test_tables' == data['database']
79     assert data['truncated']
80
81 def test_custom_complex_sql(app_client):
82     response = app_client.get(
83         '/test_tables.json?' + urlencode({
84             'sql': 'select identity from [/group1/table1] where speed > 100 and idnumber < 55',
85             '_shape': 'objects'
86         }),
87         gather_request=False
88     )
89     data = response.json
90     assert {
91         'sql': 'select identity from [/group1/table1] where speed > 100 and idnumber < 55',
92         'params': {}
93     } == data['query']
94     assert 4 == len(data['rows'])
95     assert [
96         {'identity': 'This is particle: 51'},
97         {'identity': 'This is particle: 52'},
98         {'identity': 'This is particle: 53'},
99         {'identity': 'This is particle: 54'}
100     ] == data['rows']
101     assert ['identity'] == data['columns']
102     assert 'test_tables' == data['database']
103     assert not data['truncated']
104
105 def test_custom_pytables_sql(app_client):
106     response = app_client.get(
107         '/test_tables.json?' + urlencode({
108             'sql': 'select identity from [/group1/table1] where (speed > 100) & (speed < 500)',
109             '_shape': 'objects'
110             }),
111         gather_request=False
112     )
113     data = response.json
114     assert {
115         'sql': 'select identity from [/group1/table1] where (speed > 100) & (speed < 500)',
116         'params': {}
117     } == data['query']
118     assert 199 == len(data['rows'])
119     assert [
120         {'identity': 'This is particle: 51'},
121         {'identity': 'This is particle: 52'},
122         {'identity': 'This is particle: 53'}
123     ] == data['rows'][:3]
124     assert ['identity'] == data['columns']
125     assert 'test_tables' == data['database']
126     assert not data['truncated']
127
128 def test_invalid_custom_sql(app_client):
129     response = app_client.get(
130         '/test_tables.json?sql=.schema',
131         gather_request=False
132     )
133     assert response.status == 400
134     assert response.json['ok'] is False
135     assert 'Statement must be a SELECT' == response.json['error']
136
137 def test_table_json(app_client):
138     response = app_client.get(
139         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=objects',
140         gather_request=False
141     )
142     assert response.status == 200
143     data = response.json
144     assert data['query']['sql'] == 'select rowid, * from [/group2/table2] order by rowid limit 51'
145     assert data['rows'][3:6] == [{
146         'rowid': 3,
147         'identity': 'This is particle:  3',
148         'idnumber': 3,
149         'speed': 6.0
150     }, {
151         'rowid': 4,
152         'identity': 'This is particle:  4',
153         'idnumber': 4,
154         'speed': 8.0
155     }, {
156         'rowid': 5,
157         'identity': 'This is particle:  5',
158         'idnumber': 5,
159         'speed': 10.0
160     }]
161
162 def test_table_not_exists_json(app_client):
163     assert {
164         'ok': False,
165         'error': 'Table not found: blah',
166         'status': 404,
167         'title': None,
168     } == app_client.get(
169         '/test_tables/blah.json', gather_request=False
170     ).json
171
172 def test_table_shape_arrays(app_client):
173     response = app_client.get(
174         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=arrays',
175         gather_request=False
176     )
177     assert [
178         [6, 'This is particle:  6', 6, 12.0],
179         [7, 'This is particle:  7', 7, 14.0],
180     ] == response.json['rows'][6:8]
181
182 def test_table_shape_objects(app_client):
183     response = app_client.get(
184         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=objects',
185         gather_request=False
186     )
187     assert [{
188         'rowid': 6,
189         'identity': 'This is particle:  6',
190         'idnumber': 6,
191         'speed': 12.0,
192     }, {
193         'rowid': 7,
194         'identity': 'This is particle:  7',
195         'idnumber': 7,
196         'speed': 14.0,
197     }] == response.json['rows'][6:8]
198
199 def test_table_shape_array(app_client):
200     response = app_client.get(
201         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=array',
202         gather_request=False
203     )
204     assert [{
205         'rowid': 6,
206         'identity': 'This is particle:  6',
207         'idnumber': 6,
208         'speed': 12.0,
209     }, {
210         'rowid': 7,
211         'identity': 'This is particle:  7',
212         'idnumber': 7,
213         'speed': 14.0,
214     }] == response.json[6:8]
215
216 def test_table_shape_invalid(app_client):
217     response = app_client.get(
218         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=invalid',
219         gather_request=False
220     )
221     assert {
222         'ok': False,
223         'error': 'Invalid _shape: invalid',
224         'status': 400,
225         'title': None,
226     } == response.json
227
228 @pytest.mark.parametrize('path, expected_rows, expected_pages', [
229     ('/test_tables/%2Farray1.json', 2, 1),
230     ('/test_tables/%2Farray1.json?_size=1', 2, 2),
231     ('/test_tables/%2Fgroup1%2Farray2.json?_size=1000', 10000, 10),
232 ])
233 def test_paginate_tables_and_arrays(app_client, path, expected_rows, expected_pages):
234     fetched = []
235     count = 0
236     while path:
237         response = app_client.get(path, gather_request=False)
238         print("*****", response.json)
239         assert 200 == response.status
240         count += 1
241         fetched.extend(response.json['rows'])
242         path = response.json['next_url']
243         if path:
244             assert response.json['next']
245             assert '_next={}'.format(response.json['next']) in path
246
247     assert expected_rows == len(fetched)
248     assert expected_pages == count