Dynamic Job Configurations¶
In general, Zuar Runner job configurations are static – they are defined in JSON and their behavior never changes. In some situations, such as examples, experimentation, and debugging, it can be useful for a job’s behavior to be defined dynamically, via Python code. Dynamic job configuration was introduced to allow this.
At its simplest, dynamic job configuration allows the user to embed Python code in a job configuration to modify the behavior of:
the input data
steps the job executes
transforms applied to the data
Warning
Dynamic job configuration is an extremely advanced feature; anything beyond the simplest of uses will likely require an in-depth understanding of Zuar Runner internals. Please proceed with caution.
The following Zuar Runner inputter, transform, and step allow Python code to be injected into a running job:
inputter - ExampleInput
transform - PythonTransform
step - PythonStep
Some aspects of the injected Python code are common to all of these callables; those common aspects are described in the remainder of this page.
Python Code¶
Python code is introduced to a job via the python_code parameter. The value of the parameter can be one of the following:
a relative path to a file containing Python code located in $MITTO_DATA
a fully qualified path to a file containing Python code
a list of strings containing Python code
Regardless of its source, the Python exec function (https://docs.python.org/3/library/functions.html#exec) will be called with python_code as its first argument. In the case of PythonTransform and PythonStep, the Python built-in functions globals and locals are called and their return values are used as the second and third arguments, respectively, to exec.
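The exec call described above can be sketched as follows. This is a minimal illustration and not Zuar Runner code: SketchTransform and its apply method are hypothetical names, and binding the injected function with types.MethodType is an assumption about how the function comes to be treated as a method.

```python
import types


class SketchTransform:
    """Hypothetical stand-in for a callable such as PythonTransform."""

    def __init__(self, source):
        # globals() and locals() are called here, so the executed code can
        # see `self` through the locals mapping and attach a function to it.
        exec(source, globals(), locals())

    def apply(self, record):
        # The injected function is a plain function stored on the instance,
        # so bind it explicitly before calling it like a method (assumption).
        bound = types.MethodType(self.transform_func, self)
        return bound(record)


source = "\n".join([
    "def transform_func(self, record):",
    "    record = dict(record, seen=True)",
    "    return record",
    "self.transform_func = transform_func",
])

sketch = SketchTransform(source)
print(sketch.apply({"a": 1}))  # {'a': 1, 'seen': True}
```

Because the code runs with the instance visible as self, the final assignment in the injected source is what makes the function reachable after exec returns.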
Generally, the code should:
define a function to perform the desired action, and
assign that function to an attribute of the callable via self
When the code is contained in a file, no special formatting is required.
Example code suitable for use with PythonTransform via file:
def transform_func(self, record):
    print("record1=%s" % record)
    return record

self.transform_func = transform_func
When code is provided directly in the job config, quirks in HJSON’s handling of indented strings require the use of special formatting; a . should be used to indicate the leftmost column when it would otherwise contain a space.
The same code, modified for use with PythonTransform via the config file:
python_code: [
    def transform_func(self, record):
    .   print("record1=%s" % record)
    .   return record
    self.transform_func = transform_func
],
Callable-specific Details¶
ExampleInput¶
The function’s call signature must match (self) because the function will be treated as a method of an ExampleInput instance.
The function must be assigned to self.inputter_func.
The purpose of the function is to provide data.
The function will be treated as an iterator.
Example job configuration fragment:
{
    input: {
        use: mitto.iov2.input#ExampleInput
        python_code: [
            def inputter_func(self):
            .   cols = "abcdefghijk"
            .   for i in range(0, 10):
            .       yield {col: i for col in cols}
            self.inputter_func = inputter_func
        ]
    },
    ...
}
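To see what data this inputter function provides, the same generator can be run outside of a job. The sketch below is for illustration only: stand_in is a hypothetical placeholder for the ExampleInput instance, which the function does not actually use.

```python
from types import SimpleNamespace


def inputter_func(self):
    cols = "abcdefghijk"
    for i in range(0, 10):
        # One record per iteration: every column holds the loop counter.
        yield {col: i for col in cols}


# Hypothetical placeholder for the ExampleInput instance.
stand_in = SimpleNamespace()
records = list(inputter_func(stand_in))

print(len(records))       # 10
print(len(records[0]))    # 11
print(records[3]["a"])    # 3
```

Each of the ten records has the eleven keys a through k, all set to the same value.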
PythonStep and PythonTransform¶
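The function’s call signature must match (self) for PythonStep or (self, record) for PythonTransform, because the function will be treated as a method of a PythonStep or PythonTransform instance.
The function must be assigned to an attribute of that instance via self.
A PythonStep function is called once during the execution of the job; a PythonTransform function is called once for each row of data and returns the record.
The attribute name and the remaining requirements for each callable are listed under Execution Context and Other Requirements.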
General¶
The value of python_code can be any of the following:
A string containing the name of a file located in /var/mitto/data containing valid Python code.
A string containing the fully-qualified path to a file containing valid Python code.
A list of one or more strings, with each string being a line of valid Python. The individual strings are joined into a single string that is passed to the Python exec function.
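The three accepted forms can be pictured with a short sketch. It is an illustration under stated assumptions, not the Zuar Runner implementation: resolve_python_code and its data_dir parameter are hypothetical names, and joining the list with newlines is an assumption about how the strings are combined.

```python
from pathlib import Path


def resolve_python_code(value, data_dir="/var/mitto/data"):
    """Return Python source text for a python_code value (hypothetical helper)."""
    if isinstance(value, list):
        # List of strings: one line of Python per string (newline join assumed).
        return "\n".join(value)
    path = Path(value)
    if not path.is_absolute():
        # A bare file name is looked up relative to the data directory.
        path = Path(data_dir) / path
    return path.read_text()


print(resolve_python_code(["a = 1", "b = a + 1"]))
```

The resulting string is what would then be handed to exec.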
Depending upon where the python_code is used, additional constraints may be placed on the code.
Formatting the List of Strings¶
When python_code is a list of strings, a non-standard formatting convention is used due to inconsistent handling of indentation by HJSON. This is best explained by example:
{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        # Executed in the context of an instance of the PythonStep class
        # Because this uses the store as input, the job must be configured
        # with a store.
        def _dynamic_step(self):
        .   logging.info("start")
        .   from mitto.iov2.input import StoreInput
        .   from mitto.io.db.utils import (DEFAULT_ENCODE_ERRORS, to_copyfrom_line)
        .   from mitto.io.db.redshift import StreamIter
        .   streamer = StreamIter(
        .       to_copyfrom_line(record, DEFAULT_ENCODE_ERRORS).encode("utf-8")
        .       for record in self.environ[STORE].list()
        .   )
        .   data = streamer.read()
        .   logging.info("stop")
        # Function must be assigned to `step`
        self.step = _dynamic_step
    ]
}
Things to note:
The first non-space character on the line is considered to be “column 1”.
If the first non-space character is a ., it is converted to a space.
Python comments can be used.
The variables available for use depend upon the context of execution.
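The column rules above can be sketched in a few lines. This is only an illustration of the convention as described here: normalize_python_code is a hypothetical helper, and stripping leading spaces before the check and joining with newlines are assumptions rather than documented behavior.

```python
from types import SimpleNamespace


def normalize_python_code(lines):
    """Apply the leading-dot convention to a list of strings (hypothetical helper)."""
    normalized = []
    for line in lines:
        # The first non-space character is treated as column 1.
        stripped = line.lstrip(" ")
        # A leading "." stands in for a space in column 1.
        if stripped.startswith("."):
            stripped = " " + stripped[1:]
        normalized.append(stripped)
    return "\n".join(normalized)


lines = [
    "    def transform_func(self, record):",
    "    .   return dict(record, seen=True)",
    "    self.transform_func = transform_func",
]
source = normalize_python_code(lines)

# Hypothetical placeholder for the instance that the code attaches to.
holder = SimpleNamespace()
exec(source, {}, {"self": holder})
print(holder.transform_func(holder, {"a": 1}))  # {'a': 1, 'seen': True}
```

After normalization the function body is indented by four spaces, so the joined string is valid Python even though every config line started in the same visual column.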
Execution Context and Other Requirements¶
PythonStep¶
When using the PythonStep step, python_code must define a function that will be valid as a method of the PythonStep class. The function must:
Accept a single argument: self
Expect to be called once during the execution of the job
Not return a value
Be assigned to the step attribute of the class instance
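A function meeting these requirements might look like the sketch below. It runs outside of Zuar Runner, so StepStandIn is a hypothetical placeholder for a PythonStep instance, its environ contents are invented for illustration, and the explicit binding with types.MethodType is an assumption about how the function is invoked.

```python
import logging
import types

logging.basicConfig(level=logging.INFO)


class StepStandIn:
    """Hypothetical placeholder for a PythonStep instance."""

    def __init__(self, environ):
        self.environ = environ


def _dynamic_step(self):
    # Single argument, no return value: the step only has side effects.
    logging.info("environ keys=%s", sorted(self.environ))


stand_in = StepStandIn({"example_key": "example_value"})
# Assign to the `step` attribute; bound here so `self` is supplied on call.
stand_in.step = types.MethodType(_dynamic_step, stand_in)

# Called once, as a job would during execution.
result = stand_in.step()
```

In a job configuration the last lines reduce to the assignment self.step = _dynamic_step.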
PythonTransform¶
When using the PythonTransform transform, python_code must define a function that will be valid as a method of the PythonTransform class. The function must:
Accept two arguments: self and record
Expect to be called once for each row of data
Return record or a modified version of record
Be assigned to the transform_ attribute of the class instance
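The per-row contract can be illustrated with a small sketch run outside of a job. TransformStandIn is a hypothetical placeholder for a PythonTransform instance, the sample rows are invented, and binding with types.MethodType is an assumption about how the function is invoked.

```python
import types


class TransformStandIn:
    """Hypothetical placeholder for a PythonTransform instance."""


def transform_(self, record):
    # Return a modified copy so the incoming row is left untouched.
    updated = dict(record)
    updated["name"] = updated["name"].strip().title()
    return updated


stand_in = TransformStandIn()
# Assign to the `transform_` attribute; bound here so `self` is supplied on call.
stand_in.transform_ = types.MethodType(transform_, stand_in)

rows = [{"name": "  ada lovelace "}, {"name": "alan turing"}]
# The function is called once for each row of data.
cleaned = [stand_in.transform_(row) for row in rows]
print(cleaned)  # [{'name': 'Ada Lovelace'}, {'name': 'Alan Turing'}]
```

Returning the record, modified or not, is what passes the row along; the sketch returns a copy, but returning the same object after changing it also fits the requirements listed above.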
Tips and Tricks¶
If you are running the job manually using the CLI via job_io.py config.json, you can invoke the Python debugger via, e.g.:
{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        import pdb; pdb.set_trace()
    ]
}
Note: this is not possible when the job is being run from the UI, the scheduler, a sequence, or via mitto run.
You can easily add logging statements.
To log every row at a certain point in a set of transforms:
{
    use: mitto.iov2.transform.builtin#PythonTransform
    python_code: [
        def transform_(self, record):
        .   logging.info("record=%s", record)
        .   return record
        self.transform_ = transform_
    ]
}
To log the job execution environment at a certain point in the steps:
{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        logging.info("environ=%s", self.environ)
    ]
}