Module markov.api.models.artifacts.inference_pipeline

Classes

class InferencePipeline (name: str, schema: List[MarkovModelArtifactColumnSchema], samples: List)

The InferencePipeline class is designed to define a structured and adaptable workflow for performing inferences with Markov models. It provides a way to orchestrate the processing of input data through various stages of transformation, prediction, and post-processing. This is particularly useful when deploying machine learning models for real-time predictions or batch processing.

Args

name : str
A user-defined name for the inference pipeline, which helps identify and manage different pipelines.
schema : List[MarkovModelArtifactColumnSchema]
A schema that specifies the column names and data types of the input data.
The schema is necessary to create the InferencePipeline. Supported input data type is currently
limited to pandas DataFrame, and the schema represents the name and type of each column.
Note that MultiIndex DataFrames are not supported.
samples : list
A list of sample inputs to the InferencePipeline. These samples can be used to
validate the pipeline and to generate example predictions (see predict_samples()).

Examples

To create an InferencePipeline, you can follow these steps:

1. Initialize an InferencePipeline object with a descriptive name and a schema:

# Build a sample input and infer the schema from it
sample_input = pd.DataFrame([{
    "feature1": 0.3,    # float
    "feature2": 4,      # int
    "feature3": "text"  # str
}])
my_schema = infer_schema_from_dataframe(df=sample_input)
my_inference_pipeline = InferencePipeline(name="ImageProcessingPipeline", schema=my_schema)

2. Add pipeline stages to define the specific processing steps. These stages can include components like data transformation, model prediction, and post-processing. For example, adding a transformer and a predictor:

my_inference_pipeline.add_pipeline_stage(MarkovTransformer(name="image-preprocessing", transformer=my_image_preprocessor))
my_inference_pipeline.add_pipeline_stage(MarkovPredictor(name="image-predictor", predictor=my_image_model))

3. Add dependent code paths, if needed:

my_inference_pipeline.add_dependent_code(["/path/to/dependency1", "/path/to/dependency2"])

4. Add pip requirements, if needed:

my_inference_pipeline.add_pip_requirements(["package1==1.0.0", "package2>=2.1.0"])

5. Save and register the pipeline to make it available for deployment:

my_inference_pipeline.register(model_id="my_model")

The registered InferencePipeline can then be used to make predictions and process input data as part of a Markov model's deployment. The pipeline's flexibility allows for customization and chaining of various processing stages to suit the specific requirements of your machine learning application.

Static methods

def load_inference_pipeline(model_id: str, force_download=False)

Loads an inference pipeline for the specified model ID. Here, model ID refers to the unique identifier associated with the model registered with MarkovML. The loaded inference pipeline can be used to make predictions via the predict() method.

Args

model_id : str
The unique identifier for the model registered with MarkovML.
force_download : bool, optional
If True, forces downloading the model pipeline even if it exists locally. Defaults to False.

Returns

InferencePipeline
An instance of InferencePipeline initialized with the loaded model pipeline, schema, and samples.

Raises

ValueError
If the model pipeline or associated schema and samples cannot be fetched or loaded.

Example

>>> pipeline = InferencePipeline.load_inference_pipeline(model_id='my_model')
>>> model_input = pd.DataFrame([{"column": "value"}])
>>> pipeline.predict(model_input)

Methods

def add_pipeline_stage(self, stage: Union[MarkovPyfunc, MarkovTransformer, MarkovPredictor])

Add a pipeline stage to the inference pipeline. A pipeline stage represents a distinct step in the inference pipeline, defining a specific data transformation or processing task. These stages are used to sequence and organize the workflow of input data, and it's important to ensure that the output of each stage matches the input expected by the next stage.

Args

stage : INFERENCE_PIPELINE_STAGE
A pipeline stage to be added to the inference pipeline. This stage can include
tasks such as data preprocessing, machine learning model prediction, or
post-processing of prediction results.

Raises

ValueError
If the provided stage is not one of MarkovPyfunc, MarkovTransformer, or MarkovPredictor.

Examples

To add stages to an InferencePipeline:

1. Define and create the stages (e.g., MarkovTransformer or MarkovPredictor instances).
2. Add the stages to the pipeline to construct the desired workflow.

# Define and create stages
image_preprocessor = MarkovTransformer(name="image-preprocessing", transformer=my_image_preprocessor)
image_predictor = MarkovPredictor(name="image-predictor", predictor=my_image_model)
# Add the stages to the pipeline
my_inference_pipeline.add_pipeline_stage(image_preprocessor)
my_inference_pipeline.add_pipeline_stage(image_predictor)

The added pipeline stages are executed in sequence, allowing you to customize and control the flow of data throughout the inference process, making it suitable for various machine learning applications.

def predict(self, model_input)

Perform prediction on model_input using the registered InferencePipeline. A registered InferencePipeline has all the stages required to convert model_input into a prediction. You can get a registered InferencePipeline either by creating a new InferencePipeline, adding stages, and calling register(), or by loading an existing InferencePipeline with load_inference_pipeline().

Args

model_input
Input understood by the InferencePipeline. Make sure it conforms to the schema of the InferencePipeline.

Returns

Dict[str, np.ndarray]
The prediction produced by the pipeline.
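
Example

A minimal usage sketch, assuming a registered or loaded pipeline and reusing the feature names from the class-level example above:

>>> model_input = pd.DataFrame([{"feature1": 0.3, "feature2": 4, "feature3": "text"}])
>>> prediction = my_inference_pipeline.predict(model_input)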

def predict_samples(self)

Predicts outputs for the samples in the inference pipeline.

Returns

list
A list of predictions corresponding to each sample.

Example

>>> pipeline = InferencePipeline(...)
>>> pipeline.add_samples(samples=[...])
>>> pipeline.add_schema(schema=[...])
>>> predictions = pipeline.predict_samples()
>>> # Another example with a loaded inference pipeline
>>> loaded_pipeline = InferencePipeline.load_inference_pipeline(model_id='my_model')
>>> predictions = loaded_pipeline.predict_samples()

def register(self, model_id: str, validate: bool = True)

Register the inference model pipeline with the Markov backend. Once registered, this inference model becomes available for model-app generation. You can interact with this model using Markov UI once the model app is generated.

Args

model_id : str
The identifier of the model container for which this artifact is being registered.
validate : bool
If True (the default), the inference pipeline is validated before registration; set to False to opt out of validation.

This method performs the following steps:

1. Saves the inference pipeline using the save method.
2. Uploads the saved pipeline as a ZIP archive to the Markov backend.
3. Sets the model's schema and sample data for the registered model, if provided.
4. Records the uploaded path of the model artifact.

Example:

>>> my_inference_pipeline.register(model_id="my_model")

Note:
- Before using this method, ensure that you have configured the Markov backend correctly.
- The model's schema and sample data can be set using the _schema and _samples attributes of the InferencePipeline instance.

For more details on the registration process, consult the Markov documentation.
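
To skip validation during registration (for example, when the pipeline has already been validated locally), pass validate=False:

>>> my_inference_pipeline.register(model_id="my_model", validate=False)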

def save(self)

Save the inference pipeline, prepare it for registration, and return the path where the saved pipeline is stored. This method performs the following actions:

1. Adds Markov-specific pip requirements for the model.
2. Adds output-parsing logic to ensure the model's output is in a standard format when served as an app.
3. Converts the inference pipeline to an MLflow-compatible pipeline.
4. Registers the MLflow inference pipeline.

Returns

str
The file path where the saved inference pipeline is stored.

Example:

>>> saved_pipeline_path = my_inference_pipeline.save()

def validate_local(self, model_pipeline_path: str, model_id: str)

Validates the MlflowInferencePipeline locally.

This method validates the machine learning pipeline by creating a virtual environment, installing the necessary requirements, and running a test prediction. It ensures the pipeline's functionality and compatibility with the given requirements.

Args

model_pipeline_path : str
The path to the model pipeline directory.
model_id : str
The unique identifier for the model.

Returns

bool
True if the pipeline validation is successful, False otherwise.

Cleanup

Removes the created virtual environment after validation.
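
Example

A minimal usage sketch, assuming the pipeline was saved with save() beforehand (variable names are illustrative):

>>> pipeline_path = my_inference_pipeline.save()
>>> is_valid = my_inference_pipeline.validate_local(model_pipeline_path=pipeline_path, model_id="my_model")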
