Spark v4.x - Features - Variant Data Type

This article presents some information about the Variant data type introduced in Spark v4.x.

Introduction

The VARIANT data type stores semi-structured data (JSON, XML, nested objects) without a predefined schema. It enables schema-on-read for heterogeneous data, stores values in an efficient binary format with type metadata, and supports path-based access (variant_col:field.path) and type casting.

Detail

Before Spark 4.0, handling semi-structured data meant choosing between three problematic approaches:

  1. Store as STRING and parse at runtime: Parsing overhead on every query. No type safety. Poor performance. Incompatible with predicate pushdown.
  2. Define a rigid STRUCT schema: The schema must be known upfront. Schema evolution requires table alterations. Null fields waste storage. Heterogeneous data doesn’t fit.
  3. Use MAP<STRING, STRING>: Everything is a string. No nested access. Manual type conversion required. Poor query-optimizer integration.

Spark 4.0 introduces the VARIANT type, which stores semi-structured data in a binary columnar format that preserves type information:

CREATE TABLE events (
    event_id STRING,
    timestamp TIMESTAMP,
    payload VARIANT
);

INSERT INTO events VALUES (
    'EVT001',
    TIMESTAMP'2024-01-15 10:00:00',
    PARSE_JSON('{"user": {"id": 12345, "name": "Alice"}, "action": "purchase", "amount": 99.99}')
);

SELECT
    event_id,
    payload:user.name::STRING AS user_name,
    payload:amount::DOUBLE AS amount
FROM events
WHERE payload:action::STRING = 'purchase';

The storage format consists of three components:

  1. Value Section: Actual data stored in a compressed binary format. Numbers, strings, and booleans are stored as native types. Nested structures are preserved.
  2. Metadata Section: Schema information for each value. Type tags indicate whether a value is an INT, STRING, OBJECT, ARRAY, etc., enabling type-safe extraction without parsing.
  3. Offset Index: Pointers to nested fields for fast path-based access. Enables payload:user.name lookups without scanning the entire object.

This design enables several operations:

  1. Path-Based Access: Extracts nested values without parsing. The colon syntax (:) navigates into the structure, dot notation reaches nested objects, and bracket notation indexes arrays: payload:items[0].price.

     variant_column:field.nested.path

  2. Type Casting: The double-colon syntax casts a variant value to a specific type and fails if the types are incompatible. Use TRY_CAST for safe conversion: TRY_CAST(variant_column AS INT).

     variant_column::INT
     variant_column::STRING

  3. Type Introspection: schema_of_variant returns the type name, enabling runtime type checking.

     schema_of_variant(variant_column)
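
A minimal PySpark sketch combining these three operations (the one-row dataset and the view name t are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("variant_operations").getOrCreate()

# Illustrative one-row dataset with a single VARIANT column
spark.sql("""
    SELECT parse_json('{"user": {"id": 1, "name": "Alice"}, "tags": ["a", "b"], "amount": 9.5}') AS v
""").createOrReplaceTempView("t")

spark.sql("""
    SELECT
        v:user.name::STRING  AS user_name,   -- path access + cast
        v:tags[0]::STRING    AS first_tag,   -- bracket notation for arrays
        v:amount::DOUBLE     AS amount,      -- double-colon cast
        schema_of_variant(v) AS v_schema     -- type introspection
    FROM t
""").show(truncate=False)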

The Catalyst optimizer supports VARIANT operations. Predicate pushdown works for top-level fields. For example, WHERE payload:status::STRING = 'active' can skip partitions where the status field doesn’t match. However, deeply nested predicates may not push down efficiently.
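
To see how Catalyst compiles a variant predicate, inspect the physical plan. A small sketch (the one-row view is illustrative; on a real Parquet table the scan node shows whether the filter is pushed down):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("variant_predicate_plan").getOrCreate()

# Illustrative one-row view with a VARIANT column
spark.sql("""SELECT parse_json('{"status": "active"}') AS payload""") \
     .createOrReplaceTempView("events_v")

# The filter appears in the plan as a variant_get(...) expression
spark.sql("""
    SELECT * FROM events_v
    WHERE payload:status::STRING = 'active'
""").explain(mode="formatted")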

List of functions for manipulating VARIANT data: SQL Documentation

Comparison

Criterion                 STRING + JSON     STRUCT     VARIANT
Fixed schema required     No                Yes        No
Read performance          Low (re-parsed)   High       Good (pre-indexed)
Write performance         High              Good       Good
Heterogeneous schemas     Yes               No         Yes
Filter pushdown           Limited           Complete   Partial

Shredding

Variant Shredding is a hybrid technique. Instead of storing the whole document as a single opaque binary value, Spark analyzes the data and creates:

  • Dedicated typed columns for fields that appear frequently (e.g., id, sensor).
  • A “remainder” column that contains everything that is too rare or too complex to be broken down.

This is an important mechanism for:

  • Performance (I/O): if a query accesses only the $.sensor field, Spark physically reads just the shredded column corresponding to that field and ignores the rest of the JSON document.
  • Compression: since the decomposed data is typed (e.g., all IDs are integers), compression algorithms like Snappy or Zstd are much more effective than on raw text.

The limitation of shredding: it does not turn every element of the Variant into a column. Spark applies a limit (generally on the number of fields or the nesting depth) to avoid creating thousands of columns in Parquet.

Warning: shredding only occurs if Spark detects that the data is repetitive enough to be worth extracting into columns. On a small sample, Spark may decide not to perform shredding.

Demonstration of the storage format with and without shredding (using duckdb to check the storage structure):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, VariantType, IntegerType, StringType
from pyspark.sql.functions import col, parse_json

REP_DATA_FILES = "file:///opt/spark/data/files"
URL_MASTER = "spark://spark-master:7077"
FILE_NAME_NS = "data_variant_ns"
FILE_NAME_SC = "data_variant_sc"


schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("data_raw", StringType(), nullable=False),
    StructField("data", VariantType(), nullable=True)
])

raw_data_small = [
    (1, '{"sensor": "temp", "value": 22.5, "unit": "C"}', None),
    (2, '{"sensor": "pressure", "value": 1013}', None)
]

raw_data_more = [
    (3, '{"sensor": "temp", "value": 23, "unit": "C"}', None),
    (4, '{"sensor": "pressure", "value": 900}', None),
    (5, '{"sensor": "temp", "value": 23.5, "unit": "C"}', None),
    (6, '{"sensor": "pressure", "value": 1000}', None),
    (7, '{"sensor": "temp", "value": 23.5, "unit": "C"}', None),
    (8, '{"sensor": "pressure", "value": 1000}', None),
    (9, '{"sensor": "temp", "value": 22, "unit": "C"}', None),
    (10, '{"sensor": "pressure", "value": 856}', None)
]

# Spark instance for the shredding demo
spark = SparkSession.builder \
    .appName("SPARK_VARIANTModeShredding") \
    .master(URL_MASTER) \
    .getOrCreate()


# Load DataFrame with the small dataset
df_variant_ns = spark.createDataFrame(raw_data_small, schema)
# Parse the JSON STRING into a VARIANT (col: data)
df_variant_ns = df_variant_ns.withColumn("data", parse_json(col("data_raw")))
# Write the DataFrame to a Parquet file (no shredding)
df_variant_ns.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/{FILE_NAME_NS}.parquet")


# Load DataFrame with the complete dataset
df_variant_sc = spark.createDataFrame((raw_data_small + raw_data_more), schema)
# Parse the JSON STRING into a VARIANT (col: data)
df_variant_sc = df_variant_sc.withColumn("data", parse_json(col("data_raw")))
# Write the DataFrame to a Parquet file (shredding)
df_variant_sc.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/{FILE_NAME_SC}.parquet")

spark.stop()

Demonstration results:

1/ Regarding the data_variant_ns.parquet file

  • This file contains only two rows of data, so Spark does not apply shredding by default.
  • This is confirmed by running duckdb -c "DESCRIBE SELECT * FROM '~/{folder_data}/files/data_variant_ns.parquet/*.parquet';", which shows that the data column stores the JSON as a VARIANT, i.e. a STRUCT composed of metadata and value:
┌─────────────┬─────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │             column_type             │  null   │   key   │ default │  extra  │
│   varchar   │               varchar               │ varchar │ varchar │ varchar │ varchar │
├─────────────┼─────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ id          │ INTEGER                             │ YES     │ NULL    │ NULL    │ NULL    │
│ data_raw    │ VARCHAR                             │ YES     │ NULL    │ NULL    │ NULL    │
│ data        │ STRUCT(metadata BLOB, "value" BLOB) │ YES     │ NULL    │ NULL    │ NULL    │
└─────────────┴─────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘

2/ Regarding the data_variant_sc.parquet file

  • This file contains 10 rows of data, and Spark applies shredding by default.
  • This is confirmed by running duckdb -c "DESCRIBE SELECT * FROM '~/{folder_data}/files/data_variant_sc.parquet/*.parquet';", which shows that the data column still stores metadata and value, but this time with an additional typed_value field, the result of shredding:
┌─────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │                                                                                                    column_type                                                                                                     │  null   │   key   │ default │  extra  │
│   varchar   │                                                                                                      varchar                                                                                                       │ varchar │ varchar │ varchar │ varchar │
├─────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ id          │ INTEGER                                                                                                                                                                                                            │ YES     │ NULL    │ NULL    │ NULL    │
│ data_raw    │ VARCHAR                                                                                                                                                                                                            │ YES     │ NULL    │ NULL    │ NULL    │
│ data        │ STRUCT(metadata BLOB, "value" BLOB, typed_value STRUCT(sensor STRUCT("value" BLOB, typed_value VARCHAR), unit STRUCT("value" BLOB, typed_value VARCHAR), "value" STRUCT("value" BLOB, typed_value DECIMAL(18,1)))) │ YES     │ NULL    │ NULL    │ NULL    │
└─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘

Advice

  • from_json requires you to know the schema upfront, whereas parse_json preserves the flexibility of the raw data while still optimizing its storage (see the sketch after this list).
  • Use VARIANT for the landing/raw layer (and exploration) and STRUCT for the Gold/Serving layer.
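
A minimal sketch contrasting the two functions (the sample document is illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, parse_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("from_json_vs_parse_json").getOrCreate()

df = spark.createDataFrame([('{"sensor": "temp", "value": 22.5, "unit": "C"}',)], ["raw"])

# from_json: the schema is declared upfront; fields absent from it are lost
schema = StructType([
    StructField("sensor", StringType()),
    StructField("value", DoubleType()),
])
df.select(from_json(col("raw"), schema).alias("s")).show(truncate=False)
# -> {temp, 22.5}: the "unit" field has been dropped

# parse_json: no schema needed; the whole document is kept as a VARIANT
df.select(parse_json(col("raw")).alias("v")).show(truncate=False)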

Advantages

  1. Schema Flexibility: New fields appear without ALTER TABLE. Event payloads evolve over time without breaking queries. Different event types can be stored in the same table. Critical for event sourcing, API integration, and rapid-iteration environments.
  2. Better Performance than JSON Strings: Binary encoding avoids repeated parsing overhead. Shredding and metadata enable direct field access. Eliminates expensive regex or JSON parsing during reads. Compressed storage reduces I/O.
  3. Partial Schema-on-Read: Query only the fields you need; unused fields incur no extraction cost. Enables exploratory analysis on unknown schemas. Gradually migrate to typed columns as the schema stabilizes (see the sketch after this list).
  4. Interoperability with JSON Systems: Direct JSON parsing and serialization. Compatible with REST APIs, message queues, and NoSQL exports.
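
As an illustration of point 3, a sketch of promoting stabilized fields from a VARIANT landing table to typed columns (table and column names are hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, parse_json

spark = SparkSession.builder.appName("variant_to_typed").getOrCreate()

raw = spark.createDataFrame(
    [("EVT001", '{"user_id": 12345, "amount": 99.99}')],
    ["event_id", "payload_str"],
)
landing = raw.withColumn("payload", parse_json(col("payload_str")))

# Once fields are known to be stable, promote them to typed columns
gold = landing.selectExpr(
    "event_id",
    "payload:user_id::BIGINT AS user_id",
    "payload:amount::DECIMAL(10,2) AS amount",
)
gold.printSchema()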

Limitations

  1. Slower Than Native Types: Aggregations on VARIANT fields are slower than on native INT/DECIMAL columns. Type extraction and validation happen at runtime. Path-based access has overhead compared to direct column access. Not suitable for performance-critical analytics on stable schemas.
  2. No Schema Validation: Any JSON can be inserted, with no constraints on field presence or types. Typos in field names silently return NULL (see the sketch after this list). Data quality issues are harder to detect, so application-level validation is required.
  3. Limited Optimizer Support: Complex nested predicates may not push down to storage. Statistics collection is limited. Join optimization is less effective than with native types. Query planning cannot leverage schema information for optimization.
  4. Storage Overhead for Simple Data: Slightly heavier than a perfectly optimized STRUCT, because metadata must be stored for each record.
  5. Immature Ecosystem: Not all Spark functions support VARIANT. BI tools and connectors that read Parquet files directly do not all support the native VARIANT format yet.
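
A sketch of the silent-NULL behavior mentioned in point 2, with one possible application-level check (field names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("variant_validation").getOrCreate()

# A typo in the field name raises no error: extraction returns NULL
spark.sql("""
    SELECT variant_get(parse_json('{"status": "active"}'), '$.staus', 'string') AS oops
""").show()  # -> NULL

# One possible guard: count rows where an expected field is missing
df = spark.sql("""SELECT parse_json('{"other": 1}') AS payload""")
print(df.filter("variant_get(payload, '$.status', 'string') IS NULL").count())  # -> 1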

When not to use VARIANT:

  • Highly structured data
  • Performance-critical operations
  • Partition columns

Real-World Use Cases

  • Use Case 1: Multi-Source Event Aggregation
    • A data platform ingests events from 20+ microservices. Each service has different event schemas that evolve independently. A table with a VARIANT column stores all events without schema coordination. Queries extract service-specific fields as needed. Schema evolution happens without pipeline changes.
  • Use Case 2: API Response Caching
    • A system caches API responses in Spark tables for analysis. API payloads are deeply nested with optional fields. VARIANT stores responses without flattening. Analysis queries extract specific paths. New API versions add fields without breaking existing queries.

Code Examples

Example 1: Working with Variant Types

Note: create a SQL script containing the code below and execute it with Spark SQL: docker exec -it spark-master spark-sql --master "local[*]" --conf "spark.hadoop.hive.cli.print.header=true" -f <script.sql> > script.log

SQL

-- SQL Example - Working with the VARIANT type

-- ============================================================================
-- INIT: Create Events Table with VARIANT Column
-- ============================================================================

SELECT '=== Create events table with event dataset ===' as section;

DROP TABLE IF EXISTS events;
CREATE TABLE events (
    event_id STRING,
    timestamp TIMESTAMP,
    event_type STRING,
    payload VARIANT
);

-- Dataset
-- Each event type has a different payload structure
INSERT INTO events VALUES
    ('EVT001', TIMESTAMP'2024-01-15 10:00:00', 'user_login',
     PARSE_JSON('{"user_id": 12345, "username": "alice", "ip_address": "192.168.1.100", "device": "mobile"}')),
    ('EVT002', TIMESTAMP'2024-01-15 10:05:00', 'purchase',
     PARSE_JSON('{"user_id": 12345, "order_id": "ORD123", "amount": 99.99, "currency": "USD", "items": [{"sku": "PROD001", "quantity": 2, "price": 49.99}]}')),
    ('EVT003', TIMESTAMP'2024-01-15 10:10:00', 'user_login',
     PARSE_JSON('{"user_id": 67890, "username": "bob", "ip_address": "10.0.0.50", "device": "desktop", "session_id": "sess_xyz"}')),
    ('EVT004', TIMESTAMP'2024-01-15 10:15:00', 'page_view',
     PARSE_JSON('{"user_id": 12345, "page_url": "/products/electronics", "referrer": "https://google.com", "duration_seconds": 45}')),
    ('EVT005', TIMESTAMP'2024-01-15 10:20:00', 'purchase',
     PARSE_JSON('{"user_id": 67890, "order_id": "ORD124", "amount": 149.50, "currency": "USD", "items": [{"sku": "PROD002", "quantity": 1, "price": 149.50}], "discount_code": "SAVE10"}')),
    ('EVT006', TIMESTAMP'2024-01-15 10:25:00', 'error',
     PARSE_JSON('{"error_code": "E500", "error_message": "Database connection timeout", "service": "payment-api", "stack_trace": "com.example.PaymentService.processPayment"}')),
    ('EVT007', TIMESTAMP'2024-01-15 10:30:00', 'page_view',
     PARSE_JSON('{"user_id": 67890, "page_url": "/checkout", "referrer": "/cart", "duration_seconds": 120, "cart_items": 1}')),
    ('EVT008', TIMESTAMP'2024-01-15 10:35:00', 'user_logout',
     PARSE_JSON('{"user_id": 12345, "username": "alice", "session_duration_minutes": 35}')),
    ('EVT009', TIMESTAMP'2024-01-15 10:40:00', 'error',
     PARSE_JSON('{"error_code": "E404", "error_message": "Product not found", "service": "catalog-api", "requested_sku": "PROD999"}')),
    ('EVT010', TIMESTAMP'2024-01-15 10:45:00', 'purchase',
     PARSE_JSON('{"user_id": 12345, "order_id": "ORD125", "amount": 299.99, "currency": "USD", "items": [{"sku": "PROD003", "quantity": 3, "price": 99.99}], "shipping_address": {"country": "US", "zip": "94105"}}'));

-- ============================================================================
-- QUERY 1: Path-Based Field Extraction
-- ============================================================================

SELECT '=== Query 1: Path-Based Field Extraction ===' as section;

SELECT
    event_id,
    event_type,
    payload:user_id AS user_id,
    payload:username AS username,
    payload:order_id AS order_id
FROM events
WHERE payload:user_id IS NOT NULL
ORDER BY timestamp;

-- ============================================================================
-- QUERY 2: Type Casting with Double-Colon Syntax
-- ============================================================================

SELECT '=== Query 2: Type Casting with Double-Colon Syntax ===' as section;

SELECT
    event_id,
    timestamp,
    payload:user_id::STRING AS user_id,
    payload:order_id::STRING AS order_id,
    payload:amount::DECIMAL(10,2) AS amount,
    payload:currency::STRING AS currency,
    payload:discount_code::STRING AS discount_code
FROM events
WHERE event_type = 'purchase'
ORDER BY payload:amount::DECIMAL DESC;

-- ============================================================================
-- QUERY 3: Nested Field Access
-- ============================================================================

SELECT '=== Query 3: Nested Field Access ===' as section;

-- Access nested shipping address
SELECT
    event_id,
    payload:order_id AS order_id,
    payload:shipping_address.country::STRING AS country,
    payload:shipping_address.zip::STRING AS zip_code,
    payload:items[0].sku::STRING AS first_item_sku,
    payload:items[0].quantity::INT AS first_item_qty
FROM events
WHERE payload:shipping_address IS NOT NULL;

-- ============================================================================
-- QUERY 4: Type Introspection
-- ============================================================================

SELECT '=== Query 4: Type Introspection ===' as section;

SELECT
    event_type,
    schema_of_variant(payload:user_id) AS user_id_type,
    schema_of_variant(payload:amount) AS amount_type,
    schema_of_variant(payload:items) AS items_type,
    COUNT(*) AS event_count
FROM events
GROUP BY event_type, schema_of_variant(payload:user_id), schema_of_variant(payload:amount), schema_of_variant(payload:items)
ORDER BY event_type;

-- ============================================================================
-- QUERY 5: Aggregations with VARIANT Fields
-- ============================================================================

SELECT '=== Query 5: Aggregations with VARIANT Fields ===' as section;

SELECT
    payload:user_id::STRING AS user_id,
    COUNT(*) AS purchase_count,
    SUM(payload:amount::DECIMAL(10,2)) AS total_revenue,
    AVG(payload:amount::DECIMAL(10,2)) AS avg_order_value,
    MIN(payload:amount::DECIMAL(10,2)) AS min_purchase,
    MAX(payload:amount::DECIMAL(10,2)) AS max_purchase
FROM events
WHERE event_type = 'purchase'
  AND payload:amount IS NOT NULL
GROUP BY payload:user_id::STRING
ORDER BY total_revenue DESC;

-- ============================================================================
-- QUERY 6: Conditional Logic Based on Variant Content
-- ============================================================================

SELECT '=== Query 6: Conditional Logic Based on Variant Content ===' as section;

SELECT
    event_id,
    event_type,
    payload:user_id::STRING AS user_id,
    CASE
        WHEN event_type = 'purchase' AND payload:amount::DECIMAL(10,2) > 200 THEN 'High Value'
        WHEN event_type = 'purchase' AND payload:amount::DECIMAL(10,2) > 100 THEN 'Medium Value'
        WHEN event_type = 'purchase' THEN 'Low Value'
        WHEN event_type = 'error' AND payload:error_code::STRING LIKE 'E5%' THEN 'Critical Error'
        WHEN event_type = 'error' THEN 'Minor Error'
        ELSE 'Standard Event'
    END AS event_category,
    CASE
        WHEN event_type = 'purchase' THEN payload:amount::DECIMAL(10,2)
        ELSE 0
    END AS monetary_value
FROM events
ORDER BY timestamp;

Result

section
=== Create events table with event dataset ===
Time taken: 2.28 seconds, Fetched 1 row(s)
Response code
Time taken: 1.41 seconds
Response code
Time taken: 0.602 seconds
Response code
Time taken: 1.314 seconds


section
=== Query 1: Path-Based Field Extraction ===
Time taken: 0.059 seconds, Fetched 1 row(s)
event_id	event_type	user_id	username	order_id
EVT001	user_login	12345	"alice"	NULL
EVT002	purchase	12345	NULL	"ORD123"
EVT003	user_login	67890	"bob"	NULL
EVT004	page_view	12345	NULL	NULL
EVT005	purchase	67890	NULL	"ORD124"
EVT007	page_view	67890	NULL	NULL
EVT008	user_logout	12345	"alice"	NULL
EVT010	purchase	12345	NULL	"ORD125"
Time taken: 1.815 seconds, Fetched 8 row(s)


section
=== Query 2: Type Casting with Double-Colon Syntax ===
Time taken: 0.112 seconds, Fetched 1 row(s)
event_id	timestamp	user_id	order_id	amount	currency	discount_code
EVT010	2024-01-15 10:45:00	12345	ORD125	299.99	USD	NULL
EVT005	2024-01-15 10:20:00	67890	ORD124	149.50	USD	SAVE10
EVT002	2024-01-15 10:05:00	12345	ORD123	99.99	USD	NULL
Time taken: 0.972 seconds, Fetched 3 row(s)


section
=== Query 3: Nested Field Access ===
Time taken: 0.166 seconds, Fetched 1 row(s)
event_id	order_id	country	zip_code	first_item_sku	first_item_qty
EVT010	"ORD125"	US	94105	PROD003	3
Time taken: 0.427 seconds, Fetched 1 row(s)


section
=== Query 4: Type Introspection ===
Time taken: 0.099 seconds, Fetched 1 row(s)
event_type	user_id_type	amount_type	items_type	event_count
error	NULL	NULL	NULL	2
page_view	BIGINT	NULL	NULL	2
purchase	BIGINT	DECIMAL(5,2)	ARRAY<OBJECT<price: DECIMAL(4,2), quantity: BIGINT, sku: STRING>>	1
purchase	BIGINT	DECIMAL(4,2)	ARRAY<OBJECT<price: DECIMAL(4,2), quantity: BIGINT, sku: STRING>>	1
purchase	BIGINT	DECIMAL(4,1)	ARRAY<OBJECT<price: DECIMAL(4,1), quantity: BIGINT, sku: STRING>>	1
user_login	BIGINT	NULL	NULL	2
user_logout	BIGINT	NULL	NULL	1
Time taken: 1.193 seconds, Fetched 7 row(s)


section
=== Query 5: Aggregations with VARIANT Fields ===
Time taken: 0.09 seconds, Fetched 1 row(s)
user_id	purchase_count	total_revenue	avg_order_value	min_purchase	max_purchase
12345	2	399.98	199.990000	99.99	299.99
67890	1	149.50	149.500000	149.50	149.50
Time taken: 1.202 seconds, Fetched 2 row(s)


section
=== Query 6: Conditional Logic Based on Variant Content ===
Time taken: 0.096 seconds, Fetched 1 row(s)
event_id	event_type	user_id	event_category	monetary_value
EVT001	user_login	12345	Standard Event	0.00
EVT002	purchase	12345	Low Value	99.99
EVT003	user_login	67890	Standard Event	0.00
EVT004	page_view	12345	Standard Event	0.00
EVT005	purchase	67890	Medium Value	149.50
EVT006	error	NULL	Critical Error	0.00
EVT007	page_view	67890	Standard Event	0.00
EVT008	user_logout	12345	Standard Event	0.00
EVT009	error	NULL	Minor Error	0.00
EVT010	purchase	12345	High Value	299.99
Time taken: 0.705 seconds, Fetched 10 row(s)

Example 2: Comparison of Execution Plans: VARIANT vs STRING vs STRUCT

Spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, get_json_object, from_json, expr
from pyspark.sql.types import DoubleType, StringType, StructType, StructField


REP_DATA_FILES = "file:///opt/spark/data/files"
URL_MASTER = "spark://spark-master:7077"

spark = SparkSession.builder \
    .appName("SPARK_VARIANTModePERF") \
    .master(URL_MASTER) \
    .getOrCreate()

# Reference dataset
transactions = [
    ("TXN-001", '{"amount": 1500.00, "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
    ("TXN-002", '{"amount": 230.50,  "currency": "USD", "merchant": "UBER", "category": "transport", "surge": 1.2}'),
    ("TXN-003", '{"amount": 89750.0, "currency": "EUR", "merchant": "AIRBNB", "category": "travel", "nights": 5}'),
    ("TXN-004", '{"amount": 45.00,   "currency": "GBP", "merchant": "NETFLIX", "category": "entertainment"}'),
    ("TXN-005", '{"amount": 3200.0,  "currency": "EUR", "merchant": "APPLE", "category": "electronics", "items": 2}'),
    ("TXN-006", '{"amount": 800.00,  "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
    ("TXN-007", '{"amount": 1100.0,  "currency": "USD", "merchant": "GOOGLE", "category": "software"}'),
    ("TXN-008", '{"amount": 6500.0,  "currency": "EUR", "merchant": "BOOKING", "category": "travel", "nights": 3}'),
    ("TXN-009", '{"amount": 320.75,  "currency": "GBP", "merchant": "SPOTIFY", "category": "entertainment"}'),
    ("TXN-010", '{"amount": 2200.0,  "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
]

df = spark.createDataFrame(transactions, ["txn_id", "payload_str"])

# Approach 1: STRING + get_json_object (Spark 3 style)
df_str = df.withColumn("amount_str",   get_json_object(col("payload_str"), "$.amount")) \
           .withColumn("currency_str", get_json_object(col("payload_str"), "$.currency"))
df_str.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_str")
df_temp0 = spark.read.parquet(f"{REP_DATA_FILES}/df_str")
df_temp0.createOrReplaceTempView("df_str")

# Approach 2: Typed STRUCT (best performance for stable, known schemas)
txn_schema = StructType([
    StructField("amount",   DoubleType()),
    StructField("currency", StringType()),
    StructField("merchant", StringType()),
    StructField("category", StringType()),
])
df_struct = df.withColumn("payload_struct", from_json(col("payload_str"), txn_schema))
df_struct.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_struct")
df_temp1 = spark.read.parquet(f"{REP_DATA_FILES}/df_struct")
df_temp1.createOrReplaceTempView("df_struct")

# Approach 3: VARIANT (Spark 4.0; best for heterogeneous or evolving schemas)
df_variant = df.withColumn("payload_variant", expr("PARSE_JSON(payload_str)"))
df_variant.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_variant")
df_temp2 = spark.read.parquet(f"{REP_DATA_FILES}/df_variant")
df_temp2.createOrReplaceTempView("df_variant")

# Reference query: aggregation by currency
print("=== STRING ===")
spark.sql("""
    SELECT currency_str
          ,SUM(CAST(amount_str AS DOUBLE)) AS total
    FROM df_str
    GROUP BY currency_str
""").explain(mode="formatted")

print("=== STRUCT ===")
spark.sql("""
    SELECT payload_struct.currency
          ,SUM(payload_struct.amount) AS total
    FROM df_struct
    GROUP BY payload_struct.currency
""").explain(mode="formatted")

print("=== VARIANT ===")
spark.sql("""
    SELECT payload_variant:currency::STRING AS currency
          ,SUM(payload_variant:amount::DOUBLE) AS total
    FROM df_variant
    GROUP BY payload_variant:currency::STRING
""").explain(mode="formatted")

spark.stop()

Result

=== STRING ===
== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan parquet  (1)

(1) Scan parquet
Output [2]: [amount_str#6, currency_str#7]
Batched: true
Location: InMemoryFileIndex [file:/opt/spark/data/files/df_str]
ReadSchema: struct<amount_str:string,currency_str:string>

(2) HashAggregate
Input [2]: [amount_str#6, currency_str#7]
Keys [1]: [currency_str#7]
Functions [1]: [partial_sum(cast(amount_str#6 as double))]
Aggregate Attributes [1]: [sum#18]
Results [2]: [currency_str#7, sum#19]

(3) Exchange
Input [2]: [currency_str#7, sum#19]
Arguments: hashpartitioning(currency_str#7, 200), ENSURE_REQUIREMENTS, [plan_id=78]

(4) HashAggregate
Input [2]: [currency_str#7, sum#19]
Keys [1]: [currency_str#7]
Functions [1]: [sum(cast(amount_str#6 as double))]
Aggregate Attributes [1]: [sum(cast(amount_str#6 as double))#17]
Results [2]: [currency_str#7, sum(cast(amount_str#6 as double))#17 AS total#16]

(5) AdaptiveSparkPlan
Output [2]: [currency_str#7, total#16]
Arguments: isFinalPlan=false



=== STRUCT ===
== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan parquet  (1)

(1) Scan parquet
Output [1]: [payload_struct#11]
Batched: true
Location: InMemoryFileIndex [file:/opt/spark/data/files/df_struct]
ReadSchema: struct<payload_struct:struct<amount:double,currency:string>>

(2) Project
Output [2]: [payload_struct#11.amount AS _extract_amount#26, payload_struct#11.currency AS _groupingexpression#25]
Input [1]: [payload_struct#11]

(3) HashAggregate
Input [2]: [_extract_amount#26, _groupingexpression#25]
Keys [1]: [_groupingexpression#25]
Functions [1]: [partial_sum(_extract_amount#26)]
Aggregate Attributes [1]: [sum#30]
Results [2]: [_groupingexpression#25, sum#31]

(4) Exchange
Input [2]: [_groupingexpression#25, sum#31]
Arguments: hashpartitioning(_groupingexpression#25, 200), ENSURE_REQUIREMENTS, [plan_id=93]

(5) HashAggregate
Input [2]: [_groupingexpression#25, sum#31]
Keys [1]: [_groupingexpression#25]
Functions [1]: [sum(_extract_amount#26)]
Aggregate Attributes [1]: [sum(_extract_amount#26)#24]
Results [2]: [_groupingexpression#25 AS currency#22, sum(_extract_amount#26)#24 AS total#20]

(6) AdaptiveSparkPlan
Output [2]: [currency#22, total#20]
Arguments: isFinalPlan=false



=== VARIANT ===
== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan parquet  (1)


(1) Scan parquet
Output [1]: [payload_variant#39]
Batched: true
Location: InMemoryFileIndex [file:/opt/spark/data/files/df_variant]
ReadSchema: struct<payload_variant:struct<0:variant,1:variant>>

(2) Project
Output [2]: [payload_variant#39.0 AS payload_variant#15, cast(payload_variant#39.1 as string) AS _groupingexpression#38]
Input [1]: [payload_variant#39]

(3) HashAggregate
Input [2]: [payload_variant#15, _groupingexpression#38]
Keys [1]: [_groupingexpression#38]
Functions [1]: [partial_sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))]
Aggregate Attributes [1]: [sum#40]
Results [2]: [_groupingexpression#38, sum#41]

(4) Exchange
Input [2]: [_groupingexpression#38, sum#41]
Arguments: hashpartitioning(_groupingexpression#38, 200), ENSURE_REQUIREMENTS, [plan_id=108]

(5) HashAggregate
Input [2]: [_groupingexpression#38, sum#41]
Keys [1]: [_groupingexpression#38]
Functions [1]: [sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))]
Aggregate Attributes [1]: [sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))#37]
Results [2]: [_groupingexpression#38 AS currency#33, sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))#37 AS total#35]

(6) AdaptiveSparkPlan
Output [2]: [currency#33, total#35]
Arguments: isFinalPlan=false