# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import pickle
from os import path
from typing import List
import numpy as np
import spacy
from spacy.tokens import Doc
from spacy.tokens import Span
from nlp_architect.models.chunker import SequenceChunker
from nlp_architect.utils.generic import pad_sentences
from nlp_architect.utils.io import validate_existing_filepath
from nlp_architect.utils.text import extract_nps, Stopwords
class NPAnnotator(object):
    """
    spaCy-based NP annotator - uses the models.SequenceChunker model for annotation
    Args:
        model (SequenceChunker): a trained chunker model
        word_vocab (Vocabulary): word-to-id vocabulary of the model
        char_vocab (Vocabulary): character-to-id vocabulary of the model's words
        chunk_vocab (Vocabulary): chunk tag vocabulary of the model
        batch_size (int, optional): inference batch size
    """
def __init__(self, model, word_vocab, char_vocab, chunk_vocab, batch_size: int = 32):
self.model = model
self.bs = batch_size
self.word_vocab = word_vocab
self.char_vocab = char_vocab
self.chunk_vocab = chunk_vocab
        # register the custom attribute used to store extracted noun phrases,
        # accessible as `doc._.noun_phrases`
        Doc.set_extension('noun_phrases', default=[], force=True)
    @classmethod
def load(cls, model_path: str, parameter_path: str, batch_size: int = 32,
use_cudnn: bool = False):
"""
Load a NPAnnotator annotator
Args:
model_path (str): path to trained model
parameter_path (str): path to model parameters
batch_size (int, optional): inference batch_size
use_cudnn (bool, optional): use gpu for inference (cudnn cells)
Returns:
NPAnnotator class with loaded model
"""
        # relative paths are resolved against this module's directory;
        # absolute paths pass through path.join unchanged
        _model_path = path.join(path.dirname(path.realpath(__file__)), model_path)
        validate_existing_filepath(_model_path)
        _parameter_path = path.join(path.dirname(path.realpath(__file__)), parameter_path)
        validate_existing_filepath(_parameter_path)
model = SequenceChunker(use_cudnn=use_cudnn)
model.load(_model_path)
with open(_parameter_path, 'rb') as fp:
model_params = pickle.load(fp)
word_vocab = model_params['word_vocab']
chunk_vocab = model_params['chunk_vocab']
char_vocab = model_params.get('char_vocab', None)
return cls(model, word_vocab, char_vocab, chunk_vocab, batch_size)
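    # A hedged usage sketch (the file names below are hypothetical placeholders
    # for trained SequenceChunker artifacts):
    #
    #   annotator = NPAnnotator.load('chunker_model.h5', 'chunker_model.params')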
    def _infer_chunks(self, input_vec, doc_lengths):
        # run the chunker and keep the most probable tag id per token
        tagged_sents = self.model.predict(input_vec, batch_size=self.bs).argmax(2)
        sentences = []
        for tags, length in zip(tagged_sents, doc_lengths):
            # inputs are pre-padded, so the real tokens are the last `length` entries
            sentences.append(tags[-length:])
        doc_tags = np.concatenate(sentences)
        chunk_tags = [self.chunk_vocab.id_to_word(w) for w in doc_tags]
        return extract_nps(chunk_tags)
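    # Note: extract_nps converts the IOB chunk tag sequence into (start, end)
    # token index pairs, e.g. (assumed tagging scheme):
    #   ['B-NP', 'I-NP', 'O', 'B-NP'] -> [(0, 2), (3, 4)]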
    def _feature_extractor(self, doc):
        # map each word to its vocabulary id; unknown words fall back to id 1
        word_ids = []
        for w in doc:
            wid = self.word_vocab[w]
            word_ids.append(wid if wid is not None else 1)
        features = np.asarray(word_ids)
        if self.char_vocab:
            # map the characters of each word to ids, again with 1 as the
            # out-of-vocabulary fallback, and pad each word to max_word_len
            sentence_chars = []
            for w in doc:
                word_chars = []
                for c in w:
                    _cid = self.char_vocab[c]
                    word_chars.append(_cid if _cid is not None else 1)
                sentence_chars.append(word_chars)
            sentence_chars = pad_sentences(sentence_chars, self.model.max_word_len)
            features = (features, sentence_chars)
        return features
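    # With a char vocabulary the returned features are a (word_ids, char_ids)
    # tuple with shapes (sentence_len,) and (sentence_len, model.max_word_len);
    # otherwise only the word id vector is returned.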
def __call__(self, doc: Doc) -> Doc:
"""
Annotate the document with noun phrase spans
"""
spans = []
doc_vecs = []
doc_chars = []
doc_lens = []
if len(doc) < 1:
return doc
for sentence in doc.sents:
features = self._feature_extractor([t.text for t in sentence])
if isinstance(features, tuple):
doc_vec = features[0]
doc_chars.append(features[1])
else:
doc_vec = features
doc_vecs.append(doc_vec)
doc_lens.append(len(doc_vec))
        # pre-pad word id vectors so every sentence in the doc has equal length
        doc_vectors = pad_sentences(np.asarray(doc_vecs))
        inputs = doc_vectors
        if self.char_vocab:
            max_len = doc_vectors.shape[1]
            padded_chars = np.zeros((len(doc_chars), max_len, self.model.max_word_len))
            for idx, d in enumerate(doc_chars):
                d = d[:max_len]
                # pre-pad the char features to line up with the padded word ids
                padded_chars[idx, -d.shape[0]:] = d
            inputs = [inputs, padded_chars]
np_indexes = self._infer_chunks(inputs, doc_lens)
for s, e in np_indexes:
np_span = Span(doc, s, e)
spans.append(np_span)
spans = _NPPostprocessor.process(spans)
set_noun_phrases(doc, spans)
return doc
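# A minimal pipeline sketch mirroring SpacyNPAnnotator below (the model and
# parameter file names are hypothetical placeholders):
#
#   nlp = spacy.load('en')
#   nlp.add_pipe(nlp.create_pipe('sentencizer'), first=True)
#   nlp.add_pipe(NPAnnotator.load('chunker_model.h5', 'chunker_model.params'),
#                last=True)
#   doc = nlp('The quick brown fox jumped over the fence')
#   spans = get_noun_phrases(doc)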
def get_noun_phrases(doc: Doc) -> List[Span]:
    """
    Get noun phrase tags from a spaCy annotated document.
    Args:
        doc (Doc): a spacy type document
    Returns:
        a list of noun phrase Span objects
    """
    assert hasattr(doc._, 'noun_phrases'), 'no noun_phrases attribute in document'
    return doc._.noun_phrases
def set_noun_phrases(doc: Doc, nps: List[Span]) -> None:
    """
    Set noun phrase tags on a document
    Args:
        doc (Doc): a spacy type document
        nps (List[Span]): a list of noun phrase Spans
    """
    assert hasattr(doc._, 'noun_phrases'), 'no noun_phrases attribute in document'
    doc._.set('noun_phrases', nps)
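# Example (assuming `doc` was produced by a pipeline that includes NPAnnotator):
#
#   for span in get_noun_phrases(doc):
#       print(span.text, span.start, span.end)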
class _NPPostprocessor:
    @classmethod
    def process(cls, noun_phrases: List[Span]) -> List[Span]:
        # run every extracted phrase through the post-processing rules and
        # drop phrases that were filtered out or trimmed to nothing
        new_phrases = []
        for phrase in noun_phrases:
            p = _NPPostprocessor._phrase_process(phrase)
            if p is not None and len(p) > 0:
                new_phrases.append(p)
        return new_phrases

    @classmethod
    def _phrase_process(cls, phrase: Span) -> Span:
        # apply the rules repeatedly until the phrase reaches a fixed point
        # (no rule changes it) or some rule filters it out (returns None)
        last_phrase = None
        while phrase != last_phrase:
            last_phrase = phrase
            for func_args in post_processing_rules:
                pf = func_args[0]
                args = func_args[1:]
                if len(args) > 0:
                    phrase = pf(phrase, *args)
                else:
                    phrase = pf(phrase)
                if phrase is None:
                    break
        return phrase
def _filter_repeating_nonalnum(phrase, length):
    """
    Filter out a phrase containing a run of `length` consecutive
    non-alphabetic tokens.
    Example: a phrase containing '$ $ $' with length=3 returns None
    """
    if len(phrase) > 0:
        alnum_len = length
        for t in phrase:
            if not t.is_alpha:
                alnum_len -= 1
            else:
                alnum_len = length
            if alnum_len == 0:
                return None
    return phrase
def _filter_long_phrases(phrase, word_length, phrase_length):
    # filter out phrases longer than `phrase_length` tokens whose longest
    # token exceeds `word_length` characters
    if len(phrase) > 0 and max(len(t) for t in phrase) > word_length \
            and len(phrase) > phrase_length:
        return None
    return phrase
def _remove_non_alphanum_from_start(phrase):
if len(phrase) > 1 and not phrase[0].is_alpha:
phrase = phrase[1:]
return phrase
def _remove_non_alphanum_from_end(phrase):
if len(phrase) > 1 and not phrase[-1].is_alpha:
phrase = phrase[:-1]
return phrase
def _remove_stop_words(phrase):
while len(phrase) > 0 and (phrase[0].is_stop
or str(phrase[0]).strip().lower() in Stopwords.get_words()):
phrase = phrase[1:]
while len(phrase) > 0 and (phrase[-1].is_stop
or str(phrase[-1]).strip().lower() in Stopwords.get_words()):
phrase = phrase[:-1]
return phrase
def _remove_char_at_start(phrase):
    chars = ['@', '-', '=', '.', ':', '+', '?', 'nt', '\"', '\'', '\'S', '\'s', ',']
    if phrase:
        while len(phrase) > 0 and phrase[0].text in chars:
            phrase = phrase[1:]
    return phrase
def _remove_char_at_end(phrase):
chars = [',', '(', ')', ' ', '-']
if phrase:
while len(phrase) > 0 and phrase[-1].text in chars:
phrase = phrase[:-1]
return phrase
def _remove_pos_from_start(phrase):
tag_list = ['WDT', 'PRP$', ':']
pos_list = ['PUNCT', 'INTJ', 'NUM', 'PART', 'ADV', 'DET', 'PRON', 'VERB']
if phrase:
while len(phrase) > 0 and (phrase[0].pos_ in pos_list or phrase[0].tag_ in tag_list):
phrase = phrase[1:]
return phrase
def _remove_pos_from_end(phrase):
tag_list = ['WDT', ':']
pos_list = ['DET', 'PUNCT', 'CONJ']
if phrase:
while len(phrase) > 0 and (phrase[-1].pos_ in pos_list or phrase[-1].tag_ in tag_list):
phrase = phrase[:-1]
return phrase
def _filter_single_pos(phrase):
pos_list = ['VERB', 'ADJ', 'ADV']
if phrase and len(phrase) == 1 and phrase[0].pos_ in pos_list:
return None
return phrase
def _filter_fp_nums(phrase):
    # filter out phrases that parse as a plain number once thousands
    # separators (commas) are removed
    if len(phrase) > 0:
        try:
            float(phrase.text.replace(',', ''))
            return None
        except ValueError:
            return phrase
    return phrase
def _filter_single_char(phrase):
if phrase and len(phrase) == 1 and len(phrase[0]) == 1:
return None
return phrase
def _filter_empty(phrase):
if phrase is None or len(phrase) == 0 or len(phrase.text) == 0 \
or len(str(phrase.text).strip()) == 0:
return None
return phrase
post_processing_rules = [
(_filter_single_char,),
(_filter_single_pos,),
(_remove_pos_from_start,),
(_remove_pos_from_end,),
(_remove_stop_words,),
(_remove_non_alphanum_from_start,),
(_remove_non_alphanum_from_end,),
(_filter_repeating_nonalnum, 5),
(_filter_long_phrases, 5, 75),
(_remove_char_at_start,),
(_remove_char_at_end,),
(_filter_fp_nums,),
(_filter_empty,),
]
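# The rules above are applied in order by _NPPostprocessor._phrase_process until
# the phrase stops changing. A hedged sketch of adding a custom rule (the
# `_filter_urls` name and heuristic are illustrative, not part of the library):
#
#   def _filter_urls(phrase):
#       if phrase and any(t.like_url for t in phrase):
#           return None
#       return phrase
#
#   post_processing_rules.append((_filter_urls,))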
class SpacyNPAnnotator(object):
    """
    Simple spaCy pipeline with NP extraction annotations
    Args:
        model_path (str): path to the trained chunker model
        settings_path (str): path to the model parameters file
        spacy_model (str, optional): name of the base spaCy model to load
        batch_size (int, optional): inference batch size
        use_cudnn (bool, optional): use gpu for inference (cudnn cells)
    """
def __init__(self, model_path, settings_path, spacy_model='en', batch_size=32,
use_cudnn=False):
_model_path = path.join(path.dirname(path.realpath(__file__)), model_path)
validate_existing_filepath(_model_path)
_settings_path = path.join(path.dirname(path.realpath(__file__)), settings_path)
validate_existing_filepath(_settings_path)
nlp = spacy.load(spacy_model)
for p in nlp.pipe_names:
if p not in ['tagger']:
nlp.remove_pipe(p)
nlp.add_pipe(nlp.create_pipe('sentencizer'), first=True)
        nlp.add_pipe(NPAnnotator.load(_model_path, _settings_path, batch_size=batch_size,
                                      use_cudnn=use_cudnn), last=True)
self.nlp = nlp
    def __call__(self, text: str) -> List[str]:
        """
        Parse a given text and return the list of noun phrases found
        Args:
            text (str): a text string
        Returns:
            list of noun phrases as strings
        """
        # `phrase` instead of `np` avoids shadowing the numpy import
        return [phrase.text for phrase in get_noun_phrases(self.nlp(text))]
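# A minimal end-to-end sketch (hedged: the model and parameter file names are
# hypothetical placeholders for trained SequenceChunker artifacts):
#
#   if __name__ == '__main__':
#       annotator = SpacyNPAnnotator('chunker_model.h5', 'chunker_model.params')
#       print(annotator('The quick brown fox jumped over the fence'))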