Top

pandasticsearch.dataframe module

# -*- coding: UTF-8 -*-

from pandasticsearch.client import RestClient
from pandasticsearch.queries import Agg, Select
from pandasticsearch.operators import *
from pandasticsearch.types import Column
from pandasticsearch.types import Row

import json
import six
import sys


class DataFrame(object):
    """
    A :class:`DataFrame` treats index and documents in Elasticsearch as named columns and rows.

    >>> from pandasticsearch import DataFrame
    >>> df = DataFrame.from_es('http://localhost:9200', index='people')

    It can be converted to Pandas object for subsequent analysis.
    >>> df.to_pandas()
    """

    def __init__(self, client, mapping, **kwargs):
        cols = []
        for index, mappings in six.iteritems(mapping):
            self._index = index
            for _, properties in six.iteritems(mappings['mappings']):
                for k, _ in six.iteritems(properties['properties']):
                    cols.append(k)

        if self._index is None:
            raise Exception('No index in [{0}]'.format(mapping))

        if len(cols) == 0:
            raise Exception('0 columns found in [{0}]'.format(self._index))

        self._client = client
        self._columns = sorted(cols)
        self._mapping = mapping
        self._filter = kwargs.get('filter', None)
        self._aggregation = kwargs.get('aggregation', None)
        self._sort = kwargs.get('sort', None)
        self._projection = kwargs.get('projection', None)
        self._limit = kwargs.get('limit', None)
        self._last_query = None

    @staticmethod
    def from_es(url, index, doc_type=None):
        """
        Builds a :class:`DataFrame` from the URL of an Elasticsearch node and
        the name of an index.

        :param str url: URL of the node connected to
        :param str index: The name of the index
        :param str doc_type: The type of the document
        :return: DataFrame object for accessing
        :rtype: DataFrame
        """
        # Resolve both endpoints up front, then fetch the mapping and build
        # the frame around a client pointed at the _search endpoint.
        if doc_type is None:
            mapping_endpoint = index
            search_endpoint = index + '/_search'
        else:
            mapping_endpoint = index + '/_mapping/' + doc_type
            search_endpoint = index + '/' + doc_type + '/_search'

        mapping = RestClient(url, mapping_endpoint).get()
        return DataFrame(RestClient(url, search_endpoint), mapping)

    def __getattr__(self, name):
        """
        Returns the :class:`Column` denoted by ``name``.
        """
        if name not in self.columns:
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
        return Column(name)

    def __getitem__(self, item):
        """
        Supports column lookup (``df['name']``) and in-place filtering
        (``df[df['age'] > 10]``).
        """
        if isinstance(item, six.string_types):
            if item not in self._columns:
                raise TypeError('Column does not exist: [{0}]'.format(item))
            return Column(item)
        if isinstance(item, BooleanFilter):
            # Unlike filter(), this mutates the current frame and returns it.
            self._filter = item.build()
            return self
        raise TypeError('Unsupported expr: [{0}]'.format(item))

    def filter(self, condition):
        """
        Filters rows using the given condition and returns a new
        :class:`DataFrame`.

        where() is an alias for filter().

        :param condition: BooleanFilter object

        >>> df.filter(df['age'] < 13).collect()
        [Row(age=12,gender='female',name='Alice'), Row(age=11,gender='male',name='Bob')]
        """
        assert isinstance(condition, BooleanFilter)
        # Carry every piece of query state forward, replacing only the filter.
        state = dict(filter=condition.build(),
                     aggregation=self._aggregation,
                     projection=self._projection,
                     sort=self._sort,
                     limit=self._limit)
        return DataFrame(self._client, self._mapping, **state)

    where = filter

    def select(self, *cols):
        """
        Projects a set of columns and returns a new :class:`DataFrame`.

        :param cols: list of column names or Column objects

        >>> df.filter(df['age'] < 25).select('name', 'age').collect()
        [Row(age=12,name='Alice'), Row(age=11,name='Bob'), Row(age=13,name='Leo')]
        """
        def as_column(col):
            # Accept either a column name or a ready-made Column.
            if isinstance(col, Column):
                return col
            if isinstance(col, six.string_types):
                return getattr(self, col)
            raise TypeError('{0} is supposed to be str or Column'.format(col))

        return self._select(*[as_column(col) for col in cols])

    def _select(self, *cols):
        """Builds an ES ``_source`` projection from Column objects."""
        included = [col.field_name() for col in cols]
        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=self._aggregation,
                         projection={"includes": included, "excludes": []},
                         sort=self._sort,
                         limit=self._limit)

    def limit(self, num):
        """
        Limits the result count to the number specified.
        """
        assert isinstance(num, int)
        assert num >= 1
        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=self._aggregation,
                         projection=self._projection,
                         sort=self._sort,
                         limit=num)

    def groupby(self, *cols):
        """
        Returns a new :class:`DataFrame` aggregated into ``terms`` buckets on
        the given column(s), nested left-to-right when several are supplied.

        :param cols: list of column names or Column objects
        :raises TypeError: if an element of ``cols`` is neither str nor Column
        """
        columns = []
        for col in cols:
            if isinstance(col, six.string_types):
                columns.append(getattr(self, col))
            elif isinstance(col, Column):
                columns.append(col)
            else:
                raise TypeError('{0} is supposed to be str or Column'.format(col))

        group_agg = self._agg_by_group(*columns)
        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=group_agg,
                         projection=self._projection,
                         sort=self._sort,
                         # BUG FIX: was ``limit=self.limit``, which passed the
                         # bound method (always truthy) instead of the stored
                         # limit value, corrupting the query's 'size' field.
                         limit=self._limit)

    def _agg_by_group(self, *cols):
        if len(cols) == 1:
            col = cols[0]
            return {col.field_name(): {'terms': {'field': col.field_name(), 'size': 20}}}
        else:
            col = cols[0]
            deep_agg = self._agg_by_group(*cols[1:])
            return {col.field_name(): {'terms': {'field': col.field_name(), 'size': 20}, 'aggregations': deep_agg}}

    def agg(self, *aggs):
        """
        Aggregate on the entire DataFrame without groups.
        :param aggs: aggregate functions

        >>> df[df['gender'] == 'male'].agg(df['age'].avg).collect()
        [Row(avg(age)=12)]
        """

        if self._aggregation is None:
            self._aggregation = self._agg(*aggs)
        else:
            # An aggregation already exists (e.g. terms buckets built by
            # groupby()): walk down to the deepest 'aggregations' node and
            # attach the metric there.
            # NOTE(review): this mutates *this* DataFrame's aggregation state
            # in place, not only the returned copy.
            aggregation = self._aggregation
            while True:
                # assumes each nesting level holds exactly one key, as built
                # by _agg_by_group() -- TODO confirm for hand-made aggregations
                key = list(aggregation.keys())[0]
                if 'aggregations' in aggregation[key]:
                    aggregation = aggregation[key]['aggregations']
                else:
                    break
            key = list(aggregation.keys())[0]
            aggregation[key]['aggregations'] = self._agg(*aggs)

        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=self._aggregation,
                         projection=self._projection,
                         sort=self._sort,
                         limit=self._limit)

    def _agg(self, *aggs):
        aggregation = {}
        for agg in aggs:
            assert isinstance(agg, Aggregator)
            aggregation.update(agg.build())
        return aggregation

    def sort(self, *cols):
        """Returns a new :class:`DataFrame` sorted by the specified column(s).

        :param cols: list of :class:`Sorter` objects to sort by.

        orderby() is an alias for sort().

        >>> df.sort(df['age'].asc).collect()
        [Row(age=11,name='Bob'), Row(age=12,name='Alice'), Row(age=13,name='Leo')]
        """
        for col in cols:
            assert isinstance(col, Sorter)

        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=self._aggregation,
                         sort=[col.build() for col in cols],
                         projection=self._projection,
                         limit=self._limit)

    orderby = sort

    def _execute(self):
        """POSTs the built query and parses the response with Agg (when an
        aggregation is set) or Select (plain hits)."""
        response = self._client.post(data=self._build_query())
        parser = Select if self._aggregation is None else Agg
        return parser.from_dict(response)

    def collect(self):
        """
        Returns all the records as a list of Row.

        :return: list of Row

        >>> df.collect()
        [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
        """
        hits = self._execute().result
        return [Row(**hit) for hit in hits]

    def to_pandas(self):
        """
        Exports the query result as a Pandas DataFrame object.

        :return: The DataFrame representing the query result

        >>> df[df['gender'] == 'male'].agg(Avg('age')).to_pandas()
            avg(age)
        0        12
        """
        return self._execute().to_pandas()

    def count(self):
        """
        Returns the number of rows in this index/type.
        >>> df.count()
        2
        """
        # A value_count over the built-in _index field counts every document.
        _df = self.agg(MetricAggregator('_index', 'value_count'))
        # NOTE(review): indexes the first result row with the literal key
        # 'count(*)' -- presumably the alias that MetricAggregator/Agg emit
        # for value_count; verify against pandasticsearch.queries.
        return _df.collect()[0]['count(*)']

    def show(self, n=10000):
        """
        Prints the first ``n`` rows to the console as an ASCII table.

        :param n:  Number of rows to show.
        :raises TypeError: if an aggregation is set on this DataFrame

        >>> df.filter(df['age'] < 25).select('name').show(3)
        +------+
        | name |
        +------+
        | Alice|
        | Bob  |
        | Leo  |
        +------+
        """
        assert n > 0

        if self._aggregation is not None:
            raise TypeError('show() is not allowed for aggregation. use collect() instead')

        query = self._execute()
        cols = self._columns
        widths = []
        # tavnit is the per-row printf template; separator the +---+ rule.
        tavnit = '|'
        separator = '+'

        # First pass: compute a display width per column from header and
        # cell lengths, capped at 15.
        # NOTE(review): '%-Ns' pads but does not truncate, so a cell longer
        # than 15 chars will still break the table alignment -- confirm
        # whether truncation was intended here.
        for col in cols:
            maxlen = len(col)
            for kv in query.result[:n]:
                if col in kv:
                    s = DataFrame._stringfy_value(kv[col])
                else:
                    s = '(NULL)'
                if len(s) > maxlen:
                    maxlen = len(s)
            widths.append(min(maxlen, 15))

        # Second pass: build the row template and the horizontal rule.
        for w in widths:
            tavnit += ' %-' + '%ss |' % (w,)
            separator += '-' * w + '--+'

        sys.stdout.write(separator + '\n')
        sys.stdout.write(tavnit % tuple(cols) + '\n')
        sys.stdout.write(separator + '\n')
        for kv in query.result[:n]:
            row = []
            for col in cols:
                if col in kv:
                    row.append(DataFrame._stringfy_value(kv[col]))
                else:
                    # Missing field in this document.
                    row.append('(NULL)')
            sys.stdout.write(tavnit % tuple(row) + '\n')
        sys.stdout.write(separator + '\n')

    @classmethod
    def _stringfy_value(cls, value):
        b = six.StringIO()
        if isinstance(value, list):
            value = ','.join([DataFrame._stringfy_value(v) for v in value])
        b.write(repr(value))
        return b.getvalue()

    def __repr__(self):
        return "DataFrame[%s]" % (", ".join("%s" % c for c in self._columns))

    def print_debug(self):
        """
        Prints the raw JSON response returned by the Elasticsearch server,
        indented for readability.
        """
        response = self._client.post(data=self._build_query())
        sys.stdout.write(json.dumps(response, indent=4))

    def to_dict(self):
        """
        Converts the current :class:`DataFrame` to Elasticsearch search dictionary.
        :return: a dictionary which obeys the Elasticsearch RESTful protocol
        """
        return self._build_query()

    def print_schema(self):
        """
        Prints out the schema in the tree format.

        >>> df.print_schema()
        index_name
        |-- type_name
          |-- experience :  {'type': 'integer'}
          |-- id :  {'type': 'string'}
          |-- mobile :  {'index': 'not_analyzed', 'type': 'string'}
          |-- regions :  {'index': 'not_analyzed', 'type': 'string'}
        """
        for index, mappings in six.iteritems(self._mapping):
            sys.stdout.write('{0}\n'.format(index))
            for typ, properties in six.iteritems(mappings['mappings']):
                sys.stdout.write('|--{0}\n'.format(typ))
                for k, v in six.iteritems(properties['properties']):
                    sys.stdout.write('  |--{0}: {1}\n'.format(k, v))

    @property
    def columns(self):
        """
        Returns all column names as a list (sorted alphabetically, as built
        in ``__init__``).

        :return: column names as a list

        >>> df.columns
        ['age', 'name']
        """
        return self._columns

    @property
    def schema(self):
        """Returns the raw Elasticsearch mapping dict this DataFrame was built from."""
        return self._mapping

    def _build_query(self):
        query = {}

        if self._limit:
            query['size'] = self._limit
        else:
            query['size'] = 20

        if self._aggregation:
            query['aggregations'] = self._aggregation
            query['size'] = 0

        if self._filter:
            query['query'] = {'filtered': {'filter': self._filter}}

        if self._projection:
            query['_source'] = self._projection

        if self._sort:
            query['sort'] = self._sort
        self._last_query = query
        return query

