Skip to content

Commit aeed2b6

Browse files
committed
Retry close and delete backup dest file when it fails
Signed-off-by: Matt Lord <[email protected]>
1 parent 943dd00 commit aeed2b6

File tree

2 files changed

+46
-19
lines changed

2 files changed

+46
-19
lines changed

go/vt/mysqlctl/builtinbackupengine.go

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,17 @@ import (
5454

5555
mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl"
5656
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
57-
"vitess.io/vitess/go/vt/proto/vtrpc"
57+
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
5858
)
5959

6060
const (
6161
builtinBackupEngineName = "builtin"
6262
AutoIncrementalFromPos = "auto"
6363
dataDictionaryFile = "mysql.ibd"
6464

65-
maxRetriesPerFile = 1
65+
maxRetriesPerFile = 1
66+
fileCloseRetries = 3
67+
fileCloseRetryDelay = 500 * time.Millisecond
6668
)
6769

6870
var (
@@ -189,7 +191,7 @@ func (fe *FileEntry) fullPath(cnf *Mycnf) (string, error) {
189191
case backupBinlogDir:
190192
root = filepath.Dir(cnf.BinLogPath)
191193
default:
192-
return "", vterrors.Errorf(vtrpc.Code_UNKNOWN, "unknown base: %v", fe.Base)
194+
return "", vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "unknown base: %v", fe.Base)
193195
}
194196

