# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
# pylint: disable=no-name-in-module
import itertools
import math
import numpy as np
from spacy.tokens.doc import Doc
from spacy.tokens.span import Span
from spacy.tokens.token import Token
from wordfreq import zipf_frequency
class TextSpanScoring:
    """
    Text span scoring class.

    Contains misc scoring algorithms for scoring text fragments extracted
    from a corpus.

    Arguments:
        documents (list): List of spaCy documents.
        spans (list[list]): List of lists of spaCy spans representing the
            noun phrases of each document.
        min_tf (int): Minimum corpus-level term frequency; spans occurring
            fewer times are dropped from the index (default 1).
    """

def __init__(self, documents, spans, min_tf=1):
assert len(documents) == len(spans)
self._documents = documents
self._doc_text_spans = spans
self.index = CorpusIndex(documents, spans)
assert min_tf > 0, 'min_tf must be > 0'
if min_tf > 1:
self.re_index_min_tf(min_tf)

    def re_index_min_tf(self, tf):
        """
        Drop spans whose corpus-level term frequency is below ``tf``
        and rebuild the index.
        """
        filtered_doc_text_spans = []
        for d in self.doc_text_spans:
            filtered_doc_phrases = [p for p in d if self.index.tf(p) >= tf]
            filtered_doc_text_spans.append(filtered_doc_phrases)
        self._doc_text_spans = filtered_doc_text_spans
        self.index = CorpusIndex(self.documents, self.doc_text_spans)
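
    # A minimal usage sketch (hypothetical data; assumes an English spaCy
    # model such as en_core_web_sm is installed). With min_tf=2, spans
    # occurring fewer than twice in the corpus are dropped:
    #
    #   import spacy
    #   nlp = spacy.load('en_core_web_sm')
    #   docs = [nlp('Deep learning models need large data sets.'),
    #           nlp('Large data sets make deep learning work.')]
    #   spans = [list(d.noun_chunks) for d in docs]
    #   scorer = TextSpanScoring(docs, spans, min_tf=2)
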
@property
def documents(self):
return self._documents
@property
def doc_text_spans(self):
return self._doc_text_spans

    def get_tfidf_scores(self, group_similar_spans=True):
        """
        Get TF-IDF scores of spans.

        Returns a list of spans sorted by descending importance, where
        span score = (TF + 1) * ln(1 + N / DF), with TF the corpus-level
        term frequency of the span, N the number of documents and DF the
        span's document frequency.
        """
phrases_and_scores = {}
num_of_docs = len(self.documents)
        for noun_phrases in self.doc_text_spans:
for p in noun_phrases:
if p not in phrases_and_scores:
tf = self.index.tf(p)
df = self.index.df(p)
phrases_and_scores[p] = \
(tf + 1) * math.log(1 + num_of_docs / df)
if len(phrases_and_scores) > 0:
return self._maybe_group_and_sort(group_similar_spans,
phrases_and_scores)
return []
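
    # Worked example of the score above (hypothetical counts): a span with
    # corpus TF=3 that appears in 2 of 10 documents scores
    # (3 + 1) * ln(1 + 10 / 2) = 4 * ln(6) ~= 7.17.
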
    def get_freq_scores(self, group_similar_spans=True):
        """
        Score spans by their Zipf frequency in general English
        (via ``wordfreq.zipf_frequency``).
        """
        phrases_and_scores = {}
        for noun_phrases in self.doc_text_spans:
for p in noun_phrases:
if p not in phrases_and_scores:
phrases_and_scores[p] = zipf_frequency(p.text, 'en')
return self._maybe_group_and_sort(group_similar_spans,
phrases_and_scores)
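
    # wordfreq's zipf_frequency returns a log10-scale frequency per billion
    # words of general English, e.g. roughly 7.7 for 'the' and near 1-2 for
    # rare words, so higher scores mean more common phrases.
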
    def group_spans(self, phrases):
        """
        Group spans that share the same normalized phrase id; each group
        holds the sorted set of surface forms and the score of its first
        member.
        """
pid_phrase_scores = [{'k': self.index.get_pid(p),
'v': (p, s)}
for p, s in phrases.items()]
phrase_groups = []
for _, group in itertools.groupby(
sorted(pid_phrase_scores, key=lambda x: x['k']),
lambda x: x['k']):
_group = list(group)
            texts = {g['v'][0].text for g in _group}
            score = _group[0]['v'][1]
            phrase_groups.append((sorted(texts), score))
return phrase_groups
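
    # Since grouping is keyed on the span lemma (see CorpusIndex.get_pid),
    # inflectional variants typically collapse into one group whose score
    # is taken from its first member, e.g. (illustrative output):
    #   [(['data set', 'data sets'], 4.2), ...]
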
def _maybe_group_and_sort(self, is_group, phrases_dict):
phrase_groups = phrases_dict.items()
if is_group:
phrase_groups = self.group_spans(phrases_dict)
return sorted(phrase_groups, key=lambda x: x[1], reverse=True)

    @staticmethod
    def normalize_minmax(phrases_list, invert=False):
        """
        Min-max normalize scores to [0, 1]; if all scores are equal every
        span gets 0. With ``invert=True`` the normalized scores are
        flipped (1 - score).
        """
_, scores = list(zip(*phrases_list))
max_score = max(scores)
min_score = min(scores)
norm_list = []
for p, s in phrases_list:
if max_score - min_score > 0:
new_score = (s - min_score) / (max_score - min_score)
else:
new_score = 0
norm_list.append([p, new_score])
if invert:
for e in norm_list:
e[1] = 1.0 - e[1]
return norm_list
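
    # Worked example: scores [2, 5, 8] normalize to
    # [(2 - 2) / 6, (5 - 2) / 6, (8 - 2) / 6] = [0.0, 0.5, 1.0],
    # and with invert=True they become [1.0, 0.5, 0.0].
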
    @staticmethod
    def normalize_l2(phrases_list):
        """
        Scale the scores so that the score vector has unit L2 norm.
        """
        phrases, scores = list(zip(*phrases_list))
scores = scores / np.linalg.norm(scores)
return list(zip(phrases, scores.tolist()))

    @staticmethod
    def interpolate_scores(phrase_lists, weights=None):
        """
        Linearly interpolate several phrase-score lists into one,
        weighting each list (defaults to equal weights).
        """
        if weights is None:
            # one equal weight per list; a single-element list would be
            # silently truncated by zip() below
            weights = [1.0 / len(phrase_lists)] * len(phrase_lists)
        else:
            assert len(weights) == len(phrase_lists)
        list_sizes = [len(lst) for lst in phrase_lists]
        for size in list_sizes:
            assert len(phrase_lists[0]) == size, 'list sizes not equal'
phrase_list_dicts = []
for lst in phrase_lists:
phrase_list_dicts.append({tuple(k): v for k, v in lst})
phrases = phrase_list_dicts[0].keys()
interp_scores = {}
for p in phrases:
interp_scores[p] = 0.0
for ref, w in zip(phrase_list_dicts, weights):
interp_scores[p] += ref[p] * w
return sorted(interp_scores.items(), key=lambda x: x[1], reverse=True)
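
    # Sketch of combining two normalized score lists (assumes both come
    # from the same scorer, so they contain the same phrase groups):
    #
    #   tfidf = TextSpanScoring.normalize_minmax(scorer.get_tfidf_scores())
    #   freq = TextSpanScoring.normalize_minmax(scorer.get_freq_scores())
    #   combined = TextSpanScoring.interpolate_scores(
    #       [tfidf, freq], weights=[0.7, 0.3])
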
    def get_cvalue_scores(self, group_similar_spans=True):
        """
        Score spans with a C-value style measure (Frantzi et al.):
        longer spans score higher, while spans nested inside longer
        spans are penalized by the mean frequency of those spans.
        """
        all_phrases = [p for ps in self.index.pid_to_spans.values()
                       for p in ps]
        phrase_scores = {}
        for phrase in all_phrases:
sub_phrase_pids = None
for w in phrase:
sub_pids = self.index.get_subphrases_of_word(w)
if sub_phrase_pids is None:
sub_phrase_pids = sub_pids
else:
sub_phrase_pids = sub_phrase_pids.intersection(sub_pids)
sub_phrase_pids.discard(self.index.get_pid(phrase))
if len(sub_phrase_pids) > 0:
score = math.log2(1 + len(phrase)) * (
self.index.tf(phrase) - 1.0 / len(sub_phrase_pids) * sum(
[self.index.tf(list(self.index.get_phrase(p))[0])
for p in sub_phrase_pids]))
else:
score = math.log2(1 + len(phrase)) * self.index.tf(phrase)
phrase_scores[phrase] = score
for phrase in phrase_scores:
score = [phrase_scores[p] for p in
self.index.get_phrase(self.index.get_pid(phrase))]
phrase_scores[phrase] = sum(score) / len(score)
return self._maybe_group_and_sort(group_similar_spans,
phrase_scores)
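
    # Worked example of the penalized branch above (hypothetical counts):
    # a 2-token span with TF=4 nested in two longer spans with TF 1 and 3
    # scores log2(1 + 2) * (4 - (1 + 3) / 2) = log2(3) * 2 ~= 3.17.
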
    @staticmethod
    def multiply_scores(phrase_lists):
        """
        Combine several phrase-score lists by multiplying each phrase's
        scores; all lists must contain the same phrases.
        """
phrase_list_dicts = []
for lst in phrase_lists:
phrase_list_dicts.append({tuple(k): v for k, v in lst})
phrases = phrase_list_dicts[0].keys()
interp_scores = {}
for p in phrases:
interp_scores[p] = 1.0
for ref in phrase_list_dicts:
interp_scores[p] *= ref[p]
return sorted(interp_scores.items(), key=lambda x: x[1], reverse=True)

class CorpusIndex:
    """
    Text span index class.

    Holds TF and DF values per span. Text spans are normalized so that
    similar spans map to the same TF/DF values.
    """
def __init__(self, documents: list, spans: list):
        self.df_index = {}  # phrase id to set of containing doc ids (DF)
        self.tf_index = {}  # phrase id to corpus-level TF
        self.pid_to_spans = {}  # phrase id to its normalized span variants
        self.word_to_phrase_index = {}  # word id to ids of phrases containing it
self.documents = set()
for d, phrases in zip(documents, spans):
doc_id = CorpusIndex.get_docid(d)
self.documents.add(doc_id)
for phrase in phrases:
pid = CorpusIndex.get_pid(phrase)
                # add to normalized-phrases dict
                self.pid_to_spans.setdefault(pid, set()).add(phrase)
                # add to df index
                self.df_index.setdefault(pid, set()).add(doc_id)
                # add to tf index
                self.tf_index[pid] = self.tf_index.get(pid, 0) + 1
                for word in phrase:
                    wid = self.get_wid(word)
                    self.word_to_phrase_index.setdefault(wid, set()).add(pid)

    def get_phrase(self, pid):
        """
        Get the set of spans mapped to the given phrase id.
        """
        return self.pid_to_spans.get(pid)

    def get_subphrases_of_word(self, w):
        """
        Get the ids of all phrases that contain the word ``w``.
        """
        wid = self.get_wid(w)
        return self.word_to_phrase_index.get(wid)

    @staticmethod
    def get_wid(w: Token):
        """
        Get word id (hash of the word text).
        """
        return CorpusIndex.hash_func(w.text)

    @staticmethod
    def get_pid(p: Span):
        """
        Get phrase id (hash of the span lemma, so inflectional variants
        share an id).
        """
        return CorpusIndex.hash_func(p.lemma_)

    @staticmethod
    def get_docid(d: Doc):
        """
        Get document id.
        """
        return CorpusIndex.hash_func(d)

    @staticmethod
    def hash_func(x):
        return hash(x)

    def tf(self, phrase):
        """
        Get the corpus-level term frequency (TF) of a phrase.
        """
        pid = CorpusIndex.get_pid(phrase)
        return self.tf_index.get(pid, 0)

    def df(self, phrase):
        """
        Get the document frequency (DF) of a phrase in the corpus.
        """
        pid = CorpusIndex.get_pid(phrase)
        if pid in self.df_index:
            return len(self.df_index[pid])
        return 0
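

# A minimal end-to-end sketch (hypothetical corpus; assumes the spaCy model
# en_core_web_sm and the wordfreq package are installed):
if __name__ == '__main__':
    import spacy

    nlp = spacy.load('en_core_web_sm')
    texts = ['Deep learning models need large data sets.',
             'Large data sets make deep learning work.']
    docs = [nlp(t) for t in texts]
    noun_spans = [list(d.noun_chunks) for d in docs]
    scorer = TextSpanScoring(docs, noun_spans)
    # Print phrase groups by descending TF-IDF score.
    for group, score in scorer.get_tfidf_scores():
        print(group, round(score, 3))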