Module pandas_profiling.model.describe
Compute statistical description of datasets.
Source code
"""Compute statistical description of datasets."""
import multiprocessing.pool
import multiprocessing
import os
import sys
import warnings
from pathlib import Path
from typing import Tuple, Callable, Mapping
from urllib.parse import urlsplit
from tqdm.autonotebook import tqdm
import numpy as np
import pandas as pd
from astropy.stats import bayesian_blocks
from scipy.stats.stats import chisquare
from pandas_profiling import __version__
from pandas_profiling.config import config as config
from pandas_profiling.model.messages import (
check_variable_messages,
check_table_messages,
warning_type_date,
check_correlation_messages,
)
from pandas_profiling.model import base
from pandas_profiling.model.base import Variable
from pandas_profiling.model.correlations import calculate_correlations
from pandas_profiling.visualisation.missing import (
missing_bar,
missing_matrix,
missing_heatmap,
missing_dendrogram,
)
from pandas_profiling.visualisation.plot import scatter_pairwise
def describe_numeric_1d(series: pd.Series, series_description: dict) -> dict:
"""Describe a numeric series.
Args:
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns:
A dict containing calculated series description values.
Notes:
When the 'bayesian_blocks_bins' option is enabled, astropy.stats.bayesian_blocks is used to determine the
histogram bin edges. Read the docs:
https://docs.astropy.org/en/stable/visualization/histogram.html
https://docs.astropy.org/en/stable/api/astropy.stats.bayesian_blocks.html
This method might print warnings, which we suppress.
https://github.com/astropy/astropy/issues/4927
"""
quantiles = config["vars"]["num"]["quantiles"].get(list)
stats = {
"mean": series.mean(),
"std": series.std(),
"variance": series.var(),
"min": series.min(),
"max": series.max(),
"kurtosis": series.kurt(),
"skewness": series.skew(),
"sum": series.sum(),
"mad": series.mad(),
"n_zeros": (len(series) - np.count_nonzero(series)),
"histogram_data": series,
"scatter_data": series, # For complex
}
chi_squared_threshold = config["vars"]["num"]["chi_squared_threshold"].get(float)
if chi_squared_threshold > 0.0:
histogram = np.histogram(series[series.notna()].values, bins="auto")[0]
stats["chi_squared"] = chisquare(histogram)
stats["range"] = stats["max"] - stats["min"]
stats.update(
{
"{:.0%}".format(percentile): value
for percentile, value in series.quantile(quantiles).to_dict().items()
}
)
stats["iqr"] = stats["75%"] - stats["25%"]
stats["cv"] = stats["std"] / stats["mean"] if stats["mean"] else np.NaN
stats["p_zeros"] = float(stats["n_zeros"]) / len(series)
bins = config["plot"]["histogram"]["bins"].get(int)
# Bins should never be larger than the number of distinct values
bins = min(series_description["distinct_count_with_nan"], bins)
stats["histogram_bins"] = bins
bayesian_blocks_bins = config["plot"]["histogram"]["bayesian_blocks_bins"].get(bool)
if bayesian_blocks_bins:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ret = bayesian_blocks(stats["histogram_data"])
# Sanity check
if not np.isnan(ret).any() and ret.size > 1:
stats["histogram_bins_bayesian_blocks"] = ret
return stats
def describe_date_1d(series: pd.Series, series_description: dict) -> dict:
"""Describe a date series.
Args:
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns:
A dict containing calculated series description values.
"""
stats = {"min": series.min(), "max": series.max(), "histogram_data": series}
bins = config["plot"]["histogram"]["bins"].get(int)
# Bins should never be larger than the number of distinct values
bins = min(series_description["distinct_count_with_nan"], bins)
stats["histogram_bins"] = bins
stats["range"] = stats["max"] - stats["min"]
chi_squared_threshold = config["vars"]["num"]["chi_squared_threshold"].get(float)
if chi_squared_threshold > 0.0:
histogram = np.histogram(
series[series.notna()].astype("int64").values, bins="auto"
)[0]
stats["chi_squared"] = chisquare(histogram)
return stats
def describe_categorical_1d(series: pd.Series, series_description: dict) -> dict:
"""Describe a categorical series.
Args:
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns:
A dict containing calculated series description values.
"""
# Make sure we deal with strings (Issue #100)
series = series.astype(str)
# Only run if at least 1 non-missing value
value_counts = series_description["value_counts_without_nan"]
stats = {"top": value_counts.index[0], "freq": value_counts.iloc[0]}
chi_squared_threshold = config["vars"]["num"]["chi_squared_threshold"].get(float)
if chi_squared_threshold > 0.0:
stats["chi_squared"] = list(chisquare(value_counts.values))
check_composition = config["vars"]["cat"]["check_composition"].get(bool)
if check_composition:
stats["max_length"] = series.str.len().max()
stats["mean_length"] = series.str.len().mean()
stats["min_length"] = series.str.len().min()
from visions.application.summaries.series.text_summary import text_summary
stats.update(text_summary(series))
stats["length"] = series.str.len()
stats["date_warning"] = warning_type_date(series)
return stats
def describe_url_1d(series: pd.Series, series_description: dict) -> dict:
"""Describe a url series.
Args:
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns:
A dict containing calculated series description values.
"""
# Make sure we deal with strings (Issue #100)
series = series[~series.isnull()].astype(str)
stats = {}
# Create separate columns for each URL part
keys = ["scheme", "netloc", "path", "query", "fragment"]
url_parts = dict(zip(keys, zip(*series.map(urlsplit))))
for name, part in url_parts.items():
stats["{}_counts".format(name.lower())] = pd.Series(
part, name=name
).value_counts()
# Only run if at least 1 non-missing value
value_counts = series_description["value_counts_without_nan"]
stats["top"] = value_counts.index[0]
stats["freq"] = value_counts.iloc[0]
return stats
def describe_path_1d(series: pd.Series, series_description: dict) -> dict:
"""Describe a path series.
Args:
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns:
A dict containing calculated series description values.
"""
series_description.update(describe_categorical_1d(series, series_description))
# Make sure we deal with strings (Issue #100)
series = series[~series.isnull()].astype(str)
series = series.map(Path)
common_prefix = os.path.commonprefix(list(series))
if common_prefix == "":
common_prefix = "No common prefix"
stats = {"common_prefix": common_prefix}
# Create separate columns for each path part
keys = ["stem", "suffix", "name", "parent"]
path_parts = dict(
zip(keys, zip(*series.map(lambda x: [x.stem, x.suffix, x.name, x.parent])))
)
for name, part in path_parts.items():
stats["{}_counts".format(name.lower())] = pd.Series(
part, name=name
).value_counts()
# Only run if at least 1 non-missing value
value_counts = series_description["value_counts_without_nan"]
stats["top"] = value_counts.index[0]
stats["freq"] = value_counts.iloc[0]
return stats
def describe_boolean_1d(series: pd.Series, series_description: dict) -> dict:
"""Describe a boolean series.
Args:
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns:
A dict containing calculated series description values.
"""
value_counts = series_description["value_counts_without_nan"]
stats = {"top": value_counts.index[0], "freq": value_counts.iloc[0]}
return stats
def describe_supported(series: pd.Series, series_description: dict) -> dict:
"""Describe a supported series.
Args:
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns:
A dict containing calculated series description values.
"""
# number of observations in the Series
leng = len(series)
# TODO: fix infinite logic
# number of non-NaN observations in the Series
count = series.count()
# number of infinite observations in the Series
# (currently always 0: both terms are series.count(); see the TODO above)
n_infinite = count - series.count()
distinct_count = series_description["distinct_count_without_nan"]
stats = {
"n": leng,
"count": count,
"distinct_count": distinct_count,
"n_unique": distinct_count,
"p_missing": 1 - count * 1.0 / leng,
"n_missing": leng - count,
"p_infinite": n_infinite * 1.0 / leng,
"n_infinite": n_infinite,
"is_unique": distinct_count == count,
"mode": series.mode().iloc[0] if count > distinct_count > 1 else series[0],
"p_unique": distinct_count * 1.0 / count,
"memory_size": series.memory_usage(),
}
return stats
def describe_unsupported(series: pd.Series, series_description: dict):
"""Describe an unsupported series.
Args:
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns:
A dict containing calculated series description values.
"""
# number of observations in the Series
leng = len(series)
# number of non-NaN observations in the Series
count = series.count()
# number of infinite observations in the Series (currently always 0: both terms are series.count())
n_infinite = count - series.count()
results_data = {
"n": leng,
"count": count,
"p_missing": 1 - count * 1.0 / leng,
"n_missing": leng - count,
"p_infinite": n_infinite * 1.0 / leng,
"n_infinite": n_infinite,
"memory_size": series.memory_usage(),
}
return results_data
def describe_1d(series: pd.Series) -> dict:
"""Describe a series (infer the variable type, then calculate type-specific values).
Args:
series: The Series to describe.
Returns:
A dict containing calculated series description values.
"""
# Replace infinite values with NaNs to avoid issues with histograms later.
series.replace(to_replace=[np.inf, np.NINF, np.PINF], value=np.nan, inplace=True)
# Infer variable types
series_description = base.get_var_type(series)
# Run type specific analysis
if series_description["type"] == Variable.S_TYPE_UNSUPPORTED:
series_description.update(describe_unsupported(series, series_description))
else:
series_description.update(describe_supported(series, series_description))
type_to_func = {
Variable.TYPE_BOOL: describe_boolean_1d,
Variable.TYPE_NUM: describe_numeric_1d,
Variable.TYPE_DATE: describe_date_1d,
Variable.TYPE_CAT: describe_categorical_1d,
Variable.TYPE_URL: describe_url_1d,
Variable.TYPE_PATH: describe_path_1d,
}
if series_description["type"] in type_to_func:
series_description.update(
type_to_func[series_description["type"]](series, series_description)
)
else:
raise ValueError("Unexpected type")
# Return the description obtained
return series_description
def multiprocess_1d(args) -> Tuple[str, dict]:
"""Wrapper to process series in parallel.
Args:
args: A tuple of the column name and the series values.
Returns:
A tuple with column and the series description.
"""
column, series = args
return column, describe_1d(series)
def describe_table(df: pd.DataFrame, variable_stats: pd.DataFrame) -> dict:
"""General statistics for the DataFrame.
Args:
df: The DataFrame to describe.
variable_stats: Previously calculated statistics on the DataFrame.
Returns:
A dictionary that contains the table statistics.
"""
n = len(df)
memory_size = df.memory_usage(index=True, deep=True).sum()
record_size = float(memory_size) / n
table_stats = {
"n": n,
"n_var": len(df.columns),
"memory_size": memory_size,
"record_size": record_size,
"n_cells_missing": variable_stats.loc["n_missing"].sum(),
"n_vars_with_missing": sum((variable_stats.loc["n_missing"] > 0).astype(int)),
"n_vars_all_missing": sum((variable_stats.loc["n_missing"] == n).astype(int)),
}
table_stats["p_cells_missing"] = table_stats["n_cells_missing"] / (
table_stats["n"] * table_stats["n_var"]
)
supported_columns = variable_stats.transpose()[
variable_stats.transpose().type != Variable.S_TYPE_UNSUPPORTED
].index.tolist()
table_stats["n_duplicates"] = (
sum(df.duplicated(subset=supported_columns))
if len(supported_columns) > 0
else 0
)
table_stats["p_duplicates"] = (
(table_stats["n_duplicates"] / len(df))
if (len(supported_columns) > 0 and len(df) > 0)
else 0
)
# Variable type counts
table_stats.update({k.value: 0 for k in Variable})
table_stats.update(
{
"types": dict(
variable_stats.loc["type"].apply(lambda x: x.value).value_counts()
)
}
)
return table_stats
def warn_missing(missing_name, error):
warnings.warn(
"There was an attempt to generate the {missing_name} missing values diagrams, but this failed.\n"
"To hide this warning, disable the calculation\n"
'(using `df.profile_report(missing_diagrams={{"{missing_name}": False}})`)\n'
"If this is problematic for your use case, please report this as an issue:\n"
"https://github.com/pandas-profiling/pandas-profiling/issues\n"
"(include the error message: '{error}')".format(
missing_name=missing_name, error=error
)
)
def get_missing_diagrams(df: pd.DataFrame, table_stats: dict) -> dict:
"""Gets the rendered diagrams for missing values.
Args:
table_stats: The overall statistics for the DataFrame.
df: The DataFrame on which to calculate the missing values.
Returns:
A dictionary containing the base64 encoded plots for each diagram that is active in the config (matrix, bar, heatmap, dendrogram).
"""
disable_progress_bar = not config["progress_bar"].get(bool)
def missing_diagram(name) -> Callable:
return {
"bar": missing_bar,
"matrix": missing_matrix,
"heatmap": missing_heatmap,
"dendrogram": missing_dendrogram,
}[name]
missing_map = {
"bar": {"min_missing": 0, "name": "Count"},
"matrix": {"min_missing": 0, "name": "Matrix"},
"heatmap": {"min_missing": 2, "name": "Heatmap"},
"dendrogram": {"min_missing": 1, "name": "Dendrogram"},
}
missing_map = {
name: settings
for name, settings in missing_map.items()
if config["missing_diagrams"][name].get(bool)
and table_stats["n_vars_with_missing"] >= settings["min_missing"]
}
missing = {}
if len(missing_map) > 0:
with tqdm(
total=len(missing_map), desc="missing", disable=disable_progress_bar
) as pbar:
for name, settings in missing_map.items():
pbar.set_description_str("missing [{name}]".format(name=name))
try:
if name != "heatmap" or (
table_stats["n_vars_with_missing"]
- table_stats["n_vars_all_missing"]
>= settings["min_missing"]
):
missing[name] = {
"name": settings["name"],
"matrix": missing_diagram(name)(df),
}
except ValueError as e:
warn_missing(name, e)
pbar.update()
return missing
def get_scatter_matrix(df, variables):
disable_progress_bar = not config["progress_bar"].get(bool)
if config["interactions"]["continuous"].get(bool):
continuous_variables = [
column for column, type in variables.items() if type == Variable.TYPE_NUM
]
with tqdm(
total=len(continuous_variables) ** 2,
desc="interactions [continuous]",
disable=disable_progress_bar,
) as pbar:
scatter_matrix = {
x: {y: "" for y in continuous_variables} for x in continuous_variables
}
for x in continuous_variables:
for y in continuous_variables:
scatter_matrix[x][y] = scatter_pairwise(df[x], df[y], x, y)
pbar.update()
else:
scatter_matrix = {}
return scatter_matrix
def sort_column_names(dct: Mapping, sort: str):
sort = sort.lower()
if sys.version_info < (3, 6) and sort != "none":
warnings.warn("Sorting is supported from Python 3.6+")
else:
if sort.startswith("asc"):
dct = dict(sorted(dct.items(), key=lambda x: x[0].casefold()))
elif sort.startswith("desc"):
dct = dict(reversed(sorted(dct.items(), key=lambda x: x[0].casefold())))
elif sort != "none":
raise ValueError('"sort" should be "ascending", "descending" or "None".')
return dct
def describe(df: pd.DataFrame) -> dict:
"""Calculate the statistics for each series in this DataFrame.
Args:
df: DataFrame.
Returns:
This function returns a dictionary containing:
- table: overall statistics.
- variables: descriptions per series.
- scatter: pairwise scatter plots for continuous variables.
- correlations: correlation matrices.
- missing: missing value diagrams.
- messages: warnings that flag notable patterns in your data.
- package: the pandas-profiling version and the active configuration.
"""
if not isinstance(df, pd.DataFrame):
raise TypeError("df must be of type pandas.DataFrame")
if df.empty:
raise ValueError("df can not be empty")
disable_progress_bar = not config["progress_bar"].get(bool)
# Multiprocessing of Describe 1D for each column
pool_size = config["pool_size"].get(int)
if pool_size <= 0:
pool_size = multiprocessing.cpu_count()
args = [(column, series) for column, series in df.iteritems()]
series_description = {}
with tqdm(total=len(args), desc="variables", disable=disable_progress_bar) as pbar:
if pool_size == 1:
for arg in args:
column, description = multiprocess_1d(arg)
series_description[column] = description
pbar.update()
else:
# Store the original order
original_order = {
k: v for v, k in enumerate([column for column, _ in args])
}
# TODO: use `Pool` for Linux-based systems
with multiprocessing.pool.ThreadPool(pool_size) as executor:
for i, (column, description) in enumerate(
executor.imap_unordered(multiprocess_1d, args)
):
series_description[column] = description
pbar.update()
# Restore the original order
series_description = dict(
sorted(
series_description.items(),
key=lambda index: original_order.get(index[0]),
)
)
# Mapping from column name to variable type
sort = config["sort"].get(str)
series_description = sort_column_names(series_description, sort)
variables = {
column: description["type"]
for column, description in series_description.items()
}
# Transform the series_description in a DataFrame
variable_stats = pd.DataFrame(series_description)
# Get correlations
correlations = calculate_correlations(df, variables)
# Scatter matrix
scatter_matrix = get_scatter_matrix(df, variables)
# Table statistics
with tqdm(total=1, desc="table", disable=disable_progress_bar) as pbar:
table_stats = describe_table(df, variable_stats)
pbar.update(1)
# missing diagrams
missing = get_missing_diagrams(df, table_stats)
# Messages
with tqdm(total=3, desc="warnings", disable=disable_progress_bar) as pbar:
pbar.set_description_str("warnings [table]")
messages = check_table_messages(table_stats)
pbar.update()
pbar.set_description_str("warnings [variables]")
for col, description in series_description.items():
messages += check_variable_messages(col, description)
pbar.update()
pbar.set_description_str("warnings [correlations]")
messages += check_correlation_messages(correlations)
messages.sort(key=lambda message: str(message.message_type))
pbar.update()
with tqdm(total=1, desc="package", disable=disable_progress_bar) as pbar:
package = {
"pandas_profiling_version": __version__,
"pandas_profiling_config": config.dump(),
}
pbar.update()
return {
# Overall description
"table": table_stats,
# Per variable descriptions
"variables": series_description,
# Bivariate relations
"scatter": scatter_matrix,
# Correlation matrices
"correlations": correlations,
# Missing values
"missing": missing,
# Warnings
"messages": messages,
# Package
"package": package,
}
Functions
def describe(df)
Calculate the statistics for each series in this DataFrame.
Args
df: DataFrame.
Returns
A dictionary containing:
- table: overall statistics.
- variables: descriptions per series.
- scatter: pairwise scatter plots for continuous variables.
- correlations: correlation matrices.
- missing: missing value diagrams.
- messages: warnings that flag notable patterns in your data.
- package: the pandas-profiling version and the active configuration.
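A minimal usage sketch (the toy DataFrame is illustrative; the package's default configuration, loaded on import, is assumed, and the heavier stages such as correlations and missing-value plots run as well):
import pandas as pd
from pandas_profiling.model.describe import describe

# Illustrative toy frame; any non-empty DataFrame works.
df = pd.DataFrame({"age": [23, 35, 35, None], "city": ["NY", "LA", "LA", "SF"]})
report = describe(df)
print(report["table"]["n"], report["table"]["n_var"])  # 4 rows, 2 variables
print(sorted(report.keys()))  # the seven top-level sections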
def describe_1d(series)
Describe a series (infer the variable type, then calculate type-specific values).
Args
series: The Series to describe.
Returns
A dict containing calculated series description values.
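Called directly, describe_1d runs the full single-column pipeline, including the infinity-to-NaN replacement. A small sketch with illustrative data, default configuration assumed:
import numpy as np
import pandas as pd
from pandas_profiling.model.describe import describe_1d

s = pd.Series([1.0, 2.0, 2.0, np.inf, np.nan], name="x")
desc = describe_1d(s)  # np.inf is replaced with NaN before type inference
print(desc["type"], desc["n_missing"], desc["mean"])  # numeric type, 2 missing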
def describe_boolean_1d(series, series_description)
Describe a boolean series.
Args
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns
A dict containing calculated series description values.
def describe_categorical_1d(series, series_description)
Describe a categorical series.
Args
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns
A dict containing calculated series description values.
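The type-specific helpers assume upstream bookkeeping: series_description must already hold the counts that base.get_var_type computes inside describe_1d. A hedged sketch that builds just the one key this function reads:
import pandas as pd
from pandas_profiling.model.describe import describe_categorical_1d

s = pd.Series(["a", "b", "b", "b", None])  # illustrative values
series_description = {"value_counts_without_nan": s.dropna().value_counts()}
stats = describe_categorical_1d(s, series_description)
print(stats["top"], stats["freq"])  # most frequent category and its count: b, 3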
def describe_date_1d(series, series_description)
Describe a date series.
Args
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns
A dict containing calculated series description values.
def describe_numeric_1d(series, series_description)
Describe a numeric series.
Args
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns
A dict containing calculated series description values.
Notes
When the 'bayesian_blocks_bins' option is enabled, astropy.stats.bayesian_blocks is used to determine the histogram bin edges. Read the docs:
https://docs.astropy.org/en/stable/visualization/histogram.html
https://docs.astropy.org/en/stable/api/astropy.stats.bayesian_blocks.html
This method might print warnings, which we suppress (https://github.com/astropy/astropy/issues/4927).
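Here the only upstream key read is distinct_count_with_nan, which caps the histogram bin count. A sketch under the default configuration, with illustrative data:
import numpy as np
import pandas as pd
from pandas_profiling.model.describe import describe_numeric_1d

s = pd.Series(np.random.default_rng(0).normal(size=200))
series_description = {"distinct_count_with_nan": s.nunique(dropna=False)}
stats = describe_numeric_1d(s, series_description)
print(stats["mean"], stats["iqr"], stats["histogram_bins"])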
def describe_path_1d(series, series_description)
Describe a path series.
Args
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns
A dict containing calculated series description values.
def describe_supported(series, series_description)
Describe a supported series.
Args
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns
A dict containing calculated series description values.
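This helper computes the type-independent statistics (missingness, uniqueness, memory) and reads only distinct_count_without_nan from the upstream description. Sketch with illustrative data:
import pandas as pd
from pandas_profiling.model.describe import describe_supported

s = pd.Series([1, 2, 2, None])
series_description = {"distinct_count_without_nan": s.nunique()}
stats = describe_supported(s, series_description)
print(stats["n_missing"], stats["is_unique"], stats["mode"])  # 1, False, 2.0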
def describe_table(df, variable_stats)
General statistics for the DataFrame.
Args
df: The DataFrame to describe.
variable_stats: Previously calculated statistics on the DataFrame.
Returns
A dictionary that contains the table statistics.
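variable_stats is the per-column output of describe_1d stacked into a DataFrame (columns are variables, rows are statistics), exactly as describe() builds it. A sketch with an illustrative frame:
import pandas as pd
from pandas_profiling.model.describe import describe_1d, describe_table

df = pd.DataFrame({"a": [1, 2, None], "b": ["x", "x", "y"]})
series_description = {col: describe_1d(series) for col, series in df.items()}
variable_stats = pd.DataFrame(series_description)
table_stats = describe_table(df, variable_stats)
print(table_stats["n_cells_missing"], table_stats["p_cells_missing"])  # 1, 1/6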
def describe_unsupported(series, series_description)
Describe an unsupported series.
Args
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns
A dict containing calculated series description values.
def describe_url_1d(series, series_description)
Describe a URL series.
Args
series: The Series to describe.
series_description: The dict containing the series description so far.
Returns
A dict containing calculated series description values.
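As with the other helpers, only value_counts_without_nan is read from the upstream description; the URL parts are split with urllib.parse.urlsplit. Sketch with illustrative URLs:
import pandas as pd
from pandas_profiling.model.describe import describe_url_1d

s = pd.Series(["https://example.com/a?q=1", "https://example.org/b", None])
series_description = {"value_counts_without_nan": s.dropna().value_counts()}
stats = describe_url_1d(s, series_description)
print(stats["netloc_counts"])  # one count per host: example.com, example.org
print(stats["scheme_counts"])  # https: 2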
def get_missing_diagrams(df, table_stats)
Gets the rendered diagrams for missing values.
Args
table_stats: The overall statistics for the DataFrame.
df: The DataFrame on which to calculate the missing values.
Returns
A dictionary containing the base64 encoded plots for each diagram that is active in the config (matrix, bar, heatmap, dendrogram).
def get_scatter_matrix(df, variables)
Generate pairwise scatter plots for every pair of continuous (numeric) variables, when enabled via the 'interactions.continuous' config setting; returns a nested dict keyed by the two variable names.
def multiprocess_1d(args)
Wrapper to process series in parallel.
Args
args: A tuple of the column name and the series values.
Returns
A tuple with the column name and the series description.
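The wrapper exists so that a (column, series) pair can be pushed through Pool.imap_unordered as a single argument; it unpacks and delegates to describe_1d. Illustrative call:
import pandas as pd
from pandas_profiling.model.describe import multiprocess_1d

column, description = multiprocess_1d(("age", pd.Series([20, 30, 30])))
print(column, description["n"], description["distinct_count"])  # age 3 2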
def sort_column_names(dct, sort)
Sort the mapping of column names to descriptions by key: "ascending", "descending", or "none" (sorting requires Python 3.6+).
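A short sketch of the three accepted modes (the function matches on the "asc"/"desc" prefixes and compares keys case-insensitively via str.casefold):
from pandas_profiling.model.describe import sort_column_names

dct = {"b": 2, "A": 1, "c": 3}
print(sort_column_names(dct, "ascending"))   # {'A': 1, 'b': 2, 'c': 3}
print(sort_column_names(dct, "descending"))  # {'c': 3, 'b': 2, 'A': 1}
print(sort_column_names(dct, "None"))        # input order unchanged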
def warn_missing(missing_name, error)
Warn that generating the named missing-values diagram failed, with instructions for disabling the calculation.