Source code for jdbc.iov2.input
"""JDBC Input"""
import datetime
import logging
import typing as T
from functools import lru_cache
import jaydebeapi
import pytz
from dateutil.parser import parse
from mitto.io import SDL
from mitto.iov2.input import BaseInputV2
logger = logging.getLogger(__name__)
# TODO: remove install of java from debian/control
# add pytest skip if no java on system
# pylint: disable=line-too-long
[docs]class Input(BaseInputV2):
"""Input from any database engine with a JDBC interface.
Extended Summary
----------------
If this inputter is used with
:class:`mitto.iov2.steps.builtin.CreateTable`, the types of the
output columns will be determined either by the SDL included in the
job configuration or by Mitto's internal sampling algorithm.
If this inputter is used with :class:`jdbc.iov2.steps.builtin.CreateTable`
and `reflect_sdl` is `true`, the types of the output columns will be
determined by the types returned by the `query`.
Note: JDBC always returns naive timestamps, even if the input column type
is timezone aware. See additional discussion below in the
:ref:`Timestamps <timestamps>` section.
Parameters
----------
driver_class
The fully-qualified path to the driver's Java class.
connection_url
The connection URL to be used by the driver when connecting
to the database. The format of the string is driver-specific.
query
A string or a list of strings that define the SQL query
to be used. If a list of strings is provided, they will be
concatenated into a single string. The query **must**
end with: `"FROM {table_name}"`. `{table_name}` will be
replaced with the value of the `table_name` parameter.
table_name
The table name from which to select the data.
reflect_sdl
Controls how the :class:`jdbc.iov2.steps.builtin.CreateTable`
step will determine output column types.
credentials
A dict of properties to pass as the
`driver_args` parameter when
`jaydebeapi.connect()
<https://github.com/baztian/jaydebeapi/blob/cd2fd4c6c047803916fc92a534f02f1213e252d6/jaydebeapi/__init__.py#L381>`_
is called to establish a connection with the database.
These properties are driver-specific and may include
properties beyond traditional "credentials" (e.g., the
`postgres JDBC driver
<https://jdbc.postgresql.org/documentation/head/connect.html#connection-parameters>`_
allows configuration of the schema name,
read-only mode, timeouts, logging, ssl, etc.).
The use of Mitto's `named credentials` is supported.
jar_files
String of fully-qualified paths to jar file(s) containing
the driver to be used. If the string contains
multiple paths, the paths must be separated by a colon: `":"`. When
locating the driver, the files in `jar_files` are consulted first, then
`CLASSPATH`, if it is present in the environment.
libraries
String containing one or more `DLL` or `.so` shared libraries
used by the driver. If present, this string is
passed as an option when Java is invoked
(i.e.: `-Djava.library.path=<value>`).
last_modified_column
The name of the column containing the last modified time
of the data. If this value is not `null`, an upsert
is performed; if it is `null` a load is performed.
last_modified_timezone
The timezone to use for data saved in the `last_modified_column`
of the output database for upserts. The timezone is applied
to the value while leaving the value unchanged. The value is a string
containing the name of a timezone in the Olson (tz) database. Values
can be taken from the `TZ database name` column of the databases
`described here
<https://en.wikipedia.org/wiki/List_of_tz_database_time_zones>`_.
For example, if the value is `"America/Chicago"` and the input data
is `2021-01-14 00:00:00`, the data will be saved as:
`2021-01-14 00:00:00-06:00`.
last_modified_delta
An adjustment in minutes to apply to values when writing
output data to `last_modified_column`.
Yields
-------
dict
A row returned by `query`.
Notes
-----
`CreateTable`
Generally, jobs should use
:class:`mitto.iov2.steps.builtin.CreateTable` with this
inputter.
:class:`jdbc.iov2.steps.builtin.CreateTable` should be
used with this inputter only when one wishes for the
column types of the output table to be determined by
the JDBC types returned by execution of the `query`.
For further explanation, see the documentation for
the `CreateTable` step in use.
.. _timestamps:
JDBC Timestamps
JDBC always returns "naive" values for timestamp columns, even
if the actual column type is timezone "aware" (e.g., Postgres's
`TIMESTAMP WITH TIME ZONE`). The actual value is affected by
both the timezone for which the database server is configured as well
as the timezone of the Mitto server running the job. Thus, in some
situations, it may be necessary to apply a correction
(`delta`) to values obtained from the `last_modified_column`
input database. This correction is provided via the
`last_modified_delta` parameter.
See `this page
<https://docs.python.org/3/library/datetime.html#aware-and-naive-objects>`_
for additional discussion of `aware` and `naive` values.
The `last_modified_timezone` determines the timezone used when
saving data to the `last_modified_column` of the output database.
The :class:`mitto.iov2.steps.builtin.MaxTimestamp` step provides a
`delta` parameter that can be used to apply a similar correction
to the maximum timestamp value obtained from the output database
during an upsert.
Java and JDBC drivers
Use of this connector requires that:
* Java must be installed. Java is installed by default on
Mitto 2.9 or greater.
* The desired JDBC driver must be installed.
If one or more parameter values are incorrect, the job using this
inputter will likely fail with a Java error/traceback. If the Java
error message is not helpful, a good first step is to carefully compare
parameter values with those described by the driver's documentation.
This connector uses the
`JayDeBeApi <https://github.com/baztian/jaydebeapi>`_
Python package for JDBC database access.
Postgres JDBC driver
The `Postgres JDBC driver
<https://jdbc.postgresql.org/documentation/80/connect.html>`_ is
installed with this connector which allows testing and
experimentation with this inputter on a default Mitto install.
As mentioned earlier, the JDBC driver in use determines the values of
`driver_class`, `connection_url`, and `credentials` and the
formats of those values. For example, when using other inputters,
`"postgresql://zuar:zuarpass@db.zuar.com/analytics"` is a valid `dbo`;
however it is not a valid `connection_url` for the Postgres JDBC driver.
To input data from that database using the JDBC connector, the following
configuration fragment would be necessary:
.. code-block::
"input": {
...
"connection_url": "jdbc:postgresql://db.zuar.com/analytics",
"credentials": {
"user": "zuar",
"password": "zuarpass"
},
...
}
Examples
--------
1. Input section of job configuration using JDBC and Postgres.
.. code-block::
"input": {
"use": "jdbc.iov2.input#Input",
"driver_class": "org.postgresql.Driver",
"jar_files": "/usr/share/java/postgresql.jar",
"connection_url": "jdbc:postgresql://localhost:5432/analytics",
"credentials": {
"user": "zuar",
"password": "zuarpass"
},
"query": "SELECT * FROM public.jdbc_test"
},
2. Input section of job configuration using JDBC and Db2 on an AS400.
.. code-block::
"input": {
"use": "jdbc.iov2.input#Input",
"jar_files": "/opt/jtopen_10_4/lib/jt400.jar",
"driver_class": "com.ibm.as400.access.AS400JDBCDriver",
"connection_url": "jdbc:as400:192.168.1.11",
"credentials": {
"user": "ZUAR",
"password": "ZUARPASS",
"database_name": "AS400DB",
"translate binary": "true",
"naming": "system"
},
"query": [
"SELECT * ",
"FROM EXAMPLE_TABLE"
],
"reflect_sdl": true
},
3. The same configuration using named credentials:
.. code-block::
"input": {
"use": "jdbc.iov2.input#Input",
"jar_files": "/opt/jtopen_10_4/lib/jt400.jar",
"driver_class": "com.ibm.as400.access.AS400JDBCDriver",
"connection_url": "jdbc:as400:192.168.1.11",
"credentials": "steve_postgres_named_creds",
"query": [
"SELECT * ",
"FROM EXAMPLE_TABLE"
],
"reflect_sdl": true
},
""" # noqa: E501
# Lookup table consulted by add_sdl_to_environ() when `reflect_sdl` is true:
# each key is a JayDeBeApi DB-API type object (as reported in
# cursor.description) and each value is the type name written into the
# SDL column definition for that column.
type_mapping = {
jaydebeapi.STRING: "String",
jaydebeapi.TEXT: "Text",
jaydebeapi.BINARY: "LargeBinary",
jaydebeapi.NUMBER: "Integer",
jaydebeapi.FLOAT: "Float",
jaydebeapi.DECIMAL: "Numeric",
jaydebeapi.DATE: "Date",
jaydebeapi.TIME: "Time",
jaydebeapi.DATETIME: "DateTime",
jaydebeapi.ROWID: "String",
}
"""Map JDBC driver type to sqlalchemy type.
Key is JayDeBeApi type. Value is the name of the sqlalchemy type.
JayDeBeApi driver type documentation:
https://github.com/baztian/jaydebeapi/blob/cd2fd4c6c047803916fc92a534f02f1213e252d6/jaydebeapi/__init__.py#L301
Java type documentation:
http://download.oracle.com/javase/8/docs/api/java/sql/Types.html
"""
# pylint: enable=line-too-long
def __init__(self,
             driver_class: str,
             connection_url: str,
             query: T.Union[str, T.List[str]],
             table_name: str,
             reflect_sdl: bool = False,
             credentials: T.Union[
                 str, T.Optional[T.Dict[str, T.Any]]] = None,
             jar_files: T.Optional[str] = None,
             libraries: T.Optional[str] = None,
             last_modified_column: T.Optional[str] = None,
             last_modified_timezone: str = "UTC",
             last_modified_delta: int = 0,
             ):
    """Store the job configuration and expand the query template.

    Every argument is kept on the instance under the same name. `query`
    is first collapsed to a single string and then has its
    `{table_name}` placeholder expanded, so a query without that
    placeholder raises `ValueError` here.
    """
    # Internal state, filled in later by updated_at() and __iter__().
    self._updated_at = None
    self._column_names = None

    # Arguments forwarded to jaydebeapi.connect() by jdbc_connect().
    self.driver_class = driver_class
    self.connection_url = connection_url
    self.credentials = credentials
    self.jar_files = jar_files
    self.libraries = libraries

    # Upsert / timestamp handling.
    self.last_modified_column = last_modified_column
    self.last_modified_timezone = last_modified_timezone
    self.last_modified_delta = last_modified_delta

    # SDL reflection switch read by add_sdl_to_environ().
    self.reflect_sdl = reflect_sdl

    # "".join() leaves a plain string unchanged and concatenates a list
    # of fragments, so both accepted forms end up as one string.
    self.table_name = table_name
    self.query = "".join(query)
    self.embed_table_name_in_query()

    super().__init__()
