[feat] Pulsar Functions should provide a way to set Pulsar client memory limit #23723

Open
1 of 2 tasks
lhotari opened this issue Dec 12, 2024 · 4 comments
Labels
area/function type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

Comments

@lhotari
Member

lhotari commented Dec 12, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

I haven't found a way to tune the memory limit for the Pulsar client used in Pulsar Functions.

The memory limit is disabled in Pulsar Functions:

There's code for configuring it:

    private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit,
                            String pulsarServiceUrl, AuthenticationConfig authConfig, String stateStorageImplClass,
                            String storageServiceUrl,
                            SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider,
                            FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
                            ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
                            String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager,
                            Optional<FunctionsManager> functionsManager, FunctionCacheManager fnCache)
            throws PulsarClientException {
        if (rootClassLoader == null) {
            rootClassLoader = Thread.currentThread().getContextClassLoader();
        }
        this.rootClassLoader = rootClassLoader;
        this.secretsProviderConfigurator = secretsProviderConfigurator;
        this.defaultSecretsProvider = secretsProvider;
        this.fnCache = fnCache;
        if (fnCache == null) {
            this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
        }
        this.threadGroup = new ThreadGroup(threadGroupName);
        this.pulsarAdmin =
                exposePulsarAdminClientEnabled ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig)
                        : null;
        this.clientBuilder = InstanceUtils
                .createPulsarClientBuilder(pulsarServiceUrl, authConfig, calculateClientMemoryLimit(memoryLimit));
        this.pulsarClient = this.clientBuilder.build();
        this.stateStorageImplClass = stateStorageImplClass;
        this.storageServiceUrl = storageServiceUrl;
        this.collectorRegistry = collectorRegistry;
        this.narExtractionDirectory = narExtractionDirectory;
        this.connectorsManager = connectorsManager;
        this.functionsManager = functionsManager;
    }

    private Optional<Long> calculateClientMemoryLimit(Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit) {
        if (memoryLimit.isPresent()) {
            Long absolute = memoryLimit.get().getAbsoluteValue();
            Double percentOfDirectMem = memoryLimit.get().getPercentOfMaxDirectMemory();
            if (absolute != null) {
                checkArgument(absolute > 0, "Absolute memory limit for Pulsar client has to be positive");
            }
            if (percentOfDirectMem != null) {
                checkArgument(percentOfDirectMem > 0 && percentOfDirectMem <= 100,
                        "Percent of max direct memory limit for Pulsar client must be between 0 and 100");
            }
            if (absolute != null && percentOfDirectMem != null) {
                return Optional.of(Math.min(absolute, getBytesPercentDirectMem(percentOfDirectMem)));
            }
            if (absolute != null) {
                return Optional.of(absolute);
            }
            if (percentOfDirectMem != null) {
                return Optional.of(getBytesPercentDirectMem(percentOfDirectMem));
            }
        }
        return Optional.empty();
    }

However, no limit is ever supplied in the JavaInstanceStarter case. JavaInstanceStarter is used when running Java functions in the process, k8s, and Function Mesh runtimes (that is, every runtime other than the thread runtime):

    containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
            stateStorageImplClass,
            stateStorageServiceUrl,
            AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
                    .clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls))
                    .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
                    .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
                    .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
            secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader,
            exposePulsarAdminClientEnabled, webServiceUrl, fnCache);

That constructor passes Optional.empty() as the memory limit:

    public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl,
                                String stateStorageImplClass,
                                String storageServiceUrl,
                                AuthenticationConfig authConfig, SecretsProvider secretsProvider,
                                FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
                                ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
                                String pulsarWebServiceUrl, FunctionCacheManager fnCache) throws Exception {
        initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig,
                stateStorageImplClass, storageServiceUrl, null, secretsProvider, collectorRegistry,
                narExtractionDirectory,
                rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty(),
                Optional.empty(), fnCache);
    }

It looks like the memory limit solution has only been implemented for the ThreadRuntimeFactory, in #9320. When using ThreadRuntimeFactory, it's possible to set the limit in conf/functions_worker.yml. This doesn't help with the other runtime types (process, k8s).
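
To make the resolution rule in calculateClientMemoryLimit concrete, here is a minimal standalone sketch of the same order of checks. It is not the Pulsar code: the class name, the resolve method, and the explicit maxDirectMemoryBytes parameter are assumptions made so the example runs without Pulsar on the classpath. In particular, getBytesPercentDirectMem is not shown in this issue, so the percentage calculation below is only a guess at its behaviour.

```java
import java.util.Optional;

class ClientMemoryLimitSketch {

    // Same order of checks as calculateClientMemoryLimit quoted in this issue.
    // maxDirectMemoryBytes is passed in explicitly (an assumption of this sketch).
    static Optional<Long> resolve(Long absolute, Double percentOfDirectMem, long maxDirectMemoryBytes) {
        if (absolute != null && absolute <= 0) {
            throw new IllegalArgumentException("Absolute memory limit for Pulsar client has to be positive");
        }
        if (percentOfDirectMem != null && (percentOfDirectMem <= 0 || percentOfDirectMem > 100)) {
            throw new IllegalArgumentException(
                    "Percent of max direct memory limit for Pulsar client must be between 0 and 100");
        }
        // Assumed stand-in for getBytesPercentDirectMem.
        Long fromPercent = percentOfDirectMem == null
                ? null
                : Long.valueOf((long) (maxDirectMemoryBytes * (percentOfDirectMem / 100)));
        if (absolute != null && fromPercent != null) {
            return Optional.of(Math.min(absolute, fromPercent)); // both set: the smaller value wins
        }
        if (absolute != null) {
            return Optional.of(absolute);
        }
        return Optional.ofNullable(fromPercent); // empty when neither value is set
    }

    public static void main(String[] args) {
        long oneGiB = 1024L * 1024 * 1024;
        long sixtyFourMiB = 64L * 1024 * 1024;
        System.out.println(resolve(sixtyFourMiB, 50.0, oneGiB)); // Optional[67108864]
        System.out.println(resolve(null, 50.0, oneGiB));         // Optional[536870912]
        System.out.println(resolve(null, null, oneGiB));         // Optional.empty
    }
}
```

The last call corresponds to the JavaInstanceStarter path described above: with no MemoryLimit supplied, the result is always empty.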

Solution

Add the Pulsar client memory limit configuration to org.apache.pulsar.common.functions.FunctionConfig, org.apache.pulsar.functions.instance.InstanceConfig, and org.apache.pulsar.functions.worker.WorkerConfig, and make it configurable in the same way as other instance config parameters such as maxPendingAsyncRequests.
The default for all functions would be configured in WorkerConfig. The per-function value would live in InstanceConfig and be configurable through FunctionConfig, in YAML and on the command line.
How configs should be handled isn't fully consistent: there's also the protobuf org.apache.pulsar.functions.proto.Function.FunctionDetails, through which configs are passed.
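
The layering proposed here (a worker-wide default that a per-function setting can override) could be sketched roughly as follows. Everything in this block is hypothetical: the class name, the effectiveLimitBytes helper, and the convention that null means "not set" are illustrations only, not existing Pulsar APIs.

```java
import java.util.Optional;

class EffectiveMemoryLimitSketch {

    // Hypothetical helper: a per-function value (FunctionConfig -> InstanceConfig)
    // overrides the worker-wide default (WorkerConfig). Null means "not set".
    static Optional<Long> effectiveLimitBytes(Long functionLimitBytes, Long workerDefaultBytes) {
        Long chosen = functionLimitBytes != null ? functionLimitBytes : workerDefaultBytes;
        if (chosen == null) {
            return Optional.empty(); // nothing configured at either level
        }
        if (chosen <= 0) {
            throw new IllegalArgumentException("Pulsar client memory limit has to be positive");
        }
        return Optional.of(chosen);
    }

    public static void main(String[] args) {
        long workerDefault = 64L * 1024 * 1024;   // 64 MiB default for all functions
        long functionValue = 128L * 1024 * 1024;  // 128 MiB set on one function

        System.out.println(effectiveLimitBytes(functionValue, workerDefault)); // Optional[134217728]
        System.out.println(effectiveLimitBytes(null, workerDefault));          // Optional[67108864]
        System.out.println(effectiveLimitBytes(null, null));                   // Optional.empty
    }
}
```

A resolved value would then be handed to the client builder, for example via ClientBuilder.memoryLimit(long, SizeUnit), while an empty result would leave today's behaviour unchanged.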

Are you willing to submit a PR?

  • I'm willing to submit a PR!

@lhotari added the type/enhancement and area/function labels Dec 12, 2024
@dao-jun assigned dao-jun and unassigned dao-jun Dec 13, 2024

@walkinggo

I would like to try to resolve this issue. Could you assign it to me? @lhotari

@lhotari
Member Author

lhotari commented Jan 14, 2025

I would like to try to resolve this issue. Could you assign it to me? @lhotari

@walkinggo For issues which don't have someone assigned, there's no need to explicitly assign issues in Apache Pulsar. You can comment on the issue that you will be working on it. Unless someone replies and says that it's already been taken care of, you can expect that nobody else is already working on it.

@walkinggo

Sorry, I wasn't aware of the relevant rules. Thank you for letting me know.


@walkinggo

I found that the MemoryLimit class mentioned here is actually a static nested class of ThreadRuntimeFactoryConfig. Adding it directly to FunctionConfig would cause a circular dependency, so perhaps we should extract it into a separate class.

Additionally, the ThreadRuntimeFactory constructor used in JavaInstanceStarter has no MemoryLimit parameter, which is why Optional.empty() ends up being passed as the MemoryLimit to ThreadRuntimeFactory's initialize method. I think we should add a constructor to ThreadRuntimeFactory that takes this parameter.

By the time that constructor is called, instanceConfig has already been initialized in JavaInstanceStarter, so MemoryLimit could be stored in instanceConfig as a member variable and passed along from there. I noticed that the other member variables of instanceConfig are populated from JavaInstanceStarter. Should we also add a MemoryLimit variable to JavaInstanceStarter so that it can be configured from the command line?
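
A rough sketch of the shape this could take is below. All names are hypothetical stand-ins: RuntimeFactorySketch plays the role of ThreadRuntimeFactory reduced to its constructor chain, and MemoryLimitSketch is the extracted standalone class. Neither exists in Pulsar, and the real constructors take many more parameters.

```java
import java.util.Optional;

// Stand-in for ThreadRuntimeFactory, reduced to the constructor chain (hypothetical).
class RuntimeFactorySketch {
    final String threadGroupName;
    final Optional<MemoryLimitSketch> memoryLimit;

    // Existing-style constructor: there is no way to pass a limit, so it is always empty.
    RuntimeFactorySketch(String threadGroupName) {
        this(threadGroupName, Optional.empty());
    }

    // Proposed overload: the caller (e.g. JavaInstanceStarter) supplies the limit.
    RuntimeFactorySketch(String threadGroupName, Optional<MemoryLimitSketch> memoryLimit) {
        this.threadGroupName = threadGroupName;
        this.memoryLimit = memoryLimit;
    }

    public static void main(String[] args) {
        // Values as they might arrive from command-line options (null = option not given).
        Long absoluteBytes = 64L * 1024 * 1024;
        Double percent = null;

        RuntimeFactorySketch factory = new RuntimeFactorySketch(
                "LocalRunnerThreadGroup", MemoryLimitSketch.fromNullable(absoluteBytes, percent));
        System.out.println(factory.memoryLimit.isPresent()); // true
    }
}

// Hypothetical standalone value class, extracted from ThreadRuntimeFactoryConfig.
final class MemoryLimitSketch {
    final Long absoluteValue;              // bytes, or null if unset
    final Double percentOfMaxDirectMemory; // percentage, or null if unset

    MemoryLimitSketch(Long absoluteValue, Double percentOfMaxDirectMemory) {
        this.absoluteValue = absoluteValue;
        this.percentOfMaxDirectMemory = percentOfMaxDirectMemory;
    }

    // Returns empty when neither value was provided.
    static Optional<MemoryLimitSketch> fromNullable(Long absoluteValue, Double percentOfMaxDirectMemory) {
        if (absoluteValue == null && percentOfMaxDirectMemory == null) {
            return Optional.empty();
        }
        return Optional.of(new MemoryLimitSketch(absoluteValue, percentOfMaxDirectMemory));
    }
}
```

Keeping the old constructor and delegating to the new one means existing callers keep compiling, and only JavaInstanceStarter needs to switch to the overload.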
