1. Developing Transforms¶
1.1. Transform Types¶
There are two types of Transforms:
- Transforms with InfinStor Managed I/O, where the InfinStor client library handles reading input from and writing output to storage
- Transforms with non InfinStor Managed I/O, where the transform code itself is responsible for all input and output
1.2. Capturing Transforms with InfinStor Managed I/O¶
In InfinStor Managed I/O transform types, the InfinStor client library, or clientlib (the infinstor pip package), manages input and output for the transform. The clientlib
- reads data from the Cloud Object Store,
- copies it into a local directory in the execution environment (VM or container),
- and passes the data into user code in one of the following ways:
  - infin_transform_all_objects callback
  - infin_fs transforms (infin_fs stands for InfinStor filesystem)
1.2.1. infin_transform_all_objects transforms¶
This type of transform adopts a push model: the clientlib pushes data from cloud storage into the transform by copying it to local storage before the transform code runs:
- when the transform is run, the infinstor clientlib starts an mlflow run automatically
- input_dir parameter
  - all the objects in the chosen cloud storage, InfinSnap or InfinSlice, are downloaded into the input_dir temporary directory
  - note that if the objects were present in subdirectories, that subdirectory hierarchy is preserved when the files are downloaded from the cloud object store into the local temporary directory
- the callback infin_transform_all_objects() in the transform is invoked
- output_dir parameter
  - the transform code can write any output, created during its run, to output_dir
  - when the transform finishes, the infinstor clientlib logs the files in output_dir as artifacts of the mlflow run, using mlflow's log_artifact() API
  - the directory structure under output_dir is preserved when the artifacts are logged to the mlflow run
- kwargs parameter
  - represents the additional user specified key=value arguments that can be passed to the transform at the time of running it (a short usage sketch follows the template below)
- when the transform finishes, the corresponding mlflow run is automatically marked as finished
The following is the default template, all-objects-template:
# This transform is called with all objects downloaded to input_dir
#
# Note that dir hierarchy in source is preserved inside input_dir
def infin_transform_all_objects(input_dir, output_dir, **kwargs):
    print('input_dir=' + input_dir + ', output_dir=' + output_dir)
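Any key=value pairs supplied when the transform is run arrive in the kwargs dict of the callback. A minimal sketch of reading one such pair (the threshold key below is hypothetical, not part of the template):

def infin_transform_all_objects(input_dir, output_dir, **kwargs):
    # 'threshold' is a hypothetical key=value pair supplied at run time; fall back to a default if absent
    threshold = float(kwargs.get('threshold', '0.5'))
    print('input_dir=' + input_dir + ', output_dir=' + output_dir + ', threshold=' + str(threshold))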
1.2.1.1. infin_transform_all_objects example¶
Here is a practical example of the infin_transform_all_objects
transform. In this example,
- the transform simply copies its input directory to its output directory, preserving the input directory hierarchy.
- this output can be used as an input to subsequent transforms.
###########################
# infin_transform_all_objects transform: copies input to output
###########################
import os
import sys
import logging
from pathlib import Path
import mlflow

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(process)d - %(name)s - %(levelname)s - %(message)s")

# This transform is called with all objects downloaded to input_dir
# Note that dir hierarchy in source is preserved inside input_dir
def infin_transform_all_objects(input_dir, output_dir, **kwargs):
    logging.info("infin_transform_all_objects(input_dir=%s, output_dir=%s, kwargs=%s)", input_dir, output_dir, kwargs)
    logging.info("MLFLOW_TRACKING_URI=%s", os.environ.get("MLFLOW_TRACKING_URI"))
    logging.info("sys.executable=%s", sys.executable)
    logging.info("sys.argv=%s", sys.argv)
    logging.info("os.environ=%s", os.environ)
    # copy files from input directory to output directory
    for path, subdirs, files in os.walk(input_dir):
        for name in files:
            # mirror each input file to the output directory: for example, given
            #   input_dir  = /tmp/input_dir_root
            #   path       = /tmp/input_dir_root/dir1
            #   name       = file1.txt
            #   output_dir = /tmp/output_dir_root
            # the input file /tmp/input_dir_root/dir1/file1.txt is mirrored to /tmp/output_dir_root/dir1/file1.txt
            logging.info("file %s in directory %s. size = %d", os.path.join(path, name), path, Path(os.path.join(path, name)).stat().st_size)
            # if input_dir is /tmp/input_dir_root and path is /tmp/input_dir_root/dir1, then input_file_dir_minus_input_dir is 'dir1'
            input_file_dir_minus_input_dir = path[len(input_dir)+1:]
            # create an output directory that mirrors the input directory
            output_artifact_dir = os.path.join(output_dir, input_file_dir_minus_input_dir)
            os.makedirs(output_artifact_dir, exist_ok=True)
            # make a copy of the input file in the output directory
            output_artifact_fullname = os.path.join(output_artifact_dir, name)
            logging.info("writing output artifact=%s", output_artifact_fullname)
            with open(output_artifact_fullname, "wb") as out_fh:
                with open(os.path.join(path, name), "rb") as in_fh:
                    out_fh.write(in_fh.read())
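After the run completes, the artifacts logged from output_dir can be inspected with the standard MLflow client. A short hedged sketch (the run id is a placeholder for the run created by the transform):

from mlflow.tracking import MlflowClient

client = MlflowClient()
# '<run_id>' is a placeholder for the id of the mlflow run created by the transform
for artifact in client.list_artifacts('<run_id>'):
    print(artifact.path, artifact.file_size)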
1.2.1.2. Dockerfile¶
This is a Dockerfile that can be used to specify the Docker environment for the above transform:
FROM pytorch/pytorch
RUN apt update
RUN pip install infinstor
RUN pip install infinstor-mlflow-plugin
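Assuming the Dockerfile above is saved in an otherwise empty directory, the image can be built locally in the usual way (the tag name is only illustrative):
$ docker build -t all-objects-transform .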
1.2.1.3. Conda environment¶
The conda environment for the above example can be created using the steps below
$ conda create --name all_objects_transform python=3.7
$ conda activate all_objects_transform
(all_objects_transform) ~ $ pip install infinstor infinstor-mlflow-plugin
1.2.2. infin_fs transforms¶
This type of transform adopts a pull model:
- the transform is an ordinary script that is run from start to finish; unlike infin_transform_all_objects() above, there is no callback
- when the transform is run, the infinstor clientlib starts an mlflow run automatically
- infinmount.infin_declare_input()
  - the transform declares its input directories using infinmount.infin_declare_input(). The clientlib reads data from cloud storage only when the transform code attempts to read input data, and makes it available in the declared input directories. The clientlib does not copy data from cloud storage to local storage before running the transform, as is the case with infin_transform_all_objects.
- infinmount.infin_log_output()
  - the transform writes its output files to a temporary output directory and then calls infinmount.infin_log_output() to automatically log the files in the output directory to the active mlflow run as mlflow artifacts
- kwargs
  - represents the additional user specified key=value arguments that can be passed to the transform at the time of running. A key=value specified during the transform run is translated to a --key=value command line argument, which can be accessed by the transform code using sys.argv
- when the transform finishes, the corresponding mlflow run is automatically marked as finished
infin_fs stands for infinstor filesystem.
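Putting these pieces together, a minimal sketch of an infin_fs transform looks like the following (the directory paths and the batch_size kwarg are illustrative, not part of the API); a complete copy-input-to-output example follows in the next section.

import os
import sys
from infinstor.infinfs import infinmount

# declare a directory in which the clientlib makes the chosen cloud storage data available on demand
input_dir = "/tmp/my_input"                      # illustrative local path
infinmount.infin_declare_input(input_dir, name="in1")

# a key=value pair specified at run time arrives as a --key=value command line argument
batch_size = 32                                  # hypothetical kwarg with a default value
for arg in sys.argv:
    if arg.startswith('--batch_size='):
        batch_size = int(arg[len('--batch_size='):])

# do the actual work: read from input_dir and write results into a temporary output directory
output_dir = "/tmp/my_output"                    # illustrative local path
os.makedirs(output_dir, exist_ok=True)
# ... transform logic that writes files under output_dir ...

# log every file under output_dir as an artifact of the active mlflow run
infinmount.infin_log_output(output_dir)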
1.2.2.1. infin_fs transform example¶
#!/usr/bin/env python
# coding: utf-8
###########################
# infin_fs transform: copies input to output
###########################
import os
import os.path
from pathlib import Path
from infinstor.infinfs import infinmount
import logging
import sys

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(process)d - %(name)s - %(levelname)s - %(message)s")

def copy_from_input_dir_to_output_dir(input_dir: str, output_dir: str):
    logging.info(f"copying from input_dir={input_dir} to output_dir={output_dir}")
    # copy files from input directory to output directory
    for path, subdirs, files in os.walk(input_dir):
        logging.info(f"os.walk({input_dir}): path={path}, subdirs={subdirs}, files={files}")
        # if input_dir is /tmp/input_dir_root and path is /tmp/input_dir_root/dir1, then input_file_dir_minus_input_dir is 'dir1'
        input_file_dir_minus_input_dir = path[len(input_dir)+1:]
        # create an output directory that mirrors the input directory
        output_artifact_dir = os.path.join(output_dir, input_file_dir_minus_input_dir)
        os.makedirs(output_artifact_dir, exist_ok=True)
        for name in files:
            # mirror each input file to the output directory: for example, given
            #   input_dir  = /tmp/input_dir_root
            #   path       = /tmp/input_dir_root/dir1
            #   name       = file1.txt
            #   output_dir = /tmp/output_dir_root
            # the input file /tmp/input_dir_root/dir1/file1.txt is mirrored to /tmp/output_dir_root/dir1/file1.txt
            input_artifact_fullname: str = os.path.join(path, name)
            # make a copy of the input file in the output directory
            output_artifact_fullname = os.path.join(output_artifact_dir, name)
            logging.info("file %s in input directory %s: size = %d", input_artifact_fullname, path, Path(input_artifact_fullname).stat().st_size)
            logging.info("copying input file=%s to output file=%s", input_artifact_fullname, output_artifact_fullname)
            with open(output_artifact_fullname, "wb") as out_fh:
                with open(input_artifact_fullname, "rb") as in_fh:
                    out_fh.write(in_fh.read())

logging.info("MLFLOW_TRACKING_URI=%s", os.environ.get("MLFLOW_TRACKING_URI"))
logging.info("sys.executable=%s", sys.executable)
logging.info("sys.argv=%s", sys.argv)
logging.info("os.environ=%s", os.environ)

# declare the input directories; the clientlib makes the chosen cloud storage data available in them on demand
inputdir: str = "/tmp/inputdir"
inputdir_in1: str = os.path.join(inputdir, "in1")
infinmount.infin_declare_input(inputdir_in1, name="in1")
inputdir_in2: str = os.path.join(inputdir, "in2")
infinmount.infin_declare_input(inputdir_in2, name="in2")

outputdir: str = "/tmp/outputdir"
copy_from_input_dir_to_output_dir(inputdir_in1, os.path.join(outputdir, "in1"))
copy_from_input_dir_to_output_dir(inputdir_in2, os.path.join(outputdir, "in2"))

# log all files in 'outputdir' as mlflow artifacts of the active mlflow run
infinmount.infin_log_output(outputdir)
print("infin_fs transform executed successfully")
1.2.2.2. Dockerfile¶
FROM pytorch/pytorch
RUN apt update
RUN apt install -y libfuse-dev
RUN pip install boto3
RUN pip install mlflow
RUN pip install infinstor
RUN pip install infinstor infinstor-mlflow-plugin
1.2.2.3. Conda environment¶
Note that infin_fs transforms are not supported in a Conda environment yet. They are only supported in a Docker environment.
1.3. Capturing Transforms with non InfinStor Managed I/O¶
In a non InfinStor Managed I/O transform:
- the InfinStor client library does not read or write any files from storage - that is the responsibility of user code (see the sketch after this list)
- kwargs (key=value arguments) are supported and are used to pass run time parameters to the transform being run. See Run Transform for details on how to specify kwargs.
  - these kwargs are converted to command line arguments, i.e. sys.argv, when the script is run
  - for example, while setting up the run, the user may specify two kwargs:
    - model_class_name=AutoModelForQuestionAnswering (key model_class_name with value AutoModelForQuestionAnswering)
    - model_name=bert-large-uncased-whole-word-masking-finetuned-squad (key model_name with value bert-large-uncased-whole-word-masking-finetuned-squad)
  - when this transform is run in ICE (InfinStor Compute Engine), the script may extract the parameters as follows:

    for arg in sys.argv:
        if arg.startswith('--model_class_name='):
            model_class_name = arg[len('--model_class_name='):]
        elif arg.startswith('--model_name='):
            model_name = arg[len('--model_name='):]
    print('model_class_name=' + str(model_class_name) + ', model_name=' + str(model_name))
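Because all I/O is the responsibility of user code in this mode, a transform typically reads its input with a storage SDK directly and logs its results to the mlflow run itself. A minimal hedged sketch using boto3 (installed by the Dockerfile below); the bucket name, object key and local path are hypothetical:

import boto3
import mlflow

# download a single input object from S3 to local storage; the bucket and key are placeholders
s3 = boto3.client('s3')
s3.download_file('my-bucket', 'data/input.csv', '/tmp/input.csv')

# ... user code processes /tmp/input.csv here ...

# log the (processed) file as an artifact of the active mlflow run
mlflow.log_artifact('/tmp/input.csv')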
1.3.1. infin non managed I/O transform example¶
#!/usr/bin/env python
# coding: utf-8
###########################
# infin non managed I/O transform: copies input files to the mlflow run as artifacts
###########################
import os
import os.path
from pathlib import Path
import logging
import sys
import mlflow

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(process)d - %(name)s - %(levelname)s - %(message)s")

def copy_from_input_dir_to_mlflow_run(input_dir: str):
    logging.info(f"copying from input_dir={input_dir} to mlflow run as artifacts")
    # copy files from input directory to mlflow artifacts
    #
    # Note that the names returned by os.walk are just names, with no path components.
    # To get a full path to a file in path, use os.path.join(path, name)
    for path, subdirs, files in os.walk(input_dir):
        logging.info(f"os.walk({input_dir}): path={path}, subdirs={subdirs}, files={files}")
        # strip the prefix 'input_dir' from 'path': if input_dir is /tmp/input_dir_root and path is /tmp/input_dir_root/dir1, then input_file_dir_minus_input_dir is 'dir1'
        input_file_dir_minus_input_dir = path[len(input_dir)+1:]
        # copy each input file to the corresponding mlflow run artifact location
        for name in files:
            # mirror each input file as an mlflow artifact: for example, given
            #   input_dir = /tmp/input_dir_root
            #   path      = /tmp/input_dir_root/dir1
            #   name      = file1.txt
            # the input file /tmp/input_dir_root/dir1/file1.txt is copied to the mlflow artifact dir1/file1.txt
            input_artifact_fullname: str = os.path.join(path, name)
            # log the input file as an mlflow artifact
            logging.info("file %s in input directory %s: size = %d", input_artifact_fullname, path, Path(input_artifact_fullname).stat().st_size)
            logging.info(f"logging file={input_artifact_fullname} to mlflow artifact location={input_file_dir_minus_input_dir}")
            mlflow.log_artifact(local_path=input_artifact_fullname, artifact_path=input_file_dir_minus_input_dir)

logging.info("MLFLOW_TRACKING_URI=%s", os.environ.get("MLFLOW_TRACKING_URI"))
logging.info("sys.executable=%s", sys.executable)
logging.info("sys.argv=%s", sys.argv)
logging.info("os.environ=%s", os.environ)

inputdir: str = "/etc/rc.d/init.d"
copy_from_input_dir_to_mlflow_run(inputdir)
print("infin_script transform executed successfully")
1.3.2. Dockerfile¶
FROM pytorch/pytorch
RUN apt update
RUN apt install -y libfuse-dev
RUN pip install boto3
RUN pip install mlflow
RUN pip install infinstor
RUN pip install infinstor infinstor-mlflow-plugin
1.3.3. Conda environment¶
The conda environment for the above example can be created using the steps below
$ conda create --name infin_script python=3.7
$ conda activate infin_script
(infin_script) ~ $ pip install infinstor infinstor-mlflow-plugin