Amazon SageMaker Pipelines includes features that allow you to streamline and automate your machine learning (ML) workflows. This frees scientists and model developers to focus on model development and rapid experimentation rather than infrastructure management.
Pipelines provides the ability to orchestrate complex ML workflows using a simple Python SDK and the ability to visualize those workflows through SageMaker Studio. This helps automate data preparation and feature engineering tasks, as well as model training and deployment. Pipelines also integrates with Amazon SageMaker automatic model tuning, allowing you to automatically find hyperparameter values that result in the best performing model as determined by the metrics you choose.
Ensemble models are gaining popularity within the ML community because they generate more accurate predictions by combining the predictions of multiple models. Pipelines allows you to quickly create end-to-end ML pipelines for ensemble models, so developers can build highly accurate models while maintaining efficiency and reproducibility.
This post provides an example of an ensemble model trained and deployed using a pipeline.
Use case overview
Sales representatives generate new leads and create opportunities within Salesforce to track them. The application described in this post is an ML approach that uses unsupervised learning to automatically identify use cases for each opportunity based on various textual information, such as name, description, details, and product service group.
Preliminary analysis showed that use cases vary by industry and that different use cases have significantly different distributions of annual revenue, which can be useful for segmentation. Use cases are therefore an important predictive feature that allows you to optimize your analysis and improve your sales recommendation models.
We can treat use case identification as a topic identification problem and explore various topic identification models such as latent semantic analysis (LSA), latent Dirichlet allocation (LDA), and BERTopic. In both LSA and LDA, each document is treated only as a bag of words; the order and grammatical role of the words don't matter, so information can be lost when determining topics. In addition, both require a predetermined number of topics, which was hard to determine in our dataset. BERTopic overcomes these issues and was therefore used to identify the use cases.
This approach uses three consecutive BERTopic models to generate the final clustering in a hierarchical manner.
Each BERTopic model consists of four parts:
- Embedding – BERTopic supports various embedding methods. In this scenario, the input data comes from different areas and is usually entered manually, so we use sentence embeddings to ensure scalability and fast processing.
- Dimensionality reduction – We use Uniform Manifold Approximation and Projection (UMAP), an unsupervised nonlinear dimensionality reduction technique, to reduce the high-dimensional text vectors.
- Clustering – We form the different use case clusters using the Balanced Iterative Reducing and Clustering using Hierarchies (BIRCH) technique.
- Keyword identification – We use class-based TF-IDF to extract the most representative words from each cluster (see the sketch after this list).
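These four components can be wired together directly through BERTopic's constructor. The following is a minimal sketch of that assembly, not the production configuration: the embedding model name, the UMAP and BIRCH parameter values, and the docs variable (the preprocessed opportunity text) are illustrative assumptions.

from sentence_transformers import SentenceTransformer
from umap import UMAP
from sklearn.cluster import Birch
from sklearn.feature_extraction.text import CountVectorizer
from bertopic import BERTopic
from bertopic.vectorizers import ClassTfidfTransformer

# Sentence embeddings for scalability and fast processing (model name is illustrative)
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
# UMAP for unsupervised nonlinear dimensionality reduction (parameter values are illustrative)
umap_model = UMAP(n_neighbors=15, n_components=5, min_dist=0.0, metric="cosine")
# BIRCH clustering in place of BERTopic's default HDBSCAN; cluster count within the 15-25 range
cluster_model = Birch(n_clusters=20)
# Class-based TF-IDF to extract the most representative words per cluster
vectorizer_model = CountVectorizer(stop_words="english")
ctfidf_model = ClassTfidfTransformer()

topic_model = BERTopic(
    embedding_model=embedding_model,
    umap_model=umap_model,
    hdbscan_model=cluster_model,  # BERTopic accepts any scikit-learn style clustering model here
    vectorizer_model=vectorizer_model,
    ctfidf_model=ctfidf_model,
)

# docs stands for the preprocessed, combined opportunity text (a list of strings)
topics, _ = topic_model.fit_transform(docs)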
Sequential ensemble model
Because there is no predetermined number of topics, we set the input for the number of clusters to be between 15 and 25 topics. Upon observation, some of the topics are wide and common, so another layer of the BERTopic model is applied to them separately. After combining all of the newly identified topics from the second-layer model with the original topics from the first-layer results, post-processing is done manually to finalize topic identification. Finally, a third layer is used to create subtopics for some of the clusters.
For the second- and third-layer models to work effectively, they need a mapping file that maps the results of the previous model to specific words or phrases. This helps ensure that the clustering is accurate and relevant.
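The post doesn't show the mapping file format. Purely as an illustration, it could be a small JSON lookup from a previous layer's topic ID to a representative word or phrase, written by one layer and loaded by the next; the file name and structure below are assumptions.

import json

# Hypothetical mapping file format: previous-layer topic ID -> representative word or phrase
mapping_first_layer = {
    "0": "<representative phrase for topic 0>",
    "1": "<representative phrase for topic 1>",
}

# One layer writes the mapping so a later training layer can load it (file name is an assumption)
with open("mapping_first_update.json", "w") as f:
    json.dump(mapping_first_layer, f)

# The next layer reads it back and applies it before fitting its BERTopic model
with open("mapping_first_update.json") as f:
    mapping = json.load(f)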
Bayesian optimization is used for hyperparameter tuning, and cross-validation is used to reduce overfitting. The dataset includes features such as opportunity name, opportunity details, needs, associated product names, product details, and product groups. The model is evaluated using a customized loss function, and the best embedding model is selected.
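The tuning code isn't shown in this post. As a rough sketch, Bayesian optimization over the UMAP and BIRCH hyperparameters could be run with Hyperopt (the actual library and search space aren't named in the post), with the customized loss standing in as the objective:

from hyperopt import fmin, tpe, hp, Trials

def custom_loss(params):
    # Placeholder objective: in the real workflow this fits a BERTopic model with these
    # parameters under cross-validation and returns the customized loss (lower is better).
    return (params["n_clusters"] - 20) ** 2  # stand-in value so the sketch runs end to end

# Hypothetical search space over UMAP and BIRCH hyperparameters
search_space = {
    "n_neighbors": hp.quniform("n_neighbors", 5, 50, 1),
    "n_components": hp.quniform("n_components", 2, 10, 1),
    "n_clusters": hp.quniform("n_clusters", 15, 25, 1),  # matches the 15-25 topic range
}

trials = Trials()
best_params = fmin(
    fn=custom_loss,
    space=search_space,
    algo=tpe.suggest,  # Tree-structured Parzen Estimator, a Bayesian optimization algorithm
    max_evals=50,
    trials=trials,
)
print(best_params)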
Challenges and considerations
Here are some of the challenges and considerations for this solution:
- A pipeline’s data preprocessing capabilities are important for improving model performance. The ability to preprocess incoming data before training ensures that the model is fed high-quality data. Preprocessing and data cleaning steps include converting all text columns to lowercase; removing template elements, abbreviations, URLs, emails, and so on; removing irrelevant NER labels; and lemmatizing the combined text. The result is more accurate and reliable predictions.
- You need a scalable computing environment so you can easily process and train millions of rows of data. This makes it easy to perform large-scale data processing and modeling tasks, reducing development time and costs.
- Each step in an ML workflow has different resource requirements, so a flexible and adaptable pipeline is essential for efficient resource allocation. By optimizing the resource usage of each step, you can reduce overall processing time, resulting in faster model development and deployment.
- To run custom scripts for data processing and model training, the required frameworks and dependencies must be available.
- Coordinating the training of multiple models can be difficult, especially when each subsequent model depends on the output of the previous model. The process of coordinating workflows between these models can be complex and time-consuming.
- Following each training layer, we need to modify the mapping to reflect the topics generated by the model and use it as input for subsequent model layers.
Solution overview
In this solution, the entry point is Amazon SageMaker Studio, a web-based integrated development environment (IDE) provided by AWS that enables data scientists and ML developers to build, train, and deploy ML models at scale in a collaborative and efficient manner.
The following diagram shows the high-level architecture of the solution.
We use the following SageMaker pipeline steps as part of our architecture:
- SageMaker processing – This step allows you to preprocess and transform your data before training. One advantage of this step is that you can use built-in algorithms for common data transformations and resource autoscaling. You can also perform complex data preprocessing using custom code, which allows you to use custom container images.
- SageMaker training – In this step, you can train your ML model using SageMaker built-in algorithms or custom code. Distributed training allows you to train your model faster.
- SageMaker callback – This step allows you to run custom code during the ML workflow, such as sending notifications or triggering additional processing steps. You can run an external process and restart the pipeline workflow upon completion of this step.
- SageMaker model – In this step, you can create or register your model with Amazon SageMaker.
Implementation walkthrough
First, set up the SageMaker pipeline:
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

# create a Session with a custom Region (e.g. us-east-1); will be None if not specified
region = "<your-region-name>"
# default S3 bucket for the SageMaker session; will be None if not specified
default_bucket = "<your-s3-bucket>"
boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client("sagemaker")
Initialize a SageMaker session:
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    default_bucket=default_bucket,
)
Set the SageMaker execution role for the session:
role = sagemaker.session.get_execution_role(sagemaker_session)
Manage interactions in the pipeline context:
pipeline_session = PipelineSession(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    default_bucket=default_bucket,
)
Define a base image for running the scripts:
account_id = role.split(":")[4]
# a custom base image that takes care of the dependencies
ecr_repository_name = "<your-base-image-to-run-script>"
tag = "latest"
container_image_uri = "{0}.dkr.ecr.{1}.amazonaws.com/{2}:{3}".format(account_id, region, ecr_repository_name, tag)
The workflow steps are detailed below.
- Data preprocessing – This involves cleaning and preparing data for feature engineering and splitting the data into training, testing, and validation sets.
import os
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)

processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    # choose an instance type suitable for the job
    default_value="ml.m5.4xlarge"
)

script_processor = ScriptProcessor(
    image_uri=container_image_uri,
    command=["python"],
    instance_type=processing_instance_type,
    instance_count=1,
    role=role,
)

# define the data preprocessing job
step_preprocess = ProcessingStep(
    name="DataPreprocessing",
    processor=script_processor,
    inputs=[
        ProcessingInput(source=BASE_DIR, destination="/opt/ml/processing/input/code/")
    ],
    outputs=[
        ProcessingOutput(output_name="data_train", source="/opt/ml/processing/data_train"), # output data, dictionaries, etc. for later steps
    ],
    code=os.path.join(BASE_DIR, "preprocess.py"),
)
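The contents of preprocess.py aren't included in the post. A minimal sketch consistent with the cleaning steps described earlier (lowercasing, removing URLs and emails, then splitting into training, validation, and test sets) follows; the input location, file name, column names, and split ratios are assumptions.

# preprocess.py (sketch)
import os
import re
import pandas as pd
from sklearn.model_selection import train_test_split

INPUT_DIR = "/opt/ml/processing/input"      # assumed data location (the post only mounts the code directory)
OUTPUT_DIR = "/opt/ml/processing/data_train"

def clean_text(text: str) -> str:
    text = str(text).lower()
    text = re.sub(r"http\S+|www\.\S+", " ", text)   # remove URLs
    text = re.sub(r"\S+@\S+", " ", text)            # remove emails
    text = re.sub(r"\s+", " ", text).strip()        # collapse whitespace
    return text

if __name__ == "__main__":
    df = pd.read_csv(os.path.join(INPUT_DIR, "opportunities.csv"))        # assumed file name
    text_cols = ["opportunity_name", "opportunity_details", "needs"]      # assumed column names
    df["combined_text"] = df[text_cols].fillna("").astype(str).agg(" ".join, axis=1).map(clean_text)

    # split into training, validation, and test sets (ratios are illustrative)
    train, rest = train_test_split(df, test_size=0.3, random_state=42)
    valid, test = train_test_split(rest, test_size=0.5, random_state=42)

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    train.to_csv(os.path.join(OUTPUT_DIR, "train.csv"), index=False)
    valid.to_csv(os.path.join(OUTPUT_DIR, "validation.csv"), index=False)
    test.to_csv(os.path.join(OUTPUT_DIR, "test.csv"), index=False)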
- Train a Layer 1 BERTopic model – The SageMaker training step is used to train the first layer of the BERTopic model using an Amazon Elastic Container Registry (Amazon ECR) image and a custom training script.
base_job_prefix = "OppUseCase"

from sagemaker.workflow.steps import TrainingStep
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.4xlarge"
)

# create an estimator for the training job
estimator_first_layer = Estimator(
    image_uri=container_image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=f"s3://{default_bucket}/{base_job_prefix}/train_first_layer", # S3 location where the training output is stored
    role=role,
    entry_point="train_first_layer.py"
)

# create a training job for the estimator based on inputs from the data preprocessing step
# (the channel name "data_train" is one reasonable choice; it maps to /opt/ml/input/data/data_train in the container)
step_train_first_layer = TrainingStep(
    name="TrainFirstLayerModel",
    estimator=estimator_first_layer,
    inputs={
        "data_train": TrainingInput(
            s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["data_train"].S3Output.S3Uri,
        ),
    },
)
- Use a callback step – This involves sending a message to an Amazon Simple Queue Service (Amazon SQS) queue, which triggers an AWS Lambda function. The Lambda function updates the mapping file in Amazon S3 and sends a success token back to the pipeline to resume its run.
from sagemaker.workflow.callback_step import CallbackStep, CallbackOutput, CallbackOutputTypeEnum

first_sqs_queue_to_use = ParameterString(
    name="FirstSQSQueue",
    default_value="<first_queue_url>", # add the queue URL
)

first_callback_output = CallbackOutput(output_name="s3_mapping_first_update", output_type=CallbackOutputTypeEnum.String)

step_first_mapping_update = CallbackStep(
    name="FirstMappingUpdate",
    sqs_queue_url=first_sqs_queue_to_use,
    # Input arguments that will be provided in the SQS message
    inputs={
        "input_location": f"s3://{default_bucket}/{base_job_prefix}/mapping",
        "output_location": f"s3://{default_bucket}/{base_job_prefix}/mapping_first_update"
    },
    outputs=[
        first_callback_output,
    ],
)

step_first_mapping_update.add_depends_on([step_train_first_layer]) # the callback step runs after step_train_first_layer
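On the other side of the queue, an AWS Lambda function (not shown in the post) consumes the message, updates the mapping file in Amazon S3, and returns a success token so the pipeline resumes. The following is a rough sketch; the exact SQS message shape and the mapping-update logic are assumptions, while send_pipeline_execution_step_success is the SageMaker API used to resume a callback step.

import json
import boto3

s3 = boto3.client("s3")
sm = boto3.client("sagemaker")

def handler(event, context):
    for record in event["Records"]:              # Lambda triggered by the SQS queue
        body = json.loads(record["body"])
        token = body["token"]                    # callback token issued by the pipeline (assumed message shape)
        args = body["arguments"]                 # "input_location" / "output_location" passed by the step
        output_location = args["output_location"]

        # ... read the mapping from args["input_location"], update it with the newly
        # identified topics, and write the result to output_location (logic omitted) ...

        # report success and pass the updated mapping location back to the pipeline
        sm.send_pipeline_execution_step_success(
            CallbackToken=token,
            OutputParameters=[
                {"Name": "s3_mapping_first_update", "Value": output_location},
            ],
        )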
- Train a Layer 2 BERTopic model – Another SageMaker TrainingStep is used to train the second layer of the BERTopic model using ECR images and a custom training script.
estimator_second_layer = Estimator(
    image_uri=container_image_uri,
    instance_type=training_instance_type, # same type as the first training layer
    instance_count=1,
    output_path=f"s3://{default_bucket}/{base_job_prefix}/train_second_layer", # S3 location where the training output is stored
    role=role,
    entry_point="train_second_layer.py"
)

# create a training job for the estimator based on inputs from the preprocessing step,
# the output of the previous callback step, and the first training layer
# (channel names are illustrative; each maps to /opt/ml/input/data/<channel> in the container)
step_train_second_layer = TrainingStep(
    name="TrainSecondLayerModel",
    estimator=estimator_second_layer,
    inputs={
        "data_train": TrainingInput(
            s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["data_train"].S3Output.S3Uri,
        ),
        "mapping": TrainingInput(
            # output of the previous callback step
            s3_data=step_first_mapping_update.properties.Outputs["s3_mapping_first_update"],
        ),
        "first_layer": TrainingInput(
            s3_data=f"s3://{default_bucket}/{base_job_prefix}/train_first_layer"
        ),
    },
)
- Use a callback step – Similar to the earlier callback step, this involves sending a message to an SQS queue, which triggers a Lambda function. The Lambda function updates the mapping file in Amazon S3 and sends a success token back to the pipeline to resume its run.
second_sqs_queue_to_use = ParameterString(
    name="SecondSQSQueue",
    default_value="<second_queue_url>", # add the queue URL
)

second_callback_output = CallbackOutput(output_name="s3_mapping_second_update", output_type=CallbackOutputTypeEnum.String)

step_second_mapping_update = CallbackStep(
    name="SecondMappingUpdate",
    sqs_queue_url=second_sqs_queue_to_use,
    # Input arguments that will be provided in the SQS message
    inputs={
        "input_location": f"s3://{default_bucket}/{base_job_prefix}/mapping_first_update",
        "output_location": f"s3://{default_bucket}/{base_job_prefix}/mapping_second_update"
    },
    outputs=[
        second_callback_output,
    ],
)

step_second_mapping_update.add_depends_on([step_train_second_layer]) # the callback step runs after step_train_second_layer
- Train a Layer 3 BERTopic model – This involves retrieving the mapping file from Amazon S3 and training the third layer of the BERTopic model using the ECR image and a custom training script.
estimator_third_layer = Estimator(
    image_uri=container_image_uri,
    instance_type=training_instance_type, # same type as the previous two training layers
    instance_count=1,
    output_path=f"s3://{default_bucket}/{base_job_prefix}/train_third_layer", # S3 location where the training output is stored
    role=role,
    entry_point="train_third_layer.py"
)

# create a training job for the estimator based on inputs from the preprocessing step,
# the second callback step, and the outputs of the previous two training layers
# (channel names are illustrative)
step_train_third_layer = TrainingStep(
    name="TrainThirdLayerModel",
    estimator=estimator_third_layer,
    inputs={
        "data_train": TrainingInput(
            s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["data_train"].S3Output.S3Uri,
        ),
        "mapping": TrainingInput(
            # output of the previous callback step
            s3_data=step_second_mapping_update.properties.Outputs["s3_mapping_second_update"],
        ),
        "first_layer": TrainingInput(
            s3_data=f"s3://{default_bucket}/{base_job_prefix}/train_first_layer"
        ),
        "second_layer": TrainingInput(
            s3_data=f"s3://{default_bucket}/{base_job_prefix}/train_second_layer"
        ),
    },
)
- Register the model – The SageMaker model step is used to register the model with the SageMaker model registry. Once a model is registered, it can be used through the SageMaker inference pipeline.
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

model = Model(
    image_uri=container_image_uri,
    model_data=step_train_third_layer.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,  # use the pipeline session so register() returns step arguments instead of running immediately
    role=role,
)

# model package group and approval status (placeholders; set these for your environment)
model_package_group_name = "<your-model-package-group-name>"
model_approval_status = "PendingManualApproval"

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.c5.9xlarge", "ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
)
step_register = ModelStep(name="OppUseCaseRegisterModel", step_args=register_args)
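The post stops at the step definitions. As a final sketch, the steps and parameters defined above could be wired into a pipeline and run as follows (the pipeline name is an assumption):

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="OppUseCasePipeline",  # assumed pipeline name
    parameters=[
        processing_instance_type,
        training_instance_type,
        first_sqs_queue_to_use,
        second_sqs_queue_to_use,
    ],
    steps=[
        step_preprocess,
        step_train_first_layer,
        step_first_mapping_update,
        step_train_second_layer,
        step_second_mapping_update,
        step_train_third_layer,
        step_register,
    ],
    sagemaker_session=pipeline_session,
)

# create or update the pipeline definition, then start a run
pipeline.upsert(role_arn=role)
execution = pipeline.start()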
To effectively train the BERTopic model along with the BIRCH and UMAP methods, you need a custom training image that provides the additional dependencies and frameworks needed to run the algorithms. For a working example of a custom Docker image, see Create a custom Docker container image for SageMaker.
Conclusion
In this post, we showed you how to use custom images to train an ensemble model using a wide range of steps provided by SageMaker Pipelines. For more information about starting a pipeline using an existing ML operations (MLOps) template, see Build, Automate, Manage, and Scale ML Workflows with Amazon SageMaker Pipelines.
About the authors
Bikramjeet Singh is an Applied Scientist on the AWS Sales Insights, Analytics, and Data Science (SIADS) team, responsible for building the GenAI platform and AI/ML infrastructure solutions for ML scientists within SIADS. Prior to working as an applied scientist, Bikram worked as a Software Development Engineer at SIADS and Alexa AI.
Rahul Sharma is a Senior Specialist Solutions Architect at AWS, helping AWS customers build ML and generative AI solutions. Prior to joining AWS, Rahul spent several years in the finance and insurance industry, helping customers build data and analytics platforms.
Sachin Mishra is a seasoned professional with 16 years of industry experience in technology consulting and software leadership roles. Sachin leads the Sales Strategy Science and Engineering function at AWS. In this role, he is responsible for scaling cognitive analytics for sales strategy, using advanced AI/ML technology to derive insights and optimize business outcomes.
Nada Abdallah is a research scientist at AWS. Her work and expertise span multiple science areas in statistics and ML, including text analytics, recommendation systems, Bayesian modeling, and forecasting. She previously worked in academia and obtained her master's and PhD in biostatistics from UCLA. Through her work in academia and industry, she has published multiple papers in esteemed statistics journals and applied ML conferences. In her free time, she enjoys running and spending time with her family.