Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/galaxy/managers/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,8 @@ def copy(
flush=False,
element_destination=element_destination,
dataset_instance_attributes=dataset_instance_attributes,
target_user=trans.get_user(),
)
new_hdca.copy_tags_from(target_user=trans.get_user(), source=source_hdca)
if not copy_elements:
parent.add_dataset_collection(new_hdca)
trans.sa_session.commit()
Expand Down
15 changes: 11 additions & 4 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3747,13 +3747,18 @@ def copy(self, name=None, target_user=None, activatable=False, all_datasets=Fals
else:
hdcas = self.active_dataset_collections
for hdca in hdcas:
new_hdca = hdca.copy(flush=False, element_destination=new_history, set_hid=False, minimize_copies=True)
new_hdca = hdca.copy(
flush=False,
element_destination=new_history,
set_hid=False,
minimize_copies=True,
target_user=target_user,
)
new_history.add_dataset_collection(new_hdca, set_hid=False)
db_session.add(new_hdca)

if target_user:
new_hdca.copy_item_annotation(db_session, self.user, hdca, target_user, new_hdca)
new_hdca.copy_tags_from(target_user, hdca)

new_history.hid_counter = self.hid_counter
db_session.commit()
Expand Down Expand Up @@ -7850,6 +7855,7 @@ def copy(
flush: bool = True,
set_hid: bool = True,
minimize_copies: bool = False,
target_user: Optional[User] = None,
):
"""
Create a copy of this history dataset collection association. Copy
Expand Down Expand Up @@ -7878,8 +7884,9 @@ def copy(
hdca.collection = collection_copy
session = required_object_session(self)
session.add(hdca)
if self.history and self.history.user:
hdca.copy_tags_from(self.history.user, self)
copy_user = target_user or (self.history.user if self.history else None)
if copy_user:
hdca.copy_tags_from(copy_user, self)
if element_destination and set_hid:
element_destination.stage_addition(hdca)
element_destination.add_pending_items()
Expand Down
3 changes: 2 additions & 1 deletion lib/galaxy/webapps/galaxy/services/history_contents.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,8 @@ def __update_dataset(
):
# anon user: ensure that history ids match up and the history is the current,
# check for uploading, and use only the subset of attribute keys manipulatable by anon users
if hda := self.__datasets_for_update(trans, history, [id], payload)[0]:
hdas = self.__datasets_for_update(trans, history, [id], payload)
if hdas and (hda := hdas[0]):
self.__deserialize_dataset(trans, hda, payload)
serialization_params.default_view = "detailed"
return self.hda_serializer.serialize_to_view(
Expand Down
42 changes: 42 additions & 0 deletions lib/galaxy_test/api/test_histories.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,48 @@ def test_copy_history(self):
new_update_time = history["update_time"]
assert original_update_time == new_update_time

def test_copy_history_does_not_duplicate_tags(self):
    """Copying a history must carry tags over exactly once.

    Regression test: tags used to be copied twice (once inside
    ``HistoryDatasetCollectionAssociation.copy`` and once by the caller),
    producing duplicated tag lists on the copied items. Covers three tag
    carriers: a standalone HDA, an HDCA, and an HDA nested inside the
    collection.
    """
    history_id = self.dataset_populator.new_history()
    # Create a standalone dataset and tag it
    new_hda = self.dataset_populator.new_dataset(history_id, content="tagged dataset")
    hda_id = new_hda["id"]
    self.dataset_populator.tag_dataset(history_id, hda_id, tags=["hda_tag"])
    # Create a collection and tag it
    fetch_response = self.dataset_collection_populator.create_list_in_history(
        history_id, contents=["Hello", "World"], direct_upload=True
    )
    collection = self.dataset_collection_populator.wait_for_fetched_collection(fetch_response.json())
    hdca_id = collection["id"]
    self._put(
        f"histories/{history_id}/contents/dataset_collections/{hdca_id}",
        data={"tags": ["hdca_tag"]},
        json=True,
    ).raise_for_status()
    # Also tag a dataset within the collection
    element_hda_id = collection["elements"][0]["object"]["id"]
    self.dataset_populator.tag_dataset(history_id, element_hda_id, tags=["element_tag"])

    # Copy the history
    copied_history_response = self.dataset_populator.copy_history(history_id)
    copied_history_response.raise_for_status()
    copied_history = copied_history_response.json()
    copied_history_id = copied_history["id"]

    # Verify standalone HDA tags are not duplicated
    copied_contents = self._get(f"histories/{copied_history_id}/contents").json()
    copied_hdas = [c for c in copied_contents if c["history_content_type"] == "dataset" and c["visible"]]
    assert len(copied_hdas) == 1
    copied_hda_details = self.dataset_populator.get_history_dataset_details(
        history_id=copied_history_id, dataset_id=copied_hdas[0]["id"]
    )
    assert copied_hda_details["tags"] == ["hda_tag"], f"Expected ['hda_tag'] but got {copied_hda_details['tags']}"

    # Verify HDCA tags are not duplicated
    copied_collection = self.dataset_populator.get_history_collection_details(
        history_id=copied_history_id, history_content_type="dataset_collection"
    )
    assert copied_collection["tags"] == ["hdca_tag"], f"Expected ['hdca_tag'] but got {copied_collection['tags']}"

    # Verify tags on a dataset nested inside the collection are not duplicated
    # (same element access pattern as above; the element_tag set up earlier
    # was previously never checked after the copy).
    copied_element_hda_id = copied_collection["elements"][0]["object"]["id"]
    copied_element_details = self.dataset_populator.get_history_dataset_details(
        history_id=copied_history_id, dataset_id=copied_element_hda_id
    )
    assert copied_element_details["tags"] == [
        "element_tag"
    ], f"Expected ['element_tag'] but got {copied_element_details['tags']}"

# TODO: (CE) test_create_from_copy
def test_import_from_model_store_dict(self):
response = self.dataset_populator.create_from_store(store_dict=history_model_store_dict())
Expand Down
Loading