From d4de755395a6d16050ca78e63d1ab94dc366a03b Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Thu, 15 Jan 2026 13:21:49 +0100
Subject: [PATCH 1/5] Switch Google Cloud Storage file source from fs-gcsfs to gcsfs (fsspec) and add gcsfs to the test dependency group in pyproject.toml, so that the GCS unit tests also run with the regular unit tests.

Replace the pyfilesystem2-based fs-gcsfs with the fsspec-based gcsfs
library. The gcsfs library has native support for anonymous access via
token='anon', which properly handles credential refresh without errors.

This fixes the "Anonymous credentials cannot be refreshed" error that
was occurring with fs-gcsfs when listing files from public GCS buckets.

Fixes #21565
---
 lib/galaxy/dependencies/__init__.py | 5 +-
 .../dependencies/conditional-requirements.txt | 4 +-
 .../files/sources/googlecloudstorage.py | 260 +++++++-----------
 packages/files/setup.cfg | 6 +-
 pyproject.toml | 1 +
 test/unit/files/gcsfs_file_sources_conf.yml | 6 +-
 test/unit/files/test_gcsfs.py | 21 +-
 7 files changed, 126 insertions(+), 177 deletions(-)

diff --git a/lib/galaxy/dependencies/__init__.py b/lib/galaxy/dependencies/__init__.py
index 8af7472ecd38..fb103233c248 100644
--- a/lib/galaxy/dependencies/__init__.py
+++ b/lib/galaxy/dependencies/__init__.py
@@ -263,10 +263,7 @@ def check_fs_sshfs(self):
     def check_fs_googledrivefs(self):
         return "googledrive" in self.file_sources

-    def check_fs_gcsfs(self):
-        return "googlecloudstorage" in self.file_sources
-
-    def check_google_cloud_storage(self):
+    def check_gcsfs(self):
         return "googlecloudstorage" in self.file_sources

     def check_onedatafilerestclient(self):
diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt
index 457374bbcfb6..91288a742ea4 100644
--- a/lib/galaxy/dependencies/conditional-requirements.txt
+++ b/lib/galaxy/dependencies/conditional-requirements.txt
@@ -26,9 +26,7 @@ fs.dropboxfs>=1.0.3 # type: dropbox
 fs.sshfs # type: ssh
 fs.anvilfs # type: anvil
 fs.googledrivefs # type: googledrive
-fs-gcsfs # type: googlecloudstorage
-# fs-gcsfs doesn't pin google-cloud-storage, and old versions log noisy exceptions and break test discovery
-google-cloud-storage>=2.8.0 # type: googlecloudstorage
+gcsfs # type: googlecloudstorage
 fs.onedatarestfs==21.2.5.2 # type: onedata, depends on onedatafilerestclient
 fs-basespace # type: basespace
 fs-azureblob # type: azure
diff --git a/lib/galaxy/files/sources/googlecloudstorage.py b/lib/galaxy/files/sources/googlecloudstorage.py
index 0631775eeccb..0e32dbf8bc58 100644
--- a/lib/galaxy/files/sources/googlecloudstorage.py
+++ b/lib/galaxy/files/sources/googlecloudstorage.py
@@ -1,12 +1,4 @@
-try:
-    from fs_gcsfs import GCSFS
-    from google.cloud.storage import Client
-    from google.oauth2 import service_account
-    from google.oauth2.credentials import Credentials
-except ImportError:
-    GCSFS = None
-
-import os
+import logging
 from typing import (
     Optional,
     Union,
@@ -14,78 +6,116 @@
 from galaxy.files.models import (
     AnyRemoteEntry,
-    BaseFileSourceConfiguration,
-    BaseFileSourceTemplateConfiguration,
     FilesSourceRuntimeContext,
-    RemoteDirectory,
-    RemoteFile,
+)
+from galaxy.files.sources._fsspec import (
+    CacheOptionsDictType,
+    FsspecBaseFileSourceConfiguration,
+    FsspecBaseFileSourceTemplateConfiguration,
+    FsspecFilesSource,
 )
 from galaxy.util.config_templates import TemplateExpansion

-from ._pyfilesystem2 import PyFilesystem2FilesSource
+try:
+    from gcsfs import GCSFileSystem
+except ImportError:
+    GCSFileSystem = None
+
+
+REQUIRED_PACKAGE = "gcsfs" + +log = logging.getLogger(__name__) -class GoogleCloudStorageFileSourceTemplateConfiguration(BaseFileSourceTemplateConfiguration): + +class GoogleCloudStorageFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration): bucket_name: Union[str, TemplateExpansion] - root_path: Union[str, TemplateExpansion, None] = None project: Union[str, TemplateExpansion, None] = None anonymous: Union[bool, TemplateExpansion, None] = True - service_account_json: Union[str, TemplateExpansion, None] = None - token: Union[str, TemplateExpansion, None] = None - token_uri: Union[str, TemplateExpansion, None] = None + service_account_credentials: Union[str, TemplateExpansion, None] = None + # OAuth credentials client_id: Union[str, TemplateExpansion, None] = None client_secret: Union[str, TemplateExpansion, None] = None + access_token: Union[str, TemplateExpansion, None] = None refresh_token: Union[str, TemplateExpansion, None] = None + token_uri: Union[str, TemplateExpansion, None] = "https://oauth2.googleapis.com/token" -class GoogleCloudStorageFileSourceConfiguration(BaseFileSourceConfiguration): +class GoogleCloudStorageFileSourceConfiguration(FsspecBaseFileSourceConfiguration): bucket_name: str - root_path: Optional[str] = None project: Optional[str] = None anonymous: Optional[bool] = True - service_account_json: Optional[str] = None - token: Optional[str] = None - token_uri: Optional[str] = None + service_account_credentials: Optional[str] = None + # OAuth credentials client_id: Optional[str] = None client_secret: Optional[str] = None + access_token: Optional[str] = None refresh_token: Optional[str] = None + token_uri: Optional[str] = "https://oauth2.googleapis.com/token" class GoogleCloudStorageFilesSource( - PyFilesystem2FilesSource[ - GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration - ] + FsspecFilesSource[GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration] ): plugin_type = "googlecloudstorage" - required_module = GCSFS - required_package = "fs-gcsfs" + required_module = GCSFileSystem + required_package = REQUIRED_PACKAGE template_config_class = GoogleCloudStorageFileSourceTemplateConfiguration resolved_config_class = GoogleCloudStorageFileSourceConfiguration - def _open_fs(self, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration]): - if GCSFS is None: + def _open_fs( + self, + context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration], + cache_options: CacheOptionsDictType, + ): + if GCSFileSystem is None: raise self.required_package_exception config = context.config + token: Union[str, dict[str, Optional[str]], None] + if config.anonymous: - client = Client.create_anonymous_client() - elif config.service_account_json: - credentials = service_account.Credentials.from_service_account_file(config.service_account_json) - client = Client(project=config.project, credentials=credentials) - elif config.token: - client = Client( - project=config.project, - credentials=Credentials( - token=config.token, - token_uri=config.token_uri, - client_id=config.client_id, - client_secret=config.client_secret, - refresh_token=config.refresh_token, - ), - ) - - handle = GCSFS(bucket_name=config.bucket_name, root_path=config.root_path or "", retry=0, client=client) - return handle + # Use token='anon' for anonymous access to public buckets + token = "anon" + elif config.service_account_credentials: + # Path to service account JSON file + token = 
config.service_account_credentials + elif config.access_token: + # OAuth credentials passed as a dictionary + token = { + "access_token": config.access_token, + "refresh_token": config.refresh_token, + "client_id": config.client_id, + "client_secret": config.client_secret, + "token_uri": config.token_uri, + } + else: + # Default: use application default credentials + token = None + + fs = GCSFileSystem( + project=config.project, + token=token, + **cache_options, + ) + return fs + + def _to_bucket_path(self, path: str, config: GoogleCloudStorageFileSourceConfiguration) -> str: + """Adapt the path to the GCS bucket format.""" + bucket = config.bucket_name + if path.startswith("/"): + path = path[1:] + return f"{bucket}/{path}" if path else bucket + + def _adapt_entry_path(self, filesystem_path: str) -> str: + """Remove the GCS bucket name from the filesystem path.""" + if self.template_config.bucket_name: + bucket_prefix = f"{self.template_config.bucket_name}/" + if filesystem_path.startswith(bucket_prefix): + return "/" + filesystem_path[len(bucket_prefix) :] + elif filesystem_path == self.template_config.bucket_name: + return "/" + return "/" + filesystem_path def _list( self, @@ -98,84 +128,16 @@ def _list( query: Optional[str] = None, sort_by: Optional[str] = None, ) -> tuple[list[AnyRemoteEntry], int]: - """ - Override base class _list to work around fs_gcsfs limitation with virtual directories. - - GCS doesn't require directory marker objects, but fs_gcsfs's getinfo() requires them. - This implementation uses the GCS API directly to list blobs, bypassing the problematic - getinfo() validation that fails for virtual directories. - """ - if recursive: - # For recursive listing, fall back to the base implementation - return super()._list(context, path, recursive, write_intent, limit, offset, query, sort_by) - - # Open filesystem to get access to the bucket - with self._open_fs(context) as fs_handle: - # Access the bucket from the GCSFS object - bucket = fs_handle.bucket - - # Convert path to GCS prefix format - # Remove leading/trailing slashes and add trailing slash for directory prefix - normalized_path = path.strip("/") - if normalized_path: - prefix = normalized_path + "/" - else: - prefix = "" - - # List blobs with delimiter to get immediate children only (non-recursive) - delimiter = "/" - - # Collect directories (prefixes) and files (blobs) - entries: list[AnyRemoteEntry] = [] - - # First iterator: Get directories from prefixes - page_iterator_dirs = bucket.list_blobs(prefix=prefix, delimiter=delimiter) - for page in page_iterator_dirs.pages: - for dir_prefix in page.prefixes: - # Remove the parent prefix and trailing slash to get just the dir name - dir_name = dir_prefix[len(prefix) :].rstrip("/") - if dir_name: - full_path = os.path.join("/", normalized_path, dir_name) if normalized_path else f"/{dir_name}" - uri = self.uri_from_path(full_path) - entries.append(RemoteDirectory(name=dir_name, uri=uri, path=full_path)) - - # Second iterator: Get files from blobs - page_iterator_files = bucket.list_blobs(prefix=prefix, delimiter=delimiter) - for blob in page_iterator_files: - # Skip directory marker objects (empty blobs ending with /) - if blob.name.endswith("/"): - continue - - # Get just the filename (remove prefix) - file_name = blob.name[len(prefix) :] - if file_name: - full_path = os.path.join("/", normalized_path, file_name) if normalized_path else f"/{file_name}" - uri = self.uri_from_path(full_path) - - # Convert blob metadata to RemoteFile - ctime = None - if 
blob.time_created: - ctime = blob.time_created.isoformat() - - entries.append( - RemoteFile(name=file_name, size=blob.size or 0, ctime=ctime, uri=uri, path=full_path) - ) - - # Apply query filter if provided - if query: - query_lower = query.lower() - entries = [e for e in entries if query_lower in e.name.lower()] - - # Get total count before pagination - total_count = len(entries) - - # Apply pagination - if offset is not None or limit is not None: - start = offset or 0 - end = start + limit if limit is not None else None - entries = entries[start:end] - - return entries, total_count + bucket_path = self._to_bucket_path(path, context.config) + return super()._list( + context=context, + path=bucket_path, + recursive=recursive, + limit=limit, + offset=offset, + query=query, + sort_by=sort_by, + ) def _realize_to( self, @@ -183,23 +145,8 @@ def _realize_to( native_path: str, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration], ): - """ - Override to download files directly from GCS, bypassing fs_gcsfs's directory marker checks. - """ - with self._open_fs(context) as fs_handle: - bucket = fs_handle.bucket - - # Convert path to GCS blob key - normalized_path = source_path.strip("/") - - # Get the blob - blob = bucket.get_blob(normalized_path) - if not blob: - raise Exception(f"File not found: {source_path}") - - # Download directly to file - with open(native_path, "wb") as write_file: - blob.download_to_file(write_file) + bucket_path = self._to_bucket_path(source_path, context.config) + super()._realize_to(source_path=bucket_path, native_path=native_path, context=context) def _write_from( self, @@ -207,19 +154,18 @@ def _write_from( native_path: str, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration], ): - """ - Override to upload files directly to GCS, bypassing fs_gcsfs's directory marker checks. 
- """ - with self._open_fs(context) as fs_handle: - bucket = fs_handle.bucket - - # Convert path to GCS blob key - normalized_path = target_path.strip("/") - - # Create blob and upload - blob = bucket.blob(normalized_path) - with open(native_path, "rb") as read_file: - blob.upload_from_file(read_file) + bucket_path = self._to_bucket_path(target_path, context.config) + super()._write_from(target_path=bucket_path, native_path=native_path, context=context) + + def score_url_match(self, url: str): + bucket_name = self.template_config.bucket_name + # For security, we need to ensure that a partial match doesn't work + if bucket_name and (url.startswith(f"gs://{bucket_name}/") or url == f"gs://{bucket_name}"): + return len(f"gs://{bucket_name}") + elif bucket_name and (url.startswith(f"gcs://{bucket_name}/") or url == f"gcs://{bucket_name}"): + return len(f"gcs://{bucket_name}") + else: + return super().score_url_match(url) __all__ = ("GoogleCloudStorageFilesSource",) diff --git a/packages/files/setup.cfg b/packages/files/setup.cfg index f923f5c1d743..bd87b8f5f8fc 100644 --- a/packages/files/setup.cfg +++ b/packages/files/setup.cfg @@ -41,10 +41,10 @@ packages = find: python_requires = >=3.9 [options.extras_require] -test = +test = pytest - fs-gcsfs - s3fs>=2023.1.0,<2024 + gcsfs + s3fs>=2023.1.0 [options.entry_points] console_scripts = diff --git a/pyproject.toml b/pyproject.toml index 38ee9bbd894b..a02d32f5f023 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -126,6 +126,7 @@ test = [ "boto3", "cwltest>=2.5.20240906231108", # Python 3.13 support "fluent-logger", + "gcsfs", "lxml!=4.2.2", "onedatafilerestclient==21.2.5.2", "pkce", diff --git a/test/unit/files/gcsfs_file_sources_conf.yml b/test/unit/files/gcsfs_file_sources_conf.yml index d823636c6505..d65523b2d926 100644 --- a/test/unit/files/gcsfs_file_sources_conf.yml +++ b/test/unit/files/gcsfs_file_sources_conf.yml @@ -2,10 +2,10 @@ id: test1 doc: Test access to Google Cloud Storage. 
project: ${user.preferences['googlecloudstorage|project']} - bucket_name: 'genomics-public-data' - token_uri: "https://www.googleapis.com/oauth2/v4/token" + bucket_name: 'anaconda-public-data' + token_uri: "https://oauth2.googleapis.com/token" client_id: ${user.preferences['googlecloudstorage|client_id']} client_secret: ${user.preferences['googlecloudstorage|client_secret']} - token: ${user.preferences['googlecloudstorage|access_token']} + access_token: ${user.preferences['googlecloudstorage|access_token']} refresh_token: ${user.preferences['googlecloudstorage|refresh_token']} anonymous: true diff --git a/test/unit/files/test_gcsfs.py b/test/unit/files/test_gcsfs.py index 94a113717835..71f3ef66bb3f 100644 --- a/test/unit/files/test_gcsfs.py +++ b/test/unit/files/test_gcsfs.py @@ -2,24 +2,31 @@ import pytest -from ._util import assert_simple_file_realize +from ._util import ( + configured_file_sources, + list_root, + user_context_fixture, +) try: - from fs_gcsfs import GCSFS + from gcsfs import GCSFileSystem except ImportError: - GCSFS = None + GCSFileSystem = None SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__)) FILE_SOURCES_CONF = os.path.join(SCRIPT_DIRECTORY, "gcsfs_file_sources_conf.yml") skip_if_no_gcsfs_libs = pytest.mark.skipif( - not GCSFS, reason="Required lib to run gcs file source test: fs_gcsfs is not available" + not GCSFileSystem, reason="Required lib to run gcs file source test: gcsfs is not available" ) @skip_if_no_gcsfs_libs def test_file_source(): - assert_simple_file_realize( - FILE_SOURCES_CONF, recursive=False, filename="README", contents="1000genomes", contains=True - ) + """Test that we can list files from a public GCS bucket with anonymous access.""" + user_context = user_context_fixture() + file_sources = configured_file_sources(FILE_SOURCES_CONF) + res = list_root(file_sources, "gxfiles://test1", recursive=False, user_context=user_context) + # Verify we got some results from the public bucket + assert len(res) > 0, "Expected to find files/directories in the public GCS bucket" From 0902f434f080d3211382ad086cef02d33d1c352b Mon Sep 17 00:00:00 2001 From: Dannon Baker Date: Thu, 15 Jan 2026 22:14:24 -0500 Subject: [PATCH 2/5] Filter out self-referential entries in fsspec directory listings Some fsspec implementations (notably gcsfs) include the directory being listed as one of the entries in ls() results. This caused duplicate entries to appear when navigating into folders. 
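
As a minimal standalone sketch of the behavior and the filter (the
bucket and prefix names here are hypothetical, and this assumes gcsfs
is installed and the bucket is publicly listable):

    import fsspec

    # gcsfs registers the "gcs" protocol with fsspec
    fs = fsspec.filesystem("gcs", token="anon")
    path = "example-public-bucket/some/prefix"  # hypothetical bucket/prefix
    entries = fs.ls(path, detail=True)

    # Some backends echo the listed directory back as one of its own
    # entries; drop it so only real children remain.
    normalized = path.rstrip("/")
    children = [e for e in entries if e.get("name", "").rstrip("/") != normalized]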
--- lib/galaxy/files/sources/_fsspec.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/galaxy/files/sources/_fsspec.py b/lib/galaxy/files/sources/_fsspec.py index ad234ad8430a..1d27f2e3a271 100644 --- a/lib/galaxy/files/sources/_fsspec.py +++ b/lib/galaxy/files/sources/_fsspec.py @@ -297,9 +297,13 @@ def _list_directory(self, fs: AbstractFileSystem, path: str) -> list[AnyRemoteEn """Handle standard directory listing without query filtering.""" entries_list = [] entries: list[dict] = fs.ls(path, detail=True) + # Normalize path for comparison (remove trailing slash) + normalized_path = path.rstrip("/") for entry in entries: entry_path = entry.get("name", entry.get("path", "")) - if entry_path: # Only process entries with valid paths + # Skip entries that match the directory being listed (some fsspec implementations + # include the directory itself in the listing results) + if entry_path and entry_path.rstrip("/") != normalized_path: entries_list.append(self._info_to_entry(entry)) return entries_list From a7ebcd799ee8ebcade9ac055f2d3e615536bf541 Mon Sep 17 00:00:00 2001 From: Dannon Baker Date: Thu, 15 Jan 2026 22:31:57 -0500 Subject: [PATCH 3/5] Use removeprefix() for cleaner bucket prefix removal in GCS file source --- lib/galaxy/files/sources/googlecloudstorage.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/galaxy/files/sources/googlecloudstorage.py b/lib/galaxy/files/sources/googlecloudstorage.py index 0e32dbf8bc58..081e64b45e80 100644 --- a/lib/galaxy/files/sources/googlecloudstorage.py +++ b/lib/galaxy/files/sources/googlecloudstorage.py @@ -110,11 +110,10 @@ def _to_bucket_path(self, path: str, config: GoogleCloudStorageFileSourceConfigu def _adapt_entry_path(self, filesystem_path: str) -> str: """Remove the GCS bucket name from the filesystem path.""" if self.template_config.bucket_name: - bucket_prefix = f"{self.template_config.bucket_name}/" - if filesystem_path.startswith(bucket_prefix): - return "/" + filesystem_path[len(bucket_prefix) :] - elif filesystem_path == self.template_config.bucket_name: + if filesystem_path == self.template_config.bucket_name: return "/" + bucket_prefix = f"{self.template_config.bucket_name}/" + return "/" + filesystem_path.removeprefix(bucket_prefix) return "/" + filesystem_path def _list( From f659af70529238d9dcfe6dc5c3a307b942f031ff Mon Sep 17 00:00:00 2001 From: Dannon Baker Date: Thu, 15 Jan 2026 22:37:48 -0500 Subject: [PATCH 4/5] Preserve original config field names for backwards compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reverts field renames that would break existing configurations: - service_account_credentials → service_account_json - access_token → token --- lib/galaxy/files/sources/googlecloudstorage.py | 16 ++++++++-------- test/unit/files/gcsfs_file_sources_conf.yml | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/galaxy/files/sources/googlecloudstorage.py b/lib/galaxy/files/sources/googlecloudstorage.py index 081e64b45e80..eed5e6b10eb1 100644 --- a/lib/galaxy/files/sources/googlecloudstorage.py +++ b/lib/galaxy/files/sources/googlecloudstorage.py @@ -31,11 +31,11 @@ class GoogleCloudStorageFileSourceTemplateConfiguration(FsspecBaseFileSourceTemp bucket_name: Union[str, TemplateExpansion] project: Union[str, TemplateExpansion, None] = None anonymous: Union[bool, TemplateExpansion, None] = True - service_account_credentials: Union[str, TemplateExpansion, None] = None + service_account_json: 
Union[str, TemplateExpansion, None] = None # OAuth credentials client_id: Union[str, TemplateExpansion, None] = None client_secret: Union[str, TemplateExpansion, None] = None - access_token: Union[str, TemplateExpansion, None] = None + token: Union[str, TemplateExpansion, None] = None refresh_token: Union[str, TemplateExpansion, None] = None token_uri: Union[str, TemplateExpansion, None] = "https://oauth2.googleapis.com/token" @@ -44,11 +44,11 @@ class GoogleCloudStorageFileSourceConfiguration(FsspecBaseFileSourceConfiguratio bucket_name: str project: Optional[str] = None anonymous: Optional[bool] = True - service_account_credentials: Optional[str] = None + service_account_json: Optional[str] = None # OAuth credentials client_id: Optional[str] = None client_secret: Optional[str] = None - access_token: Optional[str] = None + token: Optional[str] = None refresh_token: Optional[str] = None token_uri: Optional[str] = "https://oauth2.googleapis.com/token" @@ -77,13 +77,13 @@ def _open_fs( if config.anonymous: # Use token='anon' for anonymous access to public buckets token = "anon" - elif config.service_account_credentials: + elif config.service_account_json: # Path to service account JSON file - token = config.service_account_credentials - elif config.access_token: + token = config.service_account_json + elif config.token: # OAuth credentials passed as a dictionary token = { - "access_token": config.access_token, + "access_token": config.token, "refresh_token": config.refresh_token, "client_id": config.client_id, "client_secret": config.client_secret, diff --git a/test/unit/files/gcsfs_file_sources_conf.yml b/test/unit/files/gcsfs_file_sources_conf.yml index d65523b2d926..fa9a57a8e3e8 100644 --- a/test/unit/files/gcsfs_file_sources_conf.yml +++ b/test/unit/files/gcsfs_file_sources_conf.yml @@ -6,6 +6,6 @@ token_uri: "https://oauth2.googleapis.com/token" client_id: ${user.preferences['googlecloudstorage|client_id']} client_secret: ${user.preferences['googlecloudstorage|client_secret']} - access_token: ${user.preferences['googlecloudstorage|access_token']} + token: ${user.preferences['googlecloudstorage|access_token']} refresh_token: ${user.preferences['googlecloudstorage|refresh_token']} anonymous: true From d15dc7d2b508d73220dbfac7d6790b276199a98a Mon Sep 17 00:00:00 2001 From: Dannon Baker Date: Thu, 15 Jan 2026 22:39:55 -0500 Subject: [PATCH 5/5] Restore root_path support for GCS file source Allows scoping the file source to a subdirectory within the bucket, matching the behavior of the previous fs-gcsfs implementation. 
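
As a sketch of the intended mapping with hypothetical bucket_name and
root_path values (not the actual Galaxy helpers, just the translation
they perform in each direction):

    bucket = "example-bucket"  # hypothetical bucket_name
    root = "datasets/v1"  # hypothetical root_path

    def to_bucket_path(user_path: str) -> str:
        # user-visible "/a/b" -> "example-bucket/datasets/v1/a/b"
        parts = [p for p in (bucket, root.strip("/"), user_path.strip("/")) if p]
        return "/".join(parts)

    def to_user_path(fs_path: str) -> str:
        # "example-bucket/datasets/v1/a/b" -> "/a/b"
        prefix = f"{bucket}/{root.strip('/')}" if root else bucket
        if fs_path == prefix:
            return "/"
        return "/" + fs_path.removeprefix(prefix + "/")

    assert to_bucket_path("/a/b") == "example-bucket/datasets/v1/a/b"
    assert to_user_path("example-bucket/datasets/v1/a/b") == "/a/b"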
--- .../files/sources/googlecloudstorage.py | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/lib/galaxy/files/sources/googlecloudstorage.py b/lib/galaxy/files/sources/googlecloudstorage.py index eed5e6b10eb1..23d3f59b6c59 100644 --- a/lib/galaxy/files/sources/googlecloudstorage.py +++ b/lib/galaxy/files/sources/googlecloudstorage.py @@ -29,6 +29,7 @@ class GoogleCloudStorageFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration): bucket_name: Union[str, TemplateExpansion] + root_path: Union[str, TemplateExpansion, None] = None project: Union[str, TemplateExpansion, None] = None anonymous: Union[bool, TemplateExpansion, None] = True service_account_json: Union[str, TemplateExpansion, None] = None @@ -42,6 +43,7 @@ class GoogleCloudStorageFileSourceTemplateConfiguration(FsspecBaseFileSourceTemp class GoogleCloudStorageFileSourceConfiguration(FsspecBaseFileSourceConfiguration): bucket_name: str + root_path: Optional[str] = None project: Optional[str] = None anonymous: Optional[bool] = True service_account_json: Optional[str] = None @@ -101,19 +103,29 @@ def _open_fs( return fs def _to_bucket_path(self, path: str, config: GoogleCloudStorageFileSourceConfiguration) -> str: - """Adapt the path to the GCS bucket format.""" + """Adapt the path to the GCS bucket format, including root_path if configured.""" bucket = config.bucket_name + root = (config.root_path or "").strip("/") if path.startswith("/"): path = path[1:] - return f"{bucket}/{path}" if path else bucket + # Build path: bucket / root_path / path + if root and path: + return f"{bucket}/{root}/{path}" + elif root: + return f"{bucket}/{root}" + elif path: + return f"{bucket}/{path}" + return bucket def _adapt_entry_path(self, filesystem_path: str) -> str: - """Remove the GCS bucket name from the filesystem path.""" + """Remove the GCS bucket name and root_path from the filesystem path.""" if self.template_config.bucket_name: - if filesystem_path == self.template_config.bucket_name: + bucket = self.template_config.bucket_name + root = (self.template_config.root_path or "").strip("/") + full_prefix = f"{bucket}/{root}" if root else bucket + if filesystem_path == full_prefix: return "/" - bucket_prefix = f"{self.template_config.bucket_name}/" - return "/" + filesystem_path.removeprefix(bucket_prefix) + return "/" + filesystem_path.removeprefix(f"{full_prefix}/") return "/" + filesystem_path def _list(