Skip to content

Commit

Permalink
SALTO-4604: Improve remote map performance when iterating over entries (
Browse files Browse the repository at this point in the history
#4722)

Prefer using the cache over deserializing as it tends to be faster
  • Loading branch information
ori-moisis authored Aug 13, 2023
1 parent 5e33dfa commit 153a3dd
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 40 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/local-workspace/remote_map/counters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const createLocationCounters = (location: string): LocationCounters => {
...COUNTER_TYPES.map(counterType => [counterType, createCounter()]),
['dump', () => {
log.debug('Remote Map Stats for location \'%s\': %o',
location, Object.fromEntries(COUNTER_TYPES.map(counterType => [counterType, counters[counterType]])))
location, Object.fromEntries(COUNTER_TYPES.map(counterType => [counterType, counters[counterType].value()])))
}],
])
return counters
Expand Down
76 changes: 37 additions & 39 deletions packages/core/src/local-workspace/remote_map/remote_map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,18 +514,18 @@ remoteMap.RemoteMapCreator => {
return i > 0
}

const getDataIterable = (opts: CreateIteratorOpts, tempOnly = false):
const getDataIterable = (opts: CreateIteratorOpts):
AsyncIterable<remoteMap.RemoteMapEntry<string>> => (
awu(aggregatedIterable(tempOnly || wasClearCalled
awu(aggregatedIterable(wasClearCalled
? [createTempIterator(opts)]
: [createTempIterator(opts), createPersistentIterator(opts)]))
.filter(entry => !delKeys.has(entry.key))
)

const getDataIterableWithPages = (opts: CreateIteratorOpts, tempOnly = false):
const getDataIterableWithPages = (opts: CreateIteratorOpts):
AsyncIterable<remoteMap.RemoteMapEntry<string>[]> => (
awu(aggregatedIterablesWithPages(
tempOnly || wasClearCalled
wasClearCalled
? [createTempIterator(opts)]
: [createTempIterator(opts), createPersistentIterator(opts)],
opts.pageSize,
Expand All @@ -544,39 +544,39 @@ remoteMap.RemoteMapCreator => {
})
await batchUpdate(batchInsertIterator, temp)
}
const valuesImpl = (tempOnly = false, iterationOpts?: remoteMap.IterationOpts):
AsyncIterable<T> => {
const deserializeEntryValue = async (
{ key, value }: remoteMap.RemoteMapEntry<string>
): Promise<remoteMap.RemoteMapEntry<T, K>> => {
const cacheKey = keyToTempDBKey(key)
const cacheValue = locationCache.get(cacheKey) as T | undefined
if (cacheValue !== undefined) {
statCounters.LocationCacheHit.inc()
return { key: key as K, value: cacheValue }
}
statCounters.LocationCacheMiss.inc()
const deserializedValue = await deserialize(value)
// We currently do not set the value back on the cache because in a large enough
// map this will be a waste of memory (subsequent iterations won't benefit)
return { key: key as K, value: deserializedValue }
}
const entriesImpl = (
iterationOpts?: remoteMap.IterationOpts,
): AsyncIterable<remoteMap.RemoteMapEntry<T, K>> => {
const opts = { ...(iterationOpts ?? {}), keys: true, values: true }
return awu(getDataIterable(opts, tempOnly))
.map(async entry => deserialize(entry.value))
return awu(getDataIterable(opts)).map(deserializeEntryValue)
}
const valuesPagesImpl = (tempOnly = false, iterationOpts?: remoteMap.IterationOpts):
AsyncIterable<T[]> => {
const entriesPagesImpl = (
iterationOpts?: remoteMap.IterationOpts,
): AsyncIterable<remoteMap.RemoteMapEntry<T, K>[]> => {
const opts = { ...(iterationOpts ?? {}), keys: true, values: true }
return awu(getDataIterableWithPages(opts, tempOnly))
.map(async entries => Promise.all(
entries.map(entry => deserialize(entry.value))
))
}
const entriesImpl = (iterationOpts?: remoteMap.IterationOpts):
AsyncIterable<remoteMap.RemoteMapEntry<T, K>> => {
const opts = { ...(iterationOpts ?? {}), keys: true, values: true }
return awu(getDataIterable(opts, false))
.map(
async entry => ({ key: entry.key as K, value: await deserialize(entry.value) })
)
}
const entriesPagesImpl = (iterationOpts?: remoteMap.IterationOpts):
AsyncIterable<remoteMap.RemoteMapEntry<T, K>[]> => {
const opts = { ...(iterationOpts ?? {}), keys: true, values: true }
return awu(getDataIterableWithPages(opts, false))
.map(entries => Promise.all(
entries
.map(
async entry => ({ key: entry.key as K, value: await deserialize(entry.value) })
)
))
return awu(getDataIterableWithPages(opts)).map(entries => Promise.all(entries.map(deserializeEntryValue)))
}
const valuesImpl = (iterationOpts?: remoteMap.IterationOpts): AsyncIterable<T> => (
awu(entriesImpl(iterationOpts)).map(entry => entry.value)
)
const valuesPagesImpl = (iterationOpts?: remoteMap.IterationOpts): AsyncIterable<T[]> => (
awu(entriesPagesImpl(iterationOpts)).map(entries => entries.map(entry => entry.value))
)
const clearImpl = (
connection: rocksdb,
prefix: string,
Expand All @@ -592,14 +592,12 @@ remoteMap.RemoteMapCreator => {
})
const keysImpl = (iterationOpts?: remoteMap.IterationOpts): AsyncIterable<K> => {
const opts = { ...(iterationOpts ?? {}), keys: true, values: false }
return awu(getDataIterable(opts, false))
.map(async (entry: remoteMap.RemoteMapEntry<string>) => entry.key as K)
return awu(getDataIterable(opts)).map(entry => entry.key as K)
}
const keysPagesImpl = (iterationOpts?: remoteMap.IterationOpts):
AsyncIterable<K[]> => {
const opts = { ...(iterationOpts ?? {}), keys: true, values: false }
return awu(getDataIterableWithPages(opts, false)).map(async entries =>
entries.map(entry => entry.key as K))
return awu(getDataIterableWithPages(opts)).map(entries => entries.map(entry => entry.key as K))
}
const getImpl = (key: string): Promise<T | undefined> => new Promise(resolve => {
if (delKeys.has(key)) {
Expand Down Expand Up @@ -709,9 +707,9 @@ remoteMap.RemoteMapCreator => {
withLimitedConcurrency(keys.map(k => () => getImpl(k)), GET_CONCURRENCY),
values: <Opts extends remoteMap.IterationOpts>(iterationOpts?: Opts) => {
if (iterationOpts && remoteMap.isPagedIterationOpts(iterationOpts)) {
return valuesPagesImpl(false, iterationOpts) as remoteMap.RemoteMapIterator<T, Opts>
return valuesPagesImpl(iterationOpts) as remoteMap.RemoteMapIterator<T, Opts>
}
return valuesImpl(false, iterationOpts)
return valuesImpl(iterationOpts)
},
entries: <Opts extends remoteMap.IterationOpts>(
iterationOpts?: Opts
Expand Down

0 comments on commit 153a3dd

Please sign in to comment.