How-To Guides
The Search object represents the entire search request:
- queries
- filters
- aggregations
- k-nearest neighbor searches
- sort
- pagination
- highlighting
- suggestions
- collapsing
- additional parameters
- associated client
The API is designed to be chainable. With the exception of the aggregations functionality, this means that the Search object is immutable: all changes to the object will result in a shallow copy being created which contains the changes. You can safely pass the Search object to foreign code without fear of it modifying your objects as long as it sticks to the Search object APIs.
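For example, the copy-on-change behavior can be seen directly (a minimal sketch; the Match query and field name are purely illustrative):
from elasticsearch.dsl import Search
from elasticsearch.dsl.query import Match

base = Search()
with_query = base.query(Match("title", "python"))
# base is unchanged; with_query is a new, independent copy carrying the query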
You can pass an instance of the elasticsearch client when instantiating the Search object:
from elasticsearch import Elasticsearch
from elasticsearch.dsl import Search
client = Elasticsearch()
s = Search(using=client)
You can also define the client at a later time (for more options see the configuration chapter):
s = s.using(client)
All methods return a copy of the object, making it safe to pass to outside code.
The API is chainable, allowing you to combine multiple method calls in one statement:
s = Search().using(client).query(Match("title", "python"))
To send the request to Elasticsearch:
response = s.execute()
If you just want to iterate over the hits returned by your search you can iterate over the Search object:
for hit in s:
    print(hit.title)
Search results will be cached. Subsequent calls to execute or trying to iterate over an already executed Search object will not trigger additional requests being sent to Elasticsearch. To force a new request to be issued specify ignore_cache=True when calling execute.
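For example (a small sketch of the caching behavior described above):
response = s.execute()                    # sends the request and caches the result
response = s.execute()                    # served from the cache, no new request
response = s.execute(ignore_cache=True)   # forces a new request to Elasticsearch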
For debugging purposes you can serialize the Search object to a dict with the raw Elasticsearch request:
print(s.to_dict())
You can delete the documents matching a search by calling delete on the Search object instead of execute like this:
s = Search(index='i').query(Match("title", "python"))
response = s.delete()
To pass deletion parameters in your query, you can add them by calling params on the Search object before delete like this:
s = Search(index='i').query("match", title="python")
s = s.params(ignore_unavailable=False, wait_for_completion=True)
response = s.delete()
The elasticsearch.dsl.query module provides classes for all Elasticsearch query types. These classes accept keyword arguments in their constructors, which are serialized to the appropriate format to be sent to Elasticsearch. There is a clear one-to-one mapping between the raw query and its equivalent class-based version:
>>> from elasticsearch.dsl.query import MultiMatch, Match
>>> q = MultiMatch(query='python django', fields=['title', 'body'])
>>> q.to_dict()
{'multi_match': {'query': 'python django', 'fields': ['title', 'body']}}
>>> q = Match("title", {"query": "web framework", "type": "phrase"})
>>> q.to_dict()
{'match': {'title': {'query': 'web framework', 'type': 'phrase'}}}
An alternative to the class-based queries is to use the Q shortcut, passing a query name followed by its parameters, or the raw query as a dict:
from elasticsearch.dsl import Q
Q("multi_match", query='python django', fields=['title', 'body'])
Q({"multi_match": {"query": "python django", "fields": ["title", "body"]}})
To add a query to the Search object, use the .query() method. This works with class-based or Q queries:
q = Q("multi_match", query='python django', fields=['title', 'body'])
s = s.query(q)
As a shortcut the query() method also accepts all the parameters of the Q shortcut directly:
s = s.query("multi_match", query='python django', fields=['title', 'body'])
If you already have a query object, or a dict representing one, you can assign it to the query attribute of a Search object to add it to it, replacing any previously configured queries:
s.query = Q('bool', must=[Q('match', title='python'), Q('match', body='best')])
Sometimes you want to refer to a field within another field, either as a multi-field (title.keyword) or in a structured json document like address.city. This is not a problem when using class-based queries, but when working without classes it is often required to pass field names as keyword arguments. To make this easier, you can use __ (double underscore) in place of a dot in a keyword argument:
s = Search()
s = s.filter('term', category__keyword='Python')
s = s.query('match', address__city='prague')
Alternatively you can use Python’s keyword argument unpacking:
s = Search()
s = s.filter('term', **{'category.keyword': 'Python'})
s = s.query('match', **{'address.city': 'prague'})
Query objects can be combined using logical operators |, & and ~:
>>> q = Match("title", "python") | Match("title", "django")
>>> q.to_dict()
{'bool': {'should': [{'match': {'title': 'python'}}, {'match': {'title': 'django'}}]}}
>>> q = Match("title", "python") & Match("title", "django")
>>> q.to_dict()
{'bool': {'must': [{'match': {'title': 'python'}}, {'match': {'title': 'django'}}]}}
>>> q = ~Match("title", "python")
>>> q.to_dict()
{'bool': {'must_not': [{'match': {'title': 'python'}}]}}
When you call the .query() method multiple times, the & operator will be used internally to combine all the queries:
s = s.query().query()
print(s.to_dict())
# {"query": {"bool": {...}}}
If you want to have precise control over the query form, use the Q shortcut to directly construct the combined query:
q = Q('bool',
must=[Q('match', title='python')],
should=[Q(...), Q(...)],
minimum_should_match=1
)
s = Search().query(q)
If you want to add a query in a filter context you can use the filter() method to make things easier:
from elasticsearch.dsl.query import Terms
s = Search()
s = s.filter(Terms("tags", ['search', 'python']))
Behind the scenes this will produce a Bool query and place the specified terms query into its filter branch, making it equivalent to:
from elasticsearch.dsl.query import Terms, Bool
s = Search()
s = s.query(Bool(filter=[Terms("tags", ["search", "python"])]))
If you want to use the post_filter element for faceted navigation, use the .post_filter() method.
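For example (a brief sketch; the tags field and values are illustrative):
from elasticsearch.dsl.query import Terms

s = Search()
s = s.post_filter(Terms("tags", ["search", "python"]))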
The exclude() method works like filter(), but it applies the query as negated:
s = Search()
s = s.exclude(Terms("tags", ['search', 'python']))
which is shorthand for:
s = s.query(Bool(filter=[~Terms("tags", ["search", "python"])]))
As with queries, there are classes that represent each aggregation type, all accessible through the elasticsearch.dsl.aggs module:
from elasticsearch.dsl import aggs
a = aggs.Terms(field="tags")
# {"terms": {"field": "tags"}}
It is also possible to define an aggregation using the A shortcut:
from elasticsearch.dsl import A
A('terms', field='tags')
To nest aggregations, you can use the .bucket(), .metric() and .pipeline() methods:
a = aggs.Terms(field="category")
# {'terms': {'field': 'category'}}
a.metric("clicks_per_category", aggs.Sum(field="clicks")) \
.bucket("tags_per_category", aggs.Terms(field="tags"))
# {
# 'terms': {'field': 'category'},
# 'aggs': {
# 'clicks_per_category': {'sum': {'field': 'clicks'}},
# 'tags_per_category': {'terms': {'field': 'tags'}}
# }
# }
To add aggregations to the Search object, use the .aggs property, which acts as a top-level aggregation:
s = Search()
a = aggs.Terms(field="category")
s.aggs.bucket("category_terms", a)
# {
# 'aggs': {
# 'category_terms': {
# 'terms': {
# 'field': 'category'
# }
# }
# }
# }
or
s = Search()
s.aggs.bucket("articles_per_day", aggs.DateHistogram(field="publish_date", interval="day")) \
.metric("clicks_per_day", aggs.Sum(field="clicks")) \
.pipeline("moving_click_average", aggs.MovingAvg(buckets_path="clicks_per_day")) \
.bucket("tags_per_day", aggs.Terms(field="tags"))
s.to_dict()
# {
# "aggs": {
# "articles_per_day": {
# "date_histogram": { "interval": "day", "field": "publish_date" },
# "aggs": {
# "clicks_per_day": { "sum": { "field": "clicks" } },
# "moving_click_average": { "moving_avg": { "buckets_path": "clicks_per_day" } },
# "tags_per_day": { "terms": { "field": "tags" } }
# }
# }
# }
# }
You can access an existing bucket by its name:
s = Search()
s.aggs.bucket("per_category", aggs.Terms(field="category"))
s.aggs["per_category"].metric("clicks_per_category", aggs.Sum(field="clicks"))
s.aggs["per_category"].bucket("tags_per_category", aggs.Terms(field="tags"))
When chaining multiple aggregations, there is a difference between what .bucket() and .metric() methods return - .bucket() returns the newly defined bucket while .metric() returns its parent bucket to allow further chaining.
As opposed to other methods on the Search object, aggregations are defined in-place, without returning a new copy.
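A short sketch of the difference in return values (field and bucket names are illustrative):
s = Search()
per_category = s.aggs.bucket("per_category", aggs.Terms(field="category"))
# .metric() returns its parent bucket, so this stays attached to "per_category"
per_category.metric("clicks_per_category", aggs.Sum(field="clicks"))
# .bucket() returns the newly created bucket, so chained calls would nest one level deeper
per_tag = per_category.bucket("tags_per_category", aggs.Terms(field="tags"))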
To issue a kNN search, use the .knn() method:
s = Search()
vector = get_embedding("search text")
s = s.knn(
field="embedding",
k=5,
num_candidates=10,
query_vector=vector
)
The field, k and num_candidates arguments can be given as positional or keyword arguments and are required. In addition to these, query_vector or query_vector_builder must be given as well.
The .knn() method can be invoked multiple times to include multiple kNN searches in the request.
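For example, to combine two kNN searches in one request (a sketch; the field names and vectors are assumptions for illustration):
s = Search()
s = s.knn(field="title_vector", k=5, num_candidates=10, query_vector=title_vector)
s = s.knn(field="body_vector", k=5, num_candidates=10, query_vector=body_vector)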
To specify sorting order, use the .sort() method:
s = Search().sort(
'category',
'-title',
{"lines" : {"order" : "asc", "mode" : "avg"}}
)
It accepts positional arguments which can be either strings or dictionaries. A string value is a field name, optionally prefixed by a - sign to specify descending order.
To reset the sorting, just call the method with no arguments:
s = s.sort()
To specify the from/size parameters, apply the standard Python slicing operator on the Search instance:
s = s[10:20]
# {"from": 10, "size": 10}
s = s[:20]
# {"size": 20}
s = s[10:]
# {"from": 10}
s = s[10:20][2:]
# {"from": 12, "size": 8}
If you want to access all the documents matched by your query you can use the scan method which uses the scan/scroll elasticsearch API:
for hit in s.scan():
    print(hit.title)
Note that in this case the results won’t be sorted.
To set common attributes for highlighting use the highlight_options method:
s = s.highlight_options(order='score')
Enabling highlighting for individual fields is done using the highlight method:
s = s.highlight('title')
# or, including parameters:
s = s.highlight('title', fragment_size=50)
The fragments in the response will then be available on each Result object as .meta.highlight.FIELD which will contain the list of fragments:
response = s.execute()
for hit in response:
    for fragment in hit.meta.highlight.title:
        print(fragment)
To specify a suggest request on your Search object use the suggest method:
# check for correct spelling
s = s.suggest('my_suggestion', 'pyhton', term={'field': 'title'})
The first argument is the name of the suggestion (the name under which it will be returned), the second is the actual text you wish the suggester to work on, and the keyword arguments will be added to the suggest's JSON as-is, which means that one of them should be term, phrase or completion to indicate which type of suggester should be used.
To collapse search results use the collapse method on your Search object:
s = Search().query(Match("message", "GET /search"))
# collapse results by user_id
s = s.collapse("user_id")
The top hits will only include one result per user_id. You can also expand each collapsed top hit with the inner_hits parameter, max_concurrent_group_searches being the number of concurrent requests allowed to retrieve the inner hits per group:
inner_hits = {"name": "recent_search", "size": 5, "sort": [{"@timestamp": "desc"}]}
s = s.collapse("user_id", inner_hits=inner_hits, max_concurrent_group_searches=4)
To use Elasticsearch’s more_like_this functionality, you can use the MoreLikeThis query type.
A simple example is below
from elasticsearch.dsl.query import MoreLikeThis
from elasticsearch.dsl import Search
my_text = 'I want to find something similar'
s = Search()
# We're going to match based only on two fields, in this case text and title
s = s.query(MoreLikeThis(like=my_text, fields=['text', 'title']))
# You can also exclude fields from the result to make the response quicker in the normal way
s = s.source(exclude=["text"])
response = s.execute()
for hit in response:
    print(hit.title)
To set extra properties of the search request, use the .extra() method. This can be used to define keys in the body that cannot be defined via a specific API method like explain or search_after:
s = s.extra(explain=True)
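For example, search_after can be combined with a sort to page through results (a sketch; the sort fields and values are illustrative and would normally be taken from the last hit of the previous page):
s = s.sort("publish_date", "_id")
s = s.extra(search_after=["2024-01-01", "42"])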
To set query parameters, use the .params() method:
s = s.params(routing="42")
If you need to limit the fields being returned by elasticsearch, use the source() method:
# only return the selected fields
s = s.source(['title', 'body'])
# don't return any fields, just the metadata
s = s.source(False)
# explicitly include/exclude fields
s = s.source(includes=["title"], excludes=["user.*"])
# reset the field selection
s = s.source(None)
The search object can be serialized into a dictionary by using the .to_dict() method.
You can also create a Search object from a dict using the from_dict class method. This will create a new Search object and populate it using the data from the dict:
s = Search.from_dict({"query": {"match": {"title": "python"}}})
If you wish to modify an existing Search object, overriding its properties, instead use the update_from_dict method that alters an instance in-place:
s = Search(index='i')
s.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42})
You can execute your search by calling the .execute() method that will return a Response object. The Response object allows you access to any key from the response dictionary via attribute access. It also provides some convenient helpers:
response = s.execute()
print(response.success())
# True
print(response.took)
# 12
print(response.hits.total.relation)
# eq
print(response.hits.total.value)
# 142
print(response.suggest.my_suggestions)
If you want to inspect the contents of the response objects, just use its to_dict method to get access to the raw data for pretty printing.
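For example (a minimal sketch using the standard library):
from pprint import pprint

pprint(response.to_dict())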
To access the hits returned by the search, use the hits property or just iterate over the Response object:
response = s.execute()
print(f"Total {response.hits.total} hits found.")
for h in response:
    print(h.title, h.body)
If you are only seeing partial results (e.g. 10000 or even 10 results), consider using the option s.extra(track_total_hits=True) to get a full hit count.
Each of the individual hits is wrapped in a convenience class that allows attribute access to the keys in the returned dictionary. All the metadata for the results is accessible via meta (without the leading _):
response = s.execute()
h = response.hits[0]
print(f"/{h.meta.index}/{h.meta.doc_type}/{h.meta.id} returned with score {h.meta.score}")
If your document has a field called meta you have to access it using the get item syntax: hit['meta'].
Aggregations are available through the aggregations property:
for tag in response.aggregations.per_tag.buckets:
    print(tag.key, tag.max_lines.value)
If you need to execute multiple searches at the same time you can use the MultiSearch class which will use the _msearch API:
from elasticsearch.dsl import MultiSearch, Search
from elasticsearch.dsl.query import Term
ms = MultiSearch(index='blogs')
ms = ms.add(Search().filter(Term("tags", "python")))
ms = ms.add(Search().filter(Term("tags", 'elasticsearch')))
responses = ms.execute()
for response in responses:
    print("Results for query %r." % response._search.query)
    for hit in response:
        print(hit.title)
The EmptySearch class can be used as a fully compatible version of Search that will return no results, regardless of any queries configured.
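A brief sketch of using it as a drop-in replacement (the index name is illustrative):
from elasticsearch.dsl import EmptySearch

s = EmptySearch(index='blogs').query("match", title="python")
response = s.execute()
# response contains no hits, regardless of the query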
You can use the DSL module to define your mappings and a basic persistence layer for your application.
For more comprehensive examples have a look at the DSL examples directory in the repository.
If you want to create a model-like wrapper around your documents, use the Document class (or the equivalent AsyncDocument for asynchronous applications). It can also be used to create all the necessary mappings and settings in Elasticsearch (see Document life cycle below for details).
from datetime import datetime
from elasticsearch.dsl import Document, Date, Nested, Boolean, \
    analyzer, InnerDoc, Completion, Keyword, Text

html_strip = analyzer('html_strip',
    tokenizer="standard",
    filter=["standard", "lowercase", "stop", "snowball"],
    char_filter=["html_strip"]
)

class Comment(InnerDoc):
    author = Text(fields={'raw': Keyword()})
    content = Text(analyzer='snowball')
    created_at = Date()

    def age(self):
        return datetime.now() - self.created_at

class Post(Document):
    title = Text()
    title_suggest = Completion()
    created_at = Date()
    published = Boolean()
    category = Text(
        analyzer=html_strip,
        fields={'raw': Keyword()}
    )

    comments = Nested(Comment)

    class Index:
        name = 'blog'

    def add_comment(self, author, content):
        self.comments.append(
            Comment(author=author, content=content, created_at=datetime.now()))

    def save(self, **kwargs):
        self.created_at = datetime.now()
        return super().save(**kwargs)
Document instances use native Python types such as str and datetime for their attributes. In the case of Object or Nested fields, an instance of the InnerDoc subclass is used, as in the add_comment method in the above example, where we are creating an instance of the Comment class.
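For instance, building on the Post and Comment classes defined above (a small illustrative sketch):
post = Post(title='Using custom methods', published=False)
post.add_comment('alice', 'Great post!')
# post.comments now holds a list with a single Comment instance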
There are some specific types that were created to make working with some field types easier, for example the Range object used in any of the range fields:
from elasticsearch.dsl import Document, DateRange, Keyword, Range
class RoomBooking(Document):
    room = Keyword()
    dates = DateRange()

rb = RoomBooking(
    room='Conference Room II',
    dates=Range(
        gte=datetime(2018, 11, 17, 9, 0, 0),
        lt=datetime(2018, 11, 17, 10, 0, 0)
    )
)
# Range supports the in operator correctly:
datetime(2018, 11, 17, 9, 30, 0) in rb.dates  # True

# you can also get the limits and whether they are inclusive or exclusive:
rb.dates.lower  # datetime(2018, 11, 17, 9, 0, 0), True
rb.dates.upper  # datetime(2018, 11, 17, 10, 0, 0), False

# empty range is unbounded
Range().lower  # None, False
Document fields can be defined using standard Python type hints if desired. Here are some simple examples:
from typing import Optional
class Post(Document):
    title: str                      # same as title = Text(required=True)
    created_at: Optional[datetime]  # same as created_at = Date(required=False)
    published: bool                 # same as published = Boolean(required=True)
When using Field subclasses such as Text, Date and Boolean to define attributes, these classes must be given on the right-hand side of the assignment.
class Post(Document):
    title = Text()   # correct
    subtitle: Text   # incorrect
Using a Field subclass as a Python type hint will result in errors.
Python types are mapped to their corresponding Field types according to the following table:
| Python type | DSL field |
|---|---|
| str | Text(required=True) |
| bool | Boolean(required=True) |
| int | Integer(required=True) |
| float | Float(required=True) |
| bytes | Binary(required=True) |
| datetime | Date(required=True) |
| date | Date(format="yyyy-MM-dd", required=True) |
To type a field as optional, the standard Optional modifier from the Python typing package can be used. When using Python 3.10 or newer, "pipe" syntax can also be used, by adding | None to a type. The List modifier can be added to a field to convert it to an array, similar to using the multi=True argument on the Field object.
from typing import Optional, List
class MyDoc(Document):
    pub_date: Optional[datetime]   # same as pub_date = Date()
    middle_name: str | None        # same as middle_name = Text()
    authors: List[str]             # same as authors = Text(multi=True, required=True)
    comments: Optional[List[str]]  # same as comments = Text(multi=True)
A field can also be given a type hint of an InnerDoc subclass, in which case it becomes an Object field of that class. When the InnerDoc subclass is wrapped with List, a Nested field is created instead.
from typing import List
class Address(InnerDoc):
    ...

class Comment(InnerDoc):
    ...

class Post(Document):
    address: Address         # same as address = Object(Address, required=True)
    comments: List[Comment]  # same as comments = Nested(Comment, required=True)
Unfortunately it is impossible to have Python type hints that uniquely identify every possible Elasticsearch Field type. To choose a type that is different than the one that is assigned according to the table above, the desired Field instance can be added explicitly as a right-side assignment in the field declaration. The next example creates a field that is typed as Optional[str], but is mapped to Keyword instead of Text:
class MyDocument(Document):
    category: Optional[str] = Keyword()
This form can also be used when additional options need to be given to initialize the field, such as when using custom analyzer settings:
class Comment(InnerDoc):
    content: str = Text(analyzer='snowball')
When using type hints as above, subclasses of Document and InnerDoc inherit some of the behaviors associated with Python dataclasses, as defined by PEP 681 and the dataclass_transform decorator. To add per-field dataclass options such as default or default_factory, the mapped_field() wrapper can be used on the right side of a typed field declaration:
class MyDocument(Document):
    title: str = mapped_field(default="no title")
    created_at: datetime = mapped_field(default_factory=datetime.now)
    published: bool = mapped_field(default=False)
    category: str = mapped_field(Keyword(), default="general")
The mapped_field() wrapper function can optionally be given an explicit field type instance as a first positional argument, as the category field does in the example above to be defined as Keyword instead of the Text default.
Static type checkers such as mypy and pyright can use the type hints and the dataclass-specific options added to the mapped_field() function to improve type inference and provide better real-time code completion and suggestions in IDEs.
One situation in which type checkers can’t infer the correct type is when using fields as class attributes. Consider the following example:
class MyDocument(Document):
    title: str

doc = MyDocument()
# doc.title is typed as "str" (correct)
# MyDocument.title is also typed as "str" (incorrect)
To help type checkers correctly identify class attributes as such, the M generic must be used as a wrapper to the type hint, as shown in the next examples:
from elasticsearch.dsl import M
class MyDocument(Document):
    title: M[str]
    created_at: M[datetime] = mapped_field(default_factory=datetime.now)

doc = MyDocument()
# doc.title is typed as "str"
# doc.created_at is typed as "datetime"
# MyDocument.title is typed as "InstrumentedField"
# MyDocument.created_at is typed as "InstrumentedField"
Note that the M type hint does not provide any runtime behavior and its use is not required, but it can be useful to eliminate spurious type errors in IDEs or type checking builds.
The InstrumentedField objects returned when fields are accessed as class attributes are proxies for the field instances that can be used anywhere a field needs to be referenced, such as when specifying sort options in a Search object:
# sort by creation date descending, and title ascending
s = MyDocument.search().sort(-MyDocument.created_at, MyDocument.title)
When specifying sorting order, the + and - unary operators can be used on the class field attributes to indicate ascending and descending order.
Finally, it is also possible to define class attributes and request that they are ignored when building the Elasticsearch mapping. One way is to type attributes with the ClassVar annotation. Alternatively, the mapped_field() wrapper function accepts an exclude argument that can be set to True:
from typing import ClassVar
class MyDoc(Document):
    title: M[str]
    created_at: M[datetime] = mapped_field(default_factory=datetime.now)
    my_var: ClassVar[str]                                  # regular class variable, ignored by Elasticsearch
    another_custom_var: int = mapped_field(exclude=True)   # also ignored by Elasticsearch
The DSL module will always respect the timezone information (or lack thereof) on the datetime objects passed in or stored in Elasticsearch. Elasticsearch itself interprets all datetimes with no timezone information as UTC. If you wish to reflect this in your python code, you can specify default_timezone when instantiating a Date field:
class Post(Document):
    created_at = Date(default_timezone='UTC')
In that case any datetime object passed in (or parsed from elasticsearch) will be treated as if it were in UTC timezone.
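A small sketch of the effect (the date values are illustrative):
from datetime import datetime

post = Post(created_at=datetime(2024, 1, 1, 12, 0))  # naive datetime, treated as 12:00 UTC
# values parsed back from Elasticsearch will be timezone-aware datetimes in UTC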
Before you first use the Post document type, you need to create the mappings in Elasticsearch. For that you can either use the index object or create the mappings directly by calling the init class method:
# create the mappings in Elasticsearch
Post.init()
This code will typically be run in the setup for your application during a code deploy, similar to running database migrations.
To create a new Post document just instantiate the class and pass in any fields you wish to set, you can then use standard attribute setting to change/add more fields. Note that you are not limited to the fields defined explicitly:
# instantiate the document
first = Post(title='My First Blog Post, yay!', published=True)
# assign some field values, can be values or lists of values
first.category = ['everything', 'nothing']
# every document has an id in meta
first.meta.id = 47
# save the document into the cluster
first.save()
All the metadata fields (id, routing, index, etc.) can be accessed (and set) via a meta attribute or directly using the underscored variant:
post = Post(meta={'id': 42})
# prints 42
print(post.meta.id)
# override default index
post.meta.index = 'my-blog'
Having all metadata accessible through meta means that this name is reserved and you shouldn’t have a field called meta on your document. If you, however, need it you can still access the data using the get item (as opposed to attribute) syntax: post['meta'].
To retrieve an existing document use the get class method:
# retrieve the document
first = Post.get(id=42)
# now we can call methods, change fields, ...
first.add_comment('me', 'This is nice!')
# and save the changes into the cluster again
first.save()
The Update API can also be used via the update method. By default any keyword arguments, beyond the parameters of the API, will be considered fields with new values. Those fields will be updated on the local copy of the document and then sent over as partial document to be updated:
# retrieve the document
first = Post.get(id=42)
# you can update just individual fields which will call the update API
# and also update the document in place
first.update(published=True, published_by='me')
In case you wish to use a painless script to perform the update you can pass in the script string as script or the id of a stored script via script_id. All additional keyword arguments to the update method will then be passed in as parameters of the script. The document will not be updated in place.
# retrieve the document
first = Post.get(id=42)
# we execute a script in elasticsearch with additional kwargs being passed
# as params into the script
first.update(script='ctx._source.category.add(params.new_category)',
new_category='testing')
If the document is not found in elasticsearch an exception (elasticsearch.NotFoundError) will be raised. If you wish to return None instead just pass in ignore=404 to suppress the exception:
p = Post.get(id='not-in-es', ignore=404)
p is None
When you wish to retrieve multiple documents at the same time by their id you can use the mget method:
posts = Post.mget([42, 47, 256])
mget will, by default, raise a NotFoundError if any of the documents wasn't found and a RequestError if any of the documents resulted in an error. You can control this behavior by setting parameters:
- raise_on_error: if True (the default), any error will cause an exception to be raised. Otherwise all documents containing errors will be treated as missing.
- missing: can have three possible values: 'none' (default), 'raise' and 'skip'. If a document is missing or errored it will either be replaced with None, an exception will be raised, or the document will be skipped in the output list entirely.
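For example (a sketch of the parameters described above):
posts = Post.mget([42, 47, 256], raise_on_error=False, missing='skip')
# documents that are missing or errored are silently left out of the list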
The index associated with the Document is accessible via the _index class property which gives you access to the index class.
The _index attribute is also home to the load_mappings method which will update the mapping on the Index from elasticsearch. This is very useful if you use dynamic mappings and want the class to be aware of those fields (for example if you wish the Date fields to be properly (de)serialized):
Post._index.load_mappings()
To delete a document just call its delete method:
first = Post.get(id=42)
first.delete()
This functionality is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features.
This feature is available in the Python Elasticsearch client starting with release 9.2.0.
Applications that define their data models using Pydantic can combine these
models with Elasticsearch DSL annotations. To take advantage of this option, Pydantic's BaseModel base class
needs to be replaced with BaseESModel (or AsyncBaseESModel for asynchronous applications), and then the model
can include type annotations for Pydantic and Elasticsearch both, as demonstrated in the following example:
from typing import Annotated
from pydantic import Field
from elasticsearch import dsl
from elasticsearch.dsl.pydantic import BaseESModel
class Quote(BaseESModel):
    quote: str
    author: Annotated[str, dsl.Keyword()]
    tags: Annotated[list[str], dsl.Keyword(normalizer="lowercase")]
    embedding: Annotated[list[float], dsl.DenseVector()] = Field(init=False, default=[])

    class Index:
        name = "quotes"
In this example, the quote attribute is annotated with a str type hint. Both Pydantic and Elasticsearch use this
annotation.
The author and tags attributes have a Python type hint and an Elasticsearch annotation, both wrapped with
Python's typing.Annotated. When using the BaseESModel class, the typing information intended for Elasticsearch needs
to be defined inside Annotated.
The embedding attribute includes a base Python type and an Elasticsearch annotation in the same format as the
other fields, but it adds Pydantic's Field definition as a right-hand side assignment.
Finally, any other items that need to be defined for the Elasticsearch document class, such as class Index and
class Meta entries (discussed later), can be added as well.
The next example demonstrates how to define Object and Nested fields:
from typing import Annotated
from pydantic import BaseModel, Field
from elasticsearch import dsl
from elasticsearch.dsl.pydantic import BaseESModel
class Phone(BaseModel):
    type: Annotated[str, dsl.Keyword()] = Field(default="Home")
    number: str

class Person(BaseESModel):
    name: str
    main_phone: Phone          # same as Object(Phone)
    other_phones: list[Phone]  # same as Nested(Phone)

    class Index:
        name = "people"
Note that inner classes do not need to be defined with a custom base class; these should be standard Pydantic model
classes. The attributes defined in these classes can include Elasticsearch annotations, as long as they are given
in an Annotated type hint.
All model classes that are created as described in this section function like normal Pydantic models and can be used anywhere standard Pydantic models are used, but they have some added attributes:
- _doc: a class attribute that is a dynamically generated Document class to use with the Elasticsearch index.
- meta: an attribute added to all models that includes Elasticsearch document metadata items such as id, score, etc.
- to_doc(): a method that converts the Pydantic model to an Elasticsearch document.
- from_doc(): a class method that accepts an Elasticsearch document as an argument and returns an equivalent Pydantic model.
These are demonstrated in the examples below:
# create a Pydantic model
quote = Quote(
    quote="An unexamined life is not worth living.",
    author="Socrates",
    tags=["philosophy"]
)
# save the model to the Elasticsearch index
quote.to_doc().save()
# get a document from the Elasticsearch index as a Pydantic model
quote = Quote.from_doc(Quote._doc.get(id=42))
# run a search and print the Pydantic models
s = Quote._doc.search().query(Match(Quote._doc.quote, "life"))
for doc in s:
    quote = Quote.from_doc(doc)
    print(quote.meta.id, quote.meta.score, quote.quote)
To specify analyzer values for Text fields you can just use the name of the analyzer (as a string) and either rely on the analyzer being defined (like built-in analyzers) or define the analyzer yourself manually.
Alternatively, you can create your own analyzer and have the persistence layer handle its creation, from our example earlier:
from elasticsearch.dsl import analyzer, tokenizer
my_analyzer = analyzer('my_analyzer',
tokenizer=tokenizer('trigram', 'nGram', min_gram=3, max_gram=3),
filter=['lowercase']
)
Each analysis object needs to have a name (my_analyzer and trigram in our example) and tokenizers, token filters and char filters also need to specify type (nGram in our example).
Once you have an instance of a custom analyzer you can also call the analyze API on it by using the simulate method:
response = my_analyzer.simulate('Hello World!')
tokens = [t.token for t in response.tokens]
# ['hel', 'ell', 'llo', 'lo ', 'o w', ' wo', 'wor', 'orl', 'rld', 'ld!']
When creating a mapping which relies on a custom analyzer the index must either not exist or be closed. To create multiple Document-defined mappings you can use the index object.
To search for this document type, use the search class method:
# by calling .search we get back a standard Search object
s = Post.search()
# the search is already limited to the index and doc_type of our document
s = s.filter('term', published=True).query('match', title='first')
results = s.execute()
# when you execute the search the results are wrapped in your document class (Post)
for post in results:
    print(post.meta.score, post.title)
Alternatively, you can take any Search object and restrict it to return our document type, wrapped in the correct class:
s = Search()
s = s.doc_type(Post)
You can also combine document classes with standard doc types (just strings), which will be treated as before. You can also pass in multiple Document subclasses and each document in the response will be wrapped in its class.
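For example (a brief sketch; the index name and the string doc type are illustrative):
s = Search(index='blogs').doc_type(Post, 'legacy-post')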
If you want to run suggestions, just use the suggest method on the Search object:
s = Post.search()
s = s.suggest('title_suggestions', 'pyth', completion={'field': 'title_suggest'})
response = s.execute()
for result in response.suggest.title_suggestions:
    print('Suggestions for %s:' % result.text)
    for option in result.options:
        print(' %s (%r)' % (option.text, option.payload))
In the Meta class inside your document definition you can define various metadata for your document:
- mapping: optional instance of the Mapping class to use as a base for the mappings created from the fields on the document class itself.
Any attributes on the Meta class that are instance of MetaField will be used to control the mapping of the meta fields (_all, dynamic etc). Just name the parameter (without the leading underscore) as the field you wish to map and pass any parameters to the MetaField class:
class Post(Document):
    title = Text()

    class Meta:
        all = MetaField(enabled=False)
        dynamic = MetaField('strict')
The Index class inside your Document definition can contain any information about the index: its name, settings and other attributes:
- name: name of the index to use. If it contains a wildcard (*) then it cannot be used for any write operations and an index kwarg will have to be passed explicitly when calling methods like .save().
- using: default connection alias to use, defaults to 'default'.
- settings: dictionary containing any settings for the Index object like number_of_shards.
- analyzers: additional list of analyzers that should be defined on an index (see analysis for details).
- aliases: dictionary with any aliases definitions.
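A minimal sketch combining a few of these options (the names and values are illustrative):
class Post(Document):
    title = Text()

    class Index:
        name = 'blog-posts'
        settings = {
            'number_of_shards': 2,
        }
        aliases = {
            'blog': {},
        }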
You can use standard Python inheritance to extend models. This can be useful in a few scenarios, for example if you want to have a BaseDocument defining some common fields that several different Document classes should share:
class User(InnerDoc):
    username: str = mapped_field(Text(fields={'keyword': Keyword()}))
    email: str

class BaseDocument(Document):
    created_by: User
    created_date: datetime
    last_updated: datetime

    def save(self, **kwargs):
        if not self.created_date:
            self.created_date = datetime.now()
        self.last_updated = datetime.now()
        return super(BaseDocument, self).save(**kwargs)

class BlogPost(BaseDocument):
    class Index:
        name = 'blog'
Another use case would be using the join type to have multiple different entities in a single index. You can see an example of this approach. Note that in this case, if the subclasses don’t define their own Index classes, the mappings are merged and shared between all the subclasses.
In a typical scenario, using class Index on a Document class is sufficient to perform any action. In a few cases, though, it can be useful to manipulate an Index object directly.
Index is a class responsible for holding all the metadata related to an index in elasticsearch - mappings and settings. It is most useful when defining your mappings since it allows for easy creation of multiple mappings at the same time. This is especially useful when setting up your elasticsearch objects in a migration:
from elasticsearch.dsl import Index, Document, Text, analyzer
blogs = Index('blogs')
# define custom settings
blogs.settings(
number_of_shards=1,
number_of_replicas=0
)
# define aliases
blogs.aliases(
old_blogs={}
)
# register a document with the index
blogs.document(Post)
# can also be used as class decorator when defining the Document
@blogs.document
class Post(Document):
    title: str
# You can attach custom analyzers to the index
html_strip = analyzer('html_strip',
tokenizer="standard",
filter=["standard", "lowercase", "stop", "snowball"],
char_filter=["html_strip"]
)
blogs.analyzer(html_strip)
# delete the index, ignore if it doesn't exist
blogs.delete(ignore=404)
# create the index in elasticsearch
blogs.create()
You can also set up a template for your indices and use the clone method to create specific copies:
blogs = Index('blogs', using='production')
blogs.settings(number_of_shards=2)
blogs.document(Post)
# create a copy of the index with different name
company_blogs = blogs.clone('company-blogs')
# create a different copy on different cluster
dev_blogs = blogs.clone('blogs', using='dev')
# and change its settings
dev_blogs.settings(number_of_shards=1)
The DSL module also exposes an option to manage index templates in elasticsearch using the ComposableIndexTemplate and IndexTemplate classes, which have very similar API to Index.
Composable index templates should always be preferred over the legacy index templates, since the latter are deprecated.
Once an index template is saved in Elasticsearch its contents will be automatically applied to new indices (existing indices are completely unaffected by templates) that match the template pattern (any index starting with logs- in the example below), even if the index is created automatically upon indexing a document into that index.
Potential workflow for a set of time based indices governed by a single template:
from datetime import datetime
from elasticsearch.dsl import Document, Date, Text
class Log(Document):
    content: str
    timestamp: datetime

    class Index:
        name = "logs-*"

    def save(self, **kwargs):
        # assign now if no timestamp given
        if not self.timestamp:
            self.timestamp = datetime.now()

        # override the index to go to the proper timeslot
        kwargs['index'] = self.timestamp.strftime('logs-%Y%m%d')
        return super().save(**kwargs)
# once, as part of application setup, during deploy/migrations:
logs = Log._index.as_composable_template('logs', priority=100)
logs.save()
# to perform search across all logs:
search = Log.search()
The library comes with a simple abstraction aimed at helping you develop faceted navigation for your data.
You can provide several configuration options (as class attributes) when declaring a FacetedSearch subclass:
- index: the name of the index (as a string) to search through, defaults to '_all'.
- doc_types: list of Document subclasses or strings to be used, defaults to ['_all'].
- fields: list of fields on the document type to search through. The list will be passed to a MultiMatch query so it can contain boost values ('title^5'), defaults to ['*'].
- facets: dictionary of facets to display/filter on. The key is the name displayed and values should be instances of any Facet subclass, for example: {'tags': TermsFacet(field='tags')}
There are several different facets available:
- TermsFacet: provides an option to split documents into groups based on a value of a field, for example TermsFacet(field='category')
- DateHistogramFacet: split documents into time intervals, example: DateHistogramFacet(field="published_date", calendar_interval="day")
- HistogramFacet: similar to DateHistogramFacet but for numerical values: HistogramFacet(field="rating", interval=2)
- RangeFacet: allows you to define your own ranges for a numerical field: RangeFacet(field="comment_count", ranges=[("few", (None, 2)), ("lots", (2, None))])
- NestedFacet: is just a simple facet that wraps another to provide access to nested documents: NestedFacet('variants', TermsFacet(field='variants.color'))
By default facet results will only calculate document count. If you wish for a different metric, you can pass in any single value metric aggregation as the metric kwarg (TermsFacet(field='tags', metric=A('max', field='timestamp'))). When specifying metric the results will, by default, be sorted in descending order by that metric. To change it to ascending specify metric_sort="asc"; to just sort by document count use metric_sort=False.
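For example (a sketch; the field names are illustrative):
facets = {
    'tags': TermsFacet(field='tags', metric=A('max', field='timestamp'), metric_sort='asc'),
}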
If you require any custom behavior or modifications simply override one or more of the methods responsible for the class' functions:
- search(self): is responsible for constructing the Search object used. Override this if you want to customize the search object (for example by adding a global filter for published articles only).
- query(self, search): adds the query part of the search (if search input was specified), by default using a MultiMatch query. Override this if you want to modify the query type used.
- highlight(self, search): defines the highlighting on the Search object and returns a new one. Default behavior is to highlight on all fields specified for search.
The custom subclass can be instantiated empty to provide an empty search (matching everything) or with query, filters and sort.
- query: is used to pass in the text of the query to be performed. If None is passed in (default) a MatchAll query will be used. For example 'python web'.
- filters: is a dictionary containing all the facet filters that you wish to apply. Use the name of the facet (from the .facets attribute) as the key and one of the possible values as value. For example {'tags': 'python'}.
- sort: is a tuple or list of fields on which the results should be sorted. The format of the individual fields is the same as those passed to Search.sort.
The response returned from the FacetedSearch object (by calling .execute()) is a subclass of the standard Response class that adds a property called facets which contains a dictionary with lists of buckets, each represented by a tuple of key, document count and a flag indicating whether this value has been filtered on.
from datetime import date
from elasticsearch.dsl import FacetedSearch, TermsFacet, DateHistogramFacet
class BlogSearch(FacetedSearch):
    doc_types = [Article, ]
    # fields that should be searched
    fields = ['tags', 'title', 'body']

    facets = {
        # use bucket aggregations to define facets
        'tags': TermsFacet(field='tags'),
        'publishing_frequency': DateHistogramFacet(field='published_from', interval='month')
    }

    def search(self):
        # override methods to add custom pieces
        s = super().search()
        return s.filter('range', publish_from={'lte': 'now/h'})
bs = BlogSearch('python web', {'publishing_frequency': date(2015, 6, 1)})
response = bs.execute()
# access hits and other attributes as usual
total = response.hits.total
print('total hits', total.relation, total.value)
for hit in response:
    print(hit.meta.score, hit.title)

for (tag, count, selected) in response.facets.tags:
    print(tag, ' (SELECTED):' if selected else ':', count)

for (month, count, selected) in response.facets.publishing_frequency:
    print(month.strftime('%B %Y'), ' (SELECTED):' if selected else ':', count)
The Update By Query object enables the use of the _update_by_query endpoint to perform an update on documents that match a search query.
The object is implemented as a modification of the Search object, containing a subset of its query methods, as well as a script method, which is used to make updates.
The Update By Query object implements the following Search query types:
- queries
- filters
- excludes
For more information on queries, see the search_dsl chapter.
Like the Search object, the API is designed to be chainable. This means that the Update By Query object is immutable: all changes to the object will result in a shallow copy being created which contains the changes. This means you can safely pass the Update By Query object to foreign code without fear of it modifying your objects as long as it sticks to the Update By Query object APIs.
You can define your client in a number of ways, but the preferred method is to use a global configuration. For more information on defining a client, see the configuration chapter.
Once your client is defined, you can instantiate a copy of the Update By Query object as seen below:
from elasticsearch.dsl import UpdateByQuery
ubq = UpdateByQuery().using(client)
# or
ubq = UpdateByQuery(using=client)
All methods return a copy of the object, making it safe to pass to outside code.
The API is chainable, allowing you to combine multiple method calls in one statement:
ubq = UpdateByQuery().using(client).query(Match("title", "python"))
To send the request to Elasticsearch:
response = ubq.execute()
It should be noted that there is a limit to chaining when using the script method: calling script multiple times will overwrite the previous value. That is, only a single script can be sent with a call. An attempt to use two scripts will result in only the second script being stored.
Given the below example:
ubq = UpdateByQuery() \
.using(client) \
.script(source="ctx._source.likes++") \
.script(source="ctx._source.likes+=2")
This means that the script stored by this chain will be 'source': 'ctx._source.likes+=2' and the previous call will not be stored.
For debugging purposes you can serialize the Update By Query object to a dict explicitly:
print(ubq.to_dict())
To use variables in the script, see the example below:
ubq.script(
source="ctx._source.messages.removeIf(x -> x.somefield == params.some_var)",
params={
'some_var': 'some_string_val'
}
)
The search object can be serialized into a dictionary by using the .to_dict() method.
You can also create an Update By Query object from a dict using the from_dict class method. This will create a new Update By Query object and populate it using the data from the dict:
ubq = UpdateByQuery.from_dict({"query": {"match": {"title": "python"}}})
If you wish to modify an existing Update By Query object, overriding its properties, instead use the update_from_dict method that alters an instance in-place:
ubq = UpdateByQuery(index='i')
ubq.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42})
To set extra properties of the search request, use the .extra() method. This can be used to define keys in the body that cannot be defined via a specific API method like explain:
ubq = ubq.extra(explain=True)
To set query parameters, use the .params() method:
ubq = ubq.params(routing="42")
You can execute your search by calling the .execute() method that will return a Response object. The Response object allows you access to any key from the response dictionary via attribute access. It also provides some convenient helpers:
response = ubq.execute()
print(response.success())
# True
print(response.took)
# 12
If you want to inspect the contents of the response objects, just use its to_dict method to get access to the raw data for pretty printing.
When working with Document classes, you can use the ES|QL query language to retrieve documents. For this you can use the esql_from() and esql_execute() methods available to all sub-classes of Document.
Consider the following Employee document definition:
from elasticsearch.dsl import Document, InnerDoc, M
class Address(InnerDoc):
    address: M[str]
    city: M[str]
    zip_code: M[str]

class Employee(Document):
    emp_no: M[int]
    first_name: M[str]
    last_name: M[str]
    height: M[float]
    still_hired: M[bool]
    address: M[Address]

    class Index:
        name = 'employees'
The esql_from() method creates a base ES|QL query for the index associated with the document class. The following example creates a base query for the Employee class:
query = Employee.esql_from()
This query includes a FROM command with the index name, and a KEEP command that retrieves all the document attributes.
To execute this query and receive the results, you can pass the query to the esql_execute() method:
for emp in Employee.esql_execute(query):
    print(f"{emp.first_name} {emp.last_name} from {emp.address.city} is {emp.height:.2f}m tall")
In this example, the esql_execute() class method runs the query and returns all the documents in the index, up to the maximum of 1000 results allowed by ES|QL. Here is a possible output from this example:
Kevin Macias from North Robert is 1.60m tall
Drew Harris from Boltonshire is 1.68m tall
Julie Williams from Maddoxshire is 1.99m tall
Christopher Jones from Stevenbury is 1.98m tall
Anthony Lopez from Port Sarahtown is 2.42m tall
Tricia Stone from North Sueshire is 2.39m tall
Katherine Ramirez from Kimberlyton is 1.83m tall
...
To search for specific documents you can extend the base query with additional ES|QL commands that narrow the search criteria. The next example searches for documents that include only employees that are taller than 2 meters, sorted by their last name. It also limits the results to 4 people:
query = (
Employee.esql_from()
.where(Employee.height > 2)
.sort(Employee.last_name)
.limit(4)
)
When running this query with the same for-loop shown above, possible results would be:
Michael Adkins from North Stacey is 2.48m tall
Kimberly Allen from Toddside is 2.24m tall
Crystal Austin from East Michaelchester is 2.30m tall
Rebecca Berger from Lake Adrianside is 2.40m tall
ES|QL provides a few ways to add new fields to a query, for example through the EVAL command. The following example shows a query that adds an evaluated field:
from elasticsearch.esql import E, functions
query = (
Employee.esql_from()
.eval(height_cm=functions.round(Employee.height * 100))
.where(E("height_cm") >= 200)
.sort(Employee.last_name)
.limit(10)
)
In this example we are adding the height in centimeters to the query, calculated from the height document field, which is in meters. The height_cm calculated field is available to use in other query clauses, and in particular is referenced in where() in this example. Note how the new field is given as E("height_cm") in this clause. The E() wrapper tells the query builder that the argument is an ES|QL field name and not a string literal. This is done automatically for document fields that are given as class attributes, such as Employee.height in the eval(). The E() wrapper is only needed for fields that are not in the document.
By default, the esql_execute() method returns only document instances. To receive any additional fields that are not part of the document in the query results, the return_additional=True argument can be passed to it, and then the results are returned as tuples with the document as first element, and a dictionary with the additional fields as second element:
for emp, additional in Employee.esql_execute(query, return_additional=True):
    print(emp.first_name, emp.last_name, additional)
Example output from the query given above:
Michael Adkins {'height_cm': 248.0}
Kimberly Allen {'height_cm': 224.0}
Crystal Austin {'height_cm': 230.0}
Rebecca Berger {'height_cm': 240.0}
Katherine Blake {'height_cm': 214.0}
Edward Butler {'height_cm': 246.0}
Steven Carlson {'height_cm': 242.0}
Mark Carter {'height_cm': 240.0}
Joseph Castillo {'height_cm': 229.0}
Alexander Cohen {'height_cm': 245.0}
The base query returned by the esql_from() method includes a KEEP command with the complete list of fields that are part of the document. If any subsequent clauses added to the query remove fields that are part of the document, then the esql_execute() method will raise an exception, because it will not be able to construct complete document instances to return as results.
To prevent errors, it is recommended that the keep() and drop() clauses are not used when working with Document instances.
If a query has missing fields, it can be forced to execute without errors by passing the ignore_missing_fields=True argument to esql_execute(). When this option is used, returned documents will have any missing fields set to None.
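A short sketch of this option (the dropped field is chosen purely for illustration):
query = Employee.esql_from().drop("last_name")
for emp in Employee.esql_execute(query, ignore_missing_fields=True):
    print(emp.first_name, emp.last_name)  # last_name is None in every result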
The DSL module supports async/await with asyncio. To ensure that you have all the required dependencies, install the [async] extra:
$ python -m pip install "elasticsearch[async]"
The DSL module also supports Trio when using the Async HTTPX client. You do need to install Trio and HTTPX separately:
$ python -m pip install elasticsearch trio httpx
Use the async_connections module to manage your asynchronous connections.
from elasticsearch.dsl import async_connections
async_connections.create_connection(hosts=['localhost'], timeout=20)
If you're using Trio, you need to explicitly request the Async HTTP client:
from elasticsearch.dsl import async_connections
async_connections.create_connection(hosts=['localhost'], node_class="httpxasync")
All the options available in the connections module can be used with async_connections.
If your application logs warnings about unclosed client sessions or connectors, these come from the aiohttp package, which is used internally by the AsyncElasticsearch client. They usually appear when the application exits and are caused by HTTP connections that are still open when they are garbage collected. To avoid these warnings, make sure that you close your connections:
es = async_connections.get_connection()
await es.close()
Use the AsyncSearch class to perform asynchronous searches.
from elasticsearch.dsl import AsyncSearch
from elasticsearch.dsl.query import Match
s = AsyncSearch().query(Match("title", "python"))
async for hit in s:
    print(hit.title)
Instead of using the AsyncSearch object as an asynchronous iterator, you can explicitly call the execute() method to get a Response object.
s = AsyncSearch().query(Match("title", "python"))
response = await s.execute()
for hit in response:
    print(hit.title)
An AsyncMultiSearch is available as well.
from elasticsearch.dsl import AsyncMultiSearch
from elasticsearch.dsl.query import Term
ms = AsyncMultiSearch(index='blogs')
ms = ms.add(AsyncSearch().filter(Term("tags", "python")))
ms = ms.add(AsyncSearch().filter(Term("tags", "elasticsearch")))
responses = await ms.execute()
for response in responses:
    print("Results for query %r." % response.search.query)
    for hit in response:
        print(hit.title)
The Document, BaseESModel, Index, IndexTemplate, Mapping, UpdateByQuery and FacetedSearch classes all have asynchronous versions that use the same name with an Async prefix. These classes expose the same interfaces as the synchronous versions, but any methods that perform I/O are defined as coroutines.
Auxiliary classes that do not perform I/O do not have asynchronous versions. The same classes can be used in synchronous and asynchronous applications.
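For example, an asynchronous document class might look like this (a minimal sketch; the class and index names are illustrative):
from elasticsearch.dsl import AsyncDocument, Text

class Article(AsyncDocument):
    title = Text()

    class Index:
        name = 'articles'

async def store(title):
    await Article.init()            # create the mappings if they do not exist yet
    article = Article(title=title)
    await article.save()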
When using a custom analyzer in an asynchronous application, use the async_simulate() method to invoke the Analyze API on it.
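For example (a sketch based on the custom analyzer defined earlier):
response = await my_analyzer.async_simulate('Hello World!')
tokens = [t.token for t in response.tokens]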
Consult the api section for details about each specific method.