Spark : v4.x - Fonctionnalités - Variant Data Type

Vous trouverez dans cet article, des informations sur le type de données Variant de Spark v4.x.

Introduction

Le type de données VARIANT permet de stocker des données semi-structurées (JSON, XML, objets imbriqués) sans schéma prédéfini. Il permet le schema-on-read pour les données hétérogènes. Il stocke les données dans un format binaire efficace avec des métadonnées pour l’information des types. Il prend en charge l’accès basé sur les chemins (variant_col:field.path) et la conversion de types.

Detail

Avant Spark 4.0, la gestion des données semi-structurées nécessitait de choisir entre trois approches problématiques :

  1. Stocker comme STRING et parser à l’exécution : Exécution du parsing à chaque requête. Mauvaises performances. Incompatible avec le pushdown de prédicats.
  2. Définir un schéma STRUCT rigide : Le schéma doit être connu à l’avance. L’évolution du schéma nécessite des modifications sur la table. Les champs null gaspillent l’espace de stockage. Les données hétérogènes ne s’adaptent pas.
  3. Utiliser MAP<STRING, STRING> : Tout est en chaînes de caractères. Pas d’accès imbriqué. Conversion de type manuelle requise.

Spark 4.0 introduit le type VARIANT. Il permet de stocker des données semi-structurées dans un format binaire en colonnes qui préserve les types de données :

 1CREATE TABLE events (
 2    event_id STRING,
 3    timestamp TIMESTAMP,
 4    payload VARIANT
 5);
 6
 7INSERT INTO events VALUES (
 8    'EVT001',
 9    TIMESTAMP'2024-01-15 10:00:00',
10    PARSE_JSON('{"user": {"id": 12345, "name": "Alice"}, "action": "purchase", "amount": 99.99}')
11);
12
13SELECT 
14    event_id,
15    payload:user.name::STRING AS user_name,
16    payload:amount::DOUBLE AS amount
17FROM events
18WHERE payload:action::STRING = 'purchase';

Le format de stockage se compose de trois composants :

  1. Section Valeur : Les données réelles stockées dans un format binaire compressé. Les nombres, chaînes, booléens stockés dans des types natifs. Les structures imbriquées préservées.
  2. Section Métadonnées : Informations de schéma pour chaque valeur. Les balises de type indiquent si la valeur est INT, STRING, OBJECT, ARRAY, etc. Permet l’extraction type-safe sans parsing.
  3. Index d’Offset : Pointeurs vers les champs imbriqués pour un accès rapide basé sur les chemins. Permet l’accès par payload:user.name en temps O(1) sans scanner tout l’objet.

Cette conception permet plusieurs opérations :

  1. Accès basé sur les chemins : Extrait les valeurs imbriquées sans parsing. La syntaxe deux-points (:) permet de naviguer dans la structure. Notation par points pour les objets imbriqués. Notation par crochets pour les tableaux :
1variant_column:field.nested.path
  1. Conversion de Type : La syntaxe double deux-points convertit le variant vers un type spécifique. Échoue si les types sont incompatibles. Utilisez TRY_CAST pour une conversion sûre : TRY_CAST(variant_column AS INT).
1variant_column::INT
2variant_column::STRING
  1. Introspection de Type : Retourne le nom du type. Permet la vérification de type à l’exécution.
1schema_of_variant(variant_column)

L’optimiseur Catalyst prend en charge les opérations VARIANT. Le pushdown de prédicats fonctionne pour les champs de premier niveau. Par exemple, WHERE payload:status::STRING = 'active' peut ignorer les partitions où le champ status ne correspond pas. Cependant, les prédicats imbriqués peuvent ne pas être utilisable efficacement pour un pushdown.

Liste des fonctions pour manipuler les données de type VARIANT : Documentation SQL

Comparaison

CritèreSTRING + JSONSTRUCTVARIANT
Schéma fixe requisNonOuiNon
Performance lectureFaible (re-parsé)ÉlevéeBonne (pré-indexé)
Performance écritureÉlevéeBonneBonne
Schémas hétérogènesOuiNonOui
Pushdown de filtresLimitéCompletPartiel

Shredding

Le shredding est une technique hybride. Au lieu de stocker le JSON comme une simple chaîne de caractères, Spark analyse les données et crée :

  • Des colonnes de métadonnées pour les champs qui reviennent souvent (ex: id, sensor).
  • Une colonne “reste” qui contient tout ce qui est trop rare ou trop complexe pour être décomposé.

