-
Notifications
You must be signed in to change notification settings - Fork 191
internal tagging #377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
internal tagging #377
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,6 +1,7 @@ | ||||||
import base64 | ||||||
import io | ||||||
import json | ||||||
import time | ||||||
from collections import defaultdict | ||||||
from datetime import datetime | ||||||
from pathlib import Path | ||||||
|
@@ -679,11 +680,14 @@ def __init__(self, projection: "AtlasProjection", auto_cleanup: Optional[bool] = | |||||
self.auto_cleanup = auto_cleanup | ||||||
|
||||||
@property | ||||||
def df(self, overwrite: bool = False) -> pd.DataFrame: | ||||||
def df(self, overwrite: bool = False, wait_time: int = 120) -> pd.DataFrame: | ||||||
""" | ||||||
Pandas DataFrame mapping each data point to its tags. | ||||||
|
||||||
Args: | ||||||
wait_time: The maximum time to wait while fetching a tag. | ||||||
""" | ||||||
tags = self.get_tags() | ||||||
tags = self.get_tags(wait_time=wait_time) | ||||||
tag_definition_ids = [tag["tag_definition_id"] for tag in tags] | ||||||
if self.auto_cleanup: | ||||||
self._remove_outdated_tag_files(tag_definition_ids) | ||||||
|
@@ -708,11 +712,25 @@ def df(self, overwrite: bool = False) -> pd.DataFrame: | |||||
tbs.append(tb) | ||||||
return pa.concat_tables(tbs).to_pandas() | ||||||
|
||||||
def get_tags(self) -> List[Dict[str, str]]: | ||||||
def is_tag_complete(self, tag_id) -> bool: | ||||||
is_complete = requests.get( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding error handling (e.g. |
||||||
self.dataset.atlas_api_path + "/v1/project/projection/tags/status", | ||||||
headers=self.dataset.header, | ||||||
params={ | ||||||
"project_id": self.dataset.id, | ||||||
"tag_id": tag_id, | ||||||
}, | ||||||
).json()["is_complete"] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding error handling for the GET request in |
||||||
return is_complete | ||||||
|
||||||
def get_tags(self, wait_time: int = 120) -> List[Dict[str, str]]: | ||||||
""" | ||||||
Retrieves back all tags made in the web browser for a specific map. | ||||||
Each tag is a dictionary containing tag_name, tag_id, and metadata. | ||||||
|
||||||
Args: | ||||||
wait_time: The maximum time to wait for a tag to be completed. | ||||||
|
||||||
Returns: | ||||||
A list of tags a user has created for projection. | ||||||
""" | ||||||
|
@@ -723,16 +741,31 @@ def get_tags(self) -> List[Dict[str, str]]: | |||||
).json() | ||||||
keep_tags = [] | ||||||
for tag in tags: | ||||||
is_complete = requests.get( | ||||||
self.dataset.atlas_api_path + "/v1/project/projection/tags/status", | ||||||
headers=self.dataset.header, | ||||||
params={ | ||||||
"project_id": self.dataset.id, | ||||||
"tag_id": tag["tag_id"], | ||||||
}, | ||||||
).json()["is_complete"] | ||||||
tag_id = tag["tag_id"] | ||||||
is_complete = self.is_tag_complete(tag_id) | ||||||
if is_complete: | ||||||
keep_tags.append(tag) | ||||||
else: | ||||||
# Use robotag route instead of v1/n so we guarantee only one request gets launched | ||||||
requests.post( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding error handling for the robotag POST request to ensure it succeeds before proceeding. |
||||||
self.dataset.atlas_api_path + "/v1/project/projection/tags/robotag", | ||||||
headers=self.dataset.header, | ||||||
json={"project_id": self.dataset.id, "tag_id": tag_id}, | ||||||
) | ||||||
wait_start = time.time() | ||||||
# Wait up to 5 minutes for tag to be completed | ||||||
while not is_complete: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is an inconsistency in the |
||||||
# Sleep 5 seconds | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment mentions sleeping 5 seconds, but the code uses
Suggested change
|
||||||
time.sleep(15) | ||||||
if time.time() >= wait_start + wait_time: | ||||||
break | ||||||
is_complete = self.is_tag_complete(tag_id) | ||||||
if is_complete: | ||||||
keep_tags.append(tag) | ||||||
else: | ||||||
logger.warning( | ||||||
f"Tag {tag['tag_name']} currently unavailable for download from SDK. Download from {self.projection.dataset_link} instead or try again." | ||||||
) | ||||||
return keep_tags | ||||||
|
||||||
def get_datums_in_tag(self, tag_name: str, overwrite: bool = False): | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid using a
@property
with parameters. Consider convertingdf
to a regular method so that parameters likeoverwrite
andwait_time
can be explicitly passed.