[[ingest]]
= Ingest pipelines
Ingest pipelines let you perform common transformations on your data before
indexing. For example, you can use pipelines to remove fields, extract values
from text, and enrich your data.
A pipeline consists of a series of configurable tasks called
<<processors,processors>>. Each processor runs sequentially, making specific
changes to incoming documents. After the processors have run, {es} adds the
transformed documents to your data stream or index.
image::images/ingest/ingest-process.svg[Ingest pipeline diagram,align="center"]
You can create and manage ingest pipelines using {kib}'s **Ingest Node
Pipelines** feature or the <<ingest-apis,ingest APIs>>. {es} stores pipelines in
the <<cluster-state,cluster state>>.
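For example, you can retrieve the definitions of one or more stored pipelines
with the <<get-pipeline-api,get pipeline API>>:
[source,console]
----
GET _ingest/pipeline
----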
[discrete]
[[ingest-prerequisites]]
=== Prerequisites
* Nodes with the <<node-ingest-node,`ingest`>> node role handle pipeline
processing. To use ingest pipelines, your cluster must have at least one node
with the `ingest` role. For heavy ingest loads, we recommend creating
<<node-ingest-node,dedicated ingest nodes>>.
* If the {es} security features are enabled, you must have the `manage_pipeline`
<<privileges-list-cluster,cluster privilege>> to manage ingest pipelines. To use
{kib}'s **Ingest Node Pipelines** feature, you also need the
`cluster:monitor/nodes/info` cluster privilege.
* Pipelines including the `enrich` processor require additional setup. See
<<ingest-enriching-data>>.
[discrete]
[[create-manage-ingest-pipelines]]
=== Create and manage pipelines
In {kib}, open the main menu and click **Stack Management** > **Ingest Node
Pipelines**. From the list view, you can:
* View a list of your pipelines and drill down into details
* Edit or clone existing pipelines
* Delete pipelines
To create a new pipeline, click **Create a pipeline**. For an example tutorial,
see <<common-log-format-example>>.
[role="screenshot"]
image::images/ingest/ingest-pipeline-list.png[Kibana's Ingest Node Pipelines list view,align="center"]
You can also use the <<ingest-apis,ingest APIs>> to create and manage pipelines.
The following <<put-pipeline-api,create pipeline API>> request creates
a pipeline containing two <<set-processor,`set`>> processors followed by a
<<lowercase-processor,`lowercase`>> processor. The processors run sequentially
in the order specified.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "description": "My optional pipeline description",
  "processors": [
    {
      "set": {
        "description": "My optional processor description",
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "description": "Set 'my-boolean-field' to true",
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}
----
// TESTSETUP
[discrete]
[[manage-pipeline-versions]]
=== Manage pipeline versions
When you create or update a pipeline, you can specify an optional `version`
integer. {es} doesn't use this `version` number internally, but you can use it
to track changes to a pipeline.
[source,console]
----
PUT _ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "processors": [ ... ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
To unset the `version` number using the API, replace or update the pipeline
without specifying the `version` parameter.
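For example, resubmitting the pipeline above without a `version` field clears
it:
[source,console]
----
PUT _ingest/pipeline/my-pipeline-id
{
  "processors": [ ... ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]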
[discrete]
[[test-pipeline]]
=== Test a pipeline
Before using a pipeline in production, we recommend you test it using sample
documents. When creating or editing a pipeline in {kib}, click **Add
documents**. In the **Documents** tab, provide sample documents and click **Run
the pipeline**.
[role="screenshot"]
image::images/ingest/test-a-pipeline.png[Test a pipeline in Kibana,align="center"]
You can also test pipelines using the <<simulate-pipeline-api,simulate pipeline
API>>. You can specify a configured pipeline in the request path. For example,
the following request tests `my-pipeline`.
[source,console]
----
POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}
----
Alternatively, you can specify a pipeline and its processors in the request
body.
[source,console]
----
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "lowercase": {
          "field": "my-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}
----
The API returns transformed documents:
[source,console-result]
----
{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "my-keyword-field": "foo"
        },
        "_ingest": {
          "timestamp": "2099-03-07T22:30:03.187Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "my-keyword-field": "bar"
        },
        "_ingest": {
          "timestamp": "2099-03-07T22:30:03.188Z"
        }
      }
    }
  ]
}
----
// TESTRESPONSE[s/"2099-02-30T22:30:03.187Z"/$body.docs.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2099-02-30T22:30:03.188Z"/$body.docs.1.doc._ingest.timestamp/]
[discrete]
[[add-pipeline-to-indexing-request]]
=== Add a pipeline to an indexing request
Use the `pipeline` query parameter to apply a pipeline to documents in
<<docs-index_,individual>> or <<docs-bulk,bulk>> indexing requests.
[source,console]
----
POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-08T11:04:05.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "my-keyword-field": "bar" }
----
You can also use the `pipeline` parameter with the <<docs-update-by-query,update
by query>> or <<docs-reindex,reindex>> APIs.
[source,console]
----
POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}
----
// TEST[continued]
[discrete]
[[set-default-pipeline]]
=== Set a default pipeline
Use the <<index-default-pipeline,`index.default_pipeline`>> index setting to set
a default pipeline. {es} applies this pipeline if no `pipeline` parameter
is specified.
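For example, the following update index settings request (a minimal sketch
assuming an existing `my-index` index) makes `my-pipeline` the default:
[source,console]
----
PUT my-index/_settings
{
  "index.default_pipeline": "my-pipeline"
}
----
// TEST[skip:requires an existing index]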
[discrete]
[[set-final-pipeline]]
=== Set a final pipeline
Use the <<index-final-pipeline,`index.final_pipeline`>> index setting to set a
final pipeline. {es} applies this pipeline after the request or default
pipeline, even if neither is specified.
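For example (again assuming an existing `my-index` index and a hypothetical
`my-final-pipeline`):
[source,console]
----
PUT my-index/_settings
{
  "index.final_pipeline": "my-final-pipeline"
}
----
// TEST[skip:requires an existing index and pipeline]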
[discrete]
[[access-source-fields]]
=== Access source fields in a processor
Processors have read and write access to an incoming document's source fields.
To access a field in a processor, use its field name. The following `set`
processor accesses `my-long-field`.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    }
  ]
}
----
You can also prepend the `_source` prefix.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_source.my-long-field",
        "value": 10
      }
    }
  ]
}
----
Use dot notation to access object fields.
IMPORTANT: If your document contains flattened objects, use the
<<dot-expand-processor,`dot_expander`>> processor to expand them first. Other
ingest processors cannot access flattened objects.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "dot_expander": {
        "description": "Expand 'my-object-field.my-property'",
        "field": "my-object-field.my-property"
      }
    },
    {
      "set": {
        "description": "Set 'my-object-field.my-property' to 10",
        "field": "my-object-field.my-property",
        "value": 10
      }
    }
  ]
}
----
[[template-snippets]]
To access field values, enclose the field name in double curly brackets `{{ }}`
to create a https://mustache.github.io[Mustache] template snippet. You can use
template snippets to dynamically set field names.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set dynamic '<service>' field to 'code' value",
        "field": "{{service}}",
        "value": "{{code}}"
      }
    }
  ]
}
----
[discrete]
[[access-metadata-fields]]
=== Access metadata fields in a processor
Processors can access the following metadata fields by name:
* `_index`
* `_id`
* `_routing`
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set '_routing' to 'geoip.country_iso_code' value",
        "field": "_routing",
        "value": "{{geoip.country_iso_code}}"
      }
    }
  ]
}
----
Use a Mustache template snippet to access metadata field values. For example,
`{{_routing}}` retrieves a document's routing value.
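The following sketch copies the routing value into a hypothetical
`event.routing` source field:
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Copy the document's routing value to 'event.routing'",
        "field": "event.routing",
        "value": "{{_routing}}"
      }
    }
  ]
}
----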
WARNING: If you <<create-document-ids-automatically,automatically generate>>
document IDs, you cannot use `{{_id}}` in a processor. {es} assigns
auto-generated `_id` values after ingest.
[discrete]
[[access-ingest-metadata]]
=== Access ingest metadata in a processor
Ingest processors can add and access ingest metadata using the `_ingest` key.
Unlike source and metadata fields, {es} does not index ingest metadata fields by
default. {es} also allows source fields whose names start with `_ingest`. If
your data includes such source fields, use `_source._ingest` to access them.
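For example, this sketch (assuming a hypothetical incoming source field named
`_ingest.status`) sets that source field rather than ingest metadata:
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set the '_ingest.status' source field, not ingest metadata",
        "field": "_source._ingest.status",
        "value": "processed"
      }
    }
  ]
}
----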
By default, pipelines create only the `_ingest.timestamp` ingest metadata field.
This field contains a timestamp of when {es} received the document's indexing
request. To index `_ingest.timestamp` or other ingest metadata fields, use the
`set` processor.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
----
[discrete]
[[handling-pipeline-failures]]
=== Handling pipeline failures
A pipeline's processors run sequentially. By default, pipeline processing stops
when one of these processors fails or encounters an error.
To ignore a processor failure and run the pipeline's remaining processors, set
`ignore_failure` to `true`.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}
----
Use the `on_failure` parameter to specify a list of processors to run
immediately after a processor failure. If `on_failure` is specified, {es}
afterward runs the pipeline's remaining processors, even if the `on_failure`
configuration is empty.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
----
Nest a list of `on_failure` processors for nested error handling.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
----
You can also specify `on_failure` for a pipeline. If a processor without an
`on_failure` value fails, {es} uses this pipeline-level parameter as a fallback.
{es} will not attempt to run the pipeline's remaining processors.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
[discrete]
[[conditionally-run-processor]]
=== Conditionally run a processor
Each processor supports an optional `if` condition, written as a
{painless}/painless-guide.html[Painless script]. If provided, the processor only
runs when the `if` condition is `true`.
IMPORTANT: `if` condition scripts run in Painless's
{painless}/painless-ingest-processor-context.html[ingest processor context]. In
`if` conditions, `ctx` values are read-only.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "ctx?.network?.name == 'Guest'"
      }
    }
  ]
}
----
If the static `script.painless.regex.enabled` cluster setting is enabled, you
can use regular expressions in your `if` condition scripts. For supported
syntax, see the {painless}/painless-regexes.html[Painless regexes]
documentation.
TIP: If possible, avoid using regular expressions. Expensive regular expressions
can slow indexing speeds.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
        "if": "ctx.url?.scheme =~ /^http[^s]/",
        "field": "url.insecure",
        "value": true
      }
    }
  ]
}
----
You must specify `if` conditions as valid JSON on a single line. However, you
can use the {kibana-ref}/console-kibana.html#configuring-console[{kib}
console]'s triple quote syntax to write and debug larger scripts.
TIP: If possible, avoid using complex or expensive `if` condition scripts.
Expensive condition scripts can slow indexing speeds.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                if (tag.toLowerCase().contains('prod')) {
                  return false;
                }
              }
            }
            return true;
        """
      }
    }
  ]
}
----
You can also specify a <<modules-scripting-stored-scripts,stored script>> as the
`if` condition.
[source,console]
----
PUT _scripts/my-stored-script
{
  "script": {
    "lang": "painless",
    "source": """
        Collection tags = ctx.tags;
        if(tags != null){
          for (String tag : tags) {
            if (tag.toLowerCase().contains('prod')) {
              return false;
            }
          }
        }
        return true;
    """
  }
}

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": { "id": "my-stored-script" }
      }
    }
  ]
}
----
Incoming documents often contain object fields. If a processor script attempts
to access a field whose parent object does not exist, {es} returns a
`NullPointerException`. To avoid these exceptions, use
{painless}/painless-operators-reference.html#null-safe-operator[null safe
operators], such as `?.`, and write your scripts to be null safe.
For example, `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe.
`ctx.network?.name` can return null. Rewrite the script as
`'Guest'.equalsIgnoreCase(ctx.network?.name)`, which is null safe because
`Guest` is always non-null.
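Applied to the earlier `drop` example, the null-safe version looks like this:
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents with 'network.name' of 'Guest'",
        "if": "'Guest'.equalsIgnoreCase(ctx.network?.name)"
      }
    }
  ]
}
----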
If you can't rewrite a script to be null safe, include an explicit null check.
[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that contain 'network.name' of 'Guest'",
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      }
    }
  ]
}
----
[discrete]
[[conditionally-apply-pipelines]]
=== Conditionally apply pipelines
Combine an `if` condition with the <<pipeline-processor,`pipeline`>> processor
to apply other pipelines to documents based on your criteria. You can use this
pipeline as the <<set-default-pipeline,default pipeline>> in an
<<index-templates,index template>> used to configure multiple data streams or
indices.
[source,console]
----
PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
----
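To complete that pattern, the following index template sketch (assuming a
hypothetical `my-data-stream*` index pattern) applies the routing pipeline by
default:
[source,console]
----
PUT _index_template/my-data-stream-template
{
  "index_patterns": [ "my-data-stream*" ],
  "data_stream": { },
  "template": {
    "settings": {
      "index.default_pipeline": "one-pipeline-to-rule-them-all"
    }
  }
}
----
// TEST[skip:the conditionally applied pipelines do not exist]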
[discrete]
[[get-pipeline-usage-stats]]
=== Get pipeline usage statistics
Use the <<cluster-nodes-stats,node stats>> API to get global and per-pipeline
ingest statistics. Use these stats to determine which pipelines run most
frequently or spend the most time processing.
[source,console]
----
GET _nodes/stats/ingest?filter_path=nodes.*.ingest
----
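To focus on a single pipeline, you can narrow the `filter_path` further. For
example, for the `my-pipeline` created above:
[source,console]
----
GET _nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines.my-pipeline
----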
include::ingest/common-log-format-example.asciidoc[]
include::ingest/enrich.asciidoc[]
include::ingest/processors.asciidoc[]