
Extract data from Elasticsearch using Python

Learn how to extract and write queries to fetch data from Elasticsearch using the official elasticsearch python package.

Elasticsearch has an official Python client, the “elasticsearch” package, that makes it convenient to pull data out of a cluster. Once the data is in Python, it is much easier to analyze.

1. Setup Elasticsearch and Kibana

Make sure you have Elasticsearch and Kibana set up. If not, follow these links:

Install Elasticsearch.

Install Kibana.

You also need some data indexed in Elasticsearch before getting started. See how to load a CSV file into Elasticsearch.

2. Install the elasticsearch python package

Install the elasticsearch Python package with pip:

pip install elasticsearch

3. Extract data

Now you are ready to extract data from Elasticsearch.

Before running any query from Python, it’s a good idea to check in Kibana Dev Tools that your query returns the correct results.

You can also check the performance of your query in the “Search profiler.”

Here is my article on how to use Kibana Dev Tools to extract data from Elasticsearch.

Extract data from elasticsearch using Kibana.

In that post, I used a query to get all verified tweets with the “BTC” symbol.

Here too, we will run a simple query to get all “BTC” symbol tweets from the “tweets” index.

Check the query results in Kibana Dev Tools:

Open Kibana url, http://localhost:5601/

Open the left side menu, scroll to the Management section, and click “Dev Tools”.

Paste the query below into the console and click the green play button.

GET tweets/_search
{
  "query":{
    "match": {
      "symbols.keyword": "BTC"
    }
  }
}

You will see a response like the one below in the right-side panel of Kibana Dev Tools.

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 85,
      "relation" : "eq"
    },
    "max_score" : 5.801287,
    "hits" : [
      {
        "_index" : "tweets",
        "_type" : "_doc",
        "_id" : "dulgnXQBhEpcX7VCh2b4",
        "_score" : 5.801287,
        "_source" : {
          "id" : "1019716662587740200",
          "text" : "Barry Silbert is extremely optimistic on bitcoin -- but predicts that 99% of new crypto entrants are “going to zero… https://t.co/mGMVo2cZgY",
          "timestamp" : "Wed Jul 18 22:52:52 +0000 2018",
          "source" : "MarketWatch",
          "symbols" : "BTC",
          "company_names" : "Bitcoin",
          "url" : "https://twitter.com/i/web/status/1019716662587740160",
          "verified" : "True"
        }
      },
      {
        "_index" : "tweets",
        "_type" : "_doc",
        "_id" : "fOlgnXQBhEpcX7VCh2b4",
        "_score" : 5.801287,
        "_source" : {
          "id" : "1019721145396887600",
          "text" : "Hedge fund manager Marc Larsy says bitcoin $40K is possible https://t.co/54uPe0OWqT",
          "timestamp" : "Wed Jul 18 23:10:41 +0000 2018",
          "source" : "MarketWatch",
          "symbols" : "BTC",
          "company_names" : "Bitcoin",
          "url" : "https://on.mktw.net/2Ntr7k9",
          "verified" : "True"
        }
      },

The above response tells us that there are 85 documents matching the condition "symbols" : "BTC".

The response is in JSON format, but we can turn it into a pandas data frame.
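For example, the raw search response can be converted into a data frame directly by keeping only the `_source` of each hit. The `response` dict below is a trimmed-down, hypothetical version of the sample response above, kept small for illustration:

```python
import pandas as pd

# Trimmed-down, hypothetical version of the search response above
response = {
    "hits": {
        "total": {"value": 85, "relation": "eq"},
        "hits": [
            {"_id": "dulgnXQBhEpcX7VCh2b4",
             "_source": {"symbols": "BTC", "source": "MarketWatch"}},
            {"_id": "fOlgnXQBhEpcX7VCh2b4",
             "_source": {"symbols": "BTC", "source": "MarketWatch"}},
        ],
    }
}

# Keep only the '_source' of each hit and build a data frame
df = pd.DataFrame([hit["_source"] for hit in response["hits"]["hits"]])
print(df.shape)  # (2, 2)
```

This works fine for small result sets, but a plain search request is capped at 10,000 hits, which is where the scan helper below comes in.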

Now that we know the query works, let’s use Python to extract the data from Elasticsearch.

Using Python

Import the scan helper from the elasticsearch package, along with pandas:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import pandas as pd
What is the elasticsearch scan method?

scan is one of the elasticsearch helper functions. It is useful when your query matches more than 10,000 documents, the default limit for a single search request.

It wraps the scroll() API in a simple iterator that yields all hits as returned by the underlying scroll requests.

Here are the parameters of the scan function:

  1. client – an instance of Elasticsearch to use
  2. query – body for the search() API
  3. scroll – Specify how long a consistent view of the index should be maintained for scrolled search
  4. raise_on_error – raises an exception (ScanError) if an error is encountered (some shards fail to execute). By default we raise.
  5. preserve_order – don’t set the search_type to scan – this will cause the scroll to paginate with preserving the order. Note that this can be an extremely expensive operation and can easily lead to unpredictable results, use with caution.
  6. size – size (per shard) of the batch sent at each iteration.
  7. request_timeout – explicit timeout for each call to scan
  8. clear_scroll – explicitly calls delete on the scroll id via the clear scroll API at the end of the method on completion or error, defaults to true.
  9. scroll_kwargs – additional kwargs to be passed to scroll()
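Conceptually, scan keeps requesting fixed-size batches via the scroll API and yields hits one by one, so the caller never has to deal with pagination. Here is a minimal sketch of that pattern, with a plain list of fake “pages” standing in for the scroll responses (this only illustrates the idea; the real scan() talks to the cluster and handles errors, timeouts, and clearing the scroll context):

```python
def scan_sketch(pages):
    """Yield individual hits from an iterable of scroll 'pages'.

    Illustration only: the real scan() issues scroll requests
    against Elasticsearch instead of iterating an in-memory list.
    """
    for page in pages:
        for hit in page:
            yield hit

# Three fake scroll pages (batch size of 2 per page)
pages = [
    [{"_id": "a"}, {"_id": "b"}],
    [{"_id": "c"}, {"_id": "d"}],
    [{"_id": "e"}],
]

hits = list(scan_sketch(pages))
print(len(hits))  # 5
```

Because it is a generator, the caller can also iterate lazily instead of materializing all hits at once.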

Now create the Elasticsearch client. (This uses the 7.x-style host/port arguments; in elasticsearch-py 8.x the client takes a URL instead, e.g. Elasticsearch("http://localhost:9200").)

es = Elasticsearch(host='localhost', port=9200)

We will put the query in a Python dict and pass it to the scan function.

def get_data_from_elastic():
    # query: The elasticsearch query.
    query = {
        "query": {
            "match": {
                "symbols.keyword": "BTC"
            }
        }
    }

    # Scan function to get all the data. 
    rel = scan(client=es,             
               query=query,                                     
               scroll='1m',
               index='tweets',
               raise_on_error=True,
               preserve_order=False,
               clear_scroll=True)

    # Keep response in a list.
    result = list(rel)

    temp = []

    # We need only '_source', which has all the fields required.
    # This eliminates the Elasticsearch metadata like _id, _type, _index.
    for hit in result:
        temp.append(hit['_source'])

    # Create a dataframe.
    df = pd.DataFrame(temp)

    return df

Call the get_data_from_elastic() function:

df = get_data_from_elastic()

Now df holds the data from Elasticsearch, ready for analysis.

Print the head of the data frame:

print(df.head())
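Once the data is in a data frame, the usual pandas tooling applies. For instance, the timestamp strings in the sample response use Twitter’s created_at format, which pd.to_datetime can parse with an explicit format string. The tiny data frame below is hand-built from the two sample hits shown earlier, so this runs without a cluster:

```python
import pandas as pd

# Hand-built from the two sample hits shown earlier
df = pd.DataFrame({
    "symbols": ["BTC", "BTC"],
    "timestamp": ["Wed Jul 18 22:52:52 +0000 2018",
                  "Wed Jul 18 23:10:41 +0000 2018"],
})

# Parse the Twitter-style timestamp into timezone-aware datetimes
df["timestamp"] = pd.to_datetime(df["timestamp"],
                                 format="%a %b %d %H:%M:%S %z %Y")

print(df["timestamp"].dt.hour.tolist())  # [22, 23]
```

With a proper datetime column you can resample, sort, or filter tweets by time range directly in pandas.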

That’s all it takes to extract data from Elasticsearch using Python.

Putting all the code together:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import pandas as pd

es = Elasticsearch(host='localhost', port=9200)

def get_data_from_elastic():
    # query: The elasticsearch query.
    query = {
        "query": {
            "match": {
                "symbols.keyword": "BTC"
            }
        }
    }

    # Scan function to get all the data. 
    rel = scan(client=es,             
               query=query,                                     
               scroll='1m',
               index='tweets',
               raise_on_error=True,
               preserve_order=False,
               clear_scroll=True)

    # Keep response in a list.
    result = list(rel)

    temp = []

    # We need only '_source', which has all the fields required.
    # This eliminates the Elasticsearch metadata like _id, _type, _index.
    for hit in result:
        temp.append(hit['_source'])

    # Create a dataframe.
    df = pd.DataFrame(temp)

    return df


df = get_data_from_elastic()

print(df.head())

Please let me know in the comments section if you are facing any issues.

You can follow me on Twitter and Instagram to get notified whenever I post new content.

Happy learning 🙂

By Satyanarayan Bhanja

Machine learning engineer
