Module hashformers.beamsearch.data_structures

Expand source code
import pandas as pd
from dataclasses import dataclass
import json 
import numpy as np 

@dataclass
class Node:
    hypothesis: str
    characters: str
    score: float

@dataclass
class ProbabilityDictionary(object):
    dictionary: dict

    def get_segmentations(
        self,
        astype='dict',
        gold_array=None
    ):
        top_1 = self.get_top_k(k=1)
        if gold_array and astype == 'list':
            gold_df = pd.DataFrame([{
                "gold": x,
                "characters": x.replace(" ", "")
            } for x in gold_array])
            seg_df = pd.DataFrame([{
                "segmentation": x,
                "characters": x.replace(" ", "") 
            } for x in top_1])
            output_df = pd.merge(
                gold_df,
                seg_df,
                how='left',
                on='characters'
            )
            output_series = output_df['segmentation'].values.tolist()
            output_series = [
                str(x) for x in output_series
            ]
            return output_series
        if astype == 'dict':
            return { k.replace(" ", ""):k for k,v in top_1.items() }
        elif astype == 'list':
            return list(top_1.keys())

    def get_top_k(
        self,
        k=2,
        characters_field='characters',
        segmentation_field='segmentation',
        score_field='score',
        return_dataframe=False,
        fill=False
    ):
        df = self.to_dataframe(
            characters_field=characters_field,
            segmentation_field=segmentation_field,
            score_field=score_field
        )
        df = df\
        .sort_values(by=score_field, ascending=True)\
        .groupby(characters_field)\
        .head(k)
        if fill == False and return_dataframe == True:
            return df
        elif fill == True and return_dataframe == True:
            df['group_length'] = df.groupby(characters_field)[segmentation_field].transform(len)
            df['group_length'] = df['group_length'] * -1 + k + 1
            len_array = df['group_length'].values            
            df = df.drop(columns=['group_length'])
            records = np.array(df.to_dict('records'))
            cloned_records = list(np.repeat(records, len_array))
            df = pd.DataFrame(cloned_records)
            return df
        elif fill == False and return_dataframe == False:
            keys = df[segmentation_field].values
            values = df[score_field].values
            output = {
                k:v for k,v in list(zip(keys, values))
            }
            return output
        elif fill == True and return_dataframe == False:
            raise NotImplementedError

    def to_dataframe(
            self,
            characters_field='characters',
            segmentation_field='segmentation',
            score_field='score'):
        df = [
            {
                characters_field: key.replace(" ", ""),
                segmentation_field: key,
                score_field: value
            } for key, value \
                in self.dictionary.items()
        ]
        df = pd.DataFrame(df)
        df = df.sort_values(
            by=[
                characters_field, 
                score_field
            ]
        )
        return df
    
    def to_csv(
        self,
        filename,
        characters_field='characters',
        segmentation_field='segmentation',
        score_field='score'
    ):
        df = self.to_dataframe(
            characters_field=characters_field,
            segmentation_field=segmentation_field,
            score_field=score_field
        )
        df.to_csv(filename)


    def to_json(
        self,
        filepath
    ):
        with open(filepath, 'w') as f:
            json.dump(self.dictionary, f)

def enforce_prob_dict(
    dictionary,
    score_field="score",
    segmentation_field="segmentation"):
    if isinstance(dictionary, ProbabilityDictionary):
        return dictionary
    elif isinstance(dictionary, dict):
        return ProbabilityDictionary(dictionary)
    elif isinstance(dictionary, list) \
        and all(isinstance(x, str) for x in dictionary):
        dct = {
            k:0.0 for k in list(set(dictionary))
        }
        return ProbabilityDictionary(dct)
    elif isinstance(dictionary, pd.DataFrame):
        df = dictionary
        df_scores = df[score_field].values.tolist()
        df_segs = df[segmentation_field].values.tolist()
        dct = {
            k:v for k,v in list(zip(df_segs, df_scores))
        }
        return ProbabilityDictionary(dct)
    else:
        raise NotImplementedError

