Module markov.api.models.artifacts.inference_pipeline
Classes
class InferencePipeline (name: str, schema: List[MarkovModelArtifactColumnSchema], samples: List)
-
The InferencePipeline class is designed to define a structured and adaptable workflow for performing inferences with Markov models. It provides a way to orchestrate the processing of input data through various stages of transformation, prediction, and post-processing. This is particularly useful when deploying machine learning models for real-time predictions or batch processing.

Args
name : str
- A user-defined name for the inference pipeline, which helps identify and manage different pipelines.
schema : List[MarkovModelArtifactColumnSchema]
- A schema that specifies the column names and data types of the input data. The schema is necessary to create the InferencePipeline. The supported input data type is currently limited to pandas DataFrame, and the schema represents the name and type of each column. Note that MultiIndex DataFrames are not supported.
samples : list
- A list of sample inputs to the InferencePipeline. These samples can be used to provide example inputs for the pipeline, for instance to test predictions with predict_samples.

Examples
To create an InferencePipeline, you can follow these steps:

1. Initialize an InferencePipeline object with a descriptive name and a schema:

```python
import pandas as pd

sample_input = pd.DataFrame([{
    "feature1": 0.3,    # float
    "feature2": 4,      # int
    "feature3": "text"  # str
}])
my_schema = infer_schema_from_dataframe(df=sample_input)
my_inference_pipeline = InferencePipeline(
    name="ImageProcessingPipeline", schema=my_schema, samples=[sample_input]
)
```
2. Add pipeline stages to define the specific processing steps. These stages can include components like data transformation, model prediction, and post-processing. For example, adding a transformer and a predictor:

```python
my_inference_pipeline.add_pipeline_stage(
    MarkovTransformer(name="image-preprocessing", transformer=my_image_preprocessor)
)
my_inference_pipeline.add_pipeline_stage(
    MarkovPredictor(name="image-predictor", predictor=my_image_model)
)
```
3. Add dependent code paths, if needed:

```python
my_inference_pipeline.add_dependent_code(["/path/to/dependency1", "/path/to/dependency2"])
```
4. Add pip requirements, if needed:

```python
my_inference_pipeline.add_pip_requirements(["package1==1.0.0", "package2>=2.1.0"])
```
5. Save and register the pipeline to make it available for deployment:

```python
my_inference_pipeline.register(model_id="my_model")
```
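Putting these steps together, a minimal end-to-end sketch might look like the following. Note that my_image_preprocessor and my_image_model are placeholders for a transformer and model you supply yourself, and the pip requirement shown is only illustrative:

```python
import pandas as pd

# Describe the expected input with a schema inferred from a sample DataFrame
sample_input = pd.DataFrame([{"feature1": 0.3, "feature2": 4, "feature3": "text"}])
my_schema = infer_schema_from_dataframe(df=sample_input)

# Build the pipeline and chain the processing stages in execution order
my_inference_pipeline = InferencePipeline(
    name="ImageProcessingPipeline", schema=my_schema, samples=[sample_input]
)
my_inference_pipeline.add_pipeline_stage(
    MarkovTransformer(name="image-preprocessing", transformer=my_image_preprocessor)
)
my_inference_pipeline.add_pipeline_stage(
    MarkovPredictor(name="image-predictor", predictor=my_image_model)
)

# Declare runtime dependencies, then register with the Markov backend
my_inference_pipeline.add_pip_requirements(["package1==1.0.0"])  # illustrative requirement
my_inference_pipeline.register(model_id="my_model")
```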
The registered InferencePipeline can then be used to make predictions and process input data as part of a Markov model's deployment. The pipeline's flexibility allows for customization and chaining of various processing stages to suit the specific requirements of your machine learning application.

Ancestors
- MarkovBaseModelArtifact
- abc.ABC
Static methods
def load_inference_pipeline(model_id: str, force_download=False)
-
Loads an inference pipeline for the specified model ID. Here, the model ID refers to the unique identifier associated with the model registered with MarkovML. The loaded inference pipeline can be used to make predictions via the predict() method.

Args
model_id : str
- The unique identifier for the model registered with MarkovML.
force_download : bool, optional
- If True, forces downloading the model pipeline even if it exists locally. Defaults to False.
Returns
InferencePipeline
- An instance of InferencePipeline initialized with the loaded model pipeline, schema, and samples.
Raises
ValueError
- If the model pipeline or associated schema and samples cannot be fetched or loaded.
Example
```python
>>> pipeline = InferencePipeline.load_inference_pipeline(model_id='my_model')
>>> model_input = pd.DataFrame([{"column": "value"}])
>>> pipeline.predict(model_input)
```
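To refresh a pipeline that may be stale locally, pass force_download=True to re-fetch the artifact from MarkovML:

```python
>>> pipeline = InferencePipeline.load_inference_pipeline(model_id='my_model', force_download=True)
```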
Methods
def add_pipeline_stage(self, stage: Union[MarkovPyfunc, MarkovTransformer, MarkovPredictor])
-
Add a pipeline stage to the inference pipeline. A pipeline stage represents a distinct step in the inference pipeline, defining a specific data transformation or processing task. These stages are used to sequence and organize the workflow of input data, and it's important to ensure that the output of each stage matches the input expected by the next stage.
Args
stage : INFERENCE_PIPELINE_STAGE
- A pipeline stage to be added to the inference pipeline. This stage can include tasks such as data preprocessing, machine learning model prediction, or post-processing of prediction results.
Raises
ValueError
- If the provided stage is not one of MarkovPyfunc, MarkovTransformer, or MarkovPredictor.
Examples
To add stages to an InferencePipeline, you can do the following:
1. Define and create the stages (e.g., MarkovTransformer or MarkovPredictor instances).
2. Add the stages to the pipeline to construct the desired workflow.

Example:

```python
# Define and create stages
image_preprocessor = MarkovTransformer(name="image-preprocessing", transformer=my_image_preprocessor)
image_predictor = MarkovPredictor(name="image-predictor", predictor=my_image_model)

# Add the stages to the pipeline
my_inference_pipeline.add_pipeline_stage(image_preprocessor)
my_inference_pipeline.add_pipeline_stage(image_predictor)
```
The added pipeline stages are executed in sequence, allowing you to customize and control the flow of data throughout the inference process, making it suitable for various machine learning applications.
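Because only MarkovPyfunc, MarkovTransformer, and MarkovPredictor stages are accepted, passing anything else raises a ValueError, as documented above. A short sketch of that behaviour:

```python
# A plain function is not a supported stage type, so this raises ValueError
try:
    my_inference_pipeline.add_pipeline_stage(lambda batch: batch)
except ValueError as err:
    print(f"Rejected stage: {err}")
```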
def predict(self, model_input)
-
Perform prediction on model_input using the registered InferencePipeline. A registered InferencePipeline has all the stages required to convert model_input to a prediction. You can get a registered InferencePipeline either by creating a new InferencePipeline, adding stages, and calling register, or by loading an existing InferencePipeline.

Args
model_input
- Input understood by the InferencePipeline. Make sure it conforms to the schema of the InferencePipeline.
Returns
Dict[str, np.array]
- The prediction produced by the pipeline.
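For example, assuming a pipeline registered against the schema inferred earlier (the column names and values here are illustrative):

```python
>>> model_input = pd.DataFrame([{"feature1": 0.3, "feature2": 4, "feature3": "text"}])
>>> prediction = my_inference_pipeline.predict(model_input)  # returns Dict[str, np.array]
```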
def predict_samples(self)
-
Predicts outputs for the samples in the inference pipeline.
Returns
list
- A list of predictions corresponding to each sample.
Example
```python
>>> pipeline = InferencePipeline(...)
>>> pipeline.add_samples(samples=[...])
>>> pipeline.add_schema(schema={...})
>>> predictions = pipeline.predict_samples()

>>> # Another example with a loaded inference pipeline
>>> loaded_pipeline = InferencePipeline.load_inference_pipeline(model_id='my_model')
>>> predictions = loaded_pipeline.predict_samples()
```
def register(self, model_id: str, validate: bool = True)
-
Register the inference model pipeline with the Markov backend. Once registered, this inference model becomes available for model-app generation. You can interact with this model using Markov UI once the model app is generated.
Args
model_id : str
- The identifier of the model container for which this artifact is being registered.
validate : bool
- A flag to allow users to opt out of validating the inference pipeline. Defaults to True.
This method performs the following steps:
1. Saves the inference pipeline using the save method.
2. Uploads the saved pipeline as a ZIP archive to the Markov backend.
3. Sets the model's schema and sample data for the registered model, if provided.
4. Records the uploaded path of the model artifact.

Example:

```python
>>> my_inference_pipeline.register(model_id="my_model")
```

Note:
- Before using this method, ensure that you have configured the Markov backend correctly.
- The model's schema and sample data can be set using the _schema and _samples attributes of the InferencePipeline instance.

For more details on the registration process, consult the Markov documentation.
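Validation can also be skipped, for example to shorten repeated registrations during development, by opting out explicitly:

```python
>>> my_inference_pipeline.register(model_id="my_model", validate=False)
```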
def save(self)
-
Save the inference pipeline, prepare it for registration, and return the path where the saved pipeline is stored. This method performs the following actions:
1. Adds Markov-specific pip requirements for the model.
2. Adds output parsing logic to ensure the model's output is in a standard format for parsing when serving as an app.
3. Converts the inference pipeline to an MLflow-compatible pipeline.
4. Registers the MLflow inference pipeline.
Returns
str
- The file path where the saved inference pipeline is stored.
Example:

```python
>>> saved_pipeline_path = my_inference_pipeline.save()
```
def validate_local(self, model_pipeline_path: str, model_id: str)
-
Validates the MlflowInferencePipeline locally.
This method validates the machine learning pipeline by creating a virtual environment, installing the necessary requirements, and running a test prediction. It ensures the pipeline's functionality and compatibility with the given requirements.
Args
model_pipeline_path : str
- The path to the model pipeline directory.
model_id : str
- The unique identifier for the model.
Returns
bool
- True if the pipeline validation is successful, False otherwise.
Cleanup
Removes the created virtual environment after validation.
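A sketch of local validation; it assumes the path returned by save can be passed as model_pipeline_path, which may not hold in every setup:

```python
>>> saved_pipeline_path = my_inference_pipeline.save()
>>> is_valid = my_inference_pipeline.validate_local(
...     model_pipeline_path=saved_pipeline_path,
...     model_id="my_model",
... )
>>> is_valid  # True if validation succeeded, False otherwise
```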
Inherited members