This article describes how to set up a Databricks cluster and provides code examples for running a C++ MOJO created by the Driverless AI pipeline-building recipe "nlp_model". That recipe enables only PyTorch-based NLP BERT models that process pure text. Much of the article applies to other model types as well.
Cluster Setup
Notes:
- A runtime that uses Spark 3 is required.
- The `daimojo` library compatible with the Python version in the selected runtime needs to be installed.
- It's recommended to use the 'Standard_DS12_v2' worker type or similar, as it offers the best balance of memory and CPU for the nlp_model.
- The minimum driver type is 'Standard_DS13_v2'; depending on your workflow, you may need more memory.
- `spark.task.cpus` must be set to the core count of the worker type. Failing to do this will cause a copy of the model to be loaded into memory for every core, resulting in OOM-related errors.
- Autoscaling has not been tested.
- More workers = faster results.
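The `spark.task.cpus` setting mentioned above can be entered in the cluster's Spark config field under the cluster's advanced options. As a sketch, for a 4-core worker type (the value 4 is an assumption; match it to the core count of your chosen worker type):

```
spark.task.cpus 4
```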
Code
In the code below, replace `license_path`, `model_path`, and `input_path`, or bypass `input_path` altogether and use a Spark DataFrame directly. The output location/frame also needs to be specified.
It's recommended to tune the partition count. A good starting point for the 'Standard_DS12_v2' worker type is a count such that each partition holds roughly 200 rows (this is specific to the nlp_model, which is memory hungry but doesn't benefit from more CPUs). Use the cluster's metrics section to make sure your partition count doesn't cause swap usage.
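The 200-rows-per-partition rule of thumb can be expressed as a small helper. This is an illustrative sketch only; the function name and defaults are not part of the daimojo API:

```python
import math


def suggest_partitions(n_rows: int, rows_per_partition: int = 200) -> int:
    """Suggest a repartition() count so each partition holds ~rows_per_partition rows."""
    return max(1, math.ceil(n_rows / rows_per_partition))


# A 100,000-row input suggests 500 partitions.
print(suggest_partitions(100_000))  # 500
```

Pass the result to `repartition()` in place of the hard-coded value in the code below, and adjust `rows_per_partition` downward if the cluster metrics show swap usage.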
```python
import os
from pathlib import Path

import daimojo.model
import datatable as dt
import pandas as pd
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, DoubleType

license_path = "/dbfs/FileStore/shared_uploads/.../license.sig"
model_path = "/dbfs/FileStore/shared_uploads/.../1921_jobs_mojo/mojo-pipeline/pipeline.mojo"
input_path = '/FileStore/shared_uploads/.../jobs.csv'

license_name = Path(license_path).name
model_name = Path(model_path).name


@pandas_udf(returnType=ArrayType(DoubleType()))
def predict_pandas_udf(*cols: pd.Series) -> pd.Series:
    # Runs on the workers: load the license and model from the files
    # distributed via SparkContext.addFile, then score the batch.
    os.environ['DRIVERLESS_AI_LICENSE_FILE'] = SparkFiles.get(license_name)
    model_path_remote = SparkFiles.get(model_name)
    model = daimojo.model(model_path_remote)
    X = dt.Frame([c.values for c in cols], names=model.feature_names)
    preds = pd.Series(model.predict(X).to_numpy().tolist())
    return preds


spark = SparkSession.builder.appName('daimojo').getOrCreate()
spark.sparkContext.addFile(license_path)
spark.sparkContext.addFile(model_path)

# Load the model on the driver as well, to read its feature and output names.
os.environ['DRIVERLESS_AI_LICENSE_FILE'] = license_path
model = daimojo.model(model_path)

input_df = spark.read.csv(input_path, header=True).repartition(500)  # tune number of partitions

df_pred_multi = (
    input_df.select(
        # col('id'),  # including a unique identifier is recommended
        predict_pandas_udf(*[col(c) for c in model.feature_names]).alias('predictions')
    ).select(
        # col('id'),  # including a unique identifier is recommended
        *[col('predictions')[i].alias(f'prediction_{c}') for i, c in enumerate(model.output_names)]
    )
)

df_pred_multi.write.format('csv').mode('overwrite').option('header', 'true').save("test.csv")
```