Integrating flows

After you’ve created and compiled a flow, integrate it into your existing business processes to achieve maximum value.

Flow integration occurs in two stages:

  • Ingesting documents into a flow, referred to as upstream integration.

  • Sending results at the end of a flow, referred to as downstream integration.

For example, in a mortgage processing flow, users upload documents to a mortgage lending application like Blend. Blend sends the documents to Instabase for processing (upstream integration). Then, Instabase sends processed results to a loan processing application like Narrative (downstream integration).

Upstream integration

Instabase offers several methods for integrating a flow with an upstream system. The method you use depends on your integration requirements, the capabilities of the system you’re integrating, and other factors.

Flow APIs

If your upstream system is programmable, you can program it to copy files to a drive on Instabase and then trigger a flow run using Flow Binary APIs.

You can implement this task in any programming language. Here’s an example in Python:

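import json
import requests

# Replace these placeholders with your Instabase URL and API token.
URL_BASE = 'ENTER INSTABASE URL'
token = 'ENTER API TOKEN'

api_endpoint_url = URL_BASE + '/api/v1/flow/run_binary_async'

# Input folder and compiled flow binary on the Instabase drive.
args = {
  'input_dir': 'jaydoe/my_repo/fs/Instabase Drive/flow_proj/data/input',
  'binary_path': 'jaydoe/my_repo/fs/Instabase Drive/flow_proj/build/bin/process.ibflowbin'
}
json_data = json.dumps(args)

headers = {
  'Authorization': 'Bearer {0}'.format(token)
}

# Start the flow run and parse the JSON response.
r = requests.post(api_endpoint_url, data=json_data, headers=headers)
resp_data = json.loads(r.content)
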
Tip

If you’re integrating the same system both upstream and downstream of a flow, you can use the job ID returned from the API call to poll the Job Status API until the job completes, and then fetch the results with the Flow Results API.
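
The polling step in this tip can be sketched as follows. This is a minimal sketch under stated assumptions, not a definitive client: `get_status` is a hypothetical callable that wraps your Job Status API request, and the `state` key and `'DONE'` value are assumptions about the response shape, so confirm both against the Job Status API reference.

```python
import time


def wait_for_job(get_status, job_id, poll_interval=5.0, timeout=600.0, sleep=time.sleep):
  """Polls get_status(job_id) until the job reports completion or the timeout passes.

  get_status is a hypothetical callable that calls the Job Status API and
  returns the parsed JSON response as a dict. The 'state' key and the 'DONE'
  value are assumptions about that response.
  """
  waited = 0.0
  while True:
    status = get_status(job_id)
    if status.get('state') == 'DONE':
      return status
    if waited >= timeout:
      raise TimeoutError(
          'Job {0} did not finish within {1} seconds'.format(job_id, timeout))
    # Wait before asking again so the status endpoint isn't flooded.
    sleep(poll_interval)
    waited += poll_interval
```

After `wait_for_job` returns, pass the same job ID to the Flow Results API to fetch the results. The `sleep` parameter exists only so the wait can be replaced in tests.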

For small flow jobs without validation checkpoints, you can use the Flow Binary API in sync mode to run a flow synchronously and get the extracted information in the API response. Flows in sync mode run entirely in memory, so they’re faster than flows run asynchronously. However, sync mode doesn’t support checkpoint resume and has a maximum processing time of 60 seconds, which limits processing to approximately 5 or fewer files with fewer than 20 pages each.
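
As a rough illustration of this guidance, a caller might decide between sync and async mode before submitting a batch. The helper below is a hypothetical sketch: the function name and constants aren't part of any Instabase API, and the thresholds only encode the approximate limits stated above. The actual constraint is the 60-second processing limit.

```python
# Approximate limits taken from the sync mode guidance; tune them for your flows.
MAX_SYNC_FILES = 5
MAX_SYNC_PAGES = 20


def choose_run_mode(page_counts, has_checkpoints=False):
  """Returns 'sync' or 'async' for a batch, given the page count of each file."""
  if has_checkpoints:
    # Sync mode doesn't support checkpoint resume.
    return 'async'
  if len(page_counts) > MAX_SYNC_FILES:
    return 'async'
  if any(pages >= MAX_SYNC_PAGES for pages in page_counts):
    return 'async'
  return 'sync'
```

For example, `choose_run_mode([3, 10])` selects sync mode, while a batch that contains a 20-page file or more than 5 files falls back to async mode.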

Webhooks

If your upstream system isn’t programmable, you can use a custom Instabase endpoint, paired with a handler function, to trigger a flow run.

When a new document is added to a document capture system, you can trigger a webhook to send a POST request with the document’s ID and URL (packet_id and packet_url in the example that follows) to a custom Instabase endpoint. A custom handler function processes the HTTP request and starts a flow job.

To configure this integration, save the function in a file on the Instabase system. When you create your endpoint, set path to myendpoint and handler_script_path to the full path to the handler script. These parameters define an endpoint at INSTABASE_URL/api/v1/http-endpoint/u/myendpoint. To complete the integration, configure your upstream system to send webhooks to this endpoint.

Here’s an example handler function that triggers a flow when a request is received:

import json
import requests

API_TOKEN = 'ENTER API TOKEN'
URL_BASE = 'ENTER INSTABASE URL'

def handler(request):
  # Note: Response isn't imported in this example. It's assumed to be
  # made available by the custom endpoint runtime.
  form_data = json.loads(request.data)
  if 'packet_url' not in form_data:
    return Response('Missing packet_url in request', status=400)

  if 'packet_id' not in form_data:
    return Response('Missing packet_id in request', status=400)

  packet_url = form_data.get('packet_url')
  packet_id = form_data.get('packet_id')

  # Launch the flow, passing the packet details as runtime config
  api_args = {
    'input_dir': 'jdoe/my-repo/fs/Instabase Drive/input',
    'binary_path': 'jdoe/my-repo/fs/Instabase Drive/process_mortgage.ibflowbin',
    'settings': {
      'runtime_config': {
        'packet_id': packet_id,
        'packet_url': packet_url
      }
    }
  }

  headers = {
    'Authorization': 'Bearer {0}'.format(API_TOKEN)
  }

  url = URL_BASE + '/api/v1/flow/run_binary_async'

  r = requests.post(url, data=json.dumps(api_args), headers=headers)
  # resp_data holds the API response, including the job ID of the flow run
  resp_data = json.loads(r.content)
  return 'OK'

Tip

For maximum security, verify incoming webhooks before processing them. For example, use the webhook event ID to query the upstream system for confirmation that the event originated from the system. If you want advanced features like mTLS, you can use external services like AWS API Gateway to process the webhooks.
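
The verification step described in this tip might look like the following sketch. Everything here is an assumption for illustration: the `event_id` field, the `lookup_event` callable, and the shape of the event record all depend on your upstream system and aren't defined by Instabase.

```python
def verify_webhook(form_data, lookup_event):
  """Returns True only if the upstream system confirms the webhook event.

  lookup_event is a hypothetical callable that queries the upstream system
  and returns the event record as a dict, or None if the event is unknown.
  """
  event_id = form_data.get('event_id')  # hypothetical field name
  if not event_id:
    return False
  event = lookup_event(event_id)
  if event is None:
    return False
  # Reject the webhook if its payload doesn't match what the upstream
  # system recorded for this event.
  return event.get('packet_id') == form_data.get('packet_id')
```

A handler could call `verify_webhook` before launching the flow and return a 4xx response when it returns False.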

Scheduler automatic schedule

If ease of configuration is a priority, you can use the Instabase Scheduler in automatic mode. In an automatic schedule, a scheduler job detects changes to a directory and runs a flow on new files.

Typically, this method uses a shared drive, such as an S3 bucket, accessible to both an upstream system and Instabase. The upstream system puts new documents into the shared drive, and the Scheduler job automatically runs a specified flow on each new document.

To configure this integration, create a new job with the schedule set to Automatic. Specify the input folder that Scheduler should monitor and the flow you want to run on each document.

Trigger flow

If pull-based integration is preferable, use a flow to pull work from an upstream system and then trigger one or more flows to process the work. This configuration is called a trigger flow pattern, and the flow used to pull work and launch flows is called the trigger flow.

To configure a trigger flow, create a new flow with a single Reduce UDF step. Define a reduce function to download data from the upstream system and start one or more flows. To run the trigger flow at periodic intervals, use Scheduler.
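
The core logic of a trigger flow can be sketched as follows. This isn't the Reduce UDF signature, which the Instabase UDF reference defines. It only shows the pull-then-launch logic that a reduce function might delegate to. Both `fetch_pending` and `launch_flow` are hypothetical callables that you would implement against your upstream system and the Flow Binary API.

```python
def pull_and_launch(fetch_pending, launch_flow, max_items=10):
  """Pulls pending work items and starts one flow per item.

  fetch_pending is a hypothetical callable that returns a list of work items
  from the upstream system. launch_flow is a hypothetical callable that starts
  a flow for one item and returns a (job_id, err) tuple.
  """
  job_ids = []
  for item in fetch_pending()[:max_items]:
    job_id, err = launch_flow(item)
    if err:
      # Skip failed launches so one bad item doesn't block the rest.
      continue
    job_ids.append(job_id)
  return job_ids
```

Capping each run with `max_items` keeps a single scheduled run short. Work that isn't picked up remains upstream for the next run.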

Downstream integration

Downstream integration generally requires a post-flow UDF to notify the downstream system of flow results.

Post-flow UDF

A post-flow UDF is a hook that runs when a flow stops. Post-flow UDFs consume flow results by reading the batch.ibflowresults file or by using the API, and then send the results to the downstream system.

Here’s an example:

import json, logging, os
from instabase.ocr.client.libs.ibocr import ParsedIBOCRBuilder

def gen_results(**kwargs):
  summary_dict = {}

  fn_context = kwargs.get('_FN_CONTEXT_KEY')
  clients, _ = fn_context.get_by_col_name('CLIENTS')
  root_out_folder, _ = fn_context.get_by_col_name('ROOT_OUTPUT_FOLDER')

  res_path = os.path.join(root_out_folder, 'batch.ibflowresults')
  output, err = clients.ibfile.read_file(res_path)
  if err:
    return None, err
  
  results = json.loads(output)
  if results['can_resume']:
    # can_resume = True implies flow is stopped at checkpoint.
    # If true, skip writing summary
    return None, None

  for result_id in results['results']:
    result = results['results'][result_id]
    for record in result['records']:
      if record['status'] == 'OK':
        ibocr_path = record['ibocr_full_path']
        ibocr, err = clients.ibfile.read_file(ibocr_path)
        if err:
          return None, f'Failed to fetch ibocr path={ibocr_path} err={err}'

        builder, err = ParsedIBOCRBuilder.load_from_str(ibocr_path, ibocr)
        if err:
          return None, f'Failed to parse ibocr path={ibocr_path} err={err}'

        # Iterate over the IBOCR records
        for ibocr_record in builder.get_ibocr_records():
          raw_input_filepath = ibocr_record.get_document_path()
          if raw_input_filepath not in summary_dict:
            summary_dict[raw_input_filepath] = [] 
          # Use a distinct name so the outer loop's `result` isn't shadowed.
          doc_result = {}
          if ibocr_record.has_class_label():
            doc_result['class_label'] = ibocr_record.get_class_label()
            if ibocr_record.has_classify_page_range():
              doc_result['page_range'] = ibocr_record.get_classify_page_range()
            else:
              doc_result['page_range'] = {}
              doc_result['page_range']['start_page'] = ibocr_record.get_page_numbers()[0]+1
              doc_result['page_range']['end_page'] = ibocr_record.get_page_numbers()[-1]+1

          doc_result['extracted_fields'] = {}
          refined_phrases, _ = ibocr_record.get_refined_phrases()
          for phrase in refined_phrases:
            name = phrase.get_column_name()
            value = phrase.get_column_value()
            doc_result['extracted_fields'][name] = value
          summary_dict[raw_input_filepath].append(doc_result)

  return summary_dict, None


def send_results(**kwargs):
  results, err = gen_results(**kwargs)
  if err:
    logging.error(err)
    return

  if not results:
    return

  # TODO: Send result to downstream system
  pass


def register(name_to_fn):
    # type: (Any) -> None
    more_fns = {
        'send_results': {
            'fn': send_results,
            'ex': '',
            'desc': ''
        }
    }
    name_to_fn.update(more_fns)

In this example, gen_results checks the can_resume flag to verify that the flow has completed. If the flow has completed, it reads the IBOCR records and returns a summary of the extracted results. You must customize the send_results function to send the returned summary to the downstream system.
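
One way to fill in the send step is to post the summary as JSON to the downstream system. The sketch below only builds the request, using the Python standard library. The function name, the downstream URL, and the bearer-token scheme are assumptions for illustration, and whether the UDF runtime allows outbound HTTP calls depends on your deployment.

```python
import json
import urllib.request


def build_results_request(summary, downstream_url, token):
  """Builds a JSON POST request that carries the flow summary.

  downstream_url and token are hypothetical values for the downstream system.
  """
  body = json.dumps(summary).encode('utf-8')
  req = urllib.request.Request(downstream_url, data=body, method='POST')
  req.add_header('Content-Type', 'application/json')
  req.add_header('Authorization', 'Bearer {0}'.format(token))
  return req
```

Inside send_results, you could then pass the request to `urllib.request.urlopen` and log any error the downstream system returns.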