
Transforms with InfinStor Managed I/O

Types

This page lists the transform types for which the InfinStor client library (the infinstor pip package) manages input. The client library reads data from the cloud object store, copies it into a local directory in the execution environment (VM or container), and passes it to user code in one of the following ways:

  • infin_transform_all_objects
  • infin_transform_one_object
  • infin_transform_dir_by_dir
  • infin_transform_csv_to_pd
  • infin_transform_raw_to_pd

infin_transform_all_objects

In this type of transform, all the objects in the chosen InfinSnap/InfinSlice are downloaded into a temporary directory, and the transform function named infin_transform_all_objects is then called, as in the following example.

The following is the default template, all-objects-template. The input_dir parameter is the directory into which all the objects in that InfinSlice/InfinSnap have been downloaded. If the objects were in subdirectories in the InfinSnap/InfinSlice, that subdirectory structure is preserved when the files are downloaded from the cloud object store into the local temporary directory where the transform is executed.

# This transform is called with all objects downloaded to input_dir
# Note that dir hierarchy in source is preserved inside input_dir
def infin_transform_all_objects(input_dir, output_dir, **kwargs):
    print('input_dir=' + input_dir + ', output_dir=' + output_dir)
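
To make the preserved subdirectory structure concrete, the following sketch walks input_dir and collects each file's path relative to it. The helper name list_relative_paths is an illustrative assumption and is not part of the InfinStor client library; only the transform signature comes from the template above.

```python
import os


def list_relative_paths(input_dir):
    """Return the sorted paths of all files under input_dir, relative to input_dir."""
    relative_paths = []
    for root, _dirs, files in os.walk(input_dir):
        for name in files:
            full_path = os.path.join(root, name)
            relative_paths.append(os.path.relpath(full_path, input_dir))
    return sorted(relative_paths)


# Hypothetical transform body that uses the helper above
def infin_transform_all_objects(input_dir, output_dir, **kwargs):
    for rel_path in list_relative_paths(input_dir):
        # A path such as 'a/b/deep.csv' reflects the hierarchy in the source
        print('found object: ' + rel_path)
```

Because the paths are relative to input_dir, they can be compared directly with the object layout in the InfinSnap/InfinSlice.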

Here is a practical example of the 'all objects' transform. In this example, all the objects are read into a single pandas dataframe, which is then pickled and saved as output. This output can be used as input to subsequent transforms.

import os
import pandas as pd

def list_dir_recursively(root, prefix, array_of_files):
    for file in os.listdir(root):
        if (os.path.isfile(os.path.join(root, file))):
            print(prefix + file)
            if (not file.startswith('USA_all')):
                array_of_files.append(os.path.join(root, file))
        else:
            print(prefix + file + '/')
            list_dir_recursively(os.path.join(root, file), prefix + '    ', array_of_files)

# This transform is called with all objects downloaded to input_dir
def infin_transform_all_objects(input_dir, output_dir, **kwargs):
    print('input_dir=' + input_dir + ', output_dir=' + output_dir)
    array_of_files = []
    list_dir_recursively(input_dir, '', array_of_files)
    df = pd.concat(map(pd.read_csv, array_of_files))
    df.to_pickle(os.path.join(output_dir, 'all_articles.pkl'))

infin_transform_one_object

  • In this type of transform, the function infin_transform_one_object is invoked once for each object in the InfinSlice or InfinSnap.
  • Objects are processed one directory at a time. A directory is read, the statics and globals for the transform are run once for that directory, and then infin_transform_one_object is called for each object in that directory. Then the next directory is processed.
  • The following example transform is called for each object with filename, temp_output_dir and parentdir as parameters. It generates a unique id for each directory and uses that per-directory id as a prefix for each file in that directory.

import os
from uuid import uuid4
import shutil

unique_id_for_dir = str(uuid4())


# This transform is called for each file in the chosen data
def infin_transform_one_object(filename, temp_output_dir, parentdir, **kwargs):
    print('infin_transform_one_object: Entered. filename=' + filename
        + ', temp_output_dir=' + temp_output_dir + ', parentdir=' + parentdir)
    new_filename = unique_id_for_dir + '-' + os.path.basename(filename)
    shutil.copyfile(filename, os.path.join(temp_output_dir, new_filename))
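
The call order described above can be emulated locally for testing. The sketch below is an assumption about that order, not the InfinStor implementation: the names run_one_object_locally and TRANSFORM_SOURCE are hypothetical, and passing the directory's relative path as parentdir is a guess at what that parameter holds.

```python
import os

# Hypothetical transform source; it mirrors the example above and also
# returns the new file name so that the caller can inspect it
TRANSFORM_SOURCE = '''
import os
import shutil
from uuid import uuid4

unique_id_for_dir = str(uuid4())

def infin_transform_one_object(filename, temp_output_dir, parentdir, **kwargs):
    new_filename = unique_id_for_dir + '-' + os.path.basename(filename)
    shutil.copyfile(filename, os.path.join(temp_output_dir, new_filename))
    return new_filename
'''


def run_one_object_locally(transform_source, input_root, temp_output_dir, **kwargs):
    """Emulate the documented call order for local testing only."""
    results = {}
    for root, _dirs, files in os.walk(input_root):
        if not files:
            continue
        # Run the globals and statics once per directory, in a fresh namespace
        namespace = {}
        exec(transform_source, namespace)
        transform = namespace['infin_transform_one_object']
        # Assumption: parentdir is the directory path relative to the input root
        parentdir = os.path.relpath(root, input_root)
        # Call the transform once for each object in this directory
        for name in sorted(files):
            path = os.path.join(root, name)
            results[path] = transform(path, temp_output_dir, parentdir, **kwargs)
    return results
```

With this harness, files from the same directory share one prefix, while files from different directories get different prefixes, which matches the per-directory behavior described in the list above.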

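Here is another example of a transform that is called for each object in the InfinSnap or InfinSlice. It deserializes each object as a raw bitcoin transaction and prints the total value of its outputs, truncated to whole BTC.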

import io
from bitcoin.core import CTransaction

def calculate_transaction_value(rawbytes):
    # Deserialize one raw transaction from the object's bytes
    f = io.BytesIO(rawbytes)
    tx = CTransaction.stream_deserialize(f)
    # Sum the positive output values, which are expressed in satoshis
    totvalue = 0.0
    for onevout in tx.vout:
        if onevout.nValue > 0:
            totvalue += onevout.nValue
    # Convert satoshis to BTC and truncate to a whole number
    totvalue /= 100000000.0
    return int(totvalue)

# kwargs can be passed in from the invocation of this transform
# inline, in a singlevm or EMR
def infin_transform_one_object(filename, temp_output_dir, parentdir, **kwargs):
    print('infin_transform_one_object: Entered. filename=' + filename
        + ', temp_output_dir=' + temp_output_dir)
    # Read the object's bytes and close the file when done
    with open(filename, 'rb') as f:
        xval = calculate_transaction_value(f.read())
    print(str(xval))
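
The comment in the example above notes that kwargs can be passed in when the transform is invoked. A minimal sketch of reading an optional keyword argument with a default follows. The parameter name min_bytes is hypothetical, the value is converted with int() because this page does not state how kwargs values are typed, and the boolean return value exists only to make the sketch easy to check.

```python
import os
import shutil


def infin_transform_one_object(filename, temp_output_dir, parentdir, **kwargs):
    # 'min_bytes' is a hypothetical parameter name chosen for illustration;
    # when it is not supplied, every object is copied
    min_bytes = int(kwargs.get('min_bytes', 0))
    if os.path.getsize(filename) < min_bytes:
        return False  # skip objects smaller than the threshold
    shutil.copyfile(filename,
                    os.path.join(temp_output_dir, os.path.basename(filename)))
    return True
```

Using kwargs.get with a default keeps the transform usable whether or not the caller supplies the argument.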

infin_transform_dir_by_dir

In this type of transform, all the files in one directory of the InfinSnap/InfinSlice are downloaded into a temporary directory, and the transform function named infin_transform_dir_by_dir is called for that directory, as in the following example.

In this example, the transform simply uses shutil.copy to copy the files from the input directory to the output directory. When the transform returns, the InfinStor SDK logs all the files in output_dir as MLflow artifacts.

import os
import shutil

# This transform is called for each directory in the chosen data
def infin_transform_dir_by_dir(input_dir, output_dir, parentdir, **kwargs):
    print('input_dir=' + input_dir + ', output_dir=' + output_dir + ', parentdir=' + parentdir)
    for onefile in os.listdir(input_dir):
        if (os.path.isfile(os.path.join(input_dir, onefile))):
            shutil.copy(os.path.join(input_dir, onefile), os.path.join(output_dir, onefile))
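
Since the files left in output_dir are what get logged, a transform can also write derived output there instead of copying its input. The sketch below writes a small JSON summary of each directory; the file name manifest.json and its layout are arbitrary choices made for illustration.

```python
import json
import os


def infin_transform_dir_by_dir(input_dir, output_dir, parentdir, **kwargs):
    # Record the size in bytes of every regular file in this directory
    files = {}
    for name in sorted(os.listdir(input_dir)):
        path = os.path.join(input_dir, name)
        if os.path.isfile(path):
            files[name] = os.path.getsize(path)
    # 'manifest.json' is an arbitrary file name chosen for this sketch
    with open(os.path.join(output_dir, 'manifest.json'), 'w') as f:
        json.dump({'parentdir': parentdir, 'files': files}, f, indent=2)
```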

infin_transform_csv_to_pd

In this type of transform, all files are read as CSV files into a pandas dataframe.