diff --git a/bindings/python/python/pyiceberg_core/datafusion.pyi b/bindings/python/python/pyiceberg_core/datafusion.pyi new file mode 100644 index 000000000..087907b9f --- /dev/null +++ b/bindings/python/python/pyiceberg_core/datafusion.pyi @@ -0,0 +1,47 @@ +from __future__ import annotations + +from typing import Any + +class IcebergDataFusionTable: + def __init__( + self, + identifier: list[str], + metadata_location: str, + storage_options: dict[str, str] | None, + ) -> None: + """Create Iceberg table that can be registered as a datafusion table provider + + Example: + ```python + from pyiceberg_core.datafusion import IcebergDataFusionTable + from datafusion import SessionContext + + ice_tbl = IcebergDataFusionTable() + ctx = SessionContext() + ctx.register_table_provider("test", ice_tbl) + ctx.table("test").show() + ``` + Results in + ``` + DataFrame() + +----+----+----+ + | c3 | c1 | c2 | + +----+----+----+ + | 4 | 6 | a | + | 6 | 5 | b | + | 5 | 4 | c | + +----+----+----+ + ``` + """ + + def __datafusion_table_provider__(self) -> Any: + """Return the DataFusion table provider PyCapsule interface. + + To support DataFusion features such as push down filtering, this function will return a PyCapsule + interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective + you should not need to call this function directly. Instead you can use ``register_table_provider`` in + the DataFusion SessionContext. + + Returns: + A PyCapsule DataFusion TableProvider interface. + """ diff --git a/bindings/python/src/datafusion_table_provider.rs b/bindings/python/src/datafusion_table_provider.rs index b5e1bf952..2f989262f 100644 --- a/bindings/python/src/datafusion_table_provider.rs +++ b/bindings/python/src/datafusion_table_provider.rs @@ -41,7 +41,7 @@ impl PyIcebergDataFusionTable { fn new( identifier: Vec, metadata_location: String, - file_io_properties: Option>, + storage_options: Option>, ) -> PyResult { let runtime = runtime(); @@ -52,7 +52,7 @@ impl PyIcebergDataFusionTable { let mut builder = FileIO::from_path(&metadata_location) .map_err(|e| PyRuntimeError::new_err(format!("Failed to init FileIO: {e}")))?; - if let Some(props) = file_io_properties { + if let Some(props) = storage_options { builder = builder.with_props(props); } diff --git a/bindings/python/tests/test_datafusion_table_provider.py b/bindings/python/tests/test_datafusion_table_provider.py index 915b78769..3126ec963 100644 --- a/bindings/python/tests/test_datafusion_table_provider.py +++ b/bindings/python/tests/test_datafusion_table_provider.py @@ -16,15 +16,16 @@ # under the License. -from datetime import date, datetime import uuid +from datetime import date, datetime +from pathlib import Path + +import datafusion +import pyarrow as pa import pytest -from pyiceberg_core.datafusion import IcebergDataFusionTable from datafusion import SessionContext from pyiceberg.catalog import Catalog, load_catalog -import pyarrow as pa -from pathlib import Path -import datafusion +from pyiceberg_core.datafusion import IcebergDataFusionTable assert ( datafusion.__version__ >= "45" @@ -102,7 +103,7 @@ def test_register_iceberg_table_provider( iceberg_table_provider = IcebergDataFusionTable( identifier=iceberg_table.name(), metadata_location=iceberg_table.metadata_location, - file_io_properties=iceberg_table.io.properties, + storage_options=iceberg_table.io.properties, ) ctx = SessionContext() @@ -146,7 +147,7 @@ def __datafusion_table_provider__(self): return IcebergDataFusionTable( identifier=self.name(), metadata_location=self.metadata_location, - file_io_properties=self.io.properties, + storage_options=self.io.properties, ).__datafusion_table_provider__() iceberg_table.__datafusion_table_provider__ = MethodType(