From 009d6b339a6d419fa3253255def8c21912cf81cb Mon Sep 17 00:00:00 2001 From: Srinivas Yeleti Date: Mon, 20 Jan 2025 09:47:29 +0000 Subject: [PATCH 1/3] Serve getAttr call for destination file after the Copy finishes from the cache --- component/attr_cache/attr_cache.go | 19 ++++++++++++++++--- component/azstorage/azstorage.go | 2 +- component/azstorage/block_blob.go | 16 +++++++++++++--- component/azstorage/connection.go | 2 +- component/azstorage/datalake.go | 7 ++++--- component/azstorage/utils.go | 8 ++++++++ component/libfuse/libfuse_handler.go | 12 ++++++++++-- internal/component_options.go | 6 ++++-- 8 files changed, 57 insertions(+), 15 deletions(-) diff --git a/component/attr_cache/attr_cache.go b/component/attr_cache/attr_cache.go index ea8f347d0..57019eb08 100644 --- a/component/attr_cache/attr_cache.go +++ b/component/attr_cache/attr_cache.go @@ -360,14 +360,27 @@ func (ac *AttrCache) DeleteFile(options internal.DeleteFileOptions) error { // RenameFile : Mark the source file deleted. Invalidate the destination file. func (ac *AttrCache) RenameFile(options internal.RenameFileOptions) error { log.Trace("AttrCache::RenameFile : %s -> %s", options.Src, options.Dst) - + srcAttr := options.SrcAttr + dstAttr := options.DstAttr err := ac.NextComponent().RenameFile(options) if err == nil { + // Copy source attribute to destination. + // LMT of destination will be modified by next component if the copy is success. ac.cacheLock.RLock() defer ac.cacheLock.RUnlock() - + dstCacheEntry, found := ac.cacheMap[options.Dst] + if found && dstCacheEntry.valid() { + dstAttr.Size = srcAttr.Size + dstAttr.Mode = srcAttr.Mode + dstAttr.Flags = srcAttr.Flags + dstAttr.MD5 = srcAttr.MD5 + dstAttr.Metadata = srcAttr.Metadata + // Dst blob may not exist before + dstCacheEntry.attrFlag.Set(AttrFlagExists) + } else { + ac.invalidatePath(options.Dst) + } ac.deletePath(options.Src, time.Now()) - ac.invalidatePath(options.Dst) } return err diff --git a/component/azstorage/azstorage.go b/component/azstorage/azstorage.go index 2c267c5f5..c741d575d 100644 --- a/component/azstorage/azstorage.go +++ b/component/azstorage/azstorage.go @@ -423,7 +423,7 @@ func (az *AzStorage) DeleteFile(options internal.DeleteFileOptions) error { func (az *AzStorage) RenameFile(options internal.RenameFileOptions) error { log.Trace("AzStorage::RenameFile : %s to %s", options.Src, options.Dst) - err := az.storage.RenameFile(options.Src, options.Dst) + err := az.storage.RenameFile(options.Src, options.Dst, options.DstAttr) if err == nil { azStatsCollector.PushEvents(renameFile, options.Src, map[string]interface{}{src: options.Src, dest: options.Dst}) diff --git a/component/azstorage/block_blob.go b/component/azstorage/block_blob.go index 5e5563e98..130a3704d 100644 --- a/component/azstorage/block_blob.go +++ b/component/azstorage/block_blob.go @@ -316,7 +316,10 @@ func (bb *BlockBlob) DeleteDirectory(name string) (err error) { // RenameFile : Rename the file // Source file must exist in storage account before calling this method. -func (bb *BlockBlob) RenameFile(source string, target string) error { +// When the rename is success, Data, metadata, of the blob will be copied to the destination. +// Creation time and LMT is not preserved for copyBlob API. +// https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob?tabs=microsoft-entra-id +func (bb *BlockBlob) RenameFile(source string, target string, dstAttr *internal.ObjAttr) error { log.Trace("BlockBlob::RenameFile : %s -> %s", source, target) blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, source)) @@ -340,6 +343,8 @@ func (bb *BlockBlob) RenameFile(source string, target string) error { return err } + var dstLMT *time.Time = startCopy.LastModified + copyStatus := startCopy.CopyStatus for copyStatus != nil && *copyStatus == blob.CopyStatusTypePending { time.Sleep(time.Second * 1) @@ -349,9 +354,14 @@ func (bb *BlockBlob) RenameFile(source string, target string) error { if err != nil { log.Err("BlockBlob::RenameFile : CopyStats : Failed to get blob properties for %s [%s]", source, err.Error()) } + dstLMT = prop.LastModified copyStatus = prop.CopyStatus } + if copyStatus != nil && *copyStatus == blob.CopyStatusTypeSuccess { + modifyLMT(dstAttr, dstLMT) + } + log.Trace("BlockBlob::RenameFile : %s -> %s done", source, target) // Copy of the file is done so now delete the older file @@ -393,7 +403,7 @@ func (bb *BlockBlob) RenameDirectory(source string, target string) error { for _, blobInfo := range listBlobResp.Segment.BlobItems { srcDirPresent = true srcPath := split(bb.Config.prefixPath, *blobInfo.Name) - err = bb.RenameFile(srcPath, strings.Replace(srcPath, source, target, 1)) + err = bb.RenameFile(srcPath, strings.Replace(srcPath, source, target, 1), nil) if err != nil { log.Err("BlockBlob::RenameDirectory : Failed to rename file %s [%s]", srcPath, err.Error) } @@ -419,7 +429,7 @@ func (bb *BlockBlob) RenameDirectory(source string, target string) error { } } - return bb.RenameFile(source, target) + return bb.RenameFile(source, target, nil) } func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err error) { diff --git a/component/azstorage/connection.go b/component/azstorage/connection.go index cdc177896..792c30234 100644 --- a/component/azstorage/connection.go +++ b/component/azstorage/connection.go @@ -110,7 +110,7 @@ type AzConnection interface { DeleteFile(name string) error DeleteDirectory(name string) error - RenameFile(string, string) error + RenameFile(string, string, *internal.ObjAttr) error RenameDirectory(string, string) error GetAttr(name string) (attr *internal.ObjAttr, err error) diff --git a/component/azstorage/datalake.go b/component/azstorage/datalake.go index 8a37c4ee5..efc8580ae 100644 --- a/component/azstorage/datalake.go +++ b/component/azstorage/datalake.go @@ -318,12 +318,13 @@ func (dl *Datalake) DeleteDirectory(name string) (err error) { } // RenameFile : Rename the file -func (dl *Datalake) RenameFile(source string, target string) error { +// While renaming the file, Creation time is preserved but LMT is changed for the destination blob. +func (dl *Datalake) RenameFile(source string, target string, dstAttr *internal.ObjAttr) error { log.Trace("Datalake::RenameFile : %s -> %s", source, target) fileClient := dl.Filesystem.NewFileClient(url.PathEscape(filepath.Join(dl.Config.prefixPath, source))) - _, err := fileClient.Rename(context.Background(), filepath.Join(dl.Config.prefixPath, target), &file.RenameOptions{ + renameResponse, err := fileClient.Rename(context.Background(), filepath.Join(dl.Config.prefixPath, target), &file.RenameOptions{ CPKInfo: dl.datalakeCPKOpt, }) if err != nil { @@ -336,7 +337,7 @@ func (dl *Datalake) RenameFile(source string, target string) error { return err } } - + modifyLMT(dstAttr, renameResponse.LastModified) return nil } diff --git a/component/azstorage/utils.go b/component/azstorage/utils.go index cef06c06f..dc2dd4d0c 100644 --- a/component/azstorage/utils.go +++ b/component/azstorage/utils.go @@ -597,6 +597,14 @@ func removeLeadingSlashes(s string) string { return s } +func modifyLMT(attr *internal.ObjAttr, lmt *time.Time) { + if attr != nil { + attr.Atime = *lmt + attr.Mtime = *lmt + attr.Ctime = *lmt + } +} + // func parseBlobTags(tags *container.BlobTags) map[string]string { // if tags == nil { diff --git a/component/libfuse/libfuse_handler.go b/component/libfuse/libfuse_handler.go index 23f8d70ac..be1d38532 100644 --- a/component/libfuse/libfuse_handler.go +++ b/component/libfuse/libfuse_handler.go @@ -979,7 +979,10 @@ func libfuse_rename(src *C.char, dst *C.char, flags C.uint) C.int { } } - err := fuseFS.NextComponent().RenameDir(internal.RenameDirOptions{Src: srcPath, Dst: dstPath}) + err := fuseFS.NextComponent().RenameDir(internal.RenameDirOptions{ + Src: srcPath, + Dst: dstPath, + }) if err != nil { log.Err("Libfuse::libfuse_rename : error renaming directory %s -> %s [%s]", srcPath, dstPath, err.Error()) return -C.EIO @@ -989,7 +992,12 @@ func libfuse_rename(src *C.char, dst *C.char, flags C.uint) C.int { libfuseStatsCollector.UpdateStats(stats_manager.Increment, renameDir, (int64)(1)) } else { - err := fuseFS.NextComponent().RenameFile(internal.RenameFileOptions{Src: srcPath, Dst: dstPath}) + err := fuseFS.NextComponent().RenameFile(internal.RenameFileOptions{ + Src: srcPath, + Dst: dstPath, + SrcAttr: srcAttr, + DstAttr: dstAttr, + }) if err != nil { log.Err("Libfuse::libfuse_rename : error renaming file %s -> %s [%s]", srcPath, dstPath, err.Error()) return -C.EIO diff --git a/internal/component_options.go b/internal/component_options.go index dc53ff2ba..67e510cc2 100644 --- a/internal/component_options.go +++ b/internal/component_options.go @@ -96,8 +96,10 @@ type CloseFileOptions struct { } type RenameFileOptions struct { - Src string - Dst string + Src string + Dst string + SrcAttr *ObjAttr + DstAttr *ObjAttr } type ReadFileOptions struct { From 793a12378104666d215fc0a3e239bef4f0f59133 Mon Sep 17 00:00:00 2001 From: Srinivas Yeleti Date: Mon, 20 Jan 2025 16:44:25 +0000 Subject: [PATCH 2/3] Add Tests --- component/attr_cache/attr_cache.go | 12 ++++++------ component/attr_cache/attr_cache_test.go | 17 +++++++++++++++-- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/component/attr_cache/attr_cache.go b/component/attr_cache/attr_cache.go index 57019eb08..42adff950 100644 --- a/component/attr_cache/attr_cache.go +++ b/component/attr_cache/attr_cache.go @@ -361,7 +361,6 @@ func (ac *AttrCache) DeleteFile(options internal.DeleteFileOptions) error { func (ac *AttrCache) RenameFile(options internal.RenameFileOptions) error { log.Trace("AttrCache::RenameFile : %s -> %s", options.Src, options.Dst) srcAttr := options.SrcAttr - dstAttr := options.DstAttr err := ac.NextComponent().RenameFile(options) if err == nil { // Copy source attribute to destination. @@ -370,11 +369,12 @@ func (ac *AttrCache) RenameFile(options internal.RenameFileOptions) error { defer ac.cacheLock.RUnlock() dstCacheEntry, found := ac.cacheMap[options.Dst] if found && dstCacheEntry.valid() { - dstAttr.Size = srcAttr.Size - dstAttr.Mode = srcAttr.Mode - dstAttr.Flags = srcAttr.Flags - dstAttr.MD5 = srcAttr.MD5 - dstAttr.Metadata = srcAttr.Metadata + dstCacheEntry.attr.Size = srcAttr.Size + dstCacheEntry.attr.Path = options.Dst + dstCacheEntry.attr.Mode = srcAttr.Mode + dstCacheEntry.attr.Flags = srcAttr.Flags + dstCacheEntry.attr.MD5 = srcAttr.MD5 + dstCacheEntry.attr.Metadata = srcAttr.Metadata // Dst blob may not exist before dstCacheEntry.attrFlag.Set(AttrFlagExists) } else { diff --git a/component/attr_cache/attr_cache_test.go b/component/attr_cache/attr_cache_test.go index 6dbf0cbec..4f4ff48db 100644 --- a/component/attr_cache/attr_cache_test.go +++ b/component/attr_cache/attr_cache_test.go @@ -676,15 +676,28 @@ func (suite *attrCacheTestSuite) TestRenameFile() { suite.assert.NotContains(suite.attrCache.cacheMap, src) suite.assert.NotContains(suite.attrCache.cacheMap, dst) - // Entry Already Exists + // Src, Dst Entry Already Exists addPathToCache(suite.assert, suite.attrCache, src, false) addPathToCache(suite.assert, suite.attrCache, dst, false) + options.SrcAttr = suite.attrCache.cacheMap[src].attr + options.DstAttr = suite.attrCache.cacheMap[dst].attr suite.mock.EXPECT().RenameFile(options).Return(nil) + err = suite.attrCache.RenameFile(options) + suite.assert.Nil(err) + assertDeleted(suite, src) + assertUntouched(suite, dst) + // Src Entry Exist and Dst Entry Don't Exist + addPathToCache(suite.assert, suite.attrCache, src, false) + // Add negative entry to cache for Dst + suite.attrCache.cacheMap[dst] = newAttrCacheItem(&internal.ObjAttr{}, false, time.Now()) + options.SrcAttr = suite.attrCache.cacheMap[src].attr + options.DstAttr = suite.attrCache.cacheMap[dst].attr + suite.mock.EXPECT().RenameFile(options).Return(nil) err = suite.attrCache.RenameFile(options) suite.assert.Nil(err) assertDeleted(suite, src) - assertInvalid(suite, dst) + assertUntouched(suite, dst) } // Tests Write File From 91bade8ead7b7203e8cd3681d832247b1bcac05a Mon Sep 17 00:00:00 2001 From: Srinivas Yeleti Date: Tue, 21 Jan 2025 08:36:12 +0000 Subject: [PATCH 3/3] Refactor the code and refresh the cache after copying the attributes --- component/attr_cache/attr_cache.go | 15 ++++++--------- component/azstorage/azstorage.go | 2 +- component/azstorage/block_blob.go | 5 +++-- component/azstorage/datalake.go | 4 ++-- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/component/attr_cache/attr_cache.go b/component/attr_cache/attr_cache.go index 42adff950..380aba157 100644 --- a/component/attr_cache/attr_cache.go +++ b/component/attr_cache/attr_cache.go @@ -364,21 +364,18 @@ func (ac *AttrCache) RenameFile(options internal.RenameFileOptions) error { err := ac.NextComponent().RenameFile(options) if err == nil { // Copy source attribute to destination. - // LMT of destination will be modified by next component if the copy is success. + // LMT of Source will be modified by next component if the copy is success. ac.cacheLock.RLock() defer ac.cacheLock.RUnlock() dstCacheEntry, found := ac.cacheMap[options.Dst] - if found && dstCacheEntry.valid() { - dstCacheEntry.attr.Size = srcAttr.Size + if found { + // Copy the Src Attr to Dst + dstCacheEntry.attr = srcAttr dstCacheEntry.attr.Path = options.Dst - dstCacheEntry.attr.Mode = srcAttr.Mode - dstCacheEntry.attr.Flags = srcAttr.Flags - dstCacheEntry.attr.MD5 = srcAttr.MD5 - dstCacheEntry.attr.Metadata = srcAttr.Metadata // Dst blob may not exist before dstCacheEntry.attrFlag.Set(AttrFlagExists) - } else { - ac.invalidatePath(options.Dst) + // Refresh the cache + dstCacheEntry.cachedAt = time.Now() } ac.deletePath(options.Src, time.Now()) } diff --git a/component/azstorage/azstorage.go b/component/azstorage/azstorage.go index c741d575d..c859819a6 100644 --- a/component/azstorage/azstorage.go +++ b/component/azstorage/azstorage.go @@ -423,7 +423,7 @@ func (az *AzStorage) DeleteFile(options internal.DeleteFileOptions) error { func (az *AzStorage) RenameFile(options internal.RenameFileOptions) error { log.Trace("AzStorage::RenameFile : %s to %s", options.Src, options.Dst) - err := az.storage.RenameFile(options.Src, options.Dst, options.DstAttr) + err := az.storage.RenameFile(options.Src, options.Dst, options.SrcAttr) if err == nil { azStatsCollector.PushEvents(renameFile, options.Src, map[string]interface{}{src: options.Src, dest: options.Dst}) diff --git a/component/azstorage/block_blob.go b/component/azstorage/block_blob.go index 130a3704d..14ad09abe 100644 --- a/component/azstorage/block_blob.go +++ b/component/azstorage/block_blob.go @@ -318,8 +318,9 @@ func (bb *BlockBlob) DeleteDirectory(name string) (err error) { // Source file must exist in storage account before calling this method. // When the rename is success, Data, metadata, of the blob will be copied to the destination. // Creation time and LMT is not preserved for copyBlob API. +// Copy the LMT to the src attr if the copy is success. // https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob?tabs=microsoft-entra-id -func (bb *BlockBlob) RenameFile(source string, target string, dstAttr *internal.ObjAttr) error { +func (bb *BlockBlob) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error { log.Trace("BlockBlob::RenameFile : %s -> %s", source, target) blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, source)) @@ -359,7 +360,7 @@ func (bb *BlockBlob) RenameFile(source string, target string, dstAttr *internal. } if copyStatus != nil && *copyStatus == blob.CopyStatusTypeSuccess { - modifyLMT(dstAttr, dstLMT) + modifyLMT(srcAttr, dstLMT) } log.Trace("BlockBlob::RenameFile : %s -> %s done", source, target) diff --git a/component/azstorage/datalake.go b/component/azstorage/datalake.go index efc8580ae..c7044a8cb 100644 --- a/component/azstorage/datalake.go +++ b/component/azstorage/datalake.go @@ -319,7 +319,7 @@ func (dl *Datalake) DeleteDirectory(name string) (err error) { // RenameFile : Rename the file // While renaming the file, Creation time is preserved but LMT is changed for the destination blob. -func (dl *Datalake) RenameFile(source string, target string, dstAttr *internal.ObjAttr) error { +func (dl *Datalake) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error { log.Trace("Datalake::RenameFile : %s -> %s", source, target) fileClient := dl.Filesystem.NewFileClient(url.PathEscape(filepath.Join(dl.Config.prefixPath, source))) @@ -337,7 +337,7 @@ func (dl *Datalake) RenameFile(source string, target string, dstAttr *internal.O return err } } - modifyLMT(dstAttr, renameResponse.LastModified) + modifyLMT(srcAttr, renameResponse.LastModified) return nil }