Executing Jupyter Notebooks as Tasks
To productionize data science experiments without rewriting Jupyter notebooks as Python scripts, use the NotebookTask in flyte-sdk, which executes .ipynb files directly as Flyte tasks. The integration uses Papermill to inject parameters and capture outputs, and it automatically renders the executed notebook as a Flyte Report.
Prerequisites
To use notebooks in flyte-sdk, you must install the papermill plugin:
pip install flyteplugins-papermill
Your environment must also have papermill, nbformat, and nbconvert installed.
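Before registering a notebook task, it can help to confirm that these packages are importable in the environment. The following is a minimal sketch using only the Python standard library; the helper name missing_modules is hypothetical and not part of flyte-sdk or the plugin.

```python
from importlib.util import find_spec


def missing_modules(names):
    """Return the top-level module names from `names` that cannot be found."""
    return [name for name in names if find_spec(name) is None]


if __name__ == "__main__":
    # Module names taken from the prerequisites listed above.
    required = ["papermill", "nbformat", "nbconvert"]
    missing = missing_modules(required)
    if missing:
        print("Missing packages:", ", ".join(missing))
    else:
        print("All notebook dependencies are importable.")
```

A check like this only confirms that the modules can be located; it does not verify versions or compatibility.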
Step 1: Prepare the Jupyter Notebook
For a notebook to function as a Flyte task, you must designate specific cells for inputs and outputs using Jupyter cell tags.
- Parameters Cell: Create a cell at the top of your notebook and add the parameters tag to it. This is where Papermill will inject the values passed from the Flyte workflow.
- Outputs Cell: Create a cell at the end of your notebook and add the outputs tag. This cell must call record_outputs() as its final expression.
Example notebook logic (analysis.ipynb):
# Cell tagged 'parameters'
input_path = "" # Default value, will be overwritten by Flyte
threshold = 0.5
# Cell logic
import pandas as pd
from flyteplugins.papermill import load_file, record_outputs
# Reconstruct Flyte File object from the injected path string
f = load_file(input_path)
with f.open_sync() as fh:
    df = pd.read_csv(fh)
result_count = len(df[df['value'] > threshold])
# Cell tagged 'outputs'
# This must be the last expression in the cell
record_outputs(count=result_count)
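Cell tags are stored in each cell's metadata inside the .ipynb JSON, under a "tags" list. As a rough sketch, the tagging described in this step could be checked before deployment with a small standard-library script. The helper names tagged_cells and check_notebook_tags are hypothetical, and the in-memory notebook below is a made-up stand-in for analysis.ipynb.

```python
def tagged_cells(notebook, tag):
    """Return the indices of cells whose metadata carries the given tag."""
    return [
        index
        for index, cell in enumerate(notebook.get("cells", []))
        if tag in cell.get("metadata", {}).get("tags", [])
    ]


def check_notebook_tags(notebook):
    """Return a list of problems; an empty list means both tags were found."""
    problems = []
    for tag in ("parameters", "outputs"):
        if not tagged_cells(notebook, tag):
            problems.append(f"no cell tagged '{tag}'")
    return problems


# A tiny in-memory notebook in the nbformat v4 layout (illustrative only).
example = {
    "nbformat": 4,
    "nbformat_minor": 5,
    "metadata": {},
    "cells": [
        {"cell_type": "code", "metadata": {"tags": ["parameters"]},
         "source": "threshold = 0.5", "outputs": [], "execution_count": None},
        {"cell_type": "code", "metadata": {},
         "source": "result_count = 3", "outputs": [], "execution_count": None},
        {"cell_type": "code", "metadata": {"tags": ["outputs"]},
         "source": "record_outputs(count=result_count)", "outputs": [],
         "execution_count": None},
    ],
}

print(check_notebook_tags(example))
```

For a notebook on disk, the same functions can be applied to the dictionary returned by json.load on the .ipynb file.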
Step 2: Define the NotebookTask
In your Python code, use the NotebookTask class from flyteplugins.papermill to wrap the notebook. You must specify the notebook_path, inputs, and outputs.
import flyte
from flyteplugins.papermill import NotebookTask
from flyte.io import File
# Define the environment where the notebook will run
env = flyte.TaskEnvironment(name="notebook_env")
# Define the task
analyze_data = NotebookTask(
    name="analyze_data",
    notebook_path="notebooks/analysis.ipynb",
    task_environment=env,
    inputs={
        "input_path": File,
        "threshold": float
    },
    outputs={
        "count": int
    },
    output_notebooks=True  # Captures the executed .ipynb as a task output
)
Step 3: Handle Complex Types inside the Notebook
Papermill only supports JSON-serializable primitives (int, float, str, bool, list, dict). When you pass Flyte IO types like File, Dir, or DataFrame to a NotebookTask, flyte-sdk serializes them to their remote URI strings.
Inside the notebook, use the following helpers from flyteplugins.papermill to reconstruct the objects:
- load_file(path): Returns a flyte.io.File.
- load_dir(path): Returns a flyte.io.Dir.
- load_dataframe(uri): Returns a flyte.io.DataFrame.
from flyteplugins.papermill import load_file, load_dir, load_dataframe
# Reconstructing a File
my_file = load_file(input_path)
# Reconstructing a DataFrame
my_df = load_dataframe(df_uri)
pandas_df = my_df.all() # Materialize as pandas
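To illustrate why the helpers are needed, the sketch below mimics the general idea of flattening rich objects into URI strings before they are handed to Papermill. It is a simplified illustration under stated assumptions, not how flyte-sdk implements serialization: RemoteRef is a hypothetical stand-in for a Flyte IO object, to_notebook_params is a hypothetical helper, and the bucket URI is invented.

```python
import json
from dataclasses import dataclass


@dataclass
class RemoteRef:
    """Hypothetical stand-in for an IO object that lives at a remote URI."""
    uri: str


def to_notebook_params(values):
    """Replace RemoteRef values with their URI strings and verify the result
    is JSON-serializable, since only such values can be injected."""
    params = {
        name: (value.uri if isinstance(value, RemoteRef) else value)
        for name, value in values.items()
    }
    json.dumps(params)  # Raises TypeError if a value is not JSON-serializable.
    return params


params = to_notebook_params(
    {"input_path": RemoteRef("s3://my-bucket/data.csv"), "threshold": 0.8}
)
print(params)
```

Inside the notebook, the injected string is all that remains of the original object, which is why a helper such as load_file is used to rebuild it.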
Step 4: Record and Return Outputs
To return values to the Flyte workflow, call record_outputs(**kwargs) in the cell tagged outputs. The keys in kwargs must match the names defined in the outputs dictionary of your NotebookTask.
from flyteplugins.papermill import record_outputs
# The values can be primitives or complex Flyte types
record_outputs(
    count=100,
    processed_data=my_file
)
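Because the recorded names must line up with the declared outputs, a quick comparison of the two sets of keys can catch typos early. The following sketch is a hypothetical helper (compare_output_names is not part of the plugin) that reports names missing from the recorded values and names that were recorded but never declared.

```python
def compare_output_names(declared, recorded):
    """Compare declared output names with the names actually recorded.

    Returns a dict with two sorted lists:
    - "missing": declared but not recorded
    - "unexpected": recorded but not declared
    """
    declared_names = set(declared)
    recorded_names = set(recorded)
    return {
        "missing": sorted(declared_names - recorded_names),
        "unexpected": sorted(recorded_names - declared_names),
    }


# Declared outputs as they would appear in the task definition.
declared = {"count": int}
# Keyword arguments as they would be passed when recording outputs.
recorded = {"cnt": 100}

print(compare_output_names(declared, recorded))
```

This sketch compares names only; it does not check that the recorded values have the declared types.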
Step 5: Execute the Workflow
You can now use the NotebookTask just like any other Flyte task within a workflow.
@env.task
def notebook_workflow(data_file: File) -> int:
    return analyze_data(input_path=data_file, threshold=0.8)
Advanced Configuration
Capturing Notebook Artifacts
By setting output_notebooks=True in the NotebookTask constructor, flyte-sdk automatically adds two additional outputs to your task:
- output_notebook: The original source .ipynb file.
- output_notebook_executed: The executed .ipynb file containing all cell outputs.
These are returned as flyte.io.File objects and can be passed to downstream tasks.
Flyte Reports
NotebookTask automatically enables Flyte Reports. After execution, the notebook is converted to HTML (using nbconvert) and uploaded to the Flyte platform. You can view the rendered notebook, including plots and tables, directly in the Flyte Console under the "Reports" tab for that task execution.
Spark Integration
If your notebook needs to run Spark jobs, you can pass a Spark configuration to the plugin_config parameter.
from flyteplugins.spark import Spark
spark_nb = NotebookTask(
    name="spark_analysis",
    notebook_path="notebooks/spark.ipynb",
    task_environment=env,
    plugin_config=Spark(
        spark_conf={"spark.executor.memory": "2g"},
    ),
    inputs={"data_path": str},
    outputs={"result": int}
)
Inside the notebook, you should initialize the Spark session using SparkSession.builder.getOrCreate(). Note that dynamic code distribution via SparkContext.addPyFile() is not supported for notebook tasks; all dependencies must be pre-installed in the Docker image.