C’est un mécanisme important pour :

  • La performance (I/O) : Si vous faites une requête pour accéder à l’information $.sensor, Spark ne lit physiquement que la colonne shredded correspondant à ce champ. Il ignore tout le reste du document JSON.
  • La compression : Comme les données décomposées sont typées (ex: tous les IDs sont des entiers), les algorithmes de compression comme Snappy ou Zstd sont beaucoup plus efficaces que sur du texte brut.

La limite du shredding : Cela ne transforme pas l’ensemble des éléments du Variant en colonnes. Spark utilise une limite (généralement sur le nombre de champs ou la profondeur) pour éviter de créer des milliers de colonnes dans Parquet.

Attention : Le shredding ne se produit que si Spark détecte que les données sont suffisamment répétitives pour valoir le coup d’être extraites en colonnes. Sur un petit échantillon, il est possible que Spark décide de ne pas faire de shredding.

Démonstration du format stockage avec et sans shredding (avec l’utilisation de duckdb pour voir la structure du stockage) :

 1from pyspark.sql import SparkSession
 2from pyspark.sql.types import StructType, StructField, VariantType, IntegerType, StringType
 3from pyspark.sql.functions import col, parse_json
 4
 5REP_DATA_FILES = "file:///opt/spark/data/files"
 6URL_MASTER = "spark://spark-master:7077"
 7FILE_NAME_NS = "data_variant_ns"
 8FILE_NAME_SC = "data_variant_sc"
 9
10
11schema = StructType([
12    StructField("id", IntegerType(), nullable=False),
13    StructField("data_raw", StringType(), nullable=False),
14    StructField("data", VariantType(), nullable=True)
15])
16
17raw_data_small = [
18    (1,'{"sensor": "temp", "value": 22.5, "unit": "C"}',None),
19    (2,'{"sensor": "pressure", "value": 1013}',None)
20]
21
22raw_data_more = [
23    (3,'{"sensor": "temp", "value": 23, "unit": "C"}',None),
24    (4,'{"sensor": "pressure", "value": 900}',None),
25    (5,'{"sensor": "temp", "value": 23.5, "unit": "C"}',None),
26    (6,'{"sensor": "pressure", "value": 1000}',None),
27    (7,'{"sensor": "temp", "value": 23.5, "unit": "C"}',None),
28    (8,'{"sensor": "pressure", "value": 1000}',None),
29    (9,'{"sensor": "temp", "value": 22, "unit": "C"}',None),
30    (10,'{"sensor": "pressure", "value": 856}',None)
31]
32
33# Spark Instance for Shredding Demo
34spark = SparkSession.builder \
35    .appName("SPARK_VARIANTModeShredding") \
36    .master(URL_MASTER) \
37    .getOrCreate()
38
39
40# Load Dataframe with small dataset
41df_variant_ns = spark.createDataFrame(raw_data_small, schema)
42# Cast STRING to VARIANT (col : data)
43df_variant_ns = df_variant_ns.withColumn("data", parse_json(col("data_raw")))
44# Write dataframe into parquet file (no shredding)
45df_variant_ns.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/{FILE_NAME_NS}.parquet")
46
47
48# Load Dataframe with complete dataset
49df_variant_sc = spark.createDataFrame((raw_data_small + raw_data_more), schema)
50# Cast STRING to VARIANT (col : data)
51df_variant_sc = df_variant_sc.withColumn("data", parse_json(col("data_raw")))
52# Write dataframe into parquet file (shredding)
53df_variant_sc.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/{FILE_NAME_SC}.parquet")
54
55spark.stop()

Résultat de la démonstration :

1/ Concernant le fichier data_variant_ns.parquet

  • Ce fichier ne contient que deux lignes de données et par conséquent, Spark n’applique pas de shredding par défaut.
  • Cela est confirmé en utilisant la commande duckdb -c "DESCRIBE SELECT * FROM '~/{folder_data}/files/data_variant_ns.parquet/*.parquet';" qui permet d’afficher la description du type de la colonne data qui stocke le JSON en VARIANT (qui est un STRUCT composé de metadata et value)
┌─────────────┬─────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │             column_type             │  null   │   key   │ default │  extra  │
│   varchar   │               varchar               │ varchar │ varchar │ varchar │ varchar │
├─────────────┼─────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ id          │ INTEGER                             │ YES     │ NULL    │ NULL    │ NULL    │
│ data_raw    │ VARCHAR                             │ YES     │ NULL    │ NULL    │ NULL    │
│ data        │ STRUCT(metadata BLOB, "value" BLOB) │ YES     │ NULL    │ NULL    │ NULL    │
└─────────────┴─────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘

