684689db7cd46a77f80e90f6f936beb39709513b
[datasette-pytables.git] / tests / test_api.py
1 from .fixtures import app_client
2 import pytest
3 from urllib.parse import urlencode
4
5 def test_homepage(app_client):
6     response = app_client.get('/.json')
7     assert response.status == 200
8     assert response.json.keys() == {'test_tables': 0}.keys()
9     d = response.json['test_tables']
10     assert d['name'] == 'test_tables'
11     assert d['tables_count'] == 5
12
13 def test_database_page(app_client):
14     response = app_client.get('/test_tables.json')
15     data = response.json
16     assert 'test_tables' == data['database']
17     assert [{
18         'name': '/array1',
19         'columns': ['value'],
20         'primary_keys': [],
21         'count': 2,
22         'hidden': False,
23         'fts_table': None,
24         'foreign_keys': {'incoming': [], 'outgoing': []},
25         'private': False,
26     }, {
27         'name': '/group1/array2',
28         'columns': ['value'],
29         'primary_keys': [],
30         'count': 10000,
31         'hidden': False,
32         'fts_table': None,
33         'foreign_keys': {'incoming': [], 'outgoing': []},
34         'private': False,
35     }, {
36         'name': '/group1/table1',
37         'columns': ['identity', 'idnumber', 'speed'],
38         'primary_keys': [],
39         'count': 10000,
40         'hidden': False,
41         'fts_table': None,
42         'foreign_keys': {'incoming': [], 'outgoing': []},
43         'private': False,
44     }, {
45         'name': '/group2/multi',
46         'columns': ['value'],
47         'primary_keys': [],
48         'count': 10,
49         'hidden': False,
50         'fts_table': None,
51         'foreign_keys': {'incoming': [], 'outgoing': []},
52         'private': False,
53     }, {
54         'name': '/group2/table2',
55         'columns': ['identity', 'idnumber', 'speed'],
56         'primary_keys': [],
57         'count': 10000,
58         'hidden': False,
59         'fts_table': None,
60         'foreign_keys': {'incoming': [], 'outgoing': []},
61         'private': False,
62     }] == data['tables']
63
64 def test_custom_sql(app_client):
65     response = app_client.get(
66         '/test_tables.json?' + urlencode({
67             'sql': 'select identity from [/group1/table1]',
68             '_shape': 'objects'
69         }),
70     )
71     data = response.json
72     assert {
73         'sql': 'select identity from [/group1/table1]',
74         'params': {}
75     } == data['query']
76     assert 1000 == len(data['rows'])
77     assert [
78         {'identity': 'This is particle:  0'},
79         {'identity': 'This is particle:  1'},
80         {'identity': 'This is particle:  2'},
81         {'identity': 'This is particle:  3'}
82     ] == data['rows'][:4]
83     assert ['identity'] == data['columns']
84     assert 'test_tables' == data['database']
85     assert data['truncated']
86
87 def test_custom_complex_sql(app_client):
88     response = app_client.get(
89         '/test_tables.json?' + urlencode({
90             'sql': 'select identity from [/group1/table1] where speed > 100 and idnumber < 55',
91             '_shape': 'objects'
92         }),
93         gather_request=False
94     )
95     data = response.json
96     assert {
97         'sql': 'select identity from [/group1/table1] where speed > 100 and idnumber < 55',
98         'params': {}
99     } == data['query']
100     assert 4 == len(data['rows'])
101     assert [
102         {'identity': 'This is particle: 51'},
103         {'identity': 'This is particle: 52'},
104         {'identity': 'This is particle: 53'},
105         {'identity': 'This is particle: 54'}
106     ] == data['rows']
107     assert ['identity'] == data['columns']
108     assert 'test_tables' == data['database']
109     assert not data['truncated']
110
111 def test_custom_pytables_sql(app_client):
112     response = app_client.get(
113         '/test_tables.json?' + urlencode({
114             'sql': 'select identity from [/group1/table1] where (speed > 100) & (speed < 500)',
115             '_shape': 'objects'
116             }),
117         gather_request=False
118     )
119     data = response.json
120     assert {
121         'sql': 'select identity from [/group1/table1] where (speed > 100) & (speed < 500)',
122         'params': {}
123     } == data['query']
124     assert 199 == len(data['rows'])
125     assert [
126         {'identity': 'This is particle: 51'},
127         {'identity': 'This is particle: 52'},
128         {'identity': 'This is particle: 53'}
129     ] == data['rows'][:3]
130     assert ['identity'] == data['columns']
131     assert 'test_tables' == data['database']
132     assert not data['truncated']
133
134 def test_invalid_custom_sql(app_client):
135     response = app_client.get(
136         '/test_tables.json?sql=.schema',
137         gather_request=False
138     )
139     assert response.status == 400
140     assert response.json['ok'] is False
141     assert 'Statement must be a SELECT' == response.json['error']
142
143 def test_table_json(app_client):
144     response = app_client.get(
145         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=objects',
146         gather_request=False
147     )
148     assert response.status == 200
149     data = response.json
150     assert data['query']['sql'] == 'select rowid, * from [/group2/table2] order by rowid limit 51'
151     assert data['rows'][3:6] == [{
152         'rowid': 3,
153         'identity': 'This is particle:  3',
154         'idnumber': 3,
155         'speed': 6.0
156     }, {
157         'rowid': 4,
158         'identity': 'This is particle:  4',
159         'idnumber': 4,
160         'speed': 8.0
161     }, {
162         'rowid': 5,
163         'identity': 'This is particle:  5',
164         'idnumber': 5,
165         'speed': 10.0
166     }]
167
168 def test_table_not_exists_json(app_client):
169     assert {
170         'ok': False,
171         'error': 'Table not found: blah',
172         'status': 404,
173         'title': None,
174     } == app_client.get(
175         '/test_tables/blah.json', gather_request=False
176     ).json
177
178 def test_table_shape_arrays(app_client):
179     response = app_client.get(
180         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=arrays',
181         gather_request=False
182     )
183     assert [
184         [6, 'This is particle:  6', 6, 12.0],
185         [7, 'This is particle:  7', 7, 14.0],
186     ] == response.json['rows'][6:8]
187
188 def test_table_shape_objects(app_client):
189     response = app_client.get(
190         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=objects',
191         gather_request=False
192     )
193     assert [{
194         'rowid': 6,
195         'identity': 'This is particle:  6',
196         'idnumber': 6,
197         'speed': 12.0,
198     }, {
199         'rowid': 7,
200         'identity': 'This is particle:  7',
201         'idnumber': 7,
202         'speed': 14.0,
203     }] == response.json['rows'][6:8]
204
205 def test_table_shape_array(app_client):
206     response = app_client.get(
207         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=array',
208         gather_request=False
209     )
210     assert [{
211         'rowid': 6,
212         'identity': 'This is particle:  6',
213         'idnumber': 6,
214         'speed': 12.0,
215     }, {
216         'rowid': 7,
217         'identity': 'This is particle:  7',
218         'idnumber': 7,
219         'speed': 14.0,
220     }] == response.json[6:8]
221
222 def test_table_shape_invalid(app_client):
223     response = app_client.get(
224         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=invalid',
225         gather_request=False
226     )
227     assert {
228         'ok': False,
229         'error': 'Invalid _shape: invalid',
230         'status': 400,
231         'title': None,
232     } == response.json
233
234 @pytest.mark.parametrize('path, expected_rows, expected_pages', [
235     ('/test_tables/%2Farray1.json', 2, 1),
236     ('/test_tables/%2Farray1.json?_size=1', 2, 2),
237     ('/test_tables/%2Fgroup1%2Farray2.json?_size=1000', 10000, 10),
238     ('/test_tables/%2Fgroup2%2Fmulti.json?_size=5', 10, 2),
239 ])
240 def test_paginate_tables_and_arrays(app_client, path, expected_rows, expected_pages):
241     fetched = []
242     count = 0
243     while path:
244         response = app_client.get(path, gather_request=False)
245         print("*****", response.json)
246         assert 200 == response.status
247         count += 1
248         fetched.extend(response.json['rows'])
249         path = response.json['next_url']
250         if path:
251             assert response.json['next']
252             assert '_next={}'.format(response.json['next']) in path
253
254     assert expected_rows == len(fetched)
255     assert expected_pages == count