Skip to content

Commit 0e8fb59

Browse files
authored
Merge pull request #762 from Netflix/restore-blob-verifier
add a function to allow adding blob verifier during producer restore
2 parents 037f4b9 + af98aa8 commit 0e8fb59

File tree

3 files changed

+82
-9
lines changed

3 files changed

+82
-9
lines changed

hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ abstract class AbstractHollowProducer {
8585
HollowProducerMetrics metrics;
8686
HollowMetricsCollector<HollowProducerMetrics> metricsCollector;
8787
final SingleProducerEnforcer singleProducerEnforcer;
88+
final HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier;
8889
long lastSuccessfulCycle = 0;
8990
final HollowObjectHashCodeFinder hashCodeFinder;
9091
final boolean doIntegrityCheck;
@@ -108,7 +109,7 @@ public AbstractHollowProducer(
108109
new VersionMinterWithCounter(), null, 0,
109110
DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE, false, false, false, null,
110111
new DummyBlobStorageCleaner(), new BasicSingleProducerEnforcer(),
111-
null, true);
112+
null, true, HollowConsumer.UpdatePlanBlobVerifier.DEFAULT_INSTANCE);
112113
}
113114

114115
// The only constructor should be that which accepts a builder
@@ -121,7 +122,7 @@ public AbstractHollowProducer(
121122
b.numStatesBetweenSnapshots, b.targetMaxTypeShardSize, b.focusHoleFillInFewestShards,
122123
b.allowTypeResharding, b.forceCoverageOfTypeResharding,
123124
b.metricsCollector, b.blobStorageCleaner, b.singleProducerEnforcer,
124-
b.hashCodeFinder, b.doIntegrityCheck);
125+
b.hashCodeFinder, b.doIntegrityCheck, b.updatePlanBlobVerifier);
125126
}
126127

