Tutorial: Threat hunting with ES|QL

Elastic Stack Serverless

This hands-on tutorial demonstrates advanced threat hunting techniques using the Elasticsearch Query Language (ES|QL).

Following a simulated Advanced Persistent Threat (APT) campaign, we analyze security events across authentication, process execution, and network telemetry to detect:

  • Initial compromise via malicious email attachments
  • Lateral movement through the network
  • Privilege escalation attempts
  • Data exfiltration activities

ES|QL enables powerful transformations, filtering, enrichment, and statistical analysis, making it ideal for complex security investigations. This tutorial provides practical examples of how to leverage ES|QL for threat hunting, from identifying suspicious user behavior to building attack timelines.

Requirements

You need a running Elasticsearch cluster together with Kibana to run this tutorial. Refer to Choose your deployment type for deployment options.

In this tutorial, ES|QL examples are displayed in the following format:

FROM windows-security-logs
| WHERE event.code == "4624"
| LIMIT 1000

You can run these queries using:

  • Interactive interfaces, such as Discover in Kibana
  • The REST API via the Dev Tools Console, which requires additional formatting
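
For example, the query shown above can be sent through the `_query` endpoint (shown in Console syntax; the optional `format=txt` parameter returns results as a plain-text table):

POST /_query?format=txt
{
  "query": "FROM windows-security-logs | WHERE event.code == \"4624\" | LIMIT 1000"
}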

To follow along with this tutorial, you need to add sample data to your cluster, using the Dev Tools Console.

Broadly, there are two types of data:

  1. Core indices: These are the main security indices that contain the logs and events you want to analyze. We need three core indices: windows-security-logs, process-logs, and network-logs.
  2. Lookup indices: These are auxiliary indices that provide additional context to your core data. We need three lookup indices: asset-inventory, user-context, and threat-intel.

First, create the core security indices for our threat hunting scenario:

PUT /windows-security-logs
{
  "mappings": {
    "properties": {
      "@timestamp": {"type": "date"},
      "event": {
        "properties": {
          "code": {"type": "keyword"},
          "action": {"type": "keyword"}
        }
      },
      "user": {
        "properties": {
          "name": {"type": "keyword"},
          "domain": {"type": "keyword"}
        }
      },
      "host": {
        "properties": {
          "name": {"type": "keyword"},
          "ip": {"type": "ip"}
        }
      },
      "source": {
        "properties": {
          "ip": {"type": "ip"}
        }
      },
      "logon": {
        "properties": {
          "type": {"type": "keyword"}
        }
      }
    }
  }
}
  1. Event codes like 4624 (successful logon) and 4625 (failed logon) are stored as keywords for exact matching.

Now let's add some sample data to the windows-security-logs index around authentication events, namely failed and successful logins.

POST /_bulk?refresh=wait_for
{"index":{"_index":"windows-security-logs"}}
{"@timestamp":"2025-05-20T08:15:00Z","event":{"code":"4625","action":"logon_failed"},"user":{"name":"jsmith","domain":"corp"},"host":{"name":"WS-001","ip":"10.1.1.50"},"source":{"ip":"10.1.1.100"}}
{"index":{"_index":"windows-security-logs"}}
{"@timestamp":"2025-05-20T08:17:00Z","event":{"code":"4624","action":"logon_success"},"user":{"name":"jsmith","domain":"corp"},"host":{"name":"WS-001","ip":"10.1.1.50"},"source":{"ip":"10.1.1.100"},"logon":{"type":"3"}}
{"index":{"_index":"windows-security-logs"}}
{"@timestamp":"2025-05-20T09:30:00Z","event":{"code":"4624","action":"logon_success"},"user":{"name":"jsmith","domain":"corp"},"host":{"name":"SRV-001","ip":"10.1.2.10"},"source":{"ip":"10.1.1.50"},"logon":{"type":"3"}}
{"index":{"_index":"windows-security-logs"}}
{"@timestamp":"2025-05-20T10:45:00Z","event":{"code":"4624","action":"logon_success"},"user":{"name":"jsmith","domain":"corp"},"host":{"name":"DB-001","ip":"10.1.3.5"},"source":{"ip":"10.1.2.10"},"logon":{"type":"3"}}
{"index":{"_index":"windows-security-logs"}}
{"@timestamp":"2025-05-20T02:30:00Z","event":{"code":"4624","action":"logon_success"},"user":{"name":"admin","domain":"corp"},"host":{"name":"DC-001","ip":"10.1.4.10"},"source":{"ip":"10.1.3.5"},"logon":{"type":"3"}}
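
As a quick sanity check (an optional query, not part of the hunt itself), you can count the documents you just indexed by event code:

FROM windows-security-logs
| STATS events = COUNT(*) BY event.code

This should return four 4624 (successful logon) events and one 4625 (failed logon) event.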

Next, create an index for process execution logs.

PUT /process-logs
{
  "mappings": {
    "properties": {
      "@timestamp": {"type": "date"},
      "process": {
        "properties": {
          "name": {"type": "keyword"},
          "command_line": {"type": "text"},
          "parent": {
            "properties": {
              "name": {"type": "keyword"}
            }
          }
        }
      },
      "user": {
        "properties": {
          "name": {"type": "keyword"}
        }
      },
      "host": {
        "properties": {
          "name": {"type": "keyword"}
        }
      }
    }
  }
}
  1. Command lines are stored as text fields to enable full-text search for suspicious parameters and encoded commands.

Add some sample data to the process-logs index.

POST /_bulk?refresh=wait_for
{"index":{"_index":"process-logs"}}
{"@timestamp":"2025-05-20T08:20:00Z","process":{"name":"powershell.exe","command_line":"powershell.exe -enc JABzAD0ATgBlAHcALgBPAGIAagBlAGMAdAAgAFMAeQBzAHQAZQBtAC4ATgBlAHQALgBXAGUAYgBDAGwAaQBlAG4AdAA=","parent":{"name":"winword.exe"}},"user":{"name":"jsmith"},"host":{"name":"WS-001"}}
{"index":{"_index":"process-logs"}}
{"@timestamp":"2025-05-20T09:35:00Z","process":{"name":"net.exe","command_line":"net user /domain","parent":{"name":"cmd.exe"}},"user":{"name":"jsmith"},"host":{"name":"SRV-001"}}
{"index":{"_index":"process-logs"}}
{"@timestamp":"2025-05-20T10:50:00Z","process":{"name":"sqlcmd.exe","command_line":"sqlcmd -S localhost -Q \"SELECT * FROM customers\"","parent":{"name":"powershell.exe"}},"user":{"name":"jsmith"},"host":{"name":"DB-001"}}
{"index":{"_index":"process-logs"}}
{"@timestamp":"2025-05-20T02:35:00Z","process":{"name":"ntdsutil.exe","command_line":"ntdsutil \"ac i ntds\" \"ifm\" \"create full c:\\temp\\ntds\"","parent":{"name":"cmd.exe"}},"user":{"name":"admin"},"host":{"name":"DC-001"}}
{"index":{"_index":"process-logs"}}
{"@timestamp":"2025-05-20T12:15:00Z","process":{"name":"schtasks.exe","command_line":"schtasks.exe /create /tn UpdateCheck /tr c:\\windows\\temp\\update.exe /sc daily","parent":{"name":"cmd.exe"}},"user":{"name":"jsmith"},"host":{"name":"WS-001"}}
{"index":{"_index":"process-logs"}}
{"@timestamp":"2025-05-20T12:30:00Z","process":{"name":"schtasks.exe","command_line":"schtasks.exe /create /tn SystemManager /tr powershell.exe -enc ZQBjAGgAbwAgACIASABlAGwAbABvACIA /sc minute /mo 5","parent":{"name":"powershell.exe"}},"user":{"name":"jsmith"},"host":{"name":"SRV-001"}}
{"index":{"_index":"process-logs"}}
{"@timestamp":"2025-05-20T13:15:00Z","process":{"name":"sc.exe","command_line":"sc.exe create RemoteService binPath= c:\\windows\\temp\\remote.exe","parent":{"name":"cmd.exe"}},"user":{"name":"jsmith"},"host":{"name":"DB-001"}}
{"index":{"_index":"process-logs"}}
{"@timestamp":"2025-05-20T13:20:00Z","process":{"name":"sc.exe","command_line":"sc.exe create BackdoorService binPath= c:\\programdata\\svc.exe","parent":{"name":"powershell.exe"}},"user":{"name":"jsmith"},"host":{"name":"SRV-001"}}
{"index":{"_index":"process-logs"}}
{"@timestamp":"2025-05-20T13:25:00Z","process":{"name":"sc.exe","command_line":"sc.exe create PersistenceService binPath= c:\\windows\\system32\\malicious.exe","parent":{"name":"cmd.exe"}},"user":{"name":"admin"},"host":{"name":"DC-001"}}

Next, create an index for network traffic logs.

PUT /network-logs
{
  "mappings": {
    "properties": {
      "@timestamp": {"type": "date"},
      "source": {
        "properties": {
          "ip": {"type": "ip"},
          "port": {"type": "integer"}
        }
      },
      "destination": {
        "properties": {
          "ip": {"type": "ip"},
          "port": {"type": "integer"}
        }
      },
      "network": {
        "properties": {
          "bytes": {"type": "long"},
          "protocol": {"type": "keyword"}
        }
      },
      "host": {
        "properties": {
          "name": {"type": "keyword"}
        }
      }
    }
  }
}

Add some sample data to the network-logs index.

POST /_bulk?refresh=wait_for
{"index":{"_index":"network-logs"}}
{"@timestamp":"2025-05-20T08:25:00Z","source":{"ip":"10.1.1.50","port":52341},"destination":{"ip":"185.220.101.45","port":443},"network":{"bytes":2048,"protocol":"tcp"},"host":{"name":"WS-001"}}
{"index":{"_index":"network-logs"}}
{"@timestamp":"2025-05-20T11:15:00Z","source":{"ip":"10.1.3.5","port":54892},"destination":{"ip":"185.220.101.45","port":443},"network":{"bytes":50000000,"protocol":"tcp"},"host":{"name":"DB-001"}}
{"index":{"_index":"network-logs"}}
{"@timestamp":"2025-05-20T02:40:00Z","source":{"ip":"10.1.4.10","port":61234},"destination":{"ip":"185.220.101.45","port":443},"network":{"bytes":500000000,"protocol":"tcp"},"host":{"name":"DC-001"}}

Next, create the lookup indices. The lookup index mode enables an index to be used in LOOKUP JOIN operations, enriching security events with asset, user, and threat context.

PUT /asset-inventory
{
  "mappings": {
    "properties": {
      "host.name": {"type": "keyword"},
      "asset.criticality": {"type": "keyword"},
      "asset.owner": {"type": "keyword"},
      "asset.department": {"type": "keyword"}
    }
  },
  "settings": {
    "index.mode": "lookup"
  }
}
PUT /user-context
{
  "mappings": {
    "properties": {
      "user.name": {"type": "keyword"},
      "user.role": {"type": "keyword"},
      "user.department": {"type": "keyword"},
      "user.privileged": {"type": "boolean"}
    }
  },
  "settings": {
    "index.mode": "lookup"
  }
}
PUT /threat-intel
{
  "mappings": {
    "properties": {
      "indicator.value": {"type": "keyword"},
      "indicator.type": {"type": "keyword"},
      "threat.name": {"type": "keyword"},
      "threat.severity": {"type": "keyword"}
    }
  },
  "settings": {
    "index.mode": "lookup"
  }
}

Now we can populate the lookup indices with contextual data. A single bulk request indexes documents into all three lookup indices: asset-inventory, user-context, and threat-intel.

POST /_bulk?refresh=wait_for
{"index":{"_index":"asset-inventory"}}
{"host.name":"WS-001","asset.criticality":"medium","asset.owner":"IT","asset.department":"finance"}
{"index":{"_index":"asset-inventory"}}
{"host.name":"SRV-001","asset.criticality":"high","asset.owner":"IT","asset.department":"operations"}
{"index":{"_index":"asset-inventory"}}
{"host.name":"DB-001","asset.criticality":"critical","asset.owner":"DBA","asset.department":"finance"}
{"index":{"_index":"asset-inventory"}}
{"host.name":"DC-001","asset.criticality":"critical","asset.owner":"IT","asset.department":"infrastructure"}
{"index":{"_index":"user-context"}}
{"user.name":"jsmith","user.role":"analyst","user.department":"finance","user.privileged":false}
{"index":{"_index":"user-context"}}
{"user.name":"admin","user.role":"administrator","user.department":"IT","user.privileged":true}
{"index":{"_index":"threat-intel"}}
{"indicator.value":"185.220.101.45","indicator.type":"ip","threat.name":"APT-29","threat.severity":"high"}
{"index":{"_index":"threat-intel"}}
{"indicator.value":"powershell.exe","indicator.type":"process","threat.name":"Living off the Land","threat.severity":"medium"}
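
With the lookup indices in place, you can spot-check the enrichment before starting the hunt (an optional verification query):

FROM windows-security-logs
| LOOKUP JOIN user-context ON user.name
| KEEP @timestamp, user.name, user.role, user.privileged
| LIMIT 5

Each event should now carry the matching user.role and user.privileged values from the user-context index.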

The first phase of our hunt focuses on identifying the initial compromise. We want to search for suspicious PowerShell execution from Office applications, which is a common initial attack vector.

FROM process-logs
| WHERE process.name == "powershell.exe" AND process.parent.name LIKE "*word*"
| LOOKUP JOIN asset-inventory ON host.name
| LOOKUP JOIN user-context ON user.name
| EVAL encoded_command = CASE(process.command_line LIKE "*-enc*", true, false)
| WHERE encoded_command == true
| STATS count = COUNT(*) BY host.name, user.name, asset.criticality
| LIMIT 1000
  1. Uses WHERE with == and LIKE operators to detect PowerShell processes
  2. Enriches using LOOKUP JOIN with asset inventory
  3. Enriches with user context using LOOKUP JOIN
  4. Uses EVAL and CASE to detect encoded commands
  5. Additional filtering with WHERE
  6. Aggregates results with STATS and COUNT grouped by multiple fields

Response

The response contains a summary of the suspicious PowerShell executions, including the host name, user name, and asset criticality.

count | host.name | user.name | asset.criticality
1 | WS-001 | jsmith | medium

In this step, we track user authentication across multiple systems. This is important for identifying lateral movement and potential privilege escalation.

This query demonstrates how DATE_TRUNC creates time windows for velocity analysis, combining COUNT_DISTINCT aggregations with DATE_DIFF calculations to measure both the scope and speed of user movement across network assets.

FROM windows-security-logs
| WHERE event.code == "4624" AND logon.type == "3"
| LOOKUP JOIN asset-inventory ON host.name
| EVAL time_bucket = DATE_TRUNC(30 minute, @timestamp)
| STATS unique_hosts = COUNT_DISTINCT(host.name),
        criticality_levels = COUNT_DISTINCT(asset.criticality),
        active_periods = COUNT_DISTINCT(time_bucket),
        first_login = MIN(@timestamp),
        last_login = MAX(@timestamp)
BY user.name
| WHERE unique_hosts > 2
| EVAL time_span_hours = DATE_DIFF("hour", first_login, last_login)
| EVAL movement_velocity = ROUND(unique_hosts / (time_span_hours + 1), 2)
| EVAL lateral_movement_score = unique_hosts * criticality_levels
| SORT lateral_movement_score DESC
| LIMIT 1000
  1. Uses WHERE for basic authentication filtering
  2. Creates time buckets with DATE_TRUNC for temporal analysis
  3. Uses STATS with COUNT_DISTINCT for comprehensive access metrics
  4. Uses DATE_DIFF for duration calculations
  5. Uses EVAL to compute movement velocity and a lateral movement risk score

Response

The response shows users who logged into multiple hosts, their criticality levels, and the velocity of their lateral movement.

unique_hosts | criticality_levels | active_periods | first_login | last_login | user.name | time_span_hours | movement_velocity | lateral_movement_score
3 | 3 | 3 | 2025-05-20T08:17:00.000Z | 2025-05-20T10:45:00.000Z | jsmith | 2 | 1 | 9

Advanced attackers often target sensitive data. We want to hunt for database access and large data transfers to external systems.

FROM network-logs
| WHERE NOT CIDR_MATCH(destination.ip, "10.0.0.0/8", "192.168.0.0/16")
| EVAL indicator.value = TO_STRING(destination.ip)
| LOOKUP JOIN threat-intel ON indicator.value
| LOOKUP JOIN asset-inventory ON host.name
| WHERE threat.name IS NOT NULL
| STATS total_bytes = SUM(network.bytes),
        connection_count = COUNT(*),
        time_span = DATE_DIFF("hour", MIN(@timestamp), MAX(@timestamp))
BY host.name, destination.ip, threat.name, asset.criticality
| EVAL mb_transferred = ROUND(total_bytes / 1048576, 2)
| EVAL risk_score = CASE(
    asset.criticality == "critical" AND mb_transferred > 100, 10,
    asset.criticality == "high" AND mb_transferred > 100, 7,
    mb_transferred > 50, 5,
    3
  )
| WHERE total_bytes > 1000000
| SORT risk_score DESC, total_bytes DESC
| LIMIT 1000
  1. Uses CIDR_MATCH to filter internal IP ranges for external data transfer detection
  2. Uses TO_STRING to standardize IP format for threat intel lookups
  3. Uses DATE_DIFF with SUM and COUNT to measure data transfer volume over time
  4. Uses ROUND for human-readable values
  5. Uses CASE for risk scoring based on asset criticality and size of data transferred

Response

The response shows external data transfers, their risk scores, and the amount of data transferred.

total_bytes | connection_count | time_span | host.name | destination.ip | threat.name | asset.criticality | mb_transferred | risk_score
500000000 | 1 | 0 | DC-001 | 185.220.101.45 | APT-29 | critical | 476 | 10
50000000 | 1 | 0 | DB-001 | 185.220.101.45 | APT-29 | critical | 47 | 3

To understand the attack progression, we need to build a timeline of events across multiple indices. This helps us correlate actions and identify the attacker's dwell time.

FROM windows-security-logs, process-logs, network-logs
| LOOKUP JOIN asset-inventory ON host.name
| LOOKUP JOIN user-context ON user.name
| WHERE user.name == "jsmith" OR user.name == "admin"
| EVAL event_type = CASE(
    event.code IS NOT NULL, "Authentication",
    process.name IS NOT NULL, "Process Execution",
    destination.ip IS NOT NULL, "Network Activity",
    "Unknown")
| EVAL dest_ip = TO_STRING(destination.ip)
| EVAL attack_stage = CASE(
    process.parent.name LIKE "*word*", "Initial Compromise",
    process.name IN ("net.exe", "nltest.exe"), "Reconnaissance",
    event.code == "4624" AND logon.type == "3", "Lateral Movement",
    process.name IN ("sqlcmd.exe", "ntdsutil.exe"), "Data Access",
    dest_ip NOT LIKE "10.*", "Exfiltration",
    "Other")
| SORT @timestamp ASC
| KEEP @timestamp, event_type, attack_stage, host.name, asset.criticality, user.name, process.name, destination.ip
| LIMIT 1000
  1. Uses FROM with multiple indices for comprehensive correlation
  2. Uses IS NOT NULL with CASE to classify event types from different data sources
  3. Uses complex CASE logic to map events to MITRE ATT&CK stages
  4. Uses SORT to build a chronological attack timeline

Response

The response provides a chronological timeline of events, showing the attacker's actions and the impact on the organization.

This query demonstrates how ES|QL's COUNT_DISTINCT function and conditional CASE statements can be used to baseline interpreter usage patterns across users and departments, using aggregation functions to identify anomalous script execution that might indicate compromised accounts or insider threats.

FROM process-logs
| WHERE process.name IN ("powershell.exe", "cmd.exe", "net.exe", "sqlcmd.exe", "schtasks.exe", "sc.exe")
| LOOKUP JOIN asset-inventory ON host.name
| LOOKUP JOIN user-context ON user.name
| STATS executions = COUNT(*),
        unique_hosts = COUNT_DISTINCT(host.name),
        unique_commands = COUNT_DISTINCT(process.name)
BY user.name, user.department
| WHERE executions > 1
| EVAL usage_pattern = CASE(
    executions > 5, "High Usage",
    executions > 3, "Moderate Usage",
    "Low Usage"
  )
| SORT executions DESC
| LIMIT 1000
  1. Uses WHERE...IN to monitor high-risk system tools
  2. Uses LOOKUP JOIN with asset-inventory and user-context indices to enrich events with context
  3. Uses COUNT_DISTINCT to measure breadth of suspicious tool usage
  4. Uses CASE to classify usage patterns for anomaly detection

Response

The response shows the number of executions, unique hosts, and usage patterns for each user and department.

executions | unique_hosts | unique_commands | user.name | user.department | usage_pattern
7 | 3 | 5 | jsmith | finance | High Usage

This query showcases how DATE_TRUNC enables temporal analysis of persistence mechanisms, using time bucketing and COUNT_DISTINCT to identify suspicious patterns like rapid-fire task creation or persistence establishment across multiple time windows.

FROM process-logs
| WHERE process.name == "schtasks.exe" AND process.command_line:"/create"
| LOOKUP JOIN asset-inventory ON host.name
| LOOKUP JOIN user-context ON user.name
| EVAL time_bucket = DATE_TRUNC(1 hour, @timestamp)
| STATS task_creations = COUNT(*),
        creation_hours = COUNT_DISTINCT(time_bucket)
BY user.name, host.name, asset.criticality
| WHERE task_creations > 0
| EVAL persistence_pattern = CASE(
    creation_hours > 1, "Multiple Hours",
    task_creations > 1, "Burst Creation",
    "Single Task"
  )
| SORT task_creations DESC
| LIMIT 1000
  1. Uses WHERE with : match operator to detect scheduled task creation (a common persistence mechanism)
  2. Uses DATE_TRUNC to group events into hourly time buckets for temporal analysis
  3. Uses COUNT_DISTINCT with time_bucket to measure task creation velocity
  4. Uses CASE to classify suspicious patterns based on timing and frequency

Response

The response shows the number of task creations, creation hours, and persistence patterns for each user and host.

task_creations | creation_hours | user.name | host.name | asset.criticality | persistence_pattern
1 | 1 | jsmith | WS-001 | medium | Single Task
1 | 1 | jsmith | SRV-001 | high | Single Task
Tip

To learn where you can use ES|QL in Elastic Security contexts, refer to the overview.