Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce the number of rest calls while doing Rename Operation #1614

Open
wants to merge 5 commits into
base: blobfuse/2.4.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions component/attr_cache/attr_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,14 +360,24 @@ func (ac *AttrCache) DeleteFile(options internal.DeleteFileOptions) error {
// RenameFile : Mark the source file deleted. Invalidate the destination file.
func (ac *AttrCache) RenameFile(options internal.RenameFileOptions) error {
log.Trace("AttrCache::RenameFile : %s -> %s", options.Src, options.Dst)

srcAttr := options.SrcAttr
err := ac.NextComponent().RenameFile(options)
if err == nil {
// Copy source attribute to destination.
// LMT of Source will be modified by next component if the copy is success.
ac.cacheLock.RLock()
defer ac.cacheLock.RUnlock()

dstCacheEntry, found := ac.cacheMap[options.Dst]
if found {
// Copy the Src Attr to Dst
dstCacheEntry.attr = srcAttr
dstCacheEntry.attr.Path = options.Dst
// Dst blob may not exist before
dstCacheEntry.attrFlag.Set(AttrFlagExists)
// Refresh the cache
dstCacheEntry.cachedAt = time.Now()
}
ac.deletePath(options.Src, time.Now())
ac.invalidatePath(options.Dst)
}

return err
Expand Down
17 changes: 15 additions & 2 deletions component/attr_cache/attr_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,15 +676,28 @@ func (suite *attrCacheTestSuite) TestRenameFile() {
suite.assert.NotContains(suite.attrCache.cacheMap, src)
suite.assert.NotContains(suite.attrCache.cacheMap, dst)

// Entry Already Exists
// Src, Dst Entry Already Exists
addPathToCache(suite.assert, suite.attrCache, src, false)
addPathToCache(suite.assert, suite.attrCache, dst, false)
options.SrcAttr = suite.attrCache.cacheMap[src].attr
options.DstAttr = suite.attrCache.cacheMap[dst].attr
suite.mock.EXPECT().RenameFile(options).Return(nil)
err = suite.attrCache.RenameFile(options)
suite.assert.Nil(err)
assertDeleted(suite, src)
assertUntouched(suite, dst)

// Src Entry Exist and Dst Entry Don't Exist
addPathToCache(suite.assert, suite.attrCache, src, false)
// Add negative entry to cache for Dst
suite.attrCache.cacheMap[dst] = newAttrCacheItem(&internal.ObjAttr{}, false, time.Now())
options.SrcAttr = suite.attrCache.cacheMap[src].attr
options.DstAttr = suite.attrCache.cacheMap[dst].attr
suite.mock.EXPECT().RenameFile(options).Return(nil)
err = suite.attrCache.RenameFile(options)
suite.assert.Nil(err)
assertDeleted(suite, src)
assertInvalid(suite, dst)
assertUntouched(suite, dst)
}

// Tests Write File
Expand Down
2 changes: 1 addition & 1 deletion component/azstorage/azstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (az *AzStorage) DeleteFile(options internal.DeleteFileOptions) error {
func (az *AzStorage) RenameFile(options internal.RenameFileOptions) error {
log.Trace("AzStorage::RenameFile : %s to %s", options.Src, options.Dst)

err := az.storage.RenameFile(options.Src, options.Dst)
err := az.storage.RenameFile(options.Src, options.Dst, options.SrcAttr)

if err == nil {
azStatsCollector.PushEvents(renameFile, options.Src, map[string]interface{}{src: options.Src, dest: options.Dst})
Expand Down
17 changes: 14 additions & 3 deletions component/azstorage/block_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ func (bb *BlockBlob) DeleteDirectory(name string) (err error) {

// RenameFile : Rename the file
// Source file must exist in storage account before calling this method.
func (bb *BlockBlob) RenameFile(source string, target string) error {
// When the rename is success, Data, metadata, of the blob will be copied to the destination.
// Creation time and LMT is not preserved for copyBlob API.
// Copy the LMT to the src attr if the copy is success.
// https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob?tabs=microsoft-entra-id
func (bb *BlockBlob) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error {
log.Trace("BlockBlob::RenameFile : %s -> %s", source, target)

blobClient := bb.Container.NewBlockBlobClient(filepath.Join(bb.Config.prefixPath, source))
Expand All @@ -340,6 +344,8 @@ func (bb *BlockBlob) RenameFile(source string, target string) error {
return err
}

var dstLMT *time.Time = startCopy.LastModified

copyStatus := startCopy.CopyStatus
for copyStatus != nil && *copyStatus == blob.CopyStatusTypePending {
time.Sleep(time.Second * 1)
Expand All @@ -349,9 +355,14 @@ func (bb *BlockBlob) RenameFile(source string, target string) error {
if err != nil {
log.Err("BlockBlob::RenameFile : CopyStats : Failed to get blob properties for %s [%s]", source, err.Error())
}
dstLMT = prop.LastModified
syeleti-msft marked this conversation as resolved.
Show resolved Hide resolved
copyStatus = prop.CopyStatus
}

if copyStatus != nil && *copyStatus == blob.CopyStatusTypeSuccess {
modifyLMT(srcAttr, dstLMT)
}

log.Trace("BlockBlob::RenameFile : %s -> %s done", source, target)

// Copy of the file is done so now delete the older file
Expand Down Expand Up @@ -393,7 +404,7 @@ func (bb *BlockBlob) RenameDirectory(source string, target string) error {
for _, blobInfo := range listBlobResp.Segment.BlobItems {
srcDirPresent = true
srcPath := split(bb.Config.prefixPath, *blobInfo.Name)
err = bb.RenameFile(srcPath, strings.Replace(srcPath, source, target, 1))
err = bb.RenameFile(srcPath, strings.Replace(srcPath, source, target, 1), nil)
if err != nil {
log.Err("BlockBlob::RenameDirectory : Failed to rename file %s [%s]", srcPath, err.Error)
}
Expand All @@ -419,7 +430,7 @@ func (bb *BlockBlob) RenameDirectory(source string, target string) error {
}
}

return bb.RenameFile(source, target)
return bb.RenameFile(source, target, nil)
}

func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err error) {
Expand Down
2 changes: 1 addition & 1 deletion component/azstorage/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type AzConnection interface {
DeleteFile(name string) error
DeleteDirectory(name string) error

RenameFile(string, string) error
RenameFile(string, string, *internal.ObjAttr) error
RenameDirectory(string, string) error

GetAttr(name string) (attr *internal.ObjAttr, err error)
Expand Down
7 changes: 4 additions & 3 deletions component/azstorage/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,13 @@ func (dl *Datalake) DeleteDirectory(name string) (err error) {
}

// RenameFile : Rename the file
func (dl *Datalake) RenameFile(source string, target string) error {
// While renaming the file, Creation time is preserved but LMT is changed for the destination blob.
func (dl *Datalake) RenameFile(source string, target string, srcAttr *internal.ObjAttr) error {
log.Trace("Datalake::RenameFile : %s -> %s", source, target)

fileClient := dl.Filesystem.NewFileClient(url.PathEscape(filepath.Join(dl.Config.prefixPath, source)))

_, err := fileClient.Rename(context.Background(), filepath.Join(dl.Config.prefixPath, target), &file.RenameOptions{
renameResponse, err := fileClient.Rename(context.Background(), filepath.Join(dl.Config.prefixPath, target), &file.RenameOptions{
CPKInfo: dl.datalakeCPKOpt,
})
if err != nil {
Expand All @@ -336,7 +337,7 @@ func (dl *Datalake) RenameFile(source string, target string) error {
return err
}
}

modifyLMT(srcAttr, renameResponse.LastModified)
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions component/azstorage/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,14 @@ func removeLeadingSlashes(s string) string {
return s
}

func modifyLMT(attr *internal.ObjAttr, lmt *time.Time) {
if attr != nil {
attr.Atime = *lmt
attr.Mtime = *lmt
attr.Ctime = *lmt
}
}

// func parseBlobTags(tags *container.BlobTags) map[string]string {

// if tags == nil {
Expand Down
12 changes: 10 additions & 2 deletions component/libfuse/libfuse_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,10 @@ func libfuse_rename(src *C.char, dst *C.char, flags C.uint) C.int {
}
}

err := fuseFS.NextComponent().RenameDir(internal.RenameDirOptions{Src: srcPath, Dst: dstPath})
err := fuseFS.NextComponent().RenameDir(internal.RenameDirOptions{
Src: srcPath,
Dst: dstPath,
})
if err != nil {
log.Err("Libfuse::libfuse_rename : error renaming directory %s -> %s [%s]", srcPath, dstPath, err.Error())
return -C.EIO
Expand All @@ -989,7 +992,12 @@ func libfuse_rename(src *C.char, dst *C.char, flags C.uint) C.int {
libfuseStatsCollector.UpdateStats(stats_manager.Increment, renameDir, (int64)(1))

} else {
err := fuseFS.NextComponent().RenameFile(internal.RenameFileOptions{Src: srcPath, Dst: dstPath})
err := fuseFS.NextComponent().RenameFile(internal.RenameFileOptions{
Src: srcPath,
Dst: dstPath,
SrcAttr: srcAttr,
DstAttr: dstAttr,
})
if err != nil {
log.Err("Libfuse::libfuse_rename : error renaming file %s -> %s [%s]", srcPath, dstPath, err.Error())
return -C.EIO
Expand Down
6 changes: 4 additions & 2 deletions internal/component_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ type CloseFileOptions struct {
}

type RenameFileOptions struct {
Src string
Dst string
Src string
Dst string
SrcAttr *ObjAttr
DstAttr *ObjAttr
}

type ReadFileOptions struct {
Expand Down
Loading