pandasticsearch.queries module
# -*- coding: UTF-8 -*-
"""Wrappers that expose Elasticsearch REST responses as sequences and DataFrames."""

import collections
import json

import pandas

try:
    # The ABC aliases on ``collections`` itself were removed in Python 3.10.
    from collections.abc import MutableSequence as _MutableSequence
except ImportError:  # Python 2
    _MutableSequence = collections.MutableSequence


class Query(_MutableSequence):
    """
    Base wrapper around one Elasticsearch response.

    The raw response is kept in ``_result_dict`` and the rows derived from it
    in ``_values``; all sequence operations are delegated to ``_values``.
    """

    def __init__(self):
        super(Query, self).__init__()
        self._values = None  # rows extracted by a subclass' explain_result()
        self._result_dict = None  # raw response dict

    def explain_result(self, result=None):
        """
        Store ``result`` as the raw response of this query.

        :param result: the response dict; ``None`` keeps the stored response
        """
        if result is not None:
            assert isinstance(result, dict)
            self._result_dict = result

    def to_pandas(self):
        """
        Export the current query result to a Pandas DataFrame object.
        """
        raise NotImplementedError('implemented in subclass')

    def print_json(self):
        """
        Print the raw response as JSON with sorted keys and 4-space indent.
        """
        indented_json = json.dumps(self._result_dict, sort_keys=True, separators=(',', ': '),
                                   indent=4, ensure_ascii=False)
        print(indented_json)

    @property
    def result(self):
        """The rows extracted from the response."""
        return self._values

    @property
    def json(self):
        """
        Gets the original JSON representation returned by Elasticsearch REST API

        :return: The JSON string indicating the query result
        :rtype: string
        """
        return json.dumps(self._result_dict)

    def insert(self, index, value):
        self._values.insert(index, value)

    def append(self, value):
        self._values.append(value)

    def __str__(self):
        return str(self._values)

    def __len__(self):
        return len(self._values)

    def __delitem__(self, index):
        del self._values[index]

    def __setitem__(self, index, value):
        self._values[index] = value

    def __getitem__(self, index):
        return self._values[index]


class Select(Query):
    """Rows of a search response: one ``_source`` document per hit."""

    def __init__(self):
        super(Select, self).__init__()

    def explain_result(self, result=None):
        """
        Store ``result`` and collect the ``_source`` of every hit as a row.

        :param result: the response dict; its ``['hits']['hits']`` list is read
        """
        super(Select, self).explain_result(result)
        self._values = [hit['_source'] for hit in self._result_dict['hits']['hits']]

    def to_pandas(self):
        """
        Export the collected rows to a Pandas DataFrame object.

        :return: a DataFrame built from the rows, or ``None`` when there are no rows
        """
        if not self._values:
            return None
        return pandas.DataFrame(data=self._values)

    @staticmethod
    def from_dict(d):
        """
        Build a ``Select`` from a response dict.

        :param d: the response dict
        :return: a ``Select`` that has already processed ``d``
        """
        query = Select()
        query.explain_result(d)
        return query


