What is Upsert?

Many Mitto IO jobs can be configured for upsert.

Upsert updates existing rows and inserts new rows when outputting to a database.

Requirements for Upsert

When configuring a Mitto IO job to use upsert, there are two requirements that need to be met by the source (input):

  1. A way to uniquely identify a single row. Typically this is a primary key. It can be a single column or a combination of columns.
  2. A true last modified column. Typically this is a datetime column that is updated anytime a value in a row is changed. NOTE: Just because a table has a date or datetime column does not mean it has a true last modified column.

Supported Inputs and Outputs

Supported Inputs

IO job database inputs that support upsert:

IO job API inputs that support upsert:

Supported Outputs

All of the database outputs support upsert.

What happens when an IO job WITHOUT upsert is run?

  • Mitto sends a SQL query to the source database (input)
  • Mitto truncates the destination database table (output) and loads all the data

What happens when an IO job WITH upsert is run?

  • Mitto queries the destination database table (output) to find the maximum value of the "last modified column"
  • Mitto sends a SQL query to the source database (input) with an added WHERE clause including the value of the maximum "last modified column".
  • Mitto creates a temporary table in the destination database
  • Mitto merges the data in the temporary table with the destination table using the primary key(s)

Both example above assume the output table already exists. If it doesn't Mitto will create it.

Configuring a Query job for Upsert

The JSON config of a Query job configured for upsert has a few additional values. If the Query job is created via the Add Job wizard, these values are auto created and populated. However, the resulting job can be edited, so it is helpful to understand the resulting job configuration.

Input

The input section has an additional key/value pair defining the last_modified_column.

"input": {
        "query": [
            "SELECT * from test.upsert"
        ],
        "dbo": "postgresql://localhost/analytics",
        "use": "query.io#QueryInput",
        "last_modified_column": "last_updated"
    },

Steps:

The steps section has a few additional steps:

  • MaxTimestamp - This is the step where Mitto queries the "last modified column" (column) in the output table.
  • SetUpdatedAt - This is the step where Mitto adds the WHERE clause to the input query that is sent to the source database.
  • CreateTempTable - This is the step where Mitto creates the temporary table in the destination database (output).
  • SyncTempTable - This is the step where the upsert actually occurs. The temporary table is merged with the output table by using the key(s). In the example below, the primary key is id. key is a list, and therefore the primary key can be a composite key of multiple columns.
"steps": [
        {
            "column": "last_updated",
            "use": "mitto.iov2.steps#MaxTimestamp"
        },
        {
            "use": "mitto.iov2.steps.upsert#SetUpdatedAt"
        },
        {
            "transforms": [
                {
                    "use": "mitto.iov2.transform#ExtraColumnsTransform"
                },
                {
                    "use": "mitto.iov2.transform#ColumnsTransform"
                }
            ],
            "use": "mitto.iov2.steps#Input"
        },
        {
            "use": "mitto.iov2.steps#CreateTable"
        },
        {
            "use": "mitto.iov2.steps.upsert#CreateTempTable"
        },
        {
            "transforms": [
                {
                    "use": "mitto.iov2.transform#FlattenTransform"
                }
            ],
            "use": "mitto.iov2.steps#Output"
        },
        {
            "key": [
                "id"
            ],
            "use": "mitto.iov2.steps.upsert#SyncTempTable"
        },
        {
            "use": "mitto.iov2.steps#CountTable"
        }
    ]

Results of a Query Job with Upsert

After a Query job is run, in the Run info section you can see how many rows are in the resulting table (count) and how many rows were updated (updates).