Classes

class DataFrame

A :class:`DataFrame` treats index and documents in Elasticsearch as named columns and rows.

>>> from pandasticsearch import DataFrame
>>> df = DataFrame.from_es('http://localhost:9200', index='people')

It can be converted to Pandas object for subsequent analysis.

df.to_pandas()

class DataFrame(object):
    """
    A :class:`DataFrame` treats index and documents in Elasticsearch as named columns and rows.

    >>> from pandasticsearch import DataFrame
    >>> df = DataFrame.from_es('http://localhost:9200', index='people')

    It can be converted to Pandas object for subsequent analysis.
    >>> df.to_pandas()
    """

    def __init__(self, client, mapping, **kwargs):
        cols = []
        for index, mappings in six.iteritems(mapping):
            self._index = index
            for _, properties in six.iteritems(mappings['mappings']):
                for k, _ in six.iteritems(properties['properties']):
                    cols.append(k)

        if self._index is None:
            raise Exception('No index in [{0}]'.format(mapping))

        if len(cols) == 0:
            raise Exception('0 columns found in [{0}]'.format(self._index))

        self._client = client
        self._columns = sorted(cols)
        self._mapping = mapping
        self._filter = kwargs.get('filter', None)
        self._aggregation = kwargs.get('aggregation', None)
        self._sort = kwargs.get('sort', None)
        self._projection = kwargs.get('projection', None)
        self._limit = kwargs.get('limit', None)
        self._last_query = None

    @staticmethod
    def from_es(url, index, doc_type=None):
        """
        Creates an :class:`DataFrame` object by providing the URL of ElasticSearch node and the name of the index.

        :param str url: URL of the node connected to
        :param str index: The name of the index
        :param str doc_type: The type of the document
        :return: DataFrame object for accessing
        :rtype: DataFrame
        """
        # get mapping structure from server
        if doc_type is None:
            mapping_endpoint = index
        else:
            mapping_endpoint = index + '/_mapping/' + doc_type

        mapping = RestClient(url, mapping_endpoint).get()

        if doc_type is None:
            endpoint = index + '/_search'
        else:
            endpoint = index + '/' + doc_type + '/_search'
        return DataFrame(RestClient(url, endpoint), mapping)

    def __getattr__(self, name):
        """
        Returns the :class:`Column` denoted by ``name``.
        """
        if name not in self.columns:
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
        return Column(name)

    def __getitem__(self, item):
        if isinstance(item, six.string_types):
            if item not in self._columns:
                raise TypeError('Column does not exist: [{0}]'.format(item))
            return Column(item)
        elif isinstance(item, BooleanFilter):
            self._filter = item.build()
            return self
        else:
            raise TypeError('Unsupported expr: [{0}]'.format(item))

    def filter(self, condition):
        """
        Filters rows using a given condition

        where() is an alias for filter().

        :param condition: BooleanCond object

        >>> df.filter(df['age'] < 13).collect()
        [Row(age=12,gender='female',name='Alice'), Row(age=11,gender='male',name='Bob')]
        """
        assert isinstance(condition, BooleanFilter)
        return DataFrame(self._client, self._mapping,
                         filter=condition.build(),
                         aggregation=self._aggregation,
                         projection=self._projection,
                         sort=self._sort,
                         limit=self._limit)

    where = filter

    def select(self, *cols):
        """
        Projects a set of columns and returns a new L{DataFrame}

        :param cols: list of column names or L{Column}.

        >>> df.filter(df['age'] < 25).select('name', 'age').collect()
        [Row(age=12,name='Alice'), Row(age=11,name='Bob'), Row(age=13,name='Leo')]
        """
        columns = []
        for col in cols:
            if isinstance(col, six.string_types):
                columns.append(getattr(self, col))
            elif isinstance(col, Column):
                columns.append(col)
            else:
                raise TypeError('{0} is supposed to be str or Column'.format(col))
        return self._select(*columns)

    def _select(self, *cols):
        project = {"includes": [col.field_name() for col in cols], "excludes": []}
        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=self._aggregation,
                         projection=project,
                         sort=self._sort,
                         limit=self._limit)

    def limit(self, num):
        """
        Limits the result count to the number specified.
        """
        assert isinstance(num, int)
        assert num >= 1
        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=self._aggregation,
                         projection=self._projection,
                         sort=self._sort,
                         limit=num)

    def groupby(self, *cols):
        columns = []
        for col in cols:
            if isinstance(col, six.string_types):
                columns.append(getattr(self, col))
            elif isinstance(col, Column):
                columns.append(col)
            else:
                raise TypeError('{0} is supposed to be str or Column'.format(col))

        group_agg = self._agg_by_group(*columns)
        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=group_agg,
                         projection=self._projection,
                         sort=self._sort,
                         limit=self.limit)

    def _agg_by_group(self, *cols):
        if len(cols) == 1:
            col = cols[0]
            return {col.field_name(): {'terms': {'field': col.field_name(), 'size': 20}}}
        else:
            col = cols[0]
            deep_agg = self._agg_by_group(*cols[1:])
            return {col.field_name(): {'terms': {'field': col.field_name(), 'size': 20}, 'aggregations': deep_agg}}

    def agg(self, *aggs):
        """
        Aggregate on the entire DataFrame without groups.
        :param aggs: aggregate functions

        >>> df[df['gender'] == 'male'].agg(df['age'].avg).collect()
        [Row(avg(age)=12)]
        """

        if self._aggregation is None:
            self._aggregation = self._agg(*aggs)
        else:
            aggregation = self._aggregation
            while True:
                key = list(aggregation.keys())[0]
                if 'aggregations' in aggregation[key]:
                    aggregation = aggregation[key]['aggregations']
                else:
                    break
            key = list(aggregation.keys())[0]
            aggregation[key]['aggregations'] = self._agg(*aggs)

        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=self._aggregation,
                         projection=self._projection,
                         sort=self._sort,
                         limit=self._limit)

    def _agg(self, *aggs):
        aggregation = {}
        for agg in aggs:
            assert isinstance(agg, Aggregator)
            aggregation.update(agg.build())
        return aggregation

    def sort(self, *cols):
        """Returns a new :class:`DataFrame` sorted by the specified column(s).

        :param cols: list of :class:`Column`to sort by.

        orderby() is an alias for sort().

        >>> df.sort(df['age'].asc).collect()
        [Row(age=11,name='Bob'), Row(age=12,name='Alice'), Row(age=13,name='Leo')]
        """
        sorts = []
        for col in cols:
            assert isinstance(col, Sorter)
            sorts.append(col.build())

        return DataFrame(self._client, self._mapping,
                         filter=self._filter,
                         aggregation=self._aggregation,
                         sort=sorts,
                         projection=self._projection,
                         limit=self._limit)

    orderby = sort

    def _execute(self):
        res_dict = self._client.post(data=self._build_query())
        if self._aggregation is not None:
            query = Agg.from_dict(res_dict)
        else:
            query = Select.from_dict(res_dict)
        return query

    def collect(self):
        """
        Returns all the records as a list of Row.

        :return: list of L{Row}

        >>> df.collect()
        [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
        """
        query = self._execute()
        return [Row(**v) for v in query.result]

    def to_pandas(self):
        """
        Export to a Pandas DataFrame object.
        :return: The DataFrame representing the query result

        >>> df[df['gender'] == 'male'].agg(Avg('age')).to_pandas()
            avg(age)
        0        12
        """
        query = self._execute()
        return query.to_pandas()

    def count(self):
        """
        Returns the number of rows in this index/type.
        >>> df.count()
        2
        """
        _df = self.agg(MetricAggregator('_index', 'value_count'))
        return _df.collect()[0]['count(*)']

    def show(self, n=10000):
        """
        Prints the first ``n`` rows to the console.

        :param n:  Number of rows to show.

        >>> df.filter(df['age'] < 25).select('name').show(3)
        +------+
        | name |
        +------+
        | Alice|
        | Bob  |
        | Leo  |
        +------+
        """
        assert n > 0

        if self._aggregation is not None:
            raise TypeError('show() is not allowed for aggregation. use collect() instead')

        query = self._execute()
        cols = self._columns
        widths = []
        tavnit = '|'
        separator = '+'

        for col in cols:
            maxlen = len(col)
            for kv in query.result[:n]:
                if col in kv:
                    s = DataFrame._stringfy_value(kv[col])
                else:
                    s = '(NULL)'
                if len(s) > maxlen:
                    maxlen = len(s)
            widths.append(min(maxlen, 15))

        for w in widths:
            tavnit += ' %-' + '%ss |' % (w,)
            separator += '-' * w + '--+'

        sys.stdout.write(separator + '\n')
        sys.stdout.write(tavnit % tuple(cols) + '\n')
        sys.stdout.write(separator + '\n')
        for kv in query.result[:n]:
            row = []
            for col in cols:
                if col in kv:
                    row.append(DataFrame._stringfy_value(kv[col]))
                else:
                    row.append('(NULL)')
            sys.stdout.write(tavnit % tuple(row) + '\n')
        sys.stdout.write(separator + '\n')

    @classmethod
    def _stringfy_value(cls, value):
        b = six.StringIO()
        if isinstance(value, list):
            value = ','.join([DataFrame._stringfy_value(v) for v in value])
        b.write(repr(value))
        return b.getvalue()

    def __repr__(self):
        return "DataFrame[%s]" % (", ".join("%s" % c for c in self._columns))

    def print_debug(self):
        """
        Return a indented JSON string returned by the Elasticsearch Server
        """
        sys.stdout.write(json.dumps(self._client.post(data=self._build_query()), indent=4))

    def to_dict(self):
        """
        Converts the current :class:`DataFrame` to Elasticsearch search dictionary.
        :return: a dictionary which obeys the Elasticsearch RESTful protocol
        """
        return self._build_query()

    def print_schema(self):
        """
        Prints out the schema in the tree format.

        >>> df.print_schema()
        index_name
        |-- type_name
          |-- experience :  {'type': 'integer'}
          |-- id :  {'type': 'string'}
          |-- mobile :  {'index': 'not_analyzed', 'type': 'string'}
          |-- regions :  {'index': 'not_analyzed', 'type': 'string'}
        """
        for index, mappings in six.iteritems(self._mapping):
            sys.stdout.write('{0}\n'.format(index))
            for typ, properties in six.iteritems(mappings['mappings']):
                sys.stdout.write('|--{0}\n'.format(typ))
                for k, v in six.iteritems(properties['properties']):
                    sys.stdout.write('  |--{0}: {1}\n'.format(k, v))

    @property
    def columns(self):
        """
        Returns all column names as a list.
        :return: column names as a list

        >>> df.columns
        ['age', 'name']
        """
        return self._columns

    @property
    def schema(self):
        return self._mapping

    def _build_query(self):
        query = {}

        if self._limit:
            query['size'] = self._limit
        else:
            query['size'] = 20

        if self._aggregation:
            query['aggregations'] = self._aggregation
            query['size'] = 0

        if self._filter:
            query['query'] = {'filtered': {'filter': self._filter}}

        if self._projection:
            query['_source'] = self._projection

        if self._sort:
            query['sort'] = self._sort
        self._last_query = query
        return query