2/ Concernant le fichier data_variant_sc.parquet

  • Ce fichier contient 10 lignes de données et Spark applique un Shredding par défaut.
  • Cela se confirme en utilisant la commande duckdb -c "DESCRIBE SELECT * FROM '~/{folder_data}/files/data_variant_sc.parquet/*.parquet';" qui permet d’afficher la description du type de la colonne data qui stocke le JSON en VARIANT (qui est un STRUCT composé de metadata et value mais avec cette fois en plus l’information typed_value qui est le résultat du shredding)
┌─────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │                                                                                                    column_type                                                                                                     │  null   │   key   │ default │  extra  │
│   varchar   │                                                                                                      varchar                                                                                                       │ varchar │ varchar │ varchar │ varchar │
├─────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ id          │ INTEGER                                                                                                                                                                                                            │ YES     │ NULL    │ NULL    │ NULL    │
│ data_raw    │ VARCHAR                                                                                                                                                                                                            │ YES     │ NULL    │ NULL    │ NULL    │
│ data        │ STRUCT(metadata BLOB, "value" BLOB, typed_value STRUCT(sensor STRUCT("value" BLOB, typed_value VARCHAR), unit STRUCT("value" BLOB, typed_value VARCHAR), "value" STRUCT("value" BLOB, typed_value DECIMAL(18,1)))) │ YES     │ NULL    │ NULL    │ NULL    │
└─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘

Conseils

  • from_json nécessite de connaître le schéma à l’avance, alors que parse_json préserve la flexibilité des données brutes tout en les optimisant.
  • Use VARIANT for the landing/raw layer (and exploration) and STRUCT for the Gold/Serving layer.
  • Utilisez VARIANT pour la couche Landing/Bronze (et l’exploration) et STRUCT pour la couche Optimisé/Gold

Advantages

  1. Flexibilité du Schéma : De nouveaux champs apparaissent sans ALTER TABLE. Les payloads d’événements évoluent au fil du temps sans casser les requêtes. Différents types d’événements stockés dans la même table. Critique pour l’event sourcing, l’intégration d’API et les environnements d’itération rapide.
  2. Meilleures performances que les données JSON : L’encodage binaire évite la surcharge de parsing à l’exécution. Le shredding et les métadonnées permettent l’accès direct aux informations. Cela élimine les opérations coûteuses de regex ou parsing JSON lors de la lecture. Le stockage compressé réduit les I/O.
  3. Schema-on-read partiel : Interrogez uniquement les champs dont vous avez besoin. Les champs inutilisés n’entraînent aucun coût d’extraction. Permet l’analyse exploratoire sur des schémas inconnus. Migrez progressivement vers des colonnes typées à mesure que le schéma se stabilise.
  4. Interopérabilité avec les systèmes JSON : Parsing et sérialisation JSON directs. Compatible avec les API REST, files de messages, exports NoSQL.

Limitations

  1. Performances plus lentes que les types natifs : Les agrégations sur les champs VARIANT sont plus lentes que les colonnes natives INT/DECIMAL. L’extraction et la validation de type se produisent à l’exécution. L’accès basé sur les chemins a une surcharge par rapport à l’accès direct aux colonnes. Non adapté pour l’analytique critique en performances sur des schémas stables.
  2. Pas de validation de schéma : N’importe quel JSON peut être inséré. Aucune contrainte sur la présence ou les types de champs. Les fautes de frappe dans les noms de champs retournent silencieusement NULL. Les problèmes de qualité des données sont plus difficiles à détecter. Nécessite une validation au niveau applicatif.
  3. Support limité de l’optimiseur : Les prédicats imbriqués complexes peuvent ne pas être poussés vers le stockage. La collecte de statistiques est limitée. L’optimisation des jointures est moins efficace que les types natifs. La planification de requête ne peut pas exploiter les informations de schéma pour l’optimisation.
  4. Surcharge de stockage pour les données simples : Il est légèrement plus lourd qu’un STRUCT optimisé car il doit stocker des métadonnées supplémentaires pour chaque enregistrement
  5. Écosystème Immature : Toutes les fonctions Spark ne prennent pas en charge le type VARIANT. Les outils BI et les connecteurs qui lisent directement les fichiers Parquet ne supportent pas encore tous le type VARIANT nativement.

Quand ne pas utiliser le type Variant :

  • Données hautement structurées
  • Opérations critiques en terme de performances
  • Colonnes partitionnées