195197
return path.Join(fe.ParentPath, root, fe.Name), nil
@@ -312,7 +314,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
312314
}
313315
purgedGTIDSet, ok := gtidPurged.GTIDSet.(replication.Mysql56GTIDSet)
314316
if !ok {
315-
return gtidPurged, nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "failed to parse a valid MySQL GTID set from value: %v", gtidPurged)
317+
return gtidPurged, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse a valid MySQL GTID set from value: %v", gtidPurged)
316318
}
317319
return gtidPurged, purgedGTIDSet, nil
318320
}
@@ -375,7 +377,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
375377
return BackupUnusable, vterrors.Wrapf(err, "reading timestamps from binlog files %v", binaryLogsToBackup)
376378
}
377379
if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" {
378-
return BackupUnusable, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp)
380+
return BackupUnusable, vterrors.Errorf(vtrpcpb.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp)
379381
}
380382
log.Infof("ReadBinlogFilesTimestampsResponse: %+v", resp)
381383
incrDetails := &IncrementalBackupDetails{
@@ -698,7 +700,7 @@ func (be *BuiltinBackupEngine) backupFileEntries(ctx context.Context, fes []File
698700
select {
699701
case <-ctxCancel.Done():
700702
log.Errorf("Context canceled or timed out during %q backup", fe.Name)
701-
bh.RecordError(name, vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
703+
bh.RecordError(name, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context canceled"))
702704
return nil
703705
default:
704706
}
@@ -854,7 +856,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
854856

855857
defer func() {
856858
closeSourceAt := time.Now()
857-
source.Close()
859+
if err := closeWithRetry(source); err != nil {
860+
params.Logger.Infof("Failed to close %s source file during backup: %v", fe.Name, err)
861+
}
858862
params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt))
859863
}()
860864

@@ -881,9 +885,13 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
881885

882886
defer func(name, fileName string) {
883887
closeDestAt := time.Now()
884-
if rerr := dest.Close(); rerr != nil {
885-
rerr = vterrors.Wrapf(rerr, "failed to close file %v,%v", name, fe.Name)
888+
if rerr := closeWithRetry(dest); rerr != nil {
889+
rerr = vterrors.Wrapf(rerr, "failed to close destination file (%v) %v", name, fe.Name)
886890
params.Logger.Error(rerr)
891+
// Let's try to delete the file as we will retry.
892+
if cerr := os.Remove(source.Name()); cerr != nil {
893+
finalErr = errors.Join(finalErr, cerr)
894+
}
887895
finalErr = errors.Join(finalErr, rerr)
888896
}
889897
params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt))
@@ -932,7 +940,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
932940
// Close gzip to flush it, after that all data is sent to writer.
933941
closeCompressorAt := time.Now()
934942
params.Logger.Infof("Closing compressor for file: %s %s", fe.Name, retryStr)
935-
if cerr := closer.Close(); err != nil {
943+
if cerr := closeWithRetry(closer); err != nil {
936944
cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", fe.Name)
937945
params.Logger.Error(cerr)
938946
createAndCopyErr = errors.Join(createAndCopyErr, cerr)
@@ -996,7 +1004,7 @@ func (be *BuiltinBackupEngine) backupManifest(
9961004
return vterrors.Wrapf(err, "cannot add %v to backup %s", backupManifestFileName, retryStr)
9971005
}
9981006
defer func() {
999-
if err := wc.Close(); err != nil {
1007+
if err := closeWithRetry(wc); err != nil {
10001008
addAndWriteError = errors.Join(addAndWriteError, vterrors.Wrapf(err, "cannot close backup: %v", backupManifestFileName))
10011009
}
10021010
}()
@@ -1076,7 +1084,7 @@ func (be *BuiltinBackupEngine) executeRestoreIncrementalBackup(ctx context.Conte
10761084
defer os.RemoveAll(createdDir)
10771085
mysqld, ok := params.Mysqld.(*Mysqld)
10781086
if !ok {
1079-
return vterrors.Errorf(vtrpc.Code_UNIMPLEMENTED, "expected: Mysqld")
1087+
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "expected: Mysqld")
10801088
}
10811089
for _, fe := range bm.FileEntries {
10821090
fe.ParentPath = createdDir
@@ -1216,7 +1224,7 @@ func (be *BuiltinBackupEngine) restoreFileEntries(ctx context.Context, fes []Fil
12161224
select {
12171225
case <-ctx.Done():
12181226
log.Errorf("Context canceled or timed out during %q restore", fe.Name)
1219-
bh.RecordError(name, vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
1227+
bh.RecordError(name, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context canceled"))
12201228
return nil
12211229
default:
12221230
}
@@ -1258,7 +1266,9 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
12581266

12591267
defer func() {
12601268
closeSourceAt := time.Now()
1261-
source.Close()
1269+
if err := closeWithRetry(source); err != nil {
1270+
params.Logger.Errorf("Failed to close source file %s during restore: %v", name, err)
1271+
}
12621272
params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt))
12631273
}()
12641274

@@ -1282,10 +1292,11 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
12821292
params.Stats.Scope(stats.Operation("Destination:Open")).TimedIncrement(time.Since(openDestAt))
12831293

12841294
defer func() {
1285-
closeDestAt := time.Now()
1286-
if cerr := dest.Close(); cerr != nil {
1295+
if cerr := closeWithRetry(dest); cerr != nil {
12871296
finalErr = errors.Join(finalErr, vterrors.Wrap(cerr, "failed to close destination file"))
1297+
params.Logger.Errorf("Failed to close destination file %s during restore: %v", dest.Name(), err)
12881298
}
1299+
closeDestAt := time.Now()
12891300
params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt))
12901301
}()
12911302

@@ -1331,7 +1342,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
13311342
defer func() {
13321343
closeDecompressorAt := time.Now()
13331344
params.Logger.Infof("closing decompressor")
1334-
if cerr := closer.Close(); err != nil {
1345+
if cerr := closeWithRetry(closer); err != nil {
13351346
cerr = vterrors.Wrapf(cerr, "failed to close decompressor %v", name)
13361347
params.Logger.Error(cerr)
13371348
finalErr = errors.Join(finalErr, cerr)
@@ -1348,7 +1359,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
13481359
// Check the hash.
13491360
hash := br.HashString()
13501361
if hash != fe.Hash {
1351-
return vterrors.Errorf(vtrpc.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash)
1362+
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash)
13521363
}
13531364

13541365
// Flush the buffer.
@@ -1402,3 +1413,17 @@ func getPrimaryPosition(ctx context.Context, tmc tmclient.TabletManagerClient, t
14021413
func init() {
14031414
BackupRestoreEngineMap[builtinBackupEngineName] = &BuiltinBackupEngine{}
14041415
}
1416+
1417+
// closeWithRetry does just what it says. Retrying a close operation is
1418+
// important as an error is most likely transient and leaving around open
1419+
// file descriptors can lead to later problems.
1420+
func closeWithRetry(file io.Closer) error {
1421+
var err error
1422+
for range fileCloseRetries {
1423+
if err = file.Close(); err == nil {
1424+
return nil
1425+
}
1426+
time.Sleep(fileCloseRetryDelay)
1427+
}
1428+
return err
1429+
}

go/vt/mysqlctl/gcsbackupstorage/gcs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,9 @@ func (bs *GCSBackupStorage) Close() error {
233233
// so we know to create a new client the next time one
234234
// is needed.
235235
client := bs._client
236-
bs._client = nil
236+
defer func() {
237+
bs._client = nil
238+
}()
237239
if err := client.Close(); err != nil {
238240
return err
239241
}

0 commit comments

Comments
 (0)