Ancestors (in MRO)

Static methods

def __init__(

self, client, mapping, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, client, mapping, **kwargs):
    cols = []
    for index, mappings in six.iteritems(mapping):
        self._index = index
        for _, properties in six.iteritems(mappings['mappings']):
            for k, _ in six.iteritems(properties['properties']):
                cols.append(k)
    if self._index is None:
        raise Exception('No index in [{0}]'.format(mapping))
    if len(cols) == 0:
        raise Exception('0 columns found in [{0}]'.format(self._index))
    self._client = client
    self._columns = sorted(cols)
    self._mapping = mapping
    self._filter = kwargs.get('filter', None)
    self._aggregation = kwargs.get('aggregation', None)
    self._sort = kwargs.get('sort', None)
    self._projection = kwargs.get('projection', None)
    self._limit = kwargs.get('limit', None)
    self._last_query = None

def agg(

self, *aggs)

Aggregate on the entire DataFrame without groups. :param aggs: aggregate functions

df[df['gender'] == 'male'].agg(df['age'].avg).collect() [Row(avg(age)=12)]

def agg(self, *aggs):
    """
    Aggregate on the entire DataFrame without groups.
    :param aggs: aggregate functions
    >>> df[df['gender'] == 'male'].agg(df['age'].avg).collect()
    [Row(avg(age)=12)]
    """
    if self._aggregation is None:
        self._aggregation = self._agg(*aggs)
    else:
        aggregation = self._aggregation
        while True:
            key = list(aggregation.keys())[0]
            if 'aggregations' in aggregation[key]:
                aggregation = aggregation[key]['aggregations']
            else:
                break
        key = list(aggregation.keys())[0]
        aggregation[key]['aggregations'] = self._agg(*aggs)
    return DataFrame(self._client, self._mapping,
                     filter=self._filter,
                     aggregation=self._aggregation,
                     projection=self._projection,
                     sort=self._sort,
                     limit=self._limit)

