# Spark Instrumentation

## DataFlint Spark Instrumentation

DataFlint provides optional instrumentation that enhances Spark observability by injecting extra metrics and metadata into the Spark UI. All instrumentation is opt-in and disabled by default.

{% hint style="warning" %}
This feature is currently experimental. DataFlint instruments your query's physical plan, so use it with caution.
{% endhint %}

### What changed in 0.8.9

DataFlint now uses a single generic `TimedExec` wrapper, which replaces the previous per-node wrapper classes.

`TimedExec` adds two metrics:

* `duration`
* `rddId`

The wrapper preserves the wrapped node's native metrics and exposes `child.children` as its own children. This keeps the SQL graph as one node, so no duplicate node appears in the Spark UI or the DataFlint UI.

Additional notes:

* `InMemoryTableScanExec` and all `Exchange` nodes are never wrapped.
* Version-specific nodes are matched by class-name string, which avoids `NoClassDefFoundError` on older Spark versions.
* Join codegen is cancelled where codegen instrumentation does not work.
* `DataWritingCommandExec` gets duration support through `doPrepare` delegation.
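The never-wrapped rule above can be sketched in Python. This is illustrative only: DataFlint's actual implementation lives in Scala inside the plugin, and `should_wrap` / `NEVER_WRAPPED` are hypothetical names, not DataFlint API.

```python
# Hypothetical sketch of the wrapping rule. Nodes are matched by
# class-name string (not by class reference), which is what avoids
# NoClassDefFoundError on Spark versions that lack a given node class.

NEVER_WRAPPED = {"InMemoryTableScanExec"}  # plus all Exchange nodes

def should_wrap(node_class_name: str) -> bool:
    """Decide whether a plan node should receive a TimedExec wrapper."""
    if node_class_name in NEVER_WRAPPED:
        return False
    # Exchange nodes (e.g. ShuffleExchangeExec, BroadcastExchangeExec)
    # define stage boundaries and are never wrapped.
    if "Exchange" in node_class_name:
        return False
    return True
```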

### Config flags and wrapped nodes

| Config key                                                         | Nodes wrapped                                                                                                                                                                                                                                                                                                                          |
| ------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `spark.dataflint.instrument.spark.enabled`                         | All nodes below                                                                                                                                                                                                                                                                                                                        |
| `spark.dataflint.instrument.spark.sqlNodes.enabled`                | `FilterExec`, `ProjectExec`, `ExpandExec`, `GenerateExec`, `SortExec`, `SortAggregateExec`, `HashAggregateExec`, `SortMergeJoinExec`, `BroadcastHashJoinExec`, `BroadcastNestedLoopJoinExec`, `CartesianProductExec`, `WindowGroupLimitExec`, `DataWritingCommandExec`, `FileSourceScanExec`, `RowDataSourceScanExec`, `BatchScanExec` |
| `spark.dataflint.instrument.spark.window.enabled`                  | `WindowExec`, `WindowInPandasExec`                                                                                                                                                                                                                                                                                                     |
| `spark.dataflint.instrument.spark.batchEvalPython.enabled`         | `BatchEvalPythonExec`                                                                                                                                                                                                                                                                                                                  |
| `spark.dataflint.instrument.spark.arrowEvalPython.enabled`         | `ArrowEvalPythonExec`                                                                                                                                                                                                                                                                                                                  |
| `spark.dataflint.instrument.spark.mapInPandas.enabled`             | `MapInPandasExec`                                                                                                                                                                                                                                                                                                                      |
| `spark.dataflint.instrument.spark.mapInArrow.enabled`              | `PythonMapInArrowExec`                                                                                                                                                                                                                                                                                                                 |
| `spark.dataflint.instrument.spark.flatMapGroupsInPandas.enabled`   | `FlatMapGroupsInPandasExec`                                                                                                                                                                                                                                                                                                            |
| `spark.dataflint.instrument.spark.flatMapCoGroupsInPandas.enabled` | `FlatMapCoGroupsInPandasExec`                                                                                                                                                                                                                                                                                                          |
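The flags above are regular Spark configuration keys, so they can be set wherever Spark config is set. A minimal PySpark sketch (this is a config fragment, not runnable on its own; it assumes the DataFlint plugin itself is already attached to the session):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("dataflint-instrumentation-demo")
    # Master switch: wraps all node groups listed in the table above.
    .config("spark.dataflint.instrument.spark.enabled", "true")
    # Or enable groups individually, e.g. only window nodes:
    # .config("spark.dataflint.instrument.spark.window.enabled", "true")
    .getOrCreate()
)
```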

### Stage grouping and duration attribution

Stage grouping now comes from the SQL plan topology: `Exchange` boundaries define stage edges. The same logic works for both live execution and the history server.
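The cut-at-`Exchange` grouping can be sketched as a tree walk over a toy plan (an illustrative sketch only; the node-dict shape and the choice to start a new group at each `Exchange` are assumptions, not DataFlint's data model):

```python
# Illustrative sketch: derive stage groups from plan topology by
# starting a new group whenever the walk reaches an Exchange node.

def split_stages(node, stages=None, current=None):
    """Walk a plan tree (dicts with 'name' and 'children');
    each Exchange boundary opens a new stage group."""
    if stages is None:
        current = []
        stages = [current]
    if "Exchange" in node["name"]:
        current = []          # Exchange boundary: new stage group
        stages.append(current)
    current.append(node["name"])
    for child in node.get("children", []):
        split_stages(child, stages, current)
    return stages
```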

#### Inclusive and exclusive mode

Stage view supports **Inclusive** and **Exclusive** duration modes:

* **Inclusive** shows native metric values as-is.
* **Exclusive** (the default) runs the attribution algorithm and normalizes per-stage durations to `executorRunTime`.

Attribution mode auto-enables when the plan has instrumented nodes.
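One plausible reading of "normalizes per-stage durations to `executorRunTime`" is proportional scaling, sketched below (an assumption for illustration; the function name and exact scheme are not from DataFlint's source):

```python
# Illustrative sketch: scale attributed per-node durations so their
# sum matches the stage's executorRunTime (Exclusive mode).

def normalize_to_run_time(durations, executor_run_time):
    """durations: {node_name: attributed_ms}; returns scaled copy."""
    total = sum(durations.values())
    if total == 0:
        return {node: 0 for node in durations}
    scale = executor_run_time / total
    return {node: d * scale for node, d in durations.items()}
```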

#### Attribution order

1. Instrumented node
   * Use the `duration` metric.
   * Then subtract the max descendant timing.
   * Descent stops at `Exchange`.
   * Descent passes through instrumented blocking nodes.
   * Non-instrumented blocking nodes terminate the descent.
2. Native exclusive metric
   * `Sort` uses sort time.
   * `HashAggregate` and `ObjectHashAggregate` use agg time.
   * `ShuffledHashJoin` uses build time.
3. Any timing metric on the node
   * Non-instrumented blocking nodes use it as-is.
   * Other nodes subtract the max descendant timing.
4. Whole-stage codegen fallback
   * Use enclosing `WholeStageCodegen` `pipelineTime`.
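The fallback chain above can be sketched for a single node (a simplified illustration, not DataFlint's implementation: it flattens the blocking-node rules, uses a generic `timing` metric for step 3, and omits the `pipelineTime` fallback; all names here are hypothetical):

```python
# Illustrative sketch of the exclusive-duration fallback chain.

NATIVE_EXCLUSIVE = {          # step 2: metrics that are already exclusive
    "SortExec": "sortTime",
    "HashAggregateExec": "aggTime",
    "ObjectHashAggregateExec": "aggTime",
    "ShuffledHashJoinExec": "buildTime",
}

def max_descendant_timing(node):
    """Largest timing metric among descendants; descent stops at Exchange."""
    best = 0
    for child in node.get("children", []):
        if "Exchange" in child["name"]:
            continue                      # do not cross stage boundaries
        timing = child["metrics"].get("duration") or \
                 child["metrics"].get("timing") or 0
        best = max(best, timing, max_descendant_timing(child))
    return best

def exclusive_duration(node):
    metrics = node["metrics"]
    # 1. Instrumented node: duration minus max descendant timing.
    if "duration" in metrics:
        return max(0, metrics["duration"] - max_descendant_timing(node))
    # 2. Native exclusive metric, used as-is.
    native = NATIVE_EXCLUSIVE.get(node["name"])
    if native and native in metrics:
        return metrics[native]
    # 3. Any timing metric, minus max descendant timing.
    if "timing" in metrics:
        return max(0, metrics["timing"] - max_descendant_timing(node))
    # 4. Whole-stage codegen pipelineTime fallback (omitted in sketch).
    return 0
```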

Exchange write and read durations come from shuffle metrics and are split between the producer and consumer stages. Any node with a duration metric is treated as non-blocking, and metric handling is null-safe across the reducer.

### Instrumentation guides

* [PySpark UDF instrumentation](https://dataflint.gitbook.io/dataflint-for-spark/integrations/spark-instrumentation/pyspark-udf-instrumentation)
* [Window instrumentation](https://dataflint.gitbook.io/dataflint-for-spark/integrations/spark-instrumentation/window-instrumentation)
