Module ktrain.text.shallownlp

Expand source code
from .classifier import Classifier
from .ner import NER
from .searcher import *
from .utils import extract_filenames, read_text, sent_tokenize

__all__ = [
    "Classifier",
    "Searcher",
    "search",
    "find_chinese",
    "find_arabic",
    "find_russian",
    "read_text",
    "NER",
    "sent_tokenize",
    "extract_filenames",
    "read_text",
]
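
A minimal usage sketch of the exported API (the toy strings are illustrative only):

```
from ktrain.text.shallownlp import sent_tokenize, search

sents = sent_tokenize("Paris is in France. Berlin is in Germany.")
hits = search("france", ["Paris is in France.", "Berlin is in Germany."])
```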

Sub-modules

ktrain.text.shallownlp.classifier
ktrain.text.shallownlp.imports
ktrain.text.shallownlp.ner
ktrain.text.shallownlp.searcher
ktrain.text.shallownlp.utils

Functions

def extract_filenames(corpus_path, follow_links=False)
Expand source code
def extract_filenames(corpus_path, follow_links=False):
    if os.listdir(corpus_path) == []:
        raise ValueError("%s: path is empty" % corpus_path)
    for root, _, fnames in os.walk(corpus_path, followlinks=follow_links):
        for filename in fnames:
            try:
                yield os.path.join(root, filename)
            except Exception:
                continue
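Example (a minimal sketch; the corpus path is a placeholder):

```
from ktrain.text.shallownlp import extract_filenames

# extract_filenames is a generator over the paths of all files under corpus_path
for path in extract_filenames("/path/to/corpus"):
    print(path)
```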
def find_arabic(s)
Expand source code
def find_arabic(s):
    return re.findall(r"[\u0600-\u06FF]+", s)
def find_chinese(s)
Expand source code
def find_chinese(s):
    return re.findall(r"[\u4e00-\u9fff]+", s)
def find_russian(s)
Expand source code
def find_russian(s):
    return find_cyrillic(s)
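Example for the three find_* helpers, each of which returns the list of matching character runs (outputs shown are illustrative):

```
from ktrain.text.shallownlp import find_arabic, find_chinese, find_russian

find_chinese("hello 你好 world")    # ['你好']
find_arabic("hello مرحبا world")    # ['مرحبا']
find_russian("hello привет world")  # ['привет']
```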
def read_text(filename)
Expand source code
def read_text(filename):
    with open(filename, "rb") as f:
        text = f.read()
    encoding = detect_encoding([text])
    try:
        decoded_text = text.decode(encoding)
    except Exception:
        U.vprint(
            "Decoding with %s failed on 1st attempt - using %s with skips"
            % (encoding, encoding),
            verbose=1,
        )
        decoded_text = decode_by_line(text, encoding=encoding)
    return decoded_text.strip()
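Example (a minimal sketch; the file path is a placeholder):

```
from ktrain.text.shallownlp import read_text

# reads raw bytes and decodes them using an auto-detected encoding
text = read_text("/path/to/document.txt")
```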
def search(query, doc, case_sensitive=False, keys=[], progress=False)
Expand source code
def search(query, doc, case_sensitive=False, keys=[], progress=False):
    searcher = Searcher(query)
    return searcher.search(
        doc, case_sensitive=case_sensitive, keys=keys, progress=progress
    )
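Example (a minimal sketch; the result shown is expected, not guaranteed, output):

```
from ktrain.text.shallownlp import search

docs = ["The stock market fell today.", "Orange trees grow here."]
search("stock market", docs, keys=["a.txt", "b.txt"])
# e.g., [('a.txt', 'stock market', 1)]
```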
def sent_tokenize(text)

segment text into sentences

Expand source code
def sent_tokenize(text):
    """
    segment text into sentences
    """
    lang = detect_lang(text)
    sents = []
    if is_chinese(lang):
        for sent in re.findall("[^!?。\.\!\?]+[!?。\.\!\?]?", text, flags=re.U):
            sents.append(sent)
    else:
        for paragraph in segmenter.process(text):
            for sentence in paragraph:
                sents.append(" ".join([t.value for t in sentence]))
    return sents
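Example (for non-Chinese text, tokens are re-joined with spaces, so punctuation comes out space-separated):

```
from ktrain.text.shallownlp import sent_tokenize

sent_tokenize("I went to the store. Then I went home.")
# e.g., ['I went to the store .', 'Then I went home .']
```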

Classes

class Classifier (model=None)

instantiate a classifier with an optional previously-saved model

Expand source code
class Classifier:
    def __init__(self, model=None):
        """
        instantiate a classifier with an optional previously-saved model
        """
        self.model = model

    def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1, 3), binary=True):
        """
        ```
        create a model
        Args:
          ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
          texts(list): list of texts
          hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
                         hp_dict can also be used to supply arguments to CountVectorizer
          ngram_range(tuple): default ngram_range.
                              overridden if 'ngram_range' in hp_dict
          binary(bool): default value for binary argument to CountVectorizer.
                        overridden if 'binary' key in hp_dict
        ```
        """
        lang = U.detect_lang(texts)
        if U.is_chinese(lang):
            token_pattern = r"(?u)\b\w+\b"
        else:
            token_pattern = r"\w+|[%s]" % string.punctuation
        if ctype == "nbsvm":
            clf = NBSVM(
                C=hp_dict.get("C", 0.01),
                alpha=hp_dict.get("alpha", 0.75),
                beta=hp_dict.get("beta", 0.25),
                fit_intercept=hp_dict.get("fit_intercept", False),
            )
        elif ctype == "logreg":
            clf = LogisticRegression(
                C=hp_dict.get("C", 0.1),
                dual=hp_dict.get("dual", True),
                penalty=hp_dict.get("penalty", "l2"),
                tol=hp_dict.get("tol", 1e-4),
                intercept_scaling=hp_dict.get("intercept_scaling", 1),
                solver=hp_dict.get("solver", "liblinear"),
                max_iter=hp_dict.get("max_iter", 100),
                multi_class=hp_dict.get("multi_class", "auto"),
                warm_start=hp_dict.get("warm_start", False),
                n_jobs=hp_dict.get("n_jobs", None),
                l1_ratio=hp_dict.get("l1_ratio", None),
                random_state=hp_dict.get("random_state", 42),
                class_weight=hp_dict.get("class_weight", None),
            )
        elif ctype == "sgdclassifier":
            clf = SGDClassifier(
                loss=hp_dict.get("loss", "hinge"),
                penalty=hp_dict.get("penalty", "l2"),
                alpha=hp_dict.get("alpha", 1e-3),
                random_state=hp_dict.get("random_state", 42),
                max_iter=hp_dict.get("max_iter", 5),  # scikit-learn default is 1000
                tol=hp_dict.get("tol", None),
                l1_ratio=hp_dict.get("l1_ratio", 0.15),
                fit_intercept=hp_dict.get("fit_intercept", True),
                epsilon=hp_dict.get("epsilon", 0.1),
                n_jobs=hp_dict.get("n_jobs", None),
                learning_rate=hp_dict.get("learning_rate", "optimal"),
                eta0=hp_dict.get("eta0", 0.0),
                power_t=hp_dict.get("power_t", 0.5),
                early_stopping=hp_dict.get("early_stopping", False),
                validation_fraction=hp_dict.get("validation_fraction", 0.1),
                n_iter_no_change=hp_dict.get("n_iter_no_change", 5),
                warm_start=hp_dict.get("warm_start", False),
                average=hp_dict.get("average", False),
                class_weight=hp_dict.get("class_weight", None),
            )
        else:
            raise ValueError("Unknown ctype: %s" % (ctype))

        self.model = Pipeline(
            [
                (
                    "vect",
                    CountVectorizer(
                        ngram_range=hp_dict.get("ngram_range", ngram_range),
                        binary=hp_dict.get("binary", binary),
                        token_pattern=token_pattern,
                        max_features=hp_dict.get("max_features", None),
                        max_df=hp_dict.get("max_df", 1.0),
                        min_df=hp_dict.get("min_df", 1),
                        stop_words=hp_dict.get("stop_words", None),
                        lowercase=hp_dict.get("lowercase", True),
                        strip_accents=hp_dict.get("strip_accents", None),
                        encoding=hp_dict.get("encoding", "utf-8"),
                    ),
                ),
                ("clf", clf),
            ]
        )
        return

    @classmethod
    def load_texts_from_folder(
        cls, folder_path, subfolders=None, shuffle=True, encoding=None
    ):
        """
        ```
        load text files from folder

        Args:
          folder_path(str): path to folder containing documents
                            The supplied folder should contain a subfolder
                            for each category, which will be used as the class label
          subfolders(list): list of subfolders under folder_path to consider
                            Example: If folder_path contains subfolders pos, neg, and
                            unlabeled, then unlabeled folder can be ignored by
                            setting subfolders=['pos', 'neg']
          shuffle(bool):  If True, list of texts will be shuffled
          encoding(str): encoding to use.  default:None (auto-detected)
        Returns:
          tuple: (texts, labels, label_names)
        ```
        """
        bunch = load_files(folder_path, categories=subfolders, shuffle=shuffle)
        texts = bunch.data
        labels = bunch.target
        label_names = bunch.target_names
        # print('target names:')
        # for idx, label_name in enumerate(bunch.target_names):
        # print('\t%s:%s' % (idx, label_name))

        # decode based on supplied encoding
        if encoding is None:
            encoding = U.detect_encoding(texts)
            if encoding != "utf-8":
                print("detected encoding: %s" % (encoding))

        try:
            texts = [text.decode(encoding) for text in texts]
        except Exception:
            print(
                "Decoding with %s failed on 1st attempt - using %s with skips"
                % (encoding, encoding)
            )
            texts = U.decode_by_line(texts, encoding=encoding)
        return (texts, labels, label_names)

    @classmethod
    def load_texts_from_csv(
        cls,
        csv_filepath,
        text_column="text",
        label_column="label",
        sep=",",
        encoding=None,
    ):
        """
        ```
        load text files from csv file
        CSV should have at least two columns.
        Example:
        Text               | Label
        I love this movie. | positive
        I hated this movie.| negative


        Args:
          csv_filepath(str): path to CSV file
          text_column(str): name of column containing the texts. default:'text'
          label_column(str): name of column containing the labels in string format
                             default:'label'
          sep(str): character that separates columns in CSV. default:','
          encoding(str): encoding to use. default:None (auto-detected)
        Returns:
          tuple: (texts, labels, label_names)
        ```
        """
        if encoding is None:
            with open(csv_filepath, "rb") as f:
                encoding = U.detect_encoding([f.read()])
                if encoding != "utf-8":
                    print("detected encoding: %s (if wrong, set manually)" % (encoding))
        import pandas as pd

        df = pd.read_csv(csv_filepath, encoding=encoding, sep=sep)
        texts = df[text_column].fillna("fillna").values
        labels = df[label_column].values
        le = LabelEncoder()
        le.fit(labels)
        labels = le.transform(labels)
        return (texts, labels, le.classes_)

    def fit(self, x_train, y_train, ctype="logreg"):
        """
        ```
        train a classifier
        Args:
          x_train(list or np.ndarray):  training texts
          y_train(np.ndarray):  training labels
          ctype(str):  One of {'logreg', 'nbsvm', 'sgdclassifier'}.  default:'logreg'
        ```
        """
        lang = U.detect_lang(x_train)
        if U.is_chinese(lang):
            x_train = U.split_chinese(x_train)
        if self.model is None:
            self.create_model(ctype, x_train)
        self.model.fit(x_train, y_train)
        return self

    def predict(self, x_test, return_proba=False):
        """
        ```
        make predictions on text data
        Args:
          x_test(list or np.ndarray or str): array of texts on which to make predictions, or a single text string
          return_proba(bool): If True, return class probabilities instead of predicted labels
        ```
        """
        if self.model is None:
            raise ValueError("model is None - call fit or load to set the model")
        if return_proba and not hasattr(self.model["clf"], "predict_proba"):
            raise ValueError(
                "%s does not support predict_proba" % (type(self.model["clf"]).__name__)
            )
        if isinstance(x_test, str):
            x_test = [x_test]
        lang = U.detect_lang(x_test)
        if U.is_chinese(lang):
            x_test = U.split_chinese(x_test)
        if return_proba:
            predicted = self.model.predict_proba(x_test)
        else:
            predicted = self.model.predict(x_test)
        if len(predicted) == 1:
            predicted = predicted[0]
        return predicted

    def predict_proba(self, x_test):
        """
        predict_proba
        """
        return self.predict(x_test, return_proba=True)

    def evaluate(self, x_test, y_test):
        """
        ```
        evaluate
        Args:
          x_test(list or np.ndarray):  test texts
          y_test(np.ndarray):  test labels
        ```
        """
        predicted = self.predict(x_test)
        return np.mean(predicted == y_test)

    def save(self, filename):
        """
        save model
        """
        dump(self.model, filename)

    def load(self, filename):
        """
        load model
        """
        self.model = load(filename)

    def grid_search(self, params, x_train, y_train, n_jobs=-1):
        """
        ```
        Performs grid search to find optimal set of hyperparameters
        Args:
          params (dict):  A dictionary defining the space of the search.
                          Example for finding optimal value of alpha in NBSVM:
                        parameters = {
                                      #'clf__C': (1e0, 1e-1, 1e-2),
                                      'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                                      #'clf__fit_intercept': (True, False),
                                      #'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                                      }
          n_jobs(int): number of jobs to run in parallel.  default:-1 (use all processors)
        ```
        """
        gs_clf = GridSearchCV(self.model, params, n_jobs=n_jobs)
        gs_clf = gs_clf.fit(x_train, y_train)
        # gs_clf.best_score_
        for param_name in sorted(params.keys()):
            print("%s: %r" % (param_name, gs_clf.best_params_[param_name]))
        return

Static methods

def load_texts_from_csv(csv_filepath, text_column='text', label_column='label', sep=',', encoding=None)
load text files from csv file
CSV should have at least two columns.
Example:
Text               | Label
I love this movie. | positive
I hated this movie.| negative


Args:
  csv_filepath(str): path to CSV file
  text_column(str): name of column containing the texts. default:'text'
  label_column(str): name of column containing the labels in string format
                     default:'label'
  sep(str): character that separates columns in CSV. default:','
  encoding(str): encoding to use. default:None (auto-detected)
Returns:
  tuple: (texts, labels, label_names)
Expand source code
@classmethod
def load_texts_from_csv(
    cls,
    csv_filepath,
    text_column="text",
    label_column="label",
    sep=",",
    encoding=None,
):
    """
    ```
    load text files from csv file
    CSV should have at least two columns.
    Example:
    Text               | Label
    I love this movie. | positive
    I hated this movie.| negative


    Args:
      csv_filepath(str): path to CSV file
      text_column(str): name of column containing the texts. default:'text'
      label_column(str): name of column containing the labels in string format
                         default:'label'
      sep(str): character that separates columns in CSV. default:','
      encoding(str): encoding to use. default:None (auto-detected)
    Returns:
      tuple: (texts, labels, label_names)
    ```
    """
    if encoding is None:
        with open(csv_filepath, "rb") as f:
            encoding = U.detect_encoding([f.read()])
            if encoding != "utf-8":
                print("detected encoding: %s (if wrong, set manually)" % (encoding))
    import pandas as pd

    df = pd.read_csv(csv_filepath, encoding=encoding, sep=sep)
    texts = df[text_column].fillna("fillna").values
    labels = df[label_column].values
    le = LabelEncoder()
    le.fit(labels)
    labels = le.transform(labels)
    return (texts, labels, le.classes_)
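Example (a minimal sketch; train.csv is a placeholder file with 'text' and 'label' columns):

```
from ktrain.text.shallownlp import Classifier

texts, labels, label_names = Classifier.load_texts_from_csv("train.csv")
```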
def load_texts_from_folder(folder_path, subfolders=None, shuffle=True, encoding=None)
load text files from folder

Args:
  folder_path(str): path to folder containing documents
                    The supplied folder should contain a subfolder
                    for each category, which will be used as the class label
  subfolders(list): list of subfolders under folder_path to consider
                    Example: If folder_path contains subfolders pos, neg, and
                    unlabeled, then unlabeled folder can be ignored by
                    setting subfolders=['pos', 'neg']
  shuffle(bool):  If True, list of texts will be shuffled
  encoding(str): encoding to use.  default:None (auto-detected)
Returns:
  tuple: (texts, labels, label_names)
Expand source code
@classmethod
def load_texts_from_folder(
    cls, folder_path, subfolders=None, shuffle=True, encoding=None
):
    """
    ```
    load text files from folder

    Args:
      folder_path(str): path to folder containing documents
                        The supplied folder should contain a subfolder
                        for each category, which will be used as the class label
      subfolders(list): list of subfolders under folder_path to consider
                        Example: If folder_path contains subfolders pos, neg, and
                        unlabeled, then unlabeled folder can be ignored by
                        setting subfolders=['pos', 'neg']
      shuffle(bool):  If True, list of texts will be shuffled
      encoding(str): encoding to use.  default:None (auto-detected)
    Returns:
      tuple: (texts, labels, label_names)
    ```
    """
    bunch = load_files(folder_path, categories=subfolders, shuffle=shuffle)
    texts = bunch.data
    labels = bunch.target
    label_names = bunch.target_names
    # print('target names:')
    # for idx, label_name in enumerate(bunch.target_names):
    # print('\t%s:%s' % (idx, label_name))

    # decode based on supplied encoding
    if encoding is None:
        encoding = U.detect_encoding(texts)
        if encoding != "utf-8":
            print("detected encoding: %s" % (encoding))

    try:
        texts = [text.decode(encoding) for text in texts]
    except Exception:
        print(
            "Decoding with %s failed on 1st attempt - using %s with skips"
            % (encoding, encoding)
        )
        texts = U.decode_by_line(texts, encoding=encoding)
    return (texts, labels, label_names)
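Example (a minimal sketch; 'data/reviews' is a placeholder folder containing 'pos' and 'neg' subfolders):

```
from ktrain.text.shallownlp import Classifier

texts, labels, label_names = Classifier.load_texts_from_folder(
    "data/reviews", subfolders=["pos", "neg"]
)
```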

Methods

def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1, 3), binary=True)
create a model
Args:
  ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
  texts(list): list of texts
  hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
                 hp_dict can also be used to supply arguments to CountVectorizer
  ngram_range(tuple): default ngram_range.
                      overridden if 'ngram_range' in hp_dict
  binary(bool): default value for binary argument to CountVectorizer.
                overridden if 'binary' key in hp_dict
Expand source code
def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1, 3), binary=True):
    """
    ```
    create a model
    Args:
      ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
      texts(list): list of texts
      hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
                     hp_dict can also be used to supply arguments to CountVectorizer
      ngram_range(tuple): default ngram_range.
                          overridden if 'ngram_range' in hp_dict
      binary(bool): default value for binary argument to CountVectorizer.
                    overridden if 'binary' key in hp_dict
    ```
    """
    lang = U.detect_lang(texts)
    if U.is_chinese(lang):
        token_pattern = r"(?u)\b\w+\b"
    else:
        token_pattern = r"\w+|[%s]" % string.punctuation
    if ctype == "nbsvm":
        clf = NBSVM(
            C=hp_dict.get("C", 0.01),
            alpha=hp_dict.get("alpha", 0.75),
            beta=hp_dict.get("beta", 0.25),
            fit_intercept=hp_dict.get("fit_intercept", False),
        )
    elif ctype == "logreg":
        clf = LogisticRegression(
            C=hp_dict.get("C", 0.1),
            dual=hp_dict.get("dual", True),
            penalty=hp_dict.get("penalty", "l2"),
            tol=hp_dict.get("tol", 1e-4),
            intercept_scaling=hp_dict.get("intercept_scaling", 1),
            solver=hp_dict.get("solver", "liblinear"),
            max_iter=hp_dict.get("max_iter", 100),
            multi_class=hp_dict.get("multi_class", "auto"),
            warm_start=hp_dict.get("warm_start", False),
            n_jobs=hp_dict.get("n_jobs", None),
            l1_ratio=hp_dict.get("l1_ratio", None),
            random_state=hp_dict.get("random_state", 42),
            class_weight=hp_dict.get("class_weight", None),
        )
    elif ctype == "sgdclassifier":
        clf = SGDClassifier(
            loss=hp_dict.get("loss", "hinge"),
            penalty=hp_dict.get("penalty", "l2"),
            alpha=hp_dict.get("alpha", 1e-3),
            random_state=hp_dict.get("random_state", 42),
            max_iter=hp_dict.get("max_iter", 5),  # scikit-learn default is 1000
            tol=hp_dict.get("tol", None),
            l1_ratio=hp_dict.get("l1_ratio", 0.15),
            fit_intercept=hp_dict.get("fit_intercept", True),
            epsilon=hp_dict.get("epsilon", 0.1),
            n_jobs=hp_dict.get("n_jobs", None),
            learning_rate=hp_dict.get("learning_rate", "optimal"),
            eta0=hp_dict.get("eta0", 0.0),
            power_t=hp_dict.get("power_t", 0.5),
            early_stopping=hp_dict.get("early_stopping", False),
            validation_fraction=hp_dict.get("validation_fraction", 0.1),
            n_iter_no_change=hp_dict.get("n_iter_no_change", 5),
            warm_start=hp_dict.get("warm_start", False),
            average=hp_dict.get("average", False),
            class_weight=hp_dict.get("class_weight", None),
        )
    else:
        raise ValueError("Unknown ctype: %s" % (ctype))

    self.model = Pipeline(
        [
            (
                "vect",
                CountVectorizer(
                    ngram_range=hp_dict.get("ngram_range", ngram_range),
                    binary=hp_dict.get("binary", binary),
                    token_pattern=token_pattern,
                    max_features=hp_dict.get("max_features", None),
                    max_df=hp_dict.get("max_df", 1.0),
                    min_df=hp_dict.get("min_df", 1),
                    stop_words=hp_dict.get("stop_words", None),
                    lowercase=hp_dict.get("lowercase", True),
                    strip_accents=hp_dict.get("strip_accents", None),
                    encoding=hp_dict.get("encoding", "utf-8"),
                ),
            ),
            ("clf", clf),
        ]
    )
    return
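Example of overriding hyperparameters through hp_dict (a sketch with toy data; note that fit calls create_model automatically when no model exists):

```
from ktrain.text.shallownlp import Classifier

texts = ["great movie", "terrible movie"]
clf = Classifier()
# 'alpha' goes to the NBSVM classifier; 'ngram_range' goes to CountVectorizer
clf.create_model("nbsvm", texts, hp_dict={"alpha": 0.5, "ngram_range": (1, 2)})
```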
def evaluate(self, x_test, y_test)
evaluate
Args:
  x_test(list or np.ndarray):  test texts
  y_test(np.ndarray):  test labels
Expand source code
def evaluate(self, x_test, y_test):
    """
    ```
    evaluate
    Args:
      x_test(list or np.ndarray):  test texts
      y_test(np.ndarray):  test labels
    ```
    """
    predicted = self.predict(x_test)
    return np.mean(predicted == y_test)
def fit(self, x_train, y_train, ctype='logreg')
train a classifier
Args:
  x_train(list or np.ndarray):  training texts
  y_train(np.ndarray):  training labels
  ctype(str):  One of {'logreg', 'nbsvm', 'sgdclassifier'}.  default:'logreg'
Expand source code
def fit(self, x_train, y_train, ctype="logreg"):
    """
    ```
    train a classifier
    Args:
      x_train(list or np.ndarray):  training texts
      y_train(np.ndarray):  training labels
      ctype(str):  One of {'logreg', 'nbsvm', 'sgdclassifier'}.  default:'logreg'
    ```
    """
    lang = U.detect_lang(x_train)
    if U.is_chinese(lang):
        x_train = U.split_chinese(x_train)
    if self.model is None:
        self.create_model(ctype, x_train)
    self.model.fit(x_train, y_train)
    return self
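Example (a sketch on toy data; real datasets should be much larger):

```
from ktrain.text.shallownlp import Classifier

x_train = ["I loved this movie.", "I hated this movie.",
           "Best film I have seen.", "A terrible waste of time."]
y_train = [1, 0, 1, 0]
clf = Classifier().fit(x_train, y_train, ctype="logreg")
print(clf.evaluate(x_train, y_train))  # training-set accuracy
```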
def grid_search(self, params, x_train, y_train, n_jobs=-1)
Performs grid search to find optimal set of hyperparameters
Args:
  params (dict):  A dictionary defining the space of the search.
                  Example for finding optimal value of alpha in NBSVM:
                parameters = {
                              #'clf__C': (1e0, 1e-1, 1e-2),
                              'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                              #'clf__fit_intercept': (True, False),
                              #'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                              }
  n_jobs(int): number of jobs to run in parallel.  default:-1 (use all processors)
Expand source code
def grid_search(self, params, x_train, y_train, n_jobs=-1):
    """
    ```
    Performs grid search to find optimal set of hyperparameters
    Args:
      params (dict):  A dictionary defining the space of the search.
                      Example for finding optimal value of alpha in NBSVM:
                    parameters = {
                                  #'clf__C': (1e0, 1e-1, 1e-2),
                                  'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                                  #'clf__fit_intercept': (True, False),
                                  #'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                                  }
      n_jobs(int): number of jobs to run in parallel.  default:-1 (use all processors)
    ```
    """
    gs_clf = GridSearchCV(self.model, params, n_jobs=n_jobs)
    gs_clf = gs_clf.fit(x_train, y_train)
    # gs_clf.best_score_
    for param_name in sorted(params.keys()):
        print("%s: %r" % (param_name, gs_clf.best_params_[param_name]))
    return
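Example (a sketch; assumes clf was fit with ctype='nbsvm' so that 'clf__alpha' is a valid pipeline parameter, and that x_train/y_train are large enough for GridSearchCV's internal cross-validation):

```
params = {"clf__alpha": (0.1, 0.5, 0.75, 1.0)}
clf.grid_search(params, x_train, y_train)  # prints the best value found per parameter
```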
def load(self, filename)

load model

Expand source code
def load(self, filename):
    """
    load model
    """
    self.model = load(filename)
def predict(self, x_test, return_proba=False)
make predictions on text data
Args:
  x_test(list or np.ndarray or str): array of texts on which to make predictions, or a single text string
  return_proba(bool): If True, return class probabilities instead of predicted labels
Expand source code
def predict(self, x_test, return_proba=False):
    """
    ```
    make predictions on text data
    Args:
      x_test(list or np.ndarray or str): array of texts on which to make predictions, or a single text string
      return_proba(bool): If True, return class probabilities instead of predicted labels
    ```
    """
    if self.model is None:
        raise ValueError("model is None - call fit or load to set the model")
    if return_proba and not hasattr(self.model["clf"], "predict_proba"):
        raise ValueError(
            "%s does not support predict_proba" % (type(self.model["clf"]).__name__)
        )
    if isinstance(x_test, str):
        x_test = [x_test]
    lang = U.detect_lang(x_test)
    if U.is_chinese(lang):
        x_test = U.split_chinese(x_test)
    if return_proba:
        predicted = self.model.predict_proba(x_test)
    else:
        predicted = self.model.predict(x_test)
    if len(predicted) == 1:
        predicted = predicted[0]
    return predicted
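Example (continuing from a fitted Classifier such as the one in the fit example above):

```
clf.predict("An excellent film.")                        # single predicted label
clf.predict(["An excellent film.", "A dreadful film."])  # array of labels
clf.predict("An excellent film.", return_proba=True)     # class probabilities
```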
def predict_proba(self, x_test)

predict_proba

Expand source code
def predict_proba(self, x_test):
    """
    predict_proba
    """
    return self.predict(x_test, return_proba=True)
def save(self, filename)

save model

Expand source code
def save(self, filename):
    """
    save model
    """
    dump(self.model, filename)
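Example of a save/load round trip (the filename is a placeholder; clf is a fitted Classifier as above):

```
clf.save("/tmp/shallownlp.model")
clf2 = Classifier()
clf2.load("/tmp/shallownlp.model")
```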
class NER (lang='en', predictor_path=None)
pretrained NER.
Only English, Chinese, and Russian are currently supported.

Args:
  lang(str): Currently, one of {'en', 'zh', 'ru'}: en=English, zh=Chinese, or ru=Russian
  predictor_path(str): path to a custom ktrain NER predictor. default:None (use the pretrained model for lang)
Expand source code
class NER:
    def __init__(self, lang="en", predictor_path=None):
        """
        ```
        pretrained NER.
        Only English, Chinese, and Russian are currently supported.

        Args:
          lang(str): Currently, one of {'en', 'zh', 'ru'}: en=English, zh=Chinese, or ru=Russian
          predictor_path(str): path to a custom ktrain NER predictor. default:None (use the pretrained model for lang)
        ```
        """
        if lang is None:
            raise ValueError(
                'lang is required (e.g., "en" for English, "zh" for Chinese, "ru" for Russian, etc.)'
            )
        if predictor_path is None and lang not in ["en", "zh", "ru"]:
            raise ValueError(
                "Unsupported language: if predictor_path is None,  then lang must be "
                + "'en' for English, 'zh' for Chinese, or 'ru' for Chinese"
            )
        self.lang = lang
        if os.environ.get("DISABLE_V2_BEHAVIOR", None) != "1":
            warnings.warn(
                "Please add os.environ['DISABLE_V2_BEHAVIOR'] = '1' at top of your script or notebook"
            )
            msg = (
                "\nNER in ktrain uses the CRF module from keras_contrib, which is not yet\n"
                + "fully compatible with TensorFlow 2. To use NER, you must add the following to the top of your\n"
                + "script or notebook BEFORE you import ktrain (after restarting runtime):\n\n"
                + "import os\n"
                + "os.environ['DISABLE_V2_BEHAVIOR'] = '1'\n"
            )
            print(msg)
            return
        else:
            import tensorflow.compat.v1 as tf

            tf.disable_v2_behavior()

        if predictor_path is None and self.lang == "zh":
            dirpath = os.path.dirname(os.path.abspath(__file__))
            fpath = os.path.join(dirpath, "ner_models/ner_chinese")
        elif predictor_path is None and self.lang == "ru":
            dirpath = os.path.dirname(os.path.abspath(__file__))
            fpath = os.path.join(dirpath, "ner_models/ner_russian")
        elif predictor_path is None and self.lang == "en":
            dirpath = os.path.dirname(os.path.abspath(__file__))
            fpath = os.path.join(dirpath, "ner_models/ner_english")
        elif predictor_path is None:
            raise ValueError(
                "Unsupported language: if predictor_path is None,  then lang must be "
                + "'en' for English, 'zh' for Chinese, or 'ru' for Chinese"
            )
        else:
            if not os.path.isfile(predictor_path) or not os.path.isfile(
                predictor_path + ".preproc"
            ):
                raise ValueError(
                    "could not find a valid predictor model "
                    + "%s or valid Preprocessor %s at specified path"
                    % (predictor_path, predictor_path + ".preproc")
                )
            fpath = predictor_path
        try:
            import io
            from contextlib import redirect_stdout

            f = io.StringIO()
            with redirect_stdout(f):
                import ktrain
        except Exception:
            raise ValueError(
                "ktrain could not be imported. Install with: pip install ktrain"
            )
        self.predictor = ktrain.load_predictor(fpath)

    def predict(self, texts, merge_tokens=True, batch_size=32):
        """
        ```
        Extract named entities from supplied text

        Args:
          texts (list of str or str): list of texts to annotate
          merge_tokens(bool):  If True, tokens will be merged together by the entity
                               to which they are associated:
                               ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
          batch_size(int):    Batch size to use for predictions (default:32)
        ```
        """
        if isinstance(texts, str):
            texts = [texts]
        self.predictor.batch_size = batch_size
        texts = [t.strip() for t in texts]
        results = self.predictor.predict(texts, merge_tokens=merge_tokens)
        if len(results) == 1:
            results = results[0]
        return results

Methods

def predict(self, texts, merge_tokens=True, batch_size=32)
Extract named entities from supplied text

Args:
  texts (list of str or str): list of texts to annotate
  merge_tokens(bool):  If True, tokens will be merged together by the entity
                       to which they are associated:
                       ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
  batch_size(int):    Batch size to use for predictions (default:32)
Expand source code
def predict(self, texts, merge_tokens=True, batch_size=32):
    """
    ```
    Extract named entities from supplied text

    Args:
      texts (list of str or str): list of texts to annotate
      merge_tokens(bool):  If True, tokens will be merged together by the entity
                           to which they are associated:
                           ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
      batch_size(int):    Batch size to use for predictions (default:32)
    ```
    """
    if isinstance(texts, str):
        texts = [texts]
    self.predictor.batch_size = batch_size
    texts = [t.strip() for t in texts]
    results = self.predictor.predict(texts, merge_tokens=merge_tokens)
    if len(results) == 1:
        results = results[0]
    return results
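Example (mirrors the class docstring; the output shown is illustrative):

```
import os
os.environ["DISABLE_V2_BEHAVIOR"] = "1"  # must be set BEFORE importing ktrain
from ktrain.text.shallownlp import NER

ner = NER(lang="en")
ner.predict("Paul Newman is my favorite American actor.")
# e.g., [('Paul Newman', 'PER'), ('American', 'MISC')]
```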
class Searcher (queries, lang=None)

Search for keywords in text documents

Args:
  queries(list of str): list of text queries
  lang(str): language of queries.  default:None --> auto-detected
Expand source code
class Searcher:
    """
    Search for keywords in text documents
    """

    def __init__(self, queries, lang=None):
        """
        ```
        Args:
          queries(list of str): list of text queries
          lang(str): language of queries.  default:None --> auto-detected
        ```
        """
        self.queries = queries
        if isinstance(self.queries, str):
            self.queries = [self.queries]
        self.lang = lang
        if self.lang is None:
            self.lang = U.detect_lang(queries)
        # print("lang:%s" %(self.lang))

    def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True):
        """
        ```
        executes self.queries on supplied list of documents
        Args:
          docs(list of str): list of texts to search
          case_sensitive(bool):  If True, case sensitive search
          keys(list): list keys for supplied docs (e.g., file paths).
                      default: key is index in range(len(docs))
          min_matches(int): results must have at least these many word matches
          progress(bool): whether or not to show progress bar
        Returns:
          list of tuples of results of the form:
            (key, query, no. of matches)
          For Chinese, no. of matches will be number of unique Jieba-extracted character sequences that match
        ```
        """
        if isinstance(docs, str):
            docs = [docs]
        if keys and len(keys) != len(docs):
            raise ValueError("lengths of keys and docs must be the same")
        results = []
        l = len(docs)
        for idx, text in enumerate(docs):
            for q in self.queries:
                if U.is_chinese(self.lang):
                    r = self._search_chinese(
                        q, [text], min_matches=min_matches, parse=1, progress=False
                    )
                elif self.lang == "ar":
                    r = self._search(
                        q,
                        [text],
                        case_sensitive=case_sensitive,
                        min_matches=min_matches,
                        progress=False,
                        substrings_on=True,
                    )
                else:
                    r = self._search(
                        q,
                        [text],
                        case_sensitive=case_sensitive,
                        min_matches=min_matches,
                        progress=False,
                        substrings_on=False,
                    )
                if not r:
                    continue
                r = r[0]
                k = idx
                if keys:
                    k = keys[idx]
                num_matches = len(set(r[2])) if U.is_chinese(self.lang) else len(r[2])
                results.append((k, q, num_matches))
            if progress:
                printProgressBar(
                    idx + 1, l, prefix="progress: ", suffix="complete", length=50
                )
        return results

    def _search(
        self,
        query,
        docs,
        case_sensitive=False,
        substrings_on=False,
        min_matches=1,
        progress=True,
    ):
        """
        ```
        search documents for query string.
        Args:
            query(str or list):  the word or phrase to search (or list of them)
                                 if list is provided, each element is combined using OR
            docs (list of str): list of text documents
            case_sensitive(bool):  If True, case sensitive search
            substrings_on(bool): If True, omit "\b" word boundaries from the regex so
                                 that the query can match substrings within words. default:False
        returns:
            list:  list of (idx, text, matches) tuples for each document with at least min_matches matches
        ```
        """
        if not isinstance(query, (list, tuple, str)):
            raise ValueError("query must be str or list of str")
        if isinstance(query, str):
            query = [query]
        if not isinstance(docs, (list, np.ndarray)):
            raise ValueError("docs must be list of str")

        flag = 0
        if not case_sensitive:
            flag = re.I
        qlist = []
        for q in query:
            qlist.append(r"\s+".join(q.split()))
        original_query = query
        query = "|".join(qlist)
        bound = r"\b"
        if substrings_on:
            bound = ""
        pattern_str = r"%s(?:%s)%s" % (bound, query, bound)
        pattern = re.compile(pattern_str, flag)

        results = []
        l = len(docs)
        for idx, text in enumerate(docs):
            matches = pattern.findall(text)
            if matches and len(matches) >= min_matches:
                results.append((idx, text, matches))
            if progress:
                printProgressBar(
                    idx + 1, l, prefix="progress: ", suffix="complete", length=50
                )
        return results

    def _search_chinese(
        self, query, docs, substrings_on=True, parse=1, min_matches=1, progress=False
    ):
        """
        convenience method to search chinese text
        """
        original_query = query
        if not isinstance(query, str):
            raise ValueError("query must be str")
        if parse > 0:
            q = U.split_chinese(query)[0]
            num_words = len(q.split())
            query = build_ngrams(q, n=parse)
            query = ["".join(q) for q in query]
        return self._search(
            query, docs, substrings_on=substrings_on, min_matches=min_matches, progress=progress
        )

Methods

def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True)
executes self.queries on supplied list of documents
Args:
  docs(list of str): list of texts to search
  case_sensitive(bool):  If True, case sensitive search
  keys(list): list keys for supplied docs (e.g., file paths).
              default: key is index in range(len(docs))
  min_matches(int): results must have at least these many word matches
  progress(bool): whether or not to show progress bar
Returns:
  list of tuples of results of the form:
    (key, query, no. of matches)
  For Chinese, no. of matches will be number of unique Jieba-extracted character sequences that match
Expand source code
def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True):
    """
    ```
    executes self.queries on supplied list of documents
    Args:
      docs(list of str): list of texts to search
      case_sensitive(bool):  If True, case sensitive search
      keys(list): list keys for supplied docs (e.g., file paths).
                  default: key is index in range(len(docs))
      min_matches(int): results must have at least these many word matches
      progress(bool): whether or not to show progress bar
    Returns:
      list of tuples of results of the form:
        (key, query, no. of matches)
      For Chinese, no. of matches will be number of unique Jieba-extracted character sequences that match
    ```
    """
    if isinstance(docs, str):
        docs = [docs]
    if keys and len(keys) != len(docs):
        raise ValueError("lengths of keys and docs must be the same")
    results = []
    l = len(docs)
    for idx, text in enumerate(docs):
        for q in self.queries:
            if U.is_chinese(self.lang):
                r = self._search_chinese(
                    q, [text], min_matches=min_matches, parse=1, progress=False
                )
            elif self.lang == "ar":
                r = self._search(
                    q,
                    [text],
                    case_sensitive=case_sensitive,
                    min_matches=min_matches,
                    progress=False,
                    substrings_on=True,
                )
            else:
                r = self._search(
                    q,
                    [text],
                    case_sensitive=case_sensitive,
                    min_matches=min_matches,
                    progress=False,
                    substrings_on=False,
                )
            if not r:
                continue
            r = r[0]
            k = idx
            if keys:
                k = keys[idx]
            num_matches = len(set(r[2])) if U.is_chinese(self.lang) else len(r[2])
            results.append((k, q, num_matches))
        if progress:
            printProgressBar(
                idx + 1, l, prefix="progress: ", suffix="complete", length=50
            )
    return results
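
Example (a minimal sketch; the keys and output are illustrative):

```
from ktrain.text.shallownlp import Searcher

docs = ["Apple unveiled a new phone.", "Orange trees grow in warm climates."]
searcher = Searcher(["apple", "orange"])
searcher.search(docs, keys=["tech.txt", "nature.txt"], progress=False)
# e.g., [('tech.txt', 'apple', 1), ('nature.txt', 'orange', 1)]
```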