Skip to content

Commit

Permalink
feat: Add ZSTD decompression support to IPC reader (#693)
Browse files Browse the repository at this point in the history
This PR implements ZSTD buffer decompression in the `ArrowIpcDecoder`
and in the `ArrowArrayStreamReader` when built with
`-DNANOARROW_IPC_WITH_ZSTD=ON`. It also allows a user to inject ZSTD
decompression support into the `ArrowIpcDecoder` if for whatever reason
they don't have control over the build flags (or want to use a ZSTD that
has been made available to them in a different way).

This doesn't implement multithreaded decompression but does allow a user
to implement it by not using the default `ArrowIpcSerialDecompressor()`.
This could be included in header-only C++ if there is some interest.

A non-trivial example using the Python bindings (which were also wired up
to support it):

```python
import urllib.request
import nanoarrow as na
url = "https://github.com/geoarrow/geoarrow-data/releases/download/v0.1.0/ns-water-basin_point-wkb.arrow"

# Work around the 'no arrow file support'
with urllib.request.urlopen(url) as f:
    f.read(8)
    print(na.ArrayStream.from_readable(f).read_all())
#> nanoarrow.Array<non-nullable struct<OBJECTID: int64, FEAT_CODE: string, ...>[46]
#> {'OBJECTID': 1, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EB000', 'RIVER': 'BAR...
#> {'OBJECTID': 2, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EC000', 'RIVER': 'ROS...
#> {'OBJECTID': 3, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EA000', 'RIVER': 'TUS...
#> {'OBJECTID': 4, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01DA000', 'RIVER': 'MET...
#> {'OBJECTID': 5, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01ED000', 'RIVER': 'MER...
#> {'OBJECTID': 6, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EE000', 'RIVER': 'HER...
#> {'OBJECTID': 7, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EG000', 'RIVER': 'GOL...
#> {'OBJECTID': 8, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EF000', 'RIVER': 'LAH...
#> {'OBJECTID': 9, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EJ000', 'RIVER': 'SAC...
#> {'OBJECTID': 10, 'FEAT_CODE': 'WABA30', 'BASIN_NAME': '01EH000', 'RIVER': 'EA...
#> ...and 36 more items
```

This PR doesn't implement the R bindings (because adding a zstd
dependency there is a can of worms better suited to another PR).
  • Loading branch information
paleolimbot authored Dec 27, 2024
1 parent 1eaa8d5 commit dd437a9
Show file tree
Hide file tree
Showing 19 changed files with 789 additions and 49 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build-and-test-ipc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
fail-fast: false
matrix:
config:
- {label: default-build, cmake_args: "-DNANOARROW_BUILD_APPS=ON"}
- {label: default-build, cmake_args: "-DNANOARROW_BUILD_APPS=ON -DNANOARROW_IPC_WITH_ZSTD=ON"}
- {label: default-noatomics, cmake_args: "-DCMAKE_C_FLAGS='-DNANOARROW_IPC_USE_STDATOMIC=0'"}
- {label: namespaced-build, cmake_args: "-DNANOARROW_NAMESPACE=SomeUserNamespace"}
- {label: bundled-build, cmake_args: "-DNANOARROW_BUNDLE=ON"}
Expand Down Expand Up @@ -72,13 +72,13 @@ jobs:
with:
path: arrow
# Bump the number at the end of this line to force a new Arrow C++ build
key: arrow-${{ runner.os }}-${{ runner.arch }}-1
key: arrow-${{ runner.os }}-${{ runner.arch }}-3

- name: Build Arrow C++
if: steps.cache-arrow-build.outputs.cache-hit != 'true'
shell: bash
run: |
ci/scripts/build-arrow-cpp-minimal.sh 18.0.0 arrow
ci/scripts/build-arrow-cpp-minimal.sh 18.1.0 arrow
- name: Build
run: |
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ jobs:

verify-meson:
name: meson-build
runs-on: ubuntu-latest
# Workaround until https://github.com/apache/arrow-nanoarrow/issues/663 is solved
# (after which we can use ubuntu-latest)
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
Expand Down
27 changes: 22 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ option(NANOARROW_FLATCC_ROOT_DIR "Root directory for flatcc include and lib dire
OFF)
option(NANOARROW_FLATCC_INCLUDE_DIR "Include directory for flatcc includes" OFF)
option(NANOARROW_FLATCC_LIB_DIR "Library directory that contains libflatccrt.a" OFF)
option(NANOARROW_IPC_WITH_ZSTD "Build nanoarrow with ZSTD compression support built in"
OFF)

option(NANOARROW_DEVICE "Build device extension" OFF)
option(NANOARROW_TESTING "Build testng extension" OFF)
Expand Down Expand Up @@ -205,15 +207,25 @@ if(NANOARROW_IPC)
"${NANOARROW_FLATCC_INCLUDE_DIR}")
endif()

if(NANOARROW_IPC_WITH_ZSTD)
find_package(zstd REQUIRED)
set(NANOARROW_IPC_EXTRA_FLAGS "-DNANOARROW_IPC_WITH_ZSTD")
set(NANOARROW_IPC_EXTRA_LIBS zstd::libzstd_static)
endif()

