
Commit 2be5a82

Update retry
Signed-off-by: Matt Lord <[email protected]>
1 parent aeed2b6 commit 2be5a82

File tree

1 file changed: +31, -21 lines


go/vt/mysqlctl/builtinbackupengine.go

Lines changed: 31 additions & 21 deletions
@@ -62,9 +62,7 @@ const (
 	AutoIncrementalFromPos = "auto"
 	dataDictionaryFile = "mysql.ibd"
 
-	maxRetriesPerFile   = 1
-	fileCloseRetries    = 3
-	fileCloseRetryDelay = 500 * time.Millisecond
+	maxRetriesPerFile = 1
 )
 
 var (
@@ -856,7 +854,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
 
 	defer func() {
 		closeSourceAt := time.Now()
-		if err := closeWithRetry(source); err != nil {
+		if err := closeWithRetry(ctx, source); err != nil {
 			params.Logger.Infof("Failed to close %s source file during backup: %v", fe.Name, err)
 		}
 		params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt))
@@ -885,13 +883,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
 
 	defer func(name, fileName string) {
 		closeDestAt := time.Now()
-		if rerr := closeWithRetry(dest); rerr != nil {
+		if rerr := closeWithRetry(ctx, dest); rerr != nil {
 			rerr = vterrors.Wrapf(rerr, "failed to close destination file (%v) %v", name, fe.Name)
 			params.Logger.Error(rerr)
-			// Let's try to delete the file as we will retry.
-			if cerr := os.Remove(source.Name()); cerr != nil {
-				finalErr = errors.Join(finalErr, cerr)
-			}
 			finalErr = errors.Join(finalErr, rerr)
 		}
 		params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt))
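
Each of these deferred closes follows the same idiom: call closeWithRetry and, where the surrounding function has a named error return, join any close failure into it so the caller still sees it. A minimal in-package sketch of that idiom (writeBackupFile is a hypothetical helper, not part of this file, and assumes the package's existing context, errors, and io imports):

// Hypothetical illustration of the deferred close + errors.Join idiom shown above.
func writeBackupFile(ctx context.Context, dest io.WriteCloser, data []byte) (finalErr error) {
	defer func() {
		// Join any close error into the named return so a failed close is
		// reported even when the write itself succeeded.
		if cerr := closeWithRetry(ctx, dest); cerr != nil {
			finalErr = errors.Join(finalErr, cerr)
		}
	}()
	if _, err := dest.Write(data); err != nil {
		return err
	}
	return nil
}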
@@ -940,7 +934,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
 		// Close gzip to flush it, after that all data is sent to writer.
 		closeCompressorAt := time.Now()
 		params.Logger.Infof("Closing compressor for file: %s %s", fe.Name, retryStr)
-		if cerr := closeWithRetry(closer); err != nil {
+		if cerr := closeWithRetry(ctx, closer); err != nil {
 			cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", fe.Name)
 			params.Logger.Error(cerr)
 			createAndCopyErr = errors.Join(createAndCopyErr, cerr)
@@ -1004,7 +998,7 @@ func (be *BuiltinBackupEngine) backupManifest(
 		return vterrors.Wrapf(err, "cannot add %v to backup %s", backupManifestFileName, retryStr)
 	}
 	defer func() {
-		if err := closeWithRetry(wc); err != nil {
+		if err := closeWithRetry(ctx, wc); err != nil {
 			addAndWriteError = errors.Join(addAndWriteError, vterrors.Wrapf(err, "cannot close backup: %v", backupManifestFileName))
 		}
 	}()
@@ -1266,7 +1260,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
 
 	defer func() {
 		closeSourceAt := time.Now()
-		if err := closeWithRetry(source); err != nil {
+		if err := closeWithRetry(ctx, source); err != nil {
 			params.Logger.Errorf("Failed to close source file %s during restore: %v", name, err)
 		}
 		params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt))
@@ -1292,7 +1286,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
 	params.Stats.Scope(stats.Operation("Destination:Open")).TimedIncrement(time.Since(openDestAt))
 
 	defer func() {
-		if cerr := closeWithRetry(dest); cerr != nil {
+		if cerr := closeWithRetry(ctx, dest); cerr != nil {
 			finalErr = errors.Join(finalErr, vterrors.Wrap(cerr, "failed to close destination file"))
 			params.Logger.Errorf("Failed to close destination file %s during restore: %v", dest.Name(), err)
 		}
@@ -1342,7 +1336,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
 	defer func() {
 		closeDecompressorAt := time.Now()
 		params.Logger.Infof("closing decompressor")
-		if cerr := closeWithRetry(closer); err != nil {
+		if cerr := closeWithRetry(ctx, closer); err != nil {
 			cerr = vterrors.Wrapf(cerr, "failed to close decompressor %v", name)
 			params.Logger.Error(cerr)
 			finalErr = errors.Join(finalErr, cerr)
@@ -1414,16 +1408,32 @@ func init() {
 	BackupRestoreEngineMap[builtinBackupEngineName] = &BuiltinBackupEngine{}
 }
 
-// closeWithRetry does just what it says. Retrying a close operation is
-// important as an error is most likely transient and leaving around open
-// file descriptors can lead to later problems.
-func closeWithRetry(file io.Closer) error {
+// closeWithRetry does just what it says. Retrying a close operation is important as
+// an error is most likely transient and leaving around open file descriptors can lead
+// to later problems as the file may be in a sort-of uploaded state where it exists
+// but has not yet been finalized (this is true for GCS). This can cause unexpected
+// behavior if you retry the file while the original request is still in this state.
+// Most implementations such as GCS will automatically retry operations, but close
+// is one that may be left to the caller (this is true for GCS).
+// We model this retry after the GCS retry implementation described here:
+// https://cloud.google.com/storage/docs/retry-strategy#go
+func closeWithRetry(ctx context.Context, file io.Closer) error {
+	backoff := 1 * time.Second
+	backoffLimit := backoff * 30
 	var err error
-	for range fileCloseRetries {
+	for {
 		if err = file.Close(); err == nil {
 			return nil
 		}
-		time.Sleep(fileCloseRetryDelay)
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(backoff):
+			// Exponential backoff with 2 as a factor.
+			if backoff != backoffLimit {
+				updatedBackoff := time.Duration(float64(backoff) * 2)
+				backoff = min(updatedBackoff, backoffLimit)
+			}
+		}
 	}
-	return err
 }
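
The new loop waits 1s, then 2s, 4s, and so on between attempts, capping at 30s, and gives up as soon as the passed-in context is cancelled. Since closeWithRetry is unexported, the following is a self-contained sketch of the same pattern for illustration only; flakyCloser and retryClose are hypothetical stand-ins, and the min builtin assumes Go 1.21+:

// A self-contained sketch of the context-aware close retry with exponential
// backoff introduced by this commit. flakyCloser and retryClose are
// hypothetical stand-ins; they are not part of the Vitess code above.
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

// flakyCloser fails a fixed number of times before closing successfully,
// standing in for a transient storage error on Close.
type flakyCloser struct{ failuresLeft int }

func (f *flakyCloser) Close() error {
	if f.failuresLeft > 0 {
		f.failuresLeft--
		return errors.New("transient close error")
	}
	return nil
}

// retryClose mirrors the shape of closeWithRetry above: retry until the close
// succeeds, doubling the delay between attempts up to a cap, and stop early
// if the context is cancelled.
func retryClose(ctx context.Context, c io.Closer) error {
	backoff := 1 * time.Second
	backoffLimit := 30 * time.Second
	var err error
	for {
		if err = c.Close(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			backoff = min(backoff*2, backoffLimit) // 1s, 2s, 4s, ... capped at 30s
		}
	}
}

func main() {
	// Bound how long we are willing to keep retrying the close.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	if err := retryClose(ctx, &flakyCloser{failuresLeft: 2}); err != nil {
		fmt.Println("close failed:", err)
		return
	}
	fmt.Println("closed after retries") // succeeds after ~3s (1s + 2s of backoff)
}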
