[FLINK-35282][python] Upgrade Apache Beam >=2.54 (#25541)
xaniasd authored Jan 20, 2025
1 parent 6570509 commit 6b6a73e
Showing 40 changed files with 172 additions and 140 deletions.
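For reviewers trying this change locally, here is a minimal sketch (not part of the commit) that checks an installed environment against the new constraint, using only the standard library:

```python
# Sketch: verify that the installed apache-beam version falls inside the
# range PyFlink now requires (>= 2.54.0, <= 2.61.0).
from importlib.metadata import version

beam = version("apache-beam")
major_minor = tuple(int(part) for part in beam.split(".")[:2])
if not ((2, 54) <= major_minor <= (2, 61)):
    raise RuntimeError(f"apache-beam {beam} is outside the supported range")
print(f"apache-beam {beam} is within the supported range")
```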
2 changes: 1 addition & 1 deletion docs/content.zh/docs/deployment/cli.md
@@ -576,7 +576,7 @@ related options. Here's an overview of all the Python related options for the ac
<td>
Specify the path of the python interpreter used to execute the python UDF worker
(e.g.: --pyExecutable /usr/local/bin/python3).
-The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0),
+The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.54.0, <= 2.61.0),
Pip (version >= 20.3) and SetupTools (version >= 37.0.0).
Please ensure that the specified environment meets the above requirements.
</td>
2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/table/sqlClient.md
@@ -322,7 +322,7 @@ Mode "embedded" (default) submits Flink jobs from the local machine.
/usr/local/bin/python3). The
python UDF worker depends on
Python 3.8+, Apache Beam
-(version == 2.43.0), Pip
+(version >= 2.54.0, <= 2.61.0), Pip
(version >= 20.3) and SetupTools
(version >= 37.0.0). Please
ensure that the specified
2 changes: 1 addition & 1 deletion docs/content/docs/deployment/cli.md
@@ -574,7 +574,7 @@ related options. Here's an overview of all the Python related options for the ac
<td>
Specify the path of the python interpreter used to execute the python UDF worker
(e.g.: --pyExecutable /usr/local/bin/python3).
-The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0),
+The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.54.0, <= 2.61.0),
Pip (version >= 20.3) and SetupTools (version >= 37.0.0).
Please ensure that the specified environment meets the above requirements.
</td>
2 changes: 1 addition & 1 deletion docs/content/docs/dev/table/sqlClient.md
@@ -260,7 +260,7 @@ Mode "embedded" (default) submits Flink jobs from the local machine.
/usr/local/bin/python3). The
python UDF worker depends on
Python 3.8+, Apache Beam
-(version == 2.43.0), Pip
+(version >= 2.54.0, <= 2.61.0), Pip
(version >= 20.3) and SetupTools
(version >= 37.0.0). Please
ensure that the specified
@@ -24,7 +24,7 @@
<td><h5>python.executable</h5></td>
<td style="word-wrap: break-word;">"python"</td>
<td>String</td>
-<td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0), Pip (version &gt;= 20.3) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
+<td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version &gt;= 2.54.0, &lt;= 2.61.0), Pip (version &gt;= 20.3) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
</tr>
<tr>
<td><h5>python.execution-mode</h5></td>
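For illustration (not part of the diff), the `python.executable` option documented above can also be set programmatically; a sketch, with an illustrative interpreter path:

```python
# Sketch: set the documented "python.executable" option from code, which is
# equivalent to the "-pyexec" CLI flag; the path below is illustrative.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("python.executable", "/usr/local/bin/python3")
```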
@@ -275,7 +275,7 @@ public class CliFrontendParser {
true,
"Specify the path of the python interpreter used to execute the python UDF worker "
+ "(e.g.: --pyExecutable /usr/local/bin/python3). "
+ "The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0), "
+ "The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.54.0, <= 2.61.0), "
+ "Pip (version >= 20.3) and SetupTools (version >= 37.0.0). "
+ "Please ensure that the specified environment meets the above requirements.");

2 changes: 1 addition & 1 deletion flink-python/README.md
@@ -26,7 +26,7 @@ The auto-generated Python docs can be found at [https://nightlies.apache.org/fli

## Python Requirements

-Apache Flink Python API depends on Py4J (currently version 0.10.9.7), CloudPickle (currently version 2.2.0), python-dateutil (currently version >=2.8.0,<3), Apache Beam (currently version >=2.43.0,<2.49.0).
+Apache Flink Python API depends on Py4J (currently version 0.10.9.7), CloudPickle (currently version 2.2.0), python-dateutil (currently version >=2.8.0,<3), Apache Beam (currently version >= 2.54.0, <= 2.61.0).

## Development Notices

2 changes: 1 addition & 1 deletion flink-python/dev/dev-requirements.txt
@@ -15,7 +15,7 @@
pip>=20.3
setuptools>=18.0
wheel
-apache-beam>=2.43.0,<2.49.0
+apache-beam>=2.54.0,<=2.61.0
cython>=0.29.24
py4j==0.10.9.7
python-dateutil>=2.8.0,<3
92 changes: 64 additions & 28 deletions flink-python/pom.xml
@@ -17,8 +17,9 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

@@ -454,7 +455,8 @@ under the License.
<delete dir="${project.basedir}/build"/>
<delete dir="${project.basedir}/apache-flink-libraries/build"/>
<delete dir="${project.basedir}/apache_flink.egg-info"/>
<delete dir="${project.basedir}/apache-flink-libraries/apache_flink_libraries.egg-info"/>
<delete
dir="${project.basedir}/apache-flink-libraries/apache_flink_libraries.egg-info"/>
</target>
</configuration>
</execution>
@@ -504,15 +506,17 @@ under the License.
basedir="${project.build.directory}/test-classes"
includes="**/PythonFunctionFactoryTest.class"/>

<jar destfile="${project.build.directory}/artifacts/testDataStream.jar"
basedir="${project.build.directory}/test-classes"
includes="**/DataStreamTestCollectSink.class,**/MyCustomSourceFunction.class,**/PartitionCustomTestMapFunction.class"/>
<jar
destfile="${project.build.directory}/artifacts/testDataStream.jar"
basedir="${project.build.directory}/test-classes"
includes="**/DataStreamTestCollectSink.class,**/MyCustomSourceFunction.class,**/PartitionCustomTestMapFunction.class"/>

<jar destfile="${project.build.directory}/artifacts/dummy.jar"
basedir="${project.build.directory}/test-classes"
includes="**/TestJob.class">
<manifest>
<attribute name="Main-Class" value="org.apache.flink.client.cli.TestJob" />
<attribute name="Main-Class"
value="org.apache.flink.client.cli.TestJob"/>
</manifest>
</jar>
</target>
@@ -581,7 +585,8 @@ under the License.
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
-<artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+<artifactId>flink-sql-connector-aws-kinesis-firehose
+</artifactId>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
@@ -593,7 +598,8 @@ under the License.
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
-<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
+<artifactId>flink-connector-cassandra_${scala.binary.version}
+</artifactId>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
@@ -618,7 +624,8 @@ under the License.
<artifactId>flink-test-utils-junit</artifactId>
</artifactItem>
</artifactItems>
-<outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+<outputDirectory>${project.build.directory}/test-dependencies
+</outputDirectory>
</configuration>
</execution>
</executions>
@@ -635,7 +642,8 @@ under the License.
</goals>
<configuration>
<includeGroupIds>junit</includeGroupIds>
-<outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+<outputDirectory>${project.build.directory}/test-dependencies
+</outputDirectory>
</configuration>
</execution>
</executions>
@@ -669,24 +677,33 @@ under the License.
<filter>
<artifact>org.apache.beam:beam-sdks-java-core</artifact>
<excludes>
-<exclude>org/apache/beam/repackaged/core/org/antlr/**</exclude>
-<exclude>org/apache/beam/repackaged/core/org/apache/commons/compress/**</exclude>
-<exclude>org/apache/beam/repackaged/core/org/apache/commons/lang3/**</exclude>
+<exclude>org/apache/beam/repackaged/core/org/antlr/**
+</exclude>
+<exclude>
+    org/apache/beam/repackaged/core/org/apache/commons/compress/**
+</exclude>
+<exclude>
+    org/apache/beam/repackaged/core/org/apache/commons/lang3/**
+</exclude>
</excludes>
</filter>
<filter>
<artifact>org.apache.beam:beam-vendor-grpc-1_43_2</artifact>
<excludes>
-<exclude>org/apache/beam/vendor/grpc/v1p43p2/org/jboss/**</exclude>
+<exclude>org/apache/beam/vendor/grpc/v1p43p2/org/jboss/**
+</exclude>
<exclude>schema/**</exclude>
-<exclude>org/apache/beam/vendor/grpc/v1p43p2/org/eclipse/jetty/**</exclude>
+<exclude>
+    org/apache/beam/vendor/grpc/v1p43p2/org/eclipse/jetty/**
+</exclude>
</excludes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>LICENSE-junit.txt</exclude>
<exclude>LICENSE.txt</exclude>
<exclude>LICENSE</exclude>
<exclude>META-INF/LICENSE.txt</exclude>
<exclude>*.proto</exclude>
</excludes>
@@ -695,43 +712,60 @@ under the License.
<relocations combine.children="append">
<relocation>
<pattern>py4j</pattern>
-<shadedPattern>org.apache.flink.api.python.shaded.py4j</shadedPattern>
+<shadedPattern>org.apache.flink.api.python.shaded.py4j
+</shadedPattern>
</relocation>
<relocation>
<pattern>net.razorvine</pattern>
-<shadedPattern>org.apache.flink.api.python.shaded.net.razorvine</shadedPattern>
+<shadedPattern>
+    org.apache.flink.api.python.shaded.net.razorvine
+</shadedPattern>
</relocation>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
-<shadedPattern>org.apache.flink.api.python.shaded.com.fasterxml.jackson</shadedPattern>
+<shadedPattern>
+    org.apache.flink.api.python.shaded.com.fasterxml.jackson
+</shadedPattern>
</relocation>
<relocation>
<pattern>org.joda.time</pattern>
-<shadedPattern>org.apache.flink.api.python.shaded.org.joda.time</shadedPattern>
+<shadedPattern>
+    org.apache.flink.api.python.shaded.org.joda.time
+</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.protobuf</pattern>
-<shadedPattern>org.apache.flink.api.python.shaded.com.google.protobuf</shadedPattern>
+<shadedPattern>
+    org.apache.flink.api.python.shaded.com.google.protobuf
+</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.arrow</pattern>
-<shadedPattern>org.apache.flink.api.python.shaded.org.apache.arrow</shadedPattern>
+<shadedPattern>
+    org.apache.flink.api.python.shaded.org.apache.arrow
+</shadedPattern>
</relocation>
<relocation>
<pattern>io.netty</pattern>
-<shadedPattern>org.apache.flink.api.python.shaded.io.netty</shadedPattern>
+<shadedPattern>org.apache.flink.api.python.shaded.io.netty
+</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.flatbuffers</pattern>
-<shadedPattern>org.apache.flink.api.python.shaded.com.google.flatbuffers</shadedPattern>
+<shadedPattern>
+    org.apache.flink.api.python.shaded.com.google.flatbuffers
+</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.protobuf</pattern>
-<shadedPattern>org.apache.flink.api.python.shaded.com.google.protobuf</shadedPattern>
+<shadedPattern>
+    org.apache.flink.api.python.shaded.com.google.protobuf
+</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.avro</pattern>
-<shadedPattern>org.apache.flink.avro.shaded.org.apache.avro</shadedPattern>
+<shadedPattern>org.apache.flink.avro.shaded.org.apache.avro
+</shadedPattern>
</relocation>
</relocations>
</configuration>
@@ -749,7 +783,8 @@ under the License.
<goal>run</goal>
</goals>
<configuration>
-<protocArtifact>com.google.protobuf:protoc:${protoc.version}</protocArtifact>
+<protocArtifact>com.google.protobuf:protoc:${protoc.version}
+</protocArtifact>
<inputDirectories>
<include>pyflink/proto</include>
</inputDirectories>
@@ -764,7 +799,8 @@ under the License.
<systemPropertyVariables>
<!-- Arrow requires the property io.netty.tryReflectionSetAccessible to
be set to true for JDK >= 9. Please refer to ARROW-5412 for more details. -->
-<io.netty.tryReflectionSetAccessible>true</io.netty.tryReflectionSetAccessible>
+<io.netty.tryReflectionSetAccessible>true
+</io.netty.tryReflectionSetAccessible>
</systemPropertyVariables>
</configuration>
</plugin>
@@ -545,7 +545,7 @@ def set_python_executable(self, python_exec: str):
.. note::
-The python udf worker depends on Apache Beam (version == 2.43.0).
+The python udf worker depends on Apache Beam (version >= 2.54.0, <= 2.61.0).
Please ensure that the specified environment meets the above requirements.
:param python_exec: The path of python interpreter.
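A usage sketch (not part of the diff) for the API whose docstring changes above, assuming a DataStream job; the interpreter path is illustrative:

```python
# Sketch: the interpreter pointed to here must provide an Apache Beam
# installation inside the newly documented range (>= 2.54.0, <= 2.61.0).
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_python_executable("/usr/local/bin/python3")
```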
@@ -95,8 +95,8 @@ def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cl
        if not self._has_side_output:
            self._main_output_processor = self._output_processors[DEFAULT_OUTPUT_TAG][0]

-    def setup(self):
-        super(FunctionOperation, self).setup()
+    def setup(self, data_sampler=None):
+        super().setup(data_sampler)

    def start(self):
        with self.scoped_start_state:
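Background (an inference, not stated in the diff): newer Beam releases extended `Operation.setup()` with a `data_sampler` parameter, so the override now accepts it with a default of `None` and forwards it to the base class; the old zero-argument signature would no longer match the upstream API.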
@@ -156,7 +156,7 @@ def _start_sdk_worker_main(self, start_worker_request: beam_fn_api_pb2.StartWork
            control_address=control_service_descriptor.url,
            status_address=status_service_descriptor.url,
            worker_id=_worker_id,
-           state_cache_size=sdk_worker_main._get_state_cache_size(experiments),
+           state_cache_size=sdk_worker_main._get_state_cache_size_bytes(sdk_pipeline_options),
            data_buffer_time_limit_ms=sdk_worker_main._get_data_buffer_time_limit_ms(
                experiments),
            profiler_factory=profiler.Profile.factory_from_options(
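Background (an inference, not stated in the diff): newer Beam versions dropped the experiments-based `_get_state_cache_size()` helper in `sdk_worker_main` in favor of `_get_state_cache_size_bytes(...)`, which derives the cache size from the pipeline options, hence the switch to `sdk_pipeline_options` here.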
2 changes: 1 addition & 1 deletion flink-python/pyflink/table/table_config.py
@@ -258,7 +258,7 @@ def set_python_executable(self, python_exec: str):
.. note::
-The python udf worker depends on Apache Beam (version == 2.43.0).
+The python udf worker depends on Apache Beam (version >= 2.54.0, <= 2.61.0).
Please ensure that the specified environment meets the above requirements.
:param python_exec: The path of python interpreter.
13 changes: 3 additions & 10 deletions flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -31,7 +31,7 @@ class PandasConversionTestBase(object):

    @classmethod
    def setUpClass(cls):
-        super(PandasConversionTestBase, cls).setUpClass()
+        super().setUpClass()
        cls.data = [(1, 1, 1, 1, True, 1.1, 1.2, 'hello', bytearray(b"aaa"),
                     decimal.Decimal('1000000000000000000.01'), datetime.date(2014, 9, 13),
                     datetime.time(hour=1, minute=0, second=1),
@@ -236,12 +236,5 @@ def test_to_pandas_with_event_time(self):
        result_pdf = t.to_pandas()
        import pandas as pd
        os.remove(source_path)
-        assert_frame_equal(result_pdf, pd.DataFrame(
-            data={"rowtime": [
-                datetime.datetime(2018, 3, 11, 3, 10),
-                datetime.datetime(2018, 3, 11, 3, 10),
-                datetime.datetime(2018, 3, 11, 3, 10),
-                datetime.datetime(2018, 3, 11, 3, 40),
-                datetime.datetime(2018, 3, 11, 4, 20),
-                datetime.datetime(2018, 3, 11, 3, 30),
-            ]}))
+        expected_df = pd.DataFrame(data={"rowtime": pd.Series(data, dtype="datetime64[ms]")})
+        assert_frame_equal(result_pdf, expected_df)
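Background (an inference, not stated in the diff): `assert_frame_equal` compares dtypes strictly, and the updated conversion path yields millisecond-precision timestamps, so the expected frame now pins `dtype="datetime64[ms]"`; the `data` variable is defined in test context outside this hunk.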
2 changes: 1 addition & 1 deletion flink-python/pyproject.toml
@@ -21,7 +21,7 @@ requires = [
"packaging>=20.5; platform_machine=='arm64'", # macos M1
"setuptools>=18.0",
"wheel",
"apache-beam>=2.43.0,<2.49.0",
"apache-beam>=2.54.0,<=2.61.0",
"cython>=0.29.24",
"fastavro>=1.1.0,!=1.8.0"
]
2 changes: 1 addition & 1 deletion flink-python/setup.py
@@ -317,7 +317,7 @@ def extracted_output_files(base_dir, file_path, output_directory):
'pyflink.bin': ['*']}

install_requires = ['py4j==0.10.9.7', 'python-dateutil>=2.8.0,<3',
-                    'apache-beam>=2.43.0,<2.49.0',
+                    'apache-beam>=2.54.0,<=2.61.0',
'cloudpickle>=2.2.0', 'avro-python3>=1.8.1,!=1.9.2',
'pytz>=2018.3', 'fastavro>=1.1.0,!=1.8.0', 'requests>=2.26.0',
'protobuf>=3.19.0',
@@ -56,16 +56,16 @@
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.NoopLock;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

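Background (an inference, not stated in the diff): Beam's vendored Guava moved from 26.0 to 32.1.2 in the 2.54+ line, so every import of the vendored `com.google.common` packages is renamed from `v26_0_jre` to `v32_1_2_jre` with no behavioral change.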