Spark v4.x - Features - Variant Data Type
You’ll find in this article some information about the Variant data type introduced in Spark v4.x.
Introduction
The VARIANT data type stores semi-structured data (JSON, XML, nested objects) without a predefined schema.
It enables schema-on-read for heterogeneous data, stores values in an efficient binary format with type metadata, and supports path-based access (`variant_col:field.path`) and type casting.
Detail
Before Spark 4.0, handling semi-structured data required choosing between three problematic approaches:
- Store as `STRING` and parse at runtime: parsing overhead on every query, no type safety, poor performance, incompatible with predicate pushdown.
- Define a rigid `STRUCT` schema: the schema must be known upfront, schema evolution requires table alterations, `NULL` fields waste storage, and heterogeneous data doesn’t fit.
- Use `MAP<STRING, STRING>`: everything is a string, no nested access, manual type conversion required, poor query optimizer integration.
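The cost of the first approach can be illustrated with a plain-Python sketch (a conceptual illustration only, not Spark internals): storing JSON as a string forces a full re-parse on every access, while a decode-once representation pays that cost a single time at write.

```python
import json

# Ten raw events stored as JSON strings -- the pre-Spark-4 "STRING" approach.
rows = ['{"user": {"name": "Alice"}, "action": "purchase", "amount": 99.99}'] * 10

# STRING approach: every query re-parses the full JSON text of every row.
amounts_from_string = [json.loads(r)["amount"] for r in rows]

# VARIANT-like approach: parse once at ingestion, then read fields directly
# from the decoded structure -- no parsing at query time.
decoded = [json.loads(r) for r in rows]                 # one-time cost
amounts_from_variant = [d["amount"] for d in decoded]   # cheap repeated reads

print(amounts_from_string == amounts_from_variant)  # same answers, different cost profile
```

The answers are identical; the difference is where the parsing cost is paid.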
Spark 4.0 introduces the VARIANT type, which stores semi-structured data in a binary format that preserves type information:
CREATE TABLE events (
  event_id STRING,
  timestamp TIMESTAMP,
  payload VARIANT
);

INSERT INTO events VALUES (
  'EVT001',
  TIMESTAMP'2024-01-15 10:00:00',
  PARSE_JSON('{"user": {"id": 12345, "name": "Alice"}, "action": "purchase", "amount": 99.99}')
);

SELECT
  event_id,
  payload:user.name::STRING AS user_name,
  payload:amount::DOUBLE AS amount
FROM events
WHERE payload:action::STRING = 'purchase';
The storage format consists of three components:
- Value Section: Actual data stored in compressed binary format. Numbers, strings, booleans stored in native types. Nested structures preserved.
- Metadata Section: Schema information for each value. Type tags indicate whether value is INT, STRING, OBJECT, ARRAY, etc. Enables type-safe extraction without parsing.
- Offset Index: Pointers to nested fields for fast path-based access. Enables `payload:user.name` access in O(1) time without scanning the entire object.
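A toy model of that layout can be sketched in plain Python (a hypothetical flattening, not the actual Variant binary specification): once nested paths are indexed, a path lookup is a single dictionary access rather than a document scan.

```python
# Toy model of the offset-index idea: flatten nested objects into a
# path -> value map so path-based access needs no re-parsing.
doc = {"user": {"id": 12345, "name": "Alice"}, "action": "purchase"}

def build_offset_index(obj, prefix=""):
    """Flatten nested dicts into 'a.b.c' paths, mimicking an offset index."""
    index = {}
    for key, val in obj.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(val, dict):
            index.update(build_offset_index(val, path))
        else:
            index[path] = val
    return index

index = build_offset_index(doc)
# Path-based access resolves in one lookup, no document scan.
assert index["user.name"] == "Alice"
assert index["action"] == "purchase"
```

The real format stores byte offsets into the binary value section; the dictionary here only illustrates why the lookup is O(1).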
This design enables several operations:
- Path-Based Access: extracts nested values without parsing. The colon syntax (`:`) navigates into the structure; dot notation accesses nested objects, bracket notation accesses arrays: `payload:items[0].price`.

variant_column:field.nested.path

- Type Casting: the double-colon syntax casts a variant to a specific type and fails if the types are incompatible. Use `TRY_CAST` for safe conversion: `TRY_CAST(variant_column AS INT)`.

variant_column::INT
variant_column::STRING
- Type Introspection: returns the type name of a variant value, enabling runtime type checking.

schema_of_variant(variant_column)
The Catalyst optimizer supports VARIANT operations. Predicate pushdown works for top-level fields: for example, `WHERE payload:status::STRING = 'active'` can skip partitions where the status field doesn’t match. However, deeply nested predicates may not push down efficiently.
List of functions for manipulating VARIANT data: SQL Documentation
Comparison
| Criterion | STRING + JSON | STRUCT | VARIANT |
|---|---|---|---|
| Fixed schema required | No | Yes | No |
| Read performance | Low (re-parsed) | High | Good (pre-indexed) |
| Write performance | High | Good | Good |
| Heterogeneous schemas | Yes | No | Yes |
| Filter pushdown | Limited | Complete | Partial |
Shredding
Variant shredding is a hybrid technique. Instead of storing the JSON as a plain character string, Spark analyzes the data and creates:
- Dedicated typed columns for fields that appear frequently (e.g., `id`, `sensor`).
- A “remainder” column that contains everything too rare or too complex to be broken down.
This is an important mechanism for:
- Performance (I/O): a query that accesses the `$.sensor` information physically reads only the shredded column for that field; Spark ignores the rest of the JSON document.
- Compression: since the shredded data is typed (e.g., all IDs are integers), compression algorithms such as Snappy or Zstd are much more effective than on raw text.
The limitation of shredding: it does not turn every element of the Variant into a column. Spark applies a limit (generally on the number of fields or the nesting depth) to avoid creating thousands of columns in Parquet.
Warning: shredding only occurs if Spark detects that the data is repetitive enough to be worth extracting into columns. On a small sample, Spark may decide not to perform shredding.
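The shredding idea can be sketched in plain Python (a conceptual illustration; the real on-disk layout is the `metadata`/`value`/`typed_value` structure shown in the demonstration below): fields present in every record become dedicated typed columns, and everything else lands in a remainder column.

```python
import json

# Sample records, mirroring the sensor dataset used in the demo.
records = [
    {"sensor": "temp", "value": 22.5, "unit": "C"},
    {"sensor": "pressure", "value": 1013},
]

# Fields present in every record get "shredded" into dedicated typed columns.
common = set(records[0])
for r in records[1:]:
    common &= set(r)

shredded = {f: [r[f] for r in records] for f in sorted(common)}
# Everything else goes into a remainder column, kept as serialized text.
remainder = [json.dumps({k: v for k, v in r.items() if k not in common})
             for r in records]

assert sorted(common) == ["sensor", "value"]
assert shredded["sensor"] == ["temp", "pressure"]
assert remainder[1] == "{}"   # the second record is fully shredded
```

Spark's actual heuristics (field frequency, depth limits) are more involved, but the column-plus-remainder split is the core idea.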
Demonstration of the storage format with and without shredding (using DuckDB to inspect the storage structure):
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, VariantType, IntegerType, StringType
from pyspark.sql.functions import col, parse_json

REP_DATA_FILES = "file:///opt/spark/data/files"
URL_MASTER = "spark://spark-master:7077"
FILE_NAME_NS = "data_variant_ns"
FILE_NAME_SC = "data_variant_sc"


schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("data_raw", StringType(), nullable=False),
    StructField("data", VariantType(), nullable=True)
])

raw_data_small = [
    (1, '{"sensor": "temp", "value": 22.5, "unit": "C"}', None),
    (2, '{"sensor": "pressure", "value": 1013}', None)
]

raw_data_more = [
    (3, '{"sensor": "temp", "value": 23, "unit": "C"}', None),
    (4, '{"sensor": "pressure", "value": 900}', None),
    (5, '{"sensor": "temp", "value": 23.5, "unit": "C"}', None),
    (6, '{"sensor": "pressure", "value": 1000}', None),
    (7, '{"sensor": "temp", "value": 23.5, "unit": "C"}', None),
    (8, '{"sensor": "pressure", "value": 1000}', None),
    (9, '{"sensor": "temp", "value": 22, "unit": "C"}', None),
    (10, '{"sensor": "pressure", "value": 856}', None)
]

# Spark instance for the shredding demo
spark = SparkSession.builder \
    .appName("SPARK_VARIANTModeShredding") \
    .master(URL_MASTER) \
    .getOrCreate()


# Load a DataFrame with the small dataset
df_variant_ns = spark.createDataFrame(raw_data_small, schema)
# Parse the JSON STRING into a VARIANT (column: data)
df_variant_ns = df_variant_ns.withColumn("data", parse_json(col("data_raw")))
# Write the DataFrame to a Parquet file (no shredding)
df_variant_ns.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/{FILE_NAME_NS}.parquet")


# Load a DataFrame with the complete dataset
df_variant_sc = spark.createDataFrame((raw_data_small + raw_data_more), schema)
# Parse the JSON STRING into a VARIANT (column: data)
df_variant_sc = df_variant_sc.withColumn("data", parse_json(col("data_raw")))
# Write the DataFrame to a Parquet file (shredding)
df_variant_sc.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/{FILE_NAME_SC}.parquet")

spark.stop()
Demonstration results:
1/ Regarding the data_variant_ns.parquet file
- This file contains only two rows of data, so Spark does not apply shredding by default.
- This is confirmed with the command `duckdb -c "DESCRIBE SELECT * FROM '~/{folder_data}/files/data_variant_ns.parquet/*.parquet';"`, which shows that the data column storing the JSON as VARIANT is a STRUCT composed of `metadata` and `value`:
┌─────────────┬─────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │ column_type │ null │ key │ default │ extra │
│ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │
├─────────────┼─────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ id │ INTEGER │ YES │ NULL │ NULL │ NULL │
│ data_raw │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ data │ STRUCT(metadata BLOB, "value" BLOB) │ YES │ NULL │ NULL │ NULL │
└─────────────┴─────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘
2/ Regarding the data_variant_sc.parquet file
- This file contains 10 rows of data, and Spark applies shredding by default.
- This is confirmed with the command `duckdb -c "DESCRIBE SELECT * FROM '~/{folder_data}/files/data_variant_sc.parquet/*.parquet';"`, which shows that the data column is still a STRUCT composed of `metadata` and `value`, but this time with an additional `typed_value` field, which is the result of shredding:
┌─────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │ column_type │ null │ key │ default │ extra │
│ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │
├─────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ id │ INTEGER │ YES │ NULL │ NULL │ NULL │
│ data_raw │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ data │ STRUCT(metadata BLOB, "value" BLOB, typed_value STRUCT(sensor STRUCT("value" BLOB, typed_value VARCHAR), unit STRUCT("value" BLOB, typed_value VARCHAR), "value" STRUCT("value" BLOB, typed_value DECIMAL(18,1)))) │ YES │ NULL │ NULL │ NULL │
└─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘
Advices
from_jsonrequires you to know the schema upfront, whereasparse_jsonpreserves the flexibility of the raw data while optimizing it.- Use
VARIANTfor the landing/raw layer (and exploration) andSTRUCTfor the Gold/Serving layer.
Advantages
- Schema Flexibility: new fields appear without `ALTER TABLE`. Event payloads evolve over time without breaking queries, and different event types can be stored in the same table. Critical for event sourcing, API integration, and rapid-iteration environments.
- Better Performance than JSON Strings: binary encoding avoids repeated parsing overhead, and shredding plus metadata enables direct field access. Eliminates expensive regex or JSON parsing during reads; compressed storage reduces I/O.
- Partial Schema-on-Read: query only the fields you need; unused fields incur no extraction cost. Enables exploratory analysis on unknown schemas and a gradual migration to typed columns as the schema stabilizes.
- Interoperability with JSON Systems: direct JSON parsing and serialization, compatible with REST APIs, message queues, and NoSQL exports.
Limitations
- Performance Slower Than Native Types: aggregations on VARIANT fields are slower than on native INT/DECIMAL columns. Type extraction and validation happen at runtime, and path-based access carries overhead compared to direct column access. Not suitable for performance-critical analytics on stable schemas.
- No Schema Validation: any JSON can be inserted, with no constraints on field presence or types. Typos in field names silently return `NULL`, so data quality issues are harder to detect; application-level validation is required.
- Limited Optimizer Support: complex nested predicates may not push down to storage, statistics collection is limited, and join optimization is less effective than with native types. Query planning cannot leverage schema information for optimization.
- Storage Overhead for Simple Data: slightly heavier than a perfectly optimized `STRUCT`, because metadata must be stored for each record.
- Immature Ecosystem: not all Spark functions support VARIANT, and BI tools and connectors that read Parquet files directly do not all support the native VARIANT format yet.
When not to use Variant:
- Highly structured data
- Performance-critical operations
- Partitioned columns
Real-World Use Cases
- Use Case 1: Multi-Source Event Aggregation
- A data platform ingests events from 20+ microservices. Each service has different event schemas that evolve independently. A table with a VARIANT column stores all events without schema coordination. Queries extract service-specific fields as needed. Schema evolution happens without pipeline changes.
- Use Case 2: API Response Caching
- A system caches API responses in Spark tables for analysis. API payloads are deeply nested with optional fields. VARIANT stores responses without flattening. Analysis queries extract specific paths. New API versions add fields without breaking existing queries.
Code
Example 1: Working with Variant Types
Note: create a SQL script containing the code and execute it with Spark SQL:
docker exec -it spark-master spark-sql --master "local[*]" --conf "spark.hadoop.hive.cli.print.header=true" -f <script.sql> > script.log
SQL
-- SQL Example - Work with variant Type

-- ============================================================================
-- INIT: Create Events Table with VARIANT Column
-- ============================================================================

SELECT '=== Create events table with event dataset ===' as section;

DROP TABLE IF EXISTS events;
CREATE TABLE events (
  event_id STRING,
  timestamp TIMESTAMP,
  event_type STRING,
  payload VARIANT
);

-- Dataset
-- Each event type has different payload structure
INSERT INTO events VALUES
  ('EVT001', TIMESTAMP'2024-01-15 10:00:00', 'user_login',
   PARSE_JSON('{"user_id": 12345, "username": "alice", "ip_address": "192.168.1.100", "device": "mobile"}')),
  ('EVT002', TIMESTAMP'2024-01-15 10:05:00', 'purchase',
   PARSE_JSON('{"user_id": 12345, "order_id": "ORD123", "amount": 99.99, "currency": "USD", "items": [{"sku": "PROD001", "quantity": 2, "price": 49.99}]}')),
  ('EVT003', TIMESTAMP'2024-01-15 10:10:00', 'user_login',
   PARSE_JSON('{"user_id": 67890, "username": "bob", "ip_address": "10.0.0.50", "device": "desktop", "session_id": "sess_xyz"}')),
  ('EVT004', TIMESTAMP'2024-01-15 10:15:00', 'page_view',
   PARSE_JSON('{"user_id": 12345, "page_url": "/products/electronics", "referrer": "https://google.com", "duration_seconds": 45}')),
  ('EVT005', TIMESTAMP'2024-01-15 10:20:00', 'purchase',
   PARSE_JSON('{"user_id": 67890, "order_id": "ORD124", "amount": 149.50, "currency": "USD", "items": [{"sku": "PROD002", "quantity": 1, "price": 149.50}], "discount_code": "SAVE10"}')),
  ('EVT006', TIMESTAMP'2024-01-15 10:25:00', 'error',
   PARSE_JSON('{"error_code": "E500", "error_message": "Database connection timeout", "service": "payment-api", "stack_trace": "com.example.PaymentService.processPayment"}')),
  ('EVT007', TIMESTAMP'2024-01-15 10:30:00', 'page_view',
   PARSE_JSON('{"user_id": 67890, "page_url": "/checkout", "referrer": "/cart", "duration_seconds": 120, "cart_items": 1}')),
  ('EVT008', TIMESTAMP'2024-01-15 10:35:00', 'user_logout',
   PARSE_JSON('{"user_id": 12345, "username": "alice", "session_duration_minutes": 35}')),
  ('EVT009', TIMESTAMP'2024-01-15 10:40:00', 'error',
   PARSE_JSON('{"error_code": "E404", "error_message": "Product not found", "service": "catalog-api", "requested_sku": "PROD999"}')),
  ('EVT010', TIMESTAMP'2024-01-15 10:45:00', 'purchase',
   PARSE_JSON('{"user_id": 12345, "order_id": "ORD125", "amount": 299.99, "currency": "USD", "items": [{"sku": "PROD003", "quantity": 3, "price": 99.99}], "shipping_address": {"country": "US", "zip": "94105"}}'));

-- ============================================================================
-- QUERY 1: Path-Based Field Extraction
-- ============================================================================

SELECT '=== Query 1: Path-Based Field Extraction ===' as section;

SELECT
  event_id,
  event_type,
  payload:user_id AS user_id,
  payload:username AS username,
  payload:order_id AS order_id
FROM events
WHERE payload:user_id IS NOT NULL
ORDER BY timestamp;

-- ============================================================================
-- QUERY 2: Type Casting with Double-Colon Syntax
-- ============================================================================

SELECT '=== Query 2: Type Casting with Double-Colon Syntax ===' as section;

SELECT
  event_id,
  timestamp,
  payload:user_id::STRING AS user_id,
  payload:order_id::STRING AS order_id,
  payload:amount::DECIMAL(10,2) AS amount,
  payload:currency::STRING AS currency,
  payload:discount_code::STRING AS discount_code
FROM events
WHERE event_type = 'purchase'
ORDER BY payload:amount::DECIMAL DESC;

-- ============================================================================
-- QUERY 3: Nested Field Access
-- ============================================================================

SELECT '=== Query 3: Nested Field Access ===' as section;

-- Access nested shipping address
SELECT
  event_id,
  payload:order_id AS order_id,
  payload:shipping_address.country::STRING AS country,
  payload:shipping_address.zip::STRING AS zip_code,
  payload:items[0].sku::STRING AS first_item_sku,
  payload:items[0].quantity::INT AS first_item_qty
FROM events
WHERE payload:shipping_address IS NOT NULL;

-- ============================================================================
-- QUERY 4: Type Introspection
-- ============================================================================

SELECT '=== Query 4: Type Introspection ===' as section;

SELECT
  event_type,
  schema_of_variant(payload:user_id) AS user_id_type,
  schema_of_variant(payload:amount) AS amount_type,
  schema_of_variant(payload:items) AS items_type,
  COUNT(*) AS event_count
FROM events
GROUP BY event_type, schema_of_variant(payload:user_id), schema_of_variant(payload:amount), schema_of_variant(payload:items)
ORDER BY event_type;

-- ============================================================================
-- QUERY 5: Aggregations with VARIANT Fields
-- ============================================================================

SELECT '=== Query 5: Aggregations with VARIANT Fields ===' as section;

SELECT
  payload:user_id::STRING AS user_id,
  COUNT(*) AS purchase_count,
  SUM(payload:amount::DECIMAL(10,2)) AS total_revenue,
  AVG(payload:amount::DECIMAL(10,2)) AS avg_order_value,
  MIN(payload:amount::DECIMAL(10,2)) AS min_purchase,
  MAX(payload:amount::DECIMAL(10,2)) AS max_purchase
FROM events
WHERE event_type = 'purchase'
  AND payload:amount IS NOT NULL
GROUP BY payload:user_id::STRING
ORDER BY total_revenue DESC;

-- ============================================================================
-- QUERY 6: Conditional Logic Based on Variant Content
-- ============================================================================

SELECT '=== Query 6: Conditional Logic Based on Variant Content ===' as section;

SELECT
  event_id,
  event_type,
  payload:user_id::STRING AS user_id,
  CASE
    WHEN event_type = 'purchase' AND payload:amount::DECIMAL(10,2) > 200 THEN 'High Value'
    WHEN event_type = 'purchase' AND payload:amount::DECIMAL(10,2) > 100 THEN 'Medium Value'
    WHEN event_type = 'purchase' THEN 'Low Value'
    WHEN event_type = 'error' AND payload:error_code::STRING LIKE 'E5%' THEN 'Critical Error'
    WHEN event_type = 'error' THEN 'Minor Error'
    ELSE 'Standard Event'
  END AS event_category,
  CASE
    WHEN event_type = 'purchase' THEN payload:amount::DECIMAL(10,2)
    ELSE 0
  END AS monetary_value
FROM events
ORDER BY timestamp;
Result
section
=== Create events table with event dataset ===
Time taken: 2.28 seconds, Fetched 1 row(s)
Response code
Time taken: 1.41 seconds
Response code
Time taken: 0.602 seconds
Response code
Time taken: 1.314 seconds


section
=== Query 1: Path-Based Field Extraction ===
Time taken: 0.059 seconds, Fetched 1 row(s)
event_id event_type user_id username order_id
EVT001 user_login 12345 "alice" NULL
EVT002 purchase 12345 NULL "ORD123"
EVT003 user_login 67890 "bob" NULL
EVT004 page_view 12345 NULL NULL
EVT005 purchase 67890 NULL "ORD124"
EVT007 page_view 67890 NULL NULL
EVT008 user_logout 12345 "alice" NULL
EVT010 purchase 12345 NULL "ORD125"
Time taken: 1.815 seconds, Fetched 8 row(s)


section
=== Query 2: Type Casting with Double-Colon Syntax ===
Time taken: 0.112 seconds, Fetched 1 row(s)
event_id timestamp user_id order_id amount currency discount_code
EVT010 2024-01-15 10:45:00 12345 ORD125 299.99 USD NULL
EVT005 2024-01-15 10:20:00 67890 ORD124 149.50 USD SAVE10
EVT002 2024-01-15 10:05:00 12345 ORD123 99.99 USD NULL
Time taken: 0.972 seconds, Fetched 3 row(s)


section
=== Query 3: Nested Field Access ===
Time taken: 0.166 seconds, Fetched 1 row(s)
event_id order_id country zip_code first_item_sku first_item_qty
EVT010 "ORD125" US 94105 PROD003 3
Time taken: 0.427 seconds, Fetched 1 row(s)


section
=== Query 4: Type Introspection ===
Time taken: 0.099 seconds, Fetched 1 row(s)
event_type user_id_type amount_type items_type event_count
error NULL NULL NULL 2
page_view BIGINT NULL NULL 2
purchase BIGINT DECIMAL(5,2) ARRAY<OBJECT<price: DECIMAL(4,2), quantity: BIGINT, sku: STRING>> 1
purchase BIGINT DECIMAL(4,2) ARRAY<OBJECT<price: DECIMAL(4,2), quantity: BIGINT, sku: STRING>> 1
purchase BIGINT DECIMAL(4,1) ARRAY<OBJECT<price: DECIMAL(4,1), quantity: BIGINT, sku: STRING>> 1
user_login BIGINT NULL NULL 2
user_logout BIGINT NULL NULL 1
Time taken: 1.193 seconds, Fetched 7 row(s)


section
=== Query 5: Aggregations with VARIANT Fields ===
Time taken: 0.09 seconds, Fetched 1 row(s)
user_id purchase_count total_revenue avg_order_value min_purchase max_purchase
12345 2 399.98 199.990000 99.99 299.99
67890 1 149.50 149.500000 149.50 149.50
Time taken: 1.202 seconds, Fetched 2 row(s)


section
=== Query 6: Conditional Logic Based on Variant Content ===
Time taken: 0.096 seconds, Fetched 1 row(s)
event_id event_type user_id event_category monetary_value
EVT001 user_login 12345 Standard Event 0.00
EVT002 purchase 12345 Low Value 99.99
EVT003 user_login 67890 Standard Event 0.00
EVT004 page_view 12345 Standard Event 0.00
EVT005 purchase 67890 Medium Value 149.50
EVT006 error NULL Critical Error 0.00
EVT007 page_view 67890 Standard Event 0.00
EVT008 user_logout 12345 Standard Event 0.00
EVT009 error NULL Minor Error 0.00
EVT010 purchase 12345 High Value 299.99
Time taken: 0.705 seconds, Fetched 10 row(s)
Example 2 - Comparison of Execution Plans: VARIANT vs STRING vs STRUCT
Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, get_json_object, from_json, expr
from pyspark.sql.types import DoubleType, StringType, StructType, StructField


REP_DATA_FILES = "file:///opt/spark/data/files"
URL_MASTER = "spark://spark-master:7077"

spark = SparkSession.builder \
    .appName("SPARK_VARIANTModePERF") \
    .master(URL_MASTER) \
    .getOrCreate()

# Reference dataset
transactions = [
    ("TXN-001", '{"amount": 1500.00, "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
    ("TXN-002", '{"amount": 230.50, "currency": "USD", "merchant": "UBER", "category": "transport", "surge": 1.2}'),
    ("TXN-003", '{"amount": 89750.0, "currency": "EUR", "merchant": "AIRBNB", "category": "travel", "nights": 5}'),
    ("TXN-004", '{"amount": 45.00, "currency": "GBP", "merchant": "NETFLIX", "category": "entertainment"}'),
    ("TXN-005", '{"amount": 3200.0, "currency": "EUR", "merchant": "APPLE", "category": "electronics", "items": 2}'),
    ("TXN-006", '{"amount": 800.00, "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
    ("TXN-007", '{"amount": 1100.0, "currency": "USD", "merchant": "GOOGLE", "category": "software"}'),
    ("TXN-008", '{"amount": 6500.0, "currency": "EUR", "merchant": "BOOKING", "category": "travel", "nights": 3}'),
    ("TXN-009", '{"amount": 320.75, "currency": "GBP", "merchant": "SPOTIFY", "category": "entertainment"}'),
    ("TXN-010", '{"amount": 2200.0, "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
]

df = spark.createDataFrame(transactions, ["txn_id", "payload_str"])

# Approach 1: STRING + get_json_object (Spark 3)
df_str = df.withColumn("amount_str", get_json_object(col("payload_str"), "$.amount")) \
    .withColumn("currency_str", get_json_object(col("payload_str"), "$.currency"))
df_str.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_str")
df_temp0 = spark.read.parquet(f"{REP_DATA_FILES}/df_str")
df_temp0.createOrReplaceTempView("df_str")

# Approach 2: Typed STRUCT (best performance for stable and known schemas)
txn_schema = StructType([
    StructField("amount", DoubleType()),
    StructField("currency", StringType()),
    StructField("merchant", StringType()),
    StructField("category", StringType()),
])
df_struct = df.withColumn("payload_struct", from_json(col("payload_str"), txn_schema))
df_struct.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_struct")
df_temp1 = spark.read.parquet(f"{REP_DATA_FILES}/df_struct")
df_temp1.createOrReplaceTempView("df_struct")

# Approach 3: VARIANT (Spark 4.0, best for heterogeneous or evolving schemas)
df_variant = df.withColumn("payload_variant", expr("PARSE_JSON(payload_str)"))
df_variant.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_variant")
df_temp2 = spark.read.parquet(f"{REP_DATA_FILES}/df_variant")
df_temp2.createOrReplaceTempView("df_variant")

# Reference query: aggregation by currency
print("=== STRING ===")
spark.sql("""
    SELECT currency_str
          ,SUM(CAST(amount_str AS DOUBLE)) AS total
    FROM df_str
    GROUP BY currency_str
""").explain(mode="formatted")

print("=== STRUCT ===")
spark.sql("""
    SELECT payload_struct.currency
          ,SUM(payload_struct.amount) AS total
    FROM df_struct
    GROUP BY payload_struct.currency
""").explain(mode="formatted")

print("=== VARIANT ===")
spark.sql("""
    SELECT payload_variant:currency::STRING AS currency
          ,SUM(payload_variant:amount::DOUBLE) AS total
    FROM df_variant
    GROUP BY payload_variant:currency::STRING
""").explain(mode="formatted")

spark.stop()
Result
=== STRING ===
== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan parquet (1)

(1) Scan parquet
Output [2]: [amount_str#6, currency_str#7]
Batched: true
Location: InMemoryFileIndex [file:/opt/spark/data/files/df_str]
ReadSchema: struct<amount_str:string,currency_str:string>

(2) HashAggregate
Input [2]: [amount_str#6, currency_str#7]
Keys [1]: [currency_str#7]
Functions [1]: [partial_sum(cast(amount_str#6 as double))]
Aggregate Attributes [1]: [sum#18]
Results [2]: [currency_str#7, sum#19]

(3) Exchange
Input [2]: [currency_str#7, sum#19]
Arguments: hashpartitioning(currency_str#7, 200), ENSURE_REQUIREMENTS, [plan_id=78]

(4) HashAggregate
Input [2]: [currency_str#7, sum#19]
Keys [1]: [currency_str#7]
Functions [1]: [sum(cast(amount_str#6 as double))]
Aggregate Attributes [1]: [sum(cast(amount_str#6 as double))#17]
Results [2]: [currency_str#7, sum(cast(amount_str#6 as double))#17 AS total#16]

(5) AdaptiveSparkPlan
Output [2]: [currency_str#7, total#16]
Arguments: isFinalPlan=false



=== STRUCT ===
== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan parquet (1)

(1) Scan parquet
Output [1]: [payload_struct#11]
Batched: true
Location: InMemoryFileIndex [file:/opt/spark/data/files/df_struct]
ReadSchema: struct<payload_struct:struct<amount:double,currency:string>>

(2) Project
Output [2]: [payload_struct#11.amount AS _extract_amount#26, payload_struct#11.currency AS _groupingexpression#25]
Input [1]: [payload_struct#11]

(3) HashAggregate
Input [2]: [_extract_amount#26, _groupingexpression#25]
Keys [1]: [_groupingexpression#25]
Functions [1]: [partial_sum(_extract_amount#26)]
Aggregate Attributes [1]: [sum#30]
Results [2]: [_groupingexpression#25, sum#31]

(4) Exchange
Input [2]: [_groupingexpression#25, sum#31]
Arguments: hashpartitioning(_groupingexpression#25, 200), ENSURE_REQUIREMENTS, [plan_id=93]

(5) HashAggregate
Input [2]: [_groupingexpression#25, sum#31]
Keys [1]: [_groupingexpression#25]
Functions [1]: [sum(_extract_amount#26)]
Aggregate Attributes [1]: [sum(_extract_amount#26)#24]
Results [2]: [_groupingexpression#25 AS currency#22, sum(_extract_amount#26)#24 AS total#20]

(6) AdaptiveSparkPlan
Output [2]: [currency#22, total#20]
Arguments: isFinalPlan=false



=== VARIANT ===
== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan parquet (1)


(1) Scan parquet
Output [1]: [payload_variant#39]
Batched: true
Location: InMemoryFileIndex [file:/opt/spark/data/files/df_variant]
ReadSchema: struct<payload_variant:struct<0:variant,1:variant>>

(2) Project
Output [2]: [payload_variant#39.0 AS payload_variant#15, cast(payload_variant#39.1 as string) AS _groupingexpression#38]
Input [1]: [payload_variant#39]

(3) HashAggregate
Input [2]: [payload_variant#15, _groupingexpression#38]
Keys [1]: [_groupingexpression#38]
Functions [1]: [partial_sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))]
Aggregate Attributes [1]: [sum#40]
Results [2]: [_groupingexpression#38, sum#41]

(4) Exchange
Input [2]: [_groupingexpression#38, sum#41]
Arguments: hashpartitioning(_groupingexpression#38, 200), ENSURE_REQUIREMENTS, [plan_id=108]

(5) HashAggregate
Input [2]: [_groupingexpression#38, sum#41]
Keys [1]: [_groupingexpression#38]
Functions [1]: [sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))]
Aggregate Attributes [1]: [sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))#37]
Results [2]: [_groupingexpression#38 AS currency#33, sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))#37 AS total#35]

(6) AdaptiveSparkPlan
Output [2]: [currency#33, total#35]
Arguments: isFinalPlan=false
