Module pandas_profiling.model.correlations
Correlations between variables.
Source code
"""Correlations between variables."""
import itertools
import warnings
from contextlib import suppress
from functools import partial
import pandas as pd
import numpy as np
from confuse import NotFoundError
from pandas.core.base import DataError
from scipy import stats
from pandas_profiling.config import config
from pandas_profiling.model.base import Variable
def cramers_corrected_stat(confusion_matrix, correction: bool) -> float:
    """Compute the corrected Cramer's V statistic of a contingency table.

    Args:
        confusion_matrix: Crosstab between two variables.
        correction: Forwarded as ``correction`` to
            ``scipy.stats.chi2_contingency`` when computing chi-squared.

    Returns:
        The corrected Cramer's V statistic for the two variables.
    """
    # The chi-squared statistic is the first element of scipy's result.
    chi_squared = stats.chi2_contingency(confusion_matrix, correction=correction)[0]
    total = confusion_matrix.sum().sum()
    n_rows, n_cols = confusion_matrix.shape

    phi_squared = chi_squared / total

    # Correction terms: phi^2 and both table dimensions are adjusted by
    # quantities proportional to 1 / (total - 1); phi^2 is floored at zero.
    shrinkage = ((n_cols - 1.0) * (n_rows - 1.0)) / (total - 1.0)
    phi_squared_corrected = max(0.0, phi_squared - shrinkage)
    rows_corrected = n_rows - ((n_rows - 1.0) ** 2.0) / (total - 1.0)
    cols_corrected = n_cols - ((n_cols - 1.0) ** 2.0) / (total - 1.0)

    denominator = min((cols_corrected - 1.0), (rows_corrected - 1.0))
    return np.sqrt(phi_squared_corrected / denominator)
def check_recoded(confusion_matrix, count: int) -> int:
    """Tell whether the diagonal of a crosstab accounts for ``count`` entries.

    Args:
        confusion_matrix: Crosstab between two variables.
        count: The total the diagonal sum is compared against
            (``recoded_matrix`` passes ``len(df)``, i.e. the number of rows).

    Returns:
        1 if the diagonal of the crosstab sums to ``count``, otherwise 0.
    """
    diagonal_total = confusion_matrix.values.diagonal().sum()
    if diagonal_total == count:
        return 1
    return 0
def cramers_matrix(df: pd.DataFrame, variables: dict):
    """Build the pairwise Cramer's V matrix of the categorical variables.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.

    Returns:
        The matrix returned by ``categorical_matrix`` when the corrected
        Cramer's V statistic (with ``correction=True``) is the pairwise measure.
    """
    pairwise_cramers_v = partial(cramers_corrected_stat, correction=True)
    return categorical_matrix(df, variables, pairwise_cramers_v)
def recoded_matrix(df: pd.DataFrame, variables: dict):
    """Build the pairwise "recoded" matrix of the categorical variables.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.

    Returns:
        The matrix returned by ``categorical_matrix`` when ``check_recoded``
        (with ``count`` set to the number of rows of ``df``) is the pairwise measure.
    """
    n_rows = len(df)
    is_recoded = partial(check_recoded, count=n_rows)
    return categorical_matrix(df, variables, is_recoded)
def categorical_matrix(
    df: pd.DataFrame, variables: dict, correlation_function: callable
):
    """Build a symmetric pairwise matrix over the categorical variables.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.
        correlation_function: Called with the crosstab of two columns; its
            return value is stored for that pair of columns.

    Returns:
        A DataFrame indexed (rows and columns) by the selected categorical
        column names, with ones on the diagonal.
    """
    # Select categorical columns whose number of distinct values does not
    # exceed the configured maximum.
    categoricals = {}
    for column_name, variable_type in variables.items():
        if variable_type == Variable.TYPE_CAT:
            distinct_count = df[column_name].nunique()
            if distinct_count <= config[
                "categorical_maximum_correlation_distinct"
            ].get(int):
                categoricals[column_name] = df[column_name]

    # Start from all ones so the diagonal is already filled in.
    size = len(categoricals)
    correlation_matrix = pd.DataFrame(
        np.ones((size, size)),
        index=categoricals.keys(),
        columns=categoricals.keys(),
    )

    # Evaluate each unordered pair once and mirror the value.
    for first, second in itertools.combinations(categoricals.items(), 2):
        name1, data1 = first
        name2, data2 = second
        confusion_matrix = pd.crosstab(data1, data2, dropna=False)
        value = correlation_function(confusion_matrix)
        correlation_matrix.loc[name2, name1] = value
        correlation_matrix.loc[name1, name2] = value

    return correlation_matrix