class Agg(Query):
    """Rows of an aggregation response, indexed by the bucket keys leading to them."""

    def __init__(self):
        super(Agg, self).__init__()
        self._index_names = None  # names of the aggregations that produced the buckets
        self._indexes = None  # one tuple of bucket keys per row

    def explain_result(self, result=None):
        """
        Store ``result`` and flatten its ``'aggregations'`` entry into rows.

        :param result: the response dict
        """
        super(Agg, self).explain_result(result)
        tuples = list(Agg._process_agg(self._result_dict['aggregations']))
        assert len(tuples) > 0
        # The index names are taken from the first extracted row.
        self._index_names = list(tuples[0][0])
        self._values = []
        self._indexes = []
        for _, index, row in tuples:
            self.append(row)
            if len(index) > 0:
                self._indexes.append(index)

    def to_pandas(self):
        """
        Export the aggregated rows to a Pandas DataFrame object.

        :return: a DataFrame (with a MultiIndex of bucket keys when any were
                 collected), or ``None`` when no result has been processed
        """
        if self._values is None:
            return None
        if self._indexes:
            index = pandas.MultiIndex.from_tuples(self._indexes, names=self._index_names)
            return pandas.DataFrame(data=self._values, index=index)
        return pandas.DataFrame(data=self._values)

    @classmethod
    def _process_agg(cls, bucket, indexes=(), names=()):
        """
        Recursively extract agg values

        :param bucket: a bucket contains either sub-buckets or a bunch of aggregated values
        :param indexes: the bucket keys on the path down to ``bucket``
        :param names: the aggregation names on the path down to ``bucket``
        :return: a generator of tuples: (index_name, index_tuple, row)
        """
        # for each agg, yield a row
        row = {}
        for k, v in bucket.items():
            if isinstance(v, dict):
                if 'buckets' in v:
                    for sub_bucket in v['buckets']:
                        for x in Agg._process_agg(sub_bucket,
                                                  indexes + (sub_bucket['key'],),
                                                  names + (k,)):
                            yield x
                elif 'value' in v:
                    row[k] = v['value']
                elif 'values' in v:  # percentiles
                    # Merge instead of rebinding: keeps values gathered so far and
                    # never writes into the response dict itself.
                    row.update(v['values'])
            if k == 'doc_count':  # count docs
                row['doc_count'] = v

        if row:
            yield (names, indexes, row)

    @staticmethod
    def from_dict(d):
        """
        Build an ``Agg`` from a response dict.

        :param d: the response dict
        :return: an ``Agg`` that has already processed ``d``
        """
        agg = Agg()
        agg.explain_result(d)
        return agg
Classes
class Agg
All the operations on a read-only sequence.
Concrete subclasses must override `__new__` or `__init__`, `__getitem__`, and `__len__`.
class Agg(Query):
    """Rows of an aggregation response, indexed by the bucket keys leading to them."""

    def __init__(self):
        super(Agg, self).__init__()
        self._index_names = None  # names of the aggregations that produced the buckets
        self._indexes = None  # one tuple of bucket keys per row

    def explain_result(self, result=None):
        """
        Store ``result`` and flatten its ``'aggregations'`` entry into rows.

        :param result: the response dict
        """
        super(Agg, self).explain_result(result)
        tuples = list(Agg._process_agg(self._result_dict['aggregations']))
        assert len(tuples) > 0
        # The index names are taken from the first extracted row.
        self._index_names = list(tuples[0][0])
        self._values = []
        self._indexes = []
        for _, index, row in tuples:
            self.append(row)
            if len(index) > 0:
                self._indexes.append(index)

    def to_pandas(self):
        """
        Export the aggregated rows to a Pandas DataFrame object.

        :return: a DataFrame (with a MultiIndex of bucket keys when any were
                 collected), or ``None`` when no result has been processed
        """
        if self._values is None:
            return None
        if self._indexes:
            index = pandas.MultiIndex.from_tuples(self._indexes, names=self._index_names)
            return pandas.DataFrame(data=self._values, index=index)
        return pandas.DataFrame(data=self._values)

    @classmethod
    def _process_agg(cls, bucket, indexes=(), names=()):
        """
        Recursively extract agg values

        :param bucket: a bucket contains either sub-buckets or a bunch of aggregated values
        :param indexes: the bucket keys on the path down to ``bucket``
        :param names: the aggregation names on the path down to ``bucket``
        :return: a generator of tuples: (index_name, index_tuple, row)
        """
        # for each agg, yield a row
        row = {}
        for k, v in bucket.items():
            if isinstance(v, dict):
                if 'buckets' in v:
                    for sub_bucket in v['buckets']:
                        for x in Agg._process_agg(sub_bucket,
                                                  indexes + (sub_bucket['key'],),
                                                  names + (k,)):
                            yield x
                elif 'value' in v:
                    row[k] = v['value']
                elif 'values' in v:  # percentiles
                    # Merge instead of rebinding: keeps values gathered so far and
                    # never writes into the response dict itself.
                    row.update(v['values'])
            if k == 'doc_count':  # count docs
                row['doc_count'] = v

        if row:
            yield (names, indexes, row)

    @staticmethod
    def from_dict(d):
        """
        Build an ``Agg`` from a response dict.

        :param d: the response dict
        :return: an ``Agg`` that has already processed ``d``
        """
        agg = Agg()
        agg.explain_result(d)
        return agg
