🦌PySpark UDF instrumentation

Usage examples

Enable all Python UDF instrumentation (PySpark)
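A minimal sketch, assuming the DataFlint jar is already attached to the application. The property must be set before the session is created, since DataFlint reads it at plugin init:

```python
from pyspark.sql import SparkSession

# Global toggle: enables every Python UDF instrumentation path on this page.
spark = (
    SparkSession.builder
    .config("spark.dataflint.instrument.spark.enabled", "true")
    .getOrCreate()
)
```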

Enable only mapInPandas instrumentation
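A sketch, assuming the DataFlint jar is on the classpath:

```python
from pyspark.sql import SparkSession

# Instruments only MapInPandasExec (DataFrame.mapInPandas).
spark = (
    SparkSession.builder
    .config("spark.dataflint.instrument.spark.mapInPandas.enabled", "true")
    .getOrCreate()
)
```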

Enable only mapInArrow instrumentation
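A sketch, assuming the DataFlint jar is on the classpath:

```python
from pyspark.sql import SparkSession

# Instruments only PythonMapInArrowExec / MapInArrowExec (DataFrame.mapInArrow).
spark = (
    SparkSession.builder
    .config("spark.dataflint.instrument.spark.mapInArrow.enabled", "true")
    .getOrCreate()
)
```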

Enable only Arrow scalar pandas_udf instrumentation
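A sketch, assuming the DataFlint jar is on the classpath:

```python
from pyspark.sql import SparkSession

# Instruments only ArrowEvalPythonExec (scalar and scalar-iterator pandas_udf).
spark = (
    SparkSession.builder
    .config("spark.dataflint.instrument.spark.arrowEvalPython.enabled", "true")
    .getOrCreate()
)
```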

Enable only regular @udf batch instrumentation
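A sketch, assuming the DataFlint jar is on the classpath:

```python
from pyspark.sql import SparkSession

# Instruments only BatchEvalPythonExec (regular pickle-based @udf).
spark = (
    SparkSession.builder
    .config("spark.dataflint.instrument.spark.batchEvalPython.enabled", "true")
    .getOrCreate()
)
```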

Enable only grouped applyInPandas() instrumentation
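A sketch, assuming the DataFlint jar is on the classpath:

```python
from pyspark.sql import SparkSession

# Instruments only FlatMapGroupsInPandasExec (GroupedData.applyInPandas).
spark = (
    SparkSession.builder
    .config("spark.dataflint.instrument.spark.flatMapGroupsInPandas.enabled", "true")
    .getOrCreate()
)
```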

Enable only co-grouped applyInPandas() instrumentation
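A sketch, assuming the DataFlint jar is on the classpath:

```python
from pyspark.sql import SparkSession

# Instruments only FlatMapCoGroupsInPandasExec (PandasCogroupedOps.applyInPandas).
spark = (
    SparkSession.builder
    .config("spark.dataflint.instrument.spark.flatMapCoGroupsInPandas.enabled", "true")
    .getOrCreate()
)
```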


Spark Python UDF Instrumentation

Overview

Spark runs PySpark UDFs through several dedicated physical plan nodes, but by default it does not report how much time those nodes spend executing Python.

DataFlint replaces the built-in physical plan nodes with instrumented versions that add a duration SQL metric, which is visible in the SQL tab of the Spark UI.

Configuration

All properties default to false.

  • spark.dataflint.instrument.spark.enabled

    • Global toggle.

    • Enables all Spark Python UDF instrumentation on this page.

  • spark.dataflint.instrument.spark.mapInPandas.enabled

    • Enables mapInPandas instrumentation only.

  • spark.dataflint.instrument.spark.mapInArrow.enabled

    • Enables mapInArrow instrumentation only.

  • spark.dataflint.instrument.spark.arrowEvalPython.enabled

    • Enables ArrowEvalPythonExec instrumentation only.

    • Use this for scalar or scalar-iterator pandas_udf.

  • spark.dataflint.instrument.spark.batchEvalPython.enabled

    • Enables BatchEvalPythonExec instrumentation only.

    • Use this for regular pickle-based @udf.

  • spark.dataflint.instrument.spark.flatMapGroupsInPandas.enabled

    • Enables FlatMapGroupsInPandasExec instrumentation only.

    • Use this for GroupedData.applyInPandas().

  • spark.dataflint.instrument.spark.flatMapCoGroupsInPandas.enabled

    • Enables FlatMapCoGroupsInPandasExec instrumentation only.

    • Use this for PandasCogroupedOps.applyInPandas().

Use the global flag when you want coverage for every path at once: mapInPandas, mapInArrow, scalar UDFs, scalar pandas_udf, grouped pandas UDFs, and co-grouped pandas UDFs.

Instrumented execution paths

DataFlintMapInPandasExec

  • Spark node: MapInPandasExec

  • PySpark API: DataFrame.mapInPandas()

  • What it measures:

    • Time spent sending partition batches to Python as Pandas DataFrames

    • Time spent running the user function on each partition

    • Time spent collecting the returned rows back into Spark

  • Spark versions:

    • 3.3 to 4.1
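What this path looks like on the user side — a sketch; the function, column names, and schema are illustrative:

```python
import pandas as pd

def double_ids(batches):
    # mapInPandas hands each partition to Python as an iterator of
    # pandas DataFrames; the work inside this loop is what the
    # duration metric measures.
    for pdf in batches:
        yield pdf.assign(doubled=pdf["id"] * 2)

# With an active SparkSession (hypothetical):
# out = spark.range(4).mapInPandas(double_ids, schema="id long, doubled long")
```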

DataFlintPythonMapInArrowExec

  • Spark node: PythonMapInArrowExec / MapInArrowExec

  • PySpark API: DataFrame.mapInArrow()

  • What it measures:

    • Time spent sending partition batches to Python through Arrow

    • Time spent running the Python function on Arrow data

    • Time spent collecting the returned Arrow batches back into Spark

  • Spark versions:

    • 3.3 to 4.1

DataFlintArrowEvalPythonExec

  • Spark node: ArrowEvalPythonExec

  • PySpark API: pandas_udf with scalar or scalar-iterator semantics

    • Typical shape: @pandas_udf returning a Series

  • What it measures:

    • Time spent calling the Python UDF

    • Time spent transferring columnar batches through Apache Arrow

    • Both Python execution and Arrow serialization overhead

  • Spark versions:

    • 3.2 to 4.1
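A user-side sketch of the scalar pandas_udf shape this node executes; the function and column name are illustrative:

```python
import pandas as pd

def to_upper(s: pd.Series) -> pd.Series:
    # Scalar pandas_udf semantics: one pandas Series in, one Series out,
    # transferred through Arrow in columnar batches.
    return s.str.upper()

# With pyspark available (hypothetical):
# from pyspark.sql.functions import pandas_udf
# to_upper_udf = pandas_udf(to_upper, returnType="string")
# df.select(to_upper_udf("name"))
```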

DataFlintBatchEvalPythonExec

  • Spark node: BatchEvalPythonExec

  • PySpark API: regular @udf

    • Non-Arrow, pickle-based SQL_BATCHED_UDF

  • What it measures:

    • Time spent serializing rows to Python with pickle

    • Time spent executing the Python UDF

    • Time spent deserializing results back into Spark

  • Spark versions:

    • 3.0 to 4.1
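A user-side sketch of the regular @udf shape this node executes; the function is illustrative:

```python
def add_one(x):
    # A regular @udf runs row by row; values arrive pickled one at a time,
    # which is why this path also pays serialization overhead.
    return None if x is None else x + 1

# With pyspark available (hypothetical):
# from pyspark.sql.functions import udf
# add_one_udf = udf(add_one, "long")
# spark.range(3).select(add_one_udf("id"))
```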

DataFlintFlatMapGroupsInPandasExec

  • Spark node: FlatMapGroupsInPandasExec

  • PySpark API: GroupedData.applyInPandas()

  • What it measures:

    • Time spent sending each key group to Python as a Pandas DataFrame

    • Time spent running the user function for every group

    • Time spent collecting the produced rows back into Spark

    • Total Python processing time across all groups in the task

  • Spark versions:

    • 3.0 to 4.1
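A user-side sketch of the grouped applyInPandas shape; column names and schema are illustrative:

```python
import pandas as pd

def demean(pdf: pd.DataFrame) -> pd.DataFrame:
    # applyInPandas calls this once per key group, with the whole group
    # materialized as a single pandas DataFrame.
    return pdf.assign(v=pdf["v"] - pdf["v"].mean())

# With an active SparkSession (hypothetical df with columns key, v):
# out = df.groupBy("key").applyInPandas(demean, schema="key long, v double")
```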

DataFlintFlatMapCoGroupsInPandasExec

  • Spark node: FlatMapCoGroupsInPandasExec

  • PySpark API: PandasCogroupedOps.applyInPandas()

  • What it measures:

    • Time spent sending matching key groups from both DataFrames to Python

    • Time spent running the user function on the pair of Pandas DataFrames

    • Time spent collecting the output rows back into Spark

    • Total Python processing time across all co-group calls in the task

  • Spark versions:

    • 3.0 to 4.1

  • Implementation note:

    • This node has two children, left and right

    • Its withNewChildrenInternal must preserve both SparkPlan children
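A user-side sketch of the co-grouped applyInPandas shape; DataFrame names, columns, and schema are illustrative:

```python
import pandas as pd

def join_groups(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # Called once per key: the matching group from each side arrives as
    # its own pandas DataFrame (empty if the key is missing on one side).
    return pd.merge(left, right, on="key")

# With an active SparkSession (hypothetical df1, df2 sharing column key):
# out = (df1.groupby("key")
#           .cogroup(df2.groupby("key"))
#           .applyInPandas(join_groups, schema="key long, a long, b long"))
```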

How it works

  1. During SparkDataflintPlugin.init(), DataFlint checks instrumentation flags.

  2. If enabled, it registers DataFlintInstrumentationExtension into spark.sql.extensions.

  3. The extension walks the physical plan and looks for supported Python execution nodes.

  4. It replaces matching nodes with DataFlint equivalents.

    • MapInPandasExec β†’ DataFlintMapInPandasExec

    • PythonMapInArrowExec / MapInArrowExec β†’ DataFlintPythonMapInArrowExec

    • ArrowEvalPythonExec β†’ DataFlintArrowEvalPythonExec

    • BatchEvalPythonExec β†’ DataFlintBatchEvalPythonExec

    • FlatMapGroupsInPandasExec β†’ DataFlintFlatMapGroupsInPandasExec

    • FlatMapCoGroupsInPandasExec β†’ DataFlintFlatMapCoGroupsInPandasExec

  5. The instrumented nodes preserve the original execution semantics and child plans.

    • Co-group execution keeps both left and right children intact.

  6. The instrumented nodes wrap the original doExecute() logic.

    • They measure System.nanoTime() before and after each partition evaluation.

    • They accumulate elapsed time into a SQL metric (milliseconds).

  7. DataFlint detects the Spark runtime version.

    • It picks the correct implementation for that version’s internal APIs.

What you get

Once enabled, the Spark UI SQL plan shows a duration metric on the corresponding DataFlint Python nodes.

The metric is total wall-clock time in milliseconds.

What it includes depends on the execution path:

  • mapInPandas and mapInArrow

    • Python execution time across partitions

  • ArrowEvalPythonExec

    • Python execution plus Arrow transfer overhead

  • BatchEvalPythonExec

    • Python execution plus pickle serialization and deserialization overhead

  • FlatMapGroupsInPandasExec

    • Total Python processing time across all groups in the task

  • FlatMapCoGroupsInPandasExec

    • Total Python processing time across all co-group calls in the task

This helps you confirm whether the bottleneck sits in row serialization, Arrow transfer, scalar UDF evaluation, grouped pandas logic, or co-grouped pandas logic.
