Module ktrain.text.ner.predictor

from ...imports import *
from ...predictor import Predictor
from .preprocessor import NERPreprocessor
from ... import utils as U
from .. import textutils as TU

class NERPredictor(Predictor):
    """
    Predicts classes for string representations of sentences.
    """

    def __init__(self, model, preproc, batch_size=U.DEFAULT_BS):

        if not isinstance(model, Model):
            raise ValueError('model must be an instance of Model')
        if not isinstance(preproc, NERPreprocessor):
        #if type(preproc).__name__ != 'NERPreprocessor':
            raise ValueError('preproc must be a NERPreprocessor object')
        self.model = model
        self.preproc = preproc
        self.c = self.preproc.get_classes()
        self.batch_size = batch_size 


    def get_classes(self):
        return self.c


    def predict(self, sentences, return_proba=False, merge_tokens=False, custom_tokenizer=None, return_offsets=False):
        """
        Makes predictions for a sentence or a list of sentences (supplied as strings)
        Args:
          sentences(list|str): either a single sentence as a string or a list of sentences
          return_proba(bool): If True, includes the probability of the predicted label for each token
          merge_tokens(bool):  If True, tokens will be merged together by the entity
                               to which they are associated:
                               ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
          custom_tokenizer(Callable): If specified, sentences will be tokenized with this custom tokenizer
          return_offsets(bool): If True, character offsets will be included in the results [experimental]

        Returns:
          list: If sentences is a single sentence string:
                     a list containing a tuple for each token in the sentence
                If sentences is a list of sentences:
                     a list of lists: each inner list represents a sentence and contains a tuple for each token in the sentence
                If return_proba and return_offsets are both True, then tuples are of the form: (token, label, probability, character offsets)
        """
        is_array = not isinstance(sentences, str)
        if not isinstance(sentences, (str, list)):
            raise ValueError('Param sentences must be either a single sentence string or a list of sentence strings.')
        if return_proba and merge_tokens:
            raise ValueError('return_proba and merge_tokens are mutually exclusive.')
        if isinstance(sentences, str): sentences = [sentences]
        lang = TU.detect_lang(sentences)

        # batchify
        num_chunks = math.ceil(len(sentences)/self.batch_size)
        batches = U.list2chunks(sentences, n=num_chunks)

        # process batches
        results = []
        for batch in batches:
            nerseq = self.preproc.preprocess(batch, lang=lang, custom_tokenizer=custom_tokenizer)
            if not nerseq.prepare_called:
                nerseq.prepare()
            nerseq.batch_size = len(batch)
            x_true, _ = nerseq[0]
            lengths = nerseq.get_lengths(0)
            y_pred = self.model.predict_on_batch(x_true)
            y_labels = self.preproc.p.inverse_transform(y_pred, lengths)
            # TODO: clean this up
            if return_proba:
                try:
                    probs = np.max(y_pred, axis=2)
                except:
                    probs = y_pred[0].numpy().tolist() # TODO: remove after confirmation (#316)
                for i, (x, y, prob) in enumerate(zip(nerseq.x, y_labels, probs)):
                    if return_offsets:
                        offsets = TU.extract_offsets(sentences[i], tokens=[entry[0] for entry in x])
                        result = [(x[i], y[i], prob[i], (offsets[i]['start'], offsets[i]['end'])) for i in range(len(x))]
                    else:
                        result = [(x[i], y[i], prob[i]) for i in range(len(x))]
                    results.append(result)
            else:
                for i, (x,y) in enumerate(zip(nerseq.x, y_labels)):
                    if return_offsets:
                        offsets = TU.extract_offsets(sentences[i], tokens=[entry[0] for entry in x])
                        result =  list(zip(x,y, [(o['start'], o['end']) for o in offsets]))
                    else:
                        result =  list(zip(x,y))
                    if merge_tokens:
                        result = self.merge_tokens(result, lang)
                    results.append(result)
        if not is_array: results = results[0]
        return results



    def merge_tokens(self, annotated_sentence, lang):
        """
        Merges tokens in an annotated sentence by entity:
        e.g., ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER').
        """

        if TU.is_chinese(lang, strict=False): # strict=False: workaround for langdetect bug on short chinese texts
            sep = ''
        else:
            sep = ' '

        current_token = ""
        current_tag = ""
        entities = []
        start = None
        last_end = None

        for tup in annotated_sentence:
            token = tup[0]
            entity = tup[1]
            offsets = tup[2] if len(tup)>2 else None
            tag = entity.split('-')[1] if '-' in entity else None
            prefix = entity.split('-')[0] if '-' in entity else None
            # not within entity
            if tag is None and not current_token:
                continue
            # beginning of entity
            #elif tag and prefix=='B':
            elif tag and (prefix=='B' or prefix=='I' and not current_token):
                if current_token: # consecutive entities
                    entities.append(self._build_merge_tuple(current_token, current_tag, start, last_end))
                    current_token = ""
                    current_tag = None
                    start, end = None, None
                current_token = token
                current_tag = tag
                start = offsets[0] if offsets else None
                last_end = offsets[1] if offsets else None
            # end of entity
            elif tag is None and current_token:
                entities.append(self._build_merge_tuple(current_token, current_tag, start, last_end))
                current_token = ""
                current_tag = None
                continue
            # within entity
            elif tag and current_token:  #  prefix I
                current_token = current_token + sep + token
                current_tag = tag
                last_end = offsets[1] if offsets else None
        if current_token and current_tag:
            entities.append(self._build_merge_tuple(current_token, current_tag, start, last_end)) 
        return entities

    def _build_merge_tuple(self, current_token, current_tag, start=None, end=None):
        entry = [current_token, current_tag]
        if start is not None and end is not None: entry.append((start, end))
        return tuple(entry)

    def _save_preproc(self, fpath):
        """
        Saves the preprocessor (and any transformers embedding model) to fpath.
        """

        # ensure transformers embedding model is saved in a subdirectory
        p = self.preproc.p
        hf_dir = os.path.join(fpath, 'hf')
        if p.te is not None:
            os.makedirs(hf_dir, exist_ok=True)
            p.te.model.save_pretrained(hf_dir)
            p.te.tokenizer.save_pretrained(hf_dir)
            p.te.config.save_pretrained(hf_dir)
            p.te_model = hf_dir

        # save preproc
        with open(os.path.join(fpath, U.PREPROC_NAME), 'wb') as f:
            pickle.dump(self.preproc, f)
        return
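
Internally, predict splits its input into batches of at most batch_size sentences (via U.list2chunks above), so the batch_size supplied to the constructor mainly affects throughput on long lists of sentences. A minimal sketch, assuming an already-built predictor and an illustrative list_of_sentences:

predictor.batch_size = 256   # larger batches for bulk prediction
results = predictor.predict(list_of_sentences)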

Classes

class NERPredictor (model, preproc, batch_size=32)

Predicts classes for string representations of sentences.
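
In practice an NERPredictor is usually not constructed directly; after training a sequence tagger with ktrain, it is typically obtained through ktrain.get_predictor. A minimal sketch, assuming a trained sequence-tagging model in model and its NERPreprocessor in preproc:

import ktrain

# model:   trained sequence-tagging model (a tf.keras Model)
# preproc: the NERPreprocessor used to preprocess the training data
predictor = ktrain.get_predictor(model, preproc, batch_size=32)

# tag set the model was trained with
print(predictor.get_classes())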


Ancestors

ktrain.predictor.Predictor

Methods

def get_classes(self)

Returns the class labels (tags) used by the model.

def merge_tokens(self, annotated_sentence, lang)

Merges tokens in an annotated sentence by the entity to which they belong, e.g., ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER').

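To illustrate the merging behavior on a hypothetical annotation of the kind produced by predict with merge_tokens=False (predictor as in the earlier sketch):

annotated = [('Paul', 'B-PER'), ('Newman', 'I-PER'),
             ('is', 'O'), ('a', 'O'), ('great', 'O'), ('actor', 'O'), ('!', 'O')]
predictor.merge_tokens(annotated, lang='en')
# -> [('Paul Newman', 'PER')]
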
def predict(self, sentences, return_proba=False, merge_tokens=False, custom_tokenizer=None, return_offsets=False)

Makes predictions for a sentence or a list of sentences (supplied as strings).

Args

sentences(list|str): either a single sentence as a string or a list of sentences
return_proba(bool): if True, includes the probability of the predicted label for each token
merge_tokens(bool): if True, tokens will be merged together by the entity to which they are associated: ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
custom_tokenizer(Callable): if specified, sentences will be tokenized with this custom tokenizer
return_offsets(bool): if True, character offsets will be included in the results [experimental]

Returns

list
    If sentences is a single sentence string: a list containing a tuple for each token in the sentence.
    If sentences is a list of sentences: a list of lists, where each inner list represents a sentence and contains a tuple for each token in the sentence.
    If return_proba and return_offsets are both True, then tuples are of the form: (token, label, probability, character offsets).
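
A usage sketch (predictor as above; the tags shown are hypothetical and depend on the tag set the model was trained with):

# single sentence -> list of (token, tag) tuples
predictor.predict('Paul Newman is a great actor!')
# e.g., [('Paul', 'B-PER'), ('Newman', 'I-PER'), ('is', 'O'), ...]

# merge tokens belonging to the same entity
predictor.predict('Paul Newman is a great actor!', merge_tokens=True)
# e.g., [('Paul Newman', 'PER')]

# list of sentences -> list of lists; include probabilities and character offsets
predictor.predict(['Paul Newman is a great actor!', 'I flew to Rome yesterday.'],
                  return_proba=True, return_offsets=True)
# each token becomes a tuple: (token, label, probability, (start, end))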

Inherited members
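
The private _save_preproc method above pickles the preprocessor and, if a transformers embedding model is present, stores it under an hf/ subdirectory. This is normally triggered through the save method inherited from Predictor; a hedged sketch of the usual save/reload round trip (the path is only illustrative):

predictor.save('/tmp/my_ner_predictor')
reloaded = ktrain.load_predictor('/tmp/my_ner_predictor')
reloaded.predict('Paul Newman is a great actor!')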