Pipeline API Tutorial

Overview

This tutorial demonstrates how to programmatically create and execute pipelines using the Abacus.AI Python SDK. You'll learn how to transform Python functions into pipeline steps and orchestrate a complete ML workflow.

Prerequisites

Before starting, ensure you have the Abacus.AI Python SDK installed and configured:

from abacusai import ApiClient, ApiException

client = ApiClient()

Creating a Pipeline

First, create a pipeline, optionally associating it with a project:

pipeline_name = "Your Pipeline Name Here"
project_id = None  # Use None for an organization-level pipeline

try:
    client.create_pipeline(pipeline_name=pipeline_name, project_id=project_id)
except ApiException as exception:
    if exception.exception == "AlreadyExistsError":
        print("Pipeline already exists!")
    else:
        raise exception

pipeline = client.describe_pipeline_by_name(pipeline_name)

Key Points:

  • If project_id is None, the pipeline exists at the organization level
  • The code handles the case where a pipeline with the same name already exists

Defining Pipeline Functions

Pipeline functions contain the core logic for each step. Let's examine three common functions:

Step 1: Refresh Feature Group

This function refreshes (materializes) a feature group by its table name:

def refresh_feature_group(table_name: str):
    from abacusai import ApiClient

    client = ApiClient()
    fg = client.describe_feature_group_by_table_name(table_name)
    fg.materialize()

    return {"refreshed_fg_id": fg.feature_group_id}

What it does:

  • Takes a table name as input
  • Retrieves the feature group
  • Materializes (refreshes) the data
  • Returns the feature group ID
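
The keys of the dict a step function returns must line up with the output_variable_mappings declared when the step is registered (shown later in this tutorial). A small pure-Python check, using a hypothetical helper that is not part of the SDK, illustrates that contract:

```python
def check_outputs(returned: dict, output_mappings: list) -> bool:
    """Return True when the returned keys exactly match the declared output names."""
    declared = {m["name"] for m in output_mappings}
    return set(returned) == declared

# The refresh step returns {"refreshed_fg_id": ...};
# its declared output variable is also named "refreshed_fg_id".
result = {"refreshed_fg_id": "fg_123"}
mappings = [{"name": "refreshed_fg_id", "variable_type": "FEATURE_GROUP_ID"}]
print(check_outputs(result, mappings))  # True
```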

Step 2: Retrain Model

This function retrains a model and waits for completion:

def retrain_model(model_id: str):
    from abacusai import ApiClient

    client = ApiClient()
    model = client.describe_model(model_id)

    model.retrain()

    # Wait up to 3 hours for training to complete
    model.wait_for_training(timeout=10800)

    model = model.describe()

    # Verify deployments are updated
    deployments = [
        dep
        for dep in client.list_deployments(project_id=model.project_id)
        if dep.model_id == model.id
    ]

    for deploy in deployments:
        deploy.wait_for_pending_deployment_update()
        deploy = deploy.describe()
        assert deploy.model_version == model.latest_model_version.model_version

    return {"model_id": model_id}

What it does:

  • Initiates model retraining
  • Waits for training to complete (up to 3 hours)
  • Ensures all deployments are updated with the new model version
  • Returns the model ID
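
Conceptually, wait_for_training blocks on a poll-until-terminal-state loop. The sketch below is purely illustrative of that pattern (a generic helper with an injected status function, not the SDK's actual implementation):

```python
import time

def wait_until(get_status, done_states=("COMPLETE", "FAILED"),
               timeout=10800, poll_interval=1):
    """Poll get_status() until it reports a terminal state or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in done_states:
            return status
        time.sleep(poll_interval)
    raise TimeoutError("training did not finish within the timeout")

# Simulated status source: two in-progress states, then completion.
statuses = iter(["PENDING", "TRAINING", "COMPLETE"])
print(wait_until(lambda: next(statuses), poll_interval=0))  # COMPLETE
```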

Step 3: Create Batch Prediction

This function starts a batch prediction job:

def create_new_bp(bp_id: str):
    from abacusai import ApiClient

    client = ApiClient()
    output_fg_id = (
        client.describe_batch_prediction(bp_id)
        .describe_output_feature_group()
        .feature_group_id
    )
    client.start_batch_prediction(bp_id)

    return {"bp_output_fg_id": output_fg_id}

What it does:

  • Retrieves the output feature group for the batch prediction
  • Starts a new batch prediction
  • Returns the output feature group ID

Converting Functions to Pipeline Steps

Each Python function must be converted into a pipeline step with proper input/output mappings.

Creating Step 1: Refresh Feature Group

try:
    client.create_pipeline_step_from_function(
        pipeline_id=pipeline.id,
        step_name="refresh_feature_group",
        function=refresh_feature_group,
        step_input_mappings=[{"name": "table_name", "variable_type": "STRING"}],
        output_variable_mappings=[
            {"name": "refreshed_fg_id", "variable_type": "FEATURE_GROUP_ID"}
        ],
    )
except ApiException:
    # If step exists, update it instead
    pipeline_step = client.describe_pipeline_step_by_name(
        pipeline_id=pipeline.id, step_name="refresh_feature_group"
    )
    client.update_pipeline_step_from_function(
        pipeline_step_id=pipeline_step.id,
        function=refresh_feature_group,
        step_input_mappings=[{"name": "table_name", "variable_type": "STRING"}],
        output_variable_mappings=[
            {"name": "refreshed_fg_id", "variable_type": "FEATURE_GROUP_ID"}
        ],
    )

Key Components:

  • step_name: Unique identifier for the step
  • function: The Python function to execute
  • step_input_mappings: Defines input parameters and their types
  • output_variable_mappings: Defines output variables and their types
  • The try-except block handles both creation and updates
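
The try-except pattern repeats for each step below. A generic create-or-update helper, shown here with plain stand-in callables (it is a hypothetical convenience, not an SDK function), factors out that repetition:

```python
def create_or_update(create_fn, update_fn, already_exists_exc=Exception):
    """Try create_fn(); if it raises the given exception, fall back to update_fn()."""
    try:
        return create_fn()
    except already_exists_exc:
        return update_fn()

# Demonstration with stand-in callables:
def create():
    raise RuntimeError("step already exists")

def update():
    return "updated"

print(create_or_update(create, update, already_exists_exc=RuntimeError))  # updated
```

In real use, create_fn and update_fn would be lambdas wrapping client.create_pipeline_step_from_function and client.update_pipeline_step_from_function, with ApiException as the exception type.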

Creating Step 2: Retrain Model

try:
    client.create_pipeline_step_from_function(
        pipeline_id=pipeline.id,
        step_name="refresh_model",
        function=retrain_model,
        step_input_mappings=[{"name": "model_id", "variable_type": "STRING"}],
        output_variable_mappings=[{"name": "model_id", "variable_type": "MODEL_ID"}],
        step_dependencies=["refresh_feature_group"],
        package_requirements=[],  # e.g. ['numpy==1.2.3', 'pandas>=1.4.0']
    )
except ApiException:
    pipeline_step = client.describe_pipeline_step_by_name(
        pipeline_id=pipeline.id, step_name="refresh_model"
    )
    client.update_pipeline_step_from_function(
        pipeline_step_id=pipeline_step.id,
        function=retrain_model,
        step_input_mappings=[{"name": "model_id", "variable_type": "STRING"}],
        output_variable_mappings=[{"name": "model_id", "variable_type": "MODEL_ID"}],
        step_dependencies=["refresh_feature_group"],
        package_requirements=[],
    )

New Components:

  • step_dependencies: Specifies that this step runs after refresh_feature_group
  • package_requirements: Optional list of Python packages needed for execution
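
Entries in package_requirements follow pip's requirement-specifier syntax, such as 'numpy==1.2.3'. As a rough sanity check (illustrative only; this simplified regex does not cover the full specifier grammar, and the platform performs its own validation):

```python
import re

# Simplified pattern: package name, one comparison operator, one version.
SPEC = re.compile(r"^[A-Za-z0-9_.-]+(==|>=|<=|~=|!=|>|<)[A-Za-z0-9_.*+-]+$")

def valid_requirement(req: str) -> bool:
    return bool(SPEC.match(req))

print(valid_requirement("numpy==1.2.3"))   # True
print(valid_requirement("pandas>=1.4.0"))  # True
print(valid_requirement("not a spec"))     # False
```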

Creating Step 3: Batch Prediction

try:
    client.create_pipeline_step_from_function(
        pipeline_id=pipeline.id,
        step_name="refresh_bp",
        function=create_new_bp,
        step_input_mappings=[{"name": "bp_id", "variable_type": "STRING"}],
        output_variable_mappings=[
            {"name": "bp_output_fg_id", "variable_type": "FEATURE_GROUP_ID"}
        ],
        step_dependencies=["refresh_model"],
    )
except ApiException:
    pipeline_step = client.describe_pipeline_step_by_name(
        pipeline_id=pipeline.id, step_name="refresh_bp"
    )
    client.update_pipeline_step_from_function(
        pipeline_step_id=pipeline_step.id,
        function=create_new_bp,
        step_input_mappings=[{"name": "bp_id", "variable_type": "STRING"}],
        output_variable_mappings=[
            {"name": "bp_output_fg_id", "variable_type": "FEATURE_GROUP_ID"}
        ],
        step_dependencies=["refresh_model"],
    )

Dependency Chain: This step depends on refresh_model, creating the execution order:

  1. refresh_feature_group
  2. refresh_model (depends on step 1)
  3. refresh_bp (depends on step 2)
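
The platform derives this execution order from the step_dependencies declarations. Purely as an illustration of how the dependencies imply the ordering, the same result can be computed with Python's standard-library graphlib:

```python
from graphlib import TopologicalSorter

# Each step mapped to the set of steps it depends on,
# mirroring the step_dependencies declared above.
dependencies = {
    "refresh_feature_group": set(),
    "refresh_model": {"refresh_feature_group"},
    "refresh_bp": {"refresh_model"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['refresh_feature_group', 'refresh_model', 'refresh_bp']
```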

Executing the Pipeline

Once all steps are defined, execute the pipeline with the required input variables:

table_name = ""  # Your feature group table name
model_id = ""  # Your model ID
bp_id = ""  # Your batch prediction ID

running_pipeline = client.run_pipeline(
    pipeline.id,
    pipeline_variable_mappings=[
        {"name": "table_name", "variable_type": "STRING", "value": table_name},
        {"name": "model_id", "variable_type": "STRING", "value": model_id},
        {"name": "bp_id", "variable_type": "STRING", "value": bp_id},
    ],
)

Variable Mappings:

  • Each input parameter defined in the pipeline steps must be provided
  • Values are passed through pipeline_variable_mappings
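
Since all three inputs here are STRING-typed, the mapping dicts can be built from plain name/value pairs. A small hypothetical helper (not part of the SDK) shows the shape each mapping takes:

```python
def string_mappings(**values):
    """Build STRING-typed pipeline variable mappings from keyword arguments."""
    return [
        {"name": name, "variable_type": "STRING", "value": value}
        for name, value in values.items()
    ]

mappings = string_mappings(table_name="my_table", model_id="abc123", bp_id="def456")
print(mappings[0])  # {'name': 'table_name', 'variable_type': 'STRING', 'value': 'my_table'}
```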

Monitoring Pipeline Execution

Wait for the pipeline to complete and retrieve its status:

# Wait for pipeline completion
client.describe_pipeline_version(
    pipeline_version=running_pipeline.pipeline_version
).wait_for_pipeline()

# Get final pipeline status
pipeline_result = client.describe_pipeline_version(
    pipeline_version=running_pipeline.pipeline_version
)
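
A pipeline version carries per-step results, so a common follow-up is to tally step outcomes before acting on the run. The sketch below works on plain dicts; the "step_name"/"status" field names are illustrative assumptions for this example, not the SDK's exact schema:

```python
from collections import Counter

def summarize_steps(step_versions):
    """Count step outcomes, e.g. to decide whether the run fully succeeded."""
    return Counter(step["status"] for step in step_versions)

steps = [
    {"step_name": "refresh_feature_group", "status": "COMPLETE"},
    {"step_name": "refresh_model", "status": "COMPLETE"},
    {"step_name": "refresh_bp", "status": "FAILED"},
]
summary = summarize_steps(steps)
print(summary["COMPLETE"], summary["FAILED"])  # 2 1
```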