19
19
import emission .core .wrapper .location as ecwl
20
20
21
21
import emission .analysis .intake .segmentation .restart_checking as eaisr
22
+ import emission .analysis .intake .segmentation .trip_segmentation_methods .trip_end_detection_corner_cases as eaistc
22
23
23
24
class DwellSegmentationDistFilter (eaist .TripSegmentationMethod ):
24
25
def __init__ (self , time_threshold , point_threshold , distance_threshold ):
@@ -45,10 +46,11 @@ def segment_into_trips(self, timeseries, time_query):
45
46
data that they want from the sensor streams in order to determine the
46
47
segmentation points.
47
48
"""
48
- filtered_points_df = timeseries .get_data_df ("background/filtered_location" , time_query )
49
- transition_df = timeseries .get_data_df ("statemachine/transition" , time_query )
50
- if len (transition_df ) > 0 :
51
- logging .debug ("transition_df = %s" % transition_df [["fmt_time" , "transition" ]])
49
+ self .filtered_points_df = timeseries .get_data_df ("background/filtered_location" , time_query )
50
+ self .filtered_points_df .loc [:,"valid" ] = True
51
+ self .transition_df = timeseries .get_data_df ("statemachine/transition" , time_query )
52
+ if len (self .transition_df ) > 0 :
53
+ logging .debug ("self.transition_df = %s" % self .transition_df [["fmt_time" , "transition" ]])
52
54
else :
53
55
logging .debug ("no transitions found. This can happen for continuous sensing" )
54
56
@@ -60,7 +62,7 @@ def segment_into_trips(self, timeseries, time_query):
60
62
last_trip_end_point = None
61
63
curr_trip_start_point = None
62
64
just_ended = True
63
- for idx , row in filtered_points_df .iterrows ():
65
+ for idx , row in self . filtered_points_df .iterrows ():
64
66
currPoint = ad .AttrDict (row )
65
67
currPoint .update ({"idx" : idx })
66
68
logging .debug ("-" * 30 + str (currPoint .fmt_time ) + "-" * 30 )
@@ -69,7 +71,7 @@ def segment_into_trips(self, timeseries, time_query):
69
71
# segmentation_points.append(currPoint)
70
72
71
73
if just_ended :
72
- if self .continue_just_ended (idx , currPoint , filtered_points_df ):
74
+ if self .continue_just_ended (idx , currPoint , self . filtered_points_df ):
73
75
# We have "processed" the currPoint by deciding to glom it
74
76
self .last_ts_processed = currPoint .metadata_write_ts
75
77
continue
@@ -84,8 +86,8 @@ def segment_into_trips(self, timeseries, time_query):
84
86
# Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive.
85
87
# Using .iloc just ends up including points after this one.
86
88
# So we reset_index upstream and use it here.
87
- last10Points_df = filtered_points_df .iloc [max (idx - self .point_threshold , curr_trip_start_point .idx ):idx + 1 ]
88
- lastPoint = ad . AttrDict ( filtered_points_df . iloc [ idx - 1 ] )
89
+ last10Points_df = self . filtered_points_df .iloc [max (idx - self .point_threshold , curr_trip_start_point .idx ):idx + 1 ]
90
+ lastPoint = self . find_last_valid_point ( idx )
89
91
if self .has_trip_ended (lastPoint , currPoint , timeseries ):
90
92
last_trip_end_point = lastPoint
91
93
logging .debug ("Appending last_trip_end_point %s with index %s " %
@@ -99,7 +101,7 @@ def segment_into_trips(self, timeseries, time_query):
99
101
# end or not. But we still need to process this point by seeing
100
102
# whether it should represent a new trip start, or a glom to the
101
103
# previous trip
102
- if not self .continue_just_ended (idx , currPoint , filtered_points_df ):
104
+ if not self .continue_just_ended (idx , currPoint , self . filtered_points_df ):
103
105
sel_point = currPoint
104
106
logging .debug ("Setting new trip start point %s with idx %s" % (sel_point , sel_point .idx ))
105
107
curr_trip_start_point = sel_point
@@ -131,8 +133,8 @@ def segment_into_trips(self, timeseries, time_query):
131
133
# data for efficiency reasons? Therefore, we also check to see if there
132
134
# is a trip_end_detected in this timeframe after the last point. If so,
133
135
# then we end the trip at the last point that we have.
134
- if not just_ended and len (transition_df ) > 0 :
135
- stopped_moving_after_last = transition_df [(transition_df .ts > currPoint .ts ) & (transition_df .transition == 2 )]
136
+ if not just_ended and len (self . transition_df ) > 0 :
137
+ stopped_moving_after_last = self . transition_df [(self . transition_df .ts > currPoint .ts ) & (self . transition_df .transition == 2 )]
136
138
logging .debug ("stopped_moving_after_last = %s" % stopped_moving_after_last [["fmt_time" , "transition" ]])
137
139
if len (stopped_moving_after_last ) > 0 :
138
140
logging .debug ("Found %d transitions after last point, ending trip..." % len (stopped_moving_after_last ))
@@ -178,7 +180,8 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries):
178
180
# In general, we get multiple locations between each motion activity. If we see a bunch of motion activities
179
181
# between two location points, and there is a large gap between the last location and the first
180
182
# motion activity as well, let us just assume that there was a restart
181
- ongoing_motion_check = len (eaisr .get_ongoing_motion_in_range (lastPoint .ts , currPoint .ts , timeseries )) > 0
183
+ ongoing_motion_in_range = eaisr .get_ongoing_motion_in_range (lastPoint .ts , currPoint .ts , timeseries )
184
+ ongoing_motion_check = len (ongoing_motion_in_range ) > 0
182
185
if timeDelta > self .time_threshold and not ongoing_motion_check :
183
186
logging .debug ("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
184
187
(lastPoint .ts , currPoint .ts ,self .time_threshold , currPoint .ts - lastPoint .ts , ongoing_motion_check ))
@@ -196,9 +199,30 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries):
196
199
197
200
if (timeDelta > self .time_threshold and # We have been here for a while
198
201
speedDelta < speedThreshold ): # we haven't moved very much
199
- logging .debug ("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ending trip" %
200
- (lastPoint .ts , currPoint .ts ,self .time_threshold , currPoint .ts - lastPoint .ts ))
201
- return True
202
+ # This can happen even during ongoing trips due to spurious points
203
+ # generated on some iOS phones
204
+ # https://github.com/e-mission/e-mission-server/issues/577#issuecomment-376379460
205
+ if eaistc .is_huge_invalid_ts_offset (self , lastPoint , currPoint ,
206
+ timeseries , ongoing_motion_in_range ):
207
+ # delete from memory and the database. Should be generally
208
+ # discouraged, so we are kindof putting it in here
209
+ # secretively
210
+ logging .debug ("About to set valid column for index = %s" %
211
+ (currPoint .idx ))
212
+ self .filtered_points_df .valid .iloc [currPoint .idx ] = False
213
+ logging .debug ("After dropping %d, filtered points = %s" %
214
+ (currPoint .idx , self .filtered_points_df .iloc [currPoint .idx - 5 :currPoint .idx + 5 ][["valid" , "fmt_time" ]]))
215
+ import emission .core .get_database as edb
216
+ logging .debug ("remove huge invalid ts offset point = %s" % currPoint )
217
+ edb .get_timeseries_db ().remove ({"_id" : currPoint ["_id" ]})
218
+ # We currently re-retrieve the last point every time, so
219
+ # the reindexing above is good enough but if we use
220
+ # lastPoint = currPoint, we should update currPoint here
221
+ return False
222
+ else :
223
+ logging .debug ("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ending trip" %
224
+ (lastPoint .ts , currPoint .ts ,self .time_threshold , currPoint .ts - lastPoint .ts ))
225
+ return True
202
226
else :
203
227
logging .debug ("lastPoint.ts = %s, currPoint.ts = %s, time gap = %s (vs %s), distance_gap = %s (vs %s), speed_gap = %s (vs %s) continuing trip" %
204
228
(lastPoint .ts , currPoint .ts ,
@@ -207,6 +231,18 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries):
207
231
speedDelta , speedThreshold ))
208
232
return False
209
233
234
+ def find_last_valid_point (self , idx ):
235
+ lastPoint = ad .AttrDict (self .filtered_points_df .iloc [idx - 1 ])
236
+ if lastPoint .valid :
237
+ # common case, fast
238
+ return lastPoint
239
+
240
+ # uncommon case, walk backwards until you find something valid.
241
+ i = 2
242
+ while not lastPoint .valid and (idx - i ) >= 0 :
243
+ lastPoint = ad .AttrDict (self .filtered_points_df .iloc [idx - i ])
244
+ i = i - 1
245
+ return lastPoint
210
246
211
247
def continue_just_ended (self , idx , currPoint , filtered_points_df ):
212
248
"""
0 commit comments