This notebook provides a hands-on environment for building and deploying custom Python models on Abacus.AI. 'Custom' here refers to both the data transformations required to build a model and the training process used to construct a model from data. Hosting the custom logic on Abacus.AI allows the model to be refreshed automatically as new data arrives and to be hosted for generating online or batch (offline) predictions. It also enables features like input drift monitoring, model performance tracking, and other MLOps requirements.
!pip install abacusai
#@title Abacus.AI API Key
api_key = 'cf45d2********fa79101f7b' #@param {type: "string"}
from abacusai import ApiClient, ApiException
client = ApiClient(api_key)
In this notebook, we're going to see how to use Python to customize models on Abacus.AI. We will cover custom data transforms, model training, and prediction handling. Projects that will host a custom model need to be created with the PYTHON_MODEL use case. Note that custom Python data transforms can be used in any project under any use case, and all feature groups can be shared across projects. However, custom training algorithms and prediction functions are enabled only under the Custom Python Model use case.
project = client.create_project(name='Demo Python Model', use_case='PYTHON_MODEL')
Abacus.AI can read datasets directly from blob file storage. We are going to use a single dataset for this project: Concrete Strength.
Using the Create Dataset API, we can tell Abacus.AI where to find the dataset via its public S3 URI:
# if the dataset already exists, skip creation
try:
    concrete_dataset = client.describe_dataset(
        client.describe_feature_group_by_table_name('concrete_strength').dataset_id)
except ApiException:  # dataset not found
    concrete_dataset = client.create_dataset_from_file_connector(
        name='Concrete Strength',
        table_name='concrete_strength',
        location='s3://abacusai-exampledatasets/predicting/concrete_measurements.csv')
concrete_dataset.wait_for_inspection()
Most of the time it is easier to develop custom transformations on your local machine: it makes iteration, inspection, and debugging easier, and often you can do it directly in a notebook environment. To enable simple local development, you can use the Abacus.AI client to load your dataset as a Pandas DataFrame. This tends to work well if your dataset is under 100MB; for datasets that are much larger, you will likely want to construct a sampled feature group for development, as sketched below.
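For example, a down-sampled development feature group could be defined with SQL. This is a minimal sketch, assuming the client's create_feature_group call accepts a SQL definition and that the engine supports RANDOM(); the table name and dialect details are assumptions, so check the SDK docs for your environment:

# Minimal sketch, assuming SQL feature group support and RANDOM() in the dialect
sampled_fg = client.create_feature_group(
    table_name='concrete_strength_sampled',  # hypothetical development table
    sql='SELECT * FROM concrete_strength ORDER BY RANDOM() LIMIT 200')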
Here we are working with a fairly small dataset, so we can easily load it into memory. The first block fetches the feature group corresponding to the dataset (datasets are used to move data into Abacus.AI; feature groups are used to consume data for various operations). It initiates a materialization of the feature group to generate a snapshot, waits for it to be ready, and then loads it as a Pandas DataFrame.
concrete_feature_group = concrete_dataset.describe_feature_group()
if not concrete_feature_group.list_versions():
    concrete_feature_group.create_version()
concrete_feature_group.wait_for_materialization()
concrete_df = concrete_feature_group.load_as_pandas()
concrete_df[:10]
|   | cement | slag | flyash | water | superplasticizer | coarseaggregate | fineaggregate | age | csMPa |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 540.000000 | 0.000000 | 0.0 | 162.0 | 2.5 | 1040.000000 | 676.0 | 28.0 | 79.989998 |
| 1 | 540.000000 | 0.000000 | 0.0 | 162.0 | 2.5 | 1055.000000 | 676.0 | 28.0 | 61.889999 |
| 2 | 332.500000 | 142.500000 | 0.0 | 228.0 | 0.0 | 932.000000 | 594.0 | 270.0 | 40.270000 |
| 3 | 332.500000 | 142.500000 | 0.0 | 228.0 | 0.0 | 932.000000 | 594.0 | 365.0 | 41.049999 |
| 4 | 198.600006 | 132.399994 | 0.0 | 192.0 | 0.0 | 978.400024 | 825.5 | 360.0 | 44.299999 |
| 5 | 266.000000 | 114.000000 | 0.0 | 228.0 | 0.0 | 932.000000 | 670.0 | 90.0 | 47.029999 |
| 6 | 380.000000 | 95.000000 | 0.0 | 228.0 | 0.0 | 932.000000 | 594.0 | 365.0 | 43.700001 |
| 7 | 380.000000 | 95.000000 | 0.0 | 228.0 | 0.0 | 932.000000 | 594.0 | 28.0 | 36.450001 |
| 8 | 266.000000 | 114.000000 | 0.0 | 228.0 | 0.0 | 932.000000 | 670.0 | 28.0 | 45.849998 |
| 9 | 475.000000 | 0.000000 | 0.0 | 228.0 | 0.0 | 932.000000 | 594.0 | 28.0 | 39.290001 |
We are going to transform the dataset so that flyash is no longer a feature; instead, all the other values are transformed depending on whether the row has flyash > 0 or not (each column except age is centered on the mean of its flyash partition).
The example is not entirely realistic, and it is certainly feasible to achieve the same result using SQL. However, the point is to illustrate that you are free to transform the dataset using the full functionality of Python and its data frameworks. Here we are using pandas, but you can use a wide range of standard Python libraries to manipulate the data. Additionally, you can bundle resources with your code, for example small maps or tables, that can be accessed by your function to implement the transform, as sketched below.
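As an illustration of the bundled-resource idea, a transform could read a lookup table shipped alongside the function code. This is a hypothetical sketch: mix_adjustments.csv is an assumed bundled file, not part of this example's data.

# Hypothetical sketch: 'mix_adjustments.csv' is assumed to be bundled with the
# function archive and available in the working directory at run time.
def adjust_with_lookup(concrete_strength):
    import pandas as pd
    adjustments = pd.read_csv('mix_adjustments.csv')  # bundled resource (hypothetical)
    # join per-age adjustment factors onto the raw measurements
    return concrete_strength.merge(adjustments, on='age', how='left')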
Note that we test the function locally by running it against the DataFrame loaded from the feature group.
def separate_by_flyash(concrete_strength):
    import pandas as pd
    # drop the flyash column and partition the rows on whether flyash was present
    feature_df = concrete_strength.drop(['flyash'], axis=1)
    no_flyash = feature_df[concrete_strength.flyash == 0.0]
    flyash = feature_df[concrete_strength.flyash > 0.0]
    # center each partition on its own mean; assign(age=0) keeps age unshifted
    return pd.concat([no_flyash - no_flyash.assign(age=0).mean(),
                      flyash - flyash.assign(age=0).mean()])
concrete_by_flyash_df = separate_by_flyash(concrete_df)
concrete_by_flyash_df[:10]
|   | cement | slag | water | superplasticizer | coarseaggregate | fineaggregate | age | csMPa |
|---|---|---|---|---|---|---|---|---|
| 0 | 225.962191 | -100.110247 | -24.616784 | -1.555654 | 66.642580 | -88.853001 | 28.0 | 43.218213 |
| 1 | 225.962191 | -100.110247 | -24.616784 | -1.555654 | 81.642580 | -88.853001 | 28.0 | 25.118215 |
| 2 | 18.462191 | 42.389753 | 41.383216 | -4.055654 | -41.357420 | -170.853001 | 270.0 | 3.498216 |
| 3 | 18.462191 | 42.389753 | 41.383216 | -4.055654 | -41.357420 | -170.853001 | 365.0 | 4.278215 |
| 4 | -115.437803 | 32.289747 | 5.383216 | -4.055654 | 5.042604 | 60.646999 | 360.0 | 7.528215 |
| 5 | -48.037809 | 13.889753 | 41.383216 | -4.055654 | -41.357420 | -94.853001 | 90.0 | 10.258214 |
| 6 | 65.962191 | -5.110247 | 41.383216 | -4.055654 | -41.357420 | -170.853001 | 365.0 | 6.928216 |
| 7 | 65.962191 | -5.110247 | 41.383216 | -4.055654 | -41.357420 | -170.853001 | 28.0 | -0.321784 |
| 8 | -48.037809 | 13.889753 | 41.383216 | -4.055654 | -41.357420 | -94.853001 | 28.0 | 9.078214 |
| 9 | 160.962191 | -100.110247 | 41.383216 | -4.055654 | -41.357420 | -170.853001 | 28.0 | 2.518216 |
Now that we have a working transform, the next step is to register it with Abacus.AI so that it can run the function whenever workflows require it. For simple self-contained functions, we can just pass the function to the client and it will build a suitable resource to ship the Python code to Abacus.AI. For more complicated functions, and in cases where additional resources are required, you can instead build an archive and pass it to the registration function.
Registering the function involves supplying the source artifact, the name of the function implementing the transform, and a list of required input feature groups. These feature groups will be passed as DataFrame arguments to the function. Optionally, you can also supply configuration options as keyword arguments that alter the behavior of the function; for example, the same function may be used to construct two different feature groups differing only in the keyword arguments (sketched after the registration below).
Note that Abacus.AI will ensure that the function operates on the latest version of each input feature group.
concrete_flyash = client.create_feature_group_from_function(
    table_name='concrete_by_flyash',
    function_source_code=separate_by_flyash,
    function_name='separate_by_flyash',
    input_feature_groups=['concrete_strength'])
concrete_flyash.create_version()
concrete_flyash.wait_for_materialization()
concrete_by_flyash_df = concrete_flyash.load_as_pandas()
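To illustrate the keyword-argument option mentioned above, here is a hypothetical variant of the transform parameterized by the column to split on. The function itself is ordinary pandas; the exact registration parameter for forwarding keyword arguments is an assumption, so check the SDK docs for the current name:

# Hypothetical sketch: the same centering logic, parameterized by split column
def separate_by_column(concrete_strength, split_column='flyash'):
    import pandas as pd
    feature_df = concrete_strength.drop([split_column], axis=1)
    absent = feature_df[concrete_strength[split_column] == 0.0]
    present = feature_df[concrete_strength[split_column] > 0.0]
    return pd.concat([absent - absent.assign(age=0).mean(),
                      present - present.assign(age=0).mean()])

Registering this once per value of split_column would yield two different feature groups from a single function.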
Now we will define a custom model trained on this flyash-partitioned data. A custom training function is similar in many ways to a custom transform. The main difference is that instead of returning a new DataFrame with the transformed data, it returns an object containing the trained model. The returned object must be pickleable by the standard Python pickle library. However, the model is free to serialize additional data to the local disk in the current working directory, and the contents of the working directory will be made available at prediction time. There is also support for supplying an initialization function alongside the prediction function; it receives the unpickled model object and can transform it, for example by attaching data loaded from disk, before it is used for predictions. This will be covered in more detail later.
To illustrate that the training can be customized arbitrarily, we will train a composite model that, depending on the age of the concrete, uses either a linear model on quantile-transformed features or a GBDT trained on the raw inputs:
!pip install catboost
Collecting catboost
  Downloading catboost-1.0.0-cp37-none-manylinux1_x86_64.whl (76.4 MB)
Installing collected packages: catboost
Successfully installed catboost-1.0.0
Just like with data transforms, we can test our function locally to ensure that it works on the DataFrame as expected and that it is building a reasonable model. Notice that the model object we return is a tuple comprising the training column names, the fitted quantile transformer, the linear model for young concrete, and the CatBoost model for older concrete.
Since this tuple can be pickled, we do not need to write anything to the local disk. Also, we will be able to use the default identity initialization function, which returns the tuple unmodified at prediction time.
def train(concrete_by_flyash):
    # set the seed for reproducible results
    import numpy as np
    np.random.seed(5)
    X = concrete_by_flyash.drop(['csMPa'], axis=1)
    y = concrete_by_flyash.csMPa
    # split the training data on the age of the concrete
    recent = concrete_by_flyash.age < 10
    from sklearn.preprocessing import QuantileTransformer
    from sklearn.linear_model import LinearRegression
    qt = QuantileTransformer(n_quantiles=20)
    recent_model = LinearRegression()
    _ = recent_model.fit(qt.fit_transform(X[recent]), y[recent])
    # note: this scores the raw (untransformed) features even though the model
    # was fit on quantile-transformed inputs, hence the very poor R^2 below
    print(f'Linear model R^2 = {recent_model.score(X[recent], y[recent])}')
    from catboost import Pool, CatBoostRegressor
    train_pool = Pool(X[~recent], y[~recent])
    older_model = CatBoostRegressor(iterations=5, depth=2, loss_function='RMSE')
    _ = older_model.fit(train_pool)
    metrics = older_model.eval_metrics(train_pool, ['RMSE'])
    # derive R^2 from the final training RMSE: R^2 = 1 - RMSE^2 / Var(y)
    old_r2 = 1 - metrics['RMSE'][-1]**2 / y[~recent].var()
    print(f'Catboost model R^2 = {old_r2}')
    return (X.columns, qt, recent_model, older_model)
local_model = train(concrete_by_flyash_df)
Linear model R^2 = -59474.80409065778
Learning rate set to 0.5
0: learn: 12.7627412 total: 46.9ms remaining: 188ms
1: learn: 11.5585084 total: 47.7ms remaining: 71.6ms
2: learn: 10.3223491 total: 48.4ms remaining: 32.3ms
3: learn: 9.3247540 total: 49ms remaining: 12.3ms
4: learn: 8.5430952 total: 49.6ms remaining: 0us
Catboost model R^2 = 0.6814947748102853
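Before registering the model, it can be worth sanity-checking that the returned object really pickles. A quick local check using the standard library:

import pickle
# round-trip the model tuple; pickle.dumps raises if any component cannot be shipped
blob = pickle.dumps(local_model)
restored = pickle.loads(blob)
print(f'model pickles to {len(blob)} bytes')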
Note: the train function returns a single object, and that object will be passed as the first argument to the predict function.
To use this model for predictions, we need to tell the service how to evaluate a new input against the returned model object. This function could be as simple as calling predict() on a scikit-learn compliant model. Usually, however, some translation of the request data into model inputs is needed before the final invocation, to match any feature engineering done inside the training function. Keep in mind that transformations defined in feature groups are applied automatically by the service responsible for batch predictions.
Our running example is more complex because the model is a composite built on two partitions of the data, so the prediction function needs to dispatch the input to the right sub-model based on one of the input features.
We can follow the same pattern of testing locally to ensure that the prediction function works as expected. If the model requires an initialization function that loads data from disk, it would also be good to test that locally:
def predict(model, query):
    # feature group data can also be fetched here if needed, e.g.
    # abacusai.get_client().get_feature_group().lookup(...)
    columns, qt, recent_model, older_model = model
    import pandas as pd
    # build a single-row frame in the training column order
    X = pd.DataFrame({c: [query[c]] for c in columns})
    if X.age[0] < 10:
        # young concrete: linear model on quantile-transformed features
        y = recent_model.predict(qt.transform(X))[0]
    else:
        # older concrete: catboost on the raw feature values
        y = older_model.predict(X.values.reshape(-1))
    return {'csMPa': y}
for _, r in concrete_by_flyash_df[concrete_by_flyash_df.age < 10][:5].iterrows():
    print(predict(local_model, r.to_dict()), r['csMPa'])
for _, r in concrete_by_flyash_df[concrete_by_flyash_df.age > 10][:5].iterrows():
    print(predict(local_model, r.to_dict()), r['csMPa'])
{'csMPa': -31.75412474980192} -28.711784452296826
{'csMPa': -5.324797742032455} 1.8282155477031736
{'csMPa': -4.377726654712578} -1.6917844522968295
{'csMPa': -23.147157848108026} -21.721784452296827
{'csMPa': -16.712019233341156} -10.511784452296826
{'csMPa': 19.340295162698652} 43.21821554770317
{'csMPa': 19.340295162698652} 25.118215547703173
{'csMPa': 10.376285866501192} 3.4982155477031753
{'csMPa': 10.376285866501192} 4.278215547703169
{'csMPa': -2.273645085750303} 7.528215547703169
Note: it is possible that the object returned from the train function is not pickleable. In that case, we can save the model to local disk using library-specific serialization and load it back in the predict function. For example:
def train(concrete_by_flyash):
    X = concrete_by_flyash.drop(['csMPa'], axis=1)
    y = concrete_by_flyash.csMPa
    from catboost import Pool, CatBoostRegressor
    train_pool = Pool(X, y)
    catboost_model = CatBoostRegressor(iterations=5, depth=2, loss_function='RMSE')
    _ = catboost_model.fit(train_pool)
    catboost_model.save_model('trained_model.cb')  # use catboost's save_model to save the model locally
    # the returned (pickleable) object is just the training column order
    return X.columns

def predict(x_columns, query):
    from catboost import CatBoostRegressor
    catboost_model = CatBoostRegressor()
    catboost_model.load_model('trained_model.cb')  # use catboost's load_model to load the model from the local file
    import pandas as pd
    X = pd.DataFrame({c: [query[c]] for c in x_columns})
    y = catboost_model.predict(X.values.reshape(-1))
    return {'csMPa': y}
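Loading the model from disk on every request is wasteful; this is where the initialization function mentioned earlier comes in. Below is a minimal sketch under the assumption that the SDK lets you register an initialization function alongside the predict function (the registration parameter name is not shown here; check the docs). The initialization function receives the unpickled train() result and returns the object handed to predict():

def initialize(x_columns):
    # runs once at deployment startup: attach the catboost model loaded from
    # the working directory to the pickled column list returned by train()
    from catboost import CatBoostRegressor
    catboost_model = CatBoostRegressor()
    catboost_model.load_model('trained_model.cb')
    return (x_columns, catboost_model)

def predict(model, query):
    # model is whatever initialize() returned
    x_columns, catboost_model = model
    import pandas as pd
    X = pd.DataFrame({c: [query[c]] for c in x_columns})
    return {'csMPa': catboost_model.predict(X.values.reshape(-1))}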
We are now ready to put together the feature group, the training function, and the prediction function as a new Abacus.AI model. As with custom feature groups, the model has to specify the feature groups required for training, which will be passed as arguments to the train function:
model = client.create_model_from_functions(
    project_id=project.project_id,
    train_function=train,
    predict_function=predict,
    training_input_tables=['concrete_by_flyash'])
Wait for the model to finish training and then deploy the model to use for prediction:
model.wait_for_training()
deployment_token = client.create_deployment_token(project.project_id).deployment_token
deployment = client.create_deployment(model_id=model.model_id)
deployment.wait_for_deployment()
Linear model R^2 = -59474.80409065778
Learning rate set to 0.5
0: learn: 12.7627412 total: 573us remaining: 2.29ms
1: learn: 11.5585084 total: 1.03ms remaining: 1.54ms
2: learn: 10.3223491 total: 1.73ms remaining: 1.15ms
3: learn: 9.3247540 total: 2.41ms remaining: 601us
4: learn: 8.5430952 total: 3ms remaining: 0us
Catboost model R^2 = 0.6814947748102853
Now we can run predictions on Abacus.AI and compare them against predictions from the local model:
# locally trained
for _, r in concrete_by_flyash_df[concrete_by_flyash_df.age < 10][:5].iterrows():
    print(predict(local_model, r.to_dict()), r['csMPa'])
print(' Is equal to ')
# remotely trained
for _, r in concrete_by_flyash_df[concrete_by_flyash_df.age < 10][:5].iterrows():
    print(client.predict(deployment_token, deployment.deployment_id, r.to_dict()), r['csMPa'])
{'csMPa': -31.75412474980192} -28.711784452296826
{'csMPa': -5.324797742032455} 1.8282155477031736
{'csMPa': -4.377726654712578} -1.6917844522968295
{'csMPa': -23.147157848108026} -21.721784452296827
{'csMPa': -16.712019233341156} -10.511784452296826
Is equal to
{'csMPa': -31.75412474980192} -28.711784452296826
{'csMPa': -5.324797742032455} 1.8282155477031736
{'csMPa': -4.377726654712578} -1.6917844522968295
{'csMPa': -23.147157848108026} -21.721784452296827
{'csMPa': -16.712019233341156} -10.511784452296826
We can set up a new dataset to feed a batch prediction job. Abacus.AI will run the prediction dataset through the feature transformation function and then apply the custom model to generate predictions for the uploaded data. Keep in mind that the input to the model will be the output of the transform. The model inputs are included in the batch prediction download along with the model outputs:
try:
    prediction_dataset = client.describe_dataset(
        client.describe_feature_group_by_table_name('concrete_strength_prediction_input').dataset_id)
except ApiException:  # dataset not found
    prediction_dataset = client.create_dataset_from_file_connector(
        name='Concrete Strength Prediction Input',
        table_name='concrete_strength_prediction_input',
        location='s3://abacusai-exampledatasets/predicting/concrete_measurements.csv')
prediction_dataset.wait_for_inspection()
batch_prediction = client.create_batch_prediction(deployment.deployment_id)
batch_prediction.set_batch_prediction_dataset_remap({
    concrete_dataset.dataset_id: prediction_dataset.dataset_id})
batch_prediction_run = batch_prediction.start()
batch_prediction_run.wait_for_predictions()
with open('batch_predictions_results.csv', 'wb') as bpr_file:
    batch_prediction_run.download_result_to_file(bpr_file)
!head batch_predictions_results.csv
As a final step, we can attach refresh schedules to the various objects to ensure that they are updated regularly without manual intervention. This allows the custom model to run with the same level of automation as models generated internally by the service:
concrete_dataset.set_refresh_schedule('0 4 * * 1')   # Mondays at 04:00: pull fresh data
model.set_refresh_schedule('0 6 * * 1')              # Mondays at 06:00: retrain on the refreshed data
deployment.set_auto_deployment(True)                 # promote each newly trained model automatically
batch_prediction.set_refresh_schedule('0 8 * * 1')   # Mondays at 08:00: regenerate batch predictions