Real-World Use Cases

  • Cas d’usage 1 : Agrégation d’événements multi-sources
    • Une plateforme de données ingère des événements provenant de plus de 20 microservices. Chaque service a des schémas d’événements différents qui évoluent indépendamment. Une table avec une colonne VARIANT stocke tous les événements sans coordination de schéma. Les requêtes extraient les champs spécifiques au service selon les besoins. L’évolution du schéma se produit sans modification du pipeline.
  • Cas d’usage 2 : Mise en cache de réponses d’API
    • Un système met en cache les réponses API dans des tables Spark pour analyse. Les payloads API sont profondément imbriqués avec des champs optionnels. la colonne de type VARIANT stocke les réponses sans aplatissement. Les requêtes d’analyse extraient des chemins spécifiques. Les nouvelles versions d’API ajoutent des champs sans casser les requêtes existantes.

Codes

Exemple 1 : Manipulation du type Variant

Note : Création d’un script SQL avec le contenu du code et utilisation de Spark SQL pour l’exécuter : docker exec -it spark-master spark-sql --master "local[*]" --conf "spark.hadoop.hive.cli.print.header=true" -f <script.sql> > script.log

SQL

  1-- SQL Example - Work with variant Type
  2
  3-- ============================================================================
  4-- INIT: Create Events Table with VARIANT Column
  5-- ============================================================================
  6
  7SELECT '=== Create events table with event dataset ===' as section;
  8
  9DROP TABLE IF EXISTS events;
 10CREATE TABLE events (
 11    event_id STRING,
 12    timestamp TIMESTAMP,
 13    event_type STRING,
 14    payload VARIANT
 15);
 16
 17-- Dataset
 18-- Each event type has different payload structure
 19INSERT INTO events VALUES
 20    ('EVT001', TIMESTAMP'2024-01-15 10:00:00', 'user_login',
 21     PARSE_JSON('{"user_id": 12345, "username": "alice", "ip_address": "192.168.1.100", "device": "mobile"}')),
 22    ('EVT002', TIMESTAMP'2024-01-15 10:05:00', 'purchase',
 23     PARSE_JSON('{"user_id": 12345, "order_id": "ORD123", "amount": 99.99, "currency": "USD", "items": [{"sku": "PROD001", "quantity": 2, "price": 49.99}]}')),
 24    ('EVT003', TIMESTAMP'2024-01-15 10:10:00', 'user_login',
 25     PARSE_JSON('{"user_id": 67890, "username": "bob", "ip_address": "10.0.0.50", "device": "desktop", "session_id": "sess_xyz"}')),
 26    ('EVT004', TIMESTAMP'2024-01-15 10:15:00', 'page_view',
 27     PARSE_JSON('{"user_id": 12345, "page_url": "/products/electronics", "referrer": "https://google.com", "duration_seconds": 45}')),
 28    ('EVT005', TIMESTAMP'2024-01-15 10:20:00', 'purchase',
 29     PARSE_JSON('{"user_id": 67890, "order_id": "ORD124", "amount": 149.50, "currency": "USD", "items": [{"sku": "PROD002", "quantity": 1, "price": 149.50}], "discount_code": "SAVE10"}')),
 30    ('EVT006', TIMESTAMP'2024-01-15 10:25:00', 'error',
 31     PARSE_JSON('{"error_code": "E500", "error_message": "Database connection timeout", "service": "payment-api", "stack_trace": "com.example.PaymentService.processPayment"}')),
 32    ('EVT007', TIMESTAMP'2024-01-15 10:30:00', 'page_view',
 33     PARSE_JSON('{"user_id": 67890, "page_url": "/checkout", "referrer": "/cart", "duration_seconds": 120, "cart_items": 1}')),
 34    ('EVT008', TIMESTAMP'2024-01-15 10:35:00', 'user_logout',
 35     PARSE_JSON('{"user_id": 12345, "username": "alice", "session_duration_minutes": 35}')),
 36    ('EVT009', TIMESTAMP'2024-01-15 10:40:00', 'error',
 37     PARSE_JSON('{"error_code": "E404", "error_message": "Product not found", "service": "catalog-api", "requested_sku": "PROD999"}')),
 38    ('EVT010', TIMESTAMP'2024-01-15 10:45:00', 'purchase',
 39     PARSE_JSON('{"user_id": 12345, "order_id": "ORD125", "amount": 299.99, "currency": "USD", "items": [{"sku": "PROD003", "quantity": 3, "price": 99.99}], "shipping_address": {"country": "US", "zip": "94105"}}'));
 40
 41-- ============================================================================
 42-- QUERY 1: Path-Based Field Extraction
 43-- ============================================================================
 44
 45SELECT '=== Query 1: Path-Based Field Extraction ===' as section;
 46
 47SELECT 
 48    event_id,
 49    event_type,
 50    payload:user_id AS user_id,
 51    payload:username AS username,
 52    payload:order_id AS order_id
 53FROM events
 54WHERE payload:user_id IS NOT NULL
 55ORDER BY timestamp;
 56
 57-- ============================================================================
 58-- QUERY 2: Type Casting with Double-Colon Syntax
 59-- ============================================================================
 60
 61SELECT '=== Query 2: Type Casting with Double-Colon Syntax ===' as section;
 62
 63SELECT 
 64    event_id,
 65    timestamp,
 66    payload:user_id::STRING AS user_id,
 67    payload:order_id::STRING AS order_id,
 68    payload:amount::DECIMAL(10,2) AS amount,
 69    payload:currency::STRING AS currency,
 70    payload:discount_code::STRING AS discount_code
 71FROM events
 72WHERE event_type = 'purchase'
 73ORDER BY payload:amount::DECIMAL DESC;
 74
 75-- ============================================================================
 76-- QUERY 3: Nested Field Access
 77-- ============================================================================
 78
 79SELECT '=== Query 3: Nested Field Accesss ===' as section;
 80
 81-- Access nested shipping address
 82SELECT 
 83    event_id,
 84    payload:order_id AS order_id,
 85    payload:shipping_address.country::STRING AS country,
 86    payload:shipping_address.zip::STRING AS zip_code,
 87    payload:items[0].sku::STRING AS first_item_sku,
 88    payload:items[0].quantity::INT AS first_item_qty
 89FROM events
 90WHERE payload:shipping_address IS NOT NULL;
 91
 92-- ============================================================================
 93-- QUERY 4: Type Introspection
 94-- ============================================================================
 95
 96SELECT '=== Query 4: Type Introspection ===' as section;
 97
 98SELECT 
 99    event_type,