Ancestors (in MRO)
- Agg
- Query
- collections.abc.MutableSequence
- collections.abc.Sequence
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- builtins.object
Methods
def __init__(
self)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self):
    """Create an aggregation wrapper that has not processed a response yet."""
    super(Agg, self).__init__()
    # Both are filled in by explain_result().
    self._indexes = None
    self._index_names = None
def append(
self, value)
S.append(value) -- append value to the end of the sequence
def append(self, value):
    """
    Add ``value`` to the end of the stored rows.

    :param value: the item appended to ``self._values``
    """
    rows = self._values
    rows.append(value)
def clear(
self)
S.clear() -> None -- remove all items from S
def clear(self):
    """S.clear() -> None -- remove all items from S"""
    # Keep popping until pop() reports that nothing is left.
    while True:
        try:
            self.pop()
        except IndexError:
            return
def count(
self, value)
S.count(value) -> integer -- return number of occurrences of value
def count(self, value):
    """S.count(value) -> integer -- return number of occurrences of value"""
    occurrences = 0
    for item in self:
        if item == value:
            occurrences += 1
    return occurrences
def explain_result(
self, result=None)
def explain_result(self, result=None):
    """
    Store ``result`` and flatten its ``'aggregations'`` entry into rows.

    Every tuple produced by ``Agg._process_agg`` contributes one row; its
    index tuple is recorded as well unless it is empty.

    :param result: the response dict, passed on to ``Query.explain_result``
    """
    super(Agg, self).explain_result(result)
    aggregations = self._result_dict['aggregations']
    extracted = list(Agg._process_agg(aggregations))
    assert len(extracted) > 0
    # The index names are taken from the first extracted tuple.
    first_names = extracted[0][0]
    self._index_names = list(first_names)
    self._values = []
    self._indexes = []
    for _names, index_tuple, row in extracted:
        self.append(row)
        if len(index_tuple) > 0:
            self._indexes.append(index_tuple)
def extend(
self, values)
S.extend(iterable) -- extend sequence by appending elements from the iterable
def extend(self, values):
    """
    S.extend(iterable) -- extend sequence by appending elements from the iterable

    :param values: the items to append; may be the sequence itself
    """
    # Appending while iterating over the sequence itself would never
    # terminate, so take a snapshot first.
    if values is self:
        values = list(values)
    for v in values:
        self.append(v)
def from_dict(
d)
@staticmethod
def from_dict(d):
    """
    Build an ``Agg`` from a response dict.

    :param d: the response dict handed to ``explain_result``
    :return: the new ``Agg`` after it has processed ``d``
    """
    instance = Agg()
    instance.explain_result(d)
    return instance
def index(
self, value, start=0, stop=None)
S.index(value, [start, [stop]]) -> integer -- return first index of value. Raises ValueError if the value is not present.
def index(self, value, start=0, stop=None):
    """
    S.index(value, [start, [stop]]) -> integer -- return first index of value.
    Raises ValueError if the value is not present.

    Negative ``start``/``stop`` are taken relative to the end of the sequence.
    """
    if start is not None and start < 0:
        start = max(len(self) + start, 0)
    if stop is not None and stop < 0:
        stop += len(self)

    position = start
    while stop is None or position < stop:
        try:
            matched = self[position] == value
        except IndexError:
            # Ran past the end of the sequence.
            break
        if matched:
            return position
        position += 1
    raise ValueError