def embed_table_name_in_query(self):
    """Expand the `{table_name}` placeholder inside `self.query`.

    The query is run through `str.format` with `table_name` bound to
    `self.table_name` and the result is stored back on `self.query`.

    Raises
    ------
    ValueError
        If formatting leaves the query unchanged, which is taken to
        mean the placeholder was missing.
    """
    template = self.query
    expanded = template.format(table_name=self.table_name)
    if expanded == template:
        raise ValueError("'query' parameter must contain '{table_name}'")
    self.query = expanded
@property
def data_tzinfo(self) -> T.Optional[datetime.tzinfo]:
    """Timezone applied to naive datetimes, or None when none is set.

    Thin attribute-style wrapper: the lookup itself lives in
    `_data_tzinfo()`.
    """
    tzinfo = self._data_tzinfo()
    return tzinfo
def _data_tzinfo(self) -> T.Optional[datetime.tzinfo]:
    """Return the tzinfo for `last_modified_timezone`, or None if unset.

    The resolved timezone is memoized on the instance itself, keyed by
    the timezone name. A function-level `lru_cache` on a method would
    instead key a single shared slot on `self`: that keeps an instance
    alive for the life of the cache, evicts on every switch between
    instances, and keeps returning the old tzinfo if
    `last_modified_timezone` is reassigned.
    """
    tz_name = self.last_modified_timezone
    if not tz_name:
        return None
    cached = getattr(self, "_data_tzinfo_cache", None)
    if cached is None or cached[0] != tz_name:
        # First lookup, or the configured name changed since the last one.
        cached = (tz_name, pytz.timezone(tz_name))
        self._data_tzinfo_cache = cached
    return cached[1]
