Show multidimensional arrays as string arrays
[datasette-pytables.git] / tests / test_api.py
1 from .fixtures import app_client
2 import pytest
3 from urllib.parse import urlencode
4
5 pytest.fixture(scope='module')(app_client)
6
7 def test_homepage(app_client):
8     _, response = app_client.get('/.json')
9     assert response.status == 200
10     assert response.json.keys() == {'test_tables': 0}.keys()
11     d = response.json['test_tables']
12     assert d['name'] == 'test_tables'
13     assert d['tables_count'] == 5
14
15 def test_database_page(app_client):
16     response = app_client.get('/test_tables.json', gather_request=False)
17     data = response.json
18     assert 'test_tables' == data['database']
19     assert [{
20         'name': '/array1',
21         'columns': [],
22         'primary_keys': [],
23         'count': 2,
24         'label_column': None,
25         'hidden': False,
26         'fts_table': None,
27         'foreign_keys': {'incoming': [], 'outgoing': []}
28     }, {
29         'name': '/group1/array2',
30         'columns': [],
31         'primary_keys': [],
32         'count': 10000,
33         'label_column': None,
34         'hidden': False,
35         'fts_table': None,
36         'foreign_keys': {'incoming': [], 'outgoing': []}
37     }, {
38         'name': '/group1/table1',
39         'columns': ['identity', 'idnumber', 'speed'],
40         'primary_keys': [],
41         'count': 10000,
42         'label_column': None,
43         'hidden': False,
44         'fts_table': None,
45         'foreign_keys': {'incoming': [], 'outgoing': []}
46     }, {
47         'name': '/group2/multi',
48         'columns': [],
49         'primary_keys': [],
50         'count': 10,
51         'label_column': None,
52         'hidden': False,
53         'fts_table': None,
54         'foreign_keys': {'incoming': [], 'outgoing': []}
55     }, {
56         'name': '/group2/table2',
57         'columns': ['identity', 'idnumber', 'speed'],
58         'primary_keys': [],
59         'count': 10000,
60         'label_column': None,
61         'hidden': False,
62         'fts_table': None,
63         'foreign_keys': {'incoming': [], 'outgoing': []}
64     }] == data['tables']
65
66 def test_custom_sql(app_client):
67     response = app_client.get(
68         '/test_tables.json?' + urlencode({
69             'sql': 'select identity from [/group1/table1]',
70             '_shape': 'objects'
71         }),
72         gather_request=False
73     )
74     data = response.json
75     assert {
76         'sql': 'select identity from [/group1/table1]',
77         'params': {}
78     } == data['query']
79     assert 1000 == len(data['rows'])
80     assert [
81         {'identity': 'This is particle:  0'},
82         {'identity': 'This is particle:  1'},
83         {'identity': 'This is particle:  2'},
84         {'identity': 'This is particle:  3'}
85     ] == data['rows'][:4]
86     assert ['identity'] == data['columns']
87     assert 'test_tables' == data['database']
88     assert data['truncated']
89
90 def test_custom_complex_sql(app_client):
91     response = app_client.get(
92         '/test_tables.json?' + urlencode({
93             'sql': 'select identity from [/group1/table1] where speed > 100 and idnumber < 55',
94             '_shape': 'objects'
95         }),
96         gather_request=False
97     )
98     data = response.json
99     assert {
100         'sql': 'select identity from [/group1/table1] where speed > 100 and idnumber < 55',
101         'params': {}
102     } == data['query']
103     assert 4 == len(data['rows'])
104     assert [
105         {'identity': 'This is particle: 51'},
106         {'identity': 'This is particle: 52'},
107         {'identity': 'This is particle: 53'},
108         {'identity': 'This is particle: 54'}
109     ] == data['rows']
110     assert ['identity'] == data['columns']
111     assert 'test_tables' == data['database']
112     assert not data['truncated']
113
114 def test_custom_pytables_sql(app_client):
115     response = app_client.get(
116         '/test_tables.json?' + urlencode({
117             'sql': 'select identity from [/group1/table1] where (speed > 100) & (speed < 500)',
118             '_shape': 'objects'
119             }),
120         gather_request=False
121     )
122     data = response.json
123     assert {
124         'sql': 'select identity from [/group1/table1] where (speed > 100) & (speed < 500)',
125         'params': {}
126     } == data['query']
127     assert 199 == len(data['rows'])
128     assert [
129         {'identity': 'This is particle: 51'},
130         {'identity': 'This is particle: 52'},
131         {'identity': 'This is particle: 53'}
132     ] == data['rows'][:3]
133     assert ['identity'] == data['columns']
134     assert 'test_tables' == data['database']
135     assert not data['truncated']
136
137 def test_invalid_custom_sql(app_client):
138     response = app_client.get(
139         '/test_tables.json?sql=.schema',
140         gather_request=False
141     )
142     assert response.status == 400
143     assert response.json['ok'] is False
144     assert 'Statement must be a SELECT' == response.json['error']
145
146 def test_table_json(app_client):
147     response = app_client.get(
148         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=objects',
149         gather_request=False
150     )
151     assert response.status == 200
152     data = response.json
153     assert data['query']['sql'] == 'select rowid, * from [/group2/table2] order by rowid limit 51'
154     assert data['rows'][3:6] == [{
155         'rowid': 3,
156         'identity': 'This is particle:  3',
157         'idnumber': 3,
158         'speed': 6.0
159     }, {
160         'rowid': 4,
161         'identity': 'This is particle:  4',
162         'idnumber': 4,
163         'speed': 8.0
164     }, {
165         'rowid': 5,
166         'identity': 'This is particle:  5',
167         'idnumber': 5,
168         'speed': 10.0
169     }]
170
171 def test_table_not_exists_json(app_client):
172     assert {
173         'ok': False,
174         'error': 'Table not found: blah',
175         'status': 404,
176         'title': None,
177     } == app_client.get(
178         '/test_tables/blah.json', gather_request=False
179     ).json
180
181 def test_table_shape_arrays(app_client):
182     response = app_client.get(
183         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=arrays',
184         gather_request=False
185     )
186     assert [
187         [6, 'This is particle:  6', 6, 12.0],
188         [7, 'This is particle:  7', 7, 14.0],
189     ] == response.json['rows'][6:8]
190
191 def test_table_shape_objects(app_client):
192     response = app_client.get(
193         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=objects',
194         gather_request=False
195     )
196     assert [{
197         'rowid': 6,
198         'identity': 'This is particle:  6',
199         'idnumber': 6,
200         'speed': 12.0,
201     }, {
202         'rowid': 7,
203         'identity': 'This is particle:  7',
204         'idnumber': 7,
205         'speed': 14.0,
206     }] == response.json['rows'][6:8]
207
208 def test_table_shape_array(app_client):
209     response = app_client.get(
210         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=array',
211         gather_request=False
212     )
213     assert [{
214         'rowid': 6,
215         'identity': 'This is particle:  6',
216         'idnumber': 6,
217         'speed': 12.0,
218     }, {
219         'rowid': 7,
220         'identity': 'This is particle:  7',
221         'idnumber': 7,
222         'speed': 14.0,
223     }] == response.json[6:8]
224
225 def test_table_shape_invalid(app_client):
226     response = app_client.get(
227         '/test_tables/%2Fgroup2%2Ftable2.json?_shape=invalid',
228         gather_request=False
229     )
230     assert {
231         'ok': False,
232         'error': 'Invalid _shape: invalid',
233         'status': 400,
234         'title': None,
235     } == response.json
236
237 @pytest.mark.parametrize('path, expected_rows, expected_pages', [
238     ('/test_tables/%2Farray1.json', 2, 1),
239     ('/test_tables/%2Farray1.json?_size=1', 2, 2),
240     ('/test_tables/%2Fgroup1%2Farray2.json?_size=1000', 10000, 10),
241     ('/test_tables/%2Fgroup2%2Fmulti.json?_size=5', 10, 2),
242 ])
243 def test_paginate_tables_and_arrays(app_client, path, expected_rows, expected_pages):
244     fetched = []
245     count = 0
246     while path:
247         response = app_client.get(path, gather_request=False)
248         print("*****", response.json)
249         assert 200 == response.status
250         count += 1
251         fetched.extend(response.json['rows'])
252         path = response.json['next_url']
253         if path:
254             assert response.json['next']
255             assert '_next={}'.format(response.json['next']) in path
256
257     assert expected_rows == len(fetched)
258     assert expected_pages == count