Spark Usage

Under Construction

Meanwhile, see the Readme “Spark Detail” section for a usage example and comments on SparkCompare. You may also want to check out the datacompy.SparkCompare API documentation, which is fairly thorough, if I do say so myself.

Known Differences

For cases when two dataframes are expected to differ, it can be helpful to group detected differences into three categories: matches, known differences, and true mismatches. Known differences are specified through the optional known_differences parameter:

SparkCompare(spark, base_df, compare_df, join_columns=[...], column_mapping=[...],
    known_differences=[
        {
            'name': "My Known Difference Name",
            'types': ['int', 'bigint'],
            'flags': ['nullcheck'],  # this transformation can return null
            'transformation': "case when {input}=0 then null else {input} end"
        },
        ...
    ]
)

The ‘known_differences’ parameter is a list of Python dicts with the following fields:

Field           Required?  Description
--------------  ---------  -----------
name            yes        A user-readable title for this known difference
types           yes        A list of Spark data types to which this transformation can be applied
flags           no         Special flags used when computing known differences
transformation  yes        Spark SQL expression to apply, where {input} stands for the compare-side cell
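The {input} placeholder looks like a Python str.format field. As a sketch, assuming the placeholder is filled in with the compare-side column name (an assumption about how datacompy builds its query, not something this page confirms), the template from the first example would expand as shown. render_transformation and the column name AMOUNT are hypothetical.

```python
def render_transformation(template: str, column: str) -> str:
    """Substitute a column name for every {input} in a transformation template.

    Hypothetical helper: assumes the placeholder is filled via str.format,
    which is not confirmed to be how datacompy does it.
    """
    return template.format(input=column)


zero_to_null = "case when {input}=0 then null else {input} end"
print(render_transformation(zero_to_null, "AMOUNT"))
# prints: case when AMOUNT=0 then null else AMOUNT end
```

Under this assumption, a template that needs literal braces would have to double them ({{ and }}).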

Valid flags are:

Flag       Description
---------  -----------
nullcheck  Must be set when the output of the transformation can be null
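Because a malformed entry is easy to write, the field and flag rules in the two tables can be checked before building a SparkCompare. check_known_difference below is a hypothetical helper, not part of datacompy; it also reports fields the table does not list, which may be stricter than what the library itself enforces.

```python
REQUIRED_FIELDS = {"name", "types", "transformation"}
OPTIONAL_FIELDS = {"flags"}
VALID_FLAGS = {"nullcheck"}


def check_known_difference(kd: dict) -> list:
    """Return a list of problems with one known-difference dict (empty if none).

    Hypothetical pre-flight check mirroring the field and flag tables;
    it is not part of the datacompy API.
    """
    problems = []
    for field in sorted(REQUIRED_FIELDS - set(kd)):
        problems.append(f"missing required field: {field}")
    for field in sorted(set(kd) - REQUIRED_FIELDS - OPTIONAL_FIELDS):
        problems.append(f"unknown field: {field}")
    if "types" in kd and not isinstance(kd["types"], list):
        problems.append("'types' must be a list of Spark type names")
    for flag in kd.get("flags", []):
        if flag not in VALID_FLAGS:
            problems.append(f"invalid flag: {flag}")
    return problems


zero_to_null = {
    "name": "Zero to null",
    "types": ["int", "bigint"],
    "flags": ["nullcheck"],  # the output can be null, so nullcheck is set
    "transformation": "case when {input}=0 then null else {input} end",
}
print(check_known_difference(zero_to_null))  # prints: []
print(check_known_difference({"name": "Bad", "types": ["string"]}))
# prints: ['missing required field: transformation']
```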

Transformations are applied to the compare side only. A known difference is found when transformation(compare.cell) equals base.cell. An example comparison is shown below.

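import datetime

import datacompy
from pyspark.sql import Row, SparkSession

# Reuse an existing Spark session or create a new one.
spark = SparkSession.builder.getOrCreate()

# Base side: a date column and a zero-padded string code.
base_data = [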
    Row(acct_id=10000001234, acct_sfx_num=0, clsd_reas_cd='*2', open_dt=datetime.date(2017, 5, 1), tbal_cd='0001'),
    Row(acct_id=10000001235, acct_sfx_num=0, clsd_reas_cd='V1', open_dt=datetime.date(2017, 5, 2), tbal_cd='0002'),
    Row(acct_id=10000001236, acct_sfx_num=0, clsd_reas_cd='V2', open_dt=datetime.date(2017, 5, 3), tbal_cd='0003'),
    Row(acct_id=10000001237, acct_sfx_num=0, clsd_reas_cd='*2', open_dt=datetime.date(2017, 5, 4), tbal_cd='0004'),
    Row(acct_id=10000001238, acct_sfx_num=0, clsd_reas_cd='*2', open_dt=datetime.date(2017, 5, 5), tbal_cd='0005')
]
base_df = spark.createDataFrame(base_data)

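# Compare side: renamed columns, some null closure codes, Julian (yyyyDDD)
# integers for the open date, and doubles for the TBAL code.
compare_data = [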
    Row(ACCOUNT_IDENTIFIER=10000001234, SUFFIX_NUMBER=0, AM00_STATC_CLOSED=None, AM00_DATE_ACCOUNT_OPEN=2017121, AM0B_FC_TBAL=1.0),
    Row(ACCOUNT_IDENTIFIER=10000001235, SUFFIX_NUMBER=0, AM00_STATC_CLOSED='V1', AM00_DATE_ACCOUNT_OPEN=2017122, AM0B_FC_TBAL=2.0),
    Row(ACCOUNT_IDENTIFIER=10000001236, SUFFIX_NUMBER=0, AM00_STATC_CLOSED='V2', AM00_DATE_ACCOUNT_OPEN=2017123, AM0B_FC_TBAL=3.0),
    Row(ACCOUNT_IDENTIFIER=10000001237, SUFFIX_NUMBER=0, AM00_STATC_CLOSED='V3', AM00_DATE_ACCOUNT_OPEN=2017124, AM0B_FC_TBAL=4.0),
    Row(ACCOUNT_IDENTIFIER=10000001238, SUFFIX_NUMBER=0, AM00_STATC_CLOSED=None, AM00_DATE_ACCOUNT_OPEN=2017125, AM0B_FC_TBAL=5.0)
]
compare_df = spark.createDataFrame(compare_data)

comparison = datacompy.SparkCompare(
    spark,
    base_df,
    compare_df,
    # (base column, compare column) pairs used to line up rows
    join_columns=[('acct_id', 'ACCOUNT_IDENTIFIER'), ('acct_sfx_num', 'SUFFIX_NUMBER')],
    # (base column, compare column) pairs for the remaining columns to compare
    column_mapping=[('clsd_reas_cd', 'AM00_STATC_CLOSED'),
                    ('open_dt', 'AM00_DATE_ACCOUNT_OPEN'),
                    ('tbal_cd', 'AM0B_FC_TBAL')],
    known_differences=[
        # 1.0 -> '0001': numeric codes without their zero padding
        {'name': 'Left-padded, four-digit numeric code',
         'types': ['tinyint', 'smallint', 'int', 'bigint', 'float', 'double', 'decimal'],
         'transformation': "lpad(cast({input} AS bigint), 4, '0')"},
        # null -> '*2': a null closure code on the compare side counts as '*2'
        {'name': 'Null to *2',
         'types': ['string'],
         'transformation': "case when {input} is null then '*2' else {input} end"},
        # 2017121 (year + day of year) -> 2017-05-01
        {'name': 'Julian date -> date',
         'types': ['bigint'],
         'transformation': "to_date(cast(unix_timestamp(cast({input} AS string), 'yyyyDDD') AS timestamp))"},
    ],
)
comparison.report()  # print the comparison report to stdout

Corresponding output:

****** Column Summary ******
Number of columns in common with matching schemas: 3
Number of columns in common with schema differences: 2
Number of columns in base but not compare: 0
Number of columns in compare but not base: 0

****** Schema Differences ******
Base Column Name  Compare Column Name     Base Dtype     Compare Dtype
----------------  ----------------------  -------------  -------------
open_dt           AM00_DATE_ACCOUNT_OPEN  date           bigint
tbal_cd           AM0B_FC_TBAL            string         double

****** Row Summary ******
Number of rows in common: 5
Number of rows in base but not compare: 0
Number of rows in compare but not base: 0
Number of duplicate rows found in base: 0
Number of duplicate rows found in compare: 0

****** Row Comparison ******
Number of rows with some columns unequal: 5
Number of rows with all columns equal: 0

****** Column Comparison ******
Number of columns compared with unexpected differences in some values: 1
Number of columns compared with all values equal but known differences found: 2
Number of columns compared with all values completely equal: 0

****** Columns with Unequal Values ******
Base Column Name  Compare Column Name     Base Dtype     Compare Dtype  # Matches  # Known Diffs  # Mismatches
----------------  -------------------     -------------  -------------  ---------  -------------  ------------
clsd_reas_cd      AM00_STATC_CLOSED       string         string                 2              2             1
open_dt           AM00_DATE_ACCOUNT_OPEN  date           bigint                 0              5             0
tbal_cd           AM0B_FC_TBAL            string         double                 0              5             0
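The counts in the last table follow from the rule that each transformation is applied to the compare value and the result is checked against the base value. The sketch below recomputes them without Spark. It is an illustration of the classification logic only, not how SparkCompare computes its results: the Spark SQL transformations are replaced by hypothetical Python stand-ins (null_to_star2, julian_to_date, lpad_code), Python's %Y%j format is assumed to behave like Spark's yyyyDDD for these values, and the types filter is resolved by hand, assuming it is checked against the compare column's type.

```python
import datetime
from collections import Counter

# Values copied from the example above, paired row by row on the join keys.
columns = {
    "clsd_reas_cd": (["*2", "V1", "V2", "*2", "*2"],
                     [None, "V1", "V2", "V3", None]),
    "open_dt": ([datetime.date(2017, 5, d) for d in range(1, 6)],
                [2017121, 2017122, 2017123, 2017124, 2017125]),
    "tbal_cd": (["0001", "0002", "0003", "0004", "0005"],
                [1.0, 2.0, 3.0, 4.0, 5.0]),
}


# Hypothetical Python stand-ins for the Spark SQL transformations.
def null_to_star2(v):
    """'Null to *2': replace a null compare value with '*2'."""
    return "*2" if v is None else v


def julian_to_date(v):
    """'Julian date -> date': parse a yyyyDDD integer (year + day of year)."""
    return datetime.datetime.strptime(str(v), "%Y%j").date()


def lpad_code(v):
    """'Left-padded, four-digit numeric code': truncate to int, zero-pad to 4."""
    return str(int(v)).rjust(4, "0")


# Transformations whose 'types' include each compare column's type
# (string, bigint, double), resolved by hand for this example.
applicable = {
    "clsd_reas_cd": [null_to_star2],
    "open_dt": [lpad_code, julian_to_date],
    "tbal_cd": [lpad_code],
}


def classify(base, compare, transformations):
    """Transform the compare value only, then compare it with the base value."""
    if base == compare:
        return "match"
    if any(t(compare) == base for t in transformations):
        return "known diff"
    return "mismatch"


results = {}
for name, (base_vals, compare_vals) in columns.items():
    counts = Counter(classify(b, c, applicable[name])
                     for b, c in zip(base_vals, compare_vals))
    results[name] = (counts["match"], counts["known diff"], counts["mismatch"])
    print(name, *results[name])
# prints:
# clsd_reas_cd 2 2 1
# open_dt 0 5 0
# tbal_cd 0 5 0
```

The only true mismatch is the fourth row of clsd_reas_cd: 'V3' is not null, so 'Null to *2' leaves it unchanged and it still differs from '*2'.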