You can use Mitto's Query plugin to create a database table from any SQL query against any relational database.

The source and destination databases can be Mitto's built-in PostgreSQL database or any other remote database (e.g., Snowflake, Amazon Redshift, or Microsoft SQL Server).

This is the preferred way to mirror data from other databases.

Caveats

Mitto needs network access to both the source and destination databases. This typically means whitelisting IP addresses, potentially on both sides (the remote database server and Mitto).

Create a Query Job

Start creating the job:

Click "+ Add Job".

Select "Query".

Write the database query:


Database URL

Learn more about Database URLs here.

Query

In the SQL area, type the query you want to create a table from.

It is good practice to fully qualify your tables by including each table's schema.

For example, to mirror an entire table:

SELECT *
FROM <schema>.<table>;

Set up the resulting table

Hit Next and Mitto will determine the structure of the resulting table from the query.

You can enable Upsert by checking the Upsert box.

Select the table's Primary key(s) and Last modified column from the dropdowns.

Learn more about upsert at the bottom of this document.

Specify the output

Hit Next and specify the output destination for the query results.

Hit Save.

Go run the job!

Upsert

Any Mitto Query job can be configured for upsert.

Upsert updates existing rows and inserts new rows.

Requirements for Upsert

When configuring a Mitto Query job to use upsert, the source (input) database's table must meet two requirements:

  1. A way to uniquely identify a single row. Typically this is a primary key. It can be a single column or a combination of columns.
  2. A true last modified column. Typically this is a datetime column that is updated anytime a value in a row is changed. NOTE: Just because a table has a date or datetime column does not mean it has a true last modified column.
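
As a sketch, here is what a PostgreSQL source table satisfying both requirements might look like. The table and column names match the example job config later in this document; the trigger is one common way (assumed here for illustration, not required by Mitto) to keep the last modified column truthful:

-- Hypothetical source table: "id" uniquely identifies a row, and
-- "last_updated" records when the row last changed.
CREATE TABLE test.upsert (
    id           integer PRIMARY KEY,
    name         text,
    last_updated timestamptz NOT NULL DEFAULT now()
);

-- A trigger keeps last_updated truthful whenever a row is updated:
CREATE FUNCTION test.touch_last_updated() RETURNS trigger AS $$
BEGIN
    NEW.last_updated := now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER upsert_touch
    BEFORE UPDATE ON test.upsert
    FOR EACH ROW EXECUTE FUNCTION test.touch_last_updated();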

What happens when a Query job WITHOUT upsert is run?

  • Mitto sends a SQL query to the source database (input).
  • Mitto truncates the destination database table (output) and loads all the data.
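
In rough SQL terms, a run without upsert looks like the following sketch (it reuses the test.upsert source table from the example config below; the destination table name upsert is an assumption for illustration):

-- Sent to the source database:
SELECT * FROM test.upsert;

-- On the destination database, existing rows are discarded...
TRUNCATE upsert;
-- ...and every row returned by the query is inserted again.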

What happens when a Query job WITH upsert is run?

  • Mitto queries the destination database table (output) to find the maximum value of the "last modified column".
  • Mitto sends a SQL query to the source database (input) with an added WHERE clause that filters on that maximum "last modified column" value.
  • Mitto creates a temporary table in the destination database
  • Mitto merges the data in the temporary table with the destination table using the primary key(s)
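
Sketched in SQL, again using the test.upsert table and last_updated column from the example config below (the destination table name upsert and the timestamp value are illustrative):

-- 1. Find the high-water mark in the destination table:
SELECT max(last_updated) FROM upsert;
-- suppose this returns '2021-06-01 12:00:00'

-- 2. Incremental query sent to the source, with the WHERE clause Mitto adds:
SELECT * FROM test.upsert
WHERE last_updated > '2021-06-01 12:00:00';

-- 3. The results land in a temporary table, which is merged into the
--    destination table on the primary key(s): matching rows are updated,
--    new rows are inserted.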

Both examples above assume the output table already exists. If it doesn't, Mitto will create it.

Configuring a Query job for Upsert

The JSON config of a Query job configured for upsert has a few additional values. If the Query job is created via the Add Job wizard, these values are automatically created and populated. However, the resulting job can be edited, so it is helpful to understand the resulting configuration.

Input

The input section has an additional key/value pair defining the last_modified_column.

"input": {
        "query": [
            "SELECT * from test.upsert"
        ],
        "dbo": "postgresql://localhost/analytics",
        "use": "query.io#QueryInput",
        "last_modified_column": "last_updated"
    },

Steps

The steps section has a few additional steps:

  • MaxTimestamp - This is the step where Mitto queries the output table for the maximum value of the "last modified column" (the column key).
  • SetUpdatedAt - This is the step where Mitto adds the WHERE clause to the input query that is sent to the source database.
  • CreateTempTable - This is the step where Mitto creates the temporary table in the destination database (output).
  • SyncTempTable - This is the step where the upsert actually occurs. The temporary table is merged with the output table using the key(s). In the example below, the primary key is id. key is a list, so the primary key can be a composite key of multiple columns (a composite-key variant is shown after the config below).
"steps": [
        {
            "column": "last_updated",
            "use": "mitto.iov2.steps#MaxTimestamp"
        },
        {
            "use": "mitto.iov2.steps.upsert#SetUpdatedAt"
        },
        {
            "transforms": [
                {
                    "use": "mitto.iov2.transform#ExtraColumnsTransform"
                },
                {
                    "use": "mitto.iov2.transform#ColumnsTransform"
                }
            ],
            "use": "mitto.iov2.steps#Input"
        },
        {
            "use": "mitto.iov2.steps#CreateTable"
        },
        {
            "use": "mitto.iov2.steps.upsert#CreateTempTable"
        },
        {
            "transforms": [
                {
                    "use": "mitto.iov2.transform#FlattenTransform"
                }
            ],
            "use": "mitto.iov2.steps#Output"
        },
        {
            "key": [
                "id"
            ],
            "use": "mitto.iov2.steps.upsert#SyncTempTable"
        },
        {
            "use": "mitto.iov2.steps#CountTable"
        }
    ]
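
If uniquely identifying a row takes more than one column, list each column in key. Here is a hypothetical composite-key variant of the SyncTempTable step (the column names are illustrative only):

{
    "key": [
        "account_id",
        "line_number"
    ],
    "use": "mitto.iov2.steps.upsert#SyncTempTable"
}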

Results of a Query Job with Upsert

After a Query job is run, in the Run info section you can see how many rows are in the resulting table (count) and how many rows were updated (updates).

Reflection

The default behavior in Mitto is to learn the structure of data as it is piped. This is useful for data coming from APIs and flat files where there may not be a schema or data types.

This process is handled by the ExtraColumnsTransform and ColumnsTransform transforms of the #Input step:

"steps": [
	{
    	"transforms": [
        	{
        	    "use": "mitto.iov2.transform#ExtraColumnsTransform"
        	},
        	{
        	    "use": "mitto.iov2.transform#ColumnsTransform"
        	}
    	],
    	"use": "mitto.iov2.steps#Input"
	},
    ...
]

However, when using databases as a source, as in Query jobs, Mitto can leverage the source database table's schema. This is referred to as reflection. The result is that the output table can exactly mirror the input table with column names and data types.

NOTE: Reflection typically only makes sense for a SELECT * query.

To use reflection, in the steps of the job config, update the transforms array of the #Input step to this:

 "steps": [
     {
        "transforms": [
            {
                "use": "mitto.iov2.transform#ReflectTransform",
                "dbo": "{driver}://{username}:{password}@{server}:{port}/{database}",
                "schema": "{schema}",
                "tablename": "{tablename}"
            }
        ],
        "use": "mitto.iov2.steps#Input"
    },
    ...
]
  • dbo - This is the database URL of the table you are reflecting. It should match the exact dbo from the job's Input.
  • schema - This is the schema of the input table you are reflecting.
  • tablename - This is the table name of the input table you are reflecting.
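
For example, reflecting the test.upsert input table from the upsert example earlier in this document might look like the following (the values shown are illustrative; they should match your own job's input):

"transforms": [
    {
        "use": "mitto.iov2.transform#ReflectTransform",
        "dbo": "postgresql://localhost/analytics",
        "schema": "test",
        "tablename": "upsert"
    }
]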