🦣 Window instrumentation

Spark Window Instrumentation

Overview

Spark window functions execute native/Java/Python aggregate functions per partition. By default, Spark does not report their execution time per partition.

DataFlint replaces the built-in physical plan nodes with instrumented versions:

  • DataFlintWindow / DataFlintWindowInPandas

The instrumented versions add a duration SQL metric, visible in the Spark UI SQL tab and in the DataFlint UI.

Usage examples

Enable all DataFlint Spark instrumentation (PySpark)
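The snippet itself did not survive on this page; a minimal sketch via spark-submit, assuming DataFlint is already attached to the application (app.py is a placeholder script name):

```shell
# Global toggle: turns on all DataFlint Spark instrumentation,
# window instrumentation included.
spark-submit \
  --conf spark.dataflint.instrument.spark.enabled=true \
  app.py
```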

Enable only Window instrumentation
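Again as a sketch (app.py is a placeholder): setting only the window-specific key leaves the rest of DataFlint's instrumentation off.

```shell
# Window instrumentation only; the global toggle stays at its
# default of false.
spark-submit \
  --conf spark.dataflint.instrument.spark.window.enabled=true \
  app.py
```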

Configuration

All properties default to false.

  • spark.dataflint.instrument.spark.enabled

    • Global toggle.

    • Enables all DataFlint Spark instrumentation.

  • spark.dataflint.instrument.spark.window.enabled

    • Enables window instrumentation only.

Supported Spark versions

DataFlint ships version-specific implementations that match each Spark version's internal window execution API (WindowInPandasExec / ArrowWindowPythonExec).

| Spark version | Window implementation | WindowInPandas implementation       |
| ------------- | --------------------- | ----------------------------------- |
| 3.x.x         | DataFlintWindowExec   | DataFlintWindowInPandasExec         |
| 4.0.x         | DataFlintWindowExec   | DataFlintWindowInPandasExec_4_0     |
| 4.1.x         | DataFlintWindowExec   | DataFlintArrowWindowPythonExec_4_1  |

How it works

  1. During SparkDataflintPlugin.init(), DataFlint checks the instrumentation flags.

  2. If enabled, it registers DataFlintInstrumentationExtension into spark.sql.extensions.

  3. The extension injects a planner Strategy via injectPlannerStrategy.

    • The strategy runs as part of the logical-to-physical plan transformation.

  4. The strategy pattern-matches on window nodes and replaces them:

    • WindowExec → DataFlintWindowExec

    • WindowInPandasExec / ArrowWindowPythonExec → DataFlintWindowInPandas

  5. The instrumented nodes wrap the original doExecute() logic.

    • They measure System.nanoTime() before and after each partition evaluation.

    • They accumulate the elapsed time into a SQL metric (milliseconds).

  6. DataFlint detects the Spark runtime version and picks the implementation that matches that version's internal APIs.
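The version-based selection can be sketched as follows — a plain-Python illustration using the class names from the table above, not DataFlint's actual Scala code:

```python
def pick_window_in_pandas_impl(spark_version: str) -> str:
    """Return the instrumented WindowInPandas implementation name
    for a given Spark runtime version (names from the table above)."""
    major, minor = spark_version.split(".")[:2]
    if major == "3":
        return "DataFlintWindowInPandasExec"
    if (major, minor) == ("4", "0"):
        return "DataFlintWindowInPandasExec_4_0"
    if (major, minor) == ("4", "1"):
        return "DataFlintArrowWindowPythonExec_4_1"
    raise ValueError(f"unsupported Spark version: {spark_version}")

pick_window_in_pandas_impl("3.5.1")  # -> "DataFlintWindowInPandasExec"
```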

What you get

Once enabled, the Spark UI SQL plan shows a duration metric. It appears on DataFlintWindow / DataFlintWindowInPandas nodes.

The metric is total wall-clock time (milliseconds). It covers window execution across all partitions.

The timer wraps the actual window execution within each partition, which:

  1. Pulls rows from its child (e.g., a SortExec)

  2. Accumulates a full partition group

  3. Computes window function values

  4. Emits output rows

Steps 1–3 all happen during the first iter.hasNext call. So the timer captures child-fetch time + window computation time combined.
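That timing pattern can be sketched in plain Python — DataFlint does this in Scala with System.nanoTime() around the partition iterator; the names here are illustrative:

```python
import time

def timed_partition(rows, metric):
    """Wrap a partition iterator, accumulating the wall-clock time spent
    pulling rows (child fetch + window computation) into metric, in ms."""
    it = iter(rows)
    while True:
        start = time.perf_counter_ns()
        try:
            row = next(it)  # the first call also triggers the heavy work
        except StopIteration:
            metric["duration_ms"] += (time.perf_counter_ns() - start) // 1_000_000
            return
        metric["duration_ms"] += (time.perf_counter_ns() - start) // 1_000_000
        yield row

metric = {"duration_ms": 0}
result = list(timed_partition(range(3), metric))  # -> [0, 1, 2]
```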

This helps you confirm whether the window function is the bottleneck.