def warn_correlation(correlation_name, error):
    """Warn that a correlation calculation failed and explain how to disable it.

    Args:
        correlation_name: Name of the correlation measure that failed; it is
            also used as the key in the suggested ``correlations`` setting.
        error: The error that was raised; its string form is included in the message.
    """
    # The suggested call now has balanced parentheses, so it can be copied as is.
    message = (
        "There was an attempt to calculate the {correlation_name} correlation, but this failed.\n"
        "To hide this warning, disable the calculation\n"
        '(using `df.profile_report(correlations={{"{correlation_name}": False}})`)\n'
        "If this is problematic for your use case, please report this as an issue:\n"
        "https://github.com/pandas-profiling/pandas-profiling/issues\n"
        "(include the error message: '{error}')"
    ).format(correlation_name=correlation_name, error=error)
    warnings.warn(message)
def calculate_correlations(df: pd.DataFrame, variables: dict) -> dict:
    """Calculate the correlation matrices for the correlation types enabled in the config.

    The measures considered are pearson, spearman, kendall, phi_k, cramers and
    recoded. Handled calculation errors are reported through
    ``warn_correlation`` instead of being raised.

    Args:
        df: The DataFrame with variables.
        variables: A dict with column names and variable types.

    Returns:
        A dictionary mapping the name of each active correlation measure to its
        correlation matrix. Empty pearson/spearman/kendall/cramers/recoded
        matrices are left out.
    """
    correlations = {}

    for correlation_name in ["pearson", "spearman", "kendall"]:
        if config["correlations"][correlation_name].get(bool):
            try:
                correlation = df.corr(method=correlation_name)
                if len(correlation) > 0:
                    correlations[correlation_name] = correlation
            except (ValueError, AssertionError) as e:
                warn_correlation(correlation_name, e)

    if config["correlations"]["phi_k"].get(bool):
        # NOTE(review): ``phik`` is never referenced by name below; the import
        # presumably registers ``DataFrame.phik_matrix`` as a side effect --
        # confirm before removing it.
        import phik

        phi_k_error = None
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            # Phi_k does not filter non-numerical with high cardinality
            selcols = []
            intcols = []
            for col in df.columns.tolist():
                try:
                    # Value counts with NaN rows removed; only the resulting
                    # index (the distinct values) is inspected below.
                    tmp = (
                        df[col]
                        .value_counts(dropna=False)
                        .reset_index()
                        .dropna()
                        .set_index("index")
                        .iloc[:, 0]
                    )
                    if tmp.index.inferred_type == "mixed":
                        continue

                    if pd.api.types.is_numeric_dtype(df[col]):
                        intcols.append(col)
                        selcols.append(col)
                    elif df[col].nunique() <= config[
                        "categorical_maximum_correlation_distinct"
                    ].get(int):
                        selcols.append(col)
                except (TypeError, ValueError):
                    continue

            try:
                correlations["phi_k"] = df[selcols].phik_matrix(interval_cols=intcols)

                # Only do this if the column_order is set
                with suppress(NotFoundError):
                    # Get the preferred order
                    column_order = config["column_order"].get(list)

                    # Get the Phi_k sorted order
                    current_order = (
                        correlations["phi_k"].index.get_level_values("var1").tolist()
                    )

                    # Intersection (some columns are not used in correlation)
                    column_order = [x for x in column_order if x in current_order]

                    # Override the Phi_k sorting
                    correlations["phi_k"] = correlations["phi_k"].reindex(
                        index=column_order, columns=column_order
                    )
            except (ValueError, DataError) as e:
                # Warning right here would be discarded by the "ignore" filter
                # that is active inside this block, so defer it.
                phi_k_error = e

        if phi_k_error is not None:
            warn_correlation("phi_k", phi_k_error)

    categorical_correlations = {"cramers": cramers_matrix, "recoded": recoded_matrix}
    for correlation_name, get_matrix in categorical_correlations.items():
        if config["correlations"][correlation_name].get(bool):
            try:
                correlation = get_matrix(df, variables)
                if len(correlation) > 0:
                    correlations[correlation_name] = correlation
            except ValueError as e:
                warn_correlation(correlation_name, e)

    return correlations