def insert(
self, index, value)
S.insert(index, value) -- insert value before index
def insert(self, index, value):
    """
    Place ``value`` before position ``index`` in the stored rows.

    :param index: the position to insert at
    :param value: the item inserted into ``self._values``
    """
    rows = self._values
    rows.insert(index, value)
def pop(
self, index=-1)
S.pop([index]) -> item -- remove and return item at index (default last). Raise IndexError if list is empty or index is out of range.
def pop(self, index=-1):
    """
    S.pop([index]) -> item -- remove and return item at index (default last).
    Raise IndexError if list is empty or index is out of range.
    """
    # Read the item first so it can be returned after it has been deleted.
    removed = self[index]
    del self[index]
    return removed
def print_json(
self)
def print_json(self):
    """
    Print the raw response as JSON with sorted keys and a 4-space indent.

    Non-ASCII characters are written as-is rather than escaped.
    """
    print(json.dumps(
        self._result_dict,
        indent=4,
        sort_keys=True,
        separators=(',', ': '),
        ensure_ascii=False,
    ))
def remove(
self, value)
S.remove(value) -- remove first occurrence of value. Raise ValueError if the value is not present.
def remove(self, value):
    """
    S.remove(value) -- remove first occurrence of value.
    Raise ValueError if the value is not present.
    """
    position = self.index(value)
    del self[position]
def reverse(
self)
S.reverse() -- reverse IN PLACE
def reverse(self):
    """S.reverse() -- reverse *IN PLACE*"""
    # Swap the outermost pair and move both ends inwards until they meet.
    low, high = 0, len(self) - 1
    while low < high:
        self[low], self[high] = self[high], self[low]
        low += 1
        high -= 1
def to_pandas(
self)
Export the current query result to a Pandas DataFrame object.
def to_pandas(self):
    """
    Export the aggregated rows to a Pandas DataFrame object.

    :return: a DataFrame built from ``self._values`` (indexed by a MultiIndex
             of ``self._indexes`` when any were collected), or ``None`` when
             no result has been processed yet
    """
    if self._values is None:
        return None
    if self._indexes:
        index = pandas.MultiIndex.from_tuples(self._indexes, names=self._index_names)
        return pandas.DataFrame(data=self._values, index=index)
    return pandas.DataFrame(data=self._values)
Instance variables
var json
Gets the original JSON representation returned by Elasticsearch REST API :return: The JSON string indicating the query result :rtype: string
var result
class Query
All the operations on a read-only sequence.
Concrete subclasses must override `__new__` or `__init__`, `__getitem__`, and `__len__`.
try:
    # The ABC aliases on ``collections`` itself were removed in Python 3.10.
    from collections.abc import MutableSequence as _MutableSequence
except ImportError:  # Python 2
    from collections import MutableSequence as _MutableSequence


class Query(_MutableSequence):
    """
    Base wrapper around one Elasticsearch response.

    The raw response is kept in ``_result_dict`` and the rows derived from it
    in ``_values``; all sequence operations are delegated to ``_values``.
    """

    def __init__(self):
        super(Query, self).__init__()
        self._values = None  # rows extracted by a subclass' explain_result()
        self._result_dict = None  # raw response dict

    def explain_result(self, result=None):
        """
        Store ``result`` as the raw response of this query.

        :param result: the response dict; ``None`` keeps the stored response
        """
        if result is not None:
            assert isinstance(result, dict)
            self._result_dict = result

    def to_pandas(self):
        """
        Export the current query result to a Pandas DataFrame object.
        """
        raise NotImplementedError('implemented in subclass')

    def print_json(self):
        """
        Print the raw response as JSON with sorted keys and 4-space indent.
        """
        indented_json = json.dumps(self._result_dict, sort_keys=True, separators=(',', ': '),
                                   indent=4, ensure_ascii=False)
        print(indented_json)

    @property
    def result(self):
        """The rows extracted from the response."""
        return self._values

    @property
    def json(self):
        """
        Gets the original JSON representation returned by Elasticsearch REST API

        :return: The JSON string indicating the query result
        :rtype: string
        """
        return json.dumps(self._result_dict)

    def insert(self, index, value):
        self._values.insert(index, value)

    def append(self, value):
        self._values.append(value)

    def __str__(self):
        return str(self._values)

    def __len__(self):
        return len(self._values)

    def __delitem__(self, index):
        del self._values[index]

    def __setitem__(self, index, value):
        self._values[index] = value

    def __getitem__(self, index):
        return self._values[index]
