From 9ad96ec24d33d730fbc8faea264f2e4b4082e328 Mon Sep 17 00:00:00 2001 From: nyne Date: Tue, 13 May 2025 09:41:19 +0800 Subject: [PATCH] Uploading file to storage asynchronously. --- server/dao/file.go | 41 ++++++++++++++++++++++++++++++++--------- server/service/file.go | 39 +++++++++++++++++++++++++++------------ 2 files changed, 59 insertions(+), 21 deletions(-) diff --git a/server/dao/file.go b/server/dao/file.go index c88a1b9..7dd1ebe 100644 --- a/server/dao/file.go +++ b/server/dao/file.go @@ -3,6 +3,7 @@ package dao import ( "errors" "gorm.io/gorm" + "gorm.io/gorm/clause" "nysoure/server/model" "time" ) @@ -35,15 +36,22 @@ func GetUploadingFile(id uint) (*model.UploadingFile, error) { } func UpdateUploadingBlock(id uint, blockIndex int) error { - uf := &model.UploadingFile{} - if err := db.Where("id = ?", id).First(uf).Error; err != nil { - return err - } - if blockIndex < 0 || blockIndex >= uf.BlocksCount() { - return nil - } - uf.Blocks[blockIndex] = true - return db.Save(uf).Error + return db.Transaction(func(tx *gorm.DB) error { + // 使用 FOR UPDATE 锁获取记录 + uf := &model.UploadingFile{} + if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(uf).Error; err != nil { + return err + } + + if blockIndex < 0 || blockIndex >= uf.BlocksCount() { + return nil + } + + uf.Blocks[blockIndex] = true + + // 在事务中立即保存更改 + return tx.Save(uf).Error + }) } func DeleteUploadingFile(id uint) error { @@ -135,3 +143,18 @@ func UpdateFile(id uint, filename string, description string) (*model.File, erro } return f, nil } + +func SetFileStorageKey(id uint, storageKey string) error { + f := &model.File{} + if err := db.Where("id = ?", id).First(f).Error; err != nil { + return err + } + f.StorageKey = storageKey + if err := db.Save(f).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return model.NewNotFoundError("file not found") + } + return err + } + return nil +} diff --git a/server/service/file.go b/server/service/file.go index 0307eb5..9aad720 100644 --- a/server/service/file.go +++ b/server/service/file.go @@ -179,21 +179,19 @@ func FinishUploadingFile(uid uint, fid uint) (*model.FileView, error) { return nil, model.NewInternalServerError("failed to finish uploading file. please re-upload") } - defer func() { - _ = os.Remove(resultFilePath) - }() - for i := 0; i < uploadingFile.BlocksCount(); i++ { blockPath := filepath.Join(uploadingFile.TempPath, strconv.Itoa(i)) data, err := os.ReadFile(blockPath) if err != nil { log.Error("failed to read block file: ", err) _ = file.Close() + _ = os.Remove(resultFilePath) return nil, model.NewInternalServerError("failed to finish uploading file. please re-upload") } if _, err := file.Write(data); err != nil { log.Error("failed to write result file: ", err) _ = file.Close() + _ = os.Remove(resultFilePath) return nil, model.NewInternalServerError("failed to finish uploading file. please re-upload") } } @@ -205,28 +203,41 @@ func FinishUploadingFile(uid uint, fid uint) (*model.FileView, error) { s, err := dao.GetStorage(uploadingFile.TargetStorageID) if err != nil { log.Error("failed to get storage: ", err) + _ = os.Remove(resultFilePath) return nil, model.NewInternalServerError("failed to finish uploading file. please re-upload") } iStorage := storage.NewStorage(s) if iStorage == nil { log.Error("failed to find storage: ", err) + _ = os.Remove(resultFilePath) return nil, model.NewInternalServerError("failed to finish uploading file. please re-upload") } - storageKey, err := iStorage.Upload(resultFilePath) - if err != nil { - log.Error("failed to upload file: ", err) - return nil, model.NewInternalServerError("failed to finish uploading file. please re-upload") - } - - dbFile, err := dao.CreateFile(uploadingFile.Filename, uploadingFile.Description, uploadingFile.TargetResourceID, &uploadingFile.TargetStorageID, storageKey, "") + dbFile, err := dao.CreateFile(uploadingFile.Filename, uploadingFile.Description, uploadingFile.TargetResourceID, &uploadingFile.TargetStorageID, "", "") if err != nil { log.Error("failed to create file in db: ", err) - _ = iStorage.Delete(storageKey) + _ = os.Remove(resultFilePath) return nil, model.NewInternalServerError("failed to finish uploading file. please re-upload") } + go func() { + defer func() { + _ = os.Remove(resultFilePath) + }() + storageKey, err := iStorage.Upload(resultFilePath) + if err != nil { + log.Error("failed to upload file to storage: ", err) + } else { + err = dao.SetFileStorageKey(dbFile.ID, storageKey) + if err != nil { + _ = iStorage.Delete(storageKey) + _ = dao.DeleteFile(dbFile.ID) + log.Error("failed to set file storage key: ", err) + } + } + }() + return dbFile.ToView(), nil } @@ -367,6 +378,10 @@ func DownloadFile(fid uint) (string, string, error) { return "", "", model.NewInternalServerError("failed to find storage") } + if file.StorageKey == "" { + return "", "", model.NewRequestError("file is not available, please try again later") + } + path, err := iStorage.Download(file.StorageKey) return path, file.Filename, err