def perform_check_correlation(
    correlation_matrix, criterion: callable, special_type: Variable
):
    """Find the variables whose correlation with another variable meets a criterion.

    For every row of the matrix the columns are scanned in order, stopping at
    the column whose label equals the row label. When several columns satisfy
    the criterion for the same row, the last one scanned is kept. Rows listed
    in the ``correlation_overrides`` config setting are skipped.

    Args:
        correlation_matrix: The correlation matrix for the DataFrame.
        criterion: A function mapping a correlation value to a bool.
        special_type: Which type to report when the criterion is True (CORR, RECODED).

    Returns:
        A dict mapping each flagged variable to a dict with the keys ``type``,
        ``correlation_var`` and ``correlation``.

    Notes:
        If x~y and y~z but not x~z, it would be better to delete only y.
        A better way would be to find out which variable causes the highest
        increase in multicollinearity.
    """
    # TODO: find a more reliable way to find highly correlated variables, as corr(x,y) > 0.9 and corr(y,z) > 0.9 does
    #  not imply corr(x,z) > 0.9
    variables = {}
    correlation_overrides = config["correlation_overrides"].get(list)

    # The matrix is only read, so no defensive copy is needed.
    for x, corr_x in correlation_matrix.iterrows():
        if correlation_overrides and x in correlation_overrides:
            continue

        # ``Series.items`` replaces ``Series.iteritems``, which pandas 2.0 removed.
        for y, value in corr_x.items():
            if x == y:
                break

            if criterion(value):
                variables[x] = {
                    "type": special_type,
                    "correlation_var": y,
                    "correlation": value,
                }
    return variables
Functions
def calculate_correlations(df, variables)
-
Calculate the correlation coefficients between variables for the correlation types selected in the config (pearson, spearman, kendall, phi_k, cramers, recoded).
Args
variables
- A dict with column names and variable types.
df
- The DataFrame with variables.
Returns
A dictionary containing the correlation matrices for each of the active correlation measures.
Source code
def calculate_correlations(df: pd.DataFrame, variables: dict) -> dict:
    """Calculate the correlation matrices for the correlation types enabled in the config.

    The measures considered are pearson, spearman, kendall, phi_k, cramers and
    recoded. Handled calculation errors are reported through
    ``warn_correlation`` instead of being raised.

    Args:
        df: The DataFrame with variables.
        variables: A dict with column names and variable types.

    Returns:
        A dictionary mapping the name of each active correlation measure to its
        correlation matrix. Empty pearson/spearman/kendall/cramers/recoded
        matrices are left out.
    """
    correlations = {}

    for correlation_name in ["pearson", "spearman", "kendall"]:
        if config["correlations"][correlation_name].get(bool):
            try:
                correlation = df.corr(method=correlation_name)
                if len(correlation) > 0:
                    correlations[correlation_name] = correlation
            except (ValueError, AssertionError) as e:
                warn_correlation(correlation_name, e)

    if config["correlations"]["phi_k"].get(bool):
        # NOTE(review): ``phik`` is never referenced by name below; the import
        # presumably registers ``DataFrame.phik_matrix`` as a side effect --
        # confirm before removing it.
        import phik

        phi_k_error = None
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            # Phi_k does not filter non-numerical with high cardinality
            selcols = []
            intcols = []
            for col in df.columns.tolist():
                try:
                    # Value counts with NaN rows removed; only the resulting
                    # index (the distinct values) is inspected below.
                    tmp = (
                        df[col]
                        .value_counts(dropna=False)
                        .reset_index()
                        .dropna()
                        .set_index("index")
                        .iloc[:, 0]
                    )
                    if tmp.index.inferred_type == "mixed":
                        continue

                    if pd.api.types.is_numeric_dtype(df[col]):
                        intcols.append(col)
                        selcols.append(col)
                    elif df[col].nunique() <= config[
                        "categorical_maximum_correlation_distinct"
                    ].get(int):
                        selcols.append(col)
                except (TypeError, ValueError):
                    continue

            try:
                correlations["phi_k"] = df[selcols].phik_matrix(interval_cols=intcols)

                # Only do this if the column_order is set
                with suppress(NotFoundError):
                    # Get the preferred order
                    column_order = config["column_order"].get(list)

                    # Get the Phi_k sorted order
                    current_order = (
                        correlations["phi_k"].index.get_level_values("var1").tolist()
                    )

                    # Intersection (some columns are not used in correlation)
                    column_order = [x for x in column_order if x in current_order]

                    # Override the Phi_k sorting
                    correlations["phi_k"] = correlations["phi_k"].reindex(
                        index=column_order, columns=column_order
                    )
            except (ValueError, DataError) as e:
                # Warning right here would be discarded by the "ignore" filter
                # that is active inside this block, so defer it.
                phi_k_error = e

        if phi_k_error is not None:
            warn_correlation("phi_k", phi_k_error)

    categorical_correlations = {"cramers": cramers_matrix, "recoded": recoded_matrix}
    for correlation_name, get_matrix in categorical_correlations.items():
        if config["correlations"][correlation_name].get(bool):
            try:
                correlation = get_matrix(df, variables)
                if len(correlation) > 0:
                    correlations[correlation_name] = correlation
            except ValueError as e:
                warn_correlation(correlation_name, e)

    return correlations
