UDF clients

All UDFs in Flow, such as the Map UDF and Reduce UDF, are provided a clients Python object as input. The clients object contains API clients that let you access Instabase APIs as the user running the flow, without having to pass in an access token. The clients object contains four clients, each used to access different APIs:

  • flow_client: Accesses flow APIs.

  • job_client: Accesses job APIs.

  • ibfile: Accesses the filesystem API.

  • conversion_client: Accesses the conversion API.

You can access the clients object in two ways:

  • As an input variable for the UDF. For example:
from typing import Dict

def udf_func(input_record: Dict, clients: object, *args, **kwargs):
    flow_client = clients.flow_client
    ...
  • Through the _FN_CONTEXT_KEY keyword argument. For example:
fn_context = kwargs.get('_FN_CONTEXT_KEY')
clients, _ = fn_context.get_by_col_name("CLIENTS")
flow_client = clients.flow_client
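
The two access paths above can be folded into one helper so that a UDF works whichever way it is invoked. This is a minimal sketch: resolve_clients and the _FakeFnContext stand-in are hypothetical names used only to run the snippet outside Flow, and the second value returned by get_by_col_name() is discarded, as in the example above.

```python
from types import SimpleNamespace


def resolve_clients(clients=None, **kwargs):
  """Return the clients object from the explicit argument or from _FN_CONTEXT_KEY."""
  if clients is not None:
    return clients
  fn_context = kwargs.get('_FN_CONTEXT_KEY')
  if fn_context is None:
    raise ValueError('No clients argument and no _FN_CONTEXT_KEY in kwargs')
  resolved, _ = fn_context.get_by_col_name('CLIENTS')
  if resolved is None:
    raise ValueError('Function context did not provide a CLIENTS entry')
  return resolved


# Hypothetical stand-in for the function context, for trying the helper outside Flow.
class _FakeFnContext:
  def __init__(self, clients):
    self._clients = clients

  def get_by_col_name(self, name):
    if name == 'CLIENTS':
      return self._clients, None
    return None, None


fake_clients = SimpleNamespace(flow_client='flow', job_client='job')
from_arg = resolve_clients(clients=fake_clients)
from_ctx = resolve_clients(_FN_CONTEXT_KEY=_FakeFnContext(fake_clients))
```

Inside a real UDF, the call would be resolve_clients(clients, **kwargs) or resolve_clients(**kwargs), depending on the UDF signature.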

Flow client

The flow client is used to interact with the flow APIs. It can be used to run a new flow, query flow status, and perform other flow tasks.

Run flow binary

The run_flow_binary() method runs the flow binary with the given settings.

Input parameters

Input parameters are the same as request parameters for the Run flow binary API.

Output parameters

Returns a tuple of result and an error message. The result contains the following keys:

Name | Type | Description
job_id | string | ID for the job reported by the initial execution request.
output_folder | string | Path to the output folder in Instabase file system.

Example

import logging

def run_flow(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  flow_client = clients.flow_client

  run_flow_result, err = flow_client.run_flow_binary(
    input_dir='jaydoe/my-repo/fs/Instabase Drive/sampleflow/datasets/input',
    binary_path='jaydoe/my-repo/fs/Instabase Drive/sampleflow/childflow.ibflowbin',
    settings={
      'delete_out_dir': False,
      'output_has_run_id': True
    })
  if err:
    logging.error(f'Unable to run flow binary. Error={err}')
    return
  logging.info('Flow scheduled, job_id={}, output_folder={}'.format(
    run_flow_result['job_id'], run_flow_result['output_folder']))

Get flow status

The get_status() method gets the current status of the flow.

Input parameters

Name | Type | Description
job_id | string | ID for the job reported by the initial execution request.

Output parameters

Returns a tuple of result and an error message. The result contains the following keys:

Name | Type | Description | Values
state | string | State of the flow. | PENDING or DONE
status | string | Status of the flow. | OK or ERROR
job_id | string | ID for the job reported by the initial execution request. |
output_folder | string | Path to the output folder. |
reviewer | string | Instabase username of the user assigned to review the job. Empty if none. |
review_state | string | Current state of job in the review process. | NONE, IN REVIEW, COMPLETED, NOT_COMPLETED
num_files_processed | integer | Number of records processed. |
num_files_failed | integer | Number of records that failed. |
num_files_total | integer | Total number of records. |

Example

import logging

def get_flow_status(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  flow_client = clients.flow_client

  flow_status, err = flow_client.get_status(job_id='123456789')
  if err:
    logging.error(f'Unable to get flow status. Error={err}')
    return
  logging.info('Flow status={}'.format(str(flow_status)))
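
Because get_status() reports a state of PENDING or DONE, a caller that needs the final result can poll until the state changes. The following is a sketch only: wait_for_flow, its parameters, and the _FakeFlowClient stand-in are hypothetical, and the polling interval and timeout are arbitrary values to adjust for your flows.

```python
import time


def wait_for_flow(flow_client, job_id, poll_seconds=5.0, timeout_seconds=600.0,
                  sleep=time.sleep):
  """Poll get_status() until state is DONE, an error is returned, or the timeout passes.

  Returns (status_dict, error_message); one of the two is None.
  """
  deadline = time.monotonic() + timeout_seconds
  while True:
    status, err = flow_client.get_status(job_id=job_id)
    if err:
      return None, err
    if status.get('state') == 'DONE':
      return status, None
    if time.monotonic() >= deadline:
      return None, f'Timed out waiting for job {job_id}'
    sleep(poll_seconds)


# Hypothetical stand-in that reports PENDING twice, then DONE.
class _FakeFlowClient:
  def __init__(self):
    self.calls = 0

  def get_status(self, job_id):
    self.calls += 1
    state = 'DONE' if self.calls >= 3 else 'PENDING'
    return {'job_id': job_id, 'state': state, 'status': 'OK'}, None


fake_flow_client = _FakeFlowClient()
final_status, wait_err = wait_for_flow(
  fake_flow_client, '123456789', poll_seconds=0, sleep=lambda seconds: None)
```

A state of DONE does not imply success, so the caller should still check the status key (OK or ERROR) of the returned dictionary.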

List pipelines

The pipelines() method lists the flow pipelines that the user has access to.

Input parameters

Name | Type | Description | Values
perms | list | Pipeline permissions assigned to the user. | read, execute, delete, write, manage_pipelines
include_all | boolean | Optional. Specifies whether to include all pipelines. | true, false

Output parameters

Returns a tuple of result and an error message. The result has the same response schema as the List flow pipelines API.

Example

import logging

def list_pipelines(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  flow_client = clients.flow_client

  pipelines, err = flow_client.pipelines(perms=['read'])
  if err:
    logging.error(f'Unable to list pipelines. Error={err}')
    return
  logging.info('Pipelines={}'.format(str(pipelines)))

Job client

Use the job client to access the job APIs. The client has methods to pause, resume, retry, cancel, list, get status, and update pipelines of a job.

List jobs

The list() method lists jobs that match the filter parameters.

Input parameters

Name | Type | Description | Values
req_user | string | Optional. Instabase username of the user who submitted the job. |
req_reviewer | string | Optional. Instabase username of the user assigned to review the job. |
limit | integer | Optional. Maximum number of flow jobs to return. | Jobs limit per response (default 20).
offset | integer | Optional. Initial flow job index to start returning jobs from. Used for pagination with limit. | Starting index (default 0).
from_timestamp | integer | Optional. 10-digit Unix timestamp. Returns all jobs started after this timestamp. | Starting timestamp (default is one week before current timestamp).
to_timestamp | integer | Optional. 10-digit Unix timestamp. Returns all jobs started before this timestamp. | Ending timestamp (default is the current timestamp).
job_id | string | Optional. ID associated with each job, or the partial ID. Returns the singular flow job associated with that job ID. | A valid job ID.
job_ids | string | Optional. Returns all flow jobs associated with any of the job IDs passed in the list. | A comma-separated list of job IDs.
requested_states | list | Optional. Returns flow jobs in any of the input states. When not passed, all flow jobs are returned. | A list of strings. Valid string values are: PENDING, COMPLETE, FAILED, CANCELLED, RUNNING, PAUSED, and CHECKPOINT_FAILED.
tags | list | Optional. Returns all flow jobs that were started with any of the input tags. See how to attach tags to flow jobs in the Run a Flow binary API. | A list of string tags.
pipeline_ids | list | Optional. The list of pipeline IDs of the jobs. | A list of strings representing the pipeline IDs of the jobs.
orgs | string | Optional. The organization of the user who submitted the job. | A string value representing the organization of the user who submitted the job.
review_states | list | Optional. Returns all flow jobs that are in any of the given review states. | A list of strings. Valid string values are: NONE, IN REVIEW, COMPLETED, and NOT_COMPLETED.

Output parameters

Returns a tuple of a dictionary containing the result and an error message. The result contains the same response schema as the List jobs API.

Example

import logging

def list_jobs(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  job_client = clients.job_client

  list_result, err = job_client.list(limit=10, offset=0)
  if err:
    logging.error(f'Unable to list jobs. Error={err}')
    return
  logging.info('Jobs listed, jobs={}'.format(str(list_result['jobs'])))
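
The limit and offset parameters can be combined to walk through more jobs than a single response returns. The sketch below is illustrative: iter_jobs and the _FakeJobClient stand-in are hypothetical, and it assumes that a page shorter than the requested limit marks the end of the results, which the List jobs API does not state here.

```python
def iter_jobs(job_client, page_size=20, **filters):
  """Yield jobs page by page, advancing offset until a short page is returned."""
  offset = 0
  while True:
    result, err = job_client.list(limit=page_size, offset=offset, **filters)
    if err:
      raise RuntimeError(f'Unable to list jobs at offset {offset}: {err}')
    jobs = result.get('jobs', [])
    yield from jobs
    # Assumption: fewer jobs than requested means there are no further pages.
    if len(jobs) < page_size:
      return
    offset += page_size


# Hypothetical stand-in that serves a fixed number of jobs from memory.
class _FakeJobClient:
  def __init__(self, total):
    self._jobs = [{'job_id': str(i)} for i in range(total)]
    self.requests = []

  def list(self, limit=20, offset=0, **kwargs):
    self.requests.append((limit, offset))
    return {'jobs': self._jobs[offset:offset + limit]}, None


fake_job_client = _FakeJobClient(total=45)
all_jobs = list(iter_jobs(fake_job_client, page_size=20))
```

Any other filter from the table above, such as requested_states or tags, can be passed as a keyword argument and is forwarded unchanged on every page request.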

Job status

The status() method is used to get the status of a job.

Input parameters

Name | Type | Description | Values
job_id | string | ID for the job reported by the initial execution request. |
result_type | string | Type of job. | flow, refiner, job, async, group

Output parameters

Returns a tuple of a dictionary containing the result and an error message. The result is the same as the Job status API response schema.

Example

import logging

def get_job_status(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  job_client = clients.job_client

  job_id = 'job_id'
  status_result, err = job_client.status(job_id=job_id)
  if err:
    logging.error(f'Unable to get job status. Error={err}')
    return
  logging.info('Job status, status={}'.format(str(status_result)))

Pause job

The pause() method is used to pause a given job.

Input parameters

Name | Type | Description
job_id | string | ID for the job reported by the initial execution request.

Output parameters

Returns a tuple of a dictionary containing the request status and an error message. The result is a dictionary containing the following:

Name | Type | Description | Values
status | string | Status of the request. | OK or ERROR

Example

import logging

def pause_job(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  job_client = clients.job_client

  pause_result, err = job_client.pause(job_id='123456789')
  if err:
    logging.error(f'Unable to pause job. Error={err}')
    return
  logging.info('Job paused, status={}'.format(pause_result['status']))

Resume job

The resume() method is used to resume a given job.

Input parameters

Name | Type | Description
job_id | string | ID for the job reported by the initial execution request.

Output parameters

Returns a tuple of a dictionary containing the request status and an error message. The result is a dictionary containing the following:

Name | Type | Description | Values
status | string | Status of the request. | OK or ERROR

Example

import logging

def resume_job(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  job_client = clients.job_client

  resume_result, err = job_client.resume(job_id='123456789')
  if err:
    logging.error(f'Unable to resume job. Error={err}')
    return
  logging.info('Job resumed, status={}'.format(resume_result['status']))

Retry job

The retry() method is used to retry a given job.

Input parameters

Name | Type | Description | Values
job_id | string | ID for the job reported by the initial execution request. |
retry_type | string | Optional. Specifying a type retries files within the flow with only a specific type of failure. If type is omitted, all failed files are retried. | all, checkpoint_failure, step_failure

Output parameters

Returns a tuple of a dictionary containing the request status and an error message. The result is a dictionary containing the following:

Name | Type | Description | Values
status | string | Status of the request. | OK or ERROR

Example

import logging

def retry_job(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  job_client = clients.job_client

  retry_result, err = job_client.retry(job_id='123456789')
  if err:
    logging.error(f'Unable to retry job. Error={err}')
    return
  logging.info('Job retried, status={}'.format(retry_result['status']))

Cancel job

The cancel() method cancels the job with the given job ID.

Input parameters

Name | Type | Description
job_id | string | ID for the job reported by the initial execution request.

Output parameters

Returns a tuple of a dictionary containing the request status and an error message. The result is a dictionary containing the following:

Name | Type | Description | Values
status | string | Status of the request. | OK or ERROR

Example

import logging

def cancel_job(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  job_client = clients.job_client

  cancel_result, err = job_client.cancel(job_id='123456789')
  if err:
    logging.error(f'Unable to cancel job. Error={err}')
    return
  logging.info('Job cancelled, status={}'.format(cancel_result['status']))

Update pipeline

The update_pipelines() method updates the pipelines of a job. Given a job ID and a list of pipeline IDs, it updates the job’s pipeline_ids.

Input parameters

Name | Type | Description
job_id | string | ID for the job reported by the initial execution request.
pipeline_ids | list | List of strings representing the pipeline IDs of the jobs.

Output parameters

Returns a tuple of a dictionary containing the result and an error message. The result contains the following:

Name | Type | Description | Values
status | string | Status of the request. | ERROR, OK
updated_pipelines | list | List containing flow pipeline info. | A list of dictionaries containing keys flow_pipeline_name, flow_pipeline_id.

Example

import logging

def update_pipelines(**kwargs):
  fn_context = kwargs.get("_FN_CONTEXT_KEY")
  clients, _ = fn_context.get_by_col_name("CLIENTS")
  job_client = clients.job_client

  update_result, err = job_client.update_pipelines(job_id='123456789', pipeline_ids=['123456789', '987654321'])
  if err:
    logging.error(f'Unable to update pipeline. Error={err}')
    return
  logging.info('Pipeline updated, status={}'.format(update_result['status']))

Ibfile

The ibfile client is an Instabase FileHandle reference that provides pre-authenticated access to file operations. All operations done with the ibfile object have the same permissions as the user that invoked the operation.

is_file

The is_file() method checks if the provided path is a file.

Input parameters

Name | Type | Description
complete_path | string | The absolute path of the file.

Output parameters

The function returns a boolean value indicating whether the provided path is a file or not.

Example

import logging

def test_is_file(clients, complete_path):
  is_file_resp = clients.ibfile.is_file(complete_path)
  logging.info('Is file at path {}: {}'.format(complete_path, is_file_resp))

is_dir

The is_dir() method checks if the provided path is a directory.

Input parameters

Name | Type | Description
complete_path | string | The absolute path of the directory.

Output parameters

The function returns a boolean value indicating whether the provided path is a directory.

Example

import logging

def test_is_dir(clients, complete_path):
  is_dir = clients.ibfile.is_dir(complete_path)
  logging.info('Is dir at path {}: {}'.format(complete_path, is_dir))

exists

The exists() method checks if a file or a directory exists at the provided path.

Input parameters

Name | Type | Description
complete_path | string | The absolute path of the file or directory.

Output parameters

The function returns a boolean value indicating whether a file or directory exists at the provided path.

Example

import logging

def test_exists(clients, complete_path):
  exists_resp = clients.ibfile.exists(complete_path)
  logging.info('Exists at path {}: {}'.format(complete_path, exists_resp))

mkdir

The mkdir() method creates a new directory at the specified path.

Input parameters

Name | Type | Description
complete_path | string | The absolute path of the directory.

Output parameters

The function returns a tuple containing an instance of the MkdirResp class and an error message string. The MkdirResp instance has a status attribute of type StatusCode enum, which indicates the status of the operation.

Example

import logging

def test_mkdir(clients, complete_path):
  mkdir_resp, err = clients.ibfile.mkdir(complete_path)
  if err:
    logging.error(f'Unable to create directory. Error={err}')
    return
  logging.info('Mkdir at path {}: {}'.format(complete_path, mkdir_resp.status))
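
The is_dir(), exists(), and mkdir() methods can be combined so that a directory is created only when it is missing. This is a minimal sketch: ensure_dir, the _FakeDirIbfile stand-in, and the sample paths are hypothetical, and the sketch relies only on the error value returned by mkdir() rather than on specific StatusCode values.

```python
def ensure_dir(ibfile, complete_path):
  """Create complete_path unless it is already a directory.

  Returns None on success or an error message string on failure.
  """
  if ibfile.is_dir(complete_path):
    return None
  if ibfile.exists(complete_path):
    return f'{complete_path} exists but is not a directory'
  _, err = ibfile.mkdir(complete_path)
  return err or None


# Hypothetical in-memory stand-in for clients.ibfile.
class _FakeDirIbfile:
  def __init__(self):
    self.dirs = set()
    self.files = set()

  def is_dir(self, path):
    return path in self.dirs

  def exists(self, path):
    return path in self.dirs or path in self.files

  def mkdir(self, path):
    self.dirs.add(path)
    return object(), None


fake_dir_ibfile = _FakeDirIbfile()
fake_dir_ibfile.files.add('/repo/fs/drive/report.txt')
first = ensure_dir(fake_dir_ibfile, '/repo/fs/drive/out')
second = ensure_dir(fake_dir_ibfile, '/repo/fs/drive/out')
clash = ensure_dir(fake_dir_ibfile, '/repo/fs/drive/report.txt')
```

In a UDF, the call would be ensure_dir(clients.ibfile, complete_path).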

copy

The copy() method copies a file or a directory along with its contents from the complete_path to new_complete_path.

Input parameters

Name | Type | Description
complete_path | string | The absolute path of the original file or directory.
new_complete_path | string | The absolute path of the target file or directory.

Output parameters

The function returns a tuple containing an instance of the CopyResp class and an error message string. The CopyResp instance has a status attribute of type StatusCode enum, which indicates the status of the operation.

Example

import logging

def test_copy(clients, complete_path, new_complete_path):
  copy_resp, err = clients.ibfile.copy(complete_path, new_complete_path)
  if err:
    logging.error('Unable to copy at path {}: {}'.format(complete_path, err))
    return
  logging.info('Copy at path {}: {}'.format(complete_path, copy_resp.status))

rm

The rm() method removes a file or directory at the specified path.

Input parameters

Name | Type | Description | Values
complete_path | string | The absolute path of the file or directory to be removed. |
recursive | boolean | Optional. Indicates whether to remove a directory and all its contents recursively. When set to True, the directory and all its contents are removed. | True or False (default is True).
force | boolean | Optional. When set to True, a non-existent resource does not result in an error. When set to False, a non-existent resource results in an error. | True or False (default is True).

Output parameters

The function returns a tuple containing an instance of the RmResp class and an error message string. The RmResp instance has a status attribute of type StatusCode enum, which indicates the status of the operation.

Example

import logging

def test_rm(clients, complete_path):
  rm_resp, err = clients.ibfile.rm(complete_path)
  if err:
    logging.error('Rm at path {} failed: {}'.format(complete_path, err))
    return
  logging.info('Rm at path {}: {}'.format(complete_path, rm_resp.status))

def test_rm_optional_params(clients, complete_path):
  rm_resp, err = clients.ibfile.rm(complete_path, recursive=True, force=False)
  if err:
    logging.error('Rm at path {} failed: {}'.format(complete_path, err))
    return
  logging.info('Rm at path {}: {}'.format(complete_path, rm_resp.status))

open

The open() method opens a file and returns a file object that can be used to access the contents of the file.

Input parameters

Name | Type | Description
path | string | The absolute path of the file to be opened.
mode | string | Optional parameter indicating the mode in which to open the file. The mode specifies how to use the file, such as for reading, writing, or appending. The valid modes are defined in VALID_MODES. The default mode is r (read-only).

Output parameters

The function returns an instance of the IBFileBase class, which represents the opened file. The IBFileBase instance has the following attributes:

  • path: A string containing the relative path to the file.

  • _mode: A string containing the mode in which the file was opened. See VALID_MODES.

Valid modes

VALID_MODES is a frozenset containing the modes that can be used when opening a file. The following modes are allowed:

Read-only modes
  • ‘r’: Read-only mode (default).

  • ‘rU’: Universal read-only mode.

  • ‘rb’: Read-only mode in binary format.

  • ‘rbU’: Universal read-only mode in binary format.

Writeable modes
  • ‘r+’: Read and write mode.

  • ‘rb+’: Read and write mode in binary format.

  • ‘w’: Write-only mode.

  • ‘w+’: Read and write mode. Truncates the file to zero length or creates a new file if it doesn’t exist.

  • ‘wb’: Write-only mode in binary format. Truncates the file to zero length or creates a new file if it doesn’t exist.

  • ‘wb+’: Read and write mode in binary format. Truncates the file to zero length or creates a new file if it doesn’t exist.

Append modes
  • ‘a’: Append-only mode. Creates a new file if it doesn’t exist.

  • ‘a+’: Read and append mode. Creates a new file if it doesn’t exist.

  • ‘ab’: Append-only mode in binary format. Creates a new file if it doesn’t exist.

  • ‘ab+’: Read and append mode in binary format. Creates a new file if it doesn’t exist.

For more information on modes, see the Python documentation.

Example

import logging

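def test_open(clients, complete_path):
  # Open the file at the path passed in, rather than a hard-coded path.
  file_obj = clients.ibfile.open(complete_path, mode='w')
  file_obj.write('This is some data to write to the file.')
  file_obj.close()
  logging.info('Wrote to file at path {}'.format(complete_path))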
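
If write() raises, the explicit close() call in the example above is skipped. One way to guarantee the handle is closed is the standard library's contextlib.closing, which only requires that the object has a close() method. The sketch below is illustrative: write_text and the _Fake* stand-ins are hypothetical, and it does not assume that the object returned by open() is itself a context manager.

```python
from contextlib import closing


def write_text(ibfile, path, text):
  """Write text to path and close the handle even if write() raises."""
  with closing(ibfile.open(path, mode='w')) as file_obj:
    file_obj.write(text)


# Hypothetical stand-ins that record what was written and whether close() ran.
class _FakeFile:
  def __init__(self):
    self.chunks = []
    self.closed = False

  def write(self, data):
    self.chunks.append(data)

  def close(self):
    self.closed = True


class _FakeOpenIbfile:
  def __init__(self):
    self.opened = {}

  def open(self, path, mode='r'):
    self.opened[path] = _FakeFile()
    return self.opened[path]


fake_open_ibfile = _FakeOpenIbfile()
write_text(fake_open_ibfile, '/repo/fs/drive/out/notes.txt', 'hello')
handle = fake_open_ibfile.opened['/repo/fs/drive/out/notes.txt']
```

In a UDF, the call would be write_text(clients.ibfile, complete_path, text).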

read_file

The read_file() method returns the contents of a file as a string.

Input parameters

Name | Type | Description
file_path | string | The absolute path of the file to be read.

Output parameters

The function returns a tuple containing the contents of the file and an error message explaining the result of the operation. If the operation was successful, the error message is None.

Example

import logging

def test_read_file(clients, complete_path):
  read_file_resp, err = clients.ibfile.read_file(complete_path)
  if err:
    logging.error('Read file at path {} failed: {}'.format(complete_path, err))
    return
  logging.info('Read file at path {}: {}'.format(complete_path, len(read_file_resp)))

write_file

The write_file() method writes a string to a file.

Input parameters

Name | Type | Description
file_path | string | The absolute path of the file to be written to.
content | string | The string to be written to the file.

Output parameters

The function returns a tuple containing a boolean value and a string. The boolean value is True if the operation was successful, and False if it was not. The string is an error message explaining the result of the operation. If the operation was successful, the error message is None.

Example

import logging

def test_write_file(clients, complete_path, data):
  write_file_resp, err = clients.ibfile.write_file(complete_path, data)
  if err:
    logging.error('Write file at path {} failed: {}'.format(complete_path, err))
    return
  logging.info('Write file at path {}: {}'.format(complete_path, write_file_resp))
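
Because write_file() and read_file() both return a value together with an error message, small helpers can hide the tuple handling when a UDF stores structured data. The following sketch saves and loads JSON. The names save_json, load_json, and _InMemoryIbfile, as well as the sample paths, are hypothetical, and malformed JSON in the file is not handled.

```python
import json


def save_json(ibfile, path, payload):
  """Serialize payload as JSON and write it. Returns None or an error message."""
  ok, err = ibfile.write_file(path, json.dumps(payload))
  if err or not ok:
    return err or 'write_file reported failure'
  return None


def load_json(ibfile, path):
  """Read path and parse it as JSON. Returns (value, error_message)."""
  content, err = ibfile.read_file(path)
  if err:
    return None, err
  return json.loads(content), None


# Hypothetical in-memory stand-in for clients.ibfile.
class _InMemoryIbfile:
  def __init__(self):
    self.store = {}

  def write_file(self, file_path, content):
    self.store[file_path] = content
    return True, None

  def read_file(self, file_path):
    if file_path not in self.store:
      return None, f'{file_path} not found'
    return self.store[file_path], None


memory_ibfile = _InMemoryIbfile()
save_err = save_json(memory_ibfile, '/repo/fs/drive/out/summary.json', {'pages': 3})
loaded, load_err = load_json(memory_ibfile, '/repo/fs/drive/out/summary.json')
missing, missing_err = load_json(memory_ibfile, '/repo/fs/drive/out/none.json')
```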

list_dir

The list_dir() method returns a list of files and folders in a given directory.

Input parameters

Name | Type | Description
path | string | The absolute path of the directory to list.
start_page_token | string | An optional page token to use for pagination. If provided, the function returns the contents of the directory starting from this page.

Output parameters

The function returns a tuple containing an instance of the ListDirInfo class and a string.

The ListDirInfo instance has the following attributes:

  • nodes: A list of NodeInfo objects, representing the file and folder resources in the directory.

  • start_page_token: A string representing the start page token for the current list of resources.

  • next_page_token: A string representing the start page token for the next list of resources.

  • has_more: A boolean value indicating whether there are more resources in the directory that have not been listed yet.

The string is an error message explaining the result of the operation. If the operation was successful, the error message is None.

The NodeInfo class has the following attributes:

  • name: A string containing the name of the file or folder resource.

  • path: A string containing the path of the resource relative to the mounted repo.

  • full_path: A string containing the full path of the resource, including the location of the mounted repo.

  • _type: A string indicating the type of the node, either file or folder.

Example

import logging

def test_list_dir(clients, path, start_page_token):
  list_dir_info, err = clients.ibfile.list_dir(path, start_page_token)
  if err:
    logging.error('ERROR list_dir at path {}: {}'.format(path, err))
    return []
  logging.info('List dir at path {}: {}'.format(path, list_dir_info))
  for node in list_dir_info.nodes:
    logging.info('Node {}'.format(node.as_dict()))
  return list_dir_info.nodes
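
A single list_dir() call returns one page of results, so listing a large directory means following next_page_token while has_more is true. The sketch below shows one way to do that. The names list_all_nodes and _FakePagedIbfile and the sample path are hypothetical, and the sketch assumes that passing None as start_page_token requests the first page, which is not confirmed above.

```python
from types import SimpleNamespace


def list_all_nodes(ibfile, path):
  """Collect nodes from every page. Returns (nodes, error_message)."""
  nodes = []
  page_token = None  # Assumption: None requests the first page.
  while True:
    info, err = ibfile.list_dir(path, page_token)
    if err:
      return nodes, err
    nodes.extend(info.nodes)
    if not info.has_more:
      return nodes, None
    page_token = info.next_page_token


# Hypothetical stand-in that pages through a fixed list of names.
class _FakePagedIbfile:
  def __init__(self, names, page_size):
    self._names = names
    self._page_size = page_size

  def list_dir(self, path, start_page_token=None):
    start = int(start_page_token or 0)
    end = start + self._page_size
    nodes = [SimpleNamespace(name=name, full_path=f'{path}/{name}')
             for name in self._names[start:end]]
    info = SimpleNamespace(
      nodes=nodes,
      start_page_token=str(start),
      next_page_token=str(end),
      has_more=end < len(self._names))
    return info, None


paged_ibfile = _FakePagedIbfile(['a.pdf', 'b.pdf', 'c.pdf', 'd.pdf', 'e.pdf'], page_size=2)
found_nodes, list_err = list_all_nodes(paged_ibfile, '/repo/fs/drive/in')
```

On an error partway through, the helper returns the nodes collected so far together with the error message, so the caller can decide whether a partial listing is usable.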

StatusCode enum

The StatusCode enum is used to indicate the status of the operation and contains the following values:

  • OK: The operation was successful.

  • MISSING_PARAM: A required parameter was missing.

  • READ_ERROR: There was an error reading a file.

  • WRITE_ERROR: There was an error writing to a file.

  • NONEXISTENT: A required file or directory does not exist.

  • FAILURE: A general exception occurred.

  • NO_MOUNT_DETAILS: Mount details are missing.

  • ACCESS_DENIED: Access to a file or directory was denied.

Conversion client

The conversion client is used to convert Microsoft Office documents and HTML pages to PDFs.

The PDF conversion client is available in the following steps:

  • apply UDF

  • map UDF

  • reduce UDF

  • pre-flow

  • post-flow

The conversion service can convert .docx, .xlsx, .html, .csv and .pptx document types to PDF. The service maintains the original document layout.

The input to the service is the raw byte content of the original document, together with the conversion type. The returned value is the converted PDF as bytes.

Converting documents to PDFs in a flow

The conversion client can be used within the Apply UDF, Map UDF, Reduce UDF, Pre-flow UDF, and Post-flow UDF steps for custom conversion of Microsoft Office documents and further processing of the generated PDFs.

To send a conversion request to the conversion service, call the clients.conversion_client.get_pdf() method:

def get_pdf(document_bytes, conversion_type): 
  ...

Input parameters

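  • document_bytes: The raw byte content of the original document.

  • conversion_type: The conversion to perform. One of html_to_pdf, docx_to_pdf, xlsx_to_pdf, csv_to_pdf, or pptx_to_pdf.

The return value is the converted PDF as bytes. The example scripts in this section unpack the result as a tuple of the converted bytes and an error value.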
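
When a flow receives mixed input types, the conversion_type can be derived from the file extension. The sketch below maps the supported extensions listed above to the conversion_type values. The helper name conversion_type_for and the sample file names are hypothetical.

```python
import os

# Extension-to-conversion_type lookup built from the supported types listed above.
_CONVERSION_TYPES = {
  '.html': 'html_to_pdf',
  '.docx': 'docx_to_pdf',
  '.xlsx': 'xlsx_to_pdf',
  '.csv': 'csv_to_pdf',
  '.pptx': 'pptx_to_pdf',
}


def conversion_type_for(file_path):
  """Return the conversion_type for file_path, or None if the type is unsupported."""
  extension = os.path.splitext(file_path)[1].lower()
  return _CONVERSION_TYPES.get(extension)


docx_type = conversion_type_for('input/Quarterly Report.DOCX')
csv_type = conversion_type_for('input/data.csv')
unsupported = conversion_type_for('input/already.pdf')
```

A UDF can skip files for which the helper returns None and pass the returned value as the second argument of get_pdf() for the rest.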

Example UDF script for PDF conversion in the apply UDF step

  1. Set the Output File Extension to pdf in the Apply UDF step of a flow.

  2. Create the UDF script required by the conversion service.

    • The UDF script must filter the input files instead of creating new files for the converted documents to be read by next steps in the flow.

    • The UDF must return the converted bytes instead of creating a PDF in the output folder (usually out/s1_apply_udf).

def convert_docx_to_pdf(file_path, clients, *args, **kwargs):
    file_content, err = clients.ibfile.read_file(file_path)
    if err:
        raise IOError('Could not read file {}'.format(file_path))
    converted_file, err = clients.conversion_client.get_pdf(file_content,'docx_to_pdf')
    if err:
        raise IOError('Could not convert the file {}'.format(file_path))
    return converted_file

def register(name_to_fn):
    more_fns = {
        'filter_files': {
            'fn': convert_docx_to_pdf,
            'ex': '',
            'desc': ''
        }
    }
    name_to_fn.update(more_fns)

Example UDF script for PDF conversion in the map UDF step

import os

def convert_docx_to_pdf_map_func(input_record, step_folder, clients, *args, **kwargs):
  
  input_filepath = input_record['input_filepath']
  file_content = input_record['content']
  output_filename = input_record['output_filename']
  output_file_without_ext = os.path.splitext(output_filename)[0]

  # Filter files here only for docx files to be converted.
  file_content, err = clients.ibfile.read_file(input_filepath)
  if err:
      raise IOError('Could not read file {}'.format(input_filepath))

  converted_file, err = clients.conversion_client.get_pdf(file_content,'docx_to_pdf')
  if err:
      raise IOError('Could not convert the file {}'.format(input_filepath))

  return {
    "out_files": [
      {
        "filename": f"{output_file_without_ext}.pdf",
        "content": converted_file
      }
    ]
  }

def register(name_to_fn):
    more_fns = {
        'convert_to_pdf_map_func': {
            'fn': convert_docx_to_pdf_map_func,
            'ex': '',
            'desc': ''
        }
    }
    name_to_fn.update(more_fns)