Ancestors (in MRO)
- Query
- collections.abc.MutableSequence
- collections.abc.Sequence
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- builtins.object
Methods
def __init__(
self)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self):
    """Create a query wrapper with no response attached yet."""
    super(Query, self).__init__()
    # Raw response dict, set by explain_result().
    self._result_dict = None
    # Rows exposed through the sequence interface and the ``result`` property.
    self._values = None
def append(
self, value)
S.append(value) -- append value to the end of the sequence
def append(self, value):
    """
    Add ``value`` to the end of the stored rows.

    :param value: the item appended to ``self._values``
    """
    rows = self._values
    rows.append(value)
def clear(
self)
S.clear() -> None -- remove all items from S
def clear(self):
    """S.clear() -> None -- remove all items from S"""
    # Keep popping until pop() reports that nothing is left.
    while True:
        try:
            self.pop()
        except IndexError:
            return
def count(
self, value)
S.count(value) -> integer -- return number of occurrences of value
def count(self, value):
    """S.count(value) -> integer -- return number of occurrences of value"""
    occurrences = 0
    for item in self:
        if item == value:
            occurrences += 1
    return occurrences
def explain_result(
self, result=None)
def explain_result(self, result=None):
    """
    Remember ``result`` as the raw response backing this query.

    :param result: the response to store; when ``None`` the previously
                   stored response is left untouched
    """
    if result is None:
        return
    assert isinstance(result, dict)
    self._result_dict = result
def extend(
self, values)
S.extend(iterable) -- extend sequence by appending elements from the iterable
def extend(self, values):
    """
    S.extend(iterable) -- extend sequence by appending elements from the iterable

    :param values: the items to append; may be the sequence itself
    """
    # Appending while iterating over the sequence itself would never
    # terminate, so take a snapshot first.
    if values is self:
        values = list(values)
    for v in values:
        self.append(v)
def index(
self, value, start=0, stop=None)
S.index(value, [start, [stop]]) -> integer -- return first index of value. Raises ValueError if the value is not present.
def index(self, value, start=0, stop=None):
    """
    S.index(value, [start, [stop]]) -> integer -- return first index of value.
    Raises ValueError if the value is not present.

    Negative ``start``/``stop`` are taken relative to the end of the sequence.
    """
    if start is not None and start < 0:
        start = max(len(self) + start, 0)
    if stop is not None and stop < 0:
        stop += len(self)

    position = start
    while stop is None or position < stop:
        try:
            matched = self[position] == value
        except IndexError:
            # Ran past the end of the sequence.
            break
        if matched:
            return position
        position += 1
    raise ValueError
def insert(
self, index, value)
S.insert(index, value) -- insert value before index
def insert(self, index, value):
    """
    Place ``value`` before position ``index`` in the stored rows.

    :param index: the position to insert at
    :param value: the item inserted into ``self._values``
    """
    rows = self._values
    rows.insert(index, value)
def pop(
self, index=-1)
S.pop([index]) -> item -- remove and return item at index (default last). Raise IndexError if list is empty or index is out of range.
def pop(self, index=-1):
    """
    S.pop([index]) -> item -- remove and return item at index (default last).
    Raise IndexError if list is empty or index is out of range.
    """
    # Read the item first so it can be returned after it has been deleted.
    removed = self[index]
    del self[index]
    return removed
