5 changes: 1 addition & 4 deletions lib/galaxy/dependencies/__init__.py
@@ -263,10 +263,7 @@ def check_fs_sshfs(self):
def check_fs_googledrivefs(self):
return "googledrive" in self.file_sources

def check_fs_gcsfs(self):
return "googlecloudstorage" in self.file_sources

def check_google_cloud_storage(self):
def check_gcsfs(self):
return "googlecloudstorage" in self.file_sources

def check_onedatafilerestclient(self):
4 changes: 1 addition & 3 deletions lib/galaxy/dependencies/conditional-requirements.txt
@@ -26,9 +26,7 @@ fs.dropboxfs>=1.0.3 # type: dropbox
fs.sshfs # type: ssh
fs.anvilfs # type: anvil
fs.googledrivefs # type: googledrive
fs-gcsfs # type: googlecloudstorage
# fs-gcsfs doesn't pin google-cloud-storage, and old versions log noisy exceptions and break test discovery
google-cloud-storage>=2.8.0 # type: googlecloudstorage
gcsfs # type: googlecloudstorage
fs.onedatarestfs==21.2.5.2 # type: onedata, depends on onedatafilerestclient
fs-basespace # type: basespace
fs-azureblob # type: azure
6 changes: 5 additions & 1 deletion lib/galaxy/files/sources/_fsspec.py
@@ -297,9 +297,13 @@ def _list_directory(self, fs: AbstractFileSystem, path: str) -> list[AnyRemoteEntry]:
"""Handle standard directory listing without query filtering."""
entries_list = []
entries: list[dict] = fs.ls(path, detail=True)
# Normalize path for comparison (remove trailing slash)
normalized_path = path.rstrip("/")
for entry in entries:
entry_path = entry.get("name", entry.get("path", ""))
if entry_path: # Only process entries with valid paths
# Skip entries that match the directory being listed (some fsspec implementations
# include the directory itself in the listing results)
if entry_path and entry_path.rstrip("/") != normalized_path:
entries_list.append(self._info_to_entry(entry))
return entries_list
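
A note on the `_list_directory` change above: some fsspec backends include the listed directory itself among the `ls()` results, and the normalized-path comparison drops that self-entry. A minimal sketch of the filter, using hand-written `ls()`-style entries (hypothetical values, not tied to any particular backend):

```python
path = "my-bucket/reports/"
normalized_path = path.rstrip("/")
entries = [
    {"name": "my-bucket/reports", "type": "directory"},  # the directory itself
    {"name": "my-bucket/reports/2024.csv", "type": "file"},
]
# Keep only real children; the self-entry compares equal after rstrip("/").
children = [
    e for e in entries
    if e.get("name", "") and e.get("name", "").rstrip("/") != normalized_path
]
assert [e["name"] for e in children] == ["my-bucket/reports/2024.csv"]
```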

259 changes: 108 additions & 151 deletions lib/galaxy/files/sources/googlecloudstorage.py
@@ -1,91 +1,132 @@
try:
from fs_gcsfs import GCSFS
from google.cloud.storage import Client
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
except ImportError:
GCSFS = None

import os
import logging
from typing import (
Optional,
Union,
)

from galaxy.files.models import (
AnyRemoteEntry,
BaseFileSourceConfiguration,
BaseFileSourceTemplateConfiguration,
FilesSourceRuntimeContext,
RemoteDirectory,
RemoteFile,
)
from galaxy.files.sources._fsspec import (
CacheOptionsDictType,
FsspecBaseFileSourceConfiguration,
FsspecBaseFileSourceTemplateConfiguration,
FsspecFilesSource,
)
from galaxy.util.config_templates import TemplateExpansion
from ._pyfilesystem2 import PyFilesystem2FilesSource

try:
from gcsfs import GCSFileSystem
except ImportError:
GCSFileSystem = None

class GoogleCloudStorageFileSourceTemplateConfiguration(BaseFileSourceTemplateConfiguration):

REQUIRED_PACKAGE = "gcsfs"

log = logging.getLogger(__name__)


class GoogleCloudStorageFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration):
bucket_name: Union[str, TemplateExpansion]
root_path: Union[str, TemplateExpansion, None] = None
project: Union[str, TemplateExpansion, None] = None
anonymous: Union[bool, TemplateExpansion, None] = True
service_account_json: Union[str, TemplateExpansion, None] = None
token: Union[str, TemplateExpansion, None] = None
token_uri: Union[str, TemplateExpansion, None] = None
# OAuth credentials
client_id: Union[str, TemplateExpansion, None] = None
client_secret: Union[str, TemplateExpansion, None] = None
token: Union[str, TemplateExpansion, None] = None
refresh_token: Union[str, TemplateExpansion, None] = None
token_uri: Union[str, TemplateExpansion, None] = "https://oauth2.googleapis.com/token"


class GoogleCloudStorageFileSourceConfiguration(BaseFileSourceConfiguration):
class GoogleCloudStorageFileSourceConfiguration(FsspecBaseFileSourceConfiguration):
bucket_name: str
root_path: Optional[str] = None
project: Optional[str] = None
anonymous: Optional[bool] = True
service_account_json: Optional[str] = None
token: Optional[str] = None
token_uri: Optional[str] = None
# OAuth credentials
client_id: Optional[str] = None
client_secret: Optional[str] = None
token: Optional[str] = None
refresh_token: Optional[str] = None
token_uri: Optional[str] = "https://oauth2.googleapis.com/token"


class GoogleCloudStorageFilesSource(
PyFilesystem2FilesSource[
GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration
]
FsspecFilesSource[GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration]
):
plugin_type = "googlecloudstorage"
required_module = GCSFS
required_package = "fs-gcsfs"
required_module = GCSFileSystem
required_package = REQUIRED_PACKAGE

template_config_class = GoogleCloudStorageFileSourceTemplateConfiguration
resolved_config_class = GoogleCloudStorageFileSourceConfiguration

def _open_fs(self, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration]):
if GCSFS is None:
def _open_fs(
self,
context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
cache_options: CacheOptionsDictType,
):
if GCSFileSystem is None:
raise self.required_package_exception

config = context.config
token: Union[str, dict[str, Optional[str]], None]

if config.anonymous:
client = Client.create_anonymous_client()
# Use token='anon' for anonymous access to public buckets
token = "anon"
elif config.service_account_json:
credentials = service_account.Credentials.from_service_account_file(config.service_account_json)
client = Client(project=config.project, credentials=credentials)
# Path to service account JSON file
token = config.service_account_json
elif config.token:
client = Client(
project=config.project,
credentials=Credentials(
token=config.token,
token_uri=config.token_uri,
client_id=config.client_id,
client_secret=config.client_secret,
refresh_token=config.refresh_token,
),
)

handle = GCSFS(bucket_name=config.bucket_name, root_path=config.root_path or "", retry=0, client=client)
return handle
# OAuth credentials passed as a dictionary
token = {
"access_token": config.token,
"refresh_token": config.refresh_token,
"client_id": config.client_id,
"client_secret": config.client_secret,
"token_uri": config.token_uri,
}
else:
# Default: use application default credentials
token = None

fs = GCSFileSystem(
project=config.project,
token=token,
**cache_options,
)
return fs
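
The branches above map the plugin's four auth modes onto the single `token` argument of `gcsfs.GCSFileSystem`: the string `"anon"`, a path to a service-account JSON file, a dict of OAuth credentials, or `None` for application default credentials. A hedged usage sketch (the bucket name and all credential values are placeholders):

```python
from gcsfs import GCSFileSystem

fs_anon = GCSFileSystem(token="anon")  # anonymous access to public buckets
fs_sa = GCSFileSystem(token="/path/to/service-account.json")  # service account key file
fs_oauth = GCSFileSystem(
    token={  # pre-obtained OAuth credentials, as in the `elif config.token` branch
        "access_token": "<access-token>",
        "refresh_token": "<refresh-token>",
        "client_id": "<client-id>",
        "client_secret": "<client-secret>",
        "token_uri": "https://oauth2.googleapis.com/token",
    }
)
fs_default = GCSFileSystem(token=None)  # application default credentials

# e.g. list a public bucket anonymously:
print(fs_anon.ls("anaconda-public-data"))
```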

def _to_bucket_path(self, path: str, config: GoogleCloudStorageFileSourceConfiguration) -> str:
"""Adapt the path to the GCS bucket format, including root_path if configured."""
bucket = config.bucket_name
root = (config.root_path or "").strip("/")
if path.startswith("/"):
path = path[1:]
# Build path: bucket / root_path / path
if root and path:
return f"{bucket}/{root}/{path}"
elif root:
return f"{bucket}/{root}"
elif path:
return f"{bucket}/{path}"
return bucket

def _adapt_entry_path(self, filesystem_path: str) -> str:
"""Remove the GCS bucket name and root_path from the filesystem path."""
if self.template_config.bucket_name:
bucket = self.template_config.bucket_name
root = (self.template_config.root_path or "").strip("/")
full_prefix = f"{bucket}/{root}" if root else bucket
if filesystem_path == full_prefix:
return "/"
return "/" + filesystem_path.removeprefix(f"{full_prefix}/")
return "/" + filesystem_path

def _list(
self,
@@ -98,128 +139,44 @@ def _list(
query: Optional[str] = None,
sort_by: Optional[str] = None,
) -> tuple[list[AnyRemoteEntry], int]:
"""
Override base class _list to work around fs_gcsfs limitation with virtual directories.

GCS doesn't require directory marker objects, but fs_gcsfs's getinfo() requires them.
This implementation uses the GCS API directly to list blobs, bypassing the problematic
getinfo() validation that fails for virtual directories.
"""
if recursive:
# For recursive listing, fall back to the base implementation
return super()._list(context, path, recursive, write_intent, limit, offset, query, sort_by)

# Open filesystem to get access to the bucket
with self._open_fs(context) as fs_handle:
# Access the bucket from the GCSFS object
bucket = fs_handle.bucket

# Convert path to GCS prefix format
# Remove leading/trailing slashes and add trailing slash for directory prefix
normalized_path = path.strip("/")
if normalized_path:
prefix = normalized_path + "/"
else:
prefix = ""

# List blobs with delimiter to get immediate children only (non-recursive)
delimiter = "/"

# Collect directories (prefixes) and files (blobs)
entries: list[AnyRemoteEntry] = []

# First iterator: Get directories from prefixes
page_iterator_dirs = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
for page in page_iterator_dirs.pages:
for dir_prefix in page.prefixes:
# Remove the parent prefix and trailing slash to get just the dir name
dir_name = dir_prefix[len(prefix) :].rstrip("/")
if dir_name:
full_path = os.path.join("/", normalized_path, dir_name) if normalized_path else f"/{dir_name}"
uri = self.uri_from_path(full_path)
entries.append(RemoteDirectory(name=dir_name, uri=uri, path=full_path))

# Second iterator: Get files from blobs
page_iterator_files = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
for blob in page_iterator_files:
# Skip directory marker objects (empty blobs ending with /)
if blob.name.endswith("/"):
continue

# Get just the filename (remove prefix)
file_name = blob.name[len(prefix) :]
if file_name:
full_path = os.path.join("/", normalized_path, file_name) if normalized_path else f"/{file_name}"
uri = self.uri_from_path(full_path)

# Convert blob metadata to RemoteFile
ctime = None
if blob.time_created:
ctime = blob.time_created.isoformat()

entries.append(
RemoteFile(name=file_name, size=blob.size or 0, ctime=ctime, uri=uri, path=full_path)
)

# Apply query filter if provided
if query:
query_lower = query.lower()
entries = [e for e in entries if query_lower in e.name.lower()]

# Get total count before pagination
total_count = len(entries)

# Apply pagination
if offset is not None or limit is not None:
start = offset or 0
end = start + limit if limit is not None else None
entries = entries[start:end]

return entries, total_count
bucket_path = self._to_bucket_path(path, context.config)
return super()._list(
context=context,
path=bucket_path,
recursive=recursive,
limit=limit,
offset=offset,
query=query,
sort_by=sort_by,
)
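
The docstring removed above describes the problem this rewrite eliminates: GCS is a flat key space, so a "directory" may exist only as a shared key prefix with no marker object behind it, and fs_gcsfs's `getinfo()` fails on such virtual directories. A small sketch of the situation (hypothetical bucket contents):

```python
# A flat listing of object keys; "reports/2024" exists only as a prefix.
blobs = ["reports/2024/jan.csv", "reports/2024/feb.csv"]

# Prefix listing finds the children of the virtual directory...
children = [b for b in blobs if b.startswith("reports/2024/")]
assert children == ["reports/2024/jan.csv", "reports/2024/feb.csv"]

# ...but there is no "reports/2024/" marker object to stat, which is what
# tripped up fs_gcsfs and forced the old direct list_blobs workaround.
assert "reports/2024/" not in blobs
```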

def _realize_to(
self,
source_path: str,
native_path: str,
context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
):
"""
Override to download files directly from GCS, bypassing fs_gcsfs's directory marker checks.
"""
with self._open_fs(context) as fs_handle:
bucket = fs_handle.bucket

# Convert path to GCS blob key
normalized_path = source_path.strip("/")

# Get the blob
blob = bucket.get_blob(normalized_path)
if not blob:
raise Exception(f"File not found: {source_path}")

# Download directly to file
with open(native_path, "wb") as write_file:
blob.download_to_file(write_file)
bucket_path = self._to_bucket_path(source_path, context.config)
super()._realize_to(source_path=bucket_path, native_path=native_path, context=context)

def _write_from(
self,
target_path: str,
native_path: str,
context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
):
"""
Override to upload files directly to GCS, bypassing fs_gcsfs's directory marker checks.
"""
with self._open_fs(context) as fs_handle:
bucket = fs_handle.bucket

# Convert path to GCS blob key
normalized_path = target_path.strip("/")

# Create blob and upload
blob = bucket.blob(normalized_path)
with open(native_path, "rb") as read_file:
blob.upload_from_file(read_file)
bucket_path = self._to_bucket_path(target_path, context.config)
super()._write_from(target_path=bucket_path, native_path=native_path, context=context)

def score_url_match(self, url: str):
bucket_name = self.template_config.bucket_name
# For security, we need to ensure that a partial match doesn't work
if bucket_name and (url.startswith(f"gs://{bucket_name}/") or url == f"gs://{bucket_name}"):
return len(f"gs://{bucket_name}")
elif bucket_name and (url.startswith(f"gcs://{bucket_name}/") or url == f"gcs://{bucket_name}"):
return len(f"gcs://{bucket_name}")
else:
return super().score_url_match(url)
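
`score_url_match` only scores exact bucket matches (the bucket name followed by `/` or end-of-string), so a source configured for one bucket cannot capture URLs for a bucket whose name merely shares a prefix. A standalone sketch of the same logic, with a placeholder bucket name and `0` standing in for the base-class fallback:

```python
bucket_name = "my-bucket"

def score_url_match(url: str) -> int:
    for scheme in ("gs", "gcs"):
        prefix = f"{scheme}://{bucket_name}"
        if url == prefix or url.startswith(prefix + "/"):
            return len(prefix)
    return 0  # the real method defers to super().score_url_match(url)

assert score_url_match("gs://my-bucket/file.txt") == len("gs://my-bucket")
assert score_url_match("gs://my-bucket-2/file.txt") == 0  # partial match rejected
```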


__all__ = ("GoogleCloudStorageFilesSource",)
6 changes: 3 additions & 3 deletions packages/files/setup.cfg
@@ -41,10 +41,10 @@ packages = find:
python_requires = >=3.9

[options.extras_require]
test =
test =
pytest
fs-gcsfs
s3fs>=2023.1.0,<2024
gcsfs
s3fs>=2023.1.0

[options.entry_points]
console_scripts =
1 change: 1 addition & 0 deletions pyproject.toml
@@ -126,6 +126,7 @@ test = [
"boto3",
"cwltest>=2.5.20240906231108", # Python 3.13 support
"fluent-logger",
"gcsfs",
"lxml!=4.2.2",
"onedatafilerestclient==21.2.5.2",
"pkce",
4 changes: 2 additions & 2 deletions test/unit/files/gcsfs_file_sources_conf.yml
@@ -2,8 +2,8 @@
id: test1
doc: Test access to Google Cloud Storage.
project: ${user.preferences['googlecloudstorage|project']}
bucket_name: 'genomics-public-data'
token_uri: "https://www.googleapis.com/oauth2/v4/token"
bucket_name: 'anaconda-public-data'
token_uri: "https://oauth2.googleapis.com/token"
client_id: ${user.preferences['googlecloudstorage|client_id']}
client_secret: ${user.preferences['googlecloudstorage|client_secret']}
token: ${user.preferences['googlecloudstorage|access_token']}