Top

pandasticsearch.queries module

# -*- coding: UTF-8 -*-

import collections
import json

import pandas

try:  # Python 3.3+: the ABC aliases were removed from `collections` in 3.10
    from collections.abc import MutableSequence
except ImportError:  # Python 2 fallback
    from collections import MutableSequence


class Query(MutableSequence):
    """
    Base container for a query result.

    Acts as a mutable sequence of result rows: every sequence operation is
    delegated to the internal ``_values`` list. ``_values`` stays ``None``
    until something assigns it, so the sequence operations are only usable
    after that point.
    """

    def __init__(self):
        super(Query, self).__init__()
        # Rows exposed through the sequence interface and ``result``.
        self._values = None
        # Raw result dict as handed to explain_result().
        self._result_dict = None

    def explain_result(self, result=None):
        """
        Remember the raw query result.

        :param result: the result dict to store; ``None`` leaves the
            previously stored dict untouched
        """
        if result is not None:
            assert isinstance(result, dict)
            self._result_dict = result

    def to_pandas(self):
        """
        Export the current query result to a Pandas DataFrame object.
        """
        raise NotImplementedError('implemented in subclass')

    def print_json(self):
        """
        Print the stored raw result as key-sorted JSON indented by 4 spaces.
        """
        indented_json = json.dumps(self._result_dict, sort_keys=True, separators=(',', ': '), indent=4,
                                   ensure_ascii=False)
        print(indented_json)

    @property
    def result(self):
        """
        The underlying list of rows (``None`` while nothing has been assigned).
        """
        return self._values

    @property
    def json(self):
        """
        Gets the original JSON representation returned by Elasticsearch REST API
        :return: The JSON string indicating the query result
        :rtype: string
        """
        return json.dumps(self._result_dict)

    def insert(self, index, value):
        """Insert ``value`` before position ``index`` in the rows."""
        self._values.insert(index, value)

    def append(self, value):
        """Append ``value`` to the end of the rows."""
        self._values.append(value)

    def __str__(self):
        return str(self._values)

    def __len__(self):
        return len(self._values)

    def __delitem__(self, index):
        del self._values[index]

    def __setitem__(self, index, value):
        self._values[index] = value

    def __getitem__(self, index):
        return self._values[index]


class Select(Query):
    """
    Query result whose rows are the ``_source`` entries of the returned hits.
    """

    def __init__(self):
        super(Select, self).__init__()

    def explain_result(self, result=None):
        """
        Store ``result`` (when given) and collect the ``_source`` of every hit.
        """
        super(Select, self).explain_result(result)
        hits = self._result_dict['hits']['hits']
        self._values = [doc['_source'] for doc in hits]

    def to_pandas(self):
        """
        Build a DataFrame from the collected rows.

        :return: a ``pandas.DataFrame``, or ``None`` when there are no rows
        """
        if not self._values:
            return None
        return pandas.DataFrame(data=self._values)

    @staticmethod
    def from_dict(d):
        """
        Create a ``Select`` and load the result dict ``d`` into it.
        """
        selected = Select()
        selected.explain_result(d)
        return selected


