InfinStor Transforms

Transform Types

InfinStor supports the following transform types.

infin_transform_one_object

  • In this type of transform, the function infin_transform_one_object is invoked once for each object in the InfinSlice or InfinSnap.
  • Processing proceeds directory by directory: a directory is read, the transform's static and global code is run, and then infin_transform_one_object is called for each object in that directory. The next directory is then processed in the same way.
  • The following example transform is called for each object with filename and temp_output_dir as parameters. Because the static and global code is executed once per directory, the example generates a unique id for each directory and uses that per-directory id as a prefix for the name of each file in that directory.
import os
from uuid import uuid4
import shutil

unique_id_for_dir = str(uuid4())


# This transform is called for each file in the chosen data
def infin_transform_one_object(filename, temp_output_dir, **kwargs):
    print('infin_transform_one_object: Entered. filename=' + filename
        + ', temp_output_dir=' + temp_output_dir)
    new_filename = unique_id_for_dir + '-' + os.path.basename(filename)
    shutil.copyfile(filename, os.path.join(temp_output_dir, new_filename))

Here is another example of a transform that is called for each object in the InfinSnap or InfinSlice. It deserializes each object as a raw Bitcoin transaction and prints the total value of its outputs:

import io
from bitcoin.core import CTransaction

# Sum the outputs of a serialized transaction and return the total in whole BTC
def calculate_transaction_value(rawbytes):
    f = io.BytesIO(rawbytes)
    tx = CTransaction.stream_deserialize(f)
    totvalue = 0.0
    for onevout in tx.vout:
        if onevout.nValue > 0:
            totvalue += onevout.nValue
    # nValue is in satoshis; 100,000,000 satoshis = 1 BTC
    totvalue /= 100000000.0
    return int(totvalue)

# kwargs can be passed in from the invocation of this transform
# inline, in a singlevm or EMR
def infin_transform_one_object(filename, temp_output_dir, **kwargs):
    print('infin_transform_one_object: Entered. filename=' + filename
        + ', temp_output_dir=' + temp_output_dir)
    # Read the object's bytes and close the file when done
    with open(filename, 'rb') as f:
        xval = calculate_transaction_value(f.read())
    print(str(xval))

infin_transform_dir_by_dir

In this type of transform, all the files in a directory of the InfinSnap/InfinSlice are downloaded into a temporary directory, and the transform function infin_transform_dir_by_dir is then called, as in the following example.

In this example, the transform simply uses shutil.copy to copy the files from the input directory to the output directory. When the transform returns, the InfinStor SDK logs all of the files in output_dir as MLflow artifacts.

import os
import shutil

# This transform is called for each directory in the chosen data
def infin_transform_dir_by_dir(input_dir, output_dir, **kwargs):
    print('input_dir=' + input_dir + ', output_dir=' + output_dir)
    for onefile in os.listdir(input_dir):
        if (os.path.isfile(os.path.join(input_dir, onefile))):
            shutil.copy(os.path.join(input_dir, onefile), os.path.join(output_dir, onefile))
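
Before running a transform like this through InfinStor, it can help to exercise it against local temporary directories. The following is a minimal sketch of such a check. The run_locally helper is hypothetical and not part of the InfinStor SDK; it only mimics the input_dir/output_dir calling convention described above.

```python
import os
import shutil
import tempfile

# Same transform as in the example above
def infin_transform_dir_by_dir(input_dir, output_dir, **kwargs):
    for onefile in os.listdir(input_dir):
        src = os.path.join(input_dir, onefile)
        if os.path.isfile(src):
            shutil.copy(src, os.path.join(output_dir, onefile))

# Hypothetical local harness (an assumption, not an InfinStor API):
# creates sample input files, runs the transform, and lists the outputs
def run_locally(filenames):
    with tempfile.TemporaryDirectory() as input_dir, \
            tempfile.TemporaryDirectory() as output_dir:
        for name in filenames:
            with open(os.path.join(input_dir, name), 'w') as f:
                f.write('sample contents of ' + name)
        # A subdirectory, to show that the transform skips anything that is not a file
        os.mkdir(os.path.join(input_dir, 'subdir'))
        infin_transform_dir_by_dir(input_dir, output_dir)
        return sorted(os.listdir(output_dir))

print(run_locally(['a.txt', 'b.txt']))
```

Only regular files are copied, so the subdirectory created by the harness does not appear in the output directory.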

infin_transform_raw_to_pd

  • The input to this function is a Pandas DataFrame whose index (key) is the filename and which has a single column containing the raw bytes of the file. The function must transform the input DataFrame as needed (for example, by adding feature columns) and return the transformed DataFrame.
    • Index (key) format: YYYY-MM-DD HH:MM:SS bucketname/path/in/bucket
      • Example: 2020-06-23 07:00:18 btc-rawtransactions/rawtransactions/2020-06-23/07/007c70aa475d773b92450afc61fb07f53d5b3665c3067a16c714336b95c1f3a5
    • Single column in DataFrame: raw bytes
  • Note that this function is called once, after all the objects specified in the InfinSlice or InfinSnap have been read.

  • The following example transform takes in a Pandas DataFrame whose index is the filename and whose single column contains the file bytes. It transforms the input DataFrame by adding a new column (the feature column) called TransactionValue.
  • You can add kwargs to the invocation by editing the JupyterLab code cell that is generated when you choose to use this transform. Note that kwargs values must be strings.
import io
from bitcoin.core import CTransaction

# Sum the outputs of a serialized transaction and return the total in whole BTC
def calculate_transaction_value(rawbytes):
    f = io.BytesIO(rawbytes)
    tx = CTransaction.stream_deserialize(f)
    totvalue = 0.0
    for onevout in tx.vout:
        if onevout.nValue > 0:
            totvalue += onevout.nValue
    # nValue is in satoshis; 100,000,000 satoshis = 1 BTC
    totvalue /= 100000000.0
    return int(totvalue)

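def infin_transform_raw_to_pd(objects, **kwargs):
    for key, value in kwargs.items():
        print("infin_transform: kwargs: " + key + " = " + value)
    # Add the feature column computed from each row's raw bytes
    objects["TransactionValue"] = objects.apply(lambda row: calculate_transaction_value(row["RawBytes"]), axis=1)
    # Return the transformed DataFrame, as this transform type requires
    return objects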

infin_transform_raw_to_ds

  • The input to this function is a Pandas DataFrame whose index (key) is the filename and which has a single column containing the raw bytes of the file. The function must transform the input DataFrame as needed (for example, by adding feature columns), convert the DataFrame to a tf.data.Dataset, and return the tf.data.Dataset.
  • Note that this function is called once, after all the objects specified in the InfinSlice or InfinSnap have been read.
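
As an illustration of the steps above, here is a minimal sketch of such a transform. It assumes that the raw-bytes column is named RawBytes, as in the infin_transform_raw_to_pd example; the NumBytes feature and the add_size_feature helper are hypothetical names chosen for this sketch.

```python
import pandas as pd

# Hypothetical feature step: record the length in bytes of each object.
# Assumes the raw-bytes column is named "RawBytes".
def add_size_feature(objects):
    objects["NumBytes"] = objects["RawBytes"].apply(len)
    return objects

def infin_transform_raw_to_ds(objects, **kwargs):
    # TensorFlow is imported here so that add_size_feature can be tried without it
    import tensorflow as tf
    objects = add_size_feature(objects)
    # Build a dataset with one element per object, holding the feature value
    return tf.data.Dataset.from_tensor_slices(objects["NumBytes"].to_numpy())

# Trying the feature step alone on a small hand-built DataFrame
sample = pd.DataFrame({"RawBytes": [b"abc", b"hello"]}, index=["key1", "key2"])
print(add_size_feature(sample))
```

Keeping the DataFrame step in its own function makes it possible to check the feature logic with pandas alone before converting to a tf.data.Dataset.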

infin_transform_csv_to_pd

  • The input to this function is a Pandas DataFrame whose index (key) is the filename and whose columns hold the contents of the CSV file. The function must transform the input DataFrame as needed (for example, by adding feature columns) and return the transformed DataFrame.
  • Note that this function is called once, after all the objects specified in the InfinSlice or InfinSnap have been read.
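
A minimal sketch of a transform of this type follows. The column names price, quantity and total, and the scale keyword argument, are hypothetical and chosen only for illustration; the sketch assumes that kwargs values arrive as strings, as noted for infin_transform_raw_to_pd.

```python
import pandas as pd

def infin_transform_csv_to_pd(objects, **kwargs):
    # kwargs values are assumed to be strings, so convert before use
    scale = float(kwargs.get("scale", "1.0"))
    # Hypothetical feature column derived from two existing CSV columns
    objects["total"] = objects["price"] * objects["quantity"] * scale
    return objects

# Trying the transform on a small hand-built DataFrame
sample = pd.DataFrame(
    {"price": [2.0, 3.0], "quantity": [1, 4]},
    index=["file1.csv", "file1.csv"],
)
print(infin_transform_csv_to_pd(sample, scale="2"))
```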

infin_transform_csv_to_ds

  • The input to this function is a Pandas DataFrame whose index (key) is the filename and whose columns hold the contents of the CSV file. The function must transform the input DataFrame as needed (for example, by adding feature columns), convert the DataFrame to a tf.data.Dataset, and return the tf.data.Dataset.
  • Note that this function is called once, after all the objects specified in the InfinSlice or InfinSnap have been read.
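
One possible way to perform the conversion is sketched below. It keeps only the numeric columns and builds a dataset whose elements are dictionaries keyed by column name. The numeric_columns_as_arrays helper is a hypothetical name, and restricting the dataset to numeric columns is a simplifying assumption of this sketch, not an InfinStor requirement.

```python
import pandas as pd

# Hypothetical helper: map each numeric column name to a NumPy array of its values
def numeric_columns_as_arrays(objects):
    numeric = objects.select_dtypes(include="number")
    return {name: numeric[name].to_numpy() for name in numeric.columns}

def infin_transform_csv_to_ds(objects, **kwargs):
    # TensorFlow is imported here so that the pandas step can be tried without it
    import tensorflow as tf
    # Each dataset element is a dict with one scalar per numeric column
    return tf.data.Dataset.from_tensor_slices(numeric_columns_as_arrays(objects))

# Trying the pandas step alone on a small hand-built DataFrame
sample = pd.DataFrame({"a": [1, 2], "label": ["x", "y"], "c": [0.5, 1.5]})
print(numeric_columns_as_arrays(sample))
```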