def localize_value(self, value: datetime.datetime) -> datetime.datetime:
    """Attach the configured timezone to a naive datetime.

    JDBC hands back naive datetimes. When `data_tzinfo` is set, its
    `localize()` result is returned; otherwise `value` is returned
    untouched.
    """
    tzinfo = self.data_tzinfo
    if not tzinfo:
        return value
    return tzinfo.localize(value)
def jdbc_connect(self) -> jaydebeapi.Connection:
    """Open and return a new JDBC connection via `jaydebeapi.connect()`.

    The driver class, connection URL, credentials, jar files and native
    libraries configured on this inputter are passed straight through.
    """
    connect_kwargs = {
        "jclassname": self.driver_class,
        "url": self.connection_url,
        "driver_args": self.credentials,
        "jars": self.jar_files,
        "libs": self.libraries,
    }
    return jaydebeapi.connect(**connect_kwargs)
def jdbc_columns_types(self) -> \
        T.Iterator[T.Tuple[str, jaydebeapi.DBAPITypeObject]]:
    """Yield `(name, type)` for each column in the query results.

    Executes `self.query` on a fresh connection and reads the first two
    fields of every entry in `cursor.description`.

    The connection and cursor are both opened in `with` blocks so they
    are closed once the description has been read; the pairs are
    collected first so nothing is yielded while the connection is open.
    """
    with self.jdbc_connect() as connection:
        with connection.cursor() as cursor:
            cursor.execute(self.query)
            # `description` is None when the statement produced no
            # result set; treat that as "no columns" instead of failing
            # with a TypeError on iteration.
            described = [
                (column[0], column[1])
                for column in cursor.description or ()
            ]
    yield from described
def updated_at(self, timestamp: T.Union[int, float, str]) -> None:
    """Record the timestamp used as the lower bound for an upsert.

    An int or float is stored unchanged; any other value is handed to
    `dateutil`'s `parse()` and the parsed result is stored. The stored
    value is logged at INFO level.
    """
    is_numeric = isinstance(timestamp, (int, float))
    self._updated_at = timestamp if is_numeric else parse(timestamp)
    logger.info("updated_at = %s", self._updated_at)