class Agg(Query):
    """
    Query result built from the ``aggregations`` part of a response.

    Each row is a dict of aggregated values; the keys of the enclosing
    buckets are kept separately so they can become a DataFrame index.
    """

    def __init__(self):
        super(Agg, self).__init__()
        # Names of the bucket aggregations, taken from the first extracted row.
        self._index_names = None
        # One tuple of bucket keys per row that sits inside at least one bucket.
        self._indexes = None

    def explain_result(self, result=None):
        """
        Store ``result`` (when given) and flatten its ``aggregations`` into rows.

        :param result: the result dict; ``None`` re-uses the stored one
        """
        super(Agg, self).explain_result(result)
        tuples = list(Agg._process_agg(self._result_dict['aggregations']))
        assert len(tuples) > 0
        self._index_names = list(tuples[0][0])
        self._values = []
        self._indexes = []
        for _, index, row in tuples:
            self.append(row)
            if len(index) > 0:
                self._indexes.append(index)

    def to_pandas(self):
        """
        Export the aggregated rows to a Pandas DataFrame object.

        :return: a DataFrame indexed by a ``MultiIndex`` of bucket keys when
            any were collected, a plainly indexed DataFrame otherwise, or
            ``None`` when no rows have been set
        """
        if self._values is not None:
            if len(self._indexes) > 0:
                index = pandas.MultiIndex.from_tuples(self._indexes, names=self._index_names)
                df = pandas.DataFrame(data=self._values, index=index)
            else:
                df = pandas.DataFrame(data=self._values)
            return df

    @classmethod
    def _process_agg(cls, bucket, indexes=(), names=()):
        """
        Recursively extract agg values
        :param bucket: a bucket contains either sub-buckets or a bunch of aggregated values
        :param indexes: keys of the buckets enclosing ``bucket``, outermost first
        :param names: names of the aggregations those buckets belong to
        :return: a generator of tuples: (index_name, index_tuple, row)
        """
        # for each agg, yield a row
        row = {}
        for k, v in bucket.items():
            if isinstance(v, dict):
                if 'buckets' in v:
                    for sub_bucket in v['buckets']:
                        for x in cls._process_agg(sub_bucket, indexes + (sub_bucket['key'],), names + (k,)):
                            yield x
                elif 'value' in v:
                    row[k] = v['value']
                elif 'values' in v:  # percentiles
                    # Merge instead of rebinding `row`: rebinding dropped the
                    # values gathered so far and made `row` an alias of the
                    # response's own dict, which the doc_count assignment
                    # below would then modify.
                    row.update(v['values'])
            if k == 'doc_count':  # count docs
                row['doc_count'] = v

        if len(row) > 0:
            yield (names, indexes, row)

    @staticmethod
    def from_dict(d):
        """
        Create an ``Agg`` and load the result dict ``d`` into it.
        """
        agg = Agg()
        agg.explain_result(d)
        return agg

Classes

class Agg

All the operations on a read-only sequence.

Concrete subclasses must override `__new__` or `__init__`, `__getitem__`, and `__len__`.

class Agg(Query):
    """
    Query result built from the ``aggregations`` part of a response.

    Each row is a dict of aggregated values; the keys of the enclosing
    buckets are kept separately so they can become a DataFrame index.
    """

    def __init__(self):
        super(Agg, self).__init__()
        # Names of the bucket aggregations, taken from the first extracted row.
        self._index_names = None
        # One tuple of bucket keys per row that sits inside at least one bucket.
        self._indexes = None

    def explain_result(self, result=None):
        """
        Store ``result`` (when given) and flatten its ``aggregations`` into rows.

        :param result: the result dict; ``None`` re-uses the stored one
        """
        super(Agg, self).explain_result(result)
        tuples = list(Agg._process_agg(self._result_dict['aggregations']))
        assert len(tuples) > 0
        self._index_names = list(tuples[0][0])
        self._values = []
        self._indexes = []
        for _, index, row in tuples:
            self.append(row)
            if len(index) > 0:
                self._indexes.append(index)

    def to_pandas(self):
        """
        Export the aggregated rows to a Pandas DataFrame object.

        :return: a DataFrame indexed by a ``MultiIndex`` of bucket keys when
            any were collected, a plainly indexed DataFrame otherwise, or
            ``None`` when no rows have been set
        """
        if self._values is not None:
            if len(self._indexes) > 0:
                index = pandas.MultiIndex.from_tuples(self._indexes, names=self._index_names)
                df = pandas.DataFrame(data=self._values, index=index)
            else:
                df = pandas.DataFrame(data=self._values)
            return df

    @classmethod
    def _process_agg(cls, bucket, indexes=(), names=()):
        """
        Recursively extract agg values
        :param bucket: a bucket contains either sub-buckets or a bunch of aggregated values
        :param indexes: keys of the buckets enclosing ``bucket``, outermost first
        :param names: names of the aggregations those buckets belong to
        :return: a generator of tuples: (index_name, index_tuple, row)
        """
        # for each agg, yield a row
        row = {}
        for k, v in bucket.items():
            if isinstance(v, dict):
                if 'buckets' in v:
                    for sub_bucket in v['buckets']:
                        for x in cls._process_agg(sub_bucket, indexes + (sub_bucket['key'],), names + (k,)):
                            yield x
                elif 'value' in v:
                    row[k] = v['value']
                elif 'values' in v:  # percentiles
                    # Merge instead of rebinding `row`: rebinding dropped the
                    # values gathered so far and made `row` an alias of the
                    # response's own dict, which the doc_count assignment
                    # below would then modify.
                    row.update(v['values'])
            if k == 'doc_count':  # count docs
                row['doc_count'] = v

        if len(row) > 0:
            yield (names, indexes, row)

    @staticmethod
    def from_dict(d):
        """
        Create an ``Agg`` and load the result dict ``d`` into it.
        """
        agg = Agg()
        agg.explain_result(d)
        return agg

