Pipeline API Tutorial
Overview
This tutorial demonstrates how to programmatically create and execute pipelines using the Abacus.AI Python SDK. You'll learn how to transform Python functions into pipeline steps and orchestrate a complete ML workflow.
Prerequisites
Before starting, ensure you have the Abacus.AI Python SDK installed and configured:
```python
from abacusai import ApiClient, ApiException

client = ApiClient()
```
Creating a Pipeline
First, create a pipeline and associate it with a project (optional):
```python
pipeline_name = "Your Pipeline Name Here"
project_id = None  # Leave as None for an organization-level pipeline

try:
    client.create_pipeline(pipeline_name=pipeline_name, project_id=project_id)
except ApiException as exception:
    if exception.exception == "AlreadyExistsError":
        print("Pipeline Already Exists!")
    else:
        raise exception

pipeline = client.describe_pipeline_by_name(pipeline_name)
```
Key Points:
- If project_id is None, the pipeline exists at the organization level
- The code handles the case where a pipeline with the same name already exists
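The create-then-describe pattern above can be wrapped in a small helper. This is a sketch, not an SDK function; the `ApiException` class below is a stand-in for `abacusai.ApiException` so the snippet is self-contained:

```python
class ApiException(Exception):
    """Stand-in for abacusai.ApiException in this sketch."""

    def __init__(self, message, exception=None):
        super().__init__(message)
        self.exception = exception


def get_or_create_pipeline(client, pipeline_name, project_id=None):
    """Create the pipeline if it does not exist, then return its description."""
    try:
        client.create_pipeline(pipeline_name=pipeline_name, project_id=project_id)
    except ApiException as exc:
        # Only swallow the "already exists" case; anything else is a real error.
        if exc.exception != "AlreadyExistsError":
            raise
    return client.describe_pipeline_by_name(pipeline_name)
```

With the real SDK you would pass the `ApiClient` instance as `client` and catch `abacusai.ApiException` instead of the stand-in.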
Defining Pipeline Functions
Pipeline functions contain the core logic for each step. Let's examine three common functions:
Step 1: Refresh Feature Group
This function refreshes (materializes) a feature group by its table name:
```python
def refresh_feature_group(table_name: str):
    from abacusai import ApiClient

    client = ApiClient()
    fg = client.describe_feature_group_by_table_name(table_name)
    fg.materialize()
    return {"refreshed_fg_id": fg.feature_group_id}
```
What it does:
- Takes a table name as input
- Retrieves the feature group
- Materializes (refreshes) the data
- Returns the feature group ID
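Note that the keys of the dict a step function returns are what the step's output variable mappings (defined later in this tutorial) refer to. A minimal sketch of that contract, assuming the mapping shape shown below:

```python
def check_step_outputs(step_result, output_variable_mappings):
    """Return True if the dict a step function returned declares exactly
    one key per output variable mapping."""
    declared = {m["name"] for m in output_variable_mappings}
    return set(step_result) == declared
```

For example, `refresh_feature_group` returns `{"refreshed_fg_id": ...}`, matching an output mapping named `refreshed_fg_id`.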
Step 2: Retrain Model
This function retrains a model and waits for completion:
```python
def retrain_model(model_id: str):
    from abacusai import ApiClient

    client = ApiClient()
    model = client.describe_model(model_id)
    model.retrain()
    # Wait up to 3 hours for training to complete
    model.wait_for_training(timeout=10800)
    model = model.describe()
    # Verify deployments are updated
    deployments = [
        dep
        for dep in client.list_deployments(project_id=model.project_id)
        if dep.model_id == model.model_id
    ]
    for deploy in deployments:
        deploy.wait_for_pending_deployment_update()
        deploy = deploy.describe()
        assert deploy.model_version == model.latest_model_version.model_version
    return {"model_id": model_id}
```
What it does:
- Initiates model retraining
- Waits for training to complete (up to 3 hours)
- Ensures all deployments are updated with the new model version
- Returns the model ID
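`wait_for_training` and `wait_for_pending_deployment_update` block internally. If you ever need a custom polling loop instead (for example, one shared deadline across several resources), a generic helper along these lines works; it is a sketch, with `clock` and `sleep` injectable only so it can be tested without real waiting:

```python
import time


def wait_until(predicate, timeout, interval=30.0, clock=time.monotonic, sleep=time.sleep):
    """Poll predicate() until it returns True or timeout seconds elapse.

    Returns True on success, False if the deadline passed first.
    """
    deadline = clock() + timeout
    while not predicate():
        if clock() >= deadline:
            return False
        sleep(interval)
    return True
```

In real use you would call it as, e.g., `wait_until(lambda: model.describe().latest_model_version is not None, timeout=10800)`.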
Step 3: Create Batch Prediction
This function starts a batch prediction job:
```python
def create_new_bp(bp_id: str):
    from abacusai import ApiClient

    client = ApiClient()
    output_fg_id = (
        client.describe_batch_prediction(bp_id)
        .describe_output_feature_group()
        .feature_group_id
    )
    # Kick off a new batch prediction run
    bp_version_id = client.start_batch_prediction(bp_id).batch_prediction_version
    return {"bp_output_fg_id": output_fg_id}
```
What it does:
- Retrieves the output feature group for the batch prediction
- Starts a new batch prediction
- Returns the output feature group ID
Converting Functions to Pipeline Steps
Each Python function must be converted into a pipeline step with proper input/output mappings.
Creating Step 1: Refresh Feature Group
```python
try:
    client.create_pipeline_step_from_function(
        pipeline_id=pipeline.pipeline_id,
        step_name="refresh_feature_group",
        function=refresh_feature_group,
        step_input_mappings=[{"name": "table_name", "variable_type": "STRING"}],
        output_variable_mappings=[
            {"name": "refreshed_fg_id", "variable_type": "FEATURE_GROUP_ID"}
        ],
    )
except ApiException as e:
    # If the step already exists, update it instead
    pipeline_step = client.describe_pipeline_step_by_name(
        pipeline_id=pipeline.pipeline_id, step_name="refresh_feature_group"
    )
    client.update_pipeline_step_from_function(
        pipeline_step_id=pipeline_step.pipeline_step_id,
        function=refresh_feature_group,
        step_input_mappings=[{"name": "table_name", "variable_type": "STRING"}],
        output_variable_mappings=[
            {"name": "refreshed_fg_id", "variable_type": "FEATURE_GROUP_ID"}
        ],
    )
```
Key Components:
- step_name: Unique identifier for the step
- function: The Python function to execute
- step_input_mappings: Defines input parameters and their types
- output_variable_mappings: Defines output variables and their types
- The try-except block handles both creation and updates
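Since every step repeats the same try-to-create, fall-back-to-update dance, it can be factored into a helper. In this sketch the SDK calls are injected as plain callables (for example via `functools.partial` around the real client methods), and `ApiException` is a stand-in for `abacusai.ApiException`:

```python
class ApiException(Exception):
    """Stand-in for abacusai.ApiException in this sketch."""


def create_or_update_step(create_fn, update_fn, describe_fn, step_name, **step_kwargs):
    """Create a pipeline step, falling back to an in-place update if it exists."""
    try:
        create_fn(step_name=step_name, **step_kwargs)
    except ApiException:
        existing = describe_fn(step_name=step_name)
        update_fn(pipeline_step_id=existing["pipeline_step_id"], **step_kwargs)
```

With the real client, `create_fn` could be `functools.partial(client.create_pipeline_step_from_function, pipeline_id=pipeline.pipeline_id)`, and `describe_fn` would return the `PipelineStep` object (whose `pipeline_step_id` attribute replaces the dict lookup shown here).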
Creating Step 2: Retrain Model
```python
try:
    client.create_pipeline_step_from_function(
        pipeline_id=pipeline.pipeline_id,
        step_name="refresh_model",
        function=retrain_model,
        step_input_mappings=[{"name": "model_id", "variable_type": "STRING"}],
        output_variable_mappings=[{"name": "model_id", "variable_type": "MODEL_ID"}],
        step_dependencies=["refresh_feature_group"],
        package_requirements=[],  # e.g. ['numpy==1.2.3', 'pandas>=1.4.0']
    )
except ApiException as e:
    pipeline_step = client.describe_pipeline_step_by_name(
        pipeline_id=pipeline.pipeline_id, step_name="refresh_model"
    )
    client.update_pipeline_step_from_function(
        pipeline_step_id=pipeline_step.pipeline_step_id,
        function=retrain_model,
        step_input_mappings=[{"name": "model_id", "variable_type": "STRING"}],
        output_variable_mappings=[{"name": "model_id", "variable_type": "MODEL_ID"}],
        step_dependencies=["refresh_feature_group"],
        package_requirements=[],
    )
```
New Components:
- step_dependencies: Specifies that this step runs after refresh_feature_group
- package_requirements: Optional list of Python packages needed for execution
Creating Step 3: Batch Prediction
```python
try:
    client.create_pipeline_step_from_function(
        pipeline_id=pipeline.pipeline_id,
        step_name="refresh_bp",
        function=create_new_bp,
        step_input_mappings=[{"name": "bp_id", "variable_type": "STRING"}],
        output_variable_mappings=[
            {"name": "bp_output_fg_id", "variable_type": "FEATURE_GROUP_ID"}
        ],
        step_dependencies=["refresh_model"],
    )
except ApiException as e:
    pipeline_step = client.describe_pipeline_step_by_name(
        pipeline_id=pipeline.pipeline_id, step_name="refresh_bp"
    )
    client.update_pipeline_step_from_function(
        pipeline_step_id=pipeline_step.pipeline_step_id,
        function=create_new_bp,
        step_input_mappings=[{"name": "bp_id", "variable_type": "STRING"}],
        output_variable_mappings=[
            {"name": "bp_output_fg_id", "variable_type": "FEATURE_GROUP_ID"}
        ],
        step_dependencies=["refresh_model"],
    )
```
Dependency Chain:
This step depends on refresh_model, creating the execution order:
refresh_feature_group → refresh_model (depends on step 1) → refresh_bp (depends on step 2)
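The dependency lists fully determine the run order. As an illustration (not an SDK feature), a depth-first topological sort over the `step_dependencies` declared above recovers that order:

```python
def execution_order(deps):
    """deps maps step name -> list of step names it depends on.

    Returns the steps in an order where every dependency runs first.
    """
    order, seen = [], set()

    def visit(step):
        if step in seen:
            return
        seen.add(step)
        for upstream in deps.get(step, []):
            visit(upstream)  # Schedule dependencies before the step itself
        order.append(step)

    for step in deps:
        visit(step)
    return order
```

For the three steps in this tutorial, the input would be `{"refresh_feature_group": [], "refresh_model": ["refresh_feature_group"], "refresh_bp": ["refresh_model"]}`.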
Executing the Pipeline
Once all steps are defined, execute the pipeline with the required input variables:
```python
table_name = ""  # Your feature group table name
model_id = ""    # Your model ID
bp_id = ""       # Your batch prediction ID

running_pipeline = client.run_pipeline(
    pipeline.pipeline_id,
    pipeline_variable_mappings=[
        {"name": "table_name", "variable_type": "STRING", "value": table_name},
        {"name": "model_id", "variable_type": "STRING", "value": model_id},
        {"name": "bp_id", "variable_type": "STRING", "value": bp_id},
    ],
)
```
Variable Mappings:
- Each input parameter defined in the pipeline steps must be provided
- Values are passed through pipeline_variable_mappings
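When all of the inputs are strings, the mapping list can be generated from a plain dict. This helper is an illustration, not an SDK function:

```python
def to_variable_mappings(values, variable_type="STRING"):
    """Convert {name: value} into the list-of-dicts shape run_pipeline expects."""
    return [
        {"name": name, "variable_type": variable_type, "value": value}
        for name, value in values.items()
    ]
```

With it, the call above becomes `client.run_pipeline(pipeline.pipeline_id, pipeline_variable_mappings=to_variable_mappings({"table_name": table_name, "model_id": model_id, "bp_id": bp_id}))`.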
Monitoring Pipeline Execution
Wait for the pipeline to complete and retrieve its status:
```python
# Wait for pipeline completion
client.describe_pipeline_version(
    pipeline_version=running_pipeline.pipeline_version
).wait_for_pipeline()

# Get final pipeline status
pipeline_result = client.describe_pipeline_version(
    pipeline_version=running_pipeline.pipeline_version
)
```
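After the run finishes, you will usually want to check which steps, if any, did not complete. The sketch below operates on a plain dict; the field names (`step_versions`, `step_name`, `status`, and the `"COMPLETE"` status value) are assumptions to verify against the SDK's `PipelineVersion` object:

```python
def failed_steps(pipeline_version):
    """Return the names of steps whose status is not COMPLETE.

    pipeline_version is assumed to be shaped like:
    {"status": ..., "step_versions": [{"step_name": ..., "status": ...}, ...]}
    """
    return [
        step["step_name"]
        for step in pipeline_version.get("step_versions", [])
        if step.get("status") != "COMPLETE"
    ]
```

An empty result means every step finished; otherwise the returned names tell you where to look in the pipeline version's logs.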