Pythian Blog: Technical Track

Updating Elasticsearch indexes with Spark

With the wide adoption of Elasticsearch as a search and analytics engine, we increasingly build data pipelines that interact with it, and the processing framework of choice is often Apache Spark. Although reading data from Elasticsearch and processing it with Spark is widely documented, we have not come across a complete guide to updating documents in an Elasticsearch index. In this post we present a step-by-step guide on how to load, transform and update Elasticsearch documents using Spark dataframes. Elasticsearch supports document updates by document id. The official documentation includes the following example:
POST /customer/doc/1/_update?pretty
 {
   "doc": { "name": "Jane Doe" }
 }
To update an Elasticsearch document using the Spark connector, it is absolutely essential that the dataframe contains the document _id field so that ES updates the correct document. Therefore, when loading documents from Elasticsearch we must always make sure that we retrieve the _id for all documents. This is controlled by adding the Spark option "es.read.metadata": "true" when reading from Elasticsearch.
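For reference, the update-by-id request shown above can also be assembled from Python with only the standard library. This is a minimal sketch: the host http://localhost:9200 and the helper name build_update_request are assumptions for illustration, and the request is only constructed, never sent. The URL layout follows the typed endpoint from the example above, which newer Elasticsearch versions may no longer accept.

```python
import json
from urllib.request import Request


def build_update_request(index, doc_type, doc_id, fields,
                         host="http://localhost:9200"):
    """Build (but do not send) a partial-update request for one document.

    `host` is an assumed local node; adjust it for a real cluster.
    """
    url = f"{host}/{index}/{doc_type}/{doc_id}/_update"
    # Partial updates wrap the changed fields in a top-level "doc" object.
    body = json.dumps({"doc": fields}).encode("utf-8")
    return Request(url, data=body, method="POST",
                   headers={"Content-Type": "application/json"})


req = build_update_request("customer", "doc", "1", {"name": "Jane Doe"})
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

The point to notice is that the document id is part of the URL, which is why the Spark dataframe must carry the id for every row it wants to update.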

Setting up the scene

To give an example, we have extracted a small JSON file from the IMDB movie database and we are going to ingest it into Elasticsearch. To keep the example simple, we include only specific fields (title, num_votes, genres and plots) from the JSON file.
>>> json_movies = spark.read.json("/path/to/movies.json")
 >>> json_movies.select("title", "num_votes", "genres", "plots") \
 ... .write.format("org.elasticsearch.spark.sql") \
 ... .mode("Overwrite") \
 ... .save("movies/data")
 >>> json_movies.count()
 250
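By default spark.read.json expects JSON Lines input, that is, one JSON object per line. As a sketch of what such a file could look like, the snippet below writes two records with the four fields used in this post. The titles and vote counts are taken from the sample output shown later in the post; the genres and plots values and the helper name write_movies_jsonl are placeholders invented for illustration.

```python
import json
import os
import tempfile

# Placeholder records: the genres and plots values are made up for illustration.
SAMPLE_MOVIES = [
    {"title": "Reservoir Dogs", "num_votes": 643090,
     "genres": ["placeholder-genre"], "plots": ["placeholder plot"]},
    {"title": "Catch Me If You Can", "num_votes": 498525,
     "genres": ["placeholder-genre"], "plots": ["placeholder plot"]},
]


def write_movies_jsonl(path, movies):
    """Write one JSON object per line (the JSON Lines layout)."""
    with open(path, "w", encoding="utf-8") as handle:
        for movie in movies:
            handle.write(json.dumps(movie) + "\n")


path = os.path.join(tempfile.mkdtemp(), "movies.json")
write_movies_jsonl(path, SAMPLE_MOVIES)

# Read the file back to confirm each line is an independent JSON document.
with open(path, encoding="utf-8") as handle:
    lines = handle.read().splitlines()
print(len(lines))
```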

Interacting with Elasticsearch

Now that we have ingested all documents to Elasticsearch, we are going to query Elasticsearch and retrieve the documents to a Spark dataframe.
>>> es_conf = {
... "es.read.metadata": "true",  # (1)
... "es.read.field.as.array.include": "genres, plots"
... }

>>> movies = spark.read.format("org.elasticsearch.spark.sql") \
... .options(**es_conf).load("movies/data")
>>> movies.count()  # (2)
250
>>> movies.select("title", "_metadata", "num_votes").head()  # (3)
Row(title=u'Throne of Blood', _metadata={u'_score': None, u'_type': u'data', u'_id': u'AWB5p5DxB6YFESvhmC1k', u'_index': u'movies'}, num_votes=30191)
(1) Include the document metadata, a dictionary that contains the document _id.
(2) Verify that all 250 documents have been written correctly.
(3) Print a sample document.

In this example we decided not to supply a custom _id for each document, so Elasticsearch created and assigned a unique id to each one. Notice that the _id field is included in the _metadata object.

Next, we are going to search for all movies that have received at least 500,000 votes and mark them as 'hot', since they are highly voted. Before starting, we want to make sure that no movies are currently marked as hot.
>>> movies.filter("hot == true").count()
 # Error: pyspark.sql.utils.AnalysisException: u"cannot resolve '`hot`' given input columns: [title, plots, _metadata, num_votes, genres];
>>> movies.filter("num_votes >= 500000").count()
 78
The error above shows that no document contains the field hot. However, there are 78 movies that have received at least 500,000 votes. From those documents we will only work with the fields _metadata['_id'] and num_votes.
>>> movies.select("title", "_metadata._id", "num_votes").show(4)
+--------------------+--------------------+---------+
|               title|                 _id|num_votes|
+--------------------+--------------------+---------+
| Catch Me If You Can|AWBzpUimB6YFESvhmCoR|   498525|
|Spring, Summer, F...|AWBzpUimB6YFESvhmCoZ|    61194|
|Lagaan: Once Upon...|AWBzpUimB6YFESvhmCoa|    67322|
|      Reservoir Dogs|AWBzpUqPB6YFESvhmCof|   643090|
+--------------------+--------------------+---------+
We are going to create a new dataframe containing the hot movies. The dataframe should contain only their id and a column named hot set to True.
>>> from pyspark.sql.functions import expr, lit
>>> hot = movies.filter(movies["num_votes"] >= 500000) \
 ... .select(expr("_metadata._id as id")) \
 ... .withColumn("hot", lit(True))
 >>> hot.count()
 78
 >>> hot.head()
 Row(id=u'AWCjCgsKDHrqgAB6UlJ5', hot=True)
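To make the transformation concrete without a Spark session, here is the same filter, select and add-column logic applied to plain Python dicts. It is only a sketch: the rows are the four sample rows printed by show(4) above (titles appear truncated exactly as printed), and the function name select_hot is hypothetical.

```python
# The four sample rows printed by show(4); titles are truncated as printed.
SAMPLE_ROWS = [
    {"title": "Catch Me If You Can",
     "_metadata": {"_id": "AWBzpUimB6YFESvhmCoR"}, "num_votes": 498525},
    {"title": "Spring, Summer, F...",
     "_metadata": {"_id": "AWBzpUimB6YFESvhmCoZ"}, "num_votes": 61194},
    {"title": "Lagaan: Once Upon...",
     "_metadata": {"_id": "AWBzpUimB6YFESvhmCoa"}, "num_votes": 67322},
    {"title": "Reservoir Dogs",
     "_metadata": {"_id": "AWBzpUqPB6YFESvhmCof"}, "num_votes": 643090},
]


def select_hot(rows, threshold=500000):
    """Keep rows with at least `threshold` votes, project the id, add hot=True."""
    return [
        {"id": row["_metadata"]["_id"], "hot": True}
        for row in rows
        if row["num_votes"] >= threshold
    ]


hot_rows = select_hot(SAMPLE_ROWS)
print(hot_rows)
# [{'id': 'AWBzpUqPB6YFESvhmCof', 'hot': True}]
```

Only the id and the new flag survive the projection, which keeps the later update from touching any other field of the stored documents.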
 

Updating the index

Finally, the last step is to update the existing movies index with the information of this dataframe:
>>> es_conf = {
... "es.mapping.id": "id",  # (1)
... "es.mapping.exclude": "id",  # (2)
... "es.write.operation": "update"  # (3)
... }
 
 >>> hot.write.format("org.elasticsearch.spark.sql") \
 ... .options(**es_conf) \
... .mode("append") \
... .save("movies/data")
Pay attention to the important bits of this update:
(1) We must inform the connector that the document _id will be found in the id field.
(2) We must exclude the id field from being saved in the movies index. If we omit this line, each updated document will gain a new field named id.
(3) Use the 'update' operation so that existing documents are updated. This can be 'upsert' instead if documents not found in the index should be created.
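To see why these three options matter, it can help to look at the Elasticsearch Bulk API format for partial updates: an action line carrying the document _id, followed by a body line carrying the fields to merge. The sketch below builds such a payload in plain Python from rows shaped like the hot dataframe. It illustrates the idea only and is not the connector's actual implementation; the function name bulk_update_payload is hypothetical, and doc_as_upsert is the Bulk API flag that corresponds to the 'upsert' behaviour.

```python
import json


def bulk_update_payload(rows, id_field="id", upsert=False):
    """Render rows as newline-delimited Bulk API update actions."""
    lines = []
    for row in rows:
        # Comparable to es.mapping.id: the document _id comes from the row.
        action = {"update": {"_id": row[id_field]}}
        # Comparable to es.mapping.exclude: keep the id out of the stored fields.
        doc = {key: value for key, value in row.items() if key != id_field}
        body = {"doc": doc}
        if upsert:
            # Create the document from `doc` when the _id does not exist yet.
            body["doc_as_upsert"] = True
        lines.append(json.dumps(action))
        lines.append(json.dumps(body))
    # The Bulk API requires the payload to end with a newline.
    return "\n".join(lines) + "\n"


payload = bulk_update_payload([{"id": "AWCjCgsKDHrqgAB6UlJ5", "hot": True}])
print(payload, end="")
# {"update": {"_id": "AWCjCgsKDHrqgAB6UlJ5"}}
# {"doc": {"hot": true}}
```

Without the exclusion step the body line would also contain the id key, which is how the stray id field would end up in every updated document.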
