How-To: Building a Custom Abacus.AI Model in a Jupyter Notebook

This notebook provides a hands-on environment to build and deploy custom Python models on Abacus.AI. 'Custom' here refers both to the data transformations required to prepare data for a model and to the training process used to construct a model from that data. Hosting the custom logic on Abacus.AI allows the model to be refreshed automatically as new data arrives and to be hosted for online or batch (offline) predictions. It also enables features like input drift monitoring, model performance tracking, and other MLOps requirements.


Installing the Abacus.AI Python SDK and Setting Up the API Key

  1. Install the Abacus.AI library:
  !pip install abacusai
  2. Add your Abacus.AI API key, generated from the API dashboard, as follows:
  #@title Abacus.AI API Key
  api_key = 'cf45d2********fa79101f7b'  #@param {type: "string"}
  3. Import the Abacus.AI library and instantiate a client:
  from abacusai import ApiClient, ApiException
  client = ApiClient(api_key)
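
To confirm the client is authenticated, you can make any lightweight read call, for example listing your projects:

client.list_projects()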


1. Create a Project

In this notebook, we're going to see how to use Python to customize models on Abacus.AI. We will cover custom data transforms, model training, and prediction handling. Projects that will host a custom model need to be created with the PYTHON_MODEL use case. Note that custom Python data transforms can be used in any project under any use case, and all feature groups can be shared across projects. However, custom training algorithms and prediction functions are enabled only under the Custom Python Model use case.

project = client.create_project(name='Demo Python Model', use_case='PYTHON_MODEL')


2. Creating Datasets

Abacus.AI can read datasets directly from cloud blob storage. We are going to use a single dataset for this project: Concrete Strength.

Add datasets to Abacus.AI

Using the Create Dataset API, we can point Abacus.AI at the dataset via its public S3 URI:

# if the dataset already exists, skip creation
try:
  concrete_dataset = client.describe_dataset(client.describe_feature_group_by_table_name('concrete_strength').dataset_id)
except ApiException: # dataset not found
  concrete_dataset = client.create_dataset_from_file_connector(
      name='Concrete Strength',
      table_name='concrete_strength',
      location='s3://abacusai-exampledatasets/predicting/concrete_measurements.csv')
  concrete_dataset.wait_for_inspection()

Load the dataset so we can build and test the transform

Most of the time it is easier to develop custom transformations on your local machine: iteration, inspection, and debugging are simpler, and you can often work directly in a notebook environment. To enable local development, you can use the Abacus.AI client to load your dataset as a Pandas DataFrame. This works well for datasets under 100 MB; for much larger datasets, you will likely want to construct a sampled feature group to develop against, as sketched below.
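
As a minimal sketch of that approach (illustrative only: the table name is hypothetical and the sampling SQL may need adjusting for the service's SQL dialect), you could register a SQL feature group over a sample of the source table and develop against that instead:

sampled_fg = client.create_feature_group(
    table_name='concrete_strength_sampled',  # hypothetical sampled copy
    sql='SELECT * FROM concrete_strength ORDER BY RANDOM() LIMIT 10000')
sampled_fg.create_version()
sampled_fg.wait_for_materialization()
sampled_df = sampled_fg.load_as_pandas()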

Here we are working with a fairly small dataset, so we can easily load it into memory. The first block fetches the feature group corresponding to the dataset (datasets move data into Abacus.AI; feature groups are used to consume data for various operations). It initiates a materialization of the feature group to generate a snapshot, waits for it to be ready, and then loads it as a Pandas DataFrame.

concrete_feature_group = concrete_dataset.describe_feature_group()
if not concrete_feature_group.list_versions():
  concrete_feature_group.create_version()
concrete_feature_group.wait_for_materialization()

concrete_df = concrete_feature_group.load_as_pandas()
concrete_df[:10]
cement slag flyash water superplasticizer coarseaggregate fineaggregate age csMPa
0 540.000000 0.000000 0.0 162.0 2.5 1040.000000 676.0 28.0 79.989998
1 540.000000 0.000000 0.0 162.0 2.5 1055.000000 676.0 28.0 61.889999
2 332.500000 142.500000 0.0 228.0 0.0 932.000000 594.0 270.0 40.270000
3 332.500000 142.500000 0.0 228.0 0.0 932.000000 594.0 365.0 41.049999
4 198.600006 132.399994 0.0 192.0 0.0 978.400024 825.5 360.0 44.299999
5 266.000000 114.000000 0.0 228.0 0.0 932.000000 670.0 90.0 47.029999
6 380.000000 95.000000 0.0 228.0 0.0 932.000000 594.0 365.0 43.700001
7 380.000000 95.000000 0.0 228.0 0.0 932.000000 594.0 28.0 36.450001
8 266.000000 114.000000 0.0 228.0 0.0 932.000000 670.0 28.0 45.849998
9 475.000000 0.000000 0.0 228.0 0.0 932.000000 594.0 28.0 39.290001


Custom Data Transform

We are going to transform the dataset so that flyash is no longer a feature; instead, all the other values are transformed depending on whether the row has flyash > 0 or not.

The example is not entirely realistic, and it is certainly feasible to achieve the same result using SQL. However, the point is to illustrate that you are free to transform the dataset using the full functionality of Python and its data frameworks. Here we are using pandas, but you can use a wide range of standard Python libraries to manipulate the data. Additionally, you can bundle resources with your code, for example small maps or tables, that your function can access to implement the transform; a sketch follows.
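
For example, a transform could join against a small lookup table shipped with the code (a purely hypothetical sketch: mix_adjustments.csv, with columns age and adjustment, would be bundled in the code archive mentioned below):

def adjust_with_lookup(concrete_strength):
  import pandas as pd
  # hypothetical bundled resource shipped alongside the function's source
  adjustments = pd.read_csv('mix_adjustments.csv')
  merged = concrete_strength.merge(adjustments, on='age', how='left')
  merged['csMPa'] = merged.csMPa + merged.adjustment.fillna(0)
  return merged.drop(['adjustment'], axis=1)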

Note that we test the function locally by running it against the DataFrame loaded from the feature group.

def separate_by_flyash(concrete_strength):
  import pandas as pd
  # split rows by flyash presence, then drop flyash as a feature
  feature_df = concrete_strength.drop(['flyash'], axis=1)
  no_flyash = feature_df[concrete_strength.flyash == 0.0]
  flyash = feature_df[concrete_strength.flyash > 0.0]
  # center each partition on its own column means; assign(age=0) zeroes the
  # age mean so that age itself passes through unchanged
  return pd.concat([no_flyash - no_flyash.assign(age=0).mean(), flyash - flyash.assign(age=0).mean()])

concrete_by_flyash_df = separate_by_flyash(concrete_df)
concrete_by_flyash_df[:10]
cement slag water superplasticizer coarseaggregate fineaggregate age csMPa
0 225.962191 -100.110247 -24.616784 -1.555654 66.642580 -88.853001 28.0 43.218213
1 225.962191 -100.110247 -24.616784 -1.555654 81.642580 -88.853001 28.0 25.118215
2 18.462191 42.389753 41.383216 -4.055654 -41.357420 -170.853001 270.0 3.498216
3 18.462191 42.389753 41.383216 -4.055654 -41.357420 -170.853001 365.0 4.278215
4 -115.437803 32.289747 5.383216 -4.055654 5.042604 60.646999 360.0 7.528215
5 -48.037809 13.889753 41.383216 -4.055654 -41.357420 -94.853001 90.0 10.258214
6 65.962191 -5.110247 41.383216 -4.055654 -41.357420 -170.853001 365.0 6.928216
7 65.962191 -5.110247 41.383216 -4.055654 -41.357420 -170.853001 28.0 -0.321784
8 -48.037809 13.889753 41.383216 -4.055654 -41.357420 -94.853001 28.0 9.078214
9 160.962191 -100.110247 41.383216 -4.055654 -41.357420 -170.853001 28.0 2.518216


Registering Python Functions

Now that we have a working transform, the next step is to register it with Abacus.AI so it can run the function when required by workflows. For simple self-contained functions, we can just pass the function to the client, which will build a suitable resource to ship the Python code to Abacus.AI. For more complicated functions, or when additional resources are required, you can instead build an archive and supply it at registration.

Registering the function involves supplying the source artifact, the name of the function implementing the transform, and a list of required input feature groups. These feature groups will be passed as DataFrame arguments to the function. Optionally, you can also supply configuration options as keyword arguments that alter the behavior of the function; for example, the same function may be used to construct two different feature groups differing only in their keyword arguments, as sketched below.

Note that Abacus.AI will ensure that the function is operating on the latest version of the respective input feature group.
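
As a sketch of the keyword-argument pattern (illustrative only; consult the SDK documentation for how keyword arguments are bound at registration time):

def filter_by_flyash(concrete_strength, min_flyash=0.0, keep_above=True):
  # the same function can back multiple feature groups by varying the kwargs
  mask = concrete_strength.flyash > min_flyash
  return concrete_strength[mask if keep_above else ~mask]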

concrete_flyash = client.create_feature_group_from_function(
    table_name='concrete_by_flyash',
    function_source_code=separate_by_flyash,
    function_name='separate_by_flyash',
    input_feature_groups=['concrete_strength'])
concrete_flyash.create_version()
concrete_flyash.wait_for_materialization()
concrete_by_flyash_df = concrete_flyash.load_as_pandas()


Custom Model

Now we will define a custom model trained on this flyash-partitioned data. A custom training function is similar in many ways to a custom transform. The main difference is that instead of returning a new DataFrame with the transformed data, it returns an object containing the trained model. The returned object must be picklable with the standard Python pickle library. However, the model is free to serialize additional data to the local disk in the current working directory, and the contents of the working directory will be made available at prediction time. There is also support for supplying an initialization function alongside the prediction function; it receives the unpickled model object and can transform it, for example by loading data from disk, before use at prediction time. This will be covered in more detail later.

To illustrate that training can be customized arbitrarily, we will train a composite model that, depending on the age of the concrete, uses either a linear model on quantile-transformed features or a GBDT trained on the raw input:

!pip install catboost
Collecting catboost
Downloading catboost-1.0.0-cp37-none-manylinux1_x86_64.whl (76.4 MB)
Installing collected packages: catboost
Successfully installed catboost-1.0.0

Just like with data transforms, we can test our function locally to ensure that it works on the DataFrame as expected and builds a reasonable model. Notice that the model object we return is a tuple comprising the feature column names, the fitted quantile transformer, the linear model, and the CatBoost model.

Since this tuple can be pickled, we do not need to write anything to the local disk. We will also be able to use the default identity initialization function, which returns the tuple unmodified at prediction time.

def train(concrete_by_flyash):
  # set the seed for reproducible results
  import numpy as np
  np.random.seed(5)

  X = concrete_by_flyash.drop(['csMPa'], axis=1)
  y = concrete_by_flyash.csMPa
  recent = concrete_by_flyash.age < 10
  from sklearn.preprocessing import QuantileTransformer
  from sklearn.linear_model import LinearRegression
  qt = QuantileTransformer(n_quantiles=20)
  recent_model = LinearRegression()
  _ = recent_model.fit(qt.fit_transform(X[recent]), y[recent])
  # score on the same quantile-transformed features the model was fit on
  print(f'Linear model R^2 = {recent_model.score(qt.transform(X[recent]), y[recent])}')

  from catboost import Pool, CatBoostRegressor
  train_pool = Pool(X[~recent], y[~recent])
  older_model = CatBoostRegressor(iterations=5, depth=2, loss_function='RMSE')
  _ = older_model.fit(train_pool)
  metrics = older_model.eval_metrics(train_pool, ['RMSE'])
  old_r2 = 1 - metrics['RMSE'][-1]**2 / y[~recent].var()
  print(f'Catboost model R^2 = {old_r2}')

  return (X.columns, qt, recent_model, older_model)

local_model = train(concrete_by_flyash_df)
Linear model R^2 = -59474.80409065778
Learning rate set to 0.5
0:  learn: 12.7627412   total: 46.9ms   remaining: 188ms
1:  learn: 11.5585084   total: 47.7ms   remaining: 71.6ms
2:  learn: 10.3223491   total: 48.4ms   remaining: 32.3ms
3:  learn: 9.3247540    total: 49ms remaining: 12.3ms
4:  learn: 8.5430952    total: 49.6ms   remaining: 0us
Catboost model R^2 = 0.6814947748102853

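As a quick local sanity check, mirroring what the service does between training and serving, we can confirm that the returned tuple survives a pickle round trip:

import pickle
# the service pickles the train() return value and unpickles it before serving
restored_model = pickle.loads(pickle.dumps(local_model))
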
Note: the train function is designed to return a single object, and that object will be passed as the first argument to the predict function.


Predict Function

To use this model for predictions, we need to tell the service how to evaluate a new input against the returned model object. This function could be as simple as calling predict() on a scikit-learn-compliant model. Usually, however, the request data must first be translated into model inputs to match any feature engineering done inside the training function. Keep in mind that transformations defined in feature groups are applied automatically by the service responsible for batch predictions.

Our running example is a little more complex because the model is a composite built on two partitions of the data, so the prediction function needs to dispatch the input to the right sub-model based on one of the input features.

We can follow the same pattern of testing locally to ensure that the prediction function works as expected. If the model requires an initialization function that loads data from disk, it would also be good to test that locally:

def predict(model, query):
  columns, qt, recent_model, older_model = model
  import pandas as pd
  # build a single-row frame in the training column order
  X = pd.DataFrame({c: [query[c]] for c in columns})
  if X.age[0] < 10:
    # young concrete: linear model on quantile-transformed features
    y = recent_model.predict(qt.transform(X))[0]
  else:
    # older concrete: CatBoost on the raw feature vector
    y = older_model.predict(X.values.reshape(-1))
  return {'csMPa': y}

for _, r in concrete_by_flyash_df[concrete_by_flyash_df.age < 10][:5].iterrows():
  print(predict(local_model, r.to_dict()), r['csMPa'])

for _, r in concrete_by_flyash_df[concrete_by_flyash_df.age > 10][:5].iterrows():
  print(predict(local_model, r.to_dict()), r['csMPa'])
{'csMPa': -31.75412474980192} -28.711784452296826
{'csMPa': -5.324797742032455} 1.8282155477031736
{'csMPa': -4.377726654712578} -1.6917844522968295
{'csMPa': -23.147157848108026} -21.721784452296827
{'csMPa': -16.712019233341156} -10.511784452296826
{'csMPa': 19.340295162698652} 43.21821554770317
{'csMPa': 19.340295162698652} 25.118215547703173
{'csMPa': 10.376285866501192} 3.4982155477031753
{'csMPa': 10.376285866501192} 4.278215547703169
{'csMPa': -2.273645085750303} 7.528215547703169

Note: it is possible that the object returned from the train function is not picklable. In that case, we can save the object to the local working directory inside train and load it back in predict. For example:

def train(concrete_by_flyash):
    X = concrete_by_flyash.drop(['csMPa'], axis=1)
    y = concrete_by_flyash.csMPa
    from catboost import Pool, CatBoostRegressor
    train_pool = Pool(X, y)
    catboost_model = CatBoostRegressor(iterations=5, depth=2, loss_function='RMSE')
    _ = catboost_model.fit(train_pool)
    catboost_model.save_model('trained_model.cb')  # use catboost's save_model to save model locally
    return X.columns

def predict(x_columns, query):
    from catboost import CatBoostRegressor
    catboost_model = CatBoostRegressor()
    catboost_model.load_model('trained_model.cb')  # use catboost's load_model to load model from local file
    import pandas as pd
    X = pd.DataFrame({c: [query[c]] for c in x_columns})
    y = catboost_model.predict(X.values.reshape(-1))
    return {'csMPa': y}
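
Loading the model file inside predict means paying the deserialization cost on every request. The initialization-function support mentioned earlier avoids this; below is a sketch under the assumption that an initialization function can be supplied at registration (the exact parameter name may vary by SDK version):

def initialize(x_columns):
    # runs once at serving startup: rebuild the model from the file that
    # train() wrote into the working directory
    from catboost import CatBoostRegressor
    catboost_model = CatBoostRegressor()
    catboost_model.load_model('trained_model.cb')
    return (x_columns, catboost_model)

def predict(model, query):
    x_columns, catboost_model = model  # already rebuilt by initialize()
    import pandas as pd
    X = pd.DataFrame({c: [query[c]] for c in x_columns})
    return {'csMPa': catboost_model.predict(X.values.reshape(-1))}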


Register the Model

We are now ready to put together the feature group, the training function, and the prediction function as a new Abacus.AI model. As with custom feature groups, the model has to specify the feature groups required for training, which will be passed as arguments to the train function:

model = client.create_model_from_functions(
    project_id=project.project_id,
    train_function=train,
    predict_function=predict,
    training_input_tables=['concrete_by_flyash'])

Wait for the model to finish training, then deploy it to serve predictions:

model.wait_for_training()
deployment_token = client.create_deployment_token(project.project_id).deployment_token
deployment = client.create_deployment(model_id=model.model_id)
deployment.wait_for_deployment()
Linear model R^2 = -59474.80409065778
Learning rate set to 0.5
0:  learn: 12.7627412   total: 573us    remaining: 2.29ms
1:  learn: 11.5585084   total: 1.03ms   remaining: 1.54ms
2:  learn: 10.3223491   total: 1.73ms   remaining: 1.15ms
3:  learn: 9.3247540    total: 2.41ms   remaining: 601us
4:  learn: 8.5430952    total: 3ms  remaining: 0us
Catboost model R^2 = 0.6814947748102853

Now we can run predictions on Abacus.AI and compare them against predictions from the local model:

# locally trained
for _, r in concrete_by_flyash_df[concrete_by_flyash_df.age < 10][:5].iterrows():
  print(predict(local_model, r.to_dict()), r['csMPa'])

print(' Is equal to ')

# remotely trained
for _, r in concrete_by_flyash_df[concrete_by_flyash_df.age < 10][:5].iterrows():
  print(client.predict(deployment_token, deployment.deployment_id, r.to_dict()), r['csMPa'])
{'csMPa': -31.75412474980192} -28.711784452296826
{'csMPa': -5.324797742032455} 1.8282155477031736
{'csMPa': -4.377726654712578} -1.6917844522968295
{'csMPa': -23.147157848108026} -21.721784452296827
{'csMPa': -16.712019233341156} -10.511784452296826
  Is equal to
{'csMPa': -31.75412474980192} -28.711784452296826
{'csMPa': -5.324797742032455} 1.8282155477031736
{'csMPa': -4.377726654712578} -1.6917844522968295
{'csMPa': -23.147157848108026} -21.721784452296827
{'csMPa': -16.712019233341156} -10.511784452296826


Set up Batch Predictions

We can set up a new dataset to feed a batch prediction job. Abacus.AI will run the prediction dataset through the feature transformation function and then apply the custom model to generate predictions for the uploaded data. Keep in mind that the input to the model will be the output of the transform; the transformed model inputs are included in the batch prediction download along with the model outputs:

try:
  prediction_dataset = client.describe_dataset(client.describe_feature_group_by_table_name('concrete_strength_prediction').dataset_id)
except ApiException: # dataset not found
  prediction_dataset = client.create_dataset_from_file_connector(
      name='Concrete Strength Prediction Input',
      table_name='concrete_strength_prediction',
      location='s3://abacusai-exampledatasets/predicting/concrete_measurements.csv')
  prediction_dataset.wait_for_inspection()

batch_prediction = client.create_batch_prediction(deployment.deployment_id)
batch_prediction.set_batch_prediction_dataset_remap({
    concrete_dataset.dataset_id: prediction_dataset.dataset_id
})
batch_prediction_run = batch_prediction.start()
batch_prediction_run.wait_for_predictions()
with open('batch_predictions_results.csv', 'wb') as bpr_file:
  batch_prediction_run.download_result_to_file(bpr_file)
!head batch_predictions_results.csv
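
Or, to inspect the results in the notebook:

import pandas as pd
# the download contains the transformed model inputs alongside the predictions
batch_results_df = pd.read_csv('batch_predictions_results.csv')
batch_results_df.head()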


Attach Refresh Schedules

As a final step, we can attach refresh schedules to various objects so they are updated regularly without manual intervention. This lets the custom model run with the same level of automation as models generated natively by the service:

concrete_dataset.set_refresh_schedule('0 4 * * 1')  # re-read the dataset every Monday at 04:00
model.set_refresh_schedule('0 6 * * 1')             # retrain every Monday at 06:00
deployment.set_auto_deployment(True)                # auto-promote newly trained model versions
batch_prediction.set_refresh_schedule('0 8 * * 1')  # re-run batch predictions every Monday at 08:00