Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a FileIO implementation for WASB #360

Merged
merged 30 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,10 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) {
.getSubscopedCreds(
Mockito.mock(PolarisDiagnostics.class),
new AwsStorageConfigurationInfo(
storageType,
List.of(s3Path(bucket, warehouseKeyPrefix, storageType)),
roleARN,
externalId),
storageType, List.of(s3Path(bucket, warehouseKeyPrefix)), roleARN, externalId),
true,
Set.of(
s3Path(bucket, firstPath, storageType),
s3Path(bucket, secondPath, storageType)),
Set.of(s3Path(bucket, firstPath, storageType)));
Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)),
Set.of(s3Path(bucket, firstPath)));
assertThat(credentials)
.isNotEmpty()
.containsEntry(PolarisCredentialProperty.AWS_TOKEN, "sess")
Expand Down Expand Up @@ -313,14 +308,12 @@ public void testGetSubscopedCredsInlinePolicyWithoutList() {
Mockito.mock(PolarisDiagnostics.class),
new AwsStorageConfigurationInfo(
PolarisStorageConfigurationInfo.StorageType.S3,
List.of(s3Path(bucket, warehouseKeyPrefix, storageType)),
List.of(s3Path(bucket, warehouseKeyPrefix)),
roleARN,
externalId),
false, /* allowList = false*/
Set.of(
s3Path(bucket, firstPath, storageType),
s3Path(bucket, secondPath, storageType)),
Set.of(s3Path(bucket, firstPath, storageType)));
Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)),
Set.of(s3Path(bucket, firstPath)));
assertThat(credentials)
.isNotEmpty()
.containsEntry(PolarisCredentialProperty.AWS_TOKEN, "sess")
Expand Down Expand Up @@ -405,14 +398,9 @@ public void testGetSubscopedCredsInlinePolicyWithoutWrites() {
.getSubscopedCreds(
Mockito.mock(PolarisDiagnostics.class),
new AwsStorageConfigurationInfo(
storageType,
List.of(s3Path(bucket, warehouseKeyPrefix, storageType)),
roleARN,
externalId),
storageType, List.of(s3Path(bucket, warehouseKeyPrefix)), roleARN, externalId),
true, /* allowList = true */
Set.of(
s3Path(bucket, firstPath, storageType),
s3Path(bucket, secondPath, storageType)),
Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)),
Set.of());
assertThat(credentials)
.isNotEmpty()
Expand Down Expand Up @@ -469,11 +457,7 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() {
Mockito.mock(PolarisDiagnostics.class),
new AwsStorageConfigurationInfo(
PolarisStorageConfigurationInfo.StorageType.S3,
List.of(
s3Path(
bucket,
warehouseKeyPrefix,
PolarisStorageConfigurationInfo.StorageType.S3)),
List.of(s3Path(bucket, warehouseKeyPrefix)),
roleARN,
externalId),
true, /* allowList = true */
Expand All @@ -494,8 +478,7 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() {
return bucketArn + "/" + keyPrefix + "/*";
}

private static @NotNull String s3Path(
String bucket, String keyPrefix, PolarisStorageConfigurationInfo.StorageType storageType) {
private static @NotNull String s3Path(String bucket, String keyPrefix) {
return "s3://" + bucket + "/" + keyPrefix;
}
}
2 changes: 1 addition & 1 deletion polaris-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ metaStoreManager:
# persistence-unit: polaris

io:
factoryType: default
factoryType: wasb

# TODO - avoid duplicating token broker config
oauth2:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@
import org.apache.polaris.service.admin.api.PolarisPrincipalRolesApi;
import org.apache.polaris.service.admin.api.PolarisPrincipalsApi;
import org.apache.polaris.service.auth.DiscoverableAuthenticator;
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.catalog.IcebergCatalogAdapter;
import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi;
import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApi;
import org.apache.polaris.service.catalog.api.IcebergRestOAuth2Api;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.ConfigurationStoreAware;
import org.apache.polaris.service.config.HasEntityManagerFactory;
import org.apache.polaris.service.config.OAuth2ApiService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.polaris.core.storage.PolarisStorageIntegration;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.core.storage.aws.PolarisS3FileIOClientFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog;
package org.apache.polaris.service.catalog.io;

import com.fasterxml.jackson.annotation.JsonTypeName;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog;
package org.apache.polaris.service.catalog.io;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.dropwizard.jackson.Discoverable;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog.io;

import java.util.Map;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.core.storage.azure.AzureLocation;

/**
* A {@link FileIO} implementation that translates WASB paths into ABFS paths and then delegates to
* another underlying FileIO implementation
*/
public class WasbTranslatingFileIO implements FileIO {
private final FileIO io;

private static final String WASB_SCHEME = "wasb";
private static final String ABFS_SCHEME = "abfs";

public WasbTranslatingFileIO(FileIO io) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it looks like the current structure of FileIOFactory is forcing this to wrap all FileIOs including non-Azure-related ones, we might as well make the class itself more generalized and we could move the Azure-specificity into config.

What if we just called this thing SchemeTranslatingFileIO that takes a map of source schemes to destination schemes? And get the map through config

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see you also have the endpoint rewrite from blob to dfs which wouldn't fit into that. I agree we might not want to go down the road of overly general regexes either.

Per apache/iceberg#10127 (comment) the underlying Azure SDK doesn't actually seem to actually care about the endpoint and will internally sort into both a dfs and a blob client, so in theory a scheme-only replacement should still work, but I guess it could be more fragile.

@collado-mike What do you think? Should we just keep this very wasb-specific?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like the current structure of FileIOFactory is forcing this to wrap all FileIOs including non-Azure-related ones

It's doing a little more than just scheme translating, and this is only true if Polaris is actually configured by an admin to use this factory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I were designing this, I would have a TransformingFileIO with a constructor like

public TransformingFileIO(Function<String, String> pathTransformer, FileIOFactory delegate) {
}

and the azure class would extend that class and pass in a function that did the wasb->abfs conversion. But that's the kind of code refactoring that can be done later without any compatibility or behavior impact, so... 🤷🏽‍♂️

Copy link
Contributor Author

@eric-maynard eric-maynard Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this idea and actually @dennishuo and I had discussed something similar.

My concern for the time being is that it's unclear how we can pass in that pathTransformer via the YAML config file, and that I don't necessarily want to put designing a syntax for arbitrary string transformation onto the critical path for making WASB work.

Agreed that this makes a lot of sense as something to design and follow up with.

this.io = io;
}

private static String translate(String path) {
if (path == null) {
return null;
} else {
StorageLocation storageLocation = StorageLocation.of(path);
if (storageLocation instanceof AzureLocation azureLocation) {
String scheme = azureLocation.getScheme();
if (scheme.startsWith(WASB_SCHEME)) {
scheme = scheme.replaceFirst(WASB_SCHEME, ABFS_SCHEME);
}
return String.format(
"%s://%s@%s.dfs.core.windows.net/%s",
scheme,
azureLocation.getContainer(),
azureLocation.getStorageAccount(),
azureLocation.getFilePath());
} else {
return path;
}
}
}

@Override
public InputFile newInputFile(String path) {
return io.newInputFile(translate(path));
}

@Override
public OutputFile newOutputFile(String path) {
return io.newOutputFile(translate(path));
}

@Override
public void deleteFile(String path) {
io.deleteFile(translate(path));
}

@Override
public Map<String, String> properties() {
return io.properties();
}

@Override
public void initialize(Map<String, String> properties) {
io.initialize(properties);
}

@Override
public void close() {
io.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog.io;

import com.fasterxml.jackson.annotation.JsonTypeName;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.io.FileIO;

/** A {@link FileIOFactory} that translates WASB paths to ABFS ones */
@JsonTypeName("wasb")
public class WasbTranslatingFileIOFactory implements FileIOFactory {
@Override
public FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point it's starting to seem that we need a way to do a chain of delegation. Both this class (or the suggested generalized SchemeTranslatingFileIOFactory) as well as MeasuredFileIOFactory fundamentally delegators, so maybe we should've entrenched an interface for delegation instead of just loading from String ioImpl.

If we make the delegation behavior configured as construction-time params or injection then we don't have to change the method signatures or callsites either. Basically like this:

mainFileIOFactory = new SchemeTranslatingFileIOFactory(
    new MeasuredFileIOFactory(new DefaultFileIOFactory()),
    Map.of("wasb", "abfs", "wasbs", "abfss"));

And then we get rid of the hard-coded CatalogUtil.loadFileIO in both SchemeTranslatingFileIOFactory and MeasuredFileIOFactory:

@Override
public FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
  return new SchemeTranslatingFileIO(this.delegateFactory.loadFileIO(ioImpl, properties);
}

The drawback I guess is the config for setting the delegate factory is specific to each outer delegator factory:

io:
  factoryType: scheme-translating
schemeTranslatingDelegateType: measured
measuredFactoryDelegateType: default

But this might be better than forcing delegation through a single config syntax since delegation might not be consistent for all delegator types. For example you might have something like:

multiplexingIoFactoryDelegateTypes: r2,s3n,viewfs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see how it could be a bigger undertaking to properly entrench how we want to convey such a delegation chain though, so I don't feel too strongly about whether to tackle this in this PR or in a later followup

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can avoid working on this now. A production use case will require things like metering, throttling, as well as azure support. As is, the configuration only supports one IO factory, meaning you couldn't get all three without writing a new class that does all three.

I think the configuration would work fine as

io:
  factoryType: chain
    delegates:
      - measured
      - wasb
      - default

I think dropwizard should support detecting the generic type argument in something like

public void setDelegates(List<FileIOFactory> delegates);

so the above configuration should work out of the box.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't work exactly out of the box, since FileIOFactory doesn't have a way to wrap a FileIO today. So it's not clear how the chain implementation would actually chain them. In this feature branch I toyed with adding a new interface called SupportsFileIOWrapping for this purpose, but I don't know that this is necessary just to get WASB working E2E

WasbTranslatingFileIO wrapped =
new WasbTranslatingFileIO(CatalogUtil.loadFileIO(ioImpl, properties, new Configuration()));
return wrapped;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.service.auth.DiscoverableAuthenticator;
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.context.CallContextResolver;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.ratelimiter.RateLimiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.polaris.core.persistence.PolarisEntityManager;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.service.catalog.BasePolarisCatalog;
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.task.TaskExecutor;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;

public class TaskFileIOSupplier implements Function<TaskEntity, FileIO> {
private final MetaStoreManagerFactory metaStoreManagerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ org.apache.polaris.service.config.OAuth2ApiService
org.apache.polaris.service.context.RealmContextResolver
org.apache.polaris.service.context.CallContextResolver
org.apache.polaris.service.auth.TokenBrokerFactory
org.apache.polaris.service.catalog.FileIOFactory
org.apache.polaris.service.catalog.io.FileIOFactory
org.apache.polaris.service.ratelimiter.RateLimiter
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
# under the License.
#

org.apache.polaris.service.catalog.DefaultFileIOFactory
org.apache.polaris.service.catalog.io.DefaultFileIOFactory
org.apache.polaris.service.catalog.io.WasbTranslatingFileIOFactory
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.apache.polaris.service.catalog.BasePolarisCatalog;
import org.apache.polaris.service.catalog.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.config.DefaultConfigurationStore;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@
import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
import org.apache.polaris.service.task.TableCleanupTaskHandler;
import org.apache.polaris.service.task.TaskExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.apache.polaris.service.admin.PolarisAdminService;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.jetbrains.annotations.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.service.admin.PolarisAuthzTestBase;
import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.types.NotificationRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog;
package org.apache.polaris.service.catalog.io;

import com.fasterxml.jackson.annotation.JsonTypeName;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -31,6 +32,7 @@
import org.apache.iceberg.io.OutputFile;

/** A FileIOFactory that measures the number of bytes read, files written, and files deleted. */
@JsonTypeName("measured")
public class MeasuredFileIOFactory implements FileIOFactory {
private final List<MeasuredFileIO> ios;

Expand Down