def collect(

self)

Returns all the records as a list of Row.

:return: list of L{Row}

df.collect() [Row(age=2, name='Alice'), Row(age=5, name='Bob')]

def collect(self):
    """
    Returns all the records as a list of Row.
    :return: list of L{Row}
    >>> df.collect()
    [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
    """
    query = self._execute()
    return [Row(**v) for v in query.result]

def count(

self)

Returns the number of rows in this index/type.

df.count() 2

def count(self):
    """
    Returns the number of rows in this index/type.
    >>> df.count()
    2
    """
    _df = self.agg(MetricAggregator('_index', 'value_count'))
    return _df.collect()[0]['count(*)']

def filter(

self, condition)

Filters rows using a given condition

where() is an alias for filter().

:param condition: BooleanCond object

df.filter(df['age'] < 13).collect() [Row(age=12,gender='female',name='Alice'), Row(age=11,gender='male',name='Bob')]

def filter(self, condition):
    """
    Filters rows using a given condition
    where() is an alias for filter().
    :param condition: BooleanCond object
    >>> df.filter(df['age'] < 13).collect()
    [Row(age=12,gender='female',name='Alice'), Row(age=11,gender='male',name='Bob')]
    """
    assert isinstance(condition, BooleanFilter)
    return DataFrame(self._client, self._mapping,
                     filter=condition.build(),
                     aggregation=self._aggregation,
                     projection=self._projection,
                     sort=self._sort,
                     limit=self._limit)

