Data Streaming

Data ingestion is the process by which data is extracted from one or more sources and then prepared for training AI/ML models. There are two commonly used approaches for ingesting data: incremental batch processing and streaming. Batch processing operates on a set of data (a dataset) collected over time and is used when dealing with large volumes of data from a single source, or with legacy systems where it's not efficient to deliver data in streams. Batch data, by definition, requires all the data needed for the batch to be loaded into some type of storage, a database, or a file system before it can be processed.

Streaming, on the other hand, requires specific infrastructure to bring in real-time streaming data from an endpoint, view/monitor the incoming data on a dashboard, and then create a snapshot of the data records once there is enough data to either add to the existing batch data or use as a new dataset. Streaming is often used when the data source is distributed, or when it is important to keep the latency between the arrival of streamed data and its consumption by the AI/ML model low.


Setting Up a Streaming Dataset

Add New Streaming Dataset

The first step is to click on the "Create Dataset" button and provide a name, type, and table name for the streaming dataset. The type can be a use-case-specific type or a custom table, depending upon your requirements. The table name is the name of the dataset table from which a corresponding feature group is created for the streaming dataset. All streaming configurations are applied to the feature group, not to the underlying data table.


Next, click on "Continue", select the file source as "Streaming Dataset", and press the "Set-up Streaming Dataset" button to activate the streaming console.


Streaming Console

On the streaming console, you can set the API Library to Console, Python, or Curl. Using the Console option, you can enter data in JSON format and stream it directly. This is a handy way to quickly copy and paste streaming data and create a schema for your new streaming dataset. The more common way to stream, however, is to take the sample code and embed it in your endpoint so that streaming data is sent to the interface as it is generated.
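For instance, with the Python option the console provides sample code along these lines. The sketch below is a minimal illustration, assuming the abacusai Python SDK with placeholder credentials and IDs; the append_data and create_streaming_token method names follow the operations described in this guide, but check the console's generated sample code for the exact signatures:

```python
from abacusai import ApiClient

# 'YOUR_API_KEY' is a placeholder; copy your real key from the console.
client = ApiClient(api_key='YOUR_API_KEY')

# A streaming token authorizes lightweight streaming calls
# without exposing your full API key.
streaming_token = client.create_streaming_token().streaming_token

# The same JSON record you would paste into the console, sent from code.
client.append_data(
    feature_group_id='YOUR_FEATURE_GROUP_ID',  # shown on the streaming console
    streaming_token=streaming_token,
    data={'user_id': '123', 'event': 'page_view', 'timestamp': 1700000000},
)
```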


The "Stream Data" button might take a minute to get enabled. You can click on the button to stream the data. Your data will be displayed under the "Recently Received Data" section confirming the successful streaming of your console data to the Abacus.AI system.


Capturing Schema

Next, click on the "Capture Streaming Schema" button on the streaming console and you will be directed to the Features tab under Feature Groups.


Verify the Feature Type and Feature Mapping for each feature. In the running example, the userID feature has been changed to feature type CATEGORICAL, and the USER_ID feature mapping has been added.
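These adjustments can also be made programmatically. The following is a hedged sketch assuming SDK methods named after the UI actions (set_feature_type and set_feature_mapping are assumptions here; verify the names and parameters in the SDK reference):

```python
from abacusai import ApiClient

client = ApiClient(api_key='YOUR_API_KEY')

# Mark userID as CATEGORICAL on the streaming feature group.
client.set_feature_type(
    feature_group_id='YOUR_FEATURE_GROUP_ID',  # placeholder ID
    feature='userID',
    feature_type='CATEGORICAL',
)

# Map userID to the USER_ID feature mapping within the project.
client.set_feature_mapping(
    project_id='YOUR_PROJECT_ID',              # placeholder ID
    feature_group_id='YOUR_FEATURE_GROUP_ID',
    feature_name='userID',
    feature_mapping='USER_ID',
)
```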


Configuring Streaming and Deployment Settings

Streaming dataset configuration tells the system how to index the data so that it can be fetched efficiently. Streaming datasets can be configured either via Lookup Keys or via a Record ID.

The default configuration is to use lookup keys, where the key is used to fetch all the matching records. For example, consider an audit log in which a user interacts with an item at a timestamp; here (userID, itemID) can act as a lookup key, and the method appendData can be used to append new records on the basis of the lookup key so that old records are never overwritten or modified. At fetch time, the key determines all the associated records to be retrieved efficiently.

The second case is to set up a Primary Key (Record ID), which means you need only ONE record per ID. Imagine a user table that has a userId, where for every user there exists only one set of attributes. In such cases, indexing the data with a primary key is suitable. The method upsertData creates a new row if the ID doesn't already exist; otherwise, the existing record is updated. Fetching a row by its key is then efficient, since the system is configured such that each ID is associated with exactly one record.

Let's illustrate how you could configure these settings using the Abacus.AI console:

Example 1 - Configuring Lookup Keys

Let's say you have streaming data coming into the system from your website, where a user rates a movie at a particular time. Suppose you have created a streaming feature group called user_item_interaction_streaming from the streaming data, where the table has four features: user_id, movie_id, rating, and timestamp. Now, let's set up a configuration for this streaming dataset. You can set LOOKUP KEYS to (user_id, movie_id) and EVENT TIMESTAMP to timestamp. This way, every time a new record is streamed, the system uses the method appendData to add the new record without changing the existing ones and to index the records by the key and the provided event timestamp. If no explicit event timestamp is provided, the system indexes the records by the timestamp at which the data arrived. Therefore, lookup keys are a great way of configuring streaming data storage and retrieval to efficiently fetch the group of relevant records whenever needed. A sketch of streaming one interaction into this feature group follows below.
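The following is a minimal sketch of appending one interaction record from your application, using the appendData method described above (the feature group ID is a hypothetical placeholder, and the exact SDK signature should be verified against the reference):

```python
import time

from abacusai import ApiClient

client = ApiClient(api_key='YOUR_API_KEY')
streaming_token = client.create_streaming_token().streaming_token

# Append one interaction; existing records for (user_id, movie_id)
# are never overwritten or modified.
client.append_data(
    feature_group_id='USER_ITEM_INTERACTION_FG_ID',  # hypothetical ID
    streaming_token=streaming_token,
    data={
        'user_id': 'u_101',
        'movie_id': 'm_2001',
        'rating': 4,
        'timestamp': int(time.time()),  # the EVENT TIMESTAMP column
    },
)
```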


Example 2 - Configuring Record ID (Primary Key)

Let's take the same running example, and say you have a profile for each user who rates movies on your website. Suppose you have created a streaming feature group called user_attribute_streaming, where the table has the features: user_id, gender, age, occupation, and zip_code. Now, let's set up a configuration for this streaming dataset. You can set Record ID to user_id and use it as a primary key to index the data, because only one record per ID is present. This way, every time a new record is streamed, the system uses the method upsertData to find the record with the given user_id and update it if it exists, or insert a new record otherwise. The system indexes the records by the primary key and the provided event timestamp. If no explicit event timestamp is provided, the system indexes the records by the timestamp at which the data arrived. Therefore, the primary key can be useful in these scenarios to effectively configure streaming data storage and retrieval, as the sketch below illustrates.
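A minimal sketch of upserting a user profile with the upsertData method described above (the feature group ID is a hypothetical placeholder, and the assumption that upsert_data accepts a streaming token should be checked against the SDK reference):

```python
from abacusai import ApiClient

client = ApiClient(api_key='YOUR_API_KEY')
streaming_token = client.create_streaming_token().streaming_token

# Upsert by user_id: updates the existing row for this ID,
# or inserts a new row if the ID hasn't been seen before.
client.upsert_data(
    feature_group_id='USER_ATTRIBUTE_FG_ID',  # hypothetical ID
    streaming_token=streaming_token,
    data={
        'user_id': 'u_101',
        'gender': 'F',
        'age': 34,
        'occupation': 'engineer',
        'zip_code': '94103',
    },
)
```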


Besides using the User Interface, you can also configure these streaming settings programmatically through the API.
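For instance, the indexing configurations from the two examples above might be set with the SDK. This is a hedged sketch: the method name set_feature_group_indexing_config and its parameters are assumptions based on the indexing concepts described here, so consult the SDK reference for the actual calls:

```python
from abacusai import ApiClient

client = ApiClient(api_key='YOUR_API_KEY')

# Example 1: index by lookup keys plus an event timestamp (appendData semantics).
client.set_feature_group_indexing_config(
    feature_group_id='USER_ITEM_INTERACTION_FG_ID',  # hypothetical ID
    lookup_keys=['user_id', 'movie_id'],
    update_timestamp_key='timestamp',
)

# Example 2: index by a primary key, one record per ID (upsertData semantics).
client.set_feature_group_indexing_config(
    feature_group_id='USER_ATTRIBUTE_FG_ID',  # hypothetical ID
    primary_key='user_id',
)
```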

Snapshot Streaming Data

The next step is to snapshot the streamed data to create a new streaming dataset version. Once you have enough streamed records, click on the "Finish" button and then click on the "Snapshot Streaming Data" button on the Dataset Details interface.
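The same snapshot can be taken programmatically; a minimal sketch assuming a snapshot_streaming_data method named after the UI action (verify the name and return type in the SDK reference):

```python
from abacusai import ApiClient

client = ApiClient(api_key='YOUR_API_KEY')

# Create a new dataset version from the records streamed so far.
# 'YOUR_DATASET_ID' is a placeholder for the streaming dataset's ID.
dataset_version = client.snapshot_streaming_data(dataset_id='YOUR_DATASET_ID')
print(dataset_version)
```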


After the snapshot succeeds, you can click the "View" option under the "Explore" or the "Data" column of the snapshotted dataset version to see the dataset statistics and the raw data, respectively. If there are not enough streamed records, a "No streaming data found while snapshotting" error is raised.


Concatenate Batch Feature Group to Streaming Feature Group

You can concatenate batch feature groups to your newly created streaming feature group, provided that the feature groups have the same Feature Group Type and schema. If you are going to use the data for serving predictions or feature store lookups, you can check the box to skip snapshotting the data; otherwise, let the data be snapshotted if you need it to be materialized, used for training, or exported.
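Concatenation can also be triggered from code. The following is a hedged sketch assuming a concatenate_feature_group_data method and a skip_materialize flag mirroring the checkbox described above (both are assumptions to verify against the SDK reference; the IDs are placeholders):

```python
from abacusai import ApiClient

client = ApiClient(api_key='YOUR_API_KEY')

# Concatenate the batch feature group's data to the streaming feature group.
# skip_materialize=True corresponds to the serving/feature-store-lookup case;
# leave it False if the data should be snapshotted for training or export.
client.concatenate_feature_group_data(
    feature_group_id='STREAMING_FG_ID',      # hypothetical streaming feature group ID
    source_feature_group_id='BATCH_FG_ID',   # hypothetical batch feature group ID
    skip_materialize=True,
)
```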


The streamed records from the streaming feature group will be concatenated into the batch feature group; you can verify this by materializing the batch feature group data.


Adding Streaming to an Existing Batch Feature Group

You can add streaming to an existing feature group that is already attached to the project. Select the feature group from the Feature Groups list, and the feature group details interface will be displayed.


Click on the "Actions" button and select the "Add Streaming" option. Provide a table name for the streaming feature group to be created and click on the "Add Streaming" button. Your new streaming feature group will be created. The schema and the streaming configuration will be automatically populated to match the batch feature group. You can re-configure the streaming configuration if you would like.

The new streaming feature group is already concatenated to the batch feature group that was used to create it; therefore, whenever you finish streaming data and snapshot it, the data will automatically be concatenated into the associated batch feature group. You can materialize the batch feature group to view the added streamed data if you'd like.


Streaming APIs (Optional)

All the operations performed in the steps above can also be accomplished using the offered APIs. Feel free to explore the streaming APIs currently available in the Abacus.AI SDK.