Spark : v4.x - Fonctionnalités - Scripts SQL, Session et Conformité ANSI
Vous trouverez dans cet article, des informations sur les fonctionnalité SQL (Script, Session, Conformité) à partir de Spark v4.x.
Conformité ANSI SQL
Introduction
It enables type safety, prevents implicit casting errors and enforces reserved keyword restrictions.
Le mode de conformité ANSI SQL applique une sémantique SQL plus stricte correspondant à la norme ISO/IEC 9075. Cela permet de limiter les erreurs de typage, empêcher les erreurs de conversion implicite et appliquer les restrictions sur les mots-clés réservés.
Détail
Le comportement SQL par défaut de Spark privilégie la facilité d’utilisation plutôt que la rigueur. Les conversions de types implicites réussissent silencieusement même lorsqu’elles sont sémantiquement incorrectes. La division d’entiers retourne des entiers (troncature). Les opérations de dépassement de capacité bouclent sans erreur. Les mots-clés réservés peuvent être utilisés comme noms de colonnes sans guillemets.
La conformité ANSI SQL modifie fondamentalement ces comportements pour correspondre aux bases de données traditionnelles comme PostgreSQL, SQL Server et autres. Les changements affectent trois domaines principaux :
- Rigueur du système de types : Les requêtes échouent au lieu de retourner silencieusement des résultats tronqués ou
null - Gestion des NULL : Le mode ANSI applique une sémantique
NULLplus stricte dans les prédicats et les opérations ensemblistes.NULL = NULLretourne toujoursNULL(pas vrai).UNION,INTERSECTetEXCEPTtraitent les valeursNULLcomme égales pour la déduplication (conformément à la norme SQL). - Mots-clés réservés : Le mode ANSI applique les restrictions sur les mots-clés réservés au SQL. Les noms de colonnes comme
SELECT,FROM,WHEREdoivent être entourés de backticks (guillemets inversés). Cela correspond au comportement des bases de données traditionnelles mais casse les requêtes Spark SQL qui utilisent ces noms librement.
L’implémentation affecte plusieurs composants de Spark :
- Optimiseur Catalyst : Ajoute des règles de validation de types pendant la phase d’analyse. Rejette les requêtes avec des incompatibilités de types plus tôt dans la planification.
- Génération de code : Modifie le code généré pour les opérations arithmétiques afin d’inclure des vérifications de dépassement de capacité. Ajoute des instructions de branchement qui vérifient les conditions de dépassement avant de retourner les résultats.
- Exécution à la volée (runtime) : Encapsule les opérations dans des gestionnaires d’exceptions qui font échouer les tâches en cas de violations ANSI. Cela change la gestion des erreurs d’une propagation silencieuse de NULL à des échecs explicites.
Le mode est configuré par session via spark.sql.ansi.enabled. Une fois activé, toutes les requêtes de cette session suivent les règles ANSI.
Conseils
spark.sql.ansi.enabled = true:- Interrupteur principal pour la conformité ANSI. Active tous les comportements stricts : vérification du dépassement de capacité, sûreté des types, sémantique de division.
- Doit être défini avant la création de la session pour un comportement cohérent.
- Ne peut pas être modifié en cours de session dans certains modes de déploiement Spark.
TRY_CASTvsCASTen mode ANSI :CAST()lève des exceptions en cas d’erreurs de conversion en mode ANSI.TRY_CAST()retourneNULLmais signale explicitement l’échec de validation.- Utilisez
TRY_CAST()pour les pipelines de validation de données où les lignes incorrectes doivent être filtrées/journalisées plutôt que de faire échouer la tâche entière.
- Fonction
typeof():- Retourne le type de données d’un résultat d’expression. Indique si le résultat est
INT,DECIMAL,DOUBLE, etc. À utiliser pour identifier les changements de type (dans le cas d’une migration par exemple).
- Retourne le type de données d’un résultat d’expression. Indique si le résultat est
- Guillemets pour les mots-clés réservés :
- Le mode ANSI impose l’utilisation de backticks (guillemets inversés) pour les mots réservés. Les requêtes avec des noms de colonnes comme
user,timestamp,orderdoivent être réécrites. Il s’agit d’un changement critique pour les grandes bases de code SQL. - Exécutez une analyse statique pour identifier les requêtes affectées avant la migration.
- Le mode ANSI impose l’utilisation de backticks (guillemets inversés) pour les mots réservés. Les requêtes avec des noms de colonnes comme
Avantages
- Amélioration de la sureté des types : La vérification stricte des types détecte les erreurs lors de la compilation de la requête plutôt qu’à l’exécution. Les problèmes d’évolution de schéma apparaissent plus tôt. Empêche les problèmes de qualité des données de se propager par les pipelines de données.
- Détection d’erreurs rapide : Les incompatibilités de types et les erreurs arithmétiques échouent immédiatement au lieu de produire silencieusement des résultats incorrects. Un pipeline qui traite des transactions financières ne corrompra pas silencieusement les données en cas de dépassement de capacité. Les erreurs apparaissent pendant les tests plutôt qu’en production.
- Compatibilité de migration de base de données : Les requêtes qui fonctionnaient correctement dans la base de données source se comportent de manière identique dans Spark. (Réduit le risque de migration et la charge de test)
- Conformité réglementaire : Les secteurs des services financiers et de la santé exigent un comportement de requête déterministe et auditable. Le mode ANSI fournit une sémantique prévisible qui correspond aux attentes réglementaires.
Limitations
- Modifications incompatibles avec les requêtes existantes : Les requêtes qui s’appuyaient sur le comportement permissif de Spark échoueront. Les opérations CAST qui retournaient précédemment NULL lèvent maintenant des exceptions. Les résultats de division d’entiers changent de 0 à 0,5. Les grandes bases de code nécessitent des tests et une refactorisation approfondis.
- Surcoût en terme de performance : Chaque addition, multiplication et division inclut des branches conditionnelles pour vérifier le dépassement de capacité. Les traitements avec beaucoup de calcul sur un volume important de donnée peut subir une dégradation de performance.
- Configuration tout ou rien : Impossible d’activer le mode ANSI pour des requêtes ou des tables spécifiques. Le paramètre au niveau de la session affecte toutes les opérations. Une migration progressive nécessite l’exécution de deux sessions Spark distinctes avec des configurations différentes.
- Les erreurs peuvent faire échouer des tâches entières : Une seule ligne avec une erreur de conversion de type fait échouer la tâche entière. En mode par défaut, les lignes incorrectes deviennent NULL et le traitement continue. Le mode ANSI manque d’une gestion d’erreur au niveau de la ligne. Nécessite une pré-validation ou un pipeline de gestion d’erreurs distinct.
Cas d’usages (concret)
- Cas d’usage 1 : Migration de PostgreSQL vers Spark
- Une entreprise migre plus de 500 requêtes SQL d’un entrepôt de données PostgreSQL vers un cluster Spark. Les requêtes contiennent des divisions d’entiers, des calculs sujets au dépassement de capacité et des dépendances de types strictes. Le mode ANSI permet d’exécuter les requêtes sans modification. Sans le mode ANSI, un pourcentage important des requêtes peut nécessiter une réécriture pour correspondre à la sémantique de PostgreSQL.
- Cas d’usage 2 : Précision des calculs financiers
- Une société de services financiers calcule les intérêts courus sur des portefeuilles de prêts. La division d’entiers par défaut de Spark
principal * taux / 365tronquerait à zéro pour les petits taux journaliers. Le mode ANSI impose une division décimale produisant des résultats précis. Cela peut éviter des erreurs de calcul de plusieurs millions de dollars.
- Une société de services financiers calcule les intérêts courus sur des portefeuilles de prêts. La division d’entiers par défaut de Spark
- Cas d’usage 3 : Pipeline de validation de la qualité des données
- Un pipeline ETL valide les données entrantes par rapport à des schémas de types stricts. Les enregistrements avec des conversions de types invalides doivent être rejetés et non silencieusement convertis en NULL. Le mode ANSI fait échouer les tâches dès le premier enregistrement incorrect, déclenchant des alertes. Les équipes enquêtent immédiatement sur les problèmes de qualité des données au lieu de découvrir des données corrompues en aval.
Codes
Exemple : Conversion des types et comportement des opérations arithmétiques
Cet exemple montre comment le mode ANSI modifie la conversion des types et les opérations arithmétiques. Il illustre les changements importants par rapport au comportement par défaut.
Spark : Mode par défaut
1# PySpark Example - Not ANSI SQL Type Safety and Arithmetic
2from pyspark.sql import SparkSession
3from pyspark.sql.types import IntegerType, DoubleType, StringType, StructField, StructType
4
5URL_MASTER = "spark://spark-master:7077"
6
7
8spark = SparkSession.builder \
9 .appName("SPARK_DefaultSQLMode") \
10 .config("spark.sql.ansi.enabled", "false") \
11 .master(URL_MASTER) \
12 .getOrCreate()
13
14
15print("""
16##########################
17##### Build Datasets #####
18##########################
19""")
20
21# Source system 1: Financial dataset
22# Demonstrates real-world scenarios where ANSI mode prevents errors
23financial_data = [
24 ("LOAN001", 10000, 5.25, 365, "2024-01-15"), # Standard loan
25 ("LOAN002", 50000, 4.75, 365, "2024-01-15"), # Large principal
26 ("LOAN003", 1500, 12.50, 365, "2024-01-15"), # High interest rate
27 ("LOAN004", 2147483647, 3.00, 365, "2024-01-15"), # Near integer max (overflow risk)
28 ("LOAN005", 25000, 6.00, 365, "2024-01-15"), # Medium loan
29 ("LOAN006", 8000, 0.00, 365, "2024-01-15"), # Zero interest (division edge case)
30 ("LOAN007", 100000, 7.25, 365, "2024-01-15"), # Large loan
31 ("LOAN008", 500, 15.00, 365, "2024-01-15"), # Small principal, high rate
32 ("LOAN009", 75000, 4.25, 365, "2024-01-15"), # Standard
33 ("LOAN010", 1000000, 3.50, 365, "2024-01-15"), # Very large principal (overflow in calculations)
34]
35
36# Source system 2: Problematic Financial dataset
37# Test dataset with intentionally problematic values for type conversion
38problematic_data = [
39 ("CONV001", "12345", "2024-01-15"), # Valid integer string
40 ("CONV002", "67890", "2024-01-15"), # Valid integer string
41 ("CONV003", "abc123", "2024-01-15"), # Invalid integer (contains letters)
42 ("CONV004", "99999", "2024-01-15"), # Valid integer
43 ("CONV005", "12.34", "2024-01-15"), # Decimal in integer field
44 ("CONV006", "", "2024-01-15"), # Empty string
45 ("CONV007", "55555", "2024-01-15"), # Valid integer
46 ("CONV008", "1e5", "2024-01-15"), # Scientific notation
47 ("CONV009", "42", "2024-01-15"), # Valid integer
48 ("CONV010", "NULL", "2024-01-15"), # String "NULL" not actual NULL
49]
50
51financial_schema = StructType([
52 StructField("loan_id", StringType()),
53 StructField("principal", IntegerType()),
54 StructField("annual_rate", DoubleType()),
55 StructField("days", IntegerType()),
56 StructField("date", StringType()),
57])
58
59problematic_schema = StructType([
60 StructField("record_id", StringType()),
61 StructField("value_str", StringType()),
62 StructField("date", StringType()),
63])
64
65
66# Create DataFrame in both sessions
67loans_data = spark.createDataFrame(financial_data,financial_schema)
68print(f"financial_data dataset: {loans_data.count()}")
69loans_data.show(truncate=False)
70
71
72problems_data = spark.createDataFrame(problematic_data,problematic_schema)
73print(f"problematic_data dataset: {problems_data.count()}")
74problems_data.show(truncate=False)
75
76
77# Register DataFrames as temporary views for SQL access
78loans_data.createOrReplaceTempView("loans_data")
79problems_data.createOrReplaceTempView("problems_data")
80
81
82print("""
83########################################
84##### 1. Integer Overflow Behavior #####
85########################################
86""")
87
88
89overflow = spark.sql("""
90 SELECT
91 loan_id,
92 principal,
93 principal + 1000000 as principal_plus_1m,
94 CASE
95 WHEN principal + 1000000 < 0 THEN 'OVERFLOW DETECTED'
96 ELSE 'OK'
97 END as overflow_check
98 FROM loans_data
99 WHERE loan_id IN ('LOAN004', 'LOAN010')
100""")
101
102# LOAN004 (at INT max) wrapped to negative value
103overflow.show(truncate=False)
104
105
106
107print("""
108####################################
109##### 2. Type Casting Behavior #####
110####################################
111""")
112
113
114cast = spark.sql("""
115 SELECT
116 record_id,
117 value_str,
118 CAST(value_str AS INT) as value_int,
119 CASE
120 WHEN CAST(value_str AS INT) IS NULL THEN 'CAST FAILED'
121 ELSE 'CAST OK'
122 END as cast_status
123 FROM problems_data
124 ORDER BY record_id
125""")
126
127# Invalid values became NULL without any error indication
128cast.show(truncate=False)
129
130
131spark.stop()
Résultat : Mode par défaut
1##########################
2##### Build Datasets #####
3##########################
4
5financial_data dataset: 10
6+-------+----------+-----------+----+----------+
7|loan_id|principal |annual_rate|days|date |
8+-------+----------+-----------+----+----------+
9|LOAN001|10000 |5.25 |365 |2024-01-15|
10|LOAN002|50000 |4.75 |365 |2024-01-15|
11|LOAN003|1500 |12.5 |365 |2024-01-15|
12|LOAN004|2147483647|3.0 |365 |2024-01-15|
13|LOAN005|25000 |6.0 |365 |2024-01-15|
14|LOAN006|8000 |0.0 |365 |2024-01-15|
15|LOAN007|100000 |7.25 |365 |2024-01-15|
16|LOAN008|500 |15.0 |365 |2024-01-15|
17|LOAN009|75000 |4.25 |365 |2024-01-15|
18|LOAN010|1000000 |3.5 |365 |2024-01-15|
19+-------+----------+-----------+----+----------+
20
21problematic_data dataset: 10
22+---------+---------+----------+
23|record_id|value_str|date |
24+---------+---------+----------+
25|CONV001 |12345 |2024-01-15|
26|CONV002 |67890 |2024-01-15|
27|CONV003 |abc123 |2024-01-15|
28|CONV004 |99999 |2024-01-15|
29|CONV005 |12.34 |2024-01-15|
30|CONV006 | |2024-01-15|
31|CONV007 |55555 |2024-01-15|
32|CONV008 |1e5 |2024-01-15|
33|CONV009 |42 |2024-01-15|
34|CONV010 |NULL |2024-01-15|
35+---------+---------+----------+
36
37
38##############################################################
39##### 1. Calculate daily interest using integer division #####
40##############################################################
41
42+-------+----------+-----------+------------------+-----------+
43|loan_id|principal |annual_rate|daily_interest |result_type|
44+-------+----------+-----------+------------------+-----------+
45|LOAN001|10000 |5.25 |1.4383561643835616|double |
46|LOAN002|50000 |4.75 |6.506849315068493 |double |
47|LOAN003|1500 |12.5 |0.5136986301369864|double |
48|LOAN004|2147483647|3.0 |176505.5052328767 |double |
49|LOAN005|25000 |6.0 |4.109589041095891 |double |
50|LOAN006|8000 |0.0 |0.0 |double |
51|LOAN007|100000 |7.25 |19.863013698630137|double |
52|LOAN008|500 |15.0 |0.2054794520547945|double |
53|LOAN009|75000 |4.25 |8.732876712328768 |double |
54|LOAN010|1000000 |3.5 |95.89041095890411 |double |
55+-------+----------+-----------+------------------+-----------+
56
57
58########################################
59##### 2. Integer Overflow Behavior #####
60########################################
61
62+-------+----------+-----------------+-----------------+
63|loan_id|principal |principal_plus_1m|overflow_check |
64+-------+----------+-----------------+-----------------+
65|LOAN004|2147483647|-2146483649 |OVERFLOW DETECTED|
66|LOAN010|1000000 |2000000 |OK |
67+-------+----------+-----------------+-----------------+
68
69
70####################################
71##### 3. Type Casting Behavior #####
72####################################
73
74+---------+---------+---------+-----------+
75|record_id|value_str|value_int|cast_status|
76+---------+---------+---------+-----------+
77|CONV001 |12345 |12345 |CAST OK |
78|CONV002 |67890 |67890 |CAST OK |
79|CONV003 |abc123 |NULL |CAST FAILED|
80|CONV004 |99999 |99999 |CAST OK |
81|CONV005 |12.34 |12 |CAST OK |
82|CONV006 | |NULL |CAST FAILED|
83|CONV007 |55555 |55555 |CAST OK |
84|CONV008 |1e5 |NULL |CAST FAILED|
85|CONV009 |42 |42 |CAST OK |
86|CONV010 |NULL |NULL |CAST FAILED|
87+---------+---------+---------+-----------+
Spark : Mode ANSI
1# PySpark Example - ANSI SQL Type Safety and Arithmetic
2from pyspark.sql import SparkSession
3from pyspark.sql.types import IntegerType, DoubleType, StringType, StructField, StructType
4
5URL_MASTER = "spark://spark-master:7077"
6
7
8spark = SparkSession.builder \
9 .appName("SPARK_ANSISQLMode") \
10 .config("spark.sql.ansi.enabled", "true") \
11 .master(URL_MASTER) \
12 .getOrCreate()
13
14
15print("""
16##########################
17##### Build Datasets #####
18##########################
19""")
20
21# Source system 1: Financial dataset
22# Demonstrates real-world scenarios where ANSI mode prevents errors
23financial_data = [
24 ("LOAN001", 10000, 5.25, 365, "2024-01-15"), # Standard loan
25 ("LOAN002", 50000, 4.75, 365, "2024-01-15"), # Large principal
26 ("LOAN003", 1500, 12.50, 365, "2024-01-15"), # High interest rate
27 ("LOAN004", 2147483647, 3.00, 365, "2024-01-15"), # Near integer max (overflow risk)
28 ("LOAN005", 25000, 6.00, 365, "2024-01-15"), # Medium loan
29 ("LOAN006", 8000, 0.00, 365, "2024-01-15"), # Zero interest (division edge case)
30 ("LOAN007", 100000, 7.25, 365, "2024-01-15"), # Large loan
31 ("LOAN008", 500, 15.00, 365, "2024-01-15"), # Small principal, high rate
32 ("LOAN009", 75000, 4.25, 365, "2024-01-15"), # Standard
33 ("LOAN010", 1000000, 3.50, 365, "2024-01-15"), # Very large principal (overflow in calculations)
34]
35
36# Source system 2: Problematic Financial dataset
37# Test dataset with intentionally problematic values for type conversion
38problematic_data = [
39 ("CONV001", "12345", "2024-01-15"), # Valid integer string
40 ("CONV002", "67890", "2024-01-15"), # Valid integer string
41 ("CONV003", "abc123", "2024-01-15"), # Invalid integer (contains letters)
42 ("CONV004", "99999", "2024-01-15"), # Valid integer
43 ("CONV005", "12.34", "2024-01-15"), # Decimal in integer field
44 ("CONV006", "", "2024-01-15"), # Empty string
45 ("CONV007", "55555", "2024-01-15"), # Valid integer
46 ("CONV008", "1e5", "2024-01-15"), # Scientific notation
47 ("CONV009", "42", "2024-01-15"), # Valid integer
48 ("CONV010", "NULL", "2024-01-15"), # String "NULL" not actual NULL
49]
50
51financial_schema = StructType([
52 StructField("loan_id", StringType()),
53 StructField("principal", IntegerType()),
54 StructField("annual_rate", DoubleType()),
55 StructField("days", IntegerType()),
56 StructField("date", StringType()),
57])
58
59problematic_schema = StructType([
60 StructField("record_id", StringType()),
61 StructField("value_str", StringType()),
62 StructField("date", StringType()),
63])
64
65
66# Create DataFrame in both sessions
67loans_data = spark.createDataFrame(financial_data,financial_schema)
68print(f"financial_data dataset: {loans_data.count()}")
69loans_data.show(truncate=False)
70
71
72problems_data = spark.createDataFrame(problematic_data,problematic_schema)
73print(f"problematic_data dataset: {problems_data.count()}")
74problems_data.show(truncate=False)
75
76
77# Register DataFrames as temporary views for SQL access
78loans_data.createOrReplaceTempView("loans_data")
79problems_data.createOrReplaceTempView("problems_data")
80
81
82print("""
83########################################
84##### 1. Integer Overflow Behavior #####
85########################################
86""")
87
88
89try:
90 overflow = spark.sql("""
91 SELECT
92 loan_id,
93 principal,
94 principal + 1000000 as principal_plus_1m
95 FROM loans_data
96 WHERE loan_id IN ('LOAN004', 'LOAN010')
97 """)
98 overflow.show(truncate=False)
99 print("Query completed (no overflow detected)")
100except Exception as e:
101 print(f"Exception Caught: {type(e).__name__}")
102 print(f"Message: {str(e)[:200]}")
103 print("\nThis is EXPECTED in ANSI mode - overflow is an error, not silent corruption")
104
105
106
107print("""
108####################################
109##### 2. Type Casting Behavior #####
110####################################
111""")
112
113try:
114 cast = spark.sql("""
115 SELECT
116 record_id,
117 value_str,
118 CAST(value_str AS INT) as value_int
119 FROM problems_data
120 ORDER BY record_id
121 """)
122 cast.show(truncate=False)
123except Exception as e:
124 print(f"Exception Caught: {type(e).__name__}")
125 print(f"Failed on record: {str(e)[str(e).find('CONV'):str(e).find('CONV')+10] if 'CONV' in str(e) else 'unknown'}")
126 print("\nThis is EXPECTED in ANSI mode - forces explicit error handling")
127
128
129print("\n##### with TRY_CAST: Safe Error Handling #####")
130try_cast = spark.sql("""
131 SELECT
132 record_id,
133 value_str,
134 TRY_CAST(value_str AS INT) as value_int,
135 CASE
136 WHEN TRY_CAST(value_str AS INT) IS NULL THEN 'CONVERSION_ERROR'
137 ELSE 'OK'
138 END as validation_status
139 FROM problems_data
140 ORDER BY record_id
141""")
142# TRY_CAST provides NULL on error but explicitly marks rows as problematic
143try_cast.show(truncate=False)
144
145
146spark.stop()
Résultat : Mode ANSI
1##########################
2##### Build Datasets #####
3##########################
4
5financial_data dataset: 10
6+-------+----------+-----------+----+----------+
7|loan_id|principal |annual_rate|days|date |
8+-------+----------+-----------+----+----------+
9|LOAN001|10000 |5.25 |365 |2024-01-15|
10|LOAN002|50000 |4.75 |365 |2024-01-15|
11|LOAN003|1500 |12.5 |365 |2024-01-15|
12|LOAN004|2147483647|3.0 |365 |2024-01-15|
13|LOAN005|25000 |6.0 |365 |2024-01-15|
14|LOAN006|8000 |0.0 |365 |2024-01-15|
15|LOAN007|100000 |7.25 |365 |2024-01-15|
16|LOAN008|500 |15.0 |365 |2024-01-15|
17|LOAN009|75000 |4.25 |365 |2024-01-15|
18|LOAN010|1000000 |3.5 |365 |2024-01-15|
19+-------+----------+-----------+----+----------+
20
21problematic_data dataset: 10
22+---------+---------+----------+
23|record_id|value_str|date |
24+---------+---------+----------+
25|CONV001 |12345 |2024-01-15|
26|CONV002 |67890 |2024-01-15|
27|CONV003 |abc123 |2024-01-15|
28|CONV004 |99999 |2024-01-15|
29|CONV005 |12.34 |2024-01-15|
30|CONV006 | |2024-01-15|
31|CONV007 |55555 |2024-01-15|
32|CONV008 |1e5 |2024-01-15|
33|CONV009 |42 |2024-01-15|
34|CONV010 |NULL |2024-01-15|
35+---------+---------+----------+
36
37
38########################################
39##### 1. Integer Overflow Behavior #####
40########################################
41
42== SQL (line 5, position 13) ==
43 principal + 1000000 as principal_plus_1m
44 ^^^^^^^^^^^^^^^^^^^
45
46 at org.apache.spark.sql.errors.ExecutionErrors.arithmeticOverflowError(ExecutionErrors.scala:132)
47 ...
48 at java.base/java.lang.Thread.run(Thread.java:1583)
49
50
51Exception Caught: ArithmeticException
52Message: [ARITHMETIC_OVERFLOW] integer overflow. Use 'try_add' to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22003
53== SQL (l
54
55This is EXPECTED in ANSI mode - overflow is an error, not silent corruption
56
57####################################
58##### 2. Type Casting Behavior #####
59####################################
60
61== SQL (line 5, position 13) ==
62 CAST(value_str AS INT) as value_int
63 ^^^^^^^^^^^^^^^^^^^^^^
64
65 at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:147)
66 ...
67 at java.base/java.lang.Thread.run(Thread.java:1583)
68
69Exception Caught: NumberFormatException
70Failed on record: unknown
71
72This is EXPECTED in ANSI mode - forces explicit error handling
73
74##### with TRY_CAST: Safe Error Handling #####
75
76+---------+---------+---------+-----------------+
77|record_id|value_str|value_int|validation_status|
78+---------+---------+---------+-----------------+
79|CONV001 |12345 |12345 |OK |
80|CONV002 |67890 |67890 |OK |
81|CONV003 |abc123 |NULL |CONVERSION_ERROR |
82|CONV004 |99999 |99999 |OK |
83|CONV005 |12.34 |NULL |CONVERSION_ERROR |
84|CONV006 | |NULL |CONVERSION_ERROR |
85|CONV007 |55555 |55555 |OK |
86|CONV008 |1e5 |NULL |CONVERSION_ERROR |
87|CONV009 |42 |42 |OK |
88|CONV010 |NULL |NULL |CONVERSION_ERROR |
89+---------+---------+---------+-----------------+
Variables SQL de Session
Introduction
Les variables SQL de session permettent le paramétrage dynamique des requêtes sans concaténation de chaînes de caractères ni de fichiers de configuration externes. (Les variables sont limitées à la session et typées)
Détail
Avant Spark 4.0, le paramétrage des requêtes SQL nécessitait trois approches problématiques :
- Interpolation des chaînes de caractères dans le code applicatif
- Risque : injection SQL si
countryprovient d’une entrée utilisateur. Nécessite la recompilation du plan de requête pour chaque valeur de paramètre. - Exemple :
- Risque : injection SQL si
1country = "US"
2df = spark.sql(f"SELECT * FROM orders WHERE country = '{country}'")
- Propriétés de configuration Spark
- Syntaxe verbeuse. Limité aux types
String. Pollution de l’espace de noms de configuration. - Exemple :
- Syntaxe verbeuse. Limité aux types
1spark.conf.set("my.param.country", "US")
2spark.sql("SELECT * FROM orders WHERE country = '${spark.my.param.country}'")
- Fichiers de configuration externes
- Ajoute une dépendance aux systèmes externes. Possible dérive de configuration entre les environnements.
- Exemple :
1params = yaml.load("config.yaml")
2spark.sql(f"SELECT * FROM orders WHERE country = '{params['country']}'")
Spark 4.0 introduit les variables SQL qui résolvent ces problèmes. Les variables sont déclarées avec des types explicites et limitées à la session :
1DECLARE min_amount DECIMAL(10,2) DEFAULT 100.0;
2SELECT * FROM orders WHERE amount >= min_amount;
Les deux syntaxes supportées :
- Instruction
DECLARE(Norme SQL)- Déclaration de type explicite. Requis en mode ANSI SQL.
- Exemple :
1DECLARE variable_name TYPE [DEFAULT value];
- Instruction
SET(Rétrocompatibilité)- Type déduit de la valeur. Compatible avec la syntaxe des propriétés de session Spark 3.x. Les variables sont typées de manière sûre une fois définies.
- Exemple :
1SET variable_name = value;
La référence est résolue lors de la compilation de la requête, pas à l’exécution. Cela permet à l’optimiseur Catalyst d’utiliser les valeurs des variables pour le pushdown de prédicats et l’élagage de partitions (partition pruning).
Le système de variables maintient une table de symboles au niveau de la session. Chaque variable possède :
- Nom : Identifiant utilisé dans les requêtes
- Type : Type de données scalaire (Int, String, Decimal, Boolean, Date, Timestamp)
- Valeur : Valeur actuelle (mutable avec SET ultérieurs)
- Portée : Session uniquement (non visible par les autres sessions)
Les variables s’intègrent à la planification des requêtes. Lorsque Catalyst analyse une requête avec variable_name, il :
- Recherche la variable dans la table de symboles de session
- Remplace la référence par la valeur littérale
- Procède à l’optimisation en utilisant la valeur concrète
Les performances d’exécution sont identiques aux littéraux codés en dur.
Avantages
- Paramétrage
Type-Safe: Les variables ont des types explicites vérifiés à la déclaration. L’assignation d’un mauvais type produit une erreur de compilation. Cela prévient les erreurs de type à l’exécution provenant de paramètres mal configurés. - Prévention de l’injection SQL : Les références de variables sont résolues lors de la compilation de la requête, pas par substitution de chaînes de caractères à l’exécution. Les entrées malveillantes dans les valeurs de variables ne peuvent pas altérer la structure de la requête.
- Configuration multi-environnement simplifiée : Le même script SQL s’exécute en dev/rec/prod avec différentes valeurs de variables. Aucune modification de code entre les environnements. Les variables peuvent être définies à partir d’arguments en ligne de commande ou de variables d’environnement au démarrage de la session.
- Paramétrage pris en compte par l’optimiseur Catalyst : L’optimiseur Catalyst voit les valeurs des variables lors de la planification. Cela permet l’élagage de partitions (
partition pruning) et lepushdownde prédicats basés sur les valeurs des variables.
Limitations
- Portée limitée à la session : Les variables ne persistent pas après la clôture de la session. Pas de variables globales partagées entre sessions concurrentes. Chaque client doit redéclarer les variables. Inadapté pour la configuration au niveau du cluster nécessitant une cohérence entre les jobs.
- Pas de types complexes : Les variables ne supportent que les types scalaires. Impossible de déclarer des variables
ARRAY,STRUCTouMAP. Le passage de valeurs de paramètres complexes nécessite toujours des propriétés de configuration ou un stockage externe. - Pas d’API de persistance des variables : Les variables ne peuvent pas être sauvegardées dans le metastore ou un stockage externe. Aucun mécanisme intégré pour charger des variables depuis des fichiers YAML/JSON. Les équipes doivent construire une gestion de variables personnalisée au-dessus des variables SQL de base.
Cas d’usages (concret)
- Cas d’usage 1 : Rapports paramétrés
- Définir une plage de dates en début de script et l’utiliser dans plusieurs requêtes sans répétition.
- Cas d’usage 2 : Traitement de données spécifique à l’environnement
- Un pipeline ETL s’exécute dans les environnements dev, rec et prod. Chaque environnement lit depuis différents préfixes de base de données (
dev_db.orders,prod_db.orders). Les variablessource_databaseettarget_databasesont définies à partir de variables d’environnement au démarrage du job. Le même script SQL fonctionne dans tous les environnements sans modification.
- Un pipeline ETL s’exécute dans les environnements dev, rec et prod. Chaque environnement lit depuis différents préfixes de base de données (
Codes
Exemple : Fonctionnement des variables SQL
Spark
1# PySpark Example : Demonstrates how variables works
2from pyspark.sql import SparkSession
3from pyspark.sql.types import DoubleType, StringType, StructField, StructType
4from pyspark.sql.functions import to_timestamp
5
6URL_MASTER = "spark://spark-master:7077"
7
8
9spark = SparkSession.builder \
10 .appName("SPARK_APPVARIABLE") \
11 .config("spark.sql.ansi.enabled", "true") \
12 .master(URL_MASTER) \
13 .getOrCreate()
14
15
16print("""
17##########################
18##### Build Datasets #####
19##########################
20""")
21
22# Source system 1: Sensor dataset
23# Demonstrates real-world scenarios where ANSI mode prevents errors
24sensor_readings = [
25 ("SENSOR_01", "2024-01-15 10:00:00", 22.5, 45.2, "warehouse_A", "normal"),
26 ("SENSOR_02", "2024-01-15 10:05:00", 35.8, 78.5, "warehouse_A", "normal"), # High temp
27 ("SENSOR_03", "2024-01-15 10:10:00", 18.2, 42.1, "warehouse_B", "normal"),
28 ("SENSOR_01", "2024-01-15 10:15:00", 23.1, 46.8, "warehouse_A", "normal"),
29 ("SENSOR_02", "2024-01-15 10:20:00", 38.5, 82.3, "warehouse_A", "alert"), # Anomaly
30 ("SENSOR_04", "2024-01-15 10:25:00", 15.2, 38.5, "warehouse_C", "normal"), # Low temp
31 ("SENSOR_03", "2024-01-15 10:30:00", 19.1, 43.7, "warehouse_B", "normal"),
32 ("SENSOR_01", "2024-01-15 10:35:00", 24.2, 47.5, "warehouse_A", "normal"),
33 ("SENSOR_04", "2024-01-15 10:40:00", 12.8, 35.2, "warehouse_C", "alert"), # Anomaly
34 ("SENSOR_02", "2024-01-15 10:45:00", 32.1, 72.5, "warehouse_A", "normal"),
35]
36
37
38sensor_schema = StructType([
39 StructField("sensor_id", StringType()),
40 StructField("cur_timestamp", StringType()),
41 StructField("temperature", DoubleType()),
42 StructField("humidity", DoubleType()),
43 StructField("location", StringType()),
44 StructField("status", StringType()),
45])
46
47
48# Create DataFrame
49sensors_df = spark.createDataFrame(sensor_readings,sensor_schema).withColumn("cur_timestamp",to_timestamp("cur_timestamp", format="yyyy-MM-dd HH:mm:ss"))
50print(f"sensor dataset: {sensors_df.count()}")
51sensors_df.show(truncate=False)
52
53# Register DataFrames as temporary views for SQL access
54sensors_df.createOrReplaceTempView("sensors_data")
55
56
57
58print("""
59########################################
60##### 1. Time-Based Variable Usage #####
61########################################
62""")
63
64spark.sql("DECLARE VARIABLE analysis_start TIMESTAMP DEFAULT TIMESTAMP'2024-01-15 10:00:00'")
65spark.sql("DECLARE VARIABLE analysis_end TIMESTAMP DEFAULT TIMESTAMP'2024-01-15 10:15:00'")
66
67query_time_based = """
68 SELECT
69 sensor_id,
70 location,
71 COUNT(*) as readings_in_window,
72 AVG(temperature) as avg_temp,
73 MAX(temperature) - MIN(temperature) as temp_range,
74 analysis_start as window_start,
75 analysis_end as window_end
76 FROM sensors_data
77 WHERE cur_timestamp BETWEEN analysis_start AND analysis_end
78 GROUP BY sensor_id, location
79 ORDER BY sensor_id, location;
80"""
81
82print("EXECUTION n°1 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:15:00")
83time_based = spark.sql(query_time_based)
84print(f"Result records: {time_based.count()}")
85time_based.show(truncate=False)
86
87
88print("EXECUTION n°2 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:30:00")
89spark.sql("SET VARIABLE analysis_end = TIMESTAMP'2024-01-15 10:30:00'")
90time_based = spark.sql(query_time_based)
91print(f"Result records: {time_based.count()}")
92time_based.show(truncate=False)
93
94
95
96print("""
97################################################
98##### 2. Variables vs String Interpolation #####
99################################################
100""")
101spark.sql("DECLARE VARIABLE target_location STRING DEFAULT 'warehouse_A'")
102
103print("UNSAFE: String interpolation (vulnerable to SQL injection)")
104def get_user_input() -> str :
105 return "warehouse_A' OR '1'='1"
106
107target_location = get_user_input()
108unsafe_interpolation = spark.sql(f"SELECT * FROM sensors_data WHERE location = '{target_location}'")
109print(f"Result records: {unsafe_interpolation.count()}")
110unsafe_interpolation.show(truncate=False)
111
112
113print("SAFE: Variable substitution (injection-proof)")
114#Malicious input becomes a string value, not SQL code
115spark.sql("SET VARIABLE target_location = \"warehouse_A' OR '1'='1\"")
116safe_interpolation = spark.sql("SELECT * FROM sensors_data WHERE location = target_location")
117print(f"Result records: {safe_interpolation.count()}")
118safe_interpolation.show(truncate=False)
119# Variable value is treated as string literal "US' OR '1'='1"
120
121
122
123spark.stop()
Résultat
1##########################
2##### Build Datasets #####
3##########################
4
5sensor dataset: 10
6+---------+-------------------+-----------+--------+-----------+------+
7|sensor_id|cur_timestamp |temperature|humidity|location |status|
8+---------+-------------------+-----------+--------+-----------+------+
9|SENSOR_01|2024-01-15 10:00:00|22.5 |45.2 |warehouse_A|normal|
10|SENSOR_02|2024-01-15 10:05:00|35.8 |78.5 |warehouse_A|normal|
11|SENSOR_03|2024-01-15 10:10:00|18.2 |42.1 |warehouse_B|normal|
12|SENSOR_01|2024-01-15 10:15:00|23.1 |46.8 |warehouse_A|normal|
13|SENSOR_02|2024-01-15 10:20:00|38.5 |82.3 |warehouse_A|alert |
14|SENSOR_04|2024-01-15 10:25:00|15.2 |38.5 |warehouse_C|normal|
15|SENSOR_03|2024-01-15 10:30:00|19.1 |43.7 |warehouse_B|normal|
16|SENSOR_01|2024-01-15 10:35:00|24.2 |47.5 |warehouse_A|normal|
17|SENSOR_04|2024-01-15 10:40:00|12.8 |35.2 |warehouse_C|alert |
18|SENSOR_02|2024-01-15 10:45:00|32.1 |72.5 |warehouse_A|normal|
19+---------+-------------------+-----------+--------+-----------+------+
20
21
22########################################
23##### 1. Time-Based Variable Usage #####
24########################################
25
26EXECUTION n°1 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:15:00
27Result records: 3
28+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
29|sensor_id|location |readings_in_window|avg_temp|temp_range |window_start |window_end |
30+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
31|SENSOR_01|warehouse_A|2 |22.8 |0.6000000000000014|2024-01-15 10:00:00|2024-01-15 10:15:00|
32|SENSOR_02|warehouse_A|1 |35.8 |0.0 |2024-01-15 10:00:00|2024-01-15 10:15:00|
33|SENSOR_03|warehouse_B|1 |18.2 |0.0 |2024-01-15 10:00:00|2024-01-15 10:15:00|
34+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
35
36EXECUTION n°2 : filter between 2024-01-15 10:00:00 and 2024-01-15 10:30:00
37Result records: 4
38+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
39|sensor_id|location |readings_in_window|avg_temp|temp_range |window_start |window_end |
40+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
41|SENSOR_01|warehouse_A|2 |22.8 |0.6000000000000014|2024-01-15 10:00:00|2024-01-15 10:30:00|
42|SENSOR_02|warehouse_A|2 |37.15 |2.700000000000003 |2024-01-15 10:00:00|2024-01-15 10:30:00|
43|SENSOR_03|warehouse_B|2 |18.65 |0.9000000000000021|2024-01-15 10:00:00|2024-01-15 10:30:00|
44|SENSOR_04|warehouse_C|1 |15.2 |0.0 |2024-01-15 10:00:00|2024-01-15 10:30:00|
45+---------+-----------+------------------+--------+------------------+-------------------+-------------------+
46
47
48################################################
49##### 2. Variables vs String Interpolation #####
50################################################
51
52UNSAFE: String interpolation (vulnerable to SQL injection)
53Result records: 10
54+---------+-------------------+-----------+--------+-----------+------+
55|sensor_id|cur_timestamp |temperature|humidity|location |status|
56+---------+-------------------+-----------+--------+-----------+------+
57|SENSOR_01|2024-01-15 10:00:00|22.5 |45.2 |warehouse_A|normal|
58|SENSOR_02|2024-01-15 10:05:00|35.8 |78.5 |warehouse_A|normal|
59|SENSOR_03|2024-01-15 10:10:00|18.2 |42.1 |warehouse_B|normal|
60|SENSOR_01|2024-01-15 10:15:00|23.1 |46.8 |warehouse_A|normal|
61|SENSOR_02|2024-01-15 10:20:00|38.5 |82.3 |warehouse_A|alert |
62|SENSOR_04|2024-01-15 10:25:00|15.2 |38.5 |warehouse_C|normal|
63|SENSOR_03|2024-01-15 10:30:00|19.1 |43.7 |warehouse_B|normal|
64|SENSOR_01|2024-01-15 10:35:00|24.2 |47.5 |warehouse_A|normal|
65|SENSOR_04|2024-01-15 10:40:00|12.8 |35.2 |warehouse_C|alert |
66|SENSOR_02|2024-01-15 10:45:00|32.1 |72.5 |warehouse_A|normal|
67+---------+-------------------+-----------+--------+-----------+------+
68
69SAFE: Variable substitution (injection-proof)
70Result records: 0
71+---------+-------------+-----------+--------+--------+------+
72|sensor_id|cur_timestamp|temperature|humidity|location|status|
73+---------+-------------+-----------+--------+--------+------+
74+---------+-------------+-----------+--------+--------+------+
Scripts SQL Multi-Instructions
Introduction
Les scripts SQL multi-instructions permettent d’exécuter plusieurs commandes SQL à partir d’un seul ensemble d’instructions (script) avec un flux de contrôle procédural. Cela permet de prendre en charge les déclarations de variables, la logique conditionnelle (IF/ELSE), les boucles (WHILE, FOR), la gestion des exceptions (blocs BEGIN/END) et le séquencement des instructions.
Détail
Avant Spark 4.0, l’exécution de plusieurs instructions SQL nécessitait soit :
- Orchestration au niveau applicatif : Chaque instruction est un appel API séparé. Aucune sémantique transactionnelle. La gestion des échecs nécessite des blocs try-catch dans le code applicatif.
1spark.sql("CREATE TABLE staging AS SELECT * FROM source")
2spark.sql("DELETE FROM staging WHERE invalid = true")
3spark.sql("INSERT INTO target SELECT * FROM staging")
- Outils de workflow externes : Ajoute de la complexité opérationnelle. Nécessite une infrastructure d’orchestration séparée. Une logique SQL simple devient une configuration Python/YAML.
- Invocations multiples de Spark SQL : Aucun partage d’état entre les invocations. Impossible de passer des variables. Mauvaises performances dues à l’initialisation répétée de la session.
Spark 4.0 introduit les scripts multi-instructions qui s’exécutent comme une seule unité. Les scripts contiennent plusieurs instructions SQL séparées par des points-virgules, avec un flux de contrôle procédural.
L’analyseur de script décompose l’entrée en instructions individuelles et construit un plan d’exécution. Chaque instruction s’exécute séquentiellement. Les variables déclarées dans les premières instructions sont disponibles pour les instructions ultérieures. Les instructions de flux de contrôle (IF/ELSE, WHILE) modifient l’ordre d’exécution en fonction des conditions d’exécution.
- Les scripts s’exécutent sur le driver Spark.
- Le flux de contrôle est évalué sur le driver, et non distribué aux exécuteurs.
- Chaque instruction passe par l’optimisation et l’exécution Catalyst normales.
- Le modèle d’exécution est séquentiel, non transactionnel : Si l’instruction 3 échoue, les instructions 1 et 2 ont déjà validé leurs modifications. Il n’y a pas de rollback automatique. Cela diffère des procédures stockées de bases de données traditionnelles qui fournissent souvent une sémantique transactionnelle.
Constructions linguistiques clés :
- Déclaration et Affectation de Variables
- Exécution Conditionnelle : IF, CASE
- Exécution Itérative : WHILE, FOR, REPEAT
- Gestion des Exceptions
Avantages
- Dépendances externes réduites : Les pipelines ETL simples ne nécessitent plus des outils d’orchestration spécifiques. Les scripts SQL peuvent implémenter la logique de validation, transformation et chargement dans un seul fichier. Cela peut réduire la complexité opérationnelle et les coûts d’infrastructure.
- Logique procédurale en SQL : Les constructions IF/ELSE et WHILE permettent l’implémentation de la logique métier directement en SQL. Réduit les changements de contexte entre SQL et Python/Scala. Les équipes ayant de solides compétences SQL peuvent construire des pipelines complets sans code applicatif.
- Contrôle de version et portabilité : Les scripts sont des fichiers texte brut qui résident dans le contrôle de version. Les modifications sont suivies via Git. Les scripts sont portables entre les déploiements Spark (local, cluster, Connect). Plus facile à réviser et auditer que du code applicatif réparti sur plusieurs fichiers.
- Exécution côté serveur : Avec Spark Connect, le script entier est envoyé au serveur et s’exécute à distance. Réduit les allers-retours réseau par rapport à la soumission instruction par instruction. Le client ne reçoit que les résultats finaux, pas l’état intermédiaire.
Limitations
- Pas de rollback transactionnel : Les instructions échouées n’annulent pas automatiquement les instructions précédentes. Si le script crée une table de staging, charge des données, puis échoue lors de la validation, la table de staging reste. Un nettoyage manuel est requis. Ceci est fondamentalement différent des bases de données ACID.
- Capacités de correction des erreurs limitées : Pas de correction des erreurs interactive. Pas de points d’arrêt. Pas d’inspection de variables pendant l’exécution. La correction d’erreur nécessite d’ajouter des instructions SELECT pour afficher les valeurs intermédiaires. Les messages d’erreur peuvent ne pas indiquer clairement quelle instruction a échoué dans les scripts complexes.
- Seuil de complexité : La logique complexe est mieux implémentée dans le code applicatif avec des frameworks de tests appropriés. Les scripts doivent rester des pipelines de données linéaires, et non des applications complètes.
- Limitation dans la gestion des erreurs : Impossible de capturer des types d’erreur spécifiques avec la granularité du try-catch en Python/Scala. Impossible de continuer l’exécution après certaines erreurs. Le modèle d’échec de script est tout-ou-rien pour de nombreux types d’erreur.
Cas d’usages (concret)
- Cas d’usage 1 : Chargement Incrémental
- Une boucle qui traite 24 partitions horaires une par une.
- Un script qui ajoute de nouvelles partitions pour les 30 prochains jours, en vérifiant si elles existent déjà.
- Cas d’usage 2 : Déploiement Spécifique à l’Environnement
- Les scripts de déploiement doivent créer différentes structures de tables en dev et en prod. Le script utilise IF/ELSE basé sur une variable d’environnement pour exécuter les instructions
CREATE TABLEappropriées. Un seul script remplace des fichiers SQL séparés pour dev et prod.
- Les scripts de déploiement doivent créer différentes structures de tables en dev et en prod. Le script utilise IF/ELSE basé sur une variable d’environnement pour exécuter les instructions
Codes
Exemple : Exécution d’un script SQL multi-instructions avec Spark Connect
Spark
Contenu du script Spark Connect :
1from pyspark.sql import SparkSession
2
3REMOTE_URL = "sc://localhost:15002"
4SCRIPT_SQL_NAME = "example_SQL.sql"
5
6# Connexion to Spark Connect
7spark = SparkSession.builder \
8 .remote(REMOTE_URL) \
9 .appName("SPARK_SQLScript") \
10 .config("spark.sql.ansi.enabled", "true") \
11 .getOrCreate()
12
13print("Successfully connected to Spark!")
14print(f"Spark version: {spark.version}")
15
16# Script reading
17with open(SCRIPT_SQL_NAME, "r", encoding="utf-8") as f:
18 sql_script = f.read()
19
20# Script SQL execution with only one spark call
21result = spark.sql(sql_script)
22result.show()
Contenu du script SQL example_SQL.sql :
1-- Spark 4.x - SQL Scripting
2
3BEGIN
4 -- =============================================
5 -- STEP n°1 : VARIABLES
6 -- =============================================
7 DECLARE batch_date DATE DEFAULT CURRENT_DATE();
8
9 -- Business thresholds
10 DECLARE min_valid_amount DECIMAL(10,2) DEFAULT 10.00;
11 DECLARE standard_threshold DECIMAL(10,2) DEFAULT 100.00;
12 DECLARE premium_threshold DECIMAL(10,2) DEFAULT 1000.00;
13
14 -- Metrics
15 DECLARE total_processed INT DEFAULT 0;
16 DECLARE total_invalid INT DEFAULT 0;
17 DECLARE total_revenue DECIMAL(15,2) DEFAULT 0.0;
18
19 -- =============================================
20 -- STEP n°2 : TABLES INITIALIZATION
21 -- =============================================
22
23 -- Staging table for raw data
24 DROP TABLE IF EXISTS raw_orders;
25 CREATE TABLE raw_orders (
26 order_id BIGINT,
27 customer_id BIGINT,
28 order_date DATE,
29 amount DECIMAL(10,2),
30 status STRING,
31 region STRING
32 ) USING parquet;
33
34 -- Table of validated orders
35 DROP TABLE IF EXISTS validated_orders;
36 CREATE TABLE validated_orders (
37 order_id BIGINT,
38 customer_id BIGINT,
39 order_date DATE,
40 amount DECIMAL(10,2),
41 region STRING,
42 order_category STRING,
43 loyalty_points DECIMAL(10,2),
44 processing_date DATE
45 ) USING parquet;
46
47 -- Quarantine table for invalid data
48 DROP TABLE IF EXISTS invalid_orders;
49 CREATE TABLE invalid_orders (
50 order_id BIGINT,
51 customer_id BIGINT,
52 amount DECIMAL(10,2),
53 error_reason STRING,
54 quarantine_timestamp TIMESTAMP
55 ) USING parquet;
56
57 -- =============================================
58 -- STEP n°3 : DATASETS
59 -- =============================================
60
61 INSERT INTO raw_orders VALUES
62 (1001, 501, DATE'2024-01-15', 150.50, 'completed', 'EMEA'),
63 (1002, 502, DATE'2024-01-15', 1250.00, 'completed', 'AMER'),
64 (1003, 503, DATE'2024-01-15', -50.00, 'completed', 'APAC'), -- Invalid: negative
65 (1004, 504, DATE'2024-01-15', 890.00, 'completed', 'EMEA'),
66 (2001, 505, DATE'2024-01-16', 2100.00, 'completed', 'AMER'),
67 (2002, 506, DATE'2024-01-16', NULL, 'completed', 'APAC'), -- Invalid: NULL amount
68 (2003, 507, DATE'2024-01-16', 450.00, 'completed', 'EMEA'),
69 (2004, 508, DATE'2024-01-16', 5.00, 'completed', 'AMER'), -- Invalid: below minimum
70 (3001, 509, DATE'2024-01-17', 750.00, 'completed', 'APAC'),
71 (3002, 510, DATE'2024-01-17', 1500.00, 'completed', 'EMEA'),
72 (3003, 511, DATE'2024-01-17', 220.00, 'completed', 'AMER');
73
74 -- =============================================
75 -- STEP n°4 : DATA VALIDATION
76 -- =============================================
77
78 -- Quarantining invalid records
79 INSERT INTO invalid_orders
80 SELECT
81 order_id,
82 customer_id,
83 amount,
84 CASE
85 WHEN amount IS NULL THEN 'NULL_AMOUNT'
86 WHEN amount < 0 THEN 'NEGATIVE_AMOUNT'
87 WHEN amount < min_valid_amount THEN 'BELOW_MINIMUM'
88 ELSE 'UNKNOWN_ERROR'
89 END AS error_reason,
90 CURRENT_TIMESTAMP() AS quarantine_timestamp
91 FROM raw_orders
92 WHERE status = 'completed'
93 AND (amount IS NULL
94 OR amount < 0
95 OR amount < min_valid_amount);
96
97 -- Counting invalid rows
98 SET total_invalid = (SELECT COUNT(*) FROM invalid_orders);
99
100 -- =============================================
101 -- STEP n°5 : VALID DATA TRANSFORMATION
102 -- =============================================
103
104 INSERT INTO validated_orders
105 SELECT
106 order_id,
107 customer_id,
108 order_date,
109 amount,
110 region,
111 -- CASE WHEN classification
112 CASE
113 WHEN amount >= premium_threshold THEN 'PREMIUM'
114 WHEN amount >= standard_threshold THEN 'STANDARD'
115 ELSE 'BASIC'
116 END AS order_category,
117 -- Calculation of loyalty points
118 CASE
119 WHEN amount >= premium_threshold THEN amount * 0.10
120 WHEN amount >= standard_threshold THEN amount * 0.05
121 ELSE amount * 0.02
122 END AS loyalty_points,
123 batch_date AS processing_date
124 FROM raw_orders
125 WHERE status = 'completed'
126 AND amount IS NOT NULL
127 AND amount >= min_valid_amount;
128
129 -- Counting total rows processed
130 SET total_processed = (SELECT COUNT(*) FROM validated_orders WHERE processing_date = batch_date);
131
132 -- Revenue calculation
133 SET total_revenue = (SELECT COALESCE(SUM(amount), 0) FROM validated_orders WHERE processing_date = batch_date);
134
135
136 -- =============================================
137 -- STEP n°6 : GENERATION OF FINAL REPORTS
138 -- =============================================
139 SELECT '========== EXECUTION - SUMMARY ==========' AS message;
140
141 -- Execution report
142 SELECT 'Batch Date' as `Metric`, CAST(batch_date AS STRING) as `Value`
143 UNION ALL SELECT 'Total Valid Orders', CAST(total_processed AS STRING)
144 UNION ALL SELECT 'Total Invalid Orders', CAST(total_invalid AS STRING)
145 UNION ALL SELECT 'Total Revenue', CAST(total_revenue AS STRING)
146 UNION ALL SELECT 'Invalid Ratio %', CAST(ROUND((total_invalid * 100.0) / GREATEST(total_processed + total_invalid, 1), 2) AS STRING);
147
148 -- =============================================
149 -- STEP n°7 : DROP TABLES
150 -- =============================================
151 DROP TABLE IF EXISTS raw_orders;
152 DROP TABLE IF EXISTS validated_orders;
153 DROP TABLE IF EXISTS invalid_orders;
154
155END;
Résultat
1Successfully connected to Spark!
2Spark version: 4.1.1
3+--------------------+----------+
4| Metric| Value|
5+--------------------+----------+
6| Batch Date|2026-03-10|
7| Total Valid Orders| 8|
8|Total Invalid Orders| 3|
9| Total Revenue| 7310.50|
10| Invalid Ratio %| 27.27|
11+--------------------+----------+
