1
1
import base64
2
2
import io
3
3
import json
4
+ import time
4
5
from collections import defaultdict
5
6
from datetime import datetime
6
7
from pathlib import Path
@@ -679,11 +680,14 @@ def __init__(self, projection: "AtlasProjection", auto_cleanup: Optional[bool] =
679
680
self .auto_cleanup = auto_cleanup
680
681
681
682
@property
682
- def df (self , overwrite : bool = False ) -> pd .DataFrame :
683
+ def df (self , overwrite : bool = False , wait_time : int = 120 ) -> pd .DataFrame :
683
684
"""
684
685
Pandas DataFrame mapping each data point to its tags.
686
+
687
+ Args:
688
+ wait_time: The maximum time, in seconds, to wait while fetching a tag.
685
689
"""
686
- tags = self .get_tags ()
690
+ tags = self .get_tags (wait_time = wait_time )
687
691
tag_definition_ids = [tag ["tag_definition_id" ] for tag in tags ]
688
692
if self .auto_cleanup :
689
693
self ._remove_outdated_tag_files (tag_definition_ids )
@@ -707,12 +711,26 @@ def df(self, overwrite: bool = False) -> pd.DataFrame:
707
711
tb = tb .append_column (tag ["tag_name" ], bitmask )
708
712
tbs .append (tb )
709
713
return pa .concat_tables (tbs ).to_pandas ()
714
+
715
def is_tag_complete(self, tag_id) -> bool:
    """
    Asks the tag status endpoint whether a tag has finished processing.

    Args:
        tag_id: The id of the tag whose status is requested.

    Returns:
        The value of the "is_complete" field in the status response.
    """
    status_url = self.dataset.atlas_api_path + "/v1/project/projection/tags/status"
    response = requests.get(
        status_url,
        headers=self.dataset.header,
        params={"project_id": self.dataset.id, "tag_id": tag_id},
    )
    # The status payload is JSON; only its "is_complete" entry is used.
    return response.json()["is_complete"]
710
725
711
- def get_tags (self ) -> List [Dict [str , str ]]:
726
+ def get_tags (self , wait_time : int = 120 ) -> List [Dict [str , str ]]:
712
727
"""
713
728
Retrieves back all tags made in the web browser for a specific map.
714
729
Each tag is a dictionary containing tag_name, tag_id, and metadata.
715
730
731
+ Args:
732
+ wait_time: The maximum time, in seconds, to wait for a tag to be completed.
733
+
716
734
Returns:
717
735
A list of tags a user has created for projection.
718
736
"""
@@ -723,16 +741,26 @@ def get_tags(self) -> List[Dict[str, str]]:
723
741
).json ()
724
742
keep_tags = []
725
743
for tag in tags :
726
- is_complete = requests .get (
727
- self .dataset .atlas_api_path + "/v1/project/projection/tags/status" ,
728
- headers = self .dataset .header ,
729
- params = {
730
- "project_id" : self .dataset .id ,
731
- "tag_id" : tag ["tag_id" ],
732
- },
733
- ).json ()["is_complete" ]
744
+ tag_id = tag ["tag_id" ]
745
+ is_complete = self .is_tag_complete (tag_id )
734
746
if is_complete :
735
747
keep_tags .append (tag )
748
+ else :
749
+ # Use robotag route instead of v1/n so we guarantee only one request gets launched
750
+ requests .post (self .dataset .atlas_api_path + "/v1/project/projection/tags/robotag" , headers = self .dataset .header ,
751
+ json = {"project_id" : self .dataset .id , "tag_id" : tag_id })
752
+ wait_start = time .time ()
753
+ # Wait up to wait_time seconds for tag to be completed
754
+ while not is_complete :
755
+ # Sleep 15 seconds between status checks
756
+ time .sleep (15 )
757
+ if time .time () >= wait_start + wait_time :
758
+ break
759
+ is_complete = self .is_tag_complete (tag_id )
760
+ if is_complete :
761
+ keep_tags .append (tag )
762
+ else :
763
+ logger .warning (f"Tag { tag ['tag_name' ]} currently unavailable for download from SDK. Download from { self .projection .dataset_link } instead or try again." )
736
764
return keep_tags
737
765
738
766
def get_datums_in_tag (self , tag_name : str , overwrite : bool = False ):
0 commit comments