def categorical_matrix(df, variables, correlation_function)
-
Calculate a correlation matrix for categorical variables.
Args
df
- The pandas DataFrame.
variables
- A dict with column names mapped to variable type.
correlation_function
- A function to calculate the correlation between two variables.
Returns
A correlation matrix for categorical variables.
Source code
def categorical_matrix(
    df: pd.DataFrame, variables: dict, correlation_function: callable
):
    """Build a symmetric pairwise matrix over the categorical variables.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.
        correlation_function: Called with the crosstab of two columns; its
            return value is stored for that pair of columns.

    Returns:
        A DataFrame indexed (rows and columns) by the selected categorical
        column names, with ones on the diagonal.
    """
    # Select categorical columns whose number of distinct values does not
    # exceed the configured maximum.
    categoricals = {}
    for column_name, variable_type in variables.items():
        if variable_type == Variable.TYPE_CAT:
            distinct_count = df[column_name].nunique()
            if distinct_count <= config[
                "categorical_maximum_correlation_distinct"
            ].get(int):
                categoricals[column_name] = df[column_name]

    # Start from all ones so the diagonal is already filled in.
    size = len(categoricals)
    correlation_matrix = pd.DataFrame(
        np.ones((size, size)),
        index=categoricals.keys(),
        columns=categoricals.keys(),
    )

    # Evaluate each unordered pair once and mirror the value.
    for first, second in itertools.combinations(categoricals.items(), 2):
        name1, data1 = first
        name2, data2 = second
        confusion_matrix = pd.crosstab(data1, data2, dropna=False)
        value = correlation_function(confusion_matrix)
        correlation_matrix.loc[name2, name1] = value
        correlation_matrix.loc[name1, name2] = value

    return correlation_matrix
def check_recoded(confusion_matrix, count)
-
Check if two variables are recoded based on their crosstab.
Args
confusion_matrix
- Crosstab between two variables.
count
- The number of observations (rows) in the DataFrame; the diagonal sum of the crosstab is compared against it.
Returns
Whether the variables are recoded.
Source code
def check_recoded(confusion_matrix, count: int) -> int:
    """Tell whether the diagonal of a crosstab accounts for ``count`` entries.

    Args:
        confusion_matrix: Crosstab between two variables.
        count: The total the diagonal sum is compared against
            (``recoded_matrix`` passes ``len(df)``, i.e. the number of rows).

    Returns:
        1 if the diagonal of the crosstab sums to ``count``, otherwise 0.
    """
    diagonal_total = confusion_matrix.values.diagonal().sum()
    if diagonal_total == count:
        return 1
    return 0
def cramers_corrected_stat(confusion_matrix, correction)
-
Calculate the Cramer's V corrected stat for two variables.
Args
confusion_matrix
- Crosstab between two variables.
correction
- Should the correction be applied?
Returns
The Cramer's V corrected stat for the two variables.
Source code
def cramers_corrected_stat(confusion_matrix, correction: bool) -> float:
    """Compute the corrected Cramer's V statistic of a contingency table.

    Args:
        confusion_matrix: Crosstab between two variables.
        correction: Forwarded as ``correction`` to
            ``scipy.stats.chi2_contingency`` when computing chi-squared.

    Returns:
        The corrected Cramer's V statistic for the two variables.
    """
    # The chi-squared statistic is the first element of scipy's result.
    chi_squared = stats.chi2_contingency(confusion_matrix, correction=correction)[0]
    total = confusion_matrix.sum().sum()
    n_rows, n_cols = confusion_matrix.shape

    phi_squared = chi_squared / total

    # Correction terms: phi^2 and both table dimensions are adjusted by
    # quantities proportional to 1 / (total - 1); phi^2 is floored at zero.
    shrinkage = ((n_cols - 1.0) * (n_rows - 1.0)) / (total - 1.0)
    phi_squared_corrected = max(0.0, phi_squared - shrinkage)
    rows_corrected = n_rows - ((n_rows - 1.0) ** 2.0) / (total - 1.0)
    cols_corrected = n_cols - ((n_cols - 1.0) ** 2.0) / (total - 1.0)

    denominator = min((cols_corrected - 1.0), (rows_corrected - 1.0))
    return np.sqrt(phi_squared_corrected / denominator)
