@@ -18,6 +18,7 @@ import (
1818 "strconv"
1919 "strings"
2020 "sync"
21+ "syscall"
2122 "time"
2223
2324 "github.com/go-playground/validator/v10"
@@ -104,9 +105,36 @@ func (job *KeyStat) useLocalPlayLoadFile() (payload string, err error) {
104105 return payload , nil
105106}
106107
108+ // tryLockFile 尝试获取文件锁. 返回文件锁对象.
109+ // 如果获取失败,则尝试等待10秒后重试,最多重试360*8=2880次,即8小时.
110+ // 重试时,每60次重试打印一次日志.
111+
112+ func (job * KeyStat ) tryLockFile (workDir string , maxConcurrent int , retryTimes int ) (lock * os.File , err error ) {
113+ for i := 0 ; i < retryTimes ; i ++ {
114+ for j := 0 ; j < maxConcurrent ; j ++ {
115+ lockFile := filepath .Join (workDir , fmt .Sprintf ("keystat.lock.%d" , i ))
116+ if _ , err := os .Stat (lockFile ); os .IsNotExist (err ) {
117+ lock , err = os .Create (lockFile )
118+ if err != nil {
119+ return nil , err
120+ }
121+ err = syscall .Flock (int (lock .Fd ()), syscall .LOCK_EX | syscall .LOCK_NB )
122+ if err != nil {
123+ return nil , err
124+ }
125+ return lock , nil
126+ }
127+ }
128+ time .Sleep (1 * time .Second )
129+ if i % 60 == 1 {
130+ job .runtime .Logger .Info ("try lock file failed, try again, retryTimes:(%d of %d)" , i , retryTimes )
131+ }
132+ }
133+ return nil , fmt .Errorf ("try lock file failed, retryTimes:(%d of %d)" , retryTimes , retryTimes )
134+ }
135+
107136// Init 初始化
108137func (job * KeyStat ) Init (m * jobruntime.JobGenericRuntime ) error {
109-
110138 job .runtime = m
111139 var err error
112140 if s , err := job .useLocalPlayLoadFile (); err == nil {
@@ -153,16 +181,34 @@ func (job *KeyStat) Init(m *jobruntime.JobGenericRuntime) error {
153181 return nil
154182}
155183
// Limits used by Run when it acquires the keystat file lock through tryLockFile.
const (
	// maxRetryTimes is the number of lock-acquisition rounds attempted before giving up;
	// with the one-second pause between rounds this is roughly eight hours.
	maxRetryTimes = 3600 * 8
	// maxConcurrent is the number of lock slots handed to tryLockFile.
	maxConcurrent = 4
)
186+
156187// Run 运行监听请求任务
157188func (job * KeyStat ) Run () (err error ) {
189+ // 1. 尝试获取文件锁
190+ keystatDir := filepath .Join (consts .GetRedisBackupDir (), "dbbak/keystat" )
191+ lock , err := job .tryLockFile (keystatDir , maxConcurrent , maxRetryTimes )
192+ if err != nil {
193+ job .runtime .Logger .Error ("tryLockFile failed, err:%s" , err )
194+ err = job .updateReportStatus (statusFailed , map [string ]any {
195+ "error" : err .Error (),
196+ })
197+ if err != nil {
198+ job .runtime .Logger .Error ("update report status to failed failed, err:%s" , err )
199+ }
200+ return err
201+ }
202+ defer lock .Close ()
203+
158204 err = job .updateReportStatus (statusRunning , nil )
159205 if err != nil {
160206 job .runtime .Logger .Error ("update report status failed, err:%s" , err )
161207 // return err
162208 }
163209 job .runtime .Logger .Info ("update report status success, status:%s" , statusRunning )
164210 // 1. 创建工作目录
165- workDir := filepath .Join (consts . GetRedisBackupDir (), "dbbak/keystat" , job .runtime .UID )
211+ workDir := filepath .Join (keystatDir , job .runtime .UID )
166212 util .MkDirsIfNotExists ([]string {workDir })
167213 util .LocalDirChownMysql (workDir )
168214 job .runtime .Logger .Info ("KeyStat Run, workDir:%s" , workDir )
@@ -173,6 +219,19 @@ func (job *KeyStat) Run() (err error) {
173219 return err
174220 }
175221
222+ // if >= 7.4 暂不支持
223+
224+ if versionInfo .Major >= 7 && versionInfo .Minor >= 4 {
225+ job .runtime .Logger .Error ("redis version >= 7.4, version:%s, will not support" , versionInfo .Str )
226+ err = job .updateReportStatus (statusFailed , map [string ]any {
227+ "error" : "redis version >= 7.4, will not support" ,
228+ })
229+ if err != nil {
230+ job .runtime .Logger .Error ("update report status to failed failed, err:%s" , err )
231+ }
232+ return err
233+ }
234+
176235 if versionInfo .Major >= 6 {
177236 job .runtime .Logger .Info ("redis version >= 6, version:%s, will check atime" , versionInfo .Str )
178237 job .atimeRequired = true
@@ -516,7 +575,11 @@ func (job *KeyStat) uploadReport(workDir string) (err error) {
516575 return errors .New ("rankReportFile is empty" )
517576 }
518577
519- reportRows , rankRows , err := keystat_report .LoadReport (reportFile , rankReportFile )
578+ reportRows , err := keystat_report .LoadReport (reportFile )
579+ if err != nil {
580+ return err
581+ }
582+ rankRows , err := keystat_report .LoadRankReport (rankReportFile )
520583 if err != nil {
521584 return err
522585 }
@@ -577,6 +640,7 @@ class StateType(str, StructuredEnum):
577640*/
578641
579642const statusReady = "READY"
643+ const statusInqueue = "INQUEUE"
580644const statusRunning = "RUNNING"
581645const statusSuccess = "FINISHED"
582646const statusFailed = "FAILED"
@@ -617,27 +681,38 @@ func (job *KeyStat) sendReportToDB(
617681 }
618682
619683 // 2. upload key report.
620- ret , err := cli .Do (http .MethodPost , KeyStatReportItemUrl , map [string ]any {
621- "keystat_report_item" : reportRows ,
622- "record_id" : job .params .RecordId ,
623- "truncate" : true ,
624- })
625- if err != nil {
626- return errors .New ("upload key report failed, err:" + err .Error ())
684+ // 这里改为分批上传,一次上传200条.
685+ for i := 0 ; i < len (reportRows ); i += 200 {
686+ batchReportRows := reportRows [i :int (math .Min (float64 (i + 200 ), float64 (len (reportRows ))))]
687+ _ , err := cli .Do (http .MethodPost , KeyStatReportItemUrl , map [string ]any {
688+ "keystat_report_item" : batchReportRows ,
689+ "record_id" : job .params .RecordId ,
690+ "truncate" : i == 0 , // 第一次上传时,清空记录.
691+ })
692+ if err != nil {
693+ return errors .New ("upload key report failed, err:" + err .Error ())
694+ }
695+ job .runtime .Logger .Info ("upload %d key report items batch %d success" , len (batchReportRows ), i / 200 + 1 )
627696 }
628- job .runtime .Logger .Info ("upload key report success, ret:%+v" , ret )
697+
698+ job .runtime .Logger .Info ("upload %d key report items success" , len (reportRows ))
629699
630700 // 3. upload rank report.
631- ret , err = cli .Do (http .MethodPost , KeyStatRankReportUrl , map [string ]any {
632- "keystat_rank_item" : rankRows ,
633- "record_id" : job .params .RecordId ,
634- "truncate" : true ,
635- })
636- if err != nil {
637- return errors .New ("upload rank report failed, err:" + err .Error ())
701+ // 这里改为分批上传,一次上传200条.
702+ for i := 0 ; i < len (rankRows ); i += 200 {
703+ batchRankRows := rankRows [i :int (math .Min (float64 (i + 200 ), float64 (len (rankRows ))))]
704+ _ , err := cli .Do (http .MethodPost , KeyStatRankReportUrl , map [string ]any {
705+ "keystat_rank_item" : batchRankRows ,
706+ "record_id" : job .params .RecordId ,
707+ "truncate" : i == 0 , // 第一次上传时,清空记录.
708+ })
709+ if err != nil {
710+ return errors .New ("upload rank report failed, err:" + err .Error ())
711+ }
712+ job .runtime .Logger .Info ("upload %d rank report items batch %d success" , len (batchRankRows ), i / 200 + 1 )
638713 }
639- job .runtime .Logger .Info ("upload rank report success, ret:%+v" , ret )
640714
715+ job .runtime .Logger .Info ("upload %d rank report success" , len (rankRows ))
641716 return nil
642717}
643718
0 commit comments