Apply Checkpoint

The apply checkpoint step verifies details identified in a previous step according to validation formulas, and triggers a review for failed validations.

Rules are specified in validations and connected as modules to the apply checkpoint step in a flow.

When validation fails and a review is triggered, the flow job is paused at the checkpoint until a reviewer confirms or corrects data in Flow Review and resumes the flow. You can modify this behavior by enabling straight-through processing.

As a best practice, insert a checkpoint at these points in a flow:

  • After classification but before branching, to ensure that records are routed to the correct branch for data extraction.

  • After extraction, refinement, and/or redaction, to ensure that extracted data is accurate.

  • Anywhere that data accuracy is critical. For example, add a checkpoint before inserting values into a database.

Straight-through processing

By default, the apply checkpoint step pauses all documents in the batch when any one document fails validation. This behavior ensures that all steps after the checkpoint are executed with the complete batch of documents, but it can slow processing overall.

To change this behavior, in the apply checkpoint step, set Enable Straight-Through Processing to Yes.

With straight-through processing enabled, documents that pass validation continue executing regardless of validation failures in other documents in a batch. When the validation failures are corrected and the flow is retried, the flow runs again from the checkpoint step with all documents, including any documents that originally passed validation. This behavior ensures that any reduce operations are executed with the full batch of documents, but it means that some steps might be re-executed.
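The difference between the two modes can be modeled in a few lines of plain Python. This is only an illustrative sketch of the behavior described above, not product code: the function names and the `passed` mapping are hypothetical, and it assumes one review round corrects every failure.

```python
from typing import Dict, List


def released_on_first_pass(passed: Dict[str, bool],
                           straight_through: bool) -> List[str]:
  """Returns the documents that continue past the checkpoint before review."""
  if all(passed.values()):
    return list(passed)
  if straight_through:
    # Passing documents continue even though others failed validation.
    return [doc for doc, ok in passed.items() if ok]
  # Default behavior: a single failure pauses the whole batch.
  return []


def released_after_review(passed: Dict[str, bool]) -> List[str]:
  """After corrections and a resume, the full batch runs from the checkpoint."""
  return list(passed)


# Hypothetical batch: three documents pass validation, one fails.
batch = {'a.pdf': True, 'b.pdf': True, 'c.pdf': True, 'd.pdf': False}
first = released_on_first_pass(batch, straight_through=True)
second = released_after_review(batch)

# Documents that reach the later steps twice.
repeated = [doc for doc in second if doc in first]
```

In this model, the three passing documents appear in both `first` and `second`, which is the re-execution that later steps need to tolerate.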

If you choose to enable straight-through processing, you can account for the potential re-execution of steps in these ways:

  • Make sure any custom UDFs are built to correctly handle repeat execution. See the example below.

  • In Flow Review, use the Finalized Records filter to review only documents that aren’t subject to re-execution of steps. Documents subject to re-execution display a warning indicating that the record can’t be corrected because it’s regenerated on the next flow resume.

Handling repeat execution in a UDF

With straight-through processing enabled, documents can be processed multiple times until all documents in a batch pass the checkpoint. If a step after the checkpoint must not act on the same document more than once, the UDF can maintain a summary file and skip documents that it has already processed.

For example, imagine you have an apply checkpoint step followed by a reduce UDF that sends results to a downstream system. The first time the flow runs, three out of four documents pass the checkpoint and are passed to the reduce UDF. After the fourth document is corrected in review and the flow is resumed, the reduce UDF is re-executed with all four documents. If your downstream system requires that you send only the document that wasn't sent before, the reduce UDF must filter out the three documents it already handled.

Below is an example reduce UDF that filters out documents that were already processed.

import json
import os
from typing import Any, Dict, List, Text

from instabase.ocr.client.libs.ibocr import ParsedIBOCRBuilder

_SUMMARY_FILENAME = 'summary.json'

def send_results(input_payloads: List[Dict],
                 root_output_folder: Text,
                 step_folder: Text,
                 clients: Any,
                 *args: Any,
                 **kwargs: Any) -> None:
  # Maintain a file at summary_path that contains JSON map of documents
  # already processed.
  summary_path = os.path.join(root_output_folder, _SUMMARY_FILENAME)
  summary = {}
  output_result = {}

  # Check if summary file exists, if yes load summary.
  if clients.ibfile.exists(summary_path):
    summary_txt = clients.ibfile.read_file(summary_path)
    summary = json.loads(summary_txt)

  for payload in input_payloads:
    input_filepath = payload['input_filepath']

    # Skip any files that have already been processed.
    if input_filepath in summary:
      continue

    summary[input_filepath] = True

    # Loading ibmsg so we can get records from it.
    content = payload['content']
    builder, err = ParsedIBOCRBuilder.load_from_str(input_filepath, content)
    if err:
      raise Exception(err)

    # Pulls out ibmsg records only.
    for ibocr_record in builder.get_ibocr_records():
      refined_phrases, _ = ibocr_record.get_refined_phrases()
      for phrase in refined_phrases:
        name = phrase.get_column_name()
        value = phrase.get_column_value()
        # TODO: populate output result

  # TODO: send output result to downstream system

  # Write the updated summary so the next execution skips these documents.
  clients.ibfile.write_file(summary_path, json.dumps(summary))
  return

def register(name_to_fn):
  name_to_fn.update({
    'send_results': {
      'fn': send_results
    }
  })
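
The skip logic in the UDF above can be tried outside a flow. The following is a minimal sketch that assumes a local JSON file in place of clients.ibfile; the helper name filter_new_payloads and the document names are hypothetical, and the payloads carry only the input_filepath key that the filter needs.

```python
import json
import os
import tempfile
from typing import Dict, List


def filter_new_payloads(input_payloads: List[Dict],
                        summary_path: str) -> List[Dict]:
  """Returns payloads not yet recorded in the summary file, then records them."""
  summary = {}
  if os.path.exists(summary_path):
    with open(summary_path) as f:
      summary = json.load(f)

  new_payloads = []
  for payload in input_payloads:
    input_filepath = payload['input_filepath']
    # Skip any files that were recorded by an earlier execution.
    if input_filepath in summary:
      continue
    summary[input_filepath] = True
    new_payloads.append(payload)

  with open(summary_path, 'w') as f:
    json.dump(summary, f)
  return new_payloads


with tempfile.TemporaryDirectory() as tmp:
  summary_path = os.path.join(tmp, 'summary.json')
  all_docs = [{'input_filepath': f'doc{i}.ibmsg'} for i in range(1, 5)]

  # First execution: only three of the four documents passed the checkpoint.
  first_run = filter_new_payloads(all_docs[:3], summary_path)
  # After review: the step is re-executed with all four documents.
  second_run = filter_new_payloads(all_docs, summary_path)
```

Here first_run holds the three documents that passed initially and second_run holds only the fourth. Like the UDF above, this sketch records a document as processed before anything is sent downstream, so a failed send would need separate handling.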