7
7
from builtins import *
8
8
from builtins import object
9
9
import logging
10
+ import time
10
11
11
12
import emission .storage .timeseries .abstract_timeseries as esta
12
13
import emission .storage .decorations .place_queries as esdp
23
24
import emission .analysis .intake .segmentation .restart_checking as eaisr
24
25
25
26
import emission .core .common as ecc
27
+ import emission .storage .decorations .stats_queries as esds
28
+ import emission .core .timer as ect
29
+ import emission .core .wrapper .pipelinestate as ecwp
26
30
27
31
class TripSegmentationMethod (object ):
28
32
def segment_into_trips (self , timeseries , time_query ):
@@ -47,54 +51,76 @@ def segment_into_trips(self, timeseries, time_query):
47
51
pass
48
52
49
53
def segment_current_trips (user_id ):
50
- ts = esta .TimeSeries .get_time_series (user_id )
51
- time_query = epq .get_time_range_for_segmentation (user_id )
54
+ with ect .Timer () as t_get_time_series :
55
+ ts = esta .TimeSeries .get_time_series (user_id )
56
+ esds .store_pipeline_time (user_id , ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/get_time_series" , time .time (), t_get_time_series .elapsed )
57
+
58
+ with ect .Timer () as t_get_time_range :
59
+ time_query = epq .get_time_range_for_segmentation (user_id )
60
+ esds .store_pipeline_time (user_id , ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/get_time_range_for_segmentation" , time .time (), t_get_time_range .elapsed )
52
61
53
62
import emission .analysis .intake .segmentation .trip_segmentation_methods .dwell_segmentation_time_filter as dstf
54
63
import emission .analysis .intake .segmentation .trip_segmentation_methods .dwell_segmentation_dist_filter as dsdf
55
- dstfsm = dstf .DwellSegmentationTimeFilter (time_threshold = 5 * 60 , # 5 mins
56
- point_threshold = 9 ,
57
- distance_threshold = 100 ) # 100 m
58
64
59
- dsdfsm = dsdf .DwellSegmentationDistFilter (time_threshold = 10 * 60 , # 10 mins
60
- point_threshold = 9 ,
61
- distance_threshold = 50 ) # 50 m
65
+ with ect .Timer () as t_create_time_filter :
66
+ dstfsm = dstf .DwellSegmentationTimeFilter (time_threshold = 5 * 60 , # 5 mins
67
+ point_threshold = 9 ,
68
+ distance_threshold = 100 ) # 100 m
69
+ esds .store_pipeline_time (user_id , ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/create_time_filter" , time .time (), t_create_time_filter .elapsed )
70
+
71
+ with ect .Timer () as t_create_dist_filter :
72
+ dsdfsm = dsdf .DwellSegmentationDistFilter (time_threshold = 10 * 60 , # 10 mins
73
+ point_threshold = 9 ,
74
+ distance_threshold = 50 ) # 50 m
75
+ esds .store_pipeline_time (user_id , ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/create_dist_filter" , time .time (), t_create_dist_filter .elapsed )
62
76
63
77
filter_methods = {"time" : dstfsm , "distance" : dsdfsm }
64
78
filter_method_names = {"time" : "DwellSegmentationTimeFilter" , "distance" : "DwellSegmentationDistFilter" }
79
+
65
80
# We need to use the appropriate filter based on the incoming data
66
81
# So let's read in the location points for the specified query
67
- loc_df = ts .get_data_df ("background/filtered_location" , time_query )
82
+ with ect .Timer () as t_get_data_df :
83
+ loc_df = ts .get_data_df ("background/filtered_location" , time_query )
84
+ esds .store_pipeline_time (user_id , ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/get_data_df" , time .time (), t_get_data_df .elapsed )
85
+
68
86
if len (loc_df ) == 0 :
69
87
# no new segments, no need to keep looking at these again
70
88
logging .debug ("len(loc_df) == 0, early return" )
71
89
epq .mark_segmentation_done (user_id , None )
72
90
return
73
91
74
- out_of_order_points = loc_df [loc_df .ts .diff () < 0 ]
75
- if len (out_of_order_points ) > 0 :
76
- logging .info ("Found out of order points!" )
77
- logging .info ("%s" % out_of_order_points )
78
- # drop from the table
79
- loc_df = loc_df .drop (out_of_order_points .index .tolist ())
80
- loc_df .reset_index (inplace = True )
81
- # invalidate in the database.
82
- out_of_order_id_list = out_of_order_points ["_id" ].tolist ()
83
- logging .debug ("out_of_order_id_list = %s" % out_of_order_id_list )
84
- for ooid in out_of_order_id_list :
85
- ts .invalidate_raw_entry (ooid )
86
-
87
- filters_in_df = loc_df ["filter" ].dropna ().unique ()
92
+ with ect .Timer () as t_handle_out_of_order :
93
+ out_of_order_points = loc_df [loc_df .ts .diff () < 0 ]
94
+ if len (out_of_order_points ) > 0 :
95
+ logging .info ("Found out of order points!" )
96
+ logging .info ("%s" % out_of_order_points )
97
+ # drop from the table
98
+ loc_df = loc_df .drop (out_of_order_points .index .tolist ())
99
+ loc_df .reset_index (inplace = True )
100
+ # invalidate in the database.
101
+ out_of_order_id_list = out_of_order_points ["_id" ].tolist ()
102
+ logging .debug ("out_of_order_id_list = %s" % out_of_order_id_list )
103
+ for ooid in out_of_order_id_list :
104
+ ts .invalidate_raw_entry (ooid )
105
+ esds .store_pipeline_time (user_id , ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/handle_out_of_order_points" , time .time (), t_handle_out_of_order .elapsed )
106
+
107
+ with ect .Timer () as t_get_filters :
108
+ filters_in_df = loc_df ["filter" ].dropna ().unique ()
109
+ esds .store_pipeline_time (user_id , ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/get_filters_in_df" , time .time (), t_get_filters .elapsed )
110
+
88
111
logging .debug ("Filters in the dataframe = %s" % filters_in_df )
89
112
if len (filters_in_df ) == 1 :
90
113
# Common case - let's make it easy
91
-
92
- segmentation_points = filter_methods [filters_in_df [0 ]].segment_into_trips (ts ,
93
- time_query )
114
+ with ect . Timer () as t_segment_trips :
115
+ segmentation_points = filter_methods [filters_in_df [0 ]].segment_into_trips (ts , time_query )
116
+ esds . store_pipeline_time ( user_id , ecwp . PipelineStages . TRIP_SEGMENTATION . name + "/segment_into_trips" , time . time (), t_segment_trips . elapsed )
94
117
else :
95
- segmentation_points = get_combined_segmentation_points (ts , loc_df , time_query ,
96
- filters_in_df ,
97
- filter_methods )
118
+ with ect .Timer () as t_get_combined_segmentation :
119
+ segmentation_points = get_combined_segmentation_points (ts , loc_df , time_query ,
120
+ filters_in_df ,
121
+ filter_methods )
122
+ esds .store_pipeline_time (user_id , ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/get_combined_segmentation_points" , time .time (), t_get_combined_segmentation .elapsed )
123
+
98
124
# Create and store trips and places based on the segmentation points
99
125
if segmentation_points is None :
100
126
epq .mark_segmentation_failed (user_id )
@@ -103,13 +129,15 @@ def segment_current_trips(user_id):
103
129
logging .debug ("len(segmentation_points) == 0, early return" )
104
130
epq .mark_segmentation_done (user_id , None )
105
131
else :
106
- try :
107
- create_places_and_trips (user_id , segmentation_points , filter_method_names [filters_in_df [0 ]])
108
- epq .mark_segmentation_done (user_id , get_last_ts_processed (filter_methods ))
109
- except :
110
- logging .exception ("Trip generation failed for user %s" % user_id )
111
- epq .mark_segmentation_failed (user_id )
112
-
132
+ with ect .Timer () as t_create_places_trips :
133
+ try :
134
+ create_places_and_trips (user_id , segmentation_points , filter_method_names [filters_in_df [0 ]])
135
+ epq .mark_segmentation_done (user_id , get_last_ts_processed (filter_methods ))
136
+ except :
137
+ logging .exception ("Trip generation failed for user %s" % user_id )
138
+ epq .mark_segmentation_failed (user_id )
139
+ esds .store_pipeline_time (user_id , ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/create_places_and_trips" , time .time (), t_create_places_trips .elapsed )
140
+
113
141
def get_combined_segmentation_points (ts , loc_df , time_query , filters_in_df , filter_methods ):
114
142
"""
115
143
We can have mixed filters in a particular time range for multiple reasons.
0 commit comments