127128
private AbstractHollowProducer(
@@ -140,7 +141,8 @@ private AbstractHollowProducer(
140141
HollowProducer.BlobStorageCleaner blobStorageCleaner,
141142
SingleProducerEnforcer singleProducerEnforcer,
142143
HollowObjectHashCodeFinder hashCodeFinder,
143-
boolean doIntegrityCheck) {
144+
boolean doIntegrityCheck,
145+
HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier) {
144146
this.publisher = publisher;
145147
this.announcer = announcer;
146148
this.versionMinter = versionMinter;
@@ -173,6 +175,7 @@ private AbstractHollowProducer(
173175

174176
this.metrics = new HollowProducerMetrics();
175177
this.metricsCollector = metricsCollector;
178+
this.updatePlanBlobVerifier = updatePlanBlobVerifier;
176179
}
177180

178181
/**
@@ -262,19 +265,25 @@ public void initializeDataModel(HollowSchema... schemas) {
262265
* @see #initializeDataModel(Class[])
263266
*/
264267
public HollowProducer.ReadState restore(long versionDesired, HollowConsumer.BlobRetriever blobRetriever) {
268+
return restore(new HollowConsumer.VersionInfo(versionDesired), blobRetriever,
269+
(restoreFrom, restoreTo) -> restoreTo.restoreFrom(restoreFrom));
270+
}
271+
272+
public HollowProducer.ReadState restore(HollowConsumer.VersionInfo versionDesired, HollowConsumer.BlobRetriever blobRetriever) {
265273
return restore(versionDesired, blobRetriever,
266274
(restoreFrom, restoreTo) -> restoreTo.restoreFrom(restoreFrom));
267275
}
268276

269277
HollowProducer.ReadState hardRestore(long versionDesired, HollowConsumer.BlobRetriever blobRetriever) {
270-
return restore(versionDesired, blobRetriever,
278+
return restore(new HollowConsumer.VersionInfo(versionDesired), blobRetriever,
271279
(restoreFrom, restoreTo) -> HollowWriteStateCreator.
272280
populateUsingReadEngine(restoreTo, restoreFrom, false));
273281
}
274282

275283
private HollowProducer.ReadState restore(
276-
long versionDesired, HollowConsumer.BlobRetriever blobRetriever,
284+
HollowConsumer.VersionInfo versionInfoDesired, HollowConsumer.BlobRetriever blobRetriever,
277285
BiConsumer<HollowReadStateEngine, HollowWriteStateEngine> restoreAction) {
286+
long versionDesired = versionInfoDesired.getVersion();
278287
Objects.requireNonNull(blobRetriever);
279288
Objects.requireNonNull(restoreAction);
280289

@@ -288,8 +297,10 @@ private HollowProducer.ReadState restore(
288297
Status.RestoreStageBuilder status = localListeners.fireProducerRestoreStart(versionDesired);
289298
try {
290299
if (versionDesired != HollowConstants.VERSION_NONE) {
291-
HollowConsumer client = HollowConsumer.withBlobRetriever(blobRetriever).build();
292-
client.triggerRefreshTo(versionDesired);
300+
HollowConsumer client = HollowConsumer.withBlobRetriever(blobRetriever)
301+
.withUpdatePlanVerifier(updatePlanBlobVerifier)
302+
.build();
303+
client.triggerRefreshTo(versionInfoDesired);
293304
readState = ReadStateHelper.newReadState(client.getCurrentVersionId(), client.getStateEngine());
294305
readStates = ReadStateHelper.restored(readState);
295306

hollow/src/main/java/com/netflix/hollow/api/producer/HollowProducer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,11 @@ public HollowProducer.ReadState restore(long versionDesired, HollowConsumer.Blob
207207
return super.restore(versionDesired, blobRetriever);
208208
}
209209

210+
@Override
211+
public HollowProducer.ReadState restore(HollowConsumer.VersionInfo versionDesired, HollowConsumer.BlobRetriever blobRetriever) {
212+
return super.restore(versionDesired, blobRetriever);
213+
}
214+
210215
@Override
211216
public HollowWriteStateEngine getWriteEngine() {
212217
return super.getWriteEngine();
@@ -739,6 +744,7 @@ public static class Builder<B extends HollowProducer.Builder<B>> {
739744
HollowObjectHashCodeFinder hashCodeFinder = null;
740745
boolean doIntegrityCheck = true;
741746
ProducerOptionalBlobPartConfig optionalPartConfig = null;
747+
HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier = HollowConsumer.UpdatePlanBlobVerifier.DEFAULT_INSTANCE;
742748

743749
public B withBlobStager(HollowProducer.BlobStager stager) {
744750
this.stager = stager;
@@ -930,6 +936,11 @@ public B noIntegrityCheck() {
930936
return (B) this;
931937
}
932938

939+
public B withUpdatePlanVerifier(HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier) {
940+
this.updatePlanBlobVerifier = updatePlanBlobVerifier;
941+
return (B) this;
942+
}
943+
933944
protected void checkArguments() {
934945
if (allowTypeResharding == true && doIntegrityCheck == false) { // type resharding feature rollout
935946
throw new IllegalArgumentException("Enabling type re-sharding requires integrity check to also be enabled");

hollow/src/test/java/com/netflix/hollow/api/producer/HollowProducerTest.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import static org.junit.Assert.fail;
2222
import static org.mockito.ArgumentMatchers.any;
2323
import static org.mockito.Mockito.doThrow;
24+
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.spy;
26+
import static org.mockito.Mockito.when;
2527

2628
import com.netflix.hollow.api.consumer.HollowConsumer;
2729
import com.netflix.hollow.api.objects.delegate.HollowObjectGenericDelegate;
@@ -60,10 +62,12 @@
6062
import java.util.ArrayList;
6163
import java.util.Arrays;
6264
import java.util.BitSet;
65+
import java.util.Collections;
6366
import java.util.HashMap;
6467
import java.util.HashSet;
6568
import java.util.List;
6669
import java.util.Map;
70+
import java.util.Optional;
6771
import java.util.Random;
6872
import java.util.Set;
6973
import java.util.concurrent.TimeUnit;
@@ -97,8 +101,22 @@ public void setUp() throws IOException {
97101
}
98102

99103
private HollowProducer createProducer(File tmpFolder, HollowObjectSchema... schemas) {
100-
HollowProducer producer = HollowProducer.withPublisher(new FakeBlobPublisher())
101-
.withAnnouncer(new HollowFilesystemAnnouncer(tmpFolder.toPath())).build();
104+
return createProducer(tmpFolder, null, null, schemas);
105+
}
106+
107+
private HollowProducer createProducer(File tmpFolder,
108+
HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier,
109+
HollowProducer.VersionMinter versionMinter,
110+
HollowObjectSchema... schemas) {
111+
HollowProducer.Builder producerBuilder = HollowProducer.withPublisher(new FakeBlobPublisher())
112+
.withAnnouncer(new HollowFilesystemAnnouncer(tmpFolder.toPath()));
113+
if(updatePlanBlobVerifier != null) {
114+
producerBuilder = producerBuilder.withUpdatePlanVerifier(updatePlanBlobVerifier);
115+
}
116+
if (versionMinter != null) {
117+
producerBuilder = producerBuilder.withVersionMinter(versionMinter);
118+
}
119+
HollowProducer producer = producerBuilder.build();
102120
if (schemas != null && schemas.length > 0) {
103121
producer.initializeDataModel(schemas);
104122
}
@@ -980,6 +998,39 @@ public int maxDeltasBeforeDoubleSnapshot() {
980998
assertRecordOrdinal(consumer, 11, "val12", 12);
981999
}
9821000

1001+
@Test
1002+
public void testHollowProducerRestoreWithBlobVerifier() {
1003+
HollowConsumer.UpdatePlanBlobVerifier blobVerifier = new HollowConsumer.UpdatePlanBlobVerifier() {
1004+
@Override
1005+
public boolean announcementVerificationEnabled() {
1006+
return true;
1007+
}
1008+
1009+
@Override
1010+
public int announcementVerificationMaxLookback() {
1011+
return 5;
1012+
}
1013+
1014+
@Override
1015+
public HollowConsumer.AnnouncementWatcher announcementWatcher() {
1016+
HollowConsumer.AnnouncementWatcher watcher = mock(HollowConsumer.AnnouncementWatcher.class);
1017+
when(watcher.getVersionAnnouncementStatus(1001L)).thenReturn(HollowConsumer.AnnouncementStatus.NOT_ANNOUNCED);
1018+
when(watcher.getVersionAnnouncementStatus(990L)).thenReturn(HollowConsumer.AnnouncementStatus.ANNOUNCED);
1019+
return watcher;
1020+
}
1021+
};
1022+
HollowProducer.VersionMinter mockVersionMinter = mock(HollowProducer.VersionMinter.class);
1023+
when(mockVersionMinter.mint()).thenReturn(990L).thenReturn(1001L);
1024+
HollowProducer producer = createProducer(tmpFolder,
1025+
blobVerifier, mockVersionMinter, schema);
1026+
long version1 = testPublishV1(producer, 2, 10);
1027+
long version2 = testPublishV1(producer, 2, 11);
1028+
1029+
HollowProducer.ReadState readState = producer.restore(new HollowConsumer.VersionInfo(1003l, Optional.empty(), Optional.empty(), Optional.of(true)), blobRetriever);
1030+
//the mocked blob retriever does not have delta blob so restore should go to the latest verified snapshot prior to 1003 which is 990
1031+
assertEquals(version1, readState.getVersion());
1032+
}
1033+
9831034
private void add(HollowProducer.WriteState state, String sVal, int iVal) {
9841035
TestRec rec = new TestRec(sVal, iVal);
9851036
state.add(rec);

0 commit comments

Comments
 (0)