Ancestors (in MRO)

  • Agg
  • Query
  • collections.abc.MutableSequence
  • collections.abc.Sequence
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container
  • builtins.object

Static methods

def __init__(

self)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self):
    """Run the parent initialiser, then start with no index information."""
    super(Agg, self).__init__()
    # Both are filled in by explain_result().
    self._indexes = None
    self._index_names = None

def append(

self, value)

S.append(value) -- append value to the end of the sequence

def append(self, value):
    """Add ``value`` at the end of the underlying ``_values`` list."""
    rows = self._values
    rows.append(value)

def clear(

self)

S.clear() -> None -- remove all items from S

def clear(self):
    """S.clear() -> None -- remove all items from S"""
    # Pop from the end until pop() reports that nothing is left.
    while True:
        try:
            self.pop()
        except IndexError:
            break

def count(

self, value)

S.count(value) -> integer -- return number of occurrences of value

def count(self, value):
    """S.count(value) -> integer -- return number of occurrences of value"""
    occurrences = 0
    for item in self:
        if item == value:
            occurrences += 1
    return occurrences

def explain_result(

self, result=None)

def explain_result(self, result=None):
    """
    Store ``result`` (when given) and flatten its ``aggregations`` into rows.

    The index names are read from the first extracted tuple; a row's bucket
    keys are recorded only when that row has any.
    """
    super(Agg, self).explain_result(result)
    extracted = list(Agg._process_agg(self._result_dict['aggregations']))
    assert len(extracted) > 0
    self._index_names = list(extracted[0][0])
    self._values = []
    self._indexes = []
    for _names, bucket_keys, row in extracted:
        self.append(row)
        if len(bucket_keys) > 0:
            self._indexes.append(bucket_keys)

def extend(

self, values)

S.extend(iterable) -- extend sequence by appending elements from the iterable

def extend(self, values):
    """S.extend(iterable) -- extend sequence by appending elements from the iterable"""
    # Delegates to append() so each element goes through the same path.
    for item in values:
        self.append(item)

def from_dict(

d)

@staticmethod
def from_dict(d):
    """Create an ``Agg`` and load the result dict ``d`` into it."""
    instance = Agg()
    instance.explain_result(d)
    return instance

def index(

self, value, start=0, stop=None)

S.index(value, [start, [stop]]) -> integer -- return first index of value. Raises ValueError if the value is not present.

def index(self, value, start=0, stop=None):
    """
    S.index(value, [start, [stop]]) -> integer -- return first index of value.

    Negative ``start`` / ``stop`` are taken relative to ``len(self)``.
    Raises ValueError if the value is not present.
    """
    if start is not None and start < 0:
        start = max(len(self) + start, 0)
    if stop is not None and stop < 0:
        stop += len(self)

    position = start
    while stop is None or position < stop:
        try:
            if self[position] == value:
                return position
        except IndexError:
            # Walked past the end without a match.
            break
        position += 1
    raise ValueError

def insert(

self, index, value)

S.insert(index, value) -- insert value before index

def insert(self, index, value):
    """Insert ``value`` before position ``index`` in the underlying ``_values`` list."""
    rows = self._values
    rows.insert(index, value)

def pop(

self, index=-1)

S.pop([index]) -> item -- remove and return item at index (default last). Raise IndexError if list is empty or index is out of range.

def pop(self, index=-1):
    """
    S.pop([index]) -> item -- remove and return item at index (default last).

    Raise IndexError if list is empty or index is out of range.
    """
    # Read first so the value survives the deletion.
    removed = self[index]
    del self[index]
    return removed

def print_json(

self)

def print_json(self):
    """Print the stored raw result as key-sorted JSON indented by 4 spaces."""
    dump_options = dict(sort_keys=True, separators=(',', ': '), indent=4, ensure_ascii=False)
    print(json.dumps(self._result_dict, **dump_options))

def remove(

self, value)

S.remove(value) -- remove first occurrence of value. Raise ValueError if the value is not present.

def remove(self, value):
    """
    S.remove(value) -- remove first occurrence of value.

    Raise ValueError if the value is not present.
    """
    position = self.index(value)
    del self[position]