def __iter__(self):
    """
    Run the configured query and yield one record per returned row.

    When `updated_at()` has stored a truthy timestamp the job is an
    upsert: the configured query is wrapped as a subquery and filtered
    to rows whose `last_modified_column` is greater than that timestamp.
    Otherwise the configured query is executed as-is (load).

    Raises `ValueError` for an upsert without `last_modified_column`;
    a `jaydebeapi.DatabaseError` from `execute()` is logged (when it
    mentions "does not exist") and re-raised.

    Hints about passing parameters to a Postgres JDBC query:
    https://jdbc.postgresql.org/documentation/head/query.html#query-example
    Hints about `java.sql.Timestamp`:
    https://github.com/baztian/jaydebeapi/issues/70
    https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
    """
    # Starts the JVM
    with self.jdbc_connect() as connection:
        # The following imports require that the JVM already be started.
        # The imports can't be done at the beginning of the file as
        # Java and connection info must be known.
        # pylint: disable=unused-import
        import jpype.imports  # noqa: F401
        # `sql` can only be imported after `jpype.imports`
        # pylint: disable=import-error
        from java import (
            sql,  # noqa: F401
        )
        if self._updated_at:
            # upsert job
            if not self.last_modified_column:
                raise ValueError(
                    "'last_modified_column' parameter "
                    "must be set for upsert job"
                )
            # Use query from config as subquery to get new records
            query = (f"SELECT * FROM ({self.query}) AS SUBQ "
                     f"WHERE SUBQ.{self.last_modified_column} > ?")
            # updated_at() stores int/float values unchanged, and those
            # have no .timestamp() method.
            # NOTE(review): numeric values are assumed to already be
            # epoch seconds -- confirm against callers of updated_at().
            if isinstance(self._updated_at, (int, float)):
                epoch_seconds = self._updated_at
            else:
                epoch_seconds = self._updated_at.timestamp()
            # java.sql.Timestamp takes milliseconds since the epoch.
            java_updated_at = sql.Timestamp(int(epoch_seconds) * 1000)
            query_params = (java_updated_at, )
        else:
            # load job
            query = self.query
            query_params = tuple()
        with connection.cursor() as cursor:
            try:
                cursor.execute(query, query_params)
            except jaydebeapi.DatabaseError as exc:
                if "does not exist" in str(exc):
                    logger.error("does column '%s' exist in the table?",
                                 self.last_modified_column)
                raise
            # record() maps row positions to these names.
            self._column_names = [col[0] for col in cursor.description]
            for row in cursor.fetchall():
                yield self.record(row)
def apply_delta(self, obj, delta):
    """Shift `obj` by `delta` minutes.

    A `delta` of zero returns `obj` itself, without building a new
    object.
    """
    # pylint: disable=no-self-use
    if delta != 0:
        return obj + datetime.timedelta(minutes=delta)
    return obj
def record(self, row):
    """Build a `Record` from one result row.

    Values are keyed by the column names captured in
    `self._column_names`, matched by position. Every
    `datetime.datetime` value is adjusted on the way: a naive value is
    first passed through `localize_value()`, then `last_modified_delta`
    is applied with `apply_delta()`. Other values pass through as-is.
    """
    def adjust(item):
        # Only datetime values are touched.
        if not isinstance(item, datetime.datetime):
            return item
        if item.tzinfo is None:
            item = self.localize_value(item)
        return self.apply_delta(item, self.last_modified_delta)

    fields = {
        self._column_names[position]: adjust(item)
        for position, item in enumerate(row)
    }
    return self.Record(**fields)
def add_sdl_to_environ(self, environ):
    """Replace the SDL columns in `environ` with ones reflected from JDBC.

    Does nothing beyond logging when `reflect_sdl` is false. Otherwise
    the existing `environ[SDL]["columns"]` list is emptied in place and
    refilled with one `{"name", "type"}` entry per column reported by
    `jdbc_columns_types()`, with the type translated through
    `type_mapping`.
    """
    # Looked up before the reflect_sdl check, so a missing key fails
    # in both modes.
    sdl_columns = environ[SDL]["columns"]
    if self.reflect_sdl:
        logger.info("input.reflect_sdl is 'true'; reflecting SDL from query")
        sdl_columns.clear()
        for name, jdbc_type in self.jdbc_columns_types():
            sdl_columns.append({
                "name": name,
                "type": self.type_mapping[jdbc_type],
            })
    else:
        logger.info("input.reflect_sdl is 'false'; using learned SDL")
# pylint: enable=line-too-long