if(NOT NANOARROW_BUNDLE)
set(NANOARROW_IPC_BUILD_SOURCES
src/nanoarrow/ipc/decoder.c src/nanoarrow/ipc/encoder.c
src/nanoarrow/ipc/reader.c src/nanoarrow/ipc/writer.c)
src/nanoarrow/ipc/codecs.c
src/nanoarrow/ipc/decoder.c
src/nanoarrow/ipc/encoder.c
src/nanoarrow/ipc/reader.c
src/nanoarrow/ipc/writer.c)
endif()

add_library(nanoarrow_ipc ${NANOARROW_IPC_BUILD_SOURCES})
target_compile_definitions(nanoarrow_ipc PRIVATE ${NANOARROW_IPC_EXTRA_FLAGS})
target_link_libraries(nanoarrow_ipc
PRIVATE flatccrt
PRIVATE flatccrt ${NANOARROW_IPC_EXTRA_LIBS}
PUBLIC nanoarrow nanoarrow_coverage_config)
target_include_directories(nanoarrow_ipc
PUBLIC $<BUILD_INTERFACE:${NANOARROW_BUILD_INCLUDE_DIR}>
Expand Down Expand Up @@ -574,6 +586,7 @@ if(NANOARROW_BUILD_TESTS)
include(GoogleTest)

foreach(name
codecs
decoder
encoder
reader
Expand All @@ -587,6 +600,7 @@ if(NANOARROW_BUILD_TESTS)
nanoarrow
${NANOARROW_ARROW_TARGET}
gtest_main
gmock_main
nanoarrow_coverage_config)

if(NOT (name MATCHES "_hpp_"))
Expand All @@ -595,15 +609,18 @@ if(NANOARROW_BUILD_TESTS)

if(Arrow_FOUND)
target_compile_definitions(nanoarrow_ipc_${name}_test
PRIVATE -DNANOARROW_BUILD_TESTS_WITH_ARROW)
PRIVATE -DNANOARROW_BUILD_TESTS_WITH_ARROW
${NANOARROW_IPC_EXTRA_FLAGS})
else()
target_compile_definitions(nanoarrow_ipc_${name}_test
PRIVATE ${NANOARROW_IPC_EXTRA_FLAGS})
endif()

gtest_discover_tests(nanoarrow_ipc_${name}_test)
endforeach()

target_link_libraries(nanoarrow_ipc_files_test nanoarrow_testing ZLIB::ZLIB
nanoarrow_coverage_config)
target_link_libraries(nanoarrow_ipc_decoder_test gmock_main)
endif()

