Deduplicate data
Stack
The Beats framework guarantees at-least-once delivery to ensure that no data is lost when events are sent to outputs that support acknowledgement, such as Elasticsearch, Logstash, Kafka, and Redis. This is great if everything goes as planned. But if Filebeat shuts down during processing, or the connection is lost before events are acknowledged, you can end up with duplicate data.
When an output is blocked, the retry mechanism in Filebeat attempts to resend events until they are acknowledged by the output. If the output receives the events, but is unable to acknowledge them, the data might be sent to the output multiple times. Because document IDs are typically set by Elasticsearch after it receives the data from Beats, the duplicate events are indexed as new documents.
Rather than allowing Elasticsearch to set the document ID, set the ID in Beats. The ID is stored in the Beats @metadata._id field and used to set the document ID during indexing. That way, if Beats sends the same event to Elasticsearch more than once, Elasticsearch overwrites the existing document rather than creating a new one.
The @metadata._id field is passed along with the event so that you can use it to set the document ID after the event has been published by Filebeat but before it’s received by Elasticsearch. For example, see Logstash pipeline example.
There are several ways to set the document ID in Beats:
add_idprocessorUse the
add_idprocessor when your data has no natural key field, and you can’t derive a unique key from existing fields.This example generates a unique ID for each event and adds it to the
@metadata._idfield:processors: - add_id: ~fingerprintprocessorUse the
fingerprintprocessor to derive a unique key from one or more existing fields.This example uses the values of
field1andfield2to derive a unique key that it adds to the@metadata._idfield:processors: - fingerprint: fields: ["field1", "field2"] target_field: "@metadata._id"decode_json_fieldsprocessorUse the
document_idsetting in thedecode_json_fieldsprocessor when you’re decoding a JSON string that contains a natural key field.For this example, assume that the
messagefield contains the JSON string{"myid": "100", "text": "Some text"}. This example takes the value ofmyidfrom the JSON string and stores it in the@metadata._idfield:processors: - decode_json_fields: document_id: "myid" fields: ["message"] max_depth: 1 target: ""The resulting document ID is
100.JSON input settings
Use the
json.document_idinput setting if you’re ingesting JSON-formatted data, and the data has a natural key field.This example takes the value of
key1from the JSON document and stores it in the@metadata._idfield:filebeat.inputs: - type: log paths: - /path/to/json.log json.document_id: "key1"
For this example, assume that you’ve used one of the approaches described earlier to store the document ID in the Beats @metadata._id field. To preserve the ID when you send Beats data through Logstash en route to Elasticsearch, set the document_id field in the Logstash pipeline:
input {
beats {
port => 5044
}
}
output {
if [@metadata][_id] {
elasticsearch {
hosts => ["http://localhost:9200"]
document_id => "%{[@metadata][_id]}"
index => "%{[@metadata][beat]}-%{[@metadata][version]}"
}
} else {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}"
}
}
}
- Sets the
document_idfield in the Elasticsearch output to the value stored in@metadata._id.
When Elasticsearch indexes the document, it sets the document ID to the specified value, preserving the ID passed from Beats.