def print_json(
self)
def print_json(self):
    """
    Print the raw response as JSON with sorted keys and a 4-space indent.

    Non-ASCII characters are written as-is rather than escaped.
    """
    print(json.dumps(
        self._result_dict,
        indent=4,
        sort_keys=True,
        separators=(',', ': '),
        ensure_ascii=False,
    ))
def remove(
self, value)
S.remove(value) -- remove first occurrence of value. Raise ValueError if the value is not present.
def remove(self, value):
    """
    S.remove(value) -- remove first occurrence of value.
    Raise ValueError if the value is not present.
    """
    position = self.index(value)
    del self[position]
def reverse(
self)
S.reverse() -- reverse IN PLACE
def reverse(self):
    """S.reverse() -- reverse *IN PLACE*"""
    # Swap the outermost pair and move both ends inwards until they meet.
    low, high = 0, len(self) - 1
    while low < high:
        self[low], self[high] = self[high], self[low]
        low += 1
        high -= 1
def to_pandas(
self)
Export the current query result to a Pandas DataFrame object.
def to_pandas(self):
    """
    Export the current query result to a Pandas DataFrame object.

    The base class has no implementation and always raises.

    :raises NotImplementedError: always
    """
    message = 'implemented in subclass'
    raise NotImplementedError(message)
Instance variables
var json
Gets the original JSON representation returned by Elasticsearch REST API :return: The JSON string indicating the query result :rtype: string
var result
class Select
All the operations on a read-only sequence.
Concrete subclasses must override `__new__` or `__init__`, `__getitem__`, and `__len__`.
class Select(Query):
    """Rows of a search response: one ``_source`` document per hit."""

    def __init__(self):
        super(Select, self).__init__()

    def explain_result(self, result=None):
        """
        Store ``result`` and collect the ``_source`` of every hit as a row.

        :param result: the response dict; its ``['hits']['hits']`` list is read
        """
        super(Select, self).explain_result(result)
        self._values = [hit['_source'] for hit in self._result_dict['hits']['hits']]

    def to_pandas(self):
        """
        Export the collected rows to a Pandas DataFrame object.

        :return: a DataFrame built from the rows, or ``None`` when there are no rows
        """
        if not self._values:
            return None
        return pandas.DataFrame(data=self._values)

    @staticmethod
    def from_dict(d):
        """
        Build a ``Select`` from a response dict.

        :param d: the response dict
        :return: a ``Select`` that has already processed ``d``
        """
        query = Select()
        query.explain_result(d)
        return query
Ancestors (in MRO)
- Select
- Query
- collections.abc.MutableSequence
- collections.abc.Sequence
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- builtins.object
Methods
def __init__(
self)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self):
    """Create a select wrapper; all set-up is done by the parent initializer."""
    super(Select, self).__init__()
def append(
self, value)
S.append(value) -- append value to the end of the sequence
def append(self, value):
    """
    Add ``value`` to the end of the stored rows.

    :param value: the item appended to ``self._values``
    """
    rows = self._values
    rows.append(value)
def clear(
self)
S.clear() -> None -- remove all items from S
def clear(self):
    """S.clear() -> None -- remove all items from S"""
    # Keep popping until pop() reports that nothing is left.
    while True:
        try:
            self.pop()
        except IndexError:
            return
def count(
self, value)
S.count(value) -> integer -- return number of occurrences of value
def count(self, value):
    """S.count(value) -> integer -- return number of occurrences of value"""
    occurrences = 0
    for item in self:
        if item == value:
            occurrences += 1
    return occurrences
def explain_result(
self, result=None)
def explain_result(self, result=None):
    """
    Store ``result`` and collect the ``_source`` of every hit as a row.

    :param result: the response dict, passed on to ``Query.explain_result``;
                   its ``['hits']['hits']`` list is read afterwards
    """
    super(Select, self).explain_result(result)
    hits = self._result_dict['hits']['hits']
    sources = []
    for hit in hits:
        sources.append(hit['_source'])
    self._values = sources
