datacompy.SparkCompare

class datacompy.SparkCompare(spark_session, base_df, compare_df, join_columns, column_mapping=None, cache_intermediates=False, known_differences=None, rel_tol=0, abs_tol=0, show_all_columns=False, match_rates=False)

Comparison class used to compare two Spark DataFrames.

Extends the Compare functionality to the wide world of Spark and out-of-memory data.

Parameters:
spark_session : pyspark.sql.SparkSession

A SparkSession to be used to execute Spark commands in the comparison.

base_df : pyspark.sql.DataFrame

The dataframe to serve as a basis for comparison. While you will ultimately get the same results comparing A to B as you will comparing B to A, by convention base_df should be the canonical, gold standard reference dataframe in the comparison.

compare_df : pyspark.sql.DataFrame

The dataframe to be compared against base_df.

join_columns : list

A list of columns comprising the join key(s) of the two dataframes. If the column names are the same in the two dataframes, the names of the columns can be given as strings. If the names differ, the join_columns list should include tuples of the form (base_column_name, compare_column_name).
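For illustration, a join key might be specified either way (the column names here are hypothetical, not part of the API):

```python
# Join keys with identical names in both dataframes: plain strings suffice.
join_columns = ["account_id", "as_of_date"]

# Join keys named differently across the dataframes:
# (base_column_name, compare_column_name) tuples.
join_columns_mapped = [("acct_id", "account_id"), ("acct_date", "as_of_date")]
```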

column_mapping : list[tuple], optional

If columns to be compared have different names in the base and compare dataframes, provide a list in column_mapping consisting of tuples of the form (base_column_name, compare_column_name), one for each pair of differently-named columns to be compared against each other.

cache_intermediates : bool, optional

Whether or not SparkCompare will cache intermediate dataframes (such as the deduplicated versions of the input dataframes, or the joined comparison). Caching consumes memory proportional to the size of your dataframes, but significantly speeds up performance, since later steps will not have to recompute transformations. False by default.

known_differences : list[dict], optional

A list of dictionaries that define transformations to apply to the compare dataframe to match values when there are known differences between base and compare. The dictionaries should contain:

  • name: A name that describes the transformation
  • types: The types that the transformation should be applied to.
    This prevents certain transformations from being applied to types that don’t make sense and would cause exceptions.
  • transformation: A Spark SQL statement to apply to the column
    in the compare dataset. The string “{input}” will be replaced by the variable in question.
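As a sketch, one known_differences entry might look like the following. The difference described, the type names, and the Spark SQL expression are all illustrative assumptions, not values taken from the library:

```python
# Hypothetical known difference: the compare side stores dates as
# integer-coded YYYYMMDD values, while the base side uses date types.
known_differences = [
    {
        "name": "integer-coded dates",   # human-readable label for the report
        "types": ["int", "bigint"],      # only apply to columns of these types
        # Spark SQL applied to the compare column; "{input}" is substituted
        # with the column in question.
        "transformation": "to_date(cast({input} as string), 'yyyyMMdd')",
    }
]
```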

abs_tol : float, optional

Absolute tolerance between two values.

rel_tol : float, optional

Relative tolerance between two values.

show_all_columns : bool, optional

If True, all columns will be shown in the report, including columns with a 100% match rate.

match_rates : bool, optional

If True, match rates by column will be shown in the column summary.

Returns:
SparkCompare

Instance of a SparkCompare object, ready to do some comparin’. Note that if cache_intermediates=True, this instance will already have done some work deduping the input dataframes. If cache_intermediates=False, the instantiation of this object is lazy.

base_row_count

int: Get the count of rows in the de-duped base dataframe

columns_compared

list[str]: Get columns to be compared in both dataframes (all columns in both, excluding the join key(s))

columns_in_both

set[str]: Get columns in both dataframes

columns_only_base

set[str]: Get columns that are unique to the base dataframe

columns_only_compare

set[str]: Get columns that are unique to the compare dataframe

common_row_count

int: Get the count of rows in common between base and compare dataframes

compare_row_count

int: Get the count of rows in the de-duped compare dataframe

report(file=sys.stdout)

Creates a comparison report and prints it to the file specified (stdout by default).

Parameters:
file : file, optional

A filehandle to write the report to. By default, this is sys.stdout, printing the report to stdout. You can also redirect this to an output file, as in the example below.

Examples

>>> with open('my_report.txt', 'w') as report_file:
...     comparison.report(file=report_file)

rows_both_all

pyspark.sql.DataFrame: Returns all rows in both dataframes

rows_both_mismatch

pyspark.sql.DataFrame: Returns all rows in both dataframes that have mismatches

rows_only_base

pyspark.sql.DataFrame: Returns rows only in the base dataframe

rows_only_compare

pyspark.sql.DataFrame: Returns rows only in the compare dataframe