The Distributed Area contains indexing and coordination systems.
The index path stretches from the user REST command through shard routing down to each individual shard's translog and storage
engine. Reindexing is effectively reading from a source index and writing to a destination index (perhaps on different nodes).
The coordination side includes cluster coordination, shard allocation, cluster autoscaling stats, task management, and cross
cluster replication. Less obvious coordination systems include networking, the discovery plugin system, the snapshot/restore
logic, and shard recovery.
A guide to the general Elasticsearch components can be found [here](https://github.com/elastic/elasticsearch/blob/main/docs/internal/GeneralArchitectureGuide.md).
See the [Javadocs for `ActionListener`](https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/action/ActionListener.java)
(A node can coordinate a search across several other nodes, when the node itself does not have the data, and then return a result to the caller. Explain this coordinating role)
The [Metadata] of a [ClusterState] is persisted on disk and comprises information from two categories:
1. Cluster scope information such as `clusterUUID`, `CoordinationMetadata`
2. Project scope information ([ProjectMetadata]) such as the indices and templates belonging to each project.
Some concepts are applicable to both cluster and project scopes, e.g. [persistent tasks](#persistent-tasks). The state of a persistent task is therefore stored accordingly depending on the task's scope.
It is important to understand first the [Basic write model] of documents:
documents are written to Lucene in-memory buffers, then "refreshed" to searchable segments which may not be persisted on disk, and finally "flushed" to a durable Lucene commit on disk.
If this was the only way we stored the data, we would have to delay the response to every write request until after the data had been flushed to disk, which could take many seconds or longer. If we didn't, it would mean that we would lose newly ingested data if there was an outage between sending the response and flushing the data to disk.
For this reason, newly ingested data is also written to a shard's [`Translog`], whose main purpose is to persist uncommitted operations (e.g., document insertions or deletions), so they can be replayed by just reading them sequentially from the translog during [recovery](#recovery) in the event of ephemeral failures such as a crash or power loss.
The translog can persist operations quicker than a Lucene commit, because it just stores raw operations / documents without the analysis and indexing that Lucene does.
The translog is always persisted and fsync'ed on disk before acknowledging writes back to the user.
This can be seen in [`InternalEngine`] which calls the `add()` method of the translog to append operations, e.g., its `index()` method at some point adds a document insertion operation to the translog.
The translog ultimately truncates operations once they have been flushed to disk by a Lucene commit; indeed, in some sense the point of a "flush" is to clear out the translog.
* During recovery, an index shard can be recovered up to at least the last acknowledged operation by replaying the translog onto the last flushed commit of the shard.
* Facilitate real-time (m)GETs of documents without refreshing.
Translog files are automatically truncated when they are no longer needed, specifically after all their operations have been persisted by Lucene commits on disk.
Lucene commits are initiated by flushes (e.g., with the index [Flush API]).
Flushes may also be automatically initiated by Elasticsearch, e.g., if the translog exceeds a configurable size [`INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING`] or age [`INDEX_TRANSLOG_FLUSH_THRESHOLD_AGE_SETTING`], which ultimately truncates the translog as well.
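The relationship between acknowledged writes, flushes, and translog truncation can be illustrated with a toy write-ahead log. This is a minimal in-memory sketch, not the actual `Translog` or `InternalEngine` code: `ToyShard` and all of its members are hypothetical names, and durable storage is simulated with plain lists.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a shard; NOT the real Elasticsearch classes.
final class ToyShard {
    // Operations appended since the last commit (treated as durable in this model).
    private final List<String> translog = new ArrayList<>();
    // Documents contained in the last durable "Lucene commit".
    private final List<String> committed = new ArrayList<>();
    // Documents buffered in memory but not yet committed; lost on a crash.
    private final List<String> inMemory = new ArrayList<>();

    // Index a document: buffer it and append it to the translog before acknowledging.
    void index(String doc) {
        inMemory.add(doc);
        translog.add(doc);
    }

    // Flush: persist the buffered documents in a commit, then truncate the translog.
    void flush() {
        committed.addAll(inMemory);
        inMemory.clear();
        translog.clear();
    }

    // Simulate a crash and recovery: the in-memory buffer is lost, and the
    // translog is replayed on top of the last commit.
    List<String> crashAndRecover() {
        inMemory.clear();
        List<String> recovered = new ArrayList<>(committed);
        recovered.addAll(translog);
        return recovered;
    }

    int translogSize() {
        return translog.size();
    }
}
```

In this model, a document indexed after the last flush survives a crash only because it was appended to the translog, and a flush leaves the translog empty.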
#### Acknowledging writes
[`index()` or `delete()`]:https://github.com/elastic/elasticsearch/blob/591fa87e43a509d3eadfdbbb296cdf08453ea91a/server/src/main/java/org/elasticsearch/index/engine/Engine.java#L546-L564
A bulk request will ultimately make repeated calls to Engine methods such as [`index()` or `delete()`], which add operations to the Translog.
Finally, the AfterWrite action of the [`TransportWriteAction`] will call [`indexShard.syncAfterWrite()`] which will put the last written translog [`Location`] of the bulk request into an [`AsyncIOProcessor`] that is responsible for gradually fsync'ing the Translog and notifying any waiters.
Ultimately the bulk request is notified that the translog has fsync'ed past the requested location, and can continue to acknowledge the bulk request.
This process involves multiple writes to the translog before the next fsync(), so that the cost of the translog's fsync() operations is amortized across all writes.
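The amortization described above can be sketched as a simple batching scheme: writers register the location they need synced, and a single fsync completes every waiter at or before the synced position. This is a single-threaded illustration under simplifying assumptions; `ToyGroupSync` and its methods are hypothetical, and the real `AsyncIOProcessor` is concurrent and more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of batched fsync; NOT the real AsyncIOProcessor.
final class ToyGroupSync {
    private static final class Waiter {
        final long location;
        final Runnable onSynced;

        Waiter(long location, Runnable onSynced) {
            this.location = location;
            this.onSynced = onSynced;
        }
    }

    private long writtenUpTo = 0;  // end offset of everything written so far
    private long syncedUpTo = 0;   // end offset of everything fsync'ed so far
    private int fsyncCount = 0;    // number of (simulated) fsync calls
    private List<Waiter> waiters = new ArrayList<>();

    // Append an operation and return the location that must be synced to cover it.
    long write(int sizeInBytes) {
        writtenUpTo += sizeInBytes;
        return writtenUpTo;
    }

    // Register interest in a location; the callback runs once it is durable.
    void syncAfterWrite(long location, Runnable onSynced) {
        if (location <= syncedUpTo) {
            onSynced.run();
        } else {
            waiters.add(new Waiter(location, onSynced));
        }
    }

    // One fsync covers everything written so far, so every pending waiter is notified.
    void fsync() {
        fsyncCount++;
        syncedUpTo = writtenUpTo;
        List<Waiter> ready = waiters;
        waiters = new ArrayList<>();
        for (Waiter waiter : ready) {
            waiter.onSynced.run();
        }
    }

    int fsyncCount() {
        return fsyncCount;
    }
}
```

With three writes followed by one `fsync()`, all three callbacks fire while the simulated fsync count stays at one.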
Each translog is a sequence of files, each identified by a translog generation ID, each containing a sequence of operations, with the last file open for writes.
The last file has a part which has been fsync'ed to disk, and a part which has been written but not necessarily fsync'ed yet to disk.
Each operation is identified by a sequence number (`seqno`), which is monotonically increased by the engine's ingestion functionality.
Typically the entries in the translog are in increasing order of their sequence number, but not necessarily.
A [`Checkpoint`] file is also maintained, which is written on each fsync operation of the translog, and is necessary because it records important metadata and statistics about the translog, such as the current translog generation ID, its last fsync'ed operation and location (i.e., we should read only up to this location during recovery), the minimum translog generation ID, and the minimum and maximum sequence number of operations the sequence of translog generations include, all of which are used to identify the translog operations needed to be replayed upon recovery.
When the translog rolls over, e.g., upon the translog file exceeding a configurable size, a new file in the sequence is created for writes, and the last one becomes read-only.
A new commit flushed to the disk will also induce a translog rollover, since the operations in the translog so far will become eligible for truncation.
A few more words on terminology and classes used around the translog Java package.
A [`Location`] of an operation is defined by the translog generation file it is contained in, the offset of the operation in that file, and the number of bytes that encode that operation.
An [`Operation`] can be a document indexed, a document deletion, or a no-op operation.
A [`Snapshot`] iterator can be created to iterate over a range of requested operation sequence numbers read from the translog files.
The [`sync()`] method is the one that fsync's the current translog generation file to disk, and updates the checkpoint file with the last fsync'ed operation and location.
The [`rollGeneration()`] method is the one that rolls the translog, creating a new translog generation, e.g., called during an index flush.
The [`createEmptyTranslog()`] method creates a new translog, e.g., for a new empty index shard.
Each translog file starts with a [`TranslogHeader`] that is followed by translog operations.
Some internal classes used for reading and writing from the translog are the following.
A [`TranslogReader`] can be used to read operation bytes from a translog file.
A [`TranslogSnapshot`] can be used to iterate operations from a translog reader.
A [`MultiSnapshot`] can be used to iterate operations over multiple [`TranslogSnapshot`]s.
A [`TranslogWriter`] can be used to write operations to the translog.
The [Get API] (and by extension, the multi-get API) supports a real-time mode, which can query documents by ID, even recently ingested documents that have not yet been refreshed and are therefore not yet searchable.
This capability is facilitated by another data structure, the [`LiveVersionMap`], which maps recently ingested documents by their ID to the translog location that encodes their indexing operation.
That way, we can return the document by reading the translog operation.
The tracking in the version map is not enabled by default.
The first real-time GET induces a refresh of the index shard, and a search to get the document, but also enables the tracking in the version map for newly ingested documents.
Thus, subsequent real-time GETs are serviced by first consulting the version map to query the translog and, if the document is not found there, by searching the refreshed data, without needing to refresh the index shard.
On a refresh, the code safely swaps the old map with a new empty map.
That is because after a refresh, any documents in the old map are now searchable in Lucene, and thus we do not need them in the version map anymore.
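The lookup order for real-time GETs can be sketched with a toy model. This is only an illustration of the behaviour described above, not the actual `LiveVersionMap`: `ToyRealTimeGet` and its members are hypothetical, a translog location is reduced to a list index, and translog truncation is ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical toy model of real-time GET; NOT the real LiveVersionMap.
final class ToyRealTimeGet {
    private final List<String> translog = new ArrayList<>();          // location -> source
    private final Map<String, String> pending = new HashMap<>();      // indexed, not yet refreshed
    private final Map<String, String> searchable = new HashMap<>();   // refreshed documents
    private Map<String, Integer> versionMap = new HashMap<>();        // id -> translog location
    private boolean tracking = false;                                 // disabled until first real-time GET
    private int refreshCount = 0;

    void index(String id, String source) {
        translog.add(source);
        pending.put(id, source);
        if (tracking) {
            versionMap.put(id, translog.size() - 1);
        }
    }

    // Make pending documents searchable and swap in a new empty version map.
    void refresh() {
        searchable.putAll(pending);
        pending.clear();
        versionMap = new HashMap<>();
        refreshCount++;
    }

    Optional<String> realTimeGet(String id) {
        if (tracking == false) {
            // The first real-time GET forces a refresh and enables tracking.
            tracking = true;
            refresh();
            return Optional.ofNullable(searchable.get(id));
        }
        Integer location = versionMap.get(id);
        if (location != null) {
            return Optional.of(translog.get(location)); // read the operation from the translog
        }
        return Optional.ofNullable(searchable.get(id)); // fall back to refreshed data
    }

    int refreshCount() {
        return refreshCount;
    }
}
```

In this model only the first `realTimeGet` triggers a refresh; later calls for unrefreshed documents are answered from the translog via the version map.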
[through the CapacityResponseCache]: https://github.com/elastic/elasticsearch/blob/v8.13.2/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportGetAutoscalingCapacityAction.java#L97
### Where the data comes from
The Deciders each pull data from different sources as needed to inform their decisions. The
[DiskThresholdMonitor][] is one such data source. The Monitor runs on the master node and maintains
lists of nodes that exceed various disk size thresholds. [DiskThresholdSettings][] contains the
threshold settings with which the `DiskThresholdMonitor` runs.
The `ReactiveStorageDeciderService` tracks information that demonstrates storage limitations are causing
problems in the cluster. It uses [an algorithm defined here][]. Some examples are
- information from the `DiskThresholdMonitor` to find out whether nodes are exceeding their storage capacity
- number of unassigned shards that failed allocation because of insufficient storage
- the max shard size and minimum node size, and whether these can be satisfied with the existing infrastructure
[an algorithm defined here]: https://github.com/elastic/elasticsearch/blob/v8.13.2/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java#L158-L176
The `ProactiveStorageDeciderService` maintains a forecast window that [defaults to 30 minutes][]. It looks at
changes that took place within the forecast window to [predict][] resources that will be needed shortly.
[defaults to 30 minutes]: https://github.com/elastic/elasticsearch/blob/v8.13.2/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderService.java#L32
The tasks infrastructure is used to track currently executing operations in the Elasticsearch cluster. The [Task management API] provides an interface for querying, cancelling, and monitoring the status of tasks.
Each individual task is local to a node, but can be related to other tasks, on the same node or other nodes, via a parent-child relationship.
> The Task management API is experimental/beta, its status and outstanding issues can be tracked [here](https://github.com/elastic/elasticsearch/issues/51628).
Tasks are tracked in-memory on each node in the node's [TaskManager]. New tasks are registered via one of the [TaskManager#register] methods.
Registration of a task creates a [Task] instance with a unique-for-the-node numeric identifier, populates it with some metadata and stores it in the [TaskManager].
The [register][TaskManager#register] methods will return the registered [Task] instance, which can be used to interact with the task. The [Task] class is often sub-classed to include task-specific data and operations. Specific [Task] subclasses are created by overriding the [createTask][TaskAwareRequest#createTask] method on the [TaskAwareRequest] passed to the [TaskManager#register] methods.
When a task is completed, it must be unregistered via [TaskManager#unregister].
#### A note about task IDs
The IDs given to a task are numeric, supplied by a counter that starts at zero and increments over the life of the node process. So while they are unique in the individual node process, they would collide with IDs allocated after the node restarts, or IDs allocated on other nodes.
To better identify a task in the cluster scope, a tuple of persistent node ID and task ID is used. This is represented in code using the [TaskId] class and serialized as the string `{node-ID}:{local-task-ID}` (e.g. `oTUltX4IQMOUUVeiohTt8A:124`). While [TaskId] is safe to use to uniquely identify tasks _currently_ running in a cluster, it should be used with caution as it can collide with tasks that have run in the cluster in the past (i.e. tasks that ran prior to a cluster node restart).
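The `{node-ID}:{local-task-ID}` string form can be illustrated with a small value type. This is a sketch of the format only, not the real [TaskId] class; `ToyTaskId` and its `parse` method are hypothetical names.

```java
// Hypothetical stand-in for a cluster-scoped task identifier; NOT the real TaskId class.
record ToyTaskId(String nodeId, long id) {

    // Parse the "{node-ID}:{local-task-ID}" form, splitting on the last colon.
    static ToyTaskId parse(String value) {
        int separator = value.lastIndexOf(':');
        if (separator < 1 || separator == value.length() - 1) {
            throw new IllegalArgumentException("malformed task id: " + value);
        }
        String nodeId = value.substring(0, separator);
        long localId = Long.parseLong(value.substring(separator + 1));
        return new ToyTaskId(nodeId, localId);
    }

    // Serialize back to the "{node-ID}:{local-task-ID}" form.
    @Override
    public String toString() {
        return nodeId + ":" + id;
    }
}
```

For example, `ToyTaskId.parse("oTUltX4IQMOUUVeiohTt8A:124")` yields a node ID of `oTUltX4IQMOUUVeiohTt8A` and a local task ID of `124`.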
The purpose of tasks is to provide management and visibility of the cluster workload. There is some overhead involved in tracking a task, so they are best suited to tracking non-trivial and/or long-running operations. For smaller, more trivial operations, visibility is probably better implemented using telemetry APIs.
Some examples of operations that are tracked using tasks include:
All [ThreadPool] threads have an associated [ThreadContext]. The [ThreadContext] contains a map of headers which carry information relevant to the operation currently being executed. For example, a thread spawned to handle a REST request will include the HTTP headers received in that request.
When threads submit work to an [ExecutorService] from the [ThreadPool], those spawned threads will inherit the [ThreadContext] of the thread that submitted them. When [TransportRequest]s are dispatched, the headers from the sending [ThreadContext] are included and then loaded into the [ThreadContext] of the thread handling the request. In these ways, [ThreadContext] is preserved across threads involved in an operation, both locally and on remote nodes.
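The local half of this propagation can be sketched with a `ThreadLocal` and a wrapper that captures the submitter's headers and installs them on the worker thread. This is a simplified illustration, not the real [ThreadContext]: `ToyThreadContext` and its methods are hypothetical, and propagation across the transport layer is not modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical toy model of header propagation; NOT the real ThreadContext.
final class ToyThreadContext {
    private static final ThreadLocal<Map<String, String>> HEADERS = ThreadLocal.withInitial(HashMap::new);

    static void putHeader(String key, String value) {
        HEADERS.get().put(key, value);
    }

    static String getHeader(String key) {
        return HEADERS.get().get(key);
    }

    // Capture the submitting thread's headers now; install them while the task runs.
    static <T> Callable<T> preserveContext(Callable<T> task) {
        Map<String, String> captured = new HashMap<>(HEADERS.get());
        return () -> {
            Map<String, String> previous = HEADERS.get();
            HEADERS.set(captured);
            try {
                return task.call();
            } finally {
                HEADERS.set(previous); // restore the worker's own context
            }
        };
    }

    // Demo helper: read a header on a worker thread with the caller's context propagated.
    static String getHeaderOnWorker(String key) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> wrapped = preserveContext(() -> getHeader(key));
            return executor.submit(wrapped).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

A header set on the calling thread with `putHeader` is visible to the worker only because the task was wrapped before submission; an unwrapped task would see an empty context.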
[X-Opaque-Id API DOC]:https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html#_identifying_running_tasks
When a task is registered by a thread, a subset (defined by [Task#HEADERS_TO_COPY] and any [ActionPlugin][ActionPlugin#getTaskHeaders]s loaded on the node) of the headers from the [ThreadContext] are copied into the [Task]'s set of headers.
One such header is `X-Opaque-Id`. This is a string that [can be submitted on REST requests][X-Opaque-Id API DOC], and it will be associated with all tasks created on all nodes in the course of handling that request.
Another way to track the operations of a task is by following the parent/child relationships. When registering a task it can be optionally associated with a parent task. Generally if an executing task initiates sub-tasks, the ID of the executing task will be set as the parent of any spawned tasks (see [ParentTaskAssigningClient], [TransportService#sendChildRequest] and [TaskAwareRequest#setParentTask] for how this is implemented for [TransportAction]s).
Some long-running tasks are implemented to be cancellable. Cancellation of a task and its descendants can be done via the [Cancel Task REST API] or programmatically using [TaskManager#cancelTaskAndDescendants]. Perhaps the most common use of cancellation you will see is cancellation of [TransportAction]s dispatched from the REST layer when the client disconnects. To facilitate this we use the [RestCancellableNodeClient].
In order to support cancellation, the [Task] instance associated with the task must extend [CancellableTask]. It is the job of any workload tracked by a [CancellableTask] to periodically check whether it has been cancelled and, if so, finish early. We generally wait for the result of a cancelled task, so tasks can decide how they complete upon being cancelled; typically they complete exceptionally with [TaskCancelledException].
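The cooperative nature of cancellation can be sketched as a workload that checks a flag before each unit of work. This is an illustration only: `ToyCancellableTask` and `ToyWorkload` are hypothetical names, and the JDK's `CancellationException` stands in for [TaskCancelledException].

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

// Hypothetical toy task; NOT the real CancellableTask.
final class ToyCancellableTask {
    private final AtomicBoolean cancelled = new AtomicBoolean();

    void cancel() {
        cancelled.set(true);
    }

    boolean isCancelled() {
        return cancelled.get();
    }

    // Complete exceptionally if the task has been cancelled.
    void ensureNotCancelled() {
        if (isCancelled()) {
            throw new CancellationException("task cancelled");
        }
    }
}

final class ToyWorkload {
    // Run up to totalSteps units of work, checking for cancellation before each one.
    static void run(ToyCancellableTask task, int totalSteps, IntConsumer step) {
        for (int i = 0; i < totalSteps; i++) {
            task.ensureNotCancelled();
            step.accept(i);
        }
    }
}
```

If the task is cancelled while step 2 is running, step 2 still finishes; the workload stops with an exception at the next check, before step 3 starts.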
When a [Task] extends [CancellableTask] the [TaskManager] keeps track of it and any child tasks that it spawns. When the task is cancelled, requests are sent to any nodes that have had child tasks submitted to them to ban the starting of any further children of that task, and any cancellable child tasks already running are themselves cancelled (see [BanParentRequestHandler]).
When a cancellable task dispatches child requests through the [TransportService], it registers a proxy response handler that will instruct the remote node to cancel that child and any lingering descendants in the event that it completes exceptionally (see [UnregisterChildTransportResponseHandler]). A typical use-case for this is when no response is received within the time-out, the sending node will cancel the remote action and complete with a timeout exception.
A list of tasks currently running in a cluster can be requested via the [Task management API], or the [cat task management API]. The former returns each task represented using [TaskResult]; the latter returns a more compact [CAT] representation.
Some [ActionRequest]s allow the results of the actions they spawn to be stored upon completion for later retrieval. If [ActionRequest#getShouldStoreResult] returns true, a [TaskResultStoringActionListener] will be inserted into the chain of response listeners. [TaskResultStoringActionListener] serializes the [TaskResult] of the [TransportAction] and persists it in the `.tasks` index using the [TaskResultsService].
The [Task management API] also exposes an endpoint where a task ID can be specified. This form of the API will return currently running tasks, or completed tasks whose results were persisted. Note that although we use [TaskResult] to return task information from all the JSON APIs, the `error` or `response` fields will only ever be populated for stored tasks that are already completed.
Up until now we have discussed only ephemeral tasks. If we want a task to survive node failures, it needs to be registered as a persistent task at the cluster level.
Plugins can register persistent task definitions by implementing [PersistentTaskPlugin] and returning one or more [PersistentTasksExecutor] instances. These are collated into a [PersistentTasksExecutorRegistry] which is provided to [PersistentTasksNodeService] active on each node in the cluster, and a [PersistentTasksClusterService] active on the master. A [PersistentTasksExecutor] can declare either [project][PersistentTasksExecutor.Scope.Project] or [cluster][PersistentTasksExecutor.Scope.Cluster] scope, but not both. A project scope task is not able to access data on a different project.
The [PersistentTasksClusterService] runs on the master to manage the set of running persistent tasks. It periodically checks that all persistent tasks are assigned to live nodes and handles the creation, completion, removal and updates-to-the-state of persistent task instances in the cluster state (see [PersistentTasksCustomMetadata] and [ClusterPersistentTasksCustomMetadata]).
The [PersistentTasksNodeService] monitors the cluster state to:
- Start any tasks allocated to it (tracked in the local [TaskManager] by an [AllocatedPersistentTask])
- Cancel any running tasks that have been removed ([AllocatedPersistentTask] extends [CancellableTask])
If a node leaves the cluster while it has a persistent task allocated to it, the master will re-allocate that task to a surviving node. To do this, it creates a new [PersistentTasksCustomMetadata.PersistentTask] entry with a higher `#allocationId`. The allocation ID is included any time the [PersistentTasksNodeService] communicates with the [PersistentTasksClusterService] about the task; it allows the [PersistentTasksClusterService] to ignore persistent task messages originating from stale allocations.
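The way an allocation ID fences off stale messages can be sketched as follows. This is a toy model under simplifying assumptions, not the real [PersistentTasksClusterService]: `ToyPersistentTasksMaster` and its methods are hypothetical, and the cluster state is reduced to an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of the master's bookkeeping; NOT the real cluster service.
final class ToyPersistentTasksMaster {
    // Persistent task ID -> allocation ID of its current assignment.
    private final Map<String, Long> currentAllocation = new HashMap<>();
    private long lastAllocationId = 0;

    // Assign (or re-assign) a task; every assignment gets a higher allocation ID.
    long assign(String taskId) {
        lastAllocationId++;
        currentAllocation.put(taskId, lastAllocationId);
        return lastAllocationId;
    }

    // Accept a message from a node only if it refers to the current allocation.
    boolean acceptMessage(String taskId, long allocationId) {
        Long current = currentAllocation.get(taskId);
        return current != null && current == allocationId;
    }
}
```

After a task is re-assigned, a message carrying the earlier allocation ID is rejected, while one carrying the new allocation ID is accepted.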
Some examples of the use of persistent tasks include:
- [ShardFollowTasksExecutor]: Defined by [cross-cluster replication](#cross-cluster-replication-ccr) to poll a remote cluster for updates
- [HealthNodeTaskExecutor]: Used to schedule work related to monitoring cluster health. This is currently the only example of a cluster scope persistent task.
Tasks are integrated with the Elasticsearch APM infrastructure. They implement the [Traceable] interface, and [spans][APM Spans] are published to represent the execution of each task.