100    schema_of_variant(payload:user_id) AS user_id_type,
101    schema_of_variant(payload:amount) AS amount_type,
102    schema_of_variant(payload:items) AS items_type,
103    COUNT(*) AS event_count
104FROM events
105GROUP BY event_type, schema_of_variant(payload:user_id), schema_of_variant(payload:amount),schema_of_variant(payload:items)
106ORDER BY event_type;
107
108-- ============================================================================
109-- QUERY 5: Aggregations with VARIANT Fields
110-- ============================================================================
111
112SELECT '=== Query 5: Aggregations with VARIANT Fields ===' as section;
113
114SELECT 
115    payload:user_id::STRING AS user_id,
116    COUNT(*) AS purchase_count,
117    SUM(payload:amount::DECIMAL(10,2)) AS total_revenue,
118    AVG(payload:amount::DECIMAL(10,2)) AS avg_order_value,
119    MIN(payload:amount::DECIMAL(10,2)) AS min_purchase,
120    MAX(payload:amount::DECIMAL(10,2)) AS max_purchase
121FROM events
122WHERE event_type = 'purchase'
123  AND payload:amount IS NOT NULL
124GROUP BY payload:user_id::STRING
125ORDER BY total_revenue DESC;
126
127-- ============================================================================
128-- QUERY 6: Conditional Logic Based on Variant Content
129-- ============================================================================
130
131SELECT '=== Query 6: Conditional Logic Based on Variant Content ===' as section;
132
133SELECT 
134    event_id,
135    event_type,
136    payload:user_id::STRING AS user_id,
137    CASE 
138        WHEN event_type = 'purchase' AND payload:amount::DECIMAL(10,2) > 200 THEN 'High Value'
139        WHEN event_type = 'purchase' AND payload:amount::DECIMAL(10,2) > 100 THEN 'Medium Value'
140        WHEN event_type = 'purchase' THEN 'Low Value'
141        WHEN event_type = 'error' AND payload:error_code::STRING LIKE 'E5%' THEN 'Critical Error'
142        WHEN event_type = 'error' THEN 'Minor Error'
143        ELSE 'Standard Event'
144    END AS event_category,
145    CASE 
146        WHEN event_type = 'purchase' THEN payload:amount::DECIMAL(10,2)
147        ELSE 0
148    END AS monetary_value
149FROM events
150ORDER BY timestamp;

Résultat

 1section
 2=== Create events table with event dataset ===
 3Time taken: 2.28 seconds, Fetched 1 row(s)
 4Response code
 5Time taken: 1.41 seconds
 6Response code
 7Time taken: 0.602 seconds
 8Response code
 9Time taken: 1.314 seconds