def cramers_matrix(df, variables)
-
Calculate the Cramer's V correlation matrix.
Args
df
- The pandas DataFrame.
variables
- A dict with column names mapped to variable type.
Returns
A Cramer's V matrix for categorical variables.
Source code
def cramers_matrix(df: pd.DataFrame, variables: dict):
    """Build the pairwise Cramer's V matrix of the categorical variables.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.

    Returns:
        The matrix returned by ``categorical_matrix`` when the corrected
        Cramer's V statistic (with ``correction=True``) is the pairwise measure.
    """
    pairwise_cramers_v = partial(cramers_corrected_stat, correction=True)
    return categorical_matrix(df, variables, pairwise_cramers_v)
def perform_check_correlation(correlation_matrix, criterion, special_type)
-
Check which variables in the correlation matrix satisfy the criterion (for example, are highly correlated or recoded) and report them.
Args
correlation_matrix
- The correlation matrix for the DataFrame.
criterion
- A function mapping a correlation value to a bool.
special_type
- which type to return when the criterion is True (CORR, RECODED).
Returns
The variables that are highly correlated or recoded.
Notes
If x~y and y~z but not x~z, it would be better to delete only y. A better way would be to find out which variable causes the highest increase in multicollinearity.
Source code
def perform_check_correlation(
    correlation_matrix, criterion: callable, special_type: Variable
):
    """Find the variables whose correlation with another variable meets a criterion.

    For every row of the matrix the columns are scanned in order, stopping at
    the column whose label equals the row label. When several columns satisfy
    the criterion for the same row, the last one scanned is kept. Rows listed
    in the ``correlation_overrides`` config setting are skipped.

    Args:
        correlation_matrix: The correlation matrix for the DataFrame.
        criterion: A function mapping a correlation value to a bool.
        special_type: Which type to report when the criterion is True (CORR, RECODED).

    Returns:
        A dict mapping each flagged variable to a dict with the keys ``type``,
        ``correlation_var`` and ``correlation``.

    Notes:
        If x~y and y~z but not x~z, it would be better to delete only y.
        A better way would be to find out which variable causes the highest
        increase in multicollinearity.
    """
    # TODO: find a more reliable way to find highly correlated variables, as corr(x,y) > 0.9 and corr(y,z) > 0.9 does
    #  not imply corr(x,z) > 0.9
    variables = {}
    correlation_overrides = config["correlation_overrides"].get(list)

    # The matrix is only read, so no defensive copy is needed.
    for x, corr_x in correlation_matrix.iterrows():
        if correlation_overrides and x in correlation_overrides:
            continue

        # ``Series.items`` replaces ``Series.iteritems``, which pandas 2.0 removed.
        for y, value in corr_x.items():
            if x == y:
                break

            if criterion(value):
                variables[x] = {
                    "type": special_type,
                    "correlation_var": y,
                    "correlation": value,
                }
    return variables
def recoded_matrix(df, variables)
-
Calculate the recoded correlation matrix.
Args
df
- The pandas DataFrame.
variables
- A dict with column names mapped to variable type.
Returns
A recoded matrix for categorical variables.
Source code
def recoded_matrix(df: pd.DataFrame, variables: dict):
    """Build the pairwise "recoded" matrix of the categorical variables.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.

    Returns:
        The matrix returned by ``categorical_matrix`` when ``check_recoded``
        (with ``count`` set to the number of rows of ``df``) is the pairwise measure.
    """
    n_rows = len(df)
    is_recoded = partial(check_recoded, count=n_rows)
    return categorical_matrix(df, variables, is_recoded)
def warn_correlation(correlation_name, error)
-
Source code
def warn_correlation(correlation_name, error):
    """Warn that a correlation calculation failed and explain how to disable it.

    Args:
        correlation_name: Name of the correlation measure that failed; it is
            also used as the key in the suggested ``correlations`` setting.
        error: The error that was raised; its string form is included in the message.
    """
    # The suggested call now has balanced parentheses, so it can be copied as is.
    message = (
        "There was an attempt to calculate the {correlation_name} correlation, but this failed.\n"
        "To hide this warning, disable the calculation\n"
        '(using `df.profile_report(correlations={{"{correlation_name}": False}})`)\n'
        "If this is problematic for your use case, please report this as an issue:\n"
        "https://github.com/pandas-profiling/pandas-profiling/issues\n"
        "(include the error message: '{error}')"
    ).format(correlation_name=correlation_name, error=error)
    warnings.warn(message)