def from_es(

url, index, doc_type=None)

Creates an :class:DataFrame object by providing the URL of ElasticSearch node and the name of the index.

:param str url: URL of the node connected to :param str index: The name of the index :param str doc_type: The type of the document :return: DataFrame object for accessing :rtype: DataFrame

@staticmethod
def from_es(url, index, doc_type=None):
    """
    Creates an :class:`DataFrame` object by providing the URL of ElasticSearch node and the name of the index.
    :param str url: URL of the node connected to
    :param str index: The name of the index
    :param str doc_type: The type of the document
    :return: DataFrame object for accessing
    :rtype: DataFrame
    """
    # get mapping structure from server
    if doc_type is None:
        mapping_endpoint = index
    else:
        mapping_endpoint = index + '/_mapping/' + doc_type
    mapping = RestClient(url, mapping_endpoint).get()
    if doc_type is None:
        endpoint = index + '/_search'
    else:
        endpoint = index + '/' + doc_type + '/_search'
    return DataFrame(RestClient(url, endpoint), mapping)

def groupby(

self, *cols)

def groupby(self, *cols):
    """Group the frame by the given columns for a subsequent aggregation.

    :param cols: column names (str) or :class:`Column` objects to group by
    :return: a new DataFrame carrying the grouping aggregation
    :raises TypeError: if an element of *cols* is neither str nor Column
    """
    columns = []
    for col in cols:
        if isinstance(col, six.string_types):
            # Resolve a column name to the Column attribute of this frame.
            columns.append(getattr(self, col))
        elif isinstance(col, Column):
            columns.append(col)
        else:
            raise TypeError('{0} is supposed to be str or Column'.format(col))
    group_agg = self._agg_by_group(*columns)
    return DataFrame(self._client, self._mapping,
                     filter=self._filter,
                     aggregation=group_agg,
                     projection=self._projection,
                     sort=self._sort,
                     # BUG FIX: was `limit=self.limit`, which passed the bound
                     # method `limit` itself rather than the configured value.
                     limit=self._limit)

