Skip to content

Commit

Permalink
Merge pull request #128 from BritishGeologicalSurvey/column-types
Browse files Browse the repository at this point in the history
Make information on column types accessible
  • Loading branch information
ximenesuk authored May 23, 2022
2 parents 94b2f28 + 2e93885 commit e8ae62c
Show file tree
Hide file tree
Showing 14 changed files with 419 additions and 35 deletions.
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ For a high level introduction to `etlhelper`, see the FOSS4GUK 2019 presentation
+ [Installation](#installation)
+ [Connect to databases](#connect-to-databases)
+ [Transfer data](#transfer-data)
+ [Utilities](#utilities)
+ [Recipes](#recipes)
+ [Development](#development)
+ [References](#references)
Expand Down Expand Up @@ -559,6 +560,39 @@ a list. Data transformation can then be performed via [memory-efficient
iterator-chains](https://dbader.org/blog/python-iterator-chains).


## Utilities

The following utility functions provide useful database metadata.


### Table info


The `table_info` function provides basic metadata for a table. An optional schema
name can be given. Note that for `sqlite` the schema value is currently ignored.

```python
from etlhelper.utils import table_info

with ORACLEDB.connect("ORA_PASSWORD") as conn:
columns = table_info('my_table', conn, schema='my_schema')
```

The returned value is a list of named tuples of four values. Each tuple represents
one column in the table, giving its name, type, if it has a NOT NULL constraint
and if it has a DEFAULT value constraint. For example,

```python
[
Column(name='ID', type='NUMBER', not_null=1, has_default=0),
Column(name='VALUE', type='VARCHAR2', not_null=0, has_default=1),
]
```

the ID column is of type NUMBER and has a NOT NULL constraint but not a DEFAULT value,
while the VALUE column is of type VARCHAR2, can be NULL but does have a DEFAULT value.


## Recipes

The following recipes demonstrate how `etlhelper` can be used.
Expand Down
3 changes: 3 additions & 0 deletions etlhelper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
get_connection_string,
get_sqlalchemy_connection_string,
)
from etlhelper.utils import (
table_info,
)

from . import _version
__version__ = _version.get_versions()['version']
Expand Down
3 changes: 2 additions & 1 deletion etlhelper/db_helpers/db_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ class DbHelper(metaclass=ABCMeta):
"""
sql_exceptions = None
connect_exceptions = None
table_info_query = None
# The following are used to help create parameterized queries. Although
# paramstyle is require by DBAPI2, most drivers support both a named and
# paramstyle is required by DBAPI2, most drivers support both a named and
# positional style.
paramstyle = None
named_paramstyle = None
Expand Down
12 changes: 12 additions & 0 deletions etlhelper/db_helpers/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@
Database helper for mssql
"""
import warnings
from textwrap import dedent
from etlhelper.db_helpers.db_helper import DbHelper


class MSSQLDbHelper(DbHelper):
"""
MS Sql server helper class
"""
table_info_query = dedent("""
SELECT
column_name as name,
data_type as type,
(case when is_nullable = 'NO' then 1 else 0 end) as not_null,
(case when column_default is not null then 1 else 0 end) as has_default
FROM INFORMATION_SCHEMA.COLUMNS
WHERE LOWER(table_name) = LOWER(?)
AND LOWER(table_schema) LIKE COALESCE(LOWER(?), '%%')
""").strip()

def __init__(self):
super().__init__()
self.required_params = {'host', 'port', 'dbname', 'user', 'odbc_driver'}
Expand Down
12 changes: 12 additions & 0 deletions etlhelper/db_helpers/oracle.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Database helper for Oracle
"""
from textwrap import dedent
import warnings
from etlhelper.db_helpers.db_helper import DbHelper

Expand All @@ -9,6 +10,17 @@ class OracleDbHelper(DbHelper):
"""
Oracle DB helper class
"""
table_info_query = dedent("""
SELECT
column_name as name,
data_type as type,
(case when nullable = 'N' then 1 else 0 end) as not_null,
(case when data_default is not null then 1 else 0 end) as has_default
FROM all_tab_columns
WHERE LOWER(table_name) = LOWER(:1)
AND REGEXP_LIKE(LOWER(owner), '^' || COALESCE(LOWER(:2), '.*') || '$')
""").strip()

def __init__(self):
super().__init__()
self.required_params = {'host', 'port', 'dbname', 'user'}
Expand Down
22 changes: 22 additions & 0 deletions etlhelper/db_helpers/postgres.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Database helper for PostgreSQL
"""
from textwrap import dedent
import warnings
from etlhelper.db_helpers.db_helper import DbHelper

Expand All @@ -9,6 +10,27 @@ class PostgresDbHelper(DbHelper):
"""
Postgres db helper class
"""
table_info_query = dedent("""
SELECT
pg_attribute.attname AS name,
pg_catalog.format_type(pg_attribute.atttypid, pg_attribute.atttypmod) AS type,
(case when pg_attribute.attnotnull then 1 else 0 end) as not_null,
(case when pg_attribute.atthasdef then 1 else 0 end) as has_default
FROM
pg_catalog.pg_attribute
INNER JOIN
pg_catalog.pg_class ON pg_class.oid = pg_attribute.attrelid
INNER JOIN
pg_catalog.pg_namespace ON pg_namespace.oid = pg_class.relnamespace
WHERE
pg_attribute.attnum > 0
AND NOT pg_attribute.attisdropped
AND pg_class.relname = %s
AND pg_namespace.nspname ~ COALESCE(%s, '.*')
ORDER BY
attnum ASC;
""").strip()

def __init__(self):
super().__init__()
self.required_params = {'host', 'port', 'dbname', 'user'}
Expand Down
16 changes: 16 additions & 0 deletions etlhelper/db_helpers/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Database helper for SQLite
"""
from contextlib import contextmanager
from textwrap import dedent
import warnings
from etlhelper.db_helpers.db_helper import DbHelper

Expand All @@ -10,6 +11,21 @@ class SQLiteDbHelper(DbHelper):
"""
SQLite DB helper class
"""
# schema_name is not used for SQLite but is required as parameter to be
# consistent with other databases. The WHERE clause is always true,
# whether schema_name is NULL or not.
table_info_query = dedent("""
SELECT
name,
type,
"notnull" as not_null,
(case when dflt_value is not null then 1 else 0 end) as has_default
FROM pragma_table_info(:table_name)
-- this effectively ignores the unused schema_name
-- parameter since schemas are not used in sqlite
WHERE COALESCE(TRUE, :schema_name)
;""").strip()

def __init__(self):
super().__init__()
self.required_params = {'filename'}
Expand Down
39 changes: 39 additions & 0 deletions etlhelper/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""
Utility functions to help with tasks such as programmatically generating SQL queries.
"""
from collections import namedtuple

from etlhelper import fetchall
from etlhelper.exceptions import ETLHelperQueryError
from etlhelper.db_helper_factory import DB_HELPER_FACTORY

Column = namedtuple('Column', ['name', 'type', 'not_null', 'has_default'])


def table_info(table, conn, schema=None):
    """
    Return basic metadata for each of the columns of 'table' on 'conn'.

    :param table: str, the table to describe
    :param conn: dbapi connection
    :param schema: str, optional name of schema for table
    :returns columns: list, tuples of (name, type, not_null, has_default)
    :raises ETLHelperQueryError: if the table is not found, or its name is
        ambiguous across schemas and no schema was given
    """
    db_helper = DB_HELPER_FACTORY.from_conn(conn)

    # Run the helper-specific metadata query; schema may be None, in which
    # case the query matches the table in any schema.
    rows = fetchall(db_helper.table_info_query, conn, parameters=(table, schema))
    columns = [Column(*row) for row in rows]

    if not columns:
        schema_table = f"{schema}.{table}" if schema else table
        raise ETLHelperQueryError(f"Table name '{schema_table}' not found.")

    # Repeated column names mean the table exists in more than one schema,
    # so the rows for the different tables have been interleaved.
    unique_names = {column.name for column in columns}
    if len(columns) > len(unique_names):
        raise ETLHelperQueryError(
            f"Table name {table} is not unique in database. "
            "Please specify the schema.")

    return columns
4 changes: 2 additions & 2 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ def pgtestdb_test_tables(test_table_data, pgtestdb_conn, pgtestdb_insert_sql):
CREATE TABLE src
(
id integer primary key,
value double precision,
simple_text text,
value double precision not null,
simple_text text default 'default',
utf8_text text,
day date,
date_time timestamp without time zone
Expand Down
58 changes: 54 additions & 4 deletions test/integration/db/test_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
generate_insert_sql,
load,
)
from etlhelper.utils import table_info, Column
from etlhelper.exceptions import (
ETLHelperConnectionError,
ETLHelperInsertError,
Expand Down Expand Up @@ -144,9 +145,10 @@ def test_copy_table_rows_happy_path_fast_true(
def test_copy_table_rows_on_error(test_tables, testdb_conn, test_table_data):
# Arrange
duplicate_id_row_sql = """
INSERT INTO dest (id)
INSERT INTO dest (id, value)
VALUES (
1
1,
1.234
)""".strip()
execute(duplicate_id_row_sql, testdb_conn)

Expand Down Expand Up @@ -240,6 +242,54 @@ def test_generate_insert_sql_dictionary(testdb_conn):
generate_insert_sql('my_table', data, testdb_conn)


def test_table_info_no_schema_no_duplicates(testdb_conn, test_tables):
    """table_info without a schema returns metadata for every column of 'src'."""
    # Arrange: (name, type, not_null, has_default) for each column, in order
    expected_columns = [
        Column('id', 'int', 0, 0),
        Column('value', 'float', 1, 0),
        Column('simple_text', 'nvarchar', 0, 1),
        Column('utf8_text', 'nvarchar', 0, 0),
        Column('day', 'date', 0, 0),
        Column('date_time', 'datetime2', 0, 0),
    ]

    # Act
    columns = table_info('src', testdb_conn)

    # Assert
    assert columns == expected_columns


def test_table_info_with_schema_no_duplicates(testdb_conn, test_tables):
    """table_info with an explicit schema returns metadata for every column of 'src'."""
    # Arrange: (name, type, not_null, has_default) for each column, in order
    expected_columns = [
        Column('id', 'int', 0, 0),
        Column('value', 'float', 1, 0),
        Column('simple_text', 'nvarchar', 0, 1),
        Column('utf8_text', 'nvarchar', 0, 0),
        Column('day', 'date', 0, 0),
        Column('date_time', 'datetime2', 0, 0),
    ]

    # Act
    columns = table_info('src', testdb_conn, schema='etlhelper')

    # Assert
    assert columns == expected_columns


def test_table_info_bad_table_name_no_schema(testdb_conn, test_tables):
    """A non-existent table raises ETLHelperQueryError naming the table."""
    expected_message = r"Table name 'bad_table' not found."
    with pytest.raises(ETLHelperQueryError, match=expected_message):
        table_info('bad_table', testdb_conn)


def test_table_info_bad_table_name_with_schema(testdb_conn, test_tables):
    """A non-existent table raises ETLHelperQueryError naming the schema-qualified table."""
    expected_message = r"Table name 'etlhelper.bad_table' not found."
    with pytest.raises(ETLHelperQueryError, match=expected_message):
        table_info('bad_table', testdb_conn, schema='etlhelper')


# -- Fixtures here --

INSERT_SQL = dedent("""
Expand Down Expand Up @@ -297,8 +347,8 @@ def test_tables(test_table_data, testdb_conn):
CREATE TABLE src
(
id integer unique,
value double precision,
simple_text nvarchar(max),
value double precision not null,
simple_text nvarchar(max) default 'default',
utf8_text nvarchar(max),
day date,
date_time datetime2(6)
Expand Down
Loading

0 comments on commit e8ae62c

Please sign in to comment.