diff --git a/lib/galaxy/dependencies/__init__.py b/lib/galaxy/dependencies/__init__.py
index 8af7472ecd38..fb103233c248 100644
--- a/lib/galaxy/dependencies/__init__.py
+++ b/lib/galaxy/dependencies/__init__.py
@@ -263,10 +263,7 @@ def check_fs_sshfs(self):
     def check_fs_googledrivefs(self):
         return "googledrive" in self.file_sources
 
-    def check_fs_gcsfs(self):
-        return "googlecloudstorage" in self.file_sources
-
-    def check_google_cloud_storage(self):
+    def check_gcsfs(self):
         return "googlecloudstorage" in self.file_sources
 
     def check_onedatafilerestclient(self):
diff --git a/lib/galaxy/dependencies/conditional-requirements.txt b/lib/galaxy/dependencies/conditional-requirements.txt
index 457374bbcfb6..91288a742ea4 100644
--- a/lib/galaxy/dependencies/conditional-requirements.txt
+++ b/lib/galaxy/dependencies/conditional-requirements.txt
@@ -26,9 +26,7 @@ fs.dropboxfs>=1.0.3  # type: dropbox
 fs.sshfs  # type: ssh
 fs.anvilfs  # type: anvil
 fs.googledrivefs  # type: googledrive
-fs-gcsfs  # type: googlecloudstorage
-# fs-gcsfs doesn't pin google-cloud-storage, and old versions log noisy exceptions and break test discovery
-google-cloud-storage>=2.8.0  # type: googlecloudstorage
+gcsfs  # type: googlecloudstorage
 fs.onedatarestfs==21.2.5.2  # type: onedata, depends on onedatafilerestclient
 fs-basespace  # type: basespace
 fs-azureblob  # type: azure
diff --git a/lib/galaxy/files/sources/_fsspec.py b/lib/galaxy/files/sources/_fsspec.py
index ad234ad8430a..1d27f2e3a271 100644
--- a/lib/galaxy/files/sources/_fsspec.py
+++ b/lib/galaxy/files/sources/_fsspec.py
@@ -297,9 +297,13 @@ def _list_directory(self, fs: AbstractFileSystem, path: str) -> list[AnyRemoteEn
         """Handle standard directory listing without query filtering."""
         entries_list = []
         entries: list[dict] = fs.ls(path, detail=True)
+        # Normalize path for comparison (remove trailing slash)
+        normalized_path = path.rstrip("/")
         for entry in entries:
             entry_path = entry.get("name", entry.get("path", ""))
-            if entry_path:  # Only process entries with valid paths
+            # Skip entries that match the directory being listed (some fsspec implementations
+            # include the directory itself in the listing results)
+            if entry_path and entry_path.rstrip("/") != normalized_path:
                 entries_list.append(self._info_to_entry(entry))
         return entries_list
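Reviewer note, not part of the patch: the new guard in `_list_directory` exists because some fsspec backends include the listed directory itself in `ls()` results. A minimal, self-contained sketch of that filtering, using stubbed `ls(detail=True)` output (all entry names here are hypothetical):

```python
# Stubbed fsspec-style ls(detail=True) output for listing "bucket/data/".
entries = [
    {"name": "bucket/data/", "type": "directory"},  # the directory itself, returned by some backends
    {"name": "bucket/data/a.txt", "type": "file"},
    {"name": "bucket/data/sub", "type": "directory"},
]

path = "bucket/data/"
# Normalize the listed path once, then drop any entry that is the directory itself.
normalized_path = path.rstrip("/")
children = [e for e in entries if e.get("name") and e["name"].rstrip("/") != normalized_path]
assert [e["name"] for e in children] == ["bucket/data/a.txt", "bucket/data/sub"]
```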
diff --git a/lib/galaxy/files/sources/googlecloudstorage.py b/lib/galaxy/files/sources/googlecloudstorage.py
index 0631775eeccb..23d3f59b6c59 100644
--- a/lib/galaxy/files/sources/googlecloudstorage.py
+++ b/lib/galaxy/files/sources/googlecloudstorage.py
@@ -1,12 +1,4 @@
-try:
-    from fs_gcsfs import GCSFS
-    from google.cloud.storage import Client
-    from google.oauth2 import service_account
-    from google.oauth2.credentials import Credentials
-except ImportError:
-    GCSFS = None
-
-import os
+import logging
 from typing import (
     Optional,
     Union,
@@ -14,78 +6,127 @@
 
 from galaxy.files.models import (
     AnyRemoteEntry,
-    BaseFileSourceConfiguration,
-    BaseFileSourceTemplateConfiguration,
     FilesSourceRuntimeContext,
-    RemoteDirectory,
-    RemoteFile,
+)
+from galaxy.files.sources._fsspec import (
+    CacheOptionsDictType,
+    FsspecBaseFileSourceConfiguration,
+    FsspecBaseFileSourceTemplateConfiguration,
+    FsspecFilesSource,
 )
 from galaxy.util.config_templates import TemplateExpansion
 
-from ._pyfilesystem2 import PyFilesystem2FilesSource
+try:
+    from gcsfs import GCSFileSystem
+except ImportError:
+    GCSFileSystem = None
 
-class GoogleCloudStorageFileSourceTemplateConfiguration(BaseFileSourceTemplateConfiguration):
+
+REQUIRED_PACKAGE = "gcsfs"
+
+log = logging.getLogger(__name__)
+
+
+class GoogleCloudStorageFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration):
     bucket_name: Union[str, TemplateExpansion]
     root_path: Union[str, TemplateExpansion, None] = None
     project: Union[str, TemplateExpansion, None] = None
     anonymous: Union[bool, TemplateExpansion, None] = True
     service_account_json: Union[str, TemplateExpansion, None] = None
-    token: Union[str, TemplateExpansion, None] = None
-    token_uri: Union[str, TemplateExpansion, None] = None
+    # OAuth credentials
     client_id: Union[str, TemplateExpansion, None] = None
     client_secret: Union[str, TemplateExpansion, None] = None
+    token: Union[str, TemplateExpansion, None] = None
     refresh_token: Union[str, TemplateExpansion, None] = None
+    token_uri: Union[str, TemplateExpansion, None] = "https://oauth2.googleapis.com/token"
 
 
-class GoogleCloudStorageFileSourceConfiguration(BaseFileSourceConfiguration):
+class GoogleCloudStorageFileSourceConfiguration(FsspecBaseFileSourceConfiguration):
     bucket_name: str
     root_path: Optional[str] = None
     project: Optional[str] = None
     anonymous: Optional[bool] = True
     service_account_json: Optional[str] = None
-    token: Optional[str] = None
-    token_uri: Optional[str] = None
+    # OAuth credentials
    client_id: Optional[str] = None
     client_secret: Optional[str] = None
+    token: Optional[str] = None
     refresh_token: Optional[str] = None
+    token_uri: Optional[str] = "https://oauth2.googleapis.com/token"
 
 
 class GoogleCloudStorageFilesSource(
-    PyFilesystem2FilesSource[
-        GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration
-    ]
+    FsspecFilesSource[GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration]
 ):
     plugin_type = "googlecloudstorage"
-    required_module = GCSFS
-    required_package = "fs-gcsfs"
+    required_module = GCSFileSystem
+    required_package = REQUIRED_PACKAGE
     template_config_class = GoogleCloudStorageFileSourceTemplateConfiguration
     resolved_config_class = GoogleCloudStorageFileSourceConfiguration
 
-    def _open_fs(self, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration]):
-        if GCSFS is None:
+    def _open_fs(
+        self,
+        context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
+        cache_options: CacheOptionsDictType,
+    ):
+        if GCSFileSystem is None:
             raise self.required_package_exception
         config = context.config
+        token: Union[str, dict[str, Optional[str]], None]
+
         if config.anonymous:
-            client = Client.create_anonymous_client()
+            # Use token='anon' for anonymous access to public buckets
+            token = "anon"
         elif config.service_account_json:
-            credentials = service_account.Credentials.from_service_account_file(config.service_account_json)
-            client = Client(project=config.project, credentials=credentials)
+            # Path to service account JSON file
+            token = config.service_account_json
         elif config.token:
-            client = Client(
-                project=config.project,
-                credentials=Credentials(
-                    token=config.token,
-                    token_uri=config.token_uri,
-                    client_id=config.client_id,
-                    client_secret=config.client_secret,
-                    refresh_token=config.refresh_token,
-                ),
-            )
-
-        handle = GCSFS(bucket_name=config.bucket_name, root_path=config.root_path or "", retry=0, client=client)
-        return handle
+            # OAuth credentials passed as a dictionary
+            token = {
+                "access_token": config.token,
+                "refresh_token": config.refresh_token,
+                "client_id": config.client_id,
+                "client_secret": config.client_secret,
+                "token_uri": config.token_uri,
+            }
+        else:
+            # Default: use application default credentials
+            token = None
+
+        fs = GCSFileSystem(
+            project=config.project,
+            token=token,
+            **cache_options,
+        )
+        return fs
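Reviewer note, not part of the patch: the branches above map the Galaxy config onto the different `token` shapes `GCSFileSystem` accepts. A sketch of the four cases; the project, bucket, and credential values are placeholders, and the OAuth dictionary mirrors the one built in `_open_fs`:

```python
from gcsfs import GCSFileSystem

fs_anon = GCSFileSystem(token="anon")  # anonymous access to public buckets
fs_service = GCSFileSystem(project="my-project", token="/path/to/service-account.json")
fs_oauth = GCSFileSystem(
    project="my-project",
    token={
        "access_token": "...",
        "refresh_token": "...",
        "client_id": "...",
        "client_secret": "...",
        "token_uri": "https://oauth2.googleapis.com/token",
    },
)
fs_default = GCSFileSystem(project="my-project", token=None)  # application default credentials
```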
+
+    def _to_bucket_path(self, path: str, config: GoogleCloudStorageFileSourceConfiguration) -> str:
+        """Adapt the path to the GCS bucket format, including root_path if configured."""
+        bucket = config.bucket_name
+        root = (config.root_path or "").strip("/")
+        if path.startswith("/"):
+            path = path[1:]
+        # Build path: bucket / root_path / path
+        if root and path:
+            return f"{bucket}/{root}/{path}"
+        elif root:
+            return f"{bucket}/{root}"
+        elif path:
+            return f"{bucket}/{path}"
+        return bucket
+
+    def _adapt_entry_path(self, filesystem_path: str) -> str:
+        """Remove the GCS bucket name and root_path from the filesystem path."""
+        if self.template_config.bucket_name:
+            bucket = self.template_config.bucket_name
+            root = (self.template_config.root_path or "").strip("/")
+            full_prefix = f"{bucket}/{root}" if root else bucket
+            if filesystem_path == full_prefix:
+                return "/"
+            return "/" + filesystem_path.removeprefix(f"{full_prefix}/")
+        return "/" + filesystem_path
 
     def _list(
         self,
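Reviewer note, not part of the patch: `_to_bucket_path` and `_adapt_entry_path` are intended to be inverses, mapping Galaxy's rooted paths onto `bucket/root_path/...` keys and back. A standalone sketch of the round trip with placeholder names:

```python
def to_bucket_path(path: str, bucket: str, root_path: str = "") -> str:
    # Mirror of _to_bucket_path: join bucket / root_path / path, skipping empty parts.
    root = root_path.strip("/")
    return "/".join(part for part in (bucket, root, path.lstrip("/")) if part)

def adapt_entry_path(filesystem_path: str, bucket: str, root_path: str = "") -> str:
    # Mirror of _adapt_entry_path: strip the bucket/root prefix back off.
    root = root_path.strip("/")
    prefix = f"{bucket}/{root}" if root else bucket
    if filesystem_path == prefix:
        return "/"
    return "/" + filesystem_path.removeprefix(f"{prefix}/")

assert to_bucket_path("/genomes/hg38.fa", "my-bucket", "data") == "my-bucket/data/genomes/hg38.fa"
assert adapt_entry_path("my-bucket/data/genomes/hg38.fa", "my-bucket", "data") == "/genomes/hg38.fa"
assert to_bucket_path("/", "my-bucket") == "my-bucket" and adapt_entry_path("my-bucket", "my-bucket") == "/"
```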
@@ -98,84 +139,16 @@
         query: Optional[str] = None,
         sort_by: Optional[str] = None,
     ) -> tuple[list[AnyRemoteEntry], int]:
-        """
-        Override base class _list to work around fs_gcsfs limitation with virtual directories.
-
-        GCS doesn't require directory marker objects, but fs_gcsfs's getinfo() requires them.
-        This implementation uses the GCS API directly to list blobs, bypassing the problematic
-        getinfo() validation that fails for virtual directories.
-        """
-        if recursive:
-            # For recursive listing, fall back to the base implementation
-            return super()._list(context, path, recursive, write_intent, limit, offset, query, sort_by)
-
-        # Open filesystem to get access to the bucket
-        with self._open_fs(context) as fs_handle:
-            # Access the bucket from the GCSFS object
-            bucket = fs_handle.bucket
-
-            # Convert path to GCS prefix format
-            # Remove leading/trailing slashes and add trailing slash for directory prefix
-            normalized_path = path.strip("/")
-            if normalized_path:
-                prefix = normalized_path + "/"
-            else:
-                prefix = ""
-
-            # List blobs with delimiter to get immediate children only (non-recursive)
-            delimiter = "/"
-
-            # Collect directories (prefixes) and files (blobs)
-            entries: list[AnyRemoteEntry] = []
-
-            # First iterator: Get directories from prefixes
-            page_iterator_dirs = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
-            for page in page_iterator_dirs.pages:
-                for dir_prefix in page.prefixes:
-                    # Remove the parent prefix and trailing slash to get just the dir name
-                    dir_name = dir_prefix[len(prefix) :].rstrip("/")
-                    if dir_name:
-                        full_path = os.path.join("/", normalized_path, dir_name) if normalized_path else f"/{dir_name}"
-                        uri = self.uri_from_path(full_path)
-                        entries.append(RemoteDirectory(name=dir_name, uri=uri, path=full_path))
-
-            # Second iterator: Get files from blobs
-            page_iterator_files = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
-            for blob in page_iterator_files:
-                # Skip directory marker objects (empty blobs ending with /)
-                if blob.name.endswith("/"):
-                    continue
-
-                # Get just the filename (remove prefix)
-                file_name = blob.name[len(prefix) :]
-                if file_name:
-                    full_path = os.path.join("/", normalized_path, file_name) if normalized_path else f"/{file_name}"
-                    uri = self.uri_from_path(full_path)
-
-                    # Convert blob metadata to RemoteFile
-                    ctime = None
-                    if blob.time_created:
-                        ctime = blob.time_created.isoformat()
-
-                    entries.append(
-                        RemoteFile(name=file_name, size=blob.size or 0, ctime=ctime, uri=uri, path=full_path)
-                    )
-
-        # Apply query filter if provided
-        if query:
-            query_lower = query.lower()
-            entries = [e for e in entries if query_lower in e.name.lower()]
-
-        # Get total count before pagination
-        total_count = len(entries)
-
-        # Apply pagination
-        if offset is not None or limit is not None:
-            start = offset or 0
-            end = start + limit if limit is not None else None
-            entries = entries[start:end]
-
-        return entries, total_count
+        bucket_path = self._to_bucket_path(path, context.config)
+        return super()._list(
+            context=context,
+            path=bucket_path,
+            recursive=recursive,
+            limit=limit,
+            offset=offset,
+            query=query,
+            sort_by=sort_by,
+        )
 
     def _realize_to(
         self,
@@ -183,23 +156,8 @@
         native_path: str,
         context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
     ):
-        """
-        Override to download files directly from GCS, bypassing fs_gcsfs's directory marker checks.
-        """
-        with self._open_fs(context) as fs_handle:
-            bucket = fs_handle.bucket
-
-            # Convert path to GCS blob key
-            normalized_path = source_path.strip("/")
-
-            # Get the blob
-            blob = bucket.get_blob(normalized_path)
-            if not blob:
-                raise Exception(f"File not found: {source_path}")
-
-            # Download directly to file
-            with open(native_path, "wb") as write_file:
-                blob.download_to_file(write_file)
+        bucket_path = self._to_bucket_path(source_path, context.config)
+        super()._realize_to(source_path=bucket_path, native_path=native_path, context=context)
 
     def _write_from(
         self,
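Reviewer note, not part of the patch: once paths are translated, `_realize_to` (above) and `_write_from` (continued below) are plain delegations. Presumably the fsspec base class bottoms out in transfers along these lines; the object paths are placeholders and the upload direction needs write credentials:

```python
from gcsfs import GCSFileSystem

fs = GCSFileSystem(token="anon")
fs.get_file("anaconda-public-data/README", "/tmp/README")  # download: the _realize_to direction (hypothetical object path)
# fs.put_file("/tmp/README", "my-bucket/README")  # upload: the _write_from direction (not possible anonymously)
```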
- """ - with self._open_fs(context) as fs_handle: - bucket = fs_handle.bucket - - # Convert path to GCS blob key - normalized_path = target_path.strip("/") - - # Create blob and upload - blob = bucket.blob(normalized_path) - with open(native_path, "rb") as read_file: - blob.upload_from_file(read_file) + bucket_path = self._to_bucket_path(target_path, context.config) + super()._write_from(target_path=bucket_path, native_path=native_path, context=context) + + def score_url_match(self, url: str): + bucket_name = self.template_config.bucket_name + # For security, we need to ensure that a partial match doesn't work + if bucket_name and (url.startswith(f"gs://{bucket_name}/") or url == f"gs://{bucket_name}"): + return len(f"gs://{bucket_name}") + elif bucket_name and (url.startswith(f"gcs://{bucket_name}/") or url == f"gcs://{bucket_name}"): + return len(f"gcs://{bucket_name}") + else: + return super().score_url_match(url) __all__ = ("GoogleCloudStorageFilesSource",) diff --git a/packages/files/setup.cfg b/packages/files/setup.cfg index f923f5c1d743..bd87b8f5f8fc 100644 --- a/packages/files/setup.cfg +++ b/packages/files/setup.cfg @@ -41,10 +41,10 @@ packages = find: python_requires = >=3.9 [options.extras_require] -test = +test = pytest - fs-gcsfs - s3fs>=2023.1.0,<2024 + gcsfs + s3fs>=2023.1.0 [options.entry_points] console_scripts = diff --git a/pyproject.toml b/pyproject.toml index 38ee9bbd894b..a02d32f5f023 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -126,6 +126,7 @@ test = [ "boto3", "cwltest>=2.5.20240906231108", # Python 3.13 support "fluent-logger", + "gcsfs", "lxml!=4.2.2", "onedatafilerestclient==21.2.5.2", "pkce", diff --git a/test/unit/files/gcsfs_file_sources_conf.yml b/test/unit/files/gcsfs_file_sources_conf.yml index d823636c6505..fa9a57a8e3e8 100644 --- a/test/unit/files/gcsfs_file_sources_conf.yml +++ b/test/unit/files/gcsfs_file_sources_conf.yml @@ -2,8 +2,8 @@ id: test1 doc: Test access to Google Cloud Storage. 
diff --git a/packages/files/setup.cfg b/packages/files/setup.cfg
index f923f5c1d743..bd87b8f5f8fc 100644
--- a/packages/files/setup.cfg
+++ b/packages/files/setup.cfg
@@ -41,10 +41,10 @@ packages = find:
 python_requires = >=3.9
 
 [options.extras_require]
-test = 
+test =
     pytest
-    fs-gcsfs
-    s3fs>=2023.1.0,<2024
+    gcsfs
+    s3fs>=2023.1.0
 
 [options.entry_points]
 console_scripts =
diff --git a/pyproject.toml b/pyproject.toml
index 38ee9bbd894b..a02d32f5f023 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -126,6 +126,7 @@ test = [
     "boto3",
     "cwltest>=2.5.20240906231108",  # Python 3.13 support
     "fluent-logger",
+    "gcsfs",
     "lxml!=4.2.2",
     "onedatafilerestclient==21.2.5.2",
     "pkce",
diff --git a/test/unit/files/gcsfs_file_sources_conf.yml b/test/unit/files/gcsfs_file_sources_conf.yml
index d823636c6505..fa9a57a8e3e8 100644
--- a/test/unit/files/gcsfs_file_sources_conf.yml
+++ b/test/unit/files/gcsfs_file_sources_conf.yml
@@ -2,8 +2,8 @@
   id: test1
   doc: Test access to Google Cloud Storage.
   project: ${user.preferences['googlecloudstorage|project']}
-  bucket_name: 'genomics-public-data'
-  token_uri: "https://www.googleapis.com/oauth2/v4/token"
+  bucket_name: 'anaconda-public-data'
+  token_uri: "https://oauth2.googleapis.com/token"
   client_id: ${user.preferences['googlecloudstorage|client_id']}
   client_secret: ${user.preferences['googlecloudstorage|client_secret']}
   token: ${user.preferences['googlecloudstorage|access_token']}
diff --git a/test/unit/files/test_gcsfs.py b/test/unit/files/test_gcsfs.py
index 94a113717835..71f3ef66bb3f 100644
--- a/test/unit/files/test_gcsfs.py
+++ b/test/unit/files/test_gcsfs.py
@@ -2,24 +2,31 @@
 
 import pytest
 
-from ._util import assert_simple_file_realize
+from ._util import (
+    configured_file_sources,
+    list_root,
+    user_context_fixture,
+)
 
 try:
-    from fs_gcsfs import GCSFS
+    from gcsfs import GCSFileSystem
 except ImportError:
-    GCSFS = None
+    GCSFileSystem = None
 
 SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
 FILE_SOURCES_CONF = os.path.join(SCRIPT_DIRECTORY, "gcsfs_file_sources_conf.yml")
 
 skip_if_no_gcsfs_libs = pytest.mark.skipif(
-    not GCSFS, reason="Required lib to run gcs file source test: fs_gcsfs is not available"
+    not GCSFileSystem, reason="Required lib to run gcs file source test: gcsfs is not available"
 )
 
 
 @skip_if_no_gcsfs_libs
 def test_file_source():
-    assert_simple_file_realize(
-        FILE_SOURCES_CONF, recursive=False, filename="README", contents="1000genomes", contains=True
-    )
+    """Test that we can list files from a public GCS bucket with anonymous access."""
+    user_context = user_context_fixture()
+    file_sources = configured_file_sources(FILE_SOURCES_CONF)
+    res = list_root(file_sources, "gxfiles://test1", recursive=False, user_context=user_context)
+    # Verify we got some results from the public bucket
+    assert len(res) > 0, "Expected to find files/directories in the public GCS bucket"
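Reviewer note, not part of the patch: the rewritten test requires network access to the public `anaconda-public-data` bucket. The anonymous path it exercises can be reproduced against gcsfs directly, roughly:

```python
from gcsfs import GCSFileSystem

# Same anonymous-access path the unit test goes through, minus the Galaxy plumbing.
fs = GCSFileSystem(token="anon")
entries = fs.ls("anaconda-public-data", detail=False)
assert len(entries) > 0, "expected the public bucket to list some objects"
```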