def limit(

self, num)

Limits the result count to the number specified.

def limit(self, num):
    """Return a new :class:`DataFrame` whose result count is capped at *num*."""
    assert isinstance(num, int)
    assert num >= 1
    # Everything except the limit is carried over unchanged.
    return DataFrame(
        self._client,
        self._mapping,
        filter=self._filter,
        aggregation=self._aggregation,
        projection=self._projection,
        sort=self._sort,
        limit=num,
    )

def orderby(

self, *cols)

Returns a new :class:DataFrame sorted by the specified column(s).

:param cols: list of :class:`Column` to sort by.

orderby() is an alias for sort().

df.sort(df['age'].asc).collect() [Row(age=11,name='Bob'), Row(age=12,name='Alice'), Row(age=13,name='Leo')]

def sort(self, *cols):
    """Return a new :class:`DataFrame` sorted by the specified column(s).

    orderby() is an alias for sort().

    :param cols: list of :class:`Column` to sort by.
    >>> df.sort(df['age'].asc).collect()
    [Row(age=11,name='Bob'), Row(age=12,name='Alice'), Row(age=13,name='Leo')]
    """
    built_sorts = []
    for sorter in cols:
        assert isinstance(sorter, Sorter)
        built_sorts.append(sorter.build())
    return DataFrame(
        self._client,
        self._mapping,
        filter=self._filter,
        aggregation=self._aggregation,
        sort=built_sorts,
        projection=self._projection,
        limit=self._limit,
    )

