Skip to content

Commit 5892bda

Browse files
authored
Fix conditional writes (#24203)
## Description I was seeing these errors on watermark updates - `Generic GCS error: Version required for conditional update` I guess you need to set the version and the etag for GCS conditional writes.
1 parent 30006ca commit 5892bda

File tree

2 files changed

+15
-21
lines changed
  • crates
    • sui-checkpoint-blob-indexer/src/handlers
    • sui-indexer-alt-object-store/src

2 files changed

+15
-21
lines changed

crates/sui-checkpoint-blob-indexer/src/handlers/epochs.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,16 @@ impl Handler for EpochsPipeline {
6767
let path = ObjectPath::from("epochs.json");
6868
let store = conn.object_store();
6969

70-
let (mut epochs, etag) = match store.get(&path).await {
70+
let (mut epochs, e_tag, version, file_exists) = match store.get(&path).await {
7171
Ok(result) => {
72-
let etag = result.meta.e_tag.clone();
72+
let e_tag = result.meta.e_tag.clone();
73+
let version = result.meta.version.clone();
7374
let bytes = result.bytes().await?;
7475
let epochs: Vec<u64> =
7576
serde_json::from_slice(&bytes).context("Failed to parse epochs.json")?;
76-
(epochs, etag)
77+
(epochs, e_tag, version, true)
7778
}
78-
Err(ObjectStoreError::NotFound { .. }) => (Vec::new(), None),
79+
Err(ObjectStoreError::NotFound { .. }) => (Vec::new(), None, None, false),
7980
Err(e) => return Err(e.into()),
8081
};
8182

@@ -87,16 +88,12 @@ impl Handler for EpochsPipeline {
8788
let json_bytes = serde_json::to_vec(&epochs)?;
8889
let payload: PutPayload = Bytes::from(json_bytes).into();
8990

90-
if let Some(etag) = etag {
91+
if file_exists {
9192
store
9293
.put_opts(
9394
&path,
9495
payload,
95-
PutMode::Update(object_store::UpdateVersion {
96-
e_tag: Some(etag),
97-
version: None,
98-
})
99-
.into(),
96+
PutMode::Update(object_store::UpdateVersion { e_tag, version }).into(),
10097
)
10198
.await?;
10299
} else {

crates/sui-indexer-alt-object-store/src/lib.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -146,19 +146,20 @@ impl Connection for ObjectStoreConnection {
146146
ObjectPath::from(format!("_metadata/watermarks/{}.json", pipeline))
147147
};
148148

149-
let (current_watermark, etag) = match self.object_store.get(&object_path).await {
149+
let (current_watermark, e_tag, version) = match self.object_store.get(&object_path).await {
150150
Ok(result) => {
151-
let etag = result.meta.e_tag.clone();
151+
let e_tag = result.meta.e_tag.clone();
152+
let version = result.meta.version.clone();
152153
let bytes = result.bytes().await?;
153154
let watermark: ComitterWatermark = serde_json::from_slice(&bytes)
154155
.context("Failed to parse watermark from object store")?;
155-
(Some(watermark), etag)
156+
(Some(watermark), e_tag, version)
156157
}
157-
Err(ObjectStoreError::NotFound { .. }) => (None, None),
158+
Err(ObjectStoreError::NotFound { .. }) => (None, None, None),
158159
Err(e) => return Err(e.into()),
159160
};
160161

161-
if let Some(current) = current_watermark
162+
if let Some(ref current) = current_watermark
162163
&& current.checkpoint_hi_inclusive >= new_watermark.checkpoint_hi_inclusive
163164
{
164165
return Ok(false);
@@ -167,16 +168,12 @@ impl Connection for ObjectStoreConnection {
167168
let json_bytes = serde_json::to_vec(&new_watermark)?;
168169
let payload: PutPayload = Bytes::from(json_bytes).into();
169170

170-
if let Some(etag) = etag {
171+
if current_watermark.is_some() {
171172
self.object_store
172173
.put_opts(
173174
&object_path,
174175
payload,
175-
PutMode::Update(object_store::UpdateVersion {
176-
e_tag: Some(etag),
177-
version: None,
178-
})
179-
.into(),
176+
PutMode::Update(object_store::UpdateVersion { e_tag, version }).into(),
180177
)
181178
.await?;
182179
} else {

0 commit comments

Comments
 (0)