1. Developing Transforms

1.1. Transform Types

There are two types of transforms: transforms with InfinStor Managed I/O and transforms with non InfinStor Managed I/O.

1.2. Capturing Transforms with InfinStor Managed I/O

In InfinStor Managed I/O transform types, the InfinStor client library, or clientlib (the infinstor pip package), manages input and output for the transform. The clientlib

  • reads data from the Cloud Object Store,
  • copies it into a local directory in the execution environment (VM or container),
  • and passes the data into user code in one of the following ways:
    • infin_transform_all_objects callback
    • infin_fs transforms (infin_fs stands for infinstor filesystem)

1.2.1. infin_transform_all_objects transforms

This type of transform adopts a push model: clientlib pushes data from cloud storage into the transform by copying data from cloud storage to local storage:

  • when the transform is run, infinstor clientlib starts an mlflow run automatically
  • input_dir parameter
    • all the objects in the chosen cloud storage, InfinSnap or InfinSlice are downloaded into the input_dir temporary directory
    • Note that if the objects were present in subdirectories under input_dir, that subdirectory hierarchy is preserved when the files are downloaded from the cloud object store into the local temporary directory
  • the callback infin_transform_all_objects() in the transform is invoked.
  • output_dir parameter
    • The transform code can write any output, created during its run, to output_dir.
    • when the transform finishes, infinstor clientlib logs the files in output_dir as artifacts of the mlflow run, using mlflow's log_artifact() API.
    • The directory structure under output_dir is preserved when the artifacts are logged to the mlflow run (using log_artifact() )
  • kwargs parameter
    • represents the key=value arguments specified when the transform is run. These are additional user-specified key=value pairs that are passed to the transform at run time.
  • when the transform finishes, the corresponding mlflow run is automatically marked as finished

The following is the default template, all-objects-template.

# This transform is called with all objects downloaded to input_dir
#
# Note that dir hierarchy in source is preserved inside input_dir
def infin_transform_all_objects(input_dir, output_dir, **kwargs):
    print('input_dir=' + input_dir + ', output_dir=' + output_dir)
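
The kwargs parameter described above can be read like any Python keyword dictionary. The following is a minimal sketch, not part of the default template: the `suffix` key is a hypothetical user-supplied argument chosen only for illustration, and the sketch assumes its value arrives as a string. The returned list is there only to make local testing easier; this document does not say whether clientlib uses a return value.

```python
import os
import shutil


def infin_transform_all_objects(input_dir, output_dir, **kwargs):
    # 'suffix' is a hypothetical key=value argument (an assumption for this
    # sketch); when it is absent, every file matches the empty suffix.
    suffix = kwargs.get('suffix', '')
    copied = []
    for path, subdirs, files in os.walk(input_dir):
        for name in sorted(files):
            if not name.endswith(suffix):
                continue
            # keep the directory hierarchy relative to input_dir
            rel_dir = os.path.relpath(path, input_dir)
            target_dir = os.path.normpath(os.path.join(output_dir, rel_dir))
            os.makedirs(target_dir, exist_ok=True)
            shutil.copyfile(os.path.join(path, name),
                            os.path.join(target_dir, name))
            copied.append(os.path.normpath(os.path.join(rel_dir, name)))
    return copied
```

Running the transform with `suffix=.csv` would then write only the .csv files to output_dir, in the same subdirectories they had under input_dir.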

1.2.1.1. infin_transform_all_objects example

Here is a practical example of the infin_transform_all_objects transform. In this example,

  • the transform simply copies its input directory to its output directory, preserving the input directory hierarchy.
  • This output can be used as an input to subsequent transforms
###########################
#  infin_transform_all_objects transform: copies input to output; 
#############################

import os
import sys
import logging
from pathlib import Path
import mlflow

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(process)d - %(name)s - %(levelname)s - %(message)s")

# This transform is called with all objects downloaded to input_dir
# Note that dir hierarchy in source is preserved inside input_dir
def infin_transform_all_objects(input_dir, output_dir, **kwargs):
    logging.info("infin_transform_all_objects(input_dir=" + input_dir + ", output_dir=" + output_dir + ", kwargs=%s", kwargs)

    logging.info("MLFLOW_TRACKING_URI=%s", os.environ.get("MLFLOW_TRACKING_URI"))
    logging.info("sys.executable=%s", sys.executable)
    logging.info("sys.argv=%s", sys.argv)
    logging.info("os.environ=%s", os.environ)

    # copy files from input directory to output directory
    for path, subdirs, files in os.walk(input_dir):
      for name in files:
        # mirror each input file to the output directory:  for example, given 
        #     input_dir=/tmp/input_dir_root, 
        #     path = /tmp/input_dir_root/dir1
        #     name = file1.txt
        #     output_dir = /tmp/output_dir_root
        # the input file /tmp/input_dir_root/dir1/file1.txt is mirrored to /tmp/output_dir_root/dir1/file1.txt
        logging.info("file                    %s in directory %s. size = %d", os.path.join(path, name), path, Path(os.path.join(path, name)).stat().st_size)

        # if input_dir is /tmp/input_dir_root and path is /tmp/input_dir_root/dir1, then input_file_dir_minus_input_dir is 'dir1'
        # (os.path.relpath, unlike string slicing, also works when input_dir ends with a slash)
        input_file_dir_minus_input_dir = os.path.relpath(path, input_dir)
        # create output directory that mirrors input directory
        output_artifact_dir = os.path.normpath(os.path.join(output_dir, input_file_dir_minus_input_dir))
        os.makedirs(output_artifact_dir, exist_ok=True)

        # make a copy of the input file to the output directory
        output_artifact_fullname = os.path.join(output_artifact_dir, name)
        logging.info("writing output artifact=%s", output_artifact_fullname)
        with open(output_artifact_fullname, "wb") as out_fh:
            with open(os.path.join(path, name), "rb") as in_fh:
                out_fh.write(in_fh.read())
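
Before capturing a transform like the one above, it can be useful to exercise the callback on a local machine. The harness below is a sketch under stated assumptions: `run_transform_locally` and `write_file_count` are hypothetical names introduced here, and the harness only approximates the push model (it creates temporary input and output directories and calls the callback). It does not download anything, start an mlflow run, or log artifacts, all of which clientlib handles in a real run.

```python
import os
import tempfile


def run_transform_locally(transform, files, **kwargs):
    """Call `transform` on a throwaway input tree and return the relative
    paths (POSIX style) of everything it wrote to its output directory."""
    with tempfile.TemporaryDirectory() as input_dir, \
            tempfile.TemporaryDirectory() as output_dir:
        # create the stand-in "downloaded" objects under input_dir
        for rel_path, content in files.items():
            full_path = os.path.join(input_dir, *rel_path.split('/'))
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            with open(full_path, 'wb') as fh:
                fh.write(content)

        transform(input_dir, output_dir, **kwargs)

        written = []
        for path, subdirs, names in os.walk(output_dir):
            for name in names:
                rel = os.path.relpath(os.path.join(path, name), output_dir)
                written.append(rel.replace(os.sep, '/'))
        return sorted(written)


# stand-in transform used only to demonstrate the harness
def write_file_count(input_dir, output_dir, **kwargs):
    count = sum(len(names) for _, _, names in os.walk(input_dir))
    with open(os.path.join(output_dir, 'count.txt'), 'w') as fh:
        fh.write(str(count))
```

Passing the real `infin_transform_all_objects` in place of `write_file_count` checks that the output hierarchy mirrors the input hierarchy without needing cloud storage.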

1.2.1.2. Dockerfile

This Dockerfile can be used to specify the Docker environment for the above transform.

FROM pytorch/pytorch

RUN apt update
RUN pip install infinstor
RUN pip install infinstor-mlflow-plugin

1.2.1.3. Conda environment

The conda environment for the above example can be created using the steps below

$ conda create --name all_objects_transform python=3.7
$ conda activate all_objects_transform
(all_objects_transform) ~ $ pip install infinstor infinstor-mlflow-plugin

1.2.2. infin_fs transforms

This type of transform adopts a pull model:

  • This transform type follows a script-like execution model (the script is run from start to finish) rather than the callback model used by infin_transform_all_objects() above.
  • when the transform is run, infinstor clientlib starts an mlflow run automatically
  • infinmount.infin_declare_input()
    • The transform declares input directories using infinmount.infin_declare_input(). When the transform code attempts to read input data, clientlib reads it from cloud storage and makes it available in the declared input directories. Unlike infin_transform_all_objects, clientlib does not copy data from cloud storage to local storage before running the transform.
  • infinmount.infin_log_output()
    • Similarly, the transform writes its output files to a temporary output directory and then calls infinmount.infin_log_output() to automatically log the files in the output directory to the active mlflow run as mlflow artifacts.
  • kwargs
    • represents the key=value arguments specified when the transform is run. These are additional user-specified key=value pairs that are passed to the transform at run time. A key=value pair specified for the run is translated to a --key=value argument, which the transform code can access through sys.argv.
  • when the transform finishes, the corresponding mlflow run is automatically marked as finished
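
Because kwargs reach an infin_fs transform as --key=value entries in sys.argv, a small helper can collect them into a dictionary. This is a minimal sketch; `parse_key_value_args` is a hypothetical helper name, not part of the infinstor package, and the keys in the example call are made up for illustration.

```python
def parse_key_value_args(argv):
    """Collect arguments of the form --key=value into a dict.

    Arguments that do not match that form are ignored."""
    params = {}
    for arg in argv:
        if arg.startswith('--') and '=' in arg:
            # split on the first '=' only, so values may contain '='
            key, value = arg[2:].split('=', 1)
            params[key] = value
    return params


# illustrative call with made-up arguments
example = parse_key_value_args(['--epochs=3', '--name=a=b', 'positional'])
```

In a transform script the helper would typically be called as `parse_key_value_args(sys.argv[1:])` after `import sys`. All values are strings, so numeric parameters need an explicit conversion such as `int(params['epochs'])`.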

infin_fs stands for infinstor filesystem.

1.2.2.1. example infin_fs transform

#!/usr/bin/env python
# coding: utf-8

###########################
#  infin_fs transform: copies input to output
###########################

import os.path
import os
from pathlib import Path
from infinstor.infinfs import infinmount
import logging
import sys

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(process)d - %(name)s - %(levelname)s - %(message)s")

def copy_from_input_dir_to_output_dir(input_dir:str, output_dir:str):
    logging.info(f"copying from input_dir={input_dir} to output_dir={output_dir}")

    # copy files from input directory to output directory
    for path, subdirs, files in os.walk(input_dir):
      logging.info(f"os.walk({input_dir}): path={path}, subdirs={subdirs}, files={files}")

      # create output directory that mirrors input directory
      # (os.path.relpath, unlike string slicing, also works when input_dir ends with a slash)
      input_file_dir_minus_input_dir = os.path.relpath(path, input_dir)
      output_artifact_dir = os.path.normpath(os.path.join(output_dir, input_file_dir_minus_input_dir))
      os.makedirs(output_artifact_dir, exist_ok=True)

      # mirror each input file to the output directory
      for name in files:
        # mirror each input file to the output directory:  for example, given 
        #     input_dir=/tmp/input_dir_root, 
        #     path = /tmp/input_dir_root/dir1
        #     name = file1.txt
        #     output_dir = /tmp/output_dir_root
        # the input file /tmp/input_dir_root/dir1/file1.txt is mirrored to /tmp/output_dir_root/dir1/file1.txt
        input_artifact_fullname:str = os.path.join(path, name)

        # make a copy of the input file to the output directory
        output_artifact_fullname = os.path.join(output_artifact_dir, name)
        logging.info("file %s in input directory %s: size = %d", input_artifact_fullname, path, Path(input_artifact_fullname).stat().st_size)
        logging.info("copying input file=%s to output file=%s", input_artifact_fullname, output_artifact_fullname)
        with open(output_artifact_fullname, "wb") as out_fh:
            with open(input_artifact_fullname, "rb") as in_fh:
                out_fh.write(in_fh.read())

logging.info("MLFLOW_TRACKING_URI=%s", os.environ.get("MLFLOW_TRACKING_URI"))
logging.info("sys.executable=%s", sys.executable)
logging.info("sys.argv=%s", sys.argv)
logging.info("os.environ=%s", os.environ)

inputdir:str = "/tmp/inputdir"
inputdir_in1:str = os.path.join(inputdir, "in1")
infinmount.infin_declare_input(inputdir_in1, name="in1")

inputdir_in2:str = os.path.join(inputdir, "in2")
infinmount.infin_declare_input(inputdir_in2, name="in2")

outputdir:str = "/tmp/outputdir"
copy_from_input_dir_to_output_dir(inputdir_in1, os.path.join(outputdir, "in1"))
copy_from_input_dir_to_output_dir(inputdir_in2, os.path.join(outputdir, "in2"))

# log all files in 'outputdir' as mlflow artifacts of the active mlflow run
infinmount.infin_log_output(outputdir)

print("infin_fs transform executed Successfully")

1.2.2.2. Dockerfile

FROM pytorch/pytorch

RUN apt update 
RUN apt install -y libfuse-dev

RUN pip install boto3
RUN pip install mlflow
RUN pip install infinstor
RUN pip install infinstor-mlflow-plugin

1.2.2.3. conda environment

Note that infin_fs transforms are not yet supported in a Conda environment. They are only supported in a Docker environment.

1.3. Capturing Transforms with non Infinstor Managed I/O

In a non InfinStor Managed I/O transform

  • the InfinStor client library does not read or write any files from storage; that is the responsibility of user code
  • kwargs (key=value arguments) are supported and are used to pass run-time parameters to the transform being run. See Run Transform for details on how to specify kwargs.
    • these kwargs are converted to command line arguments, i.e. sys.argv, when the script is run
    • For example, while setting up the run, the user may specify two kwargs:
      • model_class_name=AutoModelForQuestionAnswering
        • key model_class_name with value AutoModelForQuestionAnswering
      • model_name=bert-large-uncased-whole-word-masking-finetuned-squad
        • key model_name with value bert-large-uncased-whole-word-masking-finetuned-squad
    • When this transform is run in ICE (InfinStor Compute Engine), the script may extract the parameters as follows:
      import sys

      # default to None so the print below works even when an argument is absent
      model_class_name = None
      model_name = None
      for arg in sys.argv:
          if arg.startswith('--model_class_name='):
              model_class_name = arg[len('--model_class_name='):]
          elif arg.startswith('--model_name='):
              model_name = arg[len('--model_name='):]
      
      print('model_class_name=' + str(model_class_name) + ', model_name=' + str(model_name))
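
As an alternative to scanning sys.argv by hand, the same two parameters could be read with the standard argparse module. This is a sketch under the assumption, stated above, that kwargs arrive as --key=value arguments; `parse_model_args` is a hypothetical helper name. It uses parse_known_args so that any other arguments present on the command line do not cause an error.

```python
import argparse


def parse_model_args(argv):
    # allow_abbrev=False: only the exact option names are accepted
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument('--model_class_name', default=None)
    parser.add_argument('--model_name', default=None)
    # parse_known_args tolerates arguments this script does not declare
    args, unknown = parser.parse_known_args(argv)
    return args.model_class_name, args.model_name
```

In the transform script this would be called as `parse_model_args(sys.argv[1:])`; both values are None when the corresponding kwarg was not specified for the run.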
      

1.3.1. infin non managed I/O transform example

#!/usr/bin/env python
# coding: utf-8

###########################
#  infin non managed I/O transform: copies input files to mlflow run as artifacts
###########################

import os.path
import os
from pathlib import Path
import logging
import sys
import mlflow

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(process)d - %(name)s - %(levelname)s - %(message)s")

def copy_from_input_dir_to_mlflow_run(input_dir:str):
    logging.info(f"copying from input_dir={input_dir} to mlflow run as artifacts")

    # copy files from input directory to mlflow artifacts
    #
    # Note that the names in the lists are just names, with no path components. To get a full path (which begins with top) to a file or directory in dirpath, do os.path.join(dirpath, name)
    for path, subdirs, files in os.walk(input_dir):
      logging.info(f"os.walk({input_dir}): path={path}, subdirs={subdirs}, files={files}")

      # strip the prefix 'input_dir' from 'path': if input_dir is /tmp/input_dir_root  and path is /tmp/input_dir_root/dir1, then input_file_dir_minus_input_dir is dir1
      input_file_dir_minus_input_dir = path[len(input_dir)+1:]

      # copy input file to corresponding mlflow run artifacts
      for name in files:
        # mirror each input file as mlflow artifact
        #     input_dir=/tmp/input_dir_root, 
        #     path = /tmp/input_dir_root/dir1
        #     name = file1.txt
        # the input file /tmp/input_dir_root/dir1/file1.txt is copied to mlflow artifact dir1/file1.txt
        input_artifact_fullname:str = os.path.join(path, name)

        # log the input file as an mlflow artifact
        logging.info("file %s in input directory %s: size = %d", input_artifact_fullname, path, Path(input_artifact_fullname).stat().st_size)
        logging.info(f"logging file={input_artifact_fullname} to mlflow artifact location={input_file_dir_minus_input_dir}")
        mlflow.log_artifact(local_path=input_artifact_fullname, artifact_path=input_file_dir_minus_input_dir)

logging.info("MLFLOW_TRACKING_URI=%s", os.environ.get("MLFLOW_TRACKING_URI"))
logging.info("sys.executable=%s", sys.executable)
logging.info("sys.argv=%s", sys.argv)
logging.info("os.environ=%s", os.environ)

inputdir:str = "/etc/rc.d/init.d"
copy_from_input_dir_to_mlflow_run(inputdir)

print("infin_script transform executed Successfully")
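
The path arithmetic in the example above can be separated from the mlflow calls, which makes it easy to check without a tracking server. The sketch below is an illustration, not InfinStor code: `plan_artifact_uploads` is a hypothetical helper that returns the (local_path, artifact_path) pairs the loop would log. It uses os.path.relpath instead of string slicing, and maps files directly under input_dir to an artifact_path of None, which is mlflow.log_artifact's default and places the file at the root of the run's artifacts.

```python
import os


def plan_artifact_uploads(input_dir):
    """Return (local_path, artifact_path) pairs for every file under input_dir.

    artifact_path is the file's directory relative to input_dir in POSIX form,
    or None for files that sit directly in input_dir."""
    plan = []
    for path, subdirs, files in os.walk(input_dir):
        rel_dir = os.path.relpath(path, input_dir)
        artifact_path = None if rel_dir == '.' else rel_dir.replace(os.sep, '/')
        for name in sorted(files):
            plan.append((os.path.join(path, name), artifact_path))
    return plan
```

Each pair can then be passed to `mlflow.log_artifact(local_path=..., artifact_path=...)`, as in the example above.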

1.3.2. Dockerfile

FROM pytorch/pytorch

RUN apt update 
RUN apt install -y libfuse-dev

RUN pip install boto3
RUN pip install mlflow
RUN pip install infinstor
RUN pip install infinstor-mlflow-plugin

1.3.3. Conda environment

The conda environment for the above example can be created using the steps below

$ conda create --name infin_script python=3.7
$ conda activate infin_script
(infin_script) ~ $ pip install infinstor infinstor-mlflow-plugin