def print_debug(

self)

Print an indented JSON string returned by the Elasticsearch server

def print_debug(self):
    """Print the server's raw JSON response for the current query, indented.

    NOTE(review): writes to stdout rather than returning the string.
    """
    response = self._client.post(data=self._build_query())
    sys.stdout.write(json.dumps(response, indent=4))

def print_schema(

self)

Prints out the schema in the tree format.

df.print_schema() index_name |-- type_name |-- experience : {'type': 'integer'} |-- id : {'type': 'string'} |-- mobile : {'index': 'not_analyzed', 'type': 'string'} |-- regions : {'index': 'not_analyzed', 'type': 'string'}

def print_schema(self):
    """Print the index mapping as an indented tree on stdout.

    >>> df.print_schema()
    index_name
    |-- type_name
      |-- experience :  {'type': 'integer'}
      |-- id :  {'type': 'string'}
      |-- mobile :  {'index': 'not_analyzed', 'type': 'string'}
      |-- regions :  {'index': 'not_analyzed', 'type': 'string'}
    """
    write = sys.stdout.write
    # Walk index -> document type -> field, indenting one level each time.
    for index_name, index_mapping in six.iteritems(self._mapping):
        write('{0}\n'.format(index_name))
        for doc_type, type_mapping in six.iteritems(index_mapping['mappings']):
            write('|--{0}\n'.format(doc_type))
            for field, field_mapping in six.iteritems(type_mapping['properties']):
                write('  |--{0}: {1}\n'.format(field, field_mapping))

def select(

self, *cols)

Projects a set of columns and returns a new L{DataFrame}

:param cols: list of column names or L{Column}.

df.filter(df['age'] < 25).select('name', 'age').collect() [Row(age=12,name='Alice'), Row(age=11,name='Bob'), Row(age=13,name='Leo')]

def select(self, *cols):
    """Project a set of columns and return a new L{DataFrame}.

    :param cols: list of column names or L{Column}.
    >>> df.filter(df['age'] < 25).select('name', 'age').collect()
    [Row(age=12,name='Alice'), Row(age=11,name='Bob'), Row(age=13,name='Leo')]
    """
    resolved = []
    for col in cols:
        if isinstance(col, Column):
            resolved.append(col)
        elif isinstance(col, six.string_types):
            # A bare name refers to a Column attribute of this frame.
            resolved.append(getattr(self, col))
        else:
            raise TypeError('{0} is supposed to be str or Column'.format(col))
    return self._select(*resolved)

