Spark : v4.x - Fonctionnalités - Variant Data Type
Vous trouverez dans cet article, des informations sur le type de données Variant de Spark v4.x.
Introduction
Le type de données VARIANT permet de stocker des données semi-structurées (JSON, XML, objets imbriqués) sans schéma prédéfini.
Il permet le schema-on-read pour les données hétérogènes. Il stocke les données dans un format binaire efficace avec des métadonnées pour l’information des types. Il prend en charge l’accès basé sur les chemins (variant_col:field.path) et la conversion de types.
Detail
Avant Spark 4.0, la gestion des données semi-structurées nécessitait de choisir entre trois approches problématiques :
- Stocker comme
STRINGet parser à l’exécution : Exécution du parsing à chaque requête. Mauvaises performances. Incompatible avec lepushdownde prédicats. - Définir un schéma
STRUCTrigide : Le schéma doit être connu à l’avance. L’évolution du schéma nécessite des modifications sur la table. Les champsnullgaspillent l’espace de stockage. Les données hétérogènes ne s’adaptent pas. - Utiliser
MAP<STRING, STRING>: Tout est en chaînes de caractères. Pas d’accès imbriqué. Conversion de type manuelle requise.
Spark 4.0 introduit le type VARIANT. Il permet de stocker des données semi-structurées dans un format binaire en colonnes qui préserve les types de données :
1CREATE TABLE events (
2 event_id STRING,
3 timestamp TIMESTAMP,
4 payload VARIANT
5);
6
7INSERT INTO events VALUES (
8 'EVT001',
9 TIMESTAMP'2024-01-15 10:00:00',
10 PARSE_JSON('{"user": {"id": 12345, "name": "Alice"}, "action": "purchase", "amount": 99.99}')
11);
12
13SELECT
14 event_id,
15 payload:user.name::STRING AS user_name,
16 payload:amount::DOUBLE AS amount
17FROM events
18WHERE payload:action::STRING = 'purchase';
Le format de stockage se compose de trois composants :
- Section Valeur : Les données réelles stockées dans un format binaire compressé. Les nombres, chaînes, booléens stockés dans des types natifs. Les structures imbriquées préservées.
- Section Métadonnées : Informations de schéma pour chaque valeur. Les balises de type indiquent si la valeur est INT, STRING, OBJECT, ARRAY, etc. Permet l’extraction
type-safesans parsing. - Index d’Offset : Pointeurs vers les champs imbriqués pour un accès rapide basé sur les chemins. Permet l’accès par
payload:user.nameen tempsO(1)sans scanner tout l’objet.
Cette conception permet plusieurs opérations :
- Accès basé sur les chemins : Extrait les valeurs imbriquées sans parsing. La syntaxe deux-points (
:) permet de naviguer dans la structure. Notation par points pour les objets imbriqués. Notation par crochets pour les tableaux :
1variant_column:field.nested.path
- Conversion de Type : La syntaxe double deux-points convertit le variant vers un type spécifique. Échoue si les types sont incompatibles. Utilisez TRY_CAST pour une conversion sûre :
TRY_CAST(variant_column AS INT).
1variant_column::INT
2variant_column::STRING
- Introspection de Type : Retourne le nom du type. Permet la vérification de type à l’exécution.
1schema_of_variant(variant_column)
L’optimiseur Catalyst prend en charge les opérations VARIANT. Le pushdown de prédicats fonctionne pour les champs de premier niveau. Par exemple, WHERE payload:status::STRING = 'active' peut ignorer les partitions où le champ status ne correspond pas. Cependant, les prédicats imbriqués peuvent ne pas être utilisable efficacement pour un pushdown.
Liste des fonctions pour manipuler les données de type VARIANT : Documentation SQL
Comparaison
| Critère | STRING + JSON | STRUCT | VARIANT |
|---|---|---|---|
| Schéma fixe requis | Non | Oui | Non |
| Performance lecture | Faible (re-parsé) | Élevée | Bonne (pré-indexé) |
| Performance écriture | Élevée | Bonne | Bonne |
| Schémas hétérogènes | Oui | Non | Oui |
| Pushdown de filtres | Limité | Complet | Partiel |
Shredding
Le shredding est une technique hybride. Au lieu de stocker le JSON comme une simple chaîne de caractères, Spark analyse les données et crée :
- Des colonnes de métadonnées pour les champs qui reviennent souvent (ex: id, sensor).
- Une colonne “reste” qui contient tout ce qui est trop rare ou trop complexe pour être décomposé.
C’est un mécanisme important pour :
- La performance (I/O) : Si vous faites une requête pour accéder à l’information
$.sensor, Spark ne lit physiquement que la colonneshreddedcorrespondant à ce champ. Il ignore tout le reste du document JSON. - La compression : Comme les données décomposées sont typées (ex: tous les IDs sont des entiers), les algorithmes de compression comme Snappy ou Zstd sont beaucoup plus efficaces que sur du texte brut.
La limite du shredding : Cela ne transforme pas l’ensemble des éléments du Variant en colonnes. Spark utilise une limite (généralement sur le nombre de champs ou la profondeur) pour éviter de créer des milliers de colonnes dans Parquet.
Attention : Le
shreddingne se produit que si Spark détecte que les données sont suffisamment répétitives pour valoir le coup d’être extraites en colonnes. Sur un petit échantillon, il est possible que Spark décide de ne pas faire deshredding.
Démonstration du format stockage avec et sans shredding (avec l’utilisation de duckdb pour voir la structure du stockage) :
1from pyspark.sql import SparkSession
2from pyspark.sql.types import StructType, StructField, VariantType, IntegerType, StringType
3from pyspark.sql.functions import col, parse_json
4
5REP_DATA_FILES = "file:///opt/spark/data/files"
6URL_MASTER = "spark://spark-master:7077"
7FILE_NAME_NS = "data_variant_ns"
8FILE_NAME_SC = "data_variant_sc"
9
10
11schema = StructType([
12 StructField("id", IntegerType(), nullable=False),
13 StructField("data_raw", StringType(), nullable=False),
14 StructField("data", VariantType(), nullable=True)
15])
16
17raw_data_small = [
18 (1,'{"sensor": "temp", "value": 22.5, "unit": "C"}',None),
19 (2,'{"sensor": "pressure", "value": 1013}',None)
20]
21
22raw_data_more = [
23 (3,'{"sensor": "temp", "value": 23, "unit": "C"}',None),
24 (4,'{"sensor": "pressure", "value": 900}',None),
25 (5,'{"sensor": "temp", "value": 23.5, "unit": "C"}',None),
26 (6,'{"sensor": "pressure", "value": 1000}',None),
27 (7,'{"sensor": "temp", "value": 23.5, "unit": "C"}',None),
28 (8,'{"sensor": "pressure", "value": 1000}',None),
29 (9,'{"sensor": "temp", "value": 22, "unit": "C"}',None),
30 (10,'{"sensor": "pressure", "value": 856}',None)
31]
32
33# Spark Instance for Shredding Demo
34spark = SparkSession.builder \
35 .appName("SPARK_VARIANTModeShredding") \
36 .master(URL_MASTER) \
37 .getOrCreate()
38
39
40# Load Dataframe with small dataset
41df_variant_ns = spark.createDataFrame(raw_data_small, schema)
42# Cast STRING to VARIANT (col : data)
43df_variant_ns = df_variant_ns.withColumn("data", parse_json(col("data_raw")))
44# Write dataframe into parquet file (no shredding)
45df_variant_ns.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/{FILE_NAME_NS}.parquet")
46
47
48# Load Dataframe with complete dataset
49df_variant_sc = spark.createDataFrame((raw_data_small + raw_data_more), schema)
50# Cast STRING to VARIANT (col : data)
51df_variant_sc = df_variant_sc.withColumn("data", parse_json(col("data_raw")))
52# Write dataframe into parquet file (shredding)
53df_variant_sc.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/{FILE_NAME_SC}.parquet")
54
55spark.stop()
Résultat de la démonstration :
1/ Concernant le fichier data_variant_ns.parquet
- Ce fichier ne contient que deux lignes de données et par conséquent, Spark n’applique pas de
shreddingpar défaut. - Cela est confirmé en utilisant la commande
duckdb -c "DESCRIBE SELECT * FROM '~/{folder_data}/files/data_variant_ns.parquet/*.parquet';"qui permet d’afficher la description du type de la colonnedataqui stocke le JSON en VARIANT (qui est unSTRUCTcomposé demetadataetvalue)
┌─────────────┬─────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │ column_type │ null │ key │ default │ extra │
│ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │
├─────────────┼─────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ id │ INTEGER │ YES │ NULL │ NULL │ NULL │
│ data_raw │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ data │ STRUCT(metadata BLOB, "value" BLOB) │ YES │ NULL │ NULL │ NULL │
└─────────────┴─────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘
2/ Concernant le fichier data_variant_sc.parquet
- Ce fichier contient 10 lignes de données et Spark applique un
Shreddingpar défaut. - Cela se confirme en utilisant la commande
duckdb -c "DESCRIBE SELECT * FROM '~/{folder_data}/files/data_variant_sc.parquet/*.parquet';"qui permet d’afficher la description du type de la colonnedataqui stocke le JSON en VARIANT (qui est unSTRUCTcomposé demetadataetvaluemais avec cette fois en plus l’informationtyped_valuequi est le résultat dushredding)
┌─────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │ column_type │ null │ key │ default │ extra │
│ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │
├─────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ id │ INTEGER │ YES │ NULL │ NULL │ NULL │
│ data_raw │ VARCHAR │ YES │ NULL │ NULL │ NULL │
│ data │ STRUCT(metadata BLOB, "value" BLOB, typed_value STRUCT(sensor STRUCT("value" BLOB, typed_value VARCHAR), unit STRUCT("value" BLOB, typed_value VARCHAR), "value" STRUCT("value" BLOB, typed_value DECIMAL(18,1)))) │ YES │ NULL │ NULL │ NULL │
└─────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────┴─────────┴─────────┴─────────┘
Conseils
from_jsonnécessite de connaître le schéma à l’avance, alors queparse_jsonpréserve la flexibilité des données brutes tout en les optimisant.- Use
VARIANTfor the landing/raw layer (and exploration) andSTRUCTfor the Gold/Serving layer. - Utilisez
VARIANTpour la couche Landing/Bronze (et l’exploration) etSTRUCTpour la couche Optimisé/Gold
Advantages
- Flexibilité du Schéma : De nouveaux champs apparaissent sans
ALTER TABLE. Les payloads d’événements évoluent au fil du temps sans casser les requêtes. Différents types d’événements stockés dans la même table. Critique pour l’event sourcing, l’intégration d’API et les environnements d’itération rapide. - Meilleures performances que les données JSON : L’encodage binaire évite la surcharge de parsing à l’exécution. Le
shreddinget les métadonnées permettent l’accès direct aux informations. Cela élimine les opérations coûteuses de regex ou parsing JSON lors de la lecture. Le stockage compressé réduit les I/O. Schema-on-readpartiel : Interrogez uniquement les champs dont vous avez besoin. Les champs inutilisés n’entraînent aucun coût d’extraction. Permet l’analyse exploratoire sur des schémas inconnus. Migrez progressivement vers des colonnes typées à mesure que le schéma se stabilise.- Interopérabilité avec les systèmes JSON : Parsing et sérialisation JSON directs. Compatible avec les API REST, files de messages, exports NoSQL.
Limitations
- Performances plus lentes que les types natifs : Les agrégations sur les champs VARIANT sont plus lentes que les colonnes natives INT/DECIMAL. L’extraction et la validation de type se produisent à l’exécution. L’accès basé sur les chemins a une surcharge par rapport à l’accès direct aux colonnes. Non adapté pour l’analytique critique en performances sur des schémas stables.
- Pas de validation de schéma : N’importe quel JSON peut être inséré. Aucune contrainte sur la présence ou les types de champs. Les fautes de frappe dans les noms de champs retournent silencieusement
NULL. Les problèmes de qualité des données sont plus difficiles à détecter. Nécessite une validation au niveau applicatif. - Support limité de l’optimiseur : Les prédicats imbriqués complexes peuvent ne pas être poussés vers le stockage. La collecte de statistiques est limitée. L’optimisation des jointures est moins efficace que les types natifs. La planification de requête ne peut pas exploiter les informations de schéma pour l’optimisation.
- Surcharge de stockage pour les données simples : Il est légèrement plus lourd qu’un
STRUCToptimisé car il doit stocker des métadonnées supplémentaires pour chaque enregistrement - Écosystème Immature : Toutes les fonctions Spark ne prennent pas en charge le type VARIANT. Les outils BI et les connecteurs qui lisent directement les fichiers Parquet ne supportent pas encore tous le type VARIANT nativement.
Quand ne pas utiliser le type Variant :
- Données hautement structurées
- Opérations critiques en terme de performances
- Colonnes partitionnées
Real-World Use Cases
- Cas d’usage 1 : Agrégation d’événements multi-sources
- Une plateforme de données ingère des événements provenant de plus de 20 microservices. Chaque service a des schémas d’événements différents qui évoluent indépendamment. Une table avec une colonne VARIANT stocke tous les événements sans coordination de schéma. Les requêtes extraient les champs spécifiques au service selon les besoins. L’évolution du schéma se produit sans modification du pipeline.
- Cas d’usage 2 : Mise en cache de réponses d’API
- Un système met en cache les réponses API dans des tables Spark pour analyse. Les payloads API sont profondément imbriqués avec des champs optionnels. la colonne de type VARIANT stocke les réponses sans aplatissement. Les requêtes d’analyse extraient des chemins spécifiques. Les nouvelles versions d’API ajoutent des champs sans casser les requêtes existantes.
Codes
Exemple 1 : Manipulation du type Variant
Note : Création d’un script SQL avec le contenu du code et utilisation de Spark SQL pour l’exécuter :
docker exec -it spark-master spark-sql --master "local[*]" --conf "spark.hadoop.hive.cli.print.header=true" -f <script.sql> > script.log
SQL
1-- SQL Example - Work with variant Type
2
3-- ============================================================================
4-- INIT: Create Events Table with VARIANT Column
5-- ============================================================================
6
7SELECT '=== Create events table with event dataset ===' as section;
8
9DROP TABLE IF EXISTS events;
10CREATE TABLE events (
11 event_id STRING,
12 timestamp TIMESTAMP,
13 event_type STRING,
14 payload VARIANT
15);
16
17-- Dataset
18-- Each event type has different payload structure
19INSERT INTO events VALUES
20 ('EVT001', TIMESTAMP'2024-01-15 10:00:00', 'user_login',
21 PARSE_JSON('{"user_id": 12345, "username": "alice", "ip_address": "192.168.1.100", "device": "mobile"}')),
22 ('EVT002', TIMESTAMP'2024-01-15 10:05:00', 'purchase',
23 PARSE_JSON('{"user_id": 12345, "order_id": "ORD123", "amount": 99.99, "currency": "USD", "items": [{"sku": "PROD001", "quantity": 2, "price": 49.99}]}')),
24 ('EVT003', TIMESTAMP'2024-01-15 10:10:00', 'user_login',
25 PARSE_JSON('{"user_id": 67890, "username": "bob", "ip_address": "10.0.0.50", "device": "desktop", "session_id": "sess_xyz"}')),
26 ('EVT004', TIMESTAMP'2024-01-15 10:15:00', 'page_view',
27 PARSE_JSON('{"user_id": 12345, "page_url": "/products/electronics", "referrer": "https://google.com", "duration_seconds": 45}')),
28 ('EVT005', TIMESTAMP'2024-01-15 10:20:00', 'purchase',
29 PARSE_JSON('{"user_id": 67890, "order_id": "ORD124", "amount": 149.50, "currency": "USD", "items": [{"sku": "PROD002", "quantity": 1, "price": 149.50}], "discount_code": "SAVE10"}')),
30 ('EVT006', TIMESTAMP'2024-01-15 10:25:00', 'error',
31 PARSE_JSON('{"error_code": "E500", "error_message": "Database connection timeout", "service": "payment-api", "stack_trace": "com.example.PaymentService.processPayment"}')),
32 ('EVT007', TIMESTAMP'2024-01-15 10:30:00', 'page_view',
33 PARSE_JSON('{"user_id": 67890, "page_url": "/checkout", "referrer": "/cart", "duration_seconds": 120, "cart_items": 1}')),
34 ('EVT008', TIMESTAMP'2024-01-15 10:35:00', 'user_logout',
35 PARSE_JSON('{"user_id": 12345, "username": "alice", "session_duration_minutes": 35}')),
36 ('EVT009', TIMESTAMP'2024-01-15 10:40:00', 'error',
37 PARSE_JSON('{"error_code": "E404", "error_message": "Product not found", "service": "catalog-api", "requested_sku": "PROD999"}')),
38 ('EVT010', TIMESTAMP'2024-01-15 10:45:00', 'purchase',
39 PARSE_JSON('{"user_id": 12345, "order_id": "ORD125", "amount": 299.99, "currency": "USD", "items": [{"sku": "PROD003", "quantity": 3, "price": 99.99}], "shipping_address": {"country": "US", "zip": "94105"}}'));
40
41-- ============================================================================
42-- QUERY 1: Path-Based Field Extraction
43-- ============================================================================
44
45SELECT '=== Query 1: Path-Based Field Extraction ===' as section;
46
47SELECT
48 event_id,
49 event_type,
50 payload:user_id AS user_id,
51 payload:username AS username,
52 payload:order_id AS order_id
53FROM events
54WHERE payload:user_id IS NOT NULL
55ORDER BY timestamp;
56
57-- ============================================================================
58-- QUERY 2: Type Casting with Double-Colon Syntax
59-- ============================================================================
60
61SELECT '=== Query 2: Type Casting with Double-Colon Syntax ===' as section;
62
63SELECT
64 event_id,
65 timestamp,
66 payload:user_id::STRING AS user_id,
67 payload:order_id::STRING AS order_id,
68 payload:amount::DECIMAL(10,2) AS amount,
69 payload:currency::STRING AS currency,
70 payload:discount_code::STRING AS discount_code
71FROM events
72WHERE event_type = 'purchase'
73ORDER BY payload:amount::DECIMAL DESC;
74
75-- ============================================================================
76-- QUERY 3: Nested Field Access
77-- ============================================================================
78
79SELECT '=== Query 3: Nested Field Accesss ===' as section;
80
81-- Access nested shipping address
82SELECT
83 event_id,
84 payload:order_id AS order_id,
85 payload:shipping_address.country::STRING AS country,
86 payload:shipping_address.zip::STRING AS zip_code,
87 payload:items[0].sku::STRING AS first_item_sku,
88 payload:items[0].quantity::INT AS first_item_qty
89FROM events
90WHERE payload:shipping_address IS NOT NULL;
91
92-- ============================================================================
93-- QUERY 4: Type Introspection
94-- ============================================================================
95
96SELECT '=== Query 4: Type Introspection ===' as section;
97
98SELECT
99 event_type,
100 schema_of_variant(payload:user_id) AS user_id_type,
101 schema_of_variant(payload:amount) AS amount_type,
102 schema_of_variant(payload:items) AS items_type,
103 COUNT(*) AS event_count
104FROM events
105GROUP BY event_type, schema_of_variant(payload:user_id), schema_of_variant(payload:amount),schema_of_variant(payload:items)
106ORDER BY event_type;
107
108-- ============================================================================
109-- QUERY 5: Aggregations with VARIANT Fields
110-- ============================================================================
111
112SELECT '=== Query 5: Aggregations with VARIANT Fields ===' as section;
113
114SELECT
115 payload:user_id::STRING AS user_id,
116 COUNT(*) AS purchase_count,
117 SUM(payload:amount::DECIMAL(10,2)) AS total_revenue,
118 AVG(payload:amount::DECIMAL(10,2)) AS avg_order_value,
119 MIN(payload:amount::DECIMAL(10,2)) AS min_purchase,
120 MAX(payload:amount::DECIMAL(10,2)) AS max_purchase
121FROM events
122WHERE event_type = 'purchase'
123 AND payload:amount IS NOT NULL
124GROUP BY payload:user_id::STRING
125ORDER BY total_revenue DESC;
126
127-- ============================================================================
128-- QUERY 6: Conditional Logic Based on Variant Content
129-- ============================================================================
130
131SELECT '=== Query 6: Conditional Logic Based on Variant Content ===' as section;
132
133SELECT
134 event_id,
135 event_type,
136 payload:user_id::STRING AS user_id,
137 CASE
138 WHEN event_type = 'purchase' AND payload:amount::DECIMAL(10,2) > 200 THEN 'High Value'
139 WHEN event_type = 'purchase' AND payload:amount::DECIMAL(10,2) > 100 THEN 'Medium Value'
140 WHEN event_type = 'purchase' THEN 'Low Value'
141 WHEN event_type = 'error' AND payload:error_code::STRING LIKE 'E5%' THEN 'Critical Error'
142 WHEN event_type = 'error' THEN 'Minor Error'
143 ELSE 'Standard Event'
144 END AS event_category,
145 CASE
146 WHEN event_type = 'purchase' THEN payload:amount::DECIMAL(10,2)
147 ELSE 0
148 END AS monetary_value
149FROM events
150ORDER BY timestamp;
Résultat
1section
2=== Create events table with event dataset ===
3Time taken: 2.28 seconds, Fetched 1 row(s)
4Response code
5Time taken: 1.41 seconds
6Response code
7Time taken: 0.602 seconds
8Response code
9Time taken: 1.314 seconds
10
11
12section
13=== Query 1: Path-Based Field Extraction ===
14Time taken: 0.059 seconds, Fetched 1 row(s)
15event_id event_type user_id username order_id
16EVT001 user_login 12345 "alice" NULL
17EVT002 purchase 12345 NULL "ORD123"
18EVT003 user_login 67890 "bob" NULL
19EVT004 page_view 12345 NULL NULL
20EVT005 purchase 67890 NULL "ORD124"
21EVT007 page_view 67890 NULL NULL
22EVT008 user_logout 12345 "alice" NULL
23EVT010 purchase 12345 NULL "ORD125"
24Time taken: 1.815 seconds, Fetched 8 row(s)
25
26
27section
28=== Query 2: Type Casting with Double-Colon Syntax ===
29Time taken: 0.112 seconds, Fetched 1 row(s)
30event_id timestamp user_id order_id amount currency discount_code
31EVT010 2024-01-15 10:45:00 12345 ORD125 299.99 USD NULL
32EVT005 2024-01-15 10:20:00 67890 ORD124 149.50 USD SAVE10
33EVT002 2024-01-15 10:05:00 12345 ORD123 99.99 USD NULL
34Time taken: 0.972 seconds, Fetched 3 row(s)
35
36
37section
38=== Query 3: Nested Field Accesss ===
39Time taken: 0.166 seconds, Fetched 1 row(s)
40event_id order_id country zip_code first_item_sku first_item_qty
41EVT010 "ORD125" US 94105 PROD003 3
42Time taken: 0.427 seconds, Fetched 1 row(s)
43
44
45section
46=== Query 4: Type Introspection ===
47Time taken: 0.099 seconds, Fetched 1 row(s)
48event_type user_id_type amount_type items_type event_count
49error NULL NULL NULL 2
50page_view BIGINT NULL NULL 2
51purchase BIGINT DECIMAL(5,2) ARRAY<OBJECT<price: DECIMAL(4,2), quantity: BIGINT, sku: STRING>> 1
52purchase BIGINT DECIMAL(4,2) ARRAY<OBJECT<price: DECIMAL(4,2), quantity: BIGINT, sku: STRING>> 1
53purchase BIGINT DECIMAL(4,1) ARRAY<OBJECT<price: DECIMAL(4,1), quantity: BIGINT, sku: STRING>> 1
54user_login BIGINT NULL NULL 2
55user_logout BIGINT NULL NULL 1
56Time taken: 1.193 seconds, Fetched 7 row(s)
57
58
59section
60=== Query 5: Aggregations with VARIANT Fields ===
61Time taken: 0.09 seconds, Fetched 1 row(s)
62user_id purchase_count total_revenue avg_order_value min_purchase max_purchase
6312345 2 399.98 199.990000 99.99 299.99
6467890 1 149.50 149.500000 149.50 149.50
65Time taken: 1.202 seconds, Fetched 2 row(s)
66
67
68section
69=== Query 6: Conditional Logic Based on Variant Content ===
70Time taken: 0.096 seconds, Fetched 1 row(s)
71event_id event_type user_id event_category monetary_value
72EVT001 user_login 12345 Standard Event 0.00
73EVT002 purchase 12345 Low Value 99.99
74EVT003 user_login 67890 Standard Event 0.00
75EVT004 page_view 12345 Standard Event 0.00
76EVT005 purchase 67890 Medium Value 149.50
77EVT006 error NULL Critical Error 0.00
78EVT007 page_view 67890 Standard Event 0.00
79EVT008 user_logout 12345 Standard Event 0.00
80EVT009 error NULL Minor Error 0.00
81EVT010 purchase 12345 High Value 299.99
82Time taken: 0.705 seconds, Fetched 10 row(s)
Exemple 2 - Comparaison du plan d’exécution : VARIANT vs STRING vs STRUCT
Spark
1from pyspark.sql import SparkSession
2from pyspark.sql.functions import col, get_json_object, from_json, expr
3from pyspark.sql.types import DoubleType, StringType, StructType, StructField
4
5
6REP_DATA_FILES = "file:///opt/spark/data/files"
7URL_MASTER = "spark://spark-master:7077"
8
9spark = SparkSession.builder \
10 .appName("SPARK_VARIANTModePERF") \
11 .master(URL_MASTER) \
12 .getOrCreate()
13
14# Reference dataset
15transactions = [
16 ("TXN-001", '{"amount": 1500.00, "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
17 ("TXN-002", '{"amount": 230.50, "currency": "USD", "merchant": "UBER", "category": "transport", "surge": 1.2}'),
18 ("TXN-003", '{"amount": 89750.0, "currency": "EUR", "merchant": "AIRBNB", "category": "travel", "nights": 5}'),
19 ("TXN-004", '{"amount": 45.00, "currency": "GBP", "merchant": "NETFLIX", "category": "entertainment"}'),
20 ("TXN-005", '{"amount": 3200.0, "currency": "EUR", "merchant": "APPLE", "category": "electronics", "items": 2}'),
21 ("TXN-006", '{"amount": 800.00, "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
22 ("TXN-007", '{"amount": 1100.0, "currency": "USD", "merchant": "GOOGLE", "category": "software"}'),
23 ("TXN-008", '{"amount": 6500.0, "currency": "EUR", "merchant": "BOOKING", "category": "travel", "nights": 3}'),
24 ("TXN-009", '{"amount": 320.75, "currency": "GBP", "merchant": "SPOTIFY", "category": "entertainment"}'),
25 ("TXN-010", '{"amount": 2200.0, "currency": "EUR", "merchant": "AMZN", "category": "retail"}'),
26]
27
28df = spark.createDataFrame(transactions, ["txn_id", "payload_str"])
29
30# Approach n°1 : STRING + get_json_object (Spark 3)
31df_str = df.withColumn("amount_str", get_json_object(col("payload_str"), "$.amount")) \
32 .withColumn("currency_str", get_json_object(col("payload_str"), "$.currency"))
33df_str.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_str")
34df_temp0 = spark.read.parquet(f"{REP_DATA_FILES}/df_str")
35df_temp0.createOrReplaceTempView("df_str")
36
37# Approach 2: Typed STRUCT (best performance for stable and known schemas)
38txn_schema = StructType([
39 StructField("amount", DoubleType()),
40 StructField("currency", StringType()),
41 StructField("merchant", StringType()),
42 StructField("category", StringType()),
43])
44df_struct = df.withColumn("payload_struct", from_json(col("payload_str"), txn_schema))
45df_struct.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_struct")
46df_temp1 = spark.read.parquet(f"{REP_DATA_FILES}/df_struct")
47df_temp1.createOrReplaceTempView("df_struct")
48
49# Approach 3: VARIANT (Spark 4.0 — best for heterogeneous or evolving schemas)
50df_variant = df.withColumn("payload_variant", expr("PARSE_JSON(payload_str)"))
51df_variant.write.format("parquet").mode("overwrite").save(f"{REP_DATA_FILES}/df_variant")
52df_temp2 = spark.read.parquet(f"{REP_DATA_FILES}/df_variant")
53df_temp2.createOrReplaceTempView("df_variant")
54
55# Reference query : aggregation by currency
56print("=== STRING ===")
57spark.sql("""
58 SELECT currency_str
59 ,SUM(CAST(amount_str AS DOUBLE)) AS total
60 FROM df_str
61 GROUP BY currency_str
62""").explain(mode="formatted")
63
64print("=== STRUCT ===")
65spark.sql("""
66 SELECT payload_struct.currency
67 ,SUM(payload_struct.amount) AS total
68 FROM df_struct
69 GROUP BY payload_struct.currency
70""").explain(mode="formatted")
71
72print("=== VARIANT ===")
73spark.sql("""
74 SELECT payload_variant:currency::STRING AS currency
75 ,SUM(payload_variant:amount::DOUBLE) AS total
76 FROM df_variant
77 GROUP BY payload_variant:currency::STRING
78""").explain(mode="formatted")
79
80spark.stop()
Résultat
1=== STRING ===
2== Physical Plan ==
3AdaptiveSparkPlan (5)
4+- HashAggregate (4)
5 +- Exchange (3)
6 +- HashAggregate (2)
7 +- Scan parquet (1)
8
9(1) Scan parquet
10Output [2]: [amount_str#6, currency_str#7]
11Batched: true
12Location: InMemoryFileIndex [file:/opt/spark/data/files/df_str]
13ReadSchema: struct<amount_str:string,currency_str:string>
14
15(2) HashAggregate
16Input [2]: [amount_str#6, currency_str#7]
17Keys [1]: [currency_str#7]
18Functions [1]: [partial_sum(cast(amount_str#6 as double))]
19Aggregate Attributes [1]: [sum#18]
20Results [2]: [currency_str#7, sum#19]
21
22(3) Exchange
23Input [2]: [currency_str#7, sum#19]
24Arguments: hashpartitioning(currency_str#7, 200), ENSURE_REQUIREMENTS, [plan_id=78]
25
26(4) HashAggregate
27Input [2]: [currency_str#7, sum#19]
28Keys [1]: [currency_str#7]
29Functions [1]: [sum(cast(amount_str#6 as double))]
30Aggregate Attributes [1]: [sum(cast(amount_str#6 as double))#17]
31Results [2]: [currency_str#7, sum(cast(amount_str#6 as double))#17 AS total#16]
32
33(5) AdaptiveSparkPlan
34Output [2]: [currency_str#7, total#16]
35Arguments: isFinalPlan=false
36
37
38
39=== STRUCT ===
40== Physical Plan ==
41AdaptiveSparkPlan (6)
42+- HashAggregate (5)
43 +- Exchange (4)
44 +- HashAggregate (3)
45 +- Project (2)
46 +- Scan parquet (1)
47
48(1) Scan parquet
49Output [1]: [payload_struct#11]
50Batched: true
51Location: InMemoryFileIndex [file:/opt/spark/data/files/df_struct]
52ReadSchema: struct<payload_struct:struct<amount:double,currency:string>>
53
54(2) Project
55Output [2]: [payload_struct#11.amount AS _extract_amount#26, payload_struct#11.currency AS _groupingexpression#25]
56Input [1]: [payload_struct#11]
57
58(3) HashAggregate
59Input [2]: [_extract_amount#26, _groupingexpression#25]
60Keys [1]: [_groupingexpression#25]
61Functions [1]: [partial_sum(_extract_amount#26)]
62Aggregate Attributes [1]: [sum#30]
63Results [2]: [_groupingexpression#25, sum#31]
64
65(4) Exchange
66Input [2]: [_groupingexpression#25, sum#31]
67Arguments: hashpartitioning(_groupingexpression#25, 200), ENSURE_REQUIREMENTS, [plan_id=93]
68
69(5) HashAggregate
70Input [2]: [_groupingexpression#25, sum#31]
71Keys [1]: [_groupingexpression#25]
72Functions [1]: [sum(_extract_amount#26)]
73Aggregate Attributes [1]: [sum(_extract_amount#26)#24]
74Results [2]: [_groupingexpression#25 AS currency#22, sum(_extract_amount#26)#24 AS total#20]
75
76(6) AdaptiveSparkPlan
77Output [2]: [currency#22, total#20]
78Arguments: isFinalPlan=false
79
80
81
82=== VARIANT ===
83== Physical Plan ==
84AdaptiveSparkPlan (6)
85+- HashAggregate (5)
86 +- Exchange (4)
87 +- HashAggregate (3)
88 +- Project (2)
89 +- Scan parquet (1)
90
91
92(1) Scan parquet
93Output [1]: [payload_variant#39]
94Batched: true
95Location: InMemoryFileIndex [file:/opt/spark/data/files/df_variant]
96ReadSchema: struct<payload_variant:struct<0:variant,1:variant>>
97
98(2) Project
99Output [2]: [payload_variant#39.0 AS payload_variant#15, cast(payload_variant#39.1 as string) AS _groupingexpression#38]
100Input [1]: [payload_variant#39]
101
102(3) HashAggregate
103Input [2]: [payload_variant#15, _groupingexpression#38]
104Keys [1]: [_groupingexpression#38]
105Functions [1]: [partial_sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))]
106Aggregate Attributes [1]: [sum#40]
107Results [2]: [_groupingexpression#38, sum#41]
108
109(4) Exchange
110Input [2]: [_groupingexpression#38, sum#41]
111Arguments: hashpartitioning(_groupingexpression#38, 200), ENSURE_REQUIREMENTS, [plan_id=108]
112
113(5) HashAggregate
114Input [2]: [_groupingexpression#38, sum#41]
115Keys [1]: [_groupingexpression#38]
116Functions [1]: [sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))]
117Aggregate Attributes [1]: [sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))#37]
118Results [2]: [_groupingexpression#38 AS currency#33, sum(cast(variant_get(payload_variant#15, $.amount, VariantType, true, Some(Etc/UTC)) as double))#37 AS total#35]
119
120(6) AdaptiveSparkPlan
121Output [2]: [currency#33, total#35]
122Arguments: isFinalPlan=false
