from moz_sql_parser import parse
import re
+
import tables
+import datasette_connectors as dc
+
+
+class PyTablesConnection(dc.Connection):
+ # Connection wrapper that holds an open PyTables file handle for the
+ # lifetime of the Datasette connection.
+ def __init__(self, path, connector):
+ super().__init__(path, connector)
+ # NOTE(review): no explicit mode is passed to open_file — presumably
+ # read-only by default; confirm against the PyTables docs.
+ self.h5file = tables.open_file(path)
+
+
+class PyTablesConnector(dc.Connector):
+ # Datasette connector that exposes the leaf nodes of an HDF5 file
+ # as Datasette "tables", keyed by their full HDF5 path names.
+ connector_type = 'pytables'
+ connection_class = PyTablesConnection
+
+ def table_names(self):
+ # Every non-Group node (Table, Array, ...) becomes a table.
+ return [
+ node._v_pathname
+ for node in self.conn.h5file
+ if not(isinstance(node, tables.group.Group))
+ ]
+
+ def table_count(self, table_name):
+ # nrows may be a numpy integer; coerce to a plain int so it
+ # serialises cleanly to JSON.
+ table = self.conn.h5file.get_node(table_name)
+ return int(table.nrows)
+
+ def table_info(self, table_name):
+ # Arrays have no named columns, so expose a single synthetic
+ # 'value' column; real Tables report their declared column names.
+ table = self.conn.h5file.get_node(table_name)
+ colnames = ['value']
+ if isinstance(table, tables.table.Table):
+ colnames = table.colnames
+
+ # HDF5 nodes have no primary-key concept, hence always False.
+ return [
+ {
+ 'idx': idx,
+ 'name': colname,
+ 'primary_key': False,
+ }
+ for idx, colname in enumerate(colnames)
+ ]
+
+ def hidden_table_names(self):
+ # HDF5 files have no notion of hidden tables.
+ return []
+
+ def detect_spatialite(self):
+ # SpatiaLite is SQLite-specific; never present here.
+ return False
+
+ def view_names(self):
+ # PyTables has no view concept.
+ return []
+
+ def detect_fts(self, table_name):
+ # No full-text-search index support for HDF5 nodes.
+ return False
-_connector_type = 'pytables'
def inspect(path):
"Open file and return tables info"
def _parse_sql(sql, params):
# Table name
- sql = re.sub('(?i)from \[(.*)]', 'from "\g<1>"', sql)
+ sql = re.sub(r'(?i)from \[(.*)]', r'from "\g<1>"', sql)
# Params
for param in params:
sql = sql.replace(":" + param, param)
for token in ['group by', 'order by', 'limit', '']:
res = re.search('(?i)where (.*)' + token, sql)
if res:
- modified_sql = re.sub('(?i)where (.*)(' + token + ')', '\g<2>', sql)
+ modified_sql = re.sub('(?i)where (.*)(' + token + ')', r'\g<2>', sql)
parsed = parse(modified_sql)
parsed['where'] = res.group(1).strip()
break
],
},
install_requires=[
- 'datasette-connectors',
+ 'datasette-connectors>=2.0.0',
'tables',
'moz-sql-parser==1.3.18033',
'mo-future==1.6.18072'
from datasette_connectors import monkey; monkey.patch_datasette()
-from datasette_connectors import connectors; connectors.load()
+from datasette_connectors.connectors import ConnectorList
+from datasette_pytables import PyTablesConnector
+ConnectorList.add_connector('pytables', PyTablesConnector)
from datasette.app import Datasette
+from datasette.utils.testing import TestClient
import numpy as np
import os
import pytest
from tables import *
import tempfile
+import contextlib
-@pytest.fixture(scope='session')
-def app_client(max_returned_rows=None):
- with tempfile.TemporaryDirectory() as tmpdir:
- filepath = os.path.join(tmpdir, 'test_tables.h5')
- populate_file(filepath)
- ds = Datasette(
- [filepath],
- config={
- 'default_page_size': 50,
- 'max_returned_rows': max_returned_rows or 1000,
- }
- )
- client = ds.app().test_client
- client.ds = ds
- yield client
def populate_file(filepath):
class Particle(IsDescription):
table.flush()
h5file.close()
+
+
+@contextlib.contextmanager
+def make_app_client(
+ max_returned_rows=None,
+ config=None,
+ is_immutable=False,
+):
+ # Yield a Datasette TestClient backed by a freshly populated HDF5 file
+ # in a temporary directory (cleaned up on exit).
+ # max_returned_rows: overrides the default 1000-row cap.
+ # config: extra Datasette config; copied so the caller's dict is untouched.
+ # is_immutable: register the file via `immutables` instead of `files`.
+ with tempfile.TemporaryDirectory() as tmpdir:
+ filepath = os.path.join(tmpdir, 'test_tables.h5')
+ populate_file(filepath)
+ if is_immutable:
+ files = []
+ immutables = [filepath]
+ else:
+ files = [filepath]
+ immutables = []
+ # Copy before updating: the previous code mutated the caller-supplied
+ # dict in place, leaking keys between calls that share a config object.
+ config = dict(config or {})
+ config.update({
+ 'default_page_size': 50,
+ 'max_returned_rows': max_returned_rows or 1000,
+ })
+ ds = Datasette(
+ files,
+ immutables=immutables,
+ config=config,
+ )
+ client = TestClient(ds.app())
+ client.ds = ds
+ yield client
+
+
+@pytest.fixture(scope='session')
+def app_client():
+ # Session-scoped default client: mutable database, default config.
+ with make_app_client() as client:
+ yield client
+
+
+@pytest.fixture(scope='session')
+def app_client_with_hash():
+ # Immutable registration is required for hash_urls to take effect.
+ with make_app_client(config={"hash_urls": True}, is_immutable=True) as client:
+ yield client
from urllib.parse import urlencode
def test_homepage(app_client):
- _, response = app_client.get('/.json')
+ response = app_client.get('/.json')
assert response.status == 200
assert response.json.keys() == {'test_tables': 0}.keys()
d = response.json['test_tables']