Dynamic Job Configurations

In general, Zuar Runner job configurations are static – they are defined in JSON and their behavior never changes. In some situations, such as examples, experimentation, and debugging, it can be useful for a job’s behavior to be defined dynamically, via Python code. Dynamic job configuration was introduced to allow this.

At its simplest, dynamic job configuration allows the user to embed Python code in a job configuration to modify the behavior of:

  • the input data

  • steps the job executes

  • transforms applied to the data

Warning

Dynamic job configuration is an extremely advanced feature; anything beyond the simplest of uses will likely require an in-depth understanding of Zuar Runner internals. Please proceed with caution.

Python code can be injected into a running job through the following Zuar Runner callables, one each for inputters, transforms, and steps:

  • inputter - ExampleInput

  • transform - PythonTransform

  • step - PythonStep

Some aspects of the Python code to be injected are common to all three callables; those common aspects are described in the remainder of this page.

Python Code

Python code is introduced to a job via the python_code parameter. The value of the parameter can be one of the following:

  1. a relative path to a file containing Python code located in $MITTO_DATA

  2. a fully qualified path to a file containing Python code

  3. a list of strings containing Python code

Regardless of its source, the code is passed as the first argument to the Python exec function (https://docs.python.org/3/library/functions.html#exec). In the case of PythonTransform and PythonStep, the Python functions globals and locals are called and their return values are passed to exec as its second and third arguments, respectively.
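The effect of calling exec this way can be illustrated with a minimal sketch. The FakeTransform class and its load method are hypothetical stand-ins, not Zuar Runner code; the sketch assumes only that exec is called from inside a method of the callable, so that self reaches the injected code through locals.

```python
class FakeTransform:
    """Hypothetical stand-in for a Runner callable such as PythonTransform."""

    def load(self, python_code):
        # Assumption: exec() runs inside a method of the callable, so the
        # injected code can see `self` through the locals() mapping.
        exec(python_code, globals(), locals())


injected = """
def transform_func(self, record):
    return record
self.transform_func = transform_func
"""

instance = FakeTransform()
instance.load(injected)

# The function is stored as a plain instance attribute, so this sketch passes
# the instance explicitly. How Runner itself invokes it is not shown here.
result = instance.transform_func(instance, {"a": 1})
print(result)
```

Because the injected code assigns the function to an attribute of self, the function remains reachable after exec returns, which is why the assignment step is needed.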

Generally, the code should:

  1. define a function to perform the desired action, and

  2. assign that function to an attribute of the callable via self

When the code is contained in a file, no special formatting is required.

Example code suitable for use with PythonTransform via file:

def transform_func(self, record):
    print("record1=%s" % record)
    return record
self.transform_func = transform_func

When code is provided directly in the job config, quirks in HJSON’s handling of indented strings require the use of special formatting; a . should be used to indicate the leftmost column when it would otherwise contain a space.

The same code, modified for use with PythonTransform via the config file:

python_code: [
    def transform_func(self, record):
    .   print("record1=%s" % record)
    .   return record
    self.transform_func = transform_func
],
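When moving code from a file into a job config, the conversion can be done mechanically. The helper below is a hypothetical sketch, not part of Zuar Runner; it assumes the code is indented with spaces and replaces the first space of each indented line with a ".".

```python
def to_config_lines(source):
    """Hypothetical helper: rewrite Python source as python_code list items."""
    lines = []
    for line in source.splitlines():
        if line.startswith(" "):
            # Mark the leftmost column with "." so the indentation survives.
            line = "." + line[1:]
        lines.append(line)
    return lines


source = '''def transform_func(self, record):
    print("record1=%s" % record)
    return record
self.transform_func = transform_func
'''

for line in to_config_lines(source):
    print(line)
```

The printed lines can then be pasted between the brackets of python_code. Tab-indented code is not handled by this sketch.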

Callable-specific Details

ExampleInput

  • The function’s call signature must match (self) because the function will be treated as a class method of an ExampleInput instance.

  • The function must be assigned to self.inputter_func.

  • The purpose of the function is to provide data.

  • The function will be treated as an iterator.

Example job configuration fragment:

{
  input: {
    use: mitto.iov2.input#ExampleInput
    python_code: [
      def inputter_func(self):
      .   cols = "abcdefghijk"
      .   for i in range(0, 10):
      .       yield {col: i for col in cols}
      self.inputter_func = inputter_func
    ]
  },
  ...
}
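To see what this inputter produces, the same function can be run as ordinary Python outside of a job. This is only an illustration: passing None for self works here because the function does not use it, and the way Runner itself iterates over the function is not shown.

```python
def inputter_func(self):
    cols = "abcdefghijk"
    for i in range(0, 10):
        # One record per iteration: every column holds the row number.
        yield {col: i for col in cols}


# `self` is unused by this function, so None is enough for the illustration.
records = list(inputter_func(None))
print(len(records))
print(records[3])
```

Each of the ten records has the eleven single-letter columns a through k, and every column in a record holds that record's row number.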

PythonStep and PythonTransform

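  • For PythonStep, the function’s call signature must match (self); for PythonTransform, it must match (self, record). In both cases the function will be treated as a class method of the PythonStep or PythonTransform instance.

  • The function must be assigned to self.step for PythonStep or to self.transform_ for PythonTransform.

  • A PythonStep function is called once during the execution of the job and does not return a value.

  • A PythonTransform function is called once for each record and must return the record or a modified version of it.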

General

The value of python_code can be any of the following:

  1. A string containing the name of a file located in /var/mitto/data containing valid Python code.

  2. A string containing the fully-qualified path to a file containing valid Python code.

  3. A list of one or more strings, with each string being a line of valid Python. The individual strings are joined into a single string that is passed to the Python exec function.

Depending upon where the python_code is used, additional constraints may be placed on the code.
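The three forms can be summarized in a short sketch. The load_python_code helper is hypothetical and is not the Zuar Runner implementation; it assumes that $MITTO_DATA and /var/mitto/data refer to the same data directory and that list items are joined with newlines.

```python
import os


def load_python_code(python_code, data_dir=None):
    """Hypothetical helper: return Python source for a python_code value."""
    if isinstance(python_code, (list, tuple)):
        # Form 3: a list of strings, one line of Python per string.
        return "\n".join(python_code)
    if data_dir is None:
        # Assumption: relative names are resolved against the data directory.
        data_dir = os.environ.get("MITTO_DATA", "/var/mitto/data")
    path = python_code
    if not os.path.isabs(path):
        # Form 1: a file name relative to the data directory.
        path = os.path.join(data_dir, path)
    # Form 2 (a fully-qualified path) falls through unchanged.
    with open(path, encoding="utf-8") as handle:
        return handle.read()


source = load_python_code(["x = 1", "y = x + 1"])
namespace = {}
exec(source, namespace)
print(namespace["y"])
```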

Formatting the List of Strings

When python_code is a list of strings, a non-standard formatting convention is used due to inconsistent handling of indentation by HJSON. This is best explained by example:

{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        # Executed in the context of an instance of the PythonStep class
        # Because this uses the store as input, the job must be configured
        # with a store.
        def _dynamic_step(self):
        .    logging.info("start")
        .    from mitto.iov2.input import StoreInput
        .    from mitto.io.db.utils import (DEFAULT_ENCODE_ERRORS, to_copyfrom_line)
        .    from mitto.io.db.redshift import StreamIter
        .    streamer = StreamIter(
        .        to_copyfrom_line(record, DEFAULT_ENCODE_ERRORS).encode("utf-8")
        .        for record in self.environ[STORE].list()
        .    )
        .    data = streamer.read()
        .    logging.info("stop")
        # Function must be assigned to `step`
        self.step = _dynamic_step
    ]
}

Things to note:

  • The first non-space character on the line is considered to be “column 1”.

  • If the first non-space character is a ., it is converted to a space.

  • Python comments can be used

  • The variables available for use depend upon the context of execution
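The first two notes can be expressed as a small sketch. The join_config_lines helper is hypothetical and only mirrors the rules listed above; it is not taken from Zuar Runner, and the newline join is an assumption.

```python
def join_config_lines(lines):
    """Hypothetical helper: turn python_code list items into Python source."""
    restored = []
    for line in lines:
        # The first non-space character is treated as column 1.
        stripped = line.lstrip(" ")
        if stripped.startswith("."):
            # A leading "." stands in for a space.
            stripped = " " + stripped[1:]
        restored.append(stripped)
    return "\n".join(restored)


config_lines = [
    "        def transform_(self, record):",
    '        .   logging.info("record=%s", record)',
    "        .   return record",
    "        self.transform_ = transform_",
]

source = join_config_lines(config_lines)
# compile() only checks that the restored text is valid Python syntax.
compile(source, "<python_code>", "exec")
print(source)
```

The restored text has the function body indented by four spaces, which is what the "." markers preserve.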

Execution Context and Other Requirements

PythonStep

When using the PythonStep step, python_code must define a function that will be valid as a method of the PythonStep class. The function must:

  • Accept a single argument: self

  • Expect to be called once during the execution of the job

  • Not return a value

  • Be assigned to the step attribute of the class instance

PythonTransform

When using the PythonTransform transform, python_code must define a function that will be valid as a method of the PythonTransform class. The function must:

  • Accept two arguments: self and record

  • Expect to be called once for each row of data

  • Return record or a modified version of record

  • Be assigned to the transform_ attribute of the class instance
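A transform that returns a modified record might look like the sketch below. It assumes records behave like dictionaries, as in the ExampleInput example; the field names and the SimpleNamespace stand-in for the PythonTransform instance are hypothetical.

```python
from types import SimpleNamespace


def transform_(self, record):
    # Return a modified copy with surrounding whitespace removed from strings.
    cleaned = {}
    for key, value in record.items():
        cleaned[key] = value.strip() if isinstance(value, str) else value
    return cleaned


# Stand-in for the PythonTransform instance that Runner supplies as `self`.
self = SimpleNamespace()
self.transform_ = transform_

# Hypothetical rows; the function is called once for each of them.
rows = [{"name": "  Ada ", "age": 36}, {"name": "Grace", "age": 45}]
results = [self.transform_(self, row) for row in rows]
print(results)
```

Building a new dictionary instead of editing record in place keeps the original row untouched, which can make debugging easier when several transforms run in sequence.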

Tips and Tricks

  1. If you are running the job manually from the CLI via job_io.py config.json, you can invoke the Python debugger, for example:

    {
        use: mitto.iov2.steps.builtin#PythonStep
        python_code: [
            import pdb; pdb.set_trace()
        ]
    }
    

    Note: This is not possible when the job is run from the UI, the scheduler, a sequence, or via mitto run.

  2. You can easily add logging statements.

    To log every row at a certain point in a set of transforms:

    {
        use: mitto.iov2.transform.builtin#PythonTransform
        python_code: [
            def transform_(self, record):
            .   logging.info("record=%s", record)
            .   return record
            self.transform_ = transform_
        ]
    }
    

    To log the job execution environment at a certain point in the steps:

    {
        use: mitto.iov2.steps.builtin#PythonStep
        python_code: [
            logging.info("environ=%s", self.environ)
        ]
    }