Description
Motivation
Currently, Druid can serve partial result sets without the user knowing. This can happen for many reasons:
- Data node crash/failure/unavailability
- Broker missing announcements from historicals/realtime nodes
- Other synchronization issues
The issue can be reproduced here: #18737
Proposed changes
The root cause is that Brokers automatically remove segments from their timeline when they receive drop notifications from data nodes. Once a segment is gone from the timeline, the Broker can no longer tell whether it is serving a complete timeline, so it cannot audit this or fail queries when parts of the timeline are missing.
Segment-level changes
To solve this, we introduce the concept of a "queryable" segment in the Broker timeline. A segment is marked as "queryable" when it is loaded by at least one data node. Once a segment is marked as loaded, it remains so until it is marked as unused, the metadata is reset, the cluster is redeployed with downtime, or a similar event occurs. The loaded property is maintained in memory by the coordinator. A diagram of a segment's lifetime is shown below:
<TODO>
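As a minimal sketch of the lifetime rule above, assuming a hypothetical per-segment record on the coordinator (`CoordinatorSegmentState` and its method names are illustrative, not Druid classes), the `loaded` flag is set by the first load on any data node and is cleared only by the listed events, never by an individual server dropping the segment:

```python
from dataclasses import dataclass


@dataclass
class CoordinatorSegmentState:
    """Hypothetical in-memory record the coordinator keeps for one segment."""

    used: bool = True
    loaded: bool = False

    def on_server_loaded(self) -> None:
        # The first load on any data node sets the flag; later loads change nothing.
        if self.used:
            self.loaded = True

    def on_server_dropped(self) -> None:
        # Individual drops (moves, crashes, replica changes) do NOT clear the flag.
        pass

    def on_marked_unused(self) -> None:
        # Marking the segment unused is one of the events that clears the flag.
        self.used = False
        self.loaded = False

    def on_in_memory_state_lost(self) -> None:
        # Stands in for a metadata reset or a redeploy with downtime.
        self.loaded = False


s = CoordinatorSegmentState()
s.on_server_loaded()
s.on_server_dropped()
print(s.loaded)  # True: still "loaded" even though the only server dropped it
s.on_marked_unused()
print(s.loaded)  # False
```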
Broker changes
To keep its knowledge of which segments are used/queryable up to date, the broker will perform an initial full sync with the coordinator, followed by periodic delta syncs.
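The sync could look roughly like the following sketch. The payload shape `(segment_id, used, loaded)` and the `CoordinatorView` class are assumptions for illustration; the proposal does not specify the wire format. A full sync replaces the broker's view, and each delta touches only the segments that changed:

```python
from typing import Dict, Iterable, Tuple

# Hypothetical payload entry: (segment_id, used, loaded).
SegmentStatus = Tuple[str, bool, bool]


class CoordinatorView:
    """Broker-side copy of the coordinator's used/loaded state (sketch)."""

    def __init__(self) -> None:
        self.status: Dict[str, bool] = {}  # used segment id -> loaded
        self.initialized = False

    def apply_full_sync(self, snapshot: Iterable[SegmentStatus]) -> None:
        # A full sync replaces whatever the broker knew before.
        self.status = {sid: loaded for sid, used, loaded in snapshot if used}
        self.initialized = True

    def apply_delta_sync(self, changes: Iterable[SegmentStatus]) -> None:
        # Deltas are only meaningful on top of a full snapshot.
        if not self.initialized:
            raise RuntimeError("delta sync received before the initial full sync")
        for sid, used, loaded in changes:
            if used:
                self.status[sid] = loaded
            else:
                self.status.pop(sid, None)  # unused segments are forgotten

    def is_used_and_loaded(self, sid: str) -> bool:
        return self.status.get(sid, False)


view = CoordinatorView()
view.apply_full_sync([("seg1", True, True), ("seg2", True, False), ("seg3", False, False)])
view.apply_delta_sync([("seg2", True, True), ("seg1", False, False)])
print(view.is_used_and_loaded("seg1"), view.is_used_and_loaded("seg2"))  # False True
```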
Queries that touch segments marked as queryable will fail if no announcing servers are found.
To mark a segment as queryable in the timeline, the broker must receive both a load callback from a data node and a coordinator sync confirming that the segment is loaded. The extra sync with the coordinator is needed because the broker needs direct confirmation both that the segment is loaded and that it is currently marked as used in the cluster.
The broker will place the segment in its timeline for any given callback/sync (historical/coordinator) but will ONLY mark the segment as queryable once:
- A data (historical/peon) node has given a loaded callback for the segment
- A sync from the coordinator shows the segment as `used` and `loaded` onto a node.
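A minimal sketch of the two-confirmation rule, using a hypothetical `TimelineEntry` (not an existing Druid class): the entry exists as soon as either signal arrives, becomes queryable only once both have arrived, and stays queryable when servers drop it, so the failure check can detect the gap:

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class TimelineEntry:
    """Hypothetical broker timeline entry for one segment."""

    servers: Set[str] = field(default_factory=set)
    load_callback_seen: bool = False
    coordinator_confirmed: bool = False  # a sync reported the segment as used and loaded
    queryable: bool = False

    def _maybe_mark_queryable(self) -> None:
        # Both confirmations are required. Once set, the flag is never cleared
        # here; the entry is deleted when the coordinator reports it unused.
        if self.load_callback_seen and self.coordinator_confirmed:
            self.queryable = True

    def on_load_callback(self, server: str) -> None:
        self.servers.add(server)
        self.load_callback_seen = True
        self._maybe_mark_queryable()

    def on_drop_callback(self, server: str) -> None:
        # Only the server is removed; queryable stays as it was.
        self.servers.discard(server)

    def on_coordinator_used_and_loaded(self) -> None:
        self.coordinator_confirmed = True
        self._maybe_mark_queryable()


entry = TimelineEntry()
entry.on_load_callback("historical-1")
print(entry.queryable)  # False: coordinator has not confirmed yet
entry.on_coordinator_used_and_loaded()
print(entry.queryable)  # True
entry.on_drop_callback("historical-1")
print(entry.queryable, entry.servers)  # True set()
```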
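The broker query evaluation loop looks like this:
Unavailable Query Failure Flow

```python
class UnavailableSegmentError(Exception):
    pass

def evaluate(query_segments, timeline, get_servers_for_segment):
    for segment in query_segments:
        entry = timeline.get(segment)
        if entry is not None and entry.queryable:  # only queryable segments are audited
            if len(get_servers_for_segment(segment)) == 0:
                raise UnavailableSegmentError(segment)
    return "OK"
```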
Edge Cases
- Coordinator sync slowness/failures/drops
  - The Broker may not pick up newly loaded segments, or segments newly marked used/unused. As a result, segments may take longer to become available in the timeline than they do today.
- Historical callback slowness/failures/drops
  - The Broker may not pick up newly loaded segments.
  - TODO
- Historical callback reordering on brokers
  - This is a well-known issue: HTTP-based load/drop callbacks for the same segment from different historicals can be delivered to brokers out of order. With unavailable-segment detection in place, these races will cause queries to fail. At scale, this happens very frequently.
- Setting replication factor to 0.
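To illustrate why callback reordering turns into query failures, the sketch below replays callbacks for a single queryable segment. The move scenario (historicalA to historicalB) and the `replay` helper are illustrative assumptions; the point is that an out-of-order drop briefly leaves the segment with no servers, which the unavailable-segment check would report as a failure:

```python
from typing import Iterable, List, Tuple


def replay(initial_servers: Iterable[str], callbacks: List[Tuple[str, str]]) -> List[bool]:
    """Apply load/drop callbacks for one queryable segment and record,
    after each callback, whether a query touching it would fail."""
    servers = set(initial_servers)
    failures = []
    for action, server in callbacks:
        if action == "load":
            servers.add(server)
        elif action == "drop":
            servers.discard(server)
        else:
            raise ValueError(f"unknown callback {action!r}")
        # The segment is assumed queryable, so an empty server set fails queries.
        failures.append(len(servers) == 0)
    return failures


# Segment S moves from historicalA to historicalB: B loads it, then A drops it.
in_order = [("load", "historicalB"), ("drop", "historicalA")]
reordered = [("drop", "historicalA"), ("load", "historicalB")]

print(replay({"historicalA"}, in_order))   # [False, False]
print(replay({"historicalA"}, reordered))  # [True, False]
```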
Desired Goal/Outcome
Segment Creation/Loading
- Newly created segment `S` loading for the first time on Historical/Realtime server A
  1.1. Broker waits until it syncs with the coordinator and confirms that the segment is used and has the `loaded` status (interchangeable with 1.3.).
  1.2. Broker adds segment `S` to its timeline, setting `queryable=false`.
  1.3. Broker waits until it receives a loaded callback from Historical/Realtime server A (interchangeable with 1.1.).
  1.4. Broker sets `queryable=true` for segment `S`.
- Previously created segment `S` loading on Historical/Realtime server A
  2.1. Broker waits until it syncs with the coordinator and confirms that the segment is used and has the `loaded` status (interchangeable with 2.3.).
  2.2. Broker adds segment `S` to its timeline, setting `queryable=false`.
  2.3. Broker waits until it receives a loaded callback from Historical/Realtime server A (interchangeable with 2.1.).
  2.4. Broker sets `queryable=true` for segment `S`.
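The loading steps above can be checked with a small simulation, assuming a hypothetical broker that keeps a per-segment dict of flags (the event names and `replay_loading` are illustrative). It replays both orders of the two interchangeable events and shows that they converge on the same state:

```python
from itertools import permutations

LOAD_CALLBACK = "load callback from server A"
COORDINATOR_SYNC = "coordinator sync: used and loaded"


def replay_loading(events):
    """Replay the loading events for segment S on a hypothetical broker
    and return (S in timeline, S queryable) after each event."""
    timeline = {}
    history = []
    for event in events:
        # Whichever event arrives first places S in the timeline, not queryable.
        entry = timeline.setdefault(
            "S", {"callback": False, "coordinator": False, "servers": set()}
        )
        if event == LOAD_CALLBACK:
            entry["callback"] = True
            entry["servers"].add("A")
        elif event == COORDINATOR_SYNC:
            entry["coordinator"] = True
        queryable = entry["callback"] and entry["coordinator"]
        history.append(("S" in timeline, queryable))
    return history


for order in permutations([LOAD_CALLBACK, COORDINATOR_SYNC]):
    print(order, replay_loading(order))
```

Either order yields "present but not queryable" after the first event and "queryable" after the second.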
Segment Deletion/Drop on Historical/Realtime Node
- Segment `S` marked unused
  1.1. Historical/Realtime callback THEN Coordinator sync
    1.1.1. Broker gets a dropSegment callback from the Historical/Realtime server. It removes the server from the timeline for that segment, but does not remove the segment itself. If no servers remain, queries touching `S` will start to fail until the coordinator sync occurs.
    1.1.2. Broker gets the Coordinator sync. Seeing that `S` is unused, it removes the segment from the timeline.
  1.2. Coordinator sync THEN Historical/Realtime callback
    1.2.1. Broker gets the Coordinator sync. Seeing that `S` is unused, it removes the segment from the timeline.
    1.2.2. Broker gets a dropSegment callback from the Historical/Realtime server. This is a noop.
- Segment `S` is moved from Historical A to Historical B
  2.1. TODO
- Server removed the segment because its replication factor was changed (segment still used) => keep it in the timeline (assert that n > 0 servers are serving it; otherwise it is a race that should be fixed).
  3.1. TODO
- Server removed the segment because the server died/stopped responding (segment still used):
  4.1. Historical/Realtime callback THEN Coordinator sync
    4.1.1. Broker gets a dropSegment callback from the Historical/Realtime server. It removes the server from the timeline for that segment, but does not remove the segment itself. If no servers remain, queries touching `S` will start to fail until the coordinator sync occurs.
    4.1.2. Broker gets the Coordinator sync. Seeing that `S` is still used, it does nothing (noop).
  4.2. Coordinator sync THEN Historical/Realtime callback
    4.2.1. Broker gets the Coordinator sync. Seeing that `S` is still used, it does nothing (noop).
    4.2.2. Broker gets a dropSegment callback from the Historical/Realtime server. It removes the server from the timeline for that segment, but does not remove the segment itself.
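The drop scenarios above (segment marked unused, and server died while the segment is still used) can be sketched with one hypothetical `replay` helper, where `still_used` stands in for what the coordinator sync reports. Each step records whether `S` is still in the timeline and whether a query touching it would fail; all names are illustrative, not Druid APIs:

```python
from itertools import permutations


def replay(events, still_used):
    """S starts queryable and served only by server A; A then drops S.
    `still_used` is what the coordinator sync reports for S.
    Returns (S in timeline, query touching S fails) after each event."""
    timeline = {"S": {"queryable": True, "servers": {"A"}}}
    history = []
    for event in events:
        entry = timeline.get("S")
        if event == "drop_callback" and entry is not None:
            # Remove the server, never the segment itself.
            entry["servers"].discard("A")
        elif event == "coordinator_sync" and not still_used:
            # Only the coordinator sync can take S out of the timeline.
            timeline.pop("S", None)
        # A drop callback for an already-removed segment, or a sync that
        # reports S as still used, is a noop.
        entry = timeline.get("S")
        fails = entry is not None and entry["queryable"] and not entry["servers"]
        history.append((entry is not None, fails))
    return history


for still_used in (False, True):
    for order in permutations(["drop_callback", "coordinator_sync"]):
        print(f"still_used={still_used}", order, replay(order, still_used))
```

In the unused case the failure window closes once the sync removes `S`; in the still-used case `S` stays in the timeline with no servers, so queries fail instead of silently returning partial results.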