def reverse(

self)

S.reverse() -- reverse IN PLACE

def reverse(self):
    """S.reverse() -- reverse *IN PLACE*"""
    size = len(self)
    # Swap mirrored pairs, working inwards from both ends.
    for left in range(size // 2):
        right = size - left - 1
        self[left], self[right] = self[right], self[left]

def to_pandas(

self)

Export the current query result to a Pandas DataFrame object.

def to_pandas(self):
    """
    Export the aggregated rows to a Pandas DataFrame object.

    :return: a DataFrame indexed by a ``MultiIndex`` of bucket keys when any
        were collected, a plainly indexed DataFrame otherwise, or ``None``
        when no rows have been set
    """
    # The leftover debug print of the indexes and rows was removed: a
    # conversion helper should not write to stdout.
    if self._values is not None:
        if len(self._indexes) > 0:
            index = pandas.MultiIndex.from_tuples(self._indexes, names=self._index_names)
            df = pandas.DataFrame(data=self._values, index=index)
        else:
            df = pandas.DataFrame(data=self._values)
        return df

Instance variables

var json

Inheritance: Query.json

Gets the original JSON representation returned by the Elasticsearch REST API. Returns the JSON string representing the query result (type: string).

var result

class Query

All the operations on a read-only sequence.

Concrete subclasses must override `__new__` or `__init__`, `__getitem__`, and `__len__`.

class Query(MutableSequence):
    """
    Base container for a query result.

    Acts as a mutable sequence of result rows: every sequence operation is
    delegated to the internal ``_values`` list. ``_values`` stays ``None``
    until something assigns it, so the sequence operations are only usable
    after that point.
    """

    def __init__(self):
        super(Query, self).__init__()
        # Rows exposed through the sequence interface and ``result``.
        self._values = None
        # Raw result dict as handed to explain_result().
        self._result_dict = None

    def explain_result(self, result=None):
        """
        Remember the raw query result.

        :param result: the result dict to store; ``None`` leaves the
            previously stored dict untouched
        """
        if result is not None:
            assert isinstance(result, dict)
            self._result_dict = result

    def to_pandas(self):
        """
        Export the current query result to a Pandas DataFrame object.
        """
        raise NotImplementedError('implemented in subclass')

    def print_json(self):
        """
        Print the stored raw result as key-sorted JSON indented by 4 spaces.
        """
        indented_json = json.dumps(self._result_dict, sort_keys=True, separators=(',', ': '), indent=4,
                                   ensure_ascii=False)
        print(indented_json)

    @property
    def result(self):
        """
        The underlying list of rows (``None`` while nothing has been assigned).
        """
        return self._values

    @property
    def json(self):
        """
        Gets the original JSON representation returned by Elasticsearch REST API
        :return: The JSON string indicating the query result
        :rtype: string
        """
        return json.dumps(self._result_dict)

    def insert(self, index, value):
        """Insert ``value`` before position ``index`` in the rows."""
        self._values.insert(index, value)

    def append(self, value):
        """Append ``value`` to the end of the rows."""
        self._values.append(value)

    def __str__(self):
        return str(self._values)

    def __len__(self):
        return len(self._values)

    def __delitem__(self, index):
        del self._values[index]

    def __setitem__(self, index, value):
        self._values[index] = value

    def __getitem__(self, index):
        return self._values[index]

Ancestors (in MRO)

  • Query
  • collections.abc.MutableSequence
  • collections.abc.Sequence
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container
  • builtins.object

Static methods

def __init__(

self)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self):
    """Run the parent initialiser, then start with no raw result and no rows."""
    super(Query, self).__init__()
    # Raw result dict as handed to explain_result().
    self._result_dict = None
    # Rows exposed through the sequence interface.
    self._values = None

def append(

self, value)

S.append(value) -- append value to the end of the sequence

def append(self, value):
    """Add ``value`` at the end of the underlying ``_values`` list."""
    rows = self._values
    rows.append(value)

def clear(

self)

S.clear() -> None -- remove all items from S

def clear(self):
    """S.clear() -> None -- remove all items from S"""
    # Pop from the end until pop() reports that nothing is left.
    while True:
        try:
            self.pop()
        except IndexError:
            break

def count(

self, value)

S.count(value) -> integer -- return number of occurrences of value

def count(self, value):
    """S.count(value) -> integer -- return number of occurrences of value"""
    occurrences = 0
    for item in self:
        if item == value:
            occurrences += 1
    return occurrences