def show(

self, n=10000)

Prints the first n rows to the console.

:param n: Number of rows to show.

df.filter(df['age'] < 25).select('name').show(3) +------+ | name | +------+ | Alice| | Bob | | Leo | +------+

def show(self, n=10000):
    """Print the first ``n`` rows to the console as an ASCII table.

    :param n:  Number of rows to show.
    >>> df.filter(df['age'] < 25).select('name').show(3)
    +------+
    | name |
    +------+
    | Alice|
    | Bob  |
    | Leo  |
    +------+
    """
    assert n > 0
    if self._aggregation is not None:
        raise TypeError('show() is not allowed for aggregation. use collect() instead')
    hits = self._execute().result[:n]
    cols = self._columns

    def render(doc, name):
        # One cell: the stringified field value, or '(NULL)' when absent.
        return DataFrame._stringfy_value(doc[name]) if name in doc else '(NULL)'

    # Each column is as wide as its longest rendered value (or its header),
    # capped at 15 characters.
    widths = [min(max([len(c)] + [len(render(doc, c)) for doc in hits]), 15)
              for c in cols]
    row_fmt = '|' + ''.join(' %-' + '%ss |' % (w,) for w in widths)
    rule = '+' + ''.join('-' * w + '--+' for w in widths)

    out = sys.stdout
    out.write(rule + '\n')
    out.write(row_fmt % tuple(cols) + '\n')
    out.write(rule + '\n')
    for doc in hits:
        out.write(row_fmt % tuple(render(doc, c) for c in cols) + '\n')
    out.write(rule + '\n')

def sort(

self, *cols)

Returns a new :class:DataFrame sorted by the specified column(s).

:param cols: list of :class:`Column` to sort by.

orderby() is an alias for sort().

df.sort(df['age'].asc).collect() [Row(age=11,name='Bob'), Row(age=12,name='Alice'), Row(age=13,name='Leo')]

def sort(self, *cols):
    """Return a new :class:`DataFrame` sorted by the specified column(s).

    orderby() is an alias for sort().

    :param cols: list of :class:`Column` to sort by.
    >>> df.sort(df['age'].asc).collect()
    [Row(age=11,name='Bob'), Row(age=12,name='Alice'), Row(age=13,name='Leo')]
    """
    # Validate and build each sorter clause in order.
    clauses = []
    for sorter in cols:
        assert isinstance(sorter, Sorter)
        clauses.append(sorter.build())
    return DataFrame(self._client,
                     self._mapping,
                     filter=self._filter,
                     aggregation=self._aggregation,
                     sort=clauses,
                     projection=self._projection,
                     limit=self._limit)

def to_dict(

self)

Converts the current :class:DataFrame to Elasticsearch search dictionary. :return: a dictionary which obeys the Elasticsearch RESTful protocol

def to_dict(self):
    """Convert this :class:`DataFrame` into an Elasticsearch search dictionary.

    :return: a dictionary which obeys the Elasticsearch RESTful protocol
    """
    query_body = self._build_query()
    return query_body

def to_pandas(

self)

Export to a Pandas DataFrame object. :return: The DataFrame representing the query result

df[df['gender'] == 'male'].agg(Avg('age')).to_pandas() avg(age) 0 12

def to_pandas(self):
    """Execute the query and export the result as a Pandas DataFrame object.

    :return: The DataFrame representing the query result
    >>> df[df['gender'] == 'male'].agg(Avg('age')).to_pandas()
        avg(age)
    0        12
    """
    result = self._execute()
    return result.to_pandas()

def where(

self, condition)

Filters rows using a given condition

where() is an alias for filter().

:param condition: BooleanFilter object

df.filter(df['age'] < 13).collect() [Row(age=12,gender='female',name='Alice'), Row(age=11,gender='male',name='Bob')]

def filter(self, condition):
    """Filter rows using the given condition.

    where() is an alias for filter().

    :param condition: BooleanFilter object
    >>> df.filter(df['age'] < 13).collect()
    [Row(age=12,gender='female',name='Alice'), Row(age=11,gender='male',name='Bob')]
    """
    assert isinstance(condition, BooleanFilter)
    # Only the filter clause changes; all other query state is carried over.
    new_filter = condition.build()
    return DataFrame(self._client,
                     self._mapping,
                     filter=new_filter,
                     aggregation=self._aggregation,
                     projection=self._projection,
                     sort=self._sort,
                     limit=self._limit)

Instance variables

var columns

Returns all column names as a list. :return: column names as a list

df.columns ['age', 'name']

var schema