This article describes how to set up a Databricks cluster and provides code examples for running a C++ MOJO created by the Driverless AI pipeline-building recipe "nlp_model". The recipe only enables NLP BERT models based on PyTorch to process pure text. Much of the article applies to other model types as well.


Cluster Setup

Notes:

  • A runtime that uses Spark 3 is required.
  • A `daimojo` wheel compatible with the Python version of the selected runtime must be installed on the cluster.
  • The 'Standard_DS12_v2' worker type (or similar) is recommended; it offers the best balance of memory and CPU for the nlp_model.
  • The minimum driver type is 'Standard_DS13_v2'; depending on your workflow you may need more memory.
  • `spark.task.cpus` must be set to the core count of the worker type (a sketch of this setting and the library install follows this list). Otherwise a copy of the model is loaded into memory for every core, resulting in OOM-related errors.
  • Autoscaling has not been tested.
  • More workers = faster results.
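
For reference, here is a minimal sketch of those two settings, assuming the 'Standard_DS12_v2' worker type (which has 4 cores); the wheel path and filename are placeholders for whatever `daimojo` build matches your runtime. The first line goes into the cluster's Spark config box, and the wheel is attached through the cluster Libraries UI or installed from a notebook `%pip` cell.

spark.task.cpus 4
%pip install /dbfs/FileStore/libraries/daimojo-<version>-<python-tag>-linux_x86_64.whl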


Code


In the code below, replace `license_path`, `model_path`, and `input_path`, or bypass `input_path` altogether and start from an existing Spark DataFrame. The output location (or output DataFrame) also needs to be specified.
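
For example, if the scoring data is already available as a table or an existing DataFrame, the CSV read further down can be skipped entirely (the table name here is hypothetical):

input_df = spark.table('default.jobs')  # hypothetical table; use this in place of the spark.read.csv(...) line below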


It's recommended to tune the partition count. A good starting point for the 'Standard_DS12_v2' worker type is a count such that each partition holds roughly 200 rows (this is specific to the nlp_model, which is fairly memory-hungry but doesn't benefit from more CPUs). Use the cluster's metrics section to verify that your partition count doesn't cause swap usage.
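
As a rough sketch, a starting partition count can also be derived from the row count using the ~200-rows-per-partition heuristic above (this assumes `input_df` is the DataFrame created in the code section below):

import math

row_count = input_df.count()                         # total number of rows to score
num_partitions = max(1, math.ceil(row_count / 200))  # aim for roughly 200 rows per partition
input_df = input_df.repartition(num_partitions)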



import daimojo.model
import datatable as dt
import os
import pandas as pd
from pathlib import Path
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, DoubleType


license_path = "/dbfs/FileStore/shared_uploads/.../license.sig"
model_path = "/dbfs/FileStore/shared_uploads/.../1921_jobs_mojo/mojo-pipeline/pipeline.mojo"
input_path = '/FileStore/shared_uploads/.../jobs.csv'

license_name = Path(license_path).name
model_name = Path(model_path).name

@pandas_udf(returnType=ArrayType(DoubleType()))
def predict_pandas_udf(*cols: pd.Series) -> pd.Series:
  # Runs on the workers: point the scorer at the license and MOJO shipped via addFile()
  os.environ['DRIVERLESS_AI_LICENSE_FILE'] = SparkFiles.get(license_name)
  model = daimojo.model(SparkFiles.get(model_name))
  # Rebuild a datatable Frame with the column names the MOJO expects
  X = dt.Frame([c.values for c in cols], names=model.feature_names)
  # Return one array of outputs (e.g. class probabilities) per input row
  return pd.Series(model.predict(X).to_numpy().tolist())

spark = SparkSession.builder.appName('daimojo').getOrCreate()
# Ship the license and MOJO to every worker node
spark.sparkContext.addFile(license_path)
spark.sparkContext.addFile(model_path)

# Load the MOJO on the driver as well, to get its feature and output column names
os.environ['DRIVERLESS_AI_LICENSE_FILE'] = license_path
model = daimojo.model(model_path)

input_df = spark.read.csv(input_path, header=True).repartition(500) # tune number of partitions

df_pred_multi = (
  input_df.select(
    # col('id'), # including a unique identifier is recommended
    predict_pandas_udf(*[col(c) for c in model.feature_names]).alias('predictions')
  ).select(
    # col('id'), # including a unique identifier is recommended
    *[col('predictions')[i].alias(f'prediction_{c}') for i, c in enumerate(model.output_names)]
  )
)
df_pred_multi.write.format('csv').mode('overwrite').option('header', 'true').save('test.csv')  # replace 'test.csv' with your output location
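
To sanity-check the output before writing it anywhere important, the first few rows can be inspected directly in the notebook, or the result can be saved as a Delta table instead of CSV (the table name below is hypothetical):

df_pred_multi.show(5, truncate=False)
df_pred_multi.write.format('delta').mode('overwrite').saveAsTable('default.nlp_model_predictions')  # hypothetical table name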