if(NANOARROW_DEVICE)
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/build-arrow-cpp-minimal.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ cmake ../apache-arrow-${ARROW_CPP_VERSION}/cpp \
-DARROW_JEMALLOC=OFF \
-DARROW_SIMD_LEVEL=NONE \
-DARROW_FILESYSTEM=OFF \
-DARROW_WITH_ZSTD=ON \
-DCMAKE_INSTALL_PREFIX="${ARROW_CPP_INSTALL_DIR}"
cmake --build . --parallel $(nproc)
cmake --install . --prefix="${ARROW_CPP_INSTALL_DIR}" --config=Debug
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def bundle_nanoarrow_ipc(
nanoarrow_ipc_c = concatenate_content(
[
src_dir / "ipc" / "flatcc_generated.h",
src_dir / "ipc" / "codecs.c",
src_dir / "ipc" / "decoder.c",
src_dir / "ipc" / "encoder.c",
src_dir / "ipc" / "reader.c",
Expand Down
12 changes: 11 additions & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ nanoarrow_dep = declare_dependency(

if get_option('ipc')
flatcc_dep = dependency('flatcc')
ipc_lib_deps = [nanoarrow_dep, flatcc_dep]
ipc_lib_c_args = []

if get_option('ipc_with_zstd')
zstd_dep = dependency('libzstd')
ipc_lib_deps += zstd_dep
ipc_lib_c_args += '-DNANOARROW_IPC_WITH_ZSTD'
endif

install_headers(
'src/nanoarrow/nanoarrow_ipc.h',
Expand All @@ -118,12 +126,14 @@ if get_option('ipc')

nanoarrow_ipc_lib = library(
'nanoarrow_ipc',
'src/nanoarrow/ipc/codecs.c',
'src/nanoarrow/ipc/decoder.c',
'src/nanoarrow/ipc/encoder.c',
'src/nanoarrow/ipc/reader.c',
'src/nanoarrow/ipc/writer.c',
dependencies: [nanoarrow_dep, flatcc_dep],
dependencies: ipc_lib_deps,
install: true,
c_args: ipc_lib_c_args,
)
nanoarrow_ipc_dep = declare_dependency(
include_directories: [incdir],
Expand Down
1 change: 1 addition & 0 deletions meson.options
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ option('tests_with_arrow', type: 'boolean', description: 'Build tests with Arrow
option('benchmarks', type: 'boolean', description: 'Build benchmarks', value: false)
option('apps', type: 'boolean', description: 'Build utility applications', value: false)
option('ipc', type: 'boolean', description: 'Build IPC libraries', value: false)
option('ipc_with_zstd', type: 'boolean', description: 'Build IPC libraries with ZSTD compression support', value: false)
option('integration_tests', type: 'boolean',
description: 'Build cross-implementation Arrow integration tests',
value: false)
Expand Down
3 changes: 3 additions & 0 deletions python/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ project(
'warning_level=2',
'c_std=c99',
'default_library=static',
'force_fallback_for=zstd',
# We need to set these options at the project default_option level
# due to https://github.com/mesonbuild/meson/issues/6728
'arrow-nanoarrow:ipc=true',
'arrow-nanoarrow:ipc_with_zstd=true',
'arrow-nanoarrow:device=true',
'arrow-nanoarrow:namespace=PythonPkg',
'zstd:bin_programs=false',
],
)

Expand Down
3 changes: 2 additions & 1 deletion python/src/nanoarrow/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

flatcc_dep = dependency('flatcc')
zstd_dep = dependency('libzstd')
nanoarrow_proj = subproject('arrow-nanoarrow')
nanoarrow_dep = nanoarrow_proj.get_variable('nanoarrow_dep')
nanoarrow_ipc_dep = nanoarrow_proj.get_variable('nanoarrow_ipc_dep')
Expand Down Expand Up @@ -77,7 +78,7 @@ foreach cyf : cyfiles
if stem in ['_array', '_device']
cyfile_deps += [nanoarrow_device_dep]
elif stem == '_ipc_lib'
cyfile_deps += [nanoarrow_ipc_dep, flatcc_dep]
cyfile_deps += [nanoarrow_ipc_dep, flatcc_dep, zstd_dep]
endif

py.extension_module(
Expand Down
30 changes: 30 additions & 0 deletions python/subprojects/zstd.wrap
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[wrap-file]
directory = zstd-1.5.6
source_url = https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz
source_filename = zstd-1.5.6.tar.gz
source_hash = 8c29e06cf42aacc1eafc4077ae2ec6c6fcb96a626157e0593d5e82a34fd403c1
patch_filename = zstd_1.5.6-2_patch.zip
patch_url = https://wrapdb.mesonbuild.com/v2/zstd_1.5.6-2/get_patch
patch_hash = 3e67f7d2edf3c56e6450d4c0f5f3d5fe94799e3608e3795502da03f7dd51b28c
source_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/zstd_1.5.6-2/zstd-1.5.6.tar.gz
wrapdb_version = 1.5.6-2

[provide]
libzstd = libzstd_dep
35 changes: 34 additions & 1 deletion python/tests/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from nanoarrow._utils import NanoarrowException

import nanoarrow as na
from nanoarrow.ipc import InputStream, StreamWriter
from nanoarrow.ipc import _EXAMPLE_IPC_SCHEMA, InputStream, StreamWriter


def test_ipc_stream_example():
Expand Down Expand Up @@ -92,6 +92,18 @@ def test_ipc_stream_from_url():
assert len(batches[0]) == 3


# Reads an in-memory stream made of _EXAMPLE_IPC_SCHEMA followed by
# COMPRESSED_BATCH and checks the decoded result: the array must have
# 3 elements and its first child must be [0, 1, 2].
def test_ipc_stream_compressed_example():
buf = io.BytesIO()
# Schema bytes go first, then the compressed record batch bytes.
buf.write(_EXAMPLE_IPC_SCHEMA)
buf.write(COMPRESSED_BATCH)
# Rewind so the reader starts from the first byte written above.
buf.seek(0)

with InputStream.from_readable(buf) as inp:
array = na.Array(inp)
assert len(array) == 3
assert array.child(0).to_pylist() == [0, 1, 2]


def test_ipc_stream_python_exception_on_read():
class ExtraordinarilyInconvenientFile:
def readinto(self, *args, **kwargs):
Expand Down Expand Up @@ -231,3 +243,24 @@ def test_writer_error_on_write():
with pytest.raises(NanoarrowException):
with StreamWriter.from_writable(io.BytesIO()) as writer:
writer.write_stream(na.c_array([], na.int32()))


# Raw bytes that test_ipc_stream_compressed_example appends after
# _EXAMPLE_IPC_SCHEMA to form a complete stream. The test expects them to
# decode to 3 rows whose first column is [0, 1, 2].
# NOTE(review): presumably a single Arrow IPC record batch message (the
# leading 0xff 0xff 0xff 0xff looks like the continuation marker) whose body
# buffer is ZSTD-compressed (0x28 0xb5 0x2f 0xfd near the end is the ZSTD
# frame magic number) -- confirm against how the fixture was generated.
# fmt: off
COMPRESSED_BATCH = bytearray([
0xff, 0xff, 0xff, 0xff, 0xa0, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x0c, 0x00, 0x18, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00,
0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x1c, 0x00, 0x00, 0x00, 0x20, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x1e, 0x00,
0x10, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x50, 0x00,
0x00, 0x00, 0x24, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x08, 0x00,
0x07, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1d, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x0c,
0x61, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00
])
# fmt: on
Loading

0 comments on commit dd437a9

Please sign in to comment.