Functions

def enforce_prob_dict(dictionary, score_field='score', segmentation_field='segmentation')
Expand source code
def enforce_prob_dict(
    dictionary,
    score_field="score",
    segmentation_field="segmentation"):
    if isinstance(dictionary, ProbabilityDictionary):
        return dictionary
    elif isinstance(dictionary, dict):
        return ProbabilityDictionary(dictionary)
    elif isinstance(dictionary, list) \
        and all(isinstance(x, str) for x in dictionary):
        dct = {
            k:0.0 for k in list(set(dictionary))
        }
        return ProbabilityDictionary(dct)
    elif isinstance(dictionary, pd.DataFrame):
        df = dictionary
        df_scores = df[score_field].values.tolist()
        df_segs = df[segmentation_field].values.tolist()
        dct = {
            k:v for k,v in list(zip(df_segs, df_scores))
        }
        return ProbabilityDictionary(dct)
    else:
        raise NotImplementedError

Classes

class Node (hypothesis: str, characters: str, score: float)

Node(hypothesis: str, characters: str, score: float)

Expand source code
class Node:
    hypothesis: str
    characters: str
    score: float

Class variables

var characters : str
var hypothesis : str
var score : float
class ProbabilityDictionary (dictionary: dict)

ProbabilityDictionary(dictionary: dict)

Expand source code
class ProbabilityDictionary(object):
    dictionary: dict

    def get_segmentations(
        self,
        astype='dict',
        gold_array=None
    ):
        top_1 = self.get_top_k(k=1)
        if gold_array and astype == 'list':
            gold_df = pd.DataFrame([{
                "gold": x,
                "characters": x.replace(" ", "")
            } for x in gold_array])
            seg_df = pd.DataFrame([{
                "segmentation": x,
                "characters": x.replace(" ", "") 
            } for x in top_1])
            output_df = pd.merge(
                gold_df,
                seg_df,
                how='left',
                on='characters'
            )
            output_series = output_df['segmentation'].values.tolist()
            output_series = [
                str(x) for x in output_series
            ]
            return output_series
        if astype == 'dict':
            return { k.replace(" ", ""):k for k,v in top_1.items() }
        elif astype == 'list':
            return list(top_1.keys())

    def get_top_k(
        self,
        k=2,
        characters_field='characters',
        segmentation_field='segmentation',
        score_field='score',
        return_dataframe=False,
        fill=False
    ):
        df = self.to_dataframe(
            characters_field=characters_field,
            segmentation_field=segmentation_field,
            score_field=score_field
        )
        df = df\
        .sort_values(by=score_field, ascending=True)\
        .groupby(characters_field)\
        .head(k)
        if fill == False and return_dataframe == True:
            return df
        elif fill == True and return_dataframe == True:
            df['group_length'] = df.groupby(characters_field)[segmentation_field].transform(len)
            df['group_length'] = df['group_length'] * -1 + k + 1
            len_array = df['group_length'].values            
            df = df.drop(columns=['group_length'])
            records = np.array(df.to_dict('records'))
            cloned_records = list(np.repeat(records, len_array))
            df = pd.DataFrame(cloned_records)
            return df
        elif fill == False and return_dataframe == False:
            keys = df[segmentation_field].values
            values = df[score_field].values
            output = {
                k:v for k,v in list(zip(keys, values))
            }
            return output
        elif fill == True and return_dataframe == False:
            raise NotImplementedError

    def to_dataframe(
            self,
            characters_field='characters',
            segmentation_field='segmentation',
            score_field='score'):
        df = [
            {
                characters_field: key.replace(" ", ""),
                segmentation_field: key,
                score_field: value
            } for key, value \
                in self.dictionary.items()
        ]
        df = pd.DataFrame(df)
        df = df.sort_values(
            by=[
                characters_field, 
                score_field
            ]
        )
        return df
    
    def to_csv(
        self,
        filename,
        characters_field='characters',
        segmentation_field='segmentation',
        score_field='score'
    ):
        df = self.to_dataframe(
            characters_field=characters_field,
            segmentation_field=segmentation_field,
            score_field=score_field
        )
        df.to_csv(filename)


    def to_json(
        self,
        filepath
    ):
        with open(filepath, 'w') as f:
            json.dump(self.dictionary, f)

