UDFs in Flow V3

Flow lets you create custom functions, called User-Defined Functions (UDFs), to implement custom processing logic. There are four types of UDFs in Flow: Map UDFs, Reduce UDFs, Pre-Flow UDFs, and Post-Flow UDFs.

Map UDF

Use a Map UDF to process each input file or record in a flow in parallel. A Map UDF is appropriate when your processing logic operates on a single document at a time.

Input variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| INPUT_RECORD | dictionary | Dictionary containing information about the input file that is being processed by the UDF. | Contains the following keys: input_filepath (path to the input file), content (contents of the input file, as a string), output_filename (name to use for the output file generated by the UDF). |
| ROOT_OUTPUT_FOLDER | string | Root output folder for the flow. | |
| STEP_FOLDER | string | Output folder for the current step in the flow. | |
| CONFIG | dictionary | Dictionary containing the runtime configuration settings for the flow. | Dictionary containing key-value pairs representing the configuration settings. |
| CLIENTS | object | UDF Client object containing different client objects that can be used by the UDF to interact with other systems. | Object containing client objects such as the File System (IBFILE) or a conversion client. |
| TOKEN_FRAMEWORK_REGISTRY | object | Object that provides access to default and custom Token Matchers and Tokenizers. | Object that provides access to Token Matchers and Tokenizers, which can be used to perform text tokenization and matching operations within the UDF. |
| JOB_ID | string | ID for the job reported by the initial execution request. | |
| INPUT_FOLDER | string | Input folder for the flow job, mentioned during the execution request. | |

Output variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| out_files | array | Array of output file dictionaries. | Array of dictionaries, each containing the following keys: filename (name of the output file, as a string), content (contents of the output file, as bytes). |

Each dictionary in the out_files array represents one output file generated by the UDF. The UDF must return a dictionary containing the out_files key for the output files to be passed to the next step in the flow.
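
As a minimal sketch of the return shape described above, the helper below wraps filename and content pairs in a dictionary keyed by out_files. The name build_out_files is hypothetical and not part of the Flow API. The UTF-8 encoding of string content is an assumption based on the table listing content as bytes.

```python
from typing import Dict, Iterable, List, Tuple, Union


def build_out_files(
    files: Iterable[Tuple[str, Union[str, bytes]]]) -> Dict[str, List[dict]]:
  """Wraps (filename, content) pairs in the dictionary a Map UDF returns.

  Hypothetical helper for illustration; it is not part of the Flow API.
  """
  out_files = []
  for filename, content in files:
    # Assumption: text content is encoded as UTF-8 before it is returned.
    if isinstance(content, str):
      content = content.encode('utf-8')
    out_files.append({'filename': filename, 'content': content})
  return {'out_files': out_files}
```

A UDF could then end with a line such as return build_out_files([(output_filename, output_content)]), producing one entry per output file.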

Example

The following Map UDF renames the input file by prefixing its output filename with map-udf-.

from instabase.udf_utils.clients.udf_helpers import get_output_ibmsg

def map_udf_func(input_record, step_folder, *args, **kwargs):
  input_filepath = input_record['input_filepath']
  file_content = input_record['content']
  output_filename = input_record['output_filename']

  # Copy output ibmsg from input
  output_ibmsg, err = get_output_ibmsg(input_filepath, step_folder,
                                       file_content)
  # Stop early if the ibmsg could not be produced.
  if err:
    raise Exception(err)

  return {
    "out_files": [
      {
        "filename": f"map-udf-{output_filename}",
        "content": output_ibmsg
      }
    ]
  }

def register(name_to_fn):
  name_to_fn.update({
    'map_udf': {
      'fn': map_udf_func
    }
  })

Call this UDF in the map UDF step by using the formula map_udf(INPUT_RECORD, STEP_FOLDER).

Info

All input variables are also accessible via an object passed in as the _FN_CONTEXT_KEY keyword argument. See the code example below:

fn_context = kwargs.get('_FN_CONTEXT_KEY')
input_record, err = fn_context.get_by_col_name('INPUT_RECORD')
file_content = input_record['content']
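
To try this access pattern outside of Flow, a stand-in context object can be useful. The sketch below is only an illustration: FakeFnContext and read_content are hypothetical names, and the class mimics nothing beyond the (value, error) return convention shown above. How the real context reports a missing variable is an assumption.

```python
from typing import Any, Dict, Optional, Tuple


class FakeFnContext:
  """Hypothetical stand-in for the object passed as _FN_CONTEXT_KEY.

  It only mimics the (value, error) return convention shown above.
  """

  def __init__(self, values: Dict[str, Any]):
    self._values = values

  def get_by_col_name(self, name: str) -> Tuple[Any, Optional[str]]:
    if name not in self._values:
      # Assumption: a missing variable is reported through the error slot.
      return None, f'{name} not found'
    return self._values[name], None


def read_content(*args: Any, **kwargs: Any) -> str:
  """Reads the input file content through the function context."""
  fn_context = kwargs.get('_FN_CONTEXT_KEY')
  input_record, err = fn_context.get_by_col_name('INPUT_RECORD')
  if err:
    raise KeyError(err)
  return input_record['content']


context = FakeFnContext({'INPUT_RECORD': {'content': 'hello'}})
print(read_content(_FN_CONTEXT_KEY=context))  # prints "hello"
```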

Reduce UDF

Use a Reduce UDF to combine the output files from previous steps in a flow and, optionally, apply additional logic to modify their values.

Input variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| INPUT_RECORDS | generator | Python generator that is used in a for loop to provide an input_record in each loop iteration. | Generator that yields dictionaries containing information about the input files that are being processed by the UDF. Each dictionary contains the following keys: input_filepath (path to the input file), content (contents of the input file, as a string), output_filename (name to use for the output file generated by the UDF). |
| ROOT_OUTPUT_FOLDER | string | Root output folder for the flow. | |
| STEP_FOLDER | string | Output folder for the current step in the flow. | |
| CONFIG | dictionary | Dictionary containing the runtime configuration settings for the flow. | Dictionary containing key-value pairs representing the configuration settings. |
| CLIENTS | object | UDF Client object containing different client objects that can be used by the UDF to interact with other systems. | Object containing client objects such as the Flow File System (IBFILE) or a conversion client. |
| TOKEN_FRAMEWORK_REGISTRY | object | Object that provides access to default and custom Token Matchers and Tokenizers. | Object that provides access to Token Matchers and Tokenizers, which can be used to perform text tokenization and matching operations within the UDF. |
| JOB_ID | string | ID for the job reported by the initial execution request. | |
| INPUT_FOLDER | string | Input folder for the flow job, mentioned during the execution request. | |

Example

The following Reduce UDF generates a summary of the files processed and writes it to a file named summary.json.

import json
import logging
import os
from typing import Any, Dict, Generator

from instabase.ocr.client.libs.ibocr import ParsedIBOCRBuilder
from instabase.udf_utils.clients.udf_helpers import get_output_ibmsg

_SUMMARY_FILENAME = 'summary.json'

def get_by_col_name(kwargs, col) -> Any:
  val, err = kwargs['_FN_CONTEXT_KEY'].get_by_col_name(col)
  if err:
    raise Exception(f'{col} is not available in UDF context')
  return val

def generate_summary(*args: Any, **kwargs: Any) -> Generator[Dict, None, None]:
  logging.info('Started running Reduce UDF function')

  job_id = get_by_col_name(kwargs, 'JOB_ID')
  input_records = get_by_col_name(kwargs, 'INPUT_RECORDS')
  root_output_folder = get_by_col_name(kwargs, 'ROOT_OUTPUT_FOLDER')
  step_folder = get_by_col_name(kwargs, 'STEP_FOLDER')
  clients = get_by_col_name(kwargs, 'CLIENTS')
  summary_dict = {}
  
  for payload in input_records:
    input_filepath = payload['input_filepath']
    output_filename = payload['output_filename']
    content = payload['content']

    # Loading ibmsg so we can get records from it.
    builder, err = ParsedIBOCRBuilder.load_from_str(input_filepath, content)
    if err:
      raise Exception(err)

    # Pulls out ibmsg records only.
    for ibocr_record in builder.get_ibocr_records():
      raw_input_filepath = ibocr_record.get_document_path()
      if raw_input_filepath not in summary_dict:
        summary_dict[raw_input_filepath] = [] 
      result = {}
      if ibocr_record.has_class_label(): 
        result['class_label'] = ibocr_record.get_class_label()
        if ibocr_record.has_classify_page_range():
          result['page_range'] = ibocr_record.get_classify_page_range()
        else:
          result['page_range'] = {}
          result['page_range']['start_page'] = ibocr_record.get_page_numbers()[0]+1
          result['page_range']['end_page'] = ibocr_record.get_page_numbers()[-1]+1

      result['extracted_fields'] = {}
      refined_phrases, _ = ibocr_record.get_refined_phrases()
      for phrase in refined_phrases:
        name = phrase.get_column_name()
        value = phrase.get_column_value()
        result['extracted_fields'][name] = value
      summary_dict[raw_input_filepath].append(result)

    # Copy output ibmsg from input.
    output_ibmsg, err = get_output_ibmsg(input_filepath, step_folder, content)
    if err:
      raise Exception(err)

    output_ibmsg_dict = {
        'out_files': [{
            'filename': output_filename,
            'content': output_ibmsg
        }]
    }
    yield output_ibmsg_dict

  # Runs once all input records have been consumed.
  out_path = os.path.join(root_output_folder, _SUMMARY_FILENAME)
  _, err = clients.ibfile.write_file(out_path, json.dumps(summary_dict))
  if err:
    raise Exception(err)

def register(name_to_fn):
  name_to_fn.update({
    'generate_summary': {
      'fn': generate_summary
    }
  })

Call this UDF in the reduce UDF step by using the formula generate_summary(INPUT_RECORDS, ROOT_OUTPUT_FOLDER, STEP_FOLDER, CLIENTS).
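
Because generate_summary is a generator, the code after its for loop (the step that writes summary.json) runs only once every yielded item has been consumed. The example relies on the caller exhausting the generator. The plain-Python sketch below illustrates that ordering. The function reduce_sketch and the in-memory written dictionary are hypothetical stand-ins for the UDF and the file system.

```python
from typing import Dict, Generator, Iterable, List


def reduce_sketch(input_records: Iterable[dict],
                  written: Dict[str, List[str]]) -> Generator[dict, None, None]:
  """Yields one out_files dictionary per record, then records a summary.

  Hypothetical stand-in for a Reduce UDF; `written` replaces the file system.
  """
  seen = []
  for record in input_records:
    seen.append(record['input_filepath'])
    yield {
        'out_files': [{
            'filename': record['output_filename'],
            'content': record['content'],
        }]
    }
  # Reached only once the caller has exhausted the generator.
  written['summary.json'] = seen


records = [
    {'input_filepath': 'in/a', 'output_filename': 'a', 'content': 'x'},
    {'input_filepath': 'in/b', 'output_filename': 'b', 'content': 'y'},
]
written = {}
gen = reduce_sketch(records, written)
first = next(gen)      # runs up to the first yield; nothing is written yet
remaining = list(gen)  # exhausting the generator triggers the summary step
```

If the generator were abandoned after the first item, the summary step would never run, which is why aggregate output belongs after the loop rather than inside it.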

Pre-flow UDF

A pre-flow UDF is a hook that runs at the start of the flow, before any step begins executing. Use a pre-flow UDF to perform setup tasks, such as copying files into the input folder from another directory. To add a pre-flow UDF in the Flow editor, select Events > Pre-Flow UDF.

Input variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| ROOT_OUTPUT_FOLDER | string | Root output folder for the flow. | |
| CONFIG | dictionary | Dictionary containing the runtime configuration settings for the flow. | Dictionary containing key-value pairs representing the configuration settings. |
| CLIENTS | object | UDF Client object containing different client objects that can be used by the UDF to interact with other systems. | Object containing client objects such as the Flow File System (IBFILE) or a conversion client. |
| JOB_ID | string | ID for the job reported by the initial execution request. | |
| INPUT_FOLDER | string | Input folder for the flow job, mentioned during the execution request. | |

Example

The following UDF writes a summary file containing both the job ID and flow start timestamp.

from typing import Any, Text
import json
import time

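def flow_info(clients: Any, root_output_folder: Text, job_id: Text, **kwargs):
  # Named info rather than flow_info to avoid shadowing the function name.
  info = {
      'Job ID': job_id,
      'Start timestamp': time.time(),
  }
  out_path = root_output_folder + '/flowinfo.json'
  _, err = clients.ibfile.write_file(out_path, json.dumps(info))
  if err:
    raise Exception(f'Failed to write {out_path}: {err}')
  return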

def register(name_to_fn):
  name_to_fn.update({
    'flow_info': {
      'fn': flow_info
    }
  })

Call this UDF in the pre-flow UDF hook by using the formula flow_info(CLIENTS, ROOT_OUTPUT_FOLDER, JOB_ID).
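
The start timestamp in this example comes from time.time(), which returns seconds since the Unix epoch as a float. If a human-readable value is easier for whoever reads flowinfo.json, one option is an ISO 8601 string in UTC. The sketch below uses only the standard library. The helper format_start_time is a hypothetical name, and the job ID is a placeholder.

```python
from datetime import datetime, timezone
import json
import time


def format_start_time(epoch_seconds: float) -> str:
  """Converts a Unix epoch value to an ISO 8601 string in UTC."""
  return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()


info = {
    'Job ID': 'example-job-id',  # placeholder value for illustration
    'Start timestamp': format_start_time(time.time()),
}
print(json.dumps(info))
```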

Post-flow UDF

A post-flow UDF is a hook that runs after the flow completes execution. Use a post-flow UDF to perform post-processing tasks, such as sending results to a downstream system or cleaning up folders. To add a post-flow UDF in the Flow editor, select Events > Post-Flow UDF.

Input variables

| Name | Type | Description | Values |
| --- | --- | --- | --- |
| ROOT_OUTPUT_FOLDER | string | Root output folder for the flow. | |
| CONFIG | dictionary | Dictionary containing the runtime configuration settings for the flow. | Dictionary containing key-value pairs representing the configuration settings. |
| CLIENTS | object | UDF Client object containing different client objects that can be used by the UDF to interact with other systems. | Object containing client objects such as the Flow File System (IBFILE) or a conversion client. |
| JOB_ID | string | ID for the job reported by the initial execution request. | |
| INPUT_FOLDER | string | Input folder for the flow job, mentioned during the execution request. | |

Example

In a post-flow UDF, you can consume the flow results, either by reading the batch.ibflowresults file or by using the API, and send them to a downstream system. The following example is a starting point for implementing this integration.

import logging
import os
import json

from instabase.ocr.client.libs.ibocr import ParsedIBOCRBuilder

def write_summary(**kwargs):
  summary_dict = {}

  fn_context = kwargs.get('_FN_CONTEXT_KEY')
  clients, _ = fn_context.get_by_col_name('CLIENTS')
  root_out_folder, _ = fn_context.get_by_col_name('ROOT_OUTPUT_FOLDER')

  res_path = os.path.join(root_out_folder, 'batch.ibflowresults')
  output, err = clients.ibfile.read_file(res_path)
  if err:
    return None, err
  
  results = json.loads(output)
  if results['can_resume']:
    # can_resume = True implies flow is stopped at checkpoint.
    # If true, skip writing summary.
    return None, None

  for result_id in results['results']:
    # Named flow_result so the per-record result dictionary below does not shadow it.
    flow_result = results['results'][result_id]
    for record in flow_result['records']:
      if record['status'] == 'OK':
        ibocr_path = record['ibocr_full_path']
        ibocr, err = clients.ibfile.read_file(ibocr_path)
        if err:
          return None, f'Failed to fetch ibocr path={ibocr_path} err={err}'

        builder, err = ParsedIBOCRBuilder.load_from_str(ibocr_path, ibocr)
        if err:
          return None, f'Failed to parse ibocr path={ibocr_path} err={err}'

        # Iterate over the IBOCR records.
        for ibocr_record in builder.get_ibocr_records():
          raw_input_filepath = ibocr_record.get_document_path()
          if raw_input_filepath not in summary_dict:
            summary_dict[raw_input_filepath] = [] 
          result = {}
          if ibocr_record.has_class_label(): 
            result['class_label'] = ibocr_record.get_class_label()
            if ibocr_record.has_classify_page_range():
              result['page_range'] = ibocr_record.get_classify_page_range()
            else:
              result['page_range'] = {}
              result['page_range']['start_page'] = ibocr_record.get_page_numbers()[0]+1
              result['page_range']['end_page'] = ibocr_record.get_page_numbers()[-1]+1

          result['extracted_fields'] = {}
          refined_phrases, _ = ibocr_record.get_refined_phrases()
          for phrase in refined_phrases:
            name = phrase.get_column_name()
            value = phrase.get_column_value()
            result['extracted_fields'][name] = value
          summary_dict[raw_input_filepath].append(result)

  summary_path = os.path.join(root_out_folder, 'summary.json')
  _, err = clients.ibfile.write_file(summary_path,
                                     json.dumps(summary_dict, indent=4))
  if err:
    return None, f'Failed to write summary file err={err}'
  return summary_dict, None


def send_results(**kwargs):
  summary, err = write_summary(**kwargs)
  if err:
    logging.error(err)
    return

  if not summary:
    return

  # TODO: Send result to downstream system
  pass

def register(name_to_fn):
  name_to_fn.update({
    'send_results': {
      'fn': send_results
    }
  })

Call this UDF in the post-flow UDF hook by using the formula send_results().

The example implements two functions: send_results and write_summary. A post-flow UDF is called every time a flow stops, which means it runs both when a flow completes and when a checkpoint fails and the flow is stopped. The write_summary function reads the flow results file and checks the can_resume flag to determine whether the flow has completed. If the flow has completed, it reads the IBOCR records, constructs a summary of the extracted results, writes it to summary.json, and returns it. Complete the send_results function to send the returned summary to the downstream system.
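
One possible way to complete send_results is to POST the summary as JSON over HTTP. The sketch below uses only the Python standard library and is not a prescribed integration. The endpoint URL is a placeholder, the helper names build_summary_request and post_summary are hypothetical, and it assumes the UDF runtime permits outbound HTTP requests.

```python
import json
import urllib.request
from typing import Any, Dict

# Placeholder endpoint; replace with the downstream system's real URL.
DOWNSTREAM_URL = 'https://downstream.example.com/flow-results'


def build_summary_request(summary: Dict[str, Any],
                          url: str = DOWNSTREAM_URL) -> urllib.request.Request:
  """Builds a JSON POST request carrying the summary dictionary."""
  body = json.dumps(summary).encode('utf-8')
  return urllib.request.Request(
      url,
      data=body,
      headers={'Content-Type': 'application/json'},
      method='POST')


def post_summary(summary: Dict[str, Any], timeout_seconds: float = 10.0) -> int:
  """Sends the summary and returns the HTTP status code.

  Assumption: the UDF runtime permits outbound HTTP requests.
  """
  request = build_summary_request(summary)
  with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
    return response.status
```

Inside send_results, the TODO could then become a call such as post_summary(summary), wrapped in error handling that logs failures instead of raising. Building the request separately from sending it keeps the payload easy to inspect without network access.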