Table Of Contents

Source code for jdbc.iov2.input

"""JDBC Input"""
import datetime
import logging
import typing as T

from functools import lru_cache

import jaydebeapi
import pytz

from dateutil.parser import parse

from import SDL
from mitto.iov2.input import BaseInputV2

logger = logging.getLogger(__name__)

# TODO: remove install of java from debian/control
#       add pytest skip if no java on system

# pylint: disable=line-too-long
[docs]class Input(BaseInputV2): """Input from any database engine with a JDBC interface. Extended Summary ---------------- If this inputter is used with :class:`mitto.iov2.steps.builtin.CreateTable`, the types of the output columns will be determined either by the SDL included in the job configuration or by Mitto's internal sampling algorithm. If this inputter is used with :class:`jdbc.iov2.steps.builtin.CreateTable` and `reflect_sdl` is `true`, the types of the output columns will be determined by the types returned by the `query`. Note: JDBC always returns naive timestamps, even if the input column type is timzone aware. See additional discussion below in the :ref:`Timestamps <timestamps>` section. Parameters ---------- driver_class The fully-qualified path to the driver's Java class. connection_url The connection URL to be used by the driver when connecting to the database. The format of the string is driver-specific. query A string or a list of strings that define the SQL query to be used. If a list of strings is provided, they will be concatenated into a single string. The query **must** end with: `"FROM {table_name}`". `{table_name}` will be replaced with the value of the `table_name` parameter. table_name The table name from which to select the data. reflect_sdl Controls how the :class:`jdbc.iov2.steps.builtin.CreateTable` step will determine output column types. credentials A dict of properties to pass as the `driver_args` parameter when `jaydebeapi.connect() <>`_ is called to establish a connection with the database. These properties are driver-specific and may include properties beyond traditional "credentials" (e.g., the `postgres JDBC driver <>`_ allows configuration of, the schema name, read-only mode, timeouts, logging, ssl, etc.). The use of Mitto's `named credentials` is supported. jar_files String of fully-qualified paths to jar file(s) containing the driver to be used. If the string contains multiple paths, the paths must be separated by a colon: `":"`. When locating the driver, the files in `jar_files` are consulted first, then `CLASSPATH`, if it is present in the environment. libraries String containing one or more `DLL` or `.so` shared libraries used by the driver. If present, this string is passed as an option when Java is invoked (i.e.: `-Djava.library.path=<value>`). last_modified_column The name of the column containing the last modified time of the data. If this has a value is not `null`, an upsert is performed; if it is `null` a load is performed. last_modified_timezone The timezone to use for data saved in the `last_modified_column` of the output database for upserts. The timezone is applied to the value while leaving the value unchanged. The value is a string containing the name of an Olsen timezone database. Values can be taken from the `TZ database name` column of the databases `described here <>`_. For example, if the value is `"America/Chicago"` and the input data is `2021-01-14 00:00:00`, the data will be saved as: `2021-01-14 00:00:00-06:00`. last_modifed_delta An adjustment in minutes to apply to values when writing output data to `last_modified_column`. Yields ------- dict A row returned by `query`. Notes ----- `CreateTable` Generally, jobs should use :class:`mitto.iov2.steps.builtin.CreateTable` with this inputter. :class:`jdbc.iov2.steps.builtin.CreateTable` should be used with this inputter only when one wishes for the column types of the output table to be determined by the JDBC types returned by execution of the `query`. For further explanation, see the documentation for the `CreateTable` step in use. .. _timestamps: JDBC Timestamps JDBC always returns "naive" values for timestamp columns, even if the actual column type is timezone "aware" (e.g., Postgres's `TIMESTAMP WITH TIME ZONE`). The actual value is affected by both the timezone for which the database server is configured well as the timezone of the Mitto server running the job. Thus, in some situations, it may be necessary to apply a correction (`delta`) to values obtained from the `last_modified_column` input database. This correction is provided via the `last_modified_delta` parameter. See `this page <>`_ for additional discussion of `aware` and `naive` values. The `last_modified_timezone` determines the timezone used when saving data to the `last_modified_column` of the output database. The :class:`mitto.iov2.steps.builtin.MaxTimestamp` step provides a `delta` parameter that can be used to apply a similar correction to the maximum timestamp value obtained from the output database during an upsert. Java and JDBC drivers Use of this connector requires that: * Java must be installed. Java is installed by default on Mitto 2.9 or greater. * The desired JDBC driver must be installed. If one or more parameter values are incorrect, the job using this inputter will likely fail with a Java error/traceback. If the Java error message is not helpful, a good fist step is to carefully compare parameter values with those described by the driver's documentation. This connector uses the `JayDeBeApi <>`_ Python package for JDBC database access. Postgres JDBC driver The `Postgres JDBC driver <>`_ is installed with this connector which allows testing and experimentation with this inputter on a default Mitto install. As mentioned earlier, the JDBC driver in use determines the values of `driver_class`, `connection_url`, and `credentials` and the formats of those values. For example, when using other inputters, `"postgresql://"` is a valid `dbo`; however it is not a valid `connection_url` for the Postgres JDBC driver. To input data from that database using the JDBC connector, the following configuration fragment would be necessary: .. code-block:: "input": { ... "connection_url": "jdbc:postgresql://", "credentials": { "user": "zuar", "password": "zuarpass" }, ... } Examples -------- 1. Input section of job configuration using JDBC and Postgres. .. code-block:: "input": { "use": "jdbc.iov2.input#Input", "driver_class": "org.postgresql.Driver", "jar_files": "/usr/share/java/postgresql.jar", "connection_url": "jdbc:postgresql://localhost:5432/analytics", "credentials": { "user": "zuar", "password": "zuarpass" }, "query": "SELECT * FROM public.jdbc_test" }, 2. Input section of job configuration using JDBC and Db2 on an AS400. .. code-block:: "input": { "use": "jdbc.iov2.input#Input", "jar_files": "/opt/jtopen_10_4/lib/jt400.jar", "driver_class": "", "connection_url": "jdbc:as400:", "credentials": { "user": "ZUAR", "password": "ZUARPASS", "database_name": "AS400DB", "translate binary": "true", "naming": "system" }, "query": [ "SELECT * ", "FROM EXAMPLE_TABLE" ], "reflect_sdl": true }, 3. The same configuration using named credentials: .. code-block:: "input": { "use": "jdbc.iov2.input#Input", "jar_files": "/opt/jtopen_10_4/lib/jt400.jar", "driver_class": "", "connection_url": "jdbc:as400:", "credentials": "steve_postgres_named_creds", "query": [ "SELECT * ", "FROM EXAMPLE_TABLE" ], "reflect_sdl": true }, """ # noqa: E501 type_mapping = { jaydebeapi.STRING: "String", jaydebeapi.TEXT: "Text", jaydebeapi.BINARY: "LargeBinary", jaydebeapi.NUMBER: "Integer", jaydebeapi.FLOAT: "Float", jaydebeapi.DECIMAL: "Numeric", jaydebeapi.DATE: "Date", jaydebeapi.TIME: "Time", jaydebeapi.DATETIME: "DateTime", jaydebeapi.ROWID: "String", } """Map JDBC driver type to sqlalchemy type. Key is JayDeBeApi type. Value is java.sql.Types constant. JayDeBeApi driver type documentation: Java type documentation: """ # pylint: enable=line-too-long def __init__(self, driver_class: str, connection_url: str, query: T.Union[str, T.List[str]], table_name: str, reflect_sdl: bool = False, credentials: T.Union[ str, T.Optional[T.Dict[str, T.Any]]] = None, jar_files: T.Optional[str] = None, libraries: T.Optional[str] = None, last_modified_column: T.Optional[str] = None, last_modified_timezone: str = "UTC", last_modified_delta: int = 0, ): self.driver_class = driver_class self.connection_url = connection_url self.query = "".join(query) self.table_name = table_name self.reflect_sdl = reflect_sdl self.credentials = credentials self.jar_files = jar_files self.libraries = libraries self.last_modified_column = last_modified_column self.last_modified_timezone = last_modified_timezone self.last_modified_delta = last_modified_delta self._updated_at = None self._column_names = None self.embed_table_name_in_query() super().__init__()
[docs] def embed_table_name_in_query(self): """ perform template expansion """ query = self.query.format(table_name=self.table_name) if self.query == query: raise ValueError("'query' parameter must contain '{table_name}'") self.query = query
@property def data_tzinfo(self) -> T.Optional[datetime.tzinfo]: """Separate from _data_tzinfo to allow cached property""" return self._data_tzinfo() @lru_cache(1) def _data_tzinfo(self) -> T.Optional[datetime.tzinfo]: """ return tzinfo for last_modified_column """ return (pytz.timezone(self.last_modified_timezone) if self.last_modified_timezone else None)
[docs] def localize_value(self, value: datetime.datetime) -> datetime.datetime: """ Apply a timezone to naive datetime if timezone set. JDBC always returns a naive datetime. Add the specified TZ to the value, if one is configured. """ return self.data_tzinfo.localize(value) if self.data_tzinfo else value
[docs] def jdbc_connect(self) -> jaydebeapi.Connection: """Establish JDBC connection""" return jaydebeapi.connect( jclassname=self.driver_class, url=self.connection_url, driver_args=self.credentials, jars=self.jar_files, libs=self.libraries, )
[docs] def jdbc_columns_types(self) -> \ T.Iterator[T.Tuple[str, jaydebeapi.DBAPITypeObject]]: """Returns (name, type) for each column in query results""" connection = self.jdbc_connect() with connection.cursor() as cursor: cursor.execute(self.query) for column in cursor.description: column_name = column[0] jdbc_type = column[1] yield column_name, jdbc_type
[docs] def updated_at(self, timestamp: T.Union[int, float, str]) -> None: if isinstance(timestamp, (int, float)): self._updated_at = timestamp else: self._updated_at = parse(timestamp)"updated_at = %s", self._updated_at)
def __iter__(self): """ Hints about passing parameters to a Postgres JDBC query: Hints about `java.sql.Timestamp`: """ # Starts the JVM with self.jdbc_connect() as connection: # The following imports require that the JVM already be started. # The imports can't be done at the beginning of the file as # Java and connection info must be known. # pylint: disable=unused-import import jpype.imports # noqa=F401 # `sql` can only be imported after `jtype.imports` # pylint: disable=import-error from java import ( sql, # noqa: F401 ) if self._updated_at: # upsert job if not self.last_modified_column: raise ValueError( "'last_modified_column' parameter " "must be set for upsert job" ) # Use query from config as subquery to get new records query = (f"SELECT * FROM ({self.query}) AS SUBQ " f"WHERE SUBQ.{self.last_modified_column} > ?") java_updated_at = sql.Timestamp( int(self._updated_at.timestamp()) * 1000 ) query_params = (java_updated_at, ) else: # load job query = self.query query_params = tuple() with connection.cursor() as cursor: try: cursor.execute(query, query_params) except jaydebeapi.DatabaseError as exc: if "does not exist" in str(exc): logger.error("does column '%s' exist in the table?", self.last_modified_column) raise self._column_names = [col[0] for col in cursor.description] for row in cursor.fetchall(): yield self.record(row)
[docs] def apply_delta(self, obj, delta): """ Apply a timedelta to a datetime object """ # pylint: disable=no-self-use if delta == 0: return obj return obj + datetime.timedelta(minutes=delta)
[docs] def record(self, row): """Create Record from a given row.""" values = dict() for index, value in enumerate(row): if isinstance(value, datetime.datetime): if value.tzinfo is None: value = self.localize_value(value) value = self.apply_delta(value, self.last_modified_delta) values[self._column_names[index]] = value return self.Record(**values)
[docs] def add_sdl_to_environ(self, environ): """Update environ with SDL for JDBC query results""" columns = environ[SDL]["columns"] if not self.reflect_sdl: msg = "input.reflect_sdl is 'false'; using learned SDL" return msg = "input.reflect_sdl is 'true'; reflecting SDL from query" columns.clear() for column_name, column_type in self.jdbc_columns_types(): column = { "name": column_name, "type": self.type_mapping[column_type], } columns.append(column)
# pylint: enable=line-too-long