10
11
12section
13=== Query 1: Path-Based Field Extraction ===
14Time taken: 0.059 seconds, Fetched 1 row(s)
15event_id	event_type	user_id	username	order_id
16EVT001	user_login	12345	"alice"	NULL
17EVT002	purchase	12345	NULL	"ORD123"
18EVT003	user_login	67890	"bob"	NULL
19EVT004	page_view	12345	NULL	NULL
20EVT005	purchase	67890	NULL	"ORD124"
21EVT007	page_view	67890	NULL	NULL
22EVT008	user_logout	12345	"alice"	NULL
23EVT010	purchase	12345	NULL	"ORD125"
24Time taken: 1.815 seconds, Fetched 8 row(s)
25
26
27section
28=== Query 2: Type Casting with Double-Colon Syntax ===
29Time taken: 0.112 seconds, Fetched 1 row(s)
30event_id	timestamp	user_id	order_id	amount	currency	discount_code
31EVT010	2024-01-15 10:45:00	12345	ORD125	299.99	USD	NULL
32EVT005	2024-01-15 10:20:00	67890	ORD124	149.50	USD	SAVE10
33EVT002	2024-01-15 10:05:00	12345	ORD123	99.99	USD	NULL
34Time taken: 0.972 seconds, Fetched 3 row(s)
35
36
37section
38=== Query 3: Nested Field Accesss ===
39Time taken: 0.166 seconds, Fetched 1 row(s)
40event_id	order_id	country	zip_code	first_item_sku	first_item_qty
41EVT010	"ORD125"	US	94105	PROD003	3
42Time taken: 0.427 seconds, Fetched 1 row(s)
43
44
45section
46=== Query 4: Type Introspection ===
47Time taken: 0.099 seconds, Fetched 1 row(s)
48event_type	user_id_type	amount_type	items_type	event_count
49error	NULL	NULL	NULL	2
50page_view	BIGINT	NULL	NULL	2
51purchase	BIGINT	DECIMAL(5,2)	ARRAY<OBJECT<price: DECIMAL(4,2), quantity: BIGINT, sku: STRING>>	1
52purchase	BIGINT	DECIMAL(4,2)	ARRAY<OBJECT<price: DECIMAL(4,2), quantity: BIGINT, sku: STRING>>	1
53purchase	BIGINT	DECIMAL(4,1)	ARRAY<OBJECT<price: DECIMAL(4,1), quantity: BIGINT, sku: STRING>>	1
54user_login	BIGINT	NULL	NULL	2
55user_logout	BIGINT	NULL	NULL	1
56Time taken: 1.193 seconds, Fetched 7 row(s)
57
58
59section
60=== Query 5: Aggregations with VARIANT Fields ===
61Time taken: 0.09 seconds, Fetched 1 row(s)
62user_id	purchase_count	total_revenue	avg_order_value	min_purchase	max_purchase
6312345	2	399.98	199.990000	99.99	299.99
6467890	1	149.50	149.500000	149.50	149.50
65Time taken: 1.202 seconds, Fetched 2 row(s)
66
67
68section
69=== Query 6: Conditional Logic Based on Variant Content ===
70Time taken: 0.096 seconds, Fetched 1 row(s)
71event_id	event_type	user_id	event_category	monetary_value
72EVT001	user_login	12345	Standard Event	0.00
73EVT002	purchase	12345	Low Value	99.99
74EVT003	user_login	67890	Standard Event	0.00
75EVT004	page_view	12345	Standard Event	0.00
76EVT005	purchase	67890	Medium Value	149.50
77EVT006	error	NULL	Critical Error	0.00
78EVT007	page_view	67890	Standard Event	0.00
79EVT008	user_logout	12345	Standard Event	0.00
80EVT009	error	NULL	Minor Error	0.00
81EVT010	purchase	12345	High Value	299.99
82Time taken: 0.705 seconds, Fetched 10 row(s)

Exemple 2 - Comparaison du plan d’exécution : VARIANT vs STRING vs STRUCT

Spark

 1from pyspark.sql import SparkSession
 2from pyspark.sql.functions import col, get_json_object, from_json, expr
 3from pyspark.sql.types import DoubleType, StringType, StructType, StructField
 4
 5
 6REP_DATA_FILES = "file:///opt/spark/data/files"
 7URL_MASTER = "spark://spark-master:7077"
 8
 9spark = SparkSession.builder \