def explain_result(

self, result=None)

def explain_result(self, result=None):
    """
    Remember the raw query result.

    A ``None`` argument leaves the previously stored dict untouched.
    """
    if result is None:
        return
    assert isinstance(result, dict)
    self._result_dict = result

def extend(

self, values)

S.extend(iterable) -- extend sequence by appending elements from the iterable

def extend(self, values):
    """S.extend(iterable) -- extend sequence by appending elements from the iterable"""
    # Delegates to append() so each element goes through the same path.
    for item in values:
        self.append(item)

def index(

self, value, start=0, stop=None)

S.index(value, [start, [stop]]) -> integer -- return first index of value. Raises ValueError if the value is not present.

def index(self, value, start=0, stop=None):
    """
    S.index(value, [start, [stop]]) -> integer -- return first index of value.

    Negative ``start`` / ``stop`` are taken relative to ``len(self)``.
    Raises ValueError if the value is not present.
    """
    if start is not None and start < 0:
        start = max(len(self) + start, 0)
    if stop is not None and stop < 0:
        stop += len(self)

    position = start
    while stop is None or position < stop:
        try:
            if self[position] == value:
                return position
        except IndexError:
            # Walked past the end without a match.
            break
        position += 1
    raise ValueError

def insert(

self, index, value)

S.insert(index, value) -- insert value before index

def insert(self, index, value):
    """Insert ``value`` before position ``index`` in the underlying ``_values`` list."""
    rows = self._values
    rows.insert(index, value)

def pop(

self, index=-1)

S.pop([index]) -> item -- remove and return item at index (default last). Raise IndexError if list is empty or index is out of range.

def pop(self, index=-1):
    """
    S.pop([index]) -> item -- remove and return item at index (default last).

    Raise IndexError if list is empty or index is out of range.
    """
    # Read first so the value survives the deletion.
    removed = self[index]
    del self[index]
    return removed

def print_json(

self)

def print_json(self):
    """Print the stored raw result as key-sorted JSON indented by 4 spaces."""
    dump_options = dict(sort_keys=True, separators=(',', ': '), indent=4, ensure_ascii=False)
    print(json.dumps(self._result_dict, **dump_options))

def remove(

self, value)

S.remove(value) -- remove first occurrence of value. Raise ValueError if the value is not present.

def remove(self, value):
    """
    S.remove(value) -- remove first occurrence of value.

    Raise ValueError if the value is not present.
    """
    position = self.index(value)
    del self[position]

def reverse(

self)

S.reverse() -- reverse IN PLACE

def reverse(self):
    """S.reverse() -- reverse *IN PLACE*"""
    size = len(self)
    # Swap mirrored pairs, working inwards from both ends.
    for left in range(size // 2):
        right = size - left - 1
        self[left], self[right] = self[right], self[left]

def to_pandas(

self)

Export the current query result to a Pandas DataFrame object.

def to_pandas(self):
    """
    Convert the current query result into a Pandas DataFrame object.

    This version only raises; the message says the conversion is
    implemented in a subclass.
    """
    message = 'implemented in subclass'
    raise NotImplementedError(message)

Instance variables

var json

Gets the original JSON representation returned by the Elasticsearch REST API. Returns the JSON string representing the query result (type: string).

var result

class Select

All the operations on a read-only sequence.

Concrete subclasses must override `__new__` or `__init__`, `__getitem__`, and `__len__`.

class Select(Query):
    """
    Query result whose rows are the ``_source`` entries of the returned hits.
    """

    def __init__(self):
        super(Select, self).__init__()

    def explain_result(self, result=None):
        """
        Store ``result`` (when given) and collect the ``_source`` of every hit.
        """
        super(Select, self).explain_result(result)
        hits = self._result_dict['hits']['hits']
        self._values = [doc['_source'] for doc in hits]

    def to_pandas(self):
        """
        Build a DataFrame from the collected rows.

        :return: a ``pandas.DataFrame``, or ``None`` when there are no rows
        """
        if not self._values:
            return None
        return pandas.DataFrame(data=self._values)

    @staticmethod
    def from_dict(d):
        """
        Create a ``Select`` and load the result dict ``d`` into it.
        """
        selected = Select()
        selected.explain_result(d)
        return selected

Ancestors (in MRO)

  • Select
  • Query
  • collections.abc.MutableSequence
  • collections.abc.Sequence
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container
  • builtins.object

Static methods

def __init__(

self)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self):
    """Defer entirely to the parent initialiser; no state of its own is added."""
    super(Select, self).__init__()