def extend(
self, values)
S.extend(iterable) -- extend sequence by appending elements from the iterable
def extend(self, values):
    """
    S.extend(iterable) -- extend sequence by appending elements from the iterable

    :param values: the items to append; may be the sequence itself
    """
    # Appending while iterating over the sequence itself would never
    # terminate, so take a snapshot first.
    if values is self:
        values = list(values)
    for v in values:
        self.append(v)
def from_dict(
d)
@staticmethod
def from_dict(d):
    """
    Build a ``Select`` from a response dict.

    :param d: the response dict handed to ``explain_result``
    :return: the new ``Select`` after it has processed ``d``
    """
    instance = Select()
    instance.explain_result(d)
    return instance
def index(
self, value, start=0, stop=None)
S.index(value, [start, [stop]]) -> integer -- return first index of value. Raises ValueError if the value is not present.
def index(self, value, start=0, stop=None):
    """
    S.index(value, [start, [stop]]) -> integer -- return first index of value.
    Raises ValueError if the value is not present.

    Negative ``start``/``stop`` are taken relative to the end of the sequence.
    """
    if start is not None and start < 0:
        start = max(len(self) + start, 0)
    if stop is not None and stop < 0:
        stop += len(self)

    position = start
    while stop is None or position < stop:
        try:
            matched = self[position] == value
        except IndexError:
            # Ran past the end of the sequence.
            break
        if matched:
            return position
        position += 1
    raise ValueError
def insert(
self, index, value)
S.insert(index, value) -- insert value before index
def insert(self, index, value):
    """
    Place ``value`` before position ``index`` in the stored rows.

    :param index: the position to insert at
    :param value: the item inserted into ``self._values``
    """
    rows = self._values
    rows.insert(index, value)
def pop(
self, index=-1)
S.pop([index]) -> item -- remove and return item at index (default last). Raise IndexError if list is empty or index is out of range.
def pop(self, index=-1):
    """
    S.pop([index]) -> item -- remove and return item at index (default last).
    Raise IndexError if list is empty or index is out of range.
    """
    # Read the item first so it can be returned after it has been deleted.
    removed = self[index]
    del self[index]
    return removed
def print_json(
self)
def print_json(self):
    """
    Print the raw response as JSON with sorted keys and a 4-space indent.

    Non-ASCII characters are written as-is rather than escaped.
    """
    print(json.dumps(
        self._result_dict,
        indent=4,
        sort_keys=True,
        separators=(',', ': '),
        ensure_ascii=False,
    ))
def remove(
self, value)
S.remove(value) -- remove first occurrence of value. Raise ValueError if the value is not present.
def remove(self, value):
    """
    S.remove(value) -- remove first occurrence of value.
    Raise ValueError if the value is not present.
    """
    position = self.index(value)
    del self[position]
def reverse(
self)
S.reverse() -- reverse IN PLACE
def reverse(self):
    """S.reverse() -- reverse *IN PLACE*"""
    # Swap the outermost pair and move both ends inwards until they meet.
    low, high = 0, len(self) - 1
    while low < high:
        self[low], self[high] = self[high], self[low]
        low += 1
        high -= 1
def to_pandas(
self)
Export the current query result to a Pandas DataFrame object.
def to_pandas(self):
    """
    Export the collected rows to a Pandas DataFrame object.

    :return: a DataFrame built from ``self._values``, or ``None`` when
             ``self._values`` is empty or has not been set
    """
    # Make the no-data outcome explicit instead of falling through.
    if not self._values:
        return None
    return pandas.DataFrame(data=self._values)
Instance variables
var json
Gets the original JSON representation returned by Elasticsearch REST API :return: The JSON string indicating the query result :rtype: string
var result