12
12
import attrdict as ad
13
13
import numpy as np
14
14
import datetime as pydt
15
+ import time
15
16
16
17
# Our imports
17
18
import emission .analysis .point_features as pf
20
21
21
22
import emission .analysis .intake .segmentation .restart_checking as eaisr
22
23
import emission .analysis .intake .segmentation .trip_segmentation_methods .trip_end_detection_corner_cases as eaistc
24
+ import emission .storage .decorations .stats_queries as esds
25
+ import emission .core .timer as ect
26
+ import emission .core .wrapper .pipelinestate as ecwp
23
27
24
28
class DwellSegmentationDistFilter (eaist .TripSegmentationMethod ):
25
29
def __init__ (self , time_threshold , point_threshold , distance_threshold ):
@@ -46,9 +50,34 @@ def segment_into_trips(self, timeseries, time_query):
46
50
data that they want from the sensor streams in order to determine the
47
51
segmentation points.
48
52
"""
49
- self .filtered_points_df = timeseries .get_data_df ("background/filtered_location" , time_query )
50
- self .filtered_points_df .loc [:,"valid" ] = True
51
- self .transition_df = timeseries .get_data_df ("statemachine/transition" , time_query )
53
+ with ect .Timer () as t_get_filtered_points :
54
+ self .filtered_points_df = timeseries .get_data_df ("background/filtered_location" , time_query )
55
+ user_id = self .filtered_points_df ["user_id" ].iloc [0 ]
56
+ esds .store_pipeline_time (
57
+ user_id ,
58
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/get_filtered_points_df" ,
59
+ time .time (),
60
+ t_get_filtered_points .elapsed
61
+ )
62
+
63
+ with ect .Timer () as t_mark_valid :
64
+ self .filtered_points_df .loc [:, "valid" ] = True
65
+ esds .store_pipeline_time (
66
+ user_id ,
67
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/mark_valid" ,
68
+ time .time (),
69
+ t_mark_valid .elapsed
70
+ )
71
+
72
+ with ect .Timer () as t_get_transition_df :
73
+ self .transition_df = timeseries .get_data_df ("statemachine/transition" , time_query )
74
+ esds .store_pipeline_time (
75
+ user_id ,
76
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/get_transition_df" ,
77
+ time .time (),
78
+ t_get_transition_df .elapsed
79
+ )
80
+
52
81
if len (self .transition_df ) > 0 :
53
82
logging .debug ("self.transition_df = %s" % self .transition_df [["fmt_time" , "transition" ]])
54
83
else :
@@ -62,86 +91,153 @@ def segment_into_trips(self, timeseries, time_query):
62
91
last_trip_end_point = None
63
92
curr_trip_start_point = None
64
93
just_ended = True
65
- for idx , row in self .filtered_points_df .iterrows ():
66
- currPoint = ad .AttrDict (row )
67
- currPoint .update ({"idx" : idx })
68
- logging .debug ("-" * 30 + str (currPoint .fmt_time ) + "-" * 30 )
69
- if curr_trip_start_point is None :
70
- logging .debug ("Appending currPoint because the current start point is None" )
71
- # segmentation_points.append(currPoint)
72
-
73
- if just_ended :
74
- if self .continue_just_ended (idx , currPoint , self .filtered_points_df ):
75
- # We have "processed" the currPoint by deciding to glom it
76
- self .last_ts_processed = currPoint .metadata_write_ts
77
- continue
78
- # else:
79
- # Here's where we deal with the start trip. At this point, the
80
- # distance is greater than the filter.
81
- sel_point = currPoint
82
- logging .debug ("Setting new trip start point %s with idx %s" % (sel_point , sel_point .idx ))
83
- curr_trip_start_point = sel_point
84
- just_ended = False
85
- else :
86
- # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
87
- # Using .iloc just ends up including points after this one.
88
- # So we reset_index upstream and use it here.
89
- last10Points_df = self .filtered_points_df .iloc [max (idx - self .point_threshold , curr_trip_start_point .idx ):idx + 1 ]
90
- lastPoint = self .find_last_valid_point (idx )
91
- if self .has_trip_ended (lastPoint , currPoint , timeseries ):
92
- last_trip_end_point = lastPoint
93
- logging .debug ("Appending last_trip_end_point %s with index %s " %
94
- (last_trip_end_point , idx - 1 ))
95
- segmentation_points .append ((curr_trip_start_point , last_trip_end_point ))
96
- logging .info ("Found trip end at %s" % last_trip_end_point .fmt_time )
97
- # We have processed everything up to the trip end by marking it as a completed trip
98
- self .last_ts_processed = currPoint .metadata_write_ts
99
- just_ended = True
100
- # Now, we have finished processing the previous point as a trip
101
- # end or not. But we still need to process this point by seeing
102
- # whether it should represent a new trip start, or a glom to the
103
- # previous trip
104
- if not self .continue_just_ended (idx , currPoint , self .filtered_points_df ):
105
- sel_point = currPoint
106
- logging .debug ("Setting new trip start point %s with idx %s" % (sel_point , sel_point .idx ))
94
+
95
+ with ect .Timer () as t_loop :
96
+ for idx , row in self .filtered_points_df .iterrows ():
97
+ currPoint = ad .AttrDict (row )
98
+ currPoint .update ({"idx" : idx })
99
+ logging .debug ("-" * 30 + str (currPoint .fmt_time ) + "-" * 30 )
100
+
101
+ if curr_trip_start_point is None :
102
+ logging .debug ("Appending currPoint because the current start point is None" )
103
+ # segmentation_points.append(currPoint)
104
+
105
+ if just_ended :
106
+ with ect .Timer () as t_continue_just_ended :
107
+ continue_flag = self .continue_just_ended (idx , currPoint , self .filtered_points_df )
108
+ esds .store_pipeline_time (
109
+ user_id ,
110
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/continue_just_ended" ,
111
+ time .time (),
112
+ t_continue_just_ended .elapsed
113
+ )
114
+
115
+ if continue_flag :
116
+ # We have "processed" the currPoint by deciding to glom it
117
+ self .last_ts_processed = currPoint .metadata_write_ts
118
+ continue
119
+ # else:
120
+ sel_point = currPoint
121
+ logging .debug ("Setting new trip start point %s with idx %s" % (sel_point , sel_point .idx ))
122
+ with ect .Timer () as t_set_start_point :
107
123
curr_trip_start_point = sel_point
108
- just_ended = False
109
-
110
- # Since we only end a trip when we start a new trip, this means that
111
- # the last trip that was pushed is ignored. Consider the example of
112
- # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
113
- # so during that remote push, a trip end was not detected. And we got
114
- # back home shortly after 5pm, so the trip end was only detected on the
115
- # phone at 6pm. At that time, the following points were pushed:
116
- # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
117
- # ..., 2016-02-22T16:57:04
118
- # Then, on the server, while iterating through the points, we detected
119
- # a trip end at 16:04, and a new trip start at 16:49. But we did not
120
- # detect the trip end at 16:57, because we didn't start a new point.
121
- # This has two issues:
122
- # - we won't see this trip until the next trip start, which may be on the next day
123
- # - we won't see this trip at all, because when we run the pipeline the
124
- # next time, we will only look at points from that time onwards. These
125
- # points have been marked as "processed", so they won't even be considered.
126
-
127
- # There are multiple potential fixes:
128
- # - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
129
- # - we can mark a trip end based on the fact that we only push data
130
- # when a trip ends, so if we have data, it means that the trip has been
131
- # detected as ended on the phone.
132
- # This seems a bit fragile - what if we start pushing incomplete trip
133
- # data for efficiency reasons? Therefore, we also check to see if there
134
- # is a trip_end_detected in this timeframe after the last point. If so,
135
- # then we end the trip at the last point that we have.
136
- if not just_ended and len (self .transition_df ) > 0 :
137
- stopped_moving_after_last = self .transition_df [(self .transition_df .ts > currPoint .ts ) & (self .transition_df .transition == 2 )]
138
- logging .debug ("stopped_moving_after_last = %s" % stopped_moving_after_last [["fmt_time" , "transition" ]])
139
- if len (stopped_moving_after_last ) > 0 :
140
- logging .debug ("Found %d transitions after last point, ending trip..." % len (stopped_moving_after_last ))
141
- segmentation_points .append ((curr_trip_start_point , currPoint ))
142
- self .last_ts_processed = currPoint .metadata_write_ts
143
- else :
144
- logging .debug ("Found %d transitions after last point, not ending trip..." % len (stopped_moving_after_last ))
124
+ esds .store_pipeline_time (
125
+ user_id ,
126
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/set_new_trip_start_point" ,
127
+ time .time (),
128
+ t_set_start_point .elapsed
129
+ )
130
+ just_ended = False
131
+ else :
132
+ with ect .Timer () as t_process_trip :
133
+ # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
134
+ # Using .iloc just ends up including points after this one.
135
+ # So we reset_index upstream and use it here.
136
+ last10Points_df = self .filtered_points_df .iloc [
137
+ max (idx - self .point_threshold , curr_trip_start_point .idx ):idx + 1
138
+ ]
139
+ lastPoint = self .find_last_valid_point (idx )
140
+ with ect .Timer () as t_has_trip_ended :
141
+ trip_ended = self .has_trip_ended (lastPoint , currPoint , timeseries )
142
+ esds .store_pipeline_time (
143
+ user_id ,
144
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/has_trip_ended" ,
145
+ time .time (),
146
+ t_has_trip_ended .elapsed
147
+ )
148
+
149
+ if trip_ended :
150
+ with ect .Timer () as t_get_last_trip_end_point :
151
+ last_trip_end_point = lastPoint
152
+ logging .debug ("Appending last_trip_end_point %s with index %s " %
153
+ (last_trip_end_point , idx - 1 ))
154
+ segmentation_points .append ((curr_trip_start_point , last_trip_end_point ))
155
+ logging .info ("Found trip end at %s" % last_trip_end_point .fmt_time )
156
+ # We have processed everything up to the trip end by marking it as a completed trip
157
+ self .last_ts_processed = currPoint .metadata_write_ts
158
+ esds .store_pipeline_time (
159
+ user_id ,
160
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/get_last_trip_end_point" ,
161
+ time .time (),
162
+ t_get_last_trip_end_point .elapsed
163
+ )
164
+
165
+ with ect .Timer () as t_handle_trip_end :
166
+ just_ended = True
167
+ # Now, we have finished processing the previous point as a trip
168
+ # end or not. But we still need to process this point by seeing
169
+ # whether it should represent a new trip start, or a glom to the
170
+ # previous trip
171
+ if not self .continue_just_ended (idx , currPoint , self .filtered_points_df ):
172
+ sel_point = currPoint
173
+ logging .debug ("Setting new trip start point %s with idx %s" % (sel_point , sel_point .idx ))
174
+ curr_trip_start_point = sel_point
175
+ just_ended = False
176
+ esds .store_pipeline_time (
177
+ user_id ,
178
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/handle_trip_end" ,
179
+ time .time (),
180
+ t_handle_trip_end .elapsed
181
+ )
182
+ esds .store_pipeline_time (
183
+ user_id ,
184
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/loop" ,
185
+ time .time (),
186
+ t_loop .elapsed
187
+ )
188
+
189
+ with ect .Timer () as t_post_loop :
190
+ # Since we only end a trip when we start a new trip, this means that
191
+ # the last trip that was pushed is ignored. Consider the example of
192
+ # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
193
+ # so during that remote push, a trip end was not detected. And we got
194
+ # back home shortly after 5pm, so the trip end was only detected on the
195
+ # phone at 6pm. At that time, the following points were pushed:
196
+ # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
197
+ # ..., 2016-02-22T16:57:04
198
+ # Then, on the server, while iterating through the points, we detected
199
+ # a trip end at 16:04, and a new trip start at 16:49. But we did not
200
+ # detect the trip end at 16:57, because we didn't start a new trip.
201
+ # This has two issues:
202
+ # - we won't see this trip until the next trip start, which may be on the next day
203
+ # - we won't see this trip at all, because when we run the pipeline the
204
+ # next time, we will only look at points from that time onwards. These
205
+ # points have been marked as "processed", so they won't even be considered.
206
+
207
+ # There are multiple potential fixes:
208
+ # - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
209
+ # - we can mark a trip end based on the fact that we only push data
210
+ # when a trip ends, so if we have data, it means that the trip has been
211
+ # detected as ended on the phone.
212
+ # This seems a bit fragile - what if we start pushing incomplete trip
213
+ # data for efficiency reasons? Therefore, we also check to see if there
214
+ # is a trip_end_detected in this timeframe after the last point. If so,
215
+ # then we end the trip at the last point that we have.
216
+ if not just_ended and len (self .transition_df ) > 0 :
217
+ with ect .Timer () as t_check_transitions :
218
+ stopped_moving_after_last = self .transition_df [
219
+ (self .transition_df .ts > currPoint .ts ) & (self .transition_df .transition == 2 )
220
+ ]
221
+ logging .debug ("stopped_moving_after_last = %s" % stopped_moving_after_last [["fmt_time" , "transition" ]])
222
+ if len (stopped_moving_after_last ) > 0 :
223
+ logging .debug ("Found %d transitions after last point, ending trip..." % len (stopped_moving_after_last ))
224
+ segmentation_points .append ((curr_trip_start_point , currPoint ))
225
+ self .last_ts_processed = currPoint .metadata_write_ts
226
+ else :
227
+ logging .debug ("Found %d transitions after last point, not ending trip..." % len (stopped_moving_after_last ))
228
+ esds .store_pipeline_time (
229
+ user_id ,
230
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/check_transitions_post_loop" ,
231
+ time .time (),
232
+ t_check_transitions .elapsed
233
+ )
234
+ esds .store_pipeline_time (
235
+ user_id ,
236
+ ecwp .PipelineStages .TRIP_SEGMENTATION .name + "/segment_into_trips_dist/post_loop" ,
237
+ time .time (),
238
+ t_post_loop .elapsed
239
+ )
240
+
145
241
return segmentation_points
146
242
147
243
def has_trip_ended (self , lastPoint , currPoint , timeseries ):
0 commit comments