PySpark UDF instrumentation
Usage examples

- Enable all Python UDF instrumentation (PySpark)
- Enable only mapInPandas instrumentation
- Enable only mapInArrow instrumentation
- Enable only Arrow scalar pandas_udf instrumentation
- Enable only regular @udf batch instrumentation
- Enable only grouped applyInPandas() instrumentation
- Enable only co-grouped applyInPandas() instrumentation

Spark Python UDF Instrumentation
Overview
Spark runs PySpark UDFs through several physical plan nodes, and by default it does not report Python execution time for any of them.
DataFlint replaces these built-in physical plan nodes with instrumented versions that add a duration SQL metric, visible in the Spark UI SQL tab.
Configuration
All properties default to false.
spark.dataflint.instrument.spark.enabled
Global toggle. Enables all Spark Python UDF instrumentation on this page.

spark.dataflint.instrument.spark.mapInPandas.enabled
Enables mapInPandas instrumentation only.

spark.dataflint.instrument.spark.mapInArrow.enabled
Enables mapInArrow instrumentation only.

spark.dataflint.instrument.spark.arrowEvalPython.enabled
Enables ArrowEvalPythonExec instrumentation only. Use this for scalar or scalar-iterator pandas_udf.

spark.dataflint.instrument.spark.batchEvalPython.enabled
Enables BatchEvalPythonExec instrumentation only. Use this for regular pickle-based @udf.

spark.dataflint.instrument.spark.flatMapGroupsInPandas.enabled
Enables FlatMapGroupsInPandasExec instrumentation only. Use this for GroupedData.applyInPandas().

spark.dataflint.instrument.spark.flatMapCoGroupsInPandas.enabled
Enables FlatMapCoGroupsInPandasExec instrumentation only. Use this for PandasCogroupedOps.applyInPandas().
Use the global flag when you want coverage for scalar UDFs, scalar pandas_udf, grouped pandas UDFs, and co-grouped pandas UDFs.
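As a sketch, these flags are ordinary Spark configuration properties, so they can be set at submit time (the script name `my_job.py` below is a placeholder):

```shell
# Enable every Python UDF instrumentation path at once:
spark-submit --conf spark.dataflint.instrument.spark.enabled=true my_job.py

# Or enable a single path, e.g. grouped applyInPandas only:
spark-submit \
  --conf spark.dataflint.instrument.spark.flatMapGroupsInPandas.enabled=true \
  my_job.py
```

The same keys can equally be set through `SparkSession.builder.config(...)` or `spark-defaults.conf`.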
Instrumented execution paths
DataFlintMapInPandasExec

Spark node: MapInPandasExec
PySpark API: DataFrame.mapInPandas()
What it measures:
- Time spent sending partition batches to Python as Pandas DataFrames
- Time spent running the user function on each partition
- Time spent collecting the returned rows back into Spark
Spark versions: 3.3 to 4.1
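For reference, a function passed to DataFrame.mapInPandas() has the shape sketched below: an iterator of pandas DataFrames in, an iterator of pandas DataFrames out (the column name "value" is illustrative):

```python
import pandas as pd

def filter_positive(pdf_iter):
    # Each element is one batch of the partition as a pandas DataFrame.
    for pdf in pdf_iter:
        # Keep only rows with a positive "value"; yielded frames are
        # streamed back to Spark as the node's output rows.
        yield pdf[pdf["value"] > 0]
```

In a real job this would be invoked as `df.mapInPandas(filter_positive, schema=df.schema)`, and the instrumented node times the whole round trip.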
DataFlintPythonMapInArrowExec

Spark node: PythonMapInArrowExec / MapInArrowExec
PySpark API: DataFrame.mapInArrow()
What it measures:
- Time spent sending partition batches to Python through Arrow
- Time spent running the Python function on Arrow data
- Time spent collecting the returned Arrow batches back into Spark
Spark versions: 3.3 to 4.1
DataFlintArrowEvalPythonExec

Spark node: ArrowEvalPythonExec
PySpark API: pandas_udf with scalar or scalar-iterator semantics
Typical shape: @pandas_udf returning a Series
What it measures:
- Time spent calling the Python UDF
- Time spent transferring columnar batches through Apache Arrow
- Both Python execution and Arrow serialization overhead
Spark versions: 3.2 to 4.1
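A scalar pandas_udf receives a pandas Series per batch and must return a Series of the same length. A sketch of the function body (in Spark it would carry a `@pandas_udf("double")` decorator; the temperature conversion is just an example):

```python
import pandas as pd

def to_celsius(fahrenheit: pd.Series) -> pd.Series:
    # Vectorized arithmetic over the whole Arrow-delivered batch.
    return (fahrenheit - 32) * 5.0 / 9.0
```

Because the data arrives as columnar batches, the measured duration covers both this vectorized call and the Arrow transfer around it.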
DataFlintBatchEvalPythonExec

Spark node: BatchEvalPythonExec
PySpark API: regular @udf (non-Arrow, pickle-based SQL_BATCHED_UDF)
What it measures:
- Time spent serializing rows to Python with pickle
- Time spent executing the Python UDF
- Time spent deserializing results back into Spark
Spark versions: 3.0 to 4.1
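A regular @udf runs row at a time over pickled values, which is why serialization shows up in the metric. A sketch of such a function (in Spark it would be wrapped as `@udf("string")`; the name `shout` is illustrative):

```python
def shout(s):
    # Row-at-a-time logic; Spark may pass None for SQL NULL.
    return None if s is None else s.upper() + "!"
```

Each input row is pickled to the Python worker, this function is called once per row, and the result is pickled back, so per-row overhead dominates for cheap functions.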
DataFlintFlatMapGroupsInPandasExec

Spark node: FlatMapGroupsInPandasExec
PySpark API: GroupedData.applyInPandas()
What it measures:
- Time spent sending each key group to Python as a Pandas DataFrame
- Time spent running the user function for every group
- Time spent collecting the produced rows back into Spark
- Total Python processing time across all groups in the task
Spark versions: 3.0 to 4.1
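A function for GroupedData.applyInPandas() receives one pandas DataFrame per key group and returns a DataFrame. A sketch (column names "id" and "value" are assumptions):

```python
import pandas as pd

def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds all rows for one grouping key; center "value" in-group.
    return pdf.assign(value=pdf["value"] - pdf["value"].mean())
```

Invoked as `df.groupBy("id").applyInPandas(subtract_mean, schema=df.schema)`; the metric sums the time of every such group call in a task.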
DataFlintFlatMapCoGroupsInPandasExec

Spark node: FlatMapCoGroupsInPandasExec
PySpark API: PandasCogroupedOps.applyInPandas()
What it measures:
- Time spent sending matching key groups from both DataFrames to Python
- Time spent running the user function on the pair of Pandas DataFrames
- Time spent collecting the output rows back into Spark
- Total Python processing time across all co-group calls in the task
Spark versions: 3.0 to 4.1
Implementation note: this node has two children, left and right, and its withNewChildrenInternal must preserve both SparkPlan children.
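A co-grouped function receives the pair of group DataFrames, one from each side, and returns a single DataFrame. A sketch (the "id" key and the inner merge are illustrative):

```python
import pandas as pd

def merge_groups(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # left and right hold the rows for the same grouping key,
    # taken from each of the two cogrouped DataFrames.
    return pd.merge(left, right, on="id", how="inner")
```

Invoked as `df1.groupBy("id").cogroup(df2.groupBy("id")).applyInPandas(merge_groups, schema=...)`; the two inputs mirror the node's left and right children.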
How it works
During SparkDataflintPlugin.init(), DataFlint checks the instrumentation flags. If enabled, it registers DataFlintInstrumentationExtension into spark.sql.extensions. The extension walks the physical plan, looks for supported Python execution nodes, and replaces each match with its DataFlint equivalent:

- MapInPandasExec → DataFlintMapInPandasExec
- PythonMapInArrowExec / MapInArrowExec → DataFlintPythonMapInArrowExec
- ArrowEvalPythonExec → DataFlintArrowEvalPythonExec
- BatchEvalPythonExec → DataFlintBatchEvalPythonExec
- FlatMapGroupsInPandasExec → DataFlintFlatMapGroupsInPandasExec
- FlatMapCoGroupsInPandasExec → DataFlintFlatMapCoGroupsInPandasExec
The instrumented nodes preserve the original execution semantics and child plans; co-group execution keeps both left and right children intact.
Each node wraps the original doExecute() logic, measures System.nanoTime() before and after each partition evaluation, and accumulates the elapsed time into a SQL metric in milliseconds.
DataFlint also detects the Spark runtime version and picks the implementation that matches that version's internal APIs.
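The timing pattern described above can be sketched as a Python analogue (the real code is Scala inside doExecute() using System.nanoTime(); `instrument`, `evaluate`, and the metric dict here are illustrative stand-ins):

```python
import time

def instrument(partitions, evaluate, metric):
    # Wrap each partition evaluation in a monotonic-clock measurement
    # and accumulate the elapsed time into a millisecond counter,
    # mirroring how the node feeds its duration SQL metric.
    for partition in partitions:
        start = time.monotonic_ns()
        result = evaluate(partition)
        metric["python_duration_ms"] += (time.monotonic_ns() - start) // 1_000_000
        yield result
```

Because the wrapper only brackets the evaluation call, the overhead added to each partition is a pair of clock reads.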
What you get
Once enabled, the Spark UI SQL plan shows a duration metric on the corresponding DataFlint Python nodes.
The metric is total wall-clock time in milliseconds.
What it includes depends on the execution path:
- mapInPandas and mapInArrow: Python execution time across partitions
- ArrowEvalPythonExec: Python execution plus Arrow transfer overhead
- BatchEvalPythonExec: Python execution plus pickle serialization and deserialization overhead
- FlatMapGroupsInPandasExec: total Python processing time across all groups in the task
- FlatMapCoGroupsInPandasExec: total Python processing time across all co-group calls in the task
This helps you confirm whether the bottleneck sits in row serialization, Arrow transfer, scalar UDF evaluation, grouped pandas logic, or co-grouped pandas logic.