query = parsed_sql['where']
# Limit number of rows
+ limit = None
if 'limit' in parsed_sql:
- max_rows = int(parsed_sql['limit'])
- if end - start > max_rows:
- end = start + max_rows
+ limit = int(parsed_sql['limit'])
# Truncate if needed
if page_size and max_returned_rows and truncate:
if max_returned_rows == page_size:
max_returned_rows += 1
- if end - start > max_returned_rows:
- end = start + max_returned_rows
- truncated = True
# Execute query
if query:
rows.append(Row({'count(*)': int(table.nrows)}))
else:
if type(table) is tables.table.Table:
+ count = 0
for table_row in table_rows:
+ count += 1
+ if limit and count > limit:
+ break
+ if truncate and max_returned_rows and count > max_returned_rows:
+ truncated = True
+ break
row = Row()
for field in fields:
field_name = field['value']
else:
# Any kind of array
rowid = start - 1
+ count = 0
for table_row in table_rows:
+ count += 1
+ if limit and count > limit:
+ break
+ if truncate and max_returned_rows and count > max_returned_rows:
+ truncated = True
+ break
row = Row()
rowid += 1
for field in fields:
}] == data['tables']
def test_custom_sql(app_client):
+ response = app_client.get(
+ '/test_tables.json?' + urlencode({
+ 'sql': 'select identity from [/group1/table1]',
+ '_shape': 'objects'
+ }),
+ gather_request=False
+ )
+ data = response.json
+ assert {
+ 'sql': 'select identity from [/group1/table1]',
+ 'params': {}
+ } == data['query']
+ assert 1000 == len(data['rows'])
+ assert [
+ {'identity': 'This is particle: 0'},
+ {'identity': 'This is particle: 1'},
+ {'identity': 'This is particle: 2'},
+ {'identity': 'This is particle: 3'}
+ ] == data['rows'][:4]
+ assert ['identity'] == data['columns']
+ assert 'test_tables' == data['database']
+ assert data['truncated']
+
+def test_custom_complex_sql(app_client):
response = app_client.get(
'/test_tables.json?' + urlencode({
'sql': 'select identity from [/group1/table1] where speed > 100 and idnumber < 55',
gather_request=False
)
data = response.json
- print("*************************", data)
assert {
'sql': 'select identity from [/group1/table1] where speed > 100 and idnumber < 55',
'params': {}
] == data['rows']
assert ['identity'] == data['columns']
assert 'test_tables' == data['database']
- assert False == data['truncated']
+ assert not data['truncated']
def test_custom_pytables_sql(app_client):
response = app_client.get(
] == data['rows'][:3]
assert ['identity'] == data['columns']
assert 'test_tables' == data['database']
- assert data['truncated']
+ assert not data['truncated']
def test_invalid_custom_sql(app_client):
response = app_client.get(