def append(

self, value)

S.append(value) -- append value to the end of the sequence

def append(self, value):
    """Add ``value`` at the end of the underlying ``_values`` list."""
    rows = self._values
    rows.append(value)

def clear(

self)

S.clear() -> None -- remove all items from S

def clear(self):
    """S.clear() -> None -- remove all items from S"""
    # Pop from the end until pop() reports that nothing is left.
    while True:
        try:
            self.pop()
        except IndexError:
            break

def count(

self, value)

S.count(value) -> integer -- return number of occurrences of value

def count(self, value):
    """S.count(value) -> integer -- return number of occurrences of value"""
    occurrences = 0
    for item in self:
        if item == value:
            occurrences += 1
    return occurrences

def explain_result(

self, result=None)

def explain_result(self, result=None):
    """
    Store ``result`` (when given) and collect the ``_source`` of every hit.
    """
    super(Select, self).explain_result(result)
    hits = self._result_dict['hits']['hits']
    self._values = [doc['_source'] for doc in hits]

def extend(

self, values)

S.extend(iterable) -- extend sequence by appending elements from the iterable

def extend(self, values):
    """S.extend(iterable) -- extend sequence by appending elements from the iterable"""
    # Delegates to append() so each element goes through the same path.
    for item in values:
        self.append(item)

def from_dict(

d)

@staticmethod
def from_dict(d):
    """Create a ``Select`` and load the result dict ``d`` into it."""
    selected = Select()
    selected.explain_result(d)
    return selected

def index(

self, value, start=0, stop=None)

S.index(value, [start, [stop]]) -> integer -- return first index of value. Raises ValueError if the value is not present.

def index(self, value, start=0, stop=None):
    """
    S.index(value, [start, [stop]]) -> integer -- return first index of value.

    Negative ``start`` / ``stop`` are taken relative to ``len(self)``.
    Raises ValueError if the value is not present.
    """
    if start is not None and start < 0:
        start = max(len(self) + start, 0)
    if stop is not None and stop < 0:
        stop += len(self)

    position = start
    while stop is None or position < stop:
        try:
            if self[position] == value:
                return position
        except IndexError:
            # Walked past the end without a match.
            break
        position += 1
    raise ValueError

def insert(

self, index, value)

S.insert(index, value) -- insert value before index

def insert(self, index, value):
    """Insert ``value`` before position ``index`` in the underlying ``_values`` list."""
    rows = self._values
    rows.insert(index, value)

def pop(

self, index=-1)

S.pop([index]) -> item -- remove and return item at index (default last). Raise IndexError if list is empty or index is out of range.

def pop(self, index=-1):
    """
    S.pop([index]) -> item -- remove and return item at index (default last).

    Raise IndexError if list is empty or index is out of range.
    """
    # Read first so the value survives the deletion.
    removed = self[index]
    del self[index]
    return removed

def print_json(

self)

def print_json(self):
    """Print the stored raw result as key-sorted JSON indented by 4 spaces."""
    dump_options = dict(sort_keys=True, separators=(',', ': '), indent=4, ensure_ascii=False)
    print(json.dumps(self._result_dict, **dump_options))

def remove(

self, value)

S.remove(value) -- remove first occurrence of value. Raise ValueError if the value is not present.

def remove(self, value):
    """
    S.remove(value) -- remove first occurrence of value.

    Raise ValueError if the value is not present.
    """
    position = self.index(value)
    del self[position]

def reverse(

self)

S.reverse() -- reverse IN PLACE

def reverse(self):
    """S.reverse() -- reverse *IN PLACE*"""
    size = len(self)
    # Swap mirrored pairs, working inwards from both ends.
    for left in range(size // 2):
        right = size - left - 1
        self[left], self[right] = self[right], self[left]

def to_pandas(

self)

Export the current query result to a Pandas DataFrame object.

def to_pandas(self):
    """
    Build a DataFrame from the collected rows.

    Returns ``None`` when ``_values`` is empty or unset.
    """
    if not self._values:
        return None
    return pandas.DataFrame(data=self._values)

Instance variables

var json

Inheritance: Query.json

Gets the original JSON representation returned by the Elasticsearch REST API. Returns the JSON string representing the query result (type: string).

var result