10    .appName("SPARK_VARIANTModePERF") \
11    .master(URL_MASTER) \
12    .getOrCreate()
13
14# Reference dataset 
15transactions = [
16    ("TXN-001", '{"amount": 1500.00, "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
17    ("TXN-002", '{"amount": 230.50,  "currency": "USD", "merchant": "UBER", "category": "transport", "surge": 1.2}'),
18    ("TXN-003", '{"amount": 89750.0, "currency": "EUR", "merchant": "AIRBNB", "category": "travel", "nights": 5}'),
19    ("TXN-004", '{"amount": 45.00,   "currency": "GBP", "merchant": "NETFLIX", "category": "entertainment"}'),
20    ("TXN-005", '{"amount": 3200.0,  "currency": "EUR", "merchant": "APPLE", "category": "electronics", "items": 2}'),
21    ("TXN-006", '{"amount": 800.00,  "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
22    ("TXN-007", '{"amount": 1100.0,  "currency": "USD", "merchant": "GOOGLE", "category": "software"}'),
23    ("TXN-008", '{"amount": 6500.0,  "currency": "EUR", "merchant": "BOOKING", "category": "travel", "nights": 3}'),
24    ("TXN-009", '{"amount": 320.75,  "currency": "GBP", "merchant": "SPOTIFY", "category": "entertainment"}'),
25    ("TXN-010", '{"amount": 2200.0,  "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
26]
27
28df = spark.createDataFrame(transactions, ["txn_id", "payload_str"])
29
30# Approach n°1 : STRING + get_json_object (Spark 3)
31df_str = df.withColumn("amount_str",   get_json_object(col("payload_str"), "$.amount")) \
32           .withColumn("currency_str", get_json_object(col("payload_str"), "$.currency"))
33df_str.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_str")
34df_temp0 = spark.read.parquet(f"{REP_DATA_FILES}/df_str")
35df_temp0.createOrReplaceTempView("df_str")
36
37# Approach 2: Typed STRUCT (best performance for stable and known schemas)
38txn_schema = StructType([
39    StructField("amount",   DoubleType()),
40    StructField("currency", StringType()),
41    StructField("merchant", StringType()),
42    StructField("category", StringType()),
43])
44df_struct = df.withColumn("payload_struct", from_json(col("payload_str"), txn_schema))
45df_struct.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_struct")
46df_temp1 = spark.read.parquet(f"{REP_DATA_FILES}/df_struct")
47df_temp1.createOrReplaceTempView("df_struct")
48
49# Approach 3: VARIANT (Spark 4.0 — best for heterogeneous or evolving schemas)
50df_variant = df.withColumn("payload_variant", expr("PARSE_JSON(payload_str)"))
51df_variant.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_variant")
52df_temp2 = spark.read.parquet(f"{REP_DATA_FILES}/df_variant")
53df_temp2.createOrReplaceTempView("df_variant")
54
55# Reference query : aggregation by currency
56print("=== STRING ===")
57spark.sql("""
58    SELECT currency_str
59          ,SUM(CAST(amount_str AS DOUBLE)) AS total
60    FROM df_str 
61    GROUP BY currency_str
62""").explain(mode="formatted")
63
64print("=== STRUCT ===")
65spark.sql("""
66    SELECT payload_struct.currency
67          ,SUM(payload_struct.amount) AS total
68    FROM df_struct 
69    GROUP BY payload_struct.currency
70""").explain(mode="formatted")
71
72print("=== VARIANT ===")
73spark.sql("""
74    SELECT payload_variant:currency::STRING AS currency
75        ,SUM(payload_variant:amount::DOUBLE) AS total
76    FROM df_variant 
77    GROUP BY payload_variant:currency::STRING
78""").explain(mode="formatted")
79
80spark.stop()

Résultat

  1=== STRING ===
  2== Physical Plan ==
  3AdaptiveSparkPlan (5)
  4+- HashAggregate (4)
  5   +- Exchange (3)
  6      +- HashAggregate (2)
  7         +- Scan parquet  (1)
  8
  9(1) Scan parquet 
 10Output [2]: [amount_str#6, currency_str#7]
 11Batched: true
 12Location: InMemoryFileIndex [file:/opt/spark/data/files/df_str]
 13ReadSchema: struct<amount_str:string,currency_str:string>
 14
 15(2) HashAggregate
 16Input [2]: [amount_str#6, currency_str#7]
 17Keys [1]: [currency_str#7]
 18Functions [1]: [partial_sum(cast(amount_str#6 as double))]
 19Aggregate Attributes [1]: [sum#18]
 20Results [2]: [currency_str#7, sum#19]
 21
 22(3) Exchange
 23Input [2]: [currency_str#7, sum#19]
 24Arguments: hashpartitioning(currency_str#7, 200), ENSURE_REQUIREMENTS, [plan_id=78]
 25
 26(4) HashAggregate
 27Input [2]: [currency_str#7, sum#19]
 28Keys [1]: [currency_str#7]
 29Functions [1]: [sum(cast(amount_str#6 as double))]
 30Aggregate Attributes [1]: [sum(cast(amount_str#6 as double))#17]
 31Results [2]: [currency_str#7, sum(cast(amount_str#6 as double))#17 AS total#16]
 32
 33(5) AdaptiveSparkPlan
 34Output [2]: [currency_str#7, total#16]
 35Arguments: isFinalPlan=false
 36
 37
 38
 39=== STRUCT ===
 40== Physical Plan ==
 41AdaptiveSparkPlan (6)
 42+- HashAggregate (5)
 43   +- Exchange (4)
 44      +- HashAggregate (3)
 45         +- Project (2)
 46            +- Scan parquet  (1)
 47
 48(1) Scan parquet 
 49Output [1]: [payload_struct#11]
 50Batched: true
 51Location: InMemoryFileIndex [file:/opt/spark/data/files/df_struct]
 52ReadSchema: struct<payload_struct:struct<amount:double,currency:string>>
 53
 54(2) Project
 55Output [2]: [payload_struct#11.amount AS _extract_amount#26, payload_struct#11.currency AS _groupingexpression#25]
 56Input [1]: [payload_struct#11]
 57
 58(3) HashAggregate
 59Input [2]: [_extract_amount#26, _groupingexpression#25]
 60Keys [1]: [_groupingexpression#25]
 61Functions [1]: [partial_sum(_extract_amount#26)]
 62Aggregate Attributes [1]: [sum#30]
 63Results [2]: [_groupingexpression#25, sum#31]
 64
 65(4) Exchange
 66Input [2]: [_groupingexpression#25, sum#31]
 67Arguments: hashpartitioning(_groupingexpression#25, 200), ENSURE_REQUIREMENTS, [plan_id=93]
 68
 69(5) HashAggregate
 70Input [2]: [_groupingexpression#25, sum#31]
 71Keys [1]: [_groupingexpression#25]
 72Functions [1]: [sum(_extract_amount#26)]
 73Aggregate Attributes [1]: [sum(_extract_amount#26)#24]
 74Results [2]: [_groupingexpression#25 AS currency#22, sum(_extract_amount#26)#24 AS total#20]
 75
 76(6) AdaptiveSparkPlan
 77Output [2]: [currency#22, total#20]
 78Arguments: isFinalPlan=false
 79
 80
 81
 82=== VARIANT ===
 83== Physical Plan ==
 84AdaptiveSparkPlan (6)
 85+- HashAggregate (5)
 86   +- Exchange (4)
 87      +- HashAggregate (3)
 88         +- Project (2)
 89            +- Scan parquet  (1)
 90
 91
 92(1) Scan parquet 
 93Output [1]: [payload_variant#39]
 94Batched: true
 95Location: InMemoryFileIndex [file:/opt/spark/data/files/df_variant]
 96ReadSchema: struct<payload_variant:struct<0:variant,1:variant>>
 97
 98(2) Project
 99Output [2]: [payload_variant#39.0 AS payload_variant#15, cast(payload_variant#39.1 as string) AS _groupingexpression#38]
100Input [1]: [payload_variant#39]
101
102(3) HashAggregate
103Input [2]: [payload_variant#15, _groupingexpression#38]
104Keys [1]: [_groupingexpression#38]
105Functions [1]: [partial_sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))]
106Aggregate Attributes [1]: [sum#40]
107Results [2]: [_groupingexpression#38, sum#41]
108
109(4) Exchange
110Input [2]: [_groupingexpression#38, sum#41]
111Arguments: hashpartitioning(_groupingexpression#38, 200), ENSURE_REQUIREMENTS, [plan_id=108]
112
113(5) HashAggregate
114Input [2]: [_groupingexpression#38, sum#41]
115Keys [1]: [_groupingexpression#38]
116Functions [1]: [sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))]
117Aggregate Attributes [1]: [sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))#37]
118Results [2]: [_groupingexpression#38 AS currency#33, sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))#37 AS total#35]
119
120(6) AdaptiveSparkPlan
121Output [2]: [currency#33, total#35]
122Arguments: isFinalPlan=false