Module pandas_profiling.model.correlations

Correlations between variables.

Expand source code
"""Correlations between variables."""
import itertools
import warnings
from contextlib import suppress
from functools import partial
from typing import Callable, Dict, List, Optional

import pandas as pd
import numpy as np
from confuse import NotFoundError
from pandas.core.base import DataError
from scipy import stats
from tqdm.auto import tqdm

from pandas_profiling.config import config
from pandas_profiling.model.base import Variable


def cramers_corrected_stat(confusion_matrix, correction: bool) -> float:
    """Calculate the Cramer's V corrected stat for two variables.

    Args:
        confusion_matrix: Crosstab between two variables.
        correction: Should the correction be applied?

    Returns:
        The Cramer's V corrected stat for the two variables.
    """
    chi2 = stats.chi2_contingency(confusion_matrix, correction=correction)[0]
    n = confusion_matrix.sum().sum()
    phi2 = chi2 / n
    r, k = confusion_matrix.shape

    # Deal with NaNs later on
    with np.errstate(divide="ignore", invalid="ignore"):
        phi2corr = max(0.0, phi2 - ((k - 1.0) * (r - 1.0)) / (n - 1.0))
        rcorr = r - ((r - 1.0) ** 2.0) / (n - 1.0)
        kcorr = k - ((k - 1.0) ** 2.0) / (n - 1.0)
        corr = np.sqrt(phi2corr / min((kcorr - 1.0), (rcorr - 1.0)))
    return corr


def check_recoded(confusion_matrix, count: int) -> int:
    """Check if two variables are recoded based on their crosstab.

    Args:
        confusion_matrix: Crosstab between two variables.
        count: The total number of observations (len(df) when called from recoded_matrix).

    Returns:
        Whether the variables are recoded.
    """
    return int(confusion_matrix.values.diagonal().sum() == count)


def cramers_matrix(df: pd.DataFrame, variables: dict):
    """Calculate the Cramer's V correlation matrix.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.

    Returns:
        A Cramer's V matrix for categorical variables.
    """
    return categorical_matrix(
        df, variables, partial(cramers_corrected_stat, correction=True)
    )


def recoded_matrix(df: pd.DataFrame, variables: dict):
    """Calculate the recoded correlation matrix.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.

    Returns:
        A recoded matrix for categorical variables.
    """
    return categorical_matrix(df, variables, partial(check_recoded, count=len(df)))


def categorical_matrix(
    df: pd.DataFrame, variables: dict, correlation_function: Callable
) -> Optional[pd.DataFrame]:
    """Calculate a correlation matrix for categorical variables.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.
        correlation_function: A function to calculate the correlation between two variables.

    Returns:
        A correlation matrix for categorical variables.
    """
    categoricals = {
        column_name: df[column_name]
        for column_name, variable_type in variables.items()
        if variable_type == Variable.TYPE_CAT
        # TODO: solve in type system
        and config["categorical_maximum_correlation_distinct"].get(int)
        >= df[column_name].nunique()
        > 1
    }

    if len(categoricals) <= 1:
        return None

    correlation_matrix = pd.DataFrame(
        np.ones((len(categoricals), len(categoricals))),
        index=categoricals.keys(),
        columns=categoricals.keys(),
    )

    for (name1, data1), (name2, data2) in itertools.combinations(
        categoricals.items(), 2
    ):
        confusion_matrix = pd.crosstab(data1, data2)
        correlation_matrix.loc[name2, name1] = correlation_matrix.loc[
            name1, name2
        ] = correlation_function(confusion_matrix)

    return correlation_matrix


def warn_correlation(correlation_name, error):
    """Warn that calculating the named correlation failed and explain how to silence the warning."""
    warnings.warn(
        "There was an attempt to calculate the {correlation_name} correlation, but this failed.\n"
        "To hide this warning, disable the calculation\n"
        '(using `df.profile_report(correlations={{"{correlation_name}": {{"calculate": False}}}})`)\n'
        "If this is problematic for your use case, please report this as an issue:\n"
        "https://github.com/pandas-profiling/pandas-profiling/issues\n"
        "(include the error message: '{error}')".format(
            correlation_name=correlation_name, error=error
        )
    )


def calculate_correlations(df: pd.DataFrame, variables: dict) -> dict:
    """Calculate the correlation coefficients between variables for the correlation types selected in the config
    (pearson, spearman, kendall, phi_k, cramers).

    Args:
        variables: A dict with column names and variable types.
        df: The DataFrame with variables.

    Returns:
        A dictionary containing the correlation matrices for each of the active correlation measures.
    """
    correlations = {}

    disable_progress_bar = not config["progress_bar"].get(bool)

    correlation_names = [
        correlation_name
        for correlation_name in [
            "pearson",
            "spearman",
            "kendall",
            "phi_k",
            "cramers",
            "recoded",
        ]
        if config["correlations"][correlation_name]["calculate"].get(bool)
    ]

    categorical_correlations = {"cramers": cramers_matrix, "recoded": recoded_matrix}

    if len(correlation_names) > 0:
        with tqdm(
            total=len(correlation_names),
            desc="correlations",
            disable=disable_progress_bar,
        ) as pbar:
            for correlation_name in correlation_names:
                pbar.set_description_str(
                    "correlations [{correlation_name}]".format(
                        correlation_name=correlation_name
                    )
                )

                if correlation_name in ["pearson", "spearman", "kendall"]:
                    try:
                        correlation = df.corr(method=correlation_name)
                        if len(correlation) > 0:
                            correlations[correlation_name] = correlation
                    except (ValueError, AssertionError) as e:
                        warn_correlation(correlation_name, e)
                elif correlation_name in ["phi_k"]:
                    import phik

                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        # Phi_k does not filter out non-numerical variables with high cardinality
                        selcols = []
                        intcols = []
                        for col in df.columns.tolist():
                            try:
                                tmp = (
                                    df[col]
                                    .value_counts(dropna=False)
                                    .reset_index()
                                    .dropna()
                                    .set_index("index")
                                    .iloc[:, 0]
                                )
                                if tmp.index.inferred_type == "mixed":
                                    continue

                                if pd.api.types.is_numeric_dtype(df[col]):
                                    intcols.append(col)
                                    selcols.append(col)
                                elif df[col].nunique() <= config[
                                    "categorical_maximum_correlation_distinct"
                                ].get(int):
                                    selcols.append(col)
                            except (TypeError, ValueError):
                                continue

                        if len(selcols) > 1:
                            try:
                                correlations["phi_k"] = df[selcols].phik_matrix(
                                    interval_cols=intcols
                                )

                                # Only do this if the column_order is set
                                with suppress(NotFoundError):
                                    # Get the preferred order
                                    column_order = config["column_order"].get(list)

                                    # Get the Phi_k sorted order
                                    current_order = (
                                        correlations["phi_k"]
                                        .index.get_level_values("var1")
                                        .tolist()
                                    )

                                    # Intersection (some columns are not used in correlation)
                                    column_order = [
                                        x for x in column_order if x in current_order
                                    ]

                                    # Override the Phi_k sorting
                                    correlations["phi_k"] = correlations[
                                        "phi_k"
                                    ].reindex(index=column_order, columns=column_order)
                            except (ValueError, DataError, IndexError, TypeError) as e:
                                warn_correlation("phi_k", e)
                elif correlation_name in ["cramers", "recoded"]:
                    get_matrix = categorical_correlations[correlation_name]
                    correlation = get_matrix(df, variables)
                    if correlation is not None and len(correlation) > 0:
                        correlations[correlation_name] = correlation

                if correlation_name in correlations:
                    # Drop rows and columns with NaNs
                    correlations[correlation_name].dropna(inplace=True, how="all")
                    if correlations[correlation_name].empty:
                        del correlations[correlation_name]

                pbar.update()

    return correlations


def get_correlation_mapping() -> Dict[str, List[str]]:
    """Workaround variable type annotations not being supported in Python 3.5

    Returns:
        type annotated empty dict
    """
    return {}


def perform_check_correlation(
    correlation_matrix: pd.DataFrame, threshold: float
) -> Dict[str, List[str]]:
    """Check whether selected variables are highly correlated values in the correlation matrix.

    Args:
        correlation_matrix: The correlation matrix for the DataFrame.
        threshold:.

    Returns:
        The variables that are highly correlated or recoded.
    """

    corr = correlation_matrix.copy()

    # TODO: use matrix logic
    # correlation_tri = correlation.where(np.triu(np.ones(correlation.shape),k=1).astype(np.bool))
    # drop_cols = [i for i in correlation_tri if any(correlation_tri[i]>threshold)]

    mapping = get_correlation_mapping()
    for x, corr_x in corr.iterrows():
        for y, value in corr_x.iteritems():
            if x == y:
                break

            if value >= threshold or value <= -1 * threshold:
                if x not in mapping:
                    mapping[x] = []
                if y not in mapping:
                    mapping[y] = []

                mapping[x].append(y)
                mapping[y].append(x)
    return mapping

Functions

def calculate_correlations(df, variables)

Calculate the correlation coefficients between variables for the correlation types selected in the config (pearson, spearman, kendall, phi_k, cramers, recoded).

Args

df
The DataFrame with variables.
variables
A dict with column names and variable types.

Returns

A dictionary containing the correlation matrices for each of the active correlation measures.

Expand source code
def calculate_correlations(df: pd.DataFrame, variables: dict) -> dict:
    """Calculate the correlation coefficients between variables for the correlation types selected in the config
    (pearson, spearman, kendall, phi_k, cramers).

    Args:
        variables: A dict with column names and variable types.
        df: The DataFrame with variables.

    Returns:
        A dictionary containing the correlation matrices for each of the active correlation measures.
    """
    correlations = {}

    disable_progress_bar = not config["progress_bar"].get(bool)

    correlation_names = [
        correlation_name
        for correlation_name in [
            "pearson",
            "spearman",
            "kendall",
            "phi_k",
            "cramers",
            "recoded",
        ]
        if config["correlations"][correlation_name]["calculate"].get(bool)
    ]

    categorical_correlations = {"cramers": cramers_matrix, "recoded": recoded_matrix}

    if len(correlation_names) > 0:
        with tqdm(
            total=len(correlation_names),
            desc="correlations",
            disable=disable_progress_bar,
        ) as pbar:
            for correlation_name in correlation_names:
                pbar.set_description_str(
                    "correlations [{correlation_name}]".format(
                        correlation_name=correlation_name
                    )
                )

                if correlation_name in ["pearson", "spearman", "kendall"]:
                    try:
                        correlation = df.corr(method=correlation_name)
                        if len(correlation) > 0:
                            correlations[correlation_name] = correlation
                    except (ValueError, AssertionError) as e:
                        warn_correlation(correlation_name, e)
                elif correlation_name in ["phi_k"]:
                    import phik

                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        # Phi_k does not filter out non-numerical variables with high cardinality
                        selcols = []
                        intcols = []
                        for col in df.columns.tolist():
                            try:
                                tmp = (
                                    df[col]
                                    .value_counts(dropna=False)
                                    .reset_index()
                                    .dropna()
                                    .set_index("index")
                                    .iloc[:, 0]
                                )
                                if tmp.index.inferred_type == "mixed":
                                    continue

                                if pd.api.types.is_numeric_dtype(df[col]):
                                    intcols.append(col)
                                    selcols.append(col)
                                elif df[col].nunique() <= config[
                                    "categorical_maximum_correlation_distinct"
                                ].get(int):
                                    selcols.append(col)
                            except (TypeError, ValueError):
                                continue

                        if len(selcols) > 1:
                            try:
                                correlations["phi_k"] = df[selcols].phik_matrix(
                                    interval_cols=intcols
                                )

                                # Only do this if the column_order is set
                                with suppress(NotFoundError):
                                    # Get the preferred order
                                    column_order = config["column_order"].get(list)

                                    # Get the Phi_k sorted order
                                    current_order = (
                                        correlations["phi_k"]
                                        .index.get_level_values("var1")
                                        .tolist()
                                    )

                                    # Intersection (some columns are not used in correlation)
                                    column_order = [
                                        x for x in column_order if x in current_order
                                    ]

                                    # Override the Phi_k sorting
                                    correlations["phi_k"] = correlations[
                                        "phi_k"
                                    ].reindex(index=column_order, columns=column_order)
                            except (ValueError, DataError, IndexError, TypeError) as e:
                                warn_correlation("phi_k", e)
                elif correlation_name in ["cramers", "recoded"]:
                    get_matrix = categorical_correlations[correlation_name]
                    correlation = get_matrix(df, variables)
                    if correlation is not None and len(correlation) > 0:
                        correlations[correlation_name] = correlation

                if correlation_name in correlations:
                    # Drop rows and columns with NaNs
                    correlations[correlation_name].dropna(inplace=True, how="all")
                    if correlations[correlation_name].empty:
                        del correlations[correlation_name]

                pbar.update()

    return correlations
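
A minimal usage sketch, not taken from the library's own docs: it assumes the bundled default settings are loaded on import (so the per-measure "calculate" flags resolve, with Pearson enabled) and that Variable.TYPE_NUM / Variable.TYPE_CAT from pandas_profiling.model.base are the appropriate type markers.

import pandas as pd

from pandas_profiling.model.base import Variable
from pandas_profiling.model.correlations import calculate_correlations

df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2, 4, 6, 8], "c": list("xyxy")})
# Hypothetical type assignment for this sketch; a real report infers these types
variables = {"a": Variable.TYPE_NUM, "b": Variable.TYPE_NUM, "c": Variable.TYPE_CAT}

correlations = calculate_correlations(df, variables)
print(sorted(correlations))      # names of the measures that produced a matrix
print(correlations["pearson"])   # 2x2 matrix for the numeric columns "a" and "b"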
def categorical_matrix(df, variables, correlation_function)

Calculate a correlation matrix for categorical variables.

Args

df
The pandas DataFrame.
variables
A dict with column names mapped to variable type.
correlation_function
A function to calculate the correlation between two variables.

Returns

A correlation matrix for categorical variables.

Expand source code
def categorical_matrix(
    df: pd.DataFrame, variables: dict, correlation_function: Callable
) -> Optional[pd.DataFrame]:
    """Calculate a correlation matrix for categorical variables.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.
        correlation_function: A function to calculate the correlation between two variables.

    Returns:
        A correlation matrix for categorical variables.
    """
    categoricals = {
        column_name: df[column_name]
        for column_name, variable_type in variables.items()
        if variable_type == Variable.TYPE_CAT
        # TODO: solve in type system
        and config["categorical_maximum_correlation_distinct"].get(int)
        >= df[column_name].nunique()
        > 1
    }

    if len(categoricals) <= 1:
        return None

    correlation_matrix = pd.DataFrame(
        np.ones((len(categoricals), len(categoricals))),
        index=categoricals.keys(),
        columns=categoricals.keys(),
    )

    for (name1, data1), (name2, data2) in itertools.combinations(
        categoricals.items(), 2
    ):
        confusion_matrix = pd.crosstab(data1, data2)
        correlation_matrix.loc[name2, name1] = correlation_matrix.loc[
            name1, name2
        ] = correlation_function(confusion_matrix)

    return correlation_matrix
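
A sketch of plugging a custom pairwise measure into categorical_matrix. It assumes the bundled default config supplies categorical_maximum_correlation_distinct and, as in this module, that Variable.TYPE_CAT marks categorical columns; the diagonal_share function is purely illustrative.

import pandas as pd

from pandas_profiling.model.base import Variable
from pandas_profiling.model.correlations import categorical_matrix

df = pd.DataFrame(
    {"colour": ["red", "blue", "red", "blue"], "size": ["S", "L", "S", "L"]}
)
variables = {"colour": Variable.TYPE_CAT, "size": Variable.TYPE_CAT}

def diagonal_share(confusion_matrix):
    # Fraction of observations that fall on the crosstab diagonal
    return confusion_matrix.values.diagonal().sum() / confusion_matrix.values.sum()

matrix = categorical_matrix(df, variables, diagonal_share)
print(matrix)  # symmetric 2x2 DataFrame indexed by "colour" and "size"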
def check_recoded(confusion_matrix, count)

Check if two variables are recoded based on their crosstab.

Args

confusion_matrix
Crosstab between two variables.
count
The total number of observations (len(df) when called from recoded_matrix).

Returns

Whether the variables are recoded.

Expand source code
def check_recoded(confusion_matrix, count: int) -> int:
    """Check if two variables are recoded based on their crosstab.

    Args:
        confusion_matrix: Crosstab between two variables.
        count: The total number of observations (len(df) when called from recoded_matrix).

    Returns:
        Whether the variables are recoded.
    """
    return int(confusion_matrix.values.diagonal().sum() == count)
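
A short self-contained illustration, following the convention of recoded_matrix where count is the number of rows: the pair is flagged exactly when every observation lands on the crosstab diagonal, i.e. one column is a one-to-one recoding of the other.

import pandas as pd

from pandas_profiling.model.correlations import check_recoded

df = pd.DataFrame({"grade": ["A", "B", "A", "C"], "code": ["a", "b", "a", "c"]})
confusion_matrix = pd.crosstab(df["grade"], df["code"])
print(check_recoded(confusion_matrix, count=len(df)))  # 1: "code" recodes "grade"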
def cramers_corrected_stat(confusion_matrix, correction)

Calculate the Cramer's V corrected stat for two variables.

Args

confusion_matrix
Crosstab between two variables.
correction
Should the correction be applied?

Returns

The Cramer's V corrected stat for the two variables.

Expand source code
def cramers_corrected_stat(confusion_matrix, correction: bool) -> float:
    """Calculate the Cramer's V corrected stat for two variables.

    Args:
        confusion_matrix: Crosstab between two variables.
        correction: Should the correction be applied?

    Returns:
        The Cramer's V corrected stat for the two variables.
    """
    chi2 = stats.chi2_contingency(confusion_matrix, correction=correction)[0]
    n = confusion_matrix.sum().sum()
    phi2 = chi2 / n
    r, k = confusion_matrix.shape

    # Deal with NaNs later on
    with np.errstate(divide="ignore", invalid="ignore"):
        phi2corr = max(0.0, phi2 - ((k - 1.0) * (r - 1.0)) / (n - 1.0))
        rcorr = r - ((r - 1.0) ** 2.0) / (n - 1.0)
        kcorr = k - ((k - 1.0) ** 2.0) / (n - 1.0)
        corr = np.sqrt(phi2corr / min((kcorr - 1.0), (rcorr - 1.0)))
    return corr
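
A minimal sketch of calling the statistic directly on a crosstab built with pd.crosstab; note that the small-sample bias correction can drive weak associations all the way to 0.

import pandas as pd

from pandas_profiling.model.correlations import cramers_corrected_stat

df = pd.DataFrame(
    {
        "weather": ["sun", "rain", "sun", "rain", "sun"],
        "mood": ["good", "bad", "good", "bad", "bad"],
    }
)
confusion_matrix = pd.crosstab(df["weather"], df["mood"])
print(cramers_corrected_stat(confusion_matrix, correction=True))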
def cramers_matrix(df, variables)

Calculate the Cramer's V correlation matrix.

Args

df
The pandas DataFrame.
variables
A dict with column names mapped to variable type.

Returns

A Cramer's V matrix for categorical variables.

Expand source code
def cramers_matrix(df: pd.DataFrame, variables: dict):
    """Calculate the Cramer's V correlation matrix.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.

    Returns:
        A Cramer's V matrix for categorical variables.
    """
    return categorical_matrix(
        df, variables, partial(cramers_corrected_stat, correction=True)
    )
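
Under the same assumptions as the categorical_matrix sketch above (default config loaded, Variable.TYPE_CAT marking categorical columns), the full matrix can be requested in one call:

import pandas as pd

from pandas_profiling.model.base import Variable
from pandas_profiling.model.correlations import cramers_matrix

df = pd.DataFrame(
    {
        "fruit": ["apple", "pear"] * 4,
        "taste": ["sweet", "tart"] * 4,
    }
)
variables = {"fruit": Variable.TYPE_CAT, "taste": Variable.TYPE_CAT}
print(cramers_matrix(df, variables))  # corrected Cramer's V per pair, or None if fewer than two eligible columns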
def get_correlation_mapping()

Workaround for variable type annotations not being supported in Python 3.5.

Returns

A type-annotated empty dict.

Expand source code
def get_correlation_mapping() -> Dict[str, List[str]]:
    """Workaround variable type annotations not being supported in Python 3.5

    Returns:
        type annotated empty dict
    """
    return {}
def perform_check_correlation(correlation_matrix, threshold)

Check which pairs of variables in the correlation matrix are highly correlated.

Args

correlation_matrix
The correlation matrix for the DataFrame.
threshold
The absolute correlation value at or above which a pair of variables is reported.

Returns

The variables that are highly correlated or recoded.

Expand source code
def perform_check_correlation(
    correlation_matrix: pd.DataFrame, threshold: float
) -> Dict[str, List[str]]:
    """Check whether selected variables are highly correlated values in the correlation matrix.

    Args:
        correlation_matrix: The correlation matrix for the DataFrame.
        threshold:.

    Returns:
        The variables that are highly correlated or recoded.
    """

    corr = correlation_matrix.copy()

    # TODO: use matrix logic
    # correlation_tri = correlation.where(np.triu(np.ones(correlation.shape),k=1).astype(np.bool))
    # drop_cols = [i for i in correlation_tri if any(correlation_tri[i]>threshold)]

    mapping = get_correlation_mapping()
    for x, corr_x in corr.iterrows():
        for y, value in corr_x.iteritems():
            if x == y:
                break

            if value >= threshold or value <= -1 * threshold:
                if x not in mapping:
                    mapping[x] = []
                if y not in mapping:
                    mapping[y] = []

                mapping[x].append(y)
                mapping[y].append(x)
    return mapping
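
A self-contained sketch: any symmetric correlation matrix (for instance the output of DataFrame.corr) can be checked against a threshold, and the result maps each flagged variable to the variables it is highly correlated with.

import pandas as pd

from pandas_profiling.model.correlations import perform_check_correlation

correlation_matrix = pd.DataFrame(
    [[1.0, 0.95, 0.10], [0.95, 1.0, 0.20], [0.10, 0.20, 1.0]],
    index=["a", "b", "c"],
    columns=["a", "b", "c"],
)
print(perform_check_correlation(correlation_matrix, threshold=0.9))
# {'b': ['a'], 'a': ['b']}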
def recoded_matrix(df, variables)

Calculate the recoded correlation matrix.

Args

df
The pandas DataFrame.
variables
A dict with column names mapped to variable type.

Returns

A recoded matrix for categorical variables.

Expand source code
def recoded_matrix(df: pd.DataFrame, variables: dict):
    """Calculate the recoded correlation matrix.

    Args:
        df: The pandas DataFrame.
        variables: A dict with column names mapped to variable type.

    Returns:
        A recoded matrix for categorical variables.
    """
    return categorical_matrix(df, variables, partial(check_recoded, count=len(df)))
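
Under the same assumptions as the other categorical sketches (default config, Variable.TYPE_CAT), recoded_matrix marks a pair with 1 when the crosstab diagonal covers every row:

import pandas as pd

from pandas_profiling.model.base import Variable
from pandas_profiling.model.correlations import recoded_matrix

df = pd.DataFrame({"grade": ["A", "B", "A", "C"], "code": ["a", "b", "a", "c"]})
variables = {"grade": Variable.TYPE_CAT, "code": Variable.TYPE_CAT}
print(recoded_matrix(df, variables))  # 1 where one column is a recoding of the other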
def warn_correlation(correlation_name, error)

Warn that calculating the named correlation failed and explain how to silence the warning.

Expand source code
def warn_correlation(correlation_name, error):
    """Warn that calculating the named correlation failed and explain how to silence the warning."""
    warnings.warn(
        "There was an attempt to calculate the {correlation_name} correlation, but this failed.\n"
        "To hide this warning, disable the calculation\n"
        '(using `df.profile_report(correlations={{"{correlation_name}": {{"calculate": False}}}})`)\n'
        "If this is problematic for your use case, please report this as an issue:\n"
        "https://github.com/pandas-profiling/pandas-profiling/issues\n"
        "(include the error message: '{error}')".format(
            correlation_name=correlation_name, error=error
        )
    )
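
The disable switch mentioned in the warning refers to the profile_report accessor that pandas_profiling registers on DataFrames at import time; a sketch of silencing a single measure, using the exact keyword structure quoted in the message above:

import pandas as pd
import pandas_profiling  # noqa: F401  (registers the df.profile_report accessor)

df = pd.DataFrame({"a": [1, 2, 3], "b": [3, 2, 1]})
report = df.profile_report(correlations={"cramers": {"calculate": False}})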