Class variables

var dictionary : dict

Methods

def get_segmentations(self, astype='dict', gold_array=None)
Expand source code
def get_segmentations(
    self,
    astype='dict',
    gold_array=None
):
    top_1 = self.get_top_k(k=1)
    if gold_array and astype == 'list':
        gold_df = pd.DataFrame([{
            "gold": x,
            "characters": x.replace(" ", "")
        } for x in gold_array])
        seg_df = pd.DataFrame([{
            "segmentation": x,
            "characters": x.replace(" ", "") 
        } for x in top_1])
        output_df = pd.merge(
            gold_df,
            seg_df,
            how='left',
            on='characters'
        )
        output_series = output_df['segmentation'].values.tolist()
        output_series = [
            str(x) for x in output_series
        ]
        return output_series
    if astype == 'dict':
        return { k.replace(" ", ""):k for k,v in top_1.items() }
    elif astype == 'list':
        return list(top_1.keys())
def get_top_k(self, k=2, characters_field='characters', segmentation_field='segmentation', score_field='score', return_dataframe=False, fill=False)
Expand source code
def get_top_k(
    self,
    k=2,
    characters_field='characters',
    segmentation_field='segmentation',
    score_field='score',
    return_dataframe=False,
    fill=False
):
    df = self.to_dataframe(
        characters_field=characters_field,
        segmentation_field=segmentation_field,
        score_field=score_field
    )
    df = df\
    .sort_values(by=score_field, ascending=True)\
    .groupby(characters_field)\
    .head(k)
    if fill == False and return_dataframe == True:
        return df
    elif fill == True and return_dataframe == True:
        df['group_length'] = df.groupby(characters_field)[segmentation_field].transform(len)
        df['group_length'] = df['group_length'] * -1 + k + 1
        len_array = df['group_length'].values            
        df = df.drop(columns=['group_length'])
        records = np.array(df.to_dict('records'))
        cloned_records = list(np.repeat(records, len_array))
        df = pd.DataFrame(cloned_records)
        return df
    elif fill == False and return_dataframe == False:
        keys = df[segmentation_field].values
        values = df[score_field].values
        output = {
            k:v for k,v in list(zip(keys, values))
        }
        return output
    elif fill == True and return_dataframe == False:
        raise NotImplementedError
def to_csv(self, filename, characters_field='characters', segmentation_field='segmentation', score_field='score')
Expand source code
def to_csv(
    self,
    filename,
    characters_field='characters',
    segmentation_field='segmentation',
    score_field='score'
):
    df = self.to_dataframe(
        characters_field=characters_field,
        segmentation_field=segmentation_field,
        score_field=score_field
    )
    df.to_csv(filename)
def to_dataframe(self, characters_field='characters', segmentation_field='segmentation', score_field='score')
Expand source code
def to_dataframe(
        self,
        characters_field='characters',
        segmentation_field='segmentation',
        score_field='score'):
    df = [
        {
            characters_field: key.replace(" ", ""),
            segmentation_field: key,
            score_field: value
        } for key, value \
            in self.dictionary.items()
    ]
    df = pd.DataFrame(df)
    df = df.sort_values(
        by=[
            characters_field, 
            score_field
        ]
    )
    return df
def to_json(self, filepath)
Expand source code
def to_json(
    self,
    filepath
):
    with open(filepath, 'w') as f:
        json.dump(self.dictionary, f)