Skip to content

Commit 4034533

Browse files
authored
Merge pull request #998 from TeachMeTW/Remove-Bottom-80
Removed Bottom 80% functions from latest snapshot.
2 parents 398960e + f497ddf commit 4034533

File tree

3 files changed

+146
-257
lines changed

3 files changed

+146
-257
lines changed

emission/analysis/intake/segmentation/trip_segmentation.py

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -51,28 +51,20 @@ def segment_into_trips(self, timeseries, time_query):
5151
pass
5252

5353
def segment_current_trips(user_id):
54-
with ect.Timer() as t_get_time_series:
55-
ts = esta.TimeSeries.get_time_series(user_id)
56-
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_series", time.time(), t_get_time_series.elapsed)
57-
58-
with ect.Timer() as t_get_time_range:
59-
time_query = epq.get_time_range_for_segmentation(user_id)
60-
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_range_for_segmentation", time.time(), t_get_time_range.elapsed)
54+
ts = esta.TimeSeries.get_time_series(user_id)
55+
time_query = epq.get_time_range_for_segmentation(user_id)
6156

6257
import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf
6358
import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf
6459

65-
with ect.Timer() as t_create_time_filter:
66-
dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins
67-
point_threshold=9,
68-
distance_threshold=100) # 100 m
69-
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_time_filter", time.time(), t_create_time_filter.elapsed)
7060

71-
with ect.Timer() as t_create_dist_filter:
72-
dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins
73-
point_threshold=9,
74-
distance_threshold=50) # 50 m
75-
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_dist_filter", time.time(), t_create_dist_filter.elapsed)
61+
dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins
62+
point_threshold=9,
63+
distance_threshold=100) # 100 m
64+
65+
dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins
66+
point_threshold=9,
67+
distance_threshold=50) # 50 m
7668

7769
filter_methods = {"time": dstfsm, "distance": dsdfsm}
7870
filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"}
@@ -89,24 +81,20 @@ def segment_current_trips(user_id):
8981
epq.mark_segmentation_done(user_id, None)
9082
return
9183

92-
with ect.Timer() as t_handle_out_of_order:
93-
out_of_order_points = loc_df[loc_df.ts.diff() < 0]
94-
if len(out_of_order_points) > 0:
95-
logging.info("Found out of order points!")
96-
logging.info("%s" % out_of_order_points)
97-
# drop from the table
98-
loc_df = loc_df.drop(out_of_order_points.index.tolist())
99-
loc_df.reset_index(inplace=True)
100-
# invalidate in the database.
101-
out_of_order_id_list = out_of_order_points["_id"].tolist()
102-
logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
103-
for ooid in out_of_order_id_list:
104-
ts.invalidate_raw_entry(ooid)
105-
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/handle_out_of_order_points", time.time(), t_handle_out_of_order.elapsed)
106-
107-
with ect.Timer() as t_get_filters:
108-
filters_in_df = loc_df["filter"].dropna().unique()
109-
esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_filters_in_df", time.time(), t_get_filters.elapsed)
84+
out_of_order_points = loc_df[loc_df.ts.diff() < 0]
85+
if len(out_of_order_points) > 0:
86+
logging.info("Found out of order points!")
87+
logging.info("%s" % out_of_order_points)
88+
# drop from the table
89+
loc_df = loc_df.drop(out_of_order_points.index.tolist())
90+
loc_df.reset_index(inplace=True)
91+
# invalidate in the database.
92+
out_of_order_id_list = out_of_order_points["_id"].tolist()
93+
logging.debug("out_of_order_id_list = %s" % out_of_order_id_list)
94+
for ooid in out_of_order_id_list:
95+
ts.invalidate_raw_entry(ooid)
96+
97+
filters_in_df = loc_df["filter"].dropna().unique()
11098

11199
logging.debug("Filters in the dataframe = %s" % filters_in_df)
112100
if len(filters_in_df) == 1:

emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py

Lines changed: 61 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,9 @@ def segment_into_trips(self, timeseries, time_query):
6060
t_get_filtered_points.elapsed
6161
)
6262

63-
with ect.Timer() as t_mark_valid:
64-
self.filtered_points_df.loc[:, "valid"] = True
65-
esds.store_pipeline_time(
66-
user_id,
67-
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/mark_valid",
68-
time.time(),
69-
t_mark_valid.elapsed
70-
)
63+
self.filtered_points_df.loc[:, "valid"] = True
7164

72-
with ect.Timer() as t_get_transition_df:
73-
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
74-
esds.store_pipeline_time(
75-
user_id,
76-
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df",
77-
time.time(),
78-
t_get_transition_df.elapsed
79-
)
65+
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
8066

8167
if len(self.transition_df) > 0:
8268
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
@@ -103,14 +89,7 @@ def segment_into_trips(self, timeseries, time_query):
10389
# segmentation_points.append(currPoint)
10490

10591
if just_ended:
106-
with ect.Timer() as t_continue_just_ended:
107-
continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df)
108-
esds.store_pipeline_time(
109-
user_id,
110-
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/continue_just_ended",
111-
time.time(),
112-
t_continue_just_ended.elapsed
113-
)
92+
continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df)
11493

11594
if continue_flag:
11695
# We have "processed" the currPoint by deciding to glom it
@@ -119,14 +98,7 @@ def segment_into_trips(self, timeseries, time_query):
11998
# else:
12099
sel_point = currPoint
121100
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
122-
with ect.Timer() as t_set_start_point:
123-
curr_trip_start_point = sel_point
124-
esds.store_pipeline_time(
125-
user_id,
126-
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_new_trip_start_point",
127-
time.time(),
128-
t_set_start_point.elapsed
129-
)
101+
curr_trip_start_point = sel_point
130102
just_ended = False
131103
else:
132104
with ect.Timer() as t_process_trip:
@@ -137,106 +109,72 @@ def segment_into_trips(self, timeseries, time_query):
137109
max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1
138110
]
139111
lastPoint = self.find_last_valid_point(idx)
140-
with ect.Timer() as t_has_trip_ended:
141-
trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries)
142-
esds.store_pipeline_time(
143-
user_id,
144-
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/has_trip_ended",
145-
time.time(),
146-
t_has_trip_ended.elapsed
147-
)
112+
trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries)
148113

149114
if trip_ended:
150-
with ect.Timer() as t_get_last_trip_end_point:
151-
last_trip_end_point = lastPoint
152-
logging.debug("Appending last_trip_end_point %s with index %s " %
153-
(last_trip_end_point, idx - 1))
154-
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
155-
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
156-
# We have processed everything up to the trip end by marking it as a completed trip
157-
self.last_ts_processed = currPoint.metadata_write_ts
158-
esds.store_pipeline_time(
159-
user_id,
160-
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_last_trip_end_point",
161-
time.time(),
162-
t_get_last_trip_end_point.elapsed
163-
)
164-
165-
with ect.Timer() as t_handle_trip_end:
166-
just_ended = True
167-
# Now, we have finished processing the previous point as a trip
168-
# end or not. But we still need to process this point by seeing
169-
# whether it should represent a new trip start, or a glom to the
170-
# previous trip
171-
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
172-
sel_point = currPoint
173-
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
174-
curr_trip_start_point = sel_point
175-
just_ended = False
176-
esds.store_pipeline_time(
177-
user_id,
178-
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/handle_trip_end",
179-
time.time(),
180-
t_handle_trip_end.elapsed
181-
)
115+
last_trip_end_point = lastPoint
116+
logging.debug("Appending last_trip_end_point %s with index %s " %
117+
(last_trip_end_point, idx - 1))
118+
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
119+
logging.info("Found trip end at %s" % last_trip_end_point.fmt_time)
120+
# We have processed everything up to the trip end by marking it as a completed trip
121+
self.last_ts_processed = currPoint.metadata_write_ts
122+
just_ended = True
123+
# Now, we have finished processing the previous point as a trip
124+
# end or not. But we still need to process this point by seeing
125+
# whether it should represent a new trip start, or a glom to the
126+
# previous trip
127+
if not self.continue_just_ended(idx, currPoint, self.filtered_points_df):
128+
sel_point = currPoint
129+
logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx))
130+
curr_trip_start_point = sel_point
131+
just_ended = False
132+
182133
esds.store_pipeline_time(
183134
user_id,
184135
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/loop",
185136
time.time(),
186137
t_loop.elapsed
187138
)
188139

189-
with ect.Timer() as t_post_loop:
190-
# Since we only end a trip when we start a new trip, this means that
191-
# the last trip that was pushed is ignored. Consider the example of
192-
# 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
193-
# so during that remote push, a trip end was not detected. And we got
194-
# back home shortly after 5pm, so the trip end was only detected on the
195-
# phone at 6pm. At that time, the following points were pushed:
196-
# ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
197-
# ..., 2016-02-22T16:57:04
198-
# Then, on the server, while iterating through the points, we detected
199-
# a trip end at 16:04, and a new trip start at 16:49. But we did not
200-
# detect the trip end at 16:57, because we didn't start a new point.
201-
# This has two issues:
202-
# - we won't see this trip until the next trip start, which may be on the next day
203-
# - we won't see this trip at all, because when we run the pipeline the
204-
# next time, we will only look at points from that time onwards. These
205-
# points have been marked as "processed", so they won't even be considered.
206-
207-
# There are multiple potential fixes:
208-
# - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
209-
# - we can mark a trip end based on the fact that we only push data
210-
# when a trip ends, so if we have data, it means that the trip has been
211-
# detected as ended on the phone.
212-
# This seems a bit fragile - what if we start pushing incomplete trip
213-
# data for efficiency reasons? Therefore, we also check to see if there
214-
# is a trip_end_detected in this timeframe after the last point. If so,
215-
# then we end the trip at the last point that we have.
216-
if not just_ended and len(self.transition_df) > 0:
217-
with ect.Timer() as t_check_transitions:
218-
stopped_moving_after_last = self.transition_df[
219-
(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
220-
]
221-
logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
222-
if len(stopped_moving_after_last) > 0:
223-
logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
224-
segmentation_points.append((curr_trip_start_point, currPoint))
225-
self.last_ts_processed = currPoint.metadata_write_ts
226-
else:
227-
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
228-
esds.store_pipeline_time(
229-
user_id,
230-
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/check_transitions_post_loop",
231-
time.time(),
232-
t_check_transitions.elapsed
233-
)
234-
esds.store_pipeline_time(
235-
user_id,
236-
ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/post_loop",
237-
time.time(),
238-
t_post_loop.elapsed
239-
)
140+
141+
# Since we only end a trip when we start a new trip, this means that
142+
# the last trip that was pushed is ignored. Consider the example of
143+
# 2016-02-22 when I took kids to karate. We arrived shortly after 4pm,
144+
# so during that remote push, a trip end was not detected. And we got
145+
# back home shortly after 5pm, so the trip end was only detected on the
146+
# phone at 6pm. At that time, the following points were pushed:
147+
# ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50,
148+
# ..., 2016-02-22T16:57:04
149+
# Then, on the server, while iterating through the points, we detected
150+
# a trip end at 16:04, and a new trip start at 16:49. But we did not
151+
# detect the trip end at 16:57, because we didn't start a new point.
152+
# This has two issues:
153+
# - we won't see this trip until the next trip start, which may be on the next day
154+
# - we won't see this trip at all, because when we run the pipeline the
155+
# next time, we will only look at points from that time onwards. These
156+
# points have been marked as "processed", so they won't even be considered.
157+
158+
# There are multiple potential fixes:
159+
# - we can mark only the completed trips as processed. This will solve (2) above, but not (1)
160+
# - we can mark a trip end based on the fact that we only push data
161+
# when a trip ends, so if we have data, it means that the trip has been
162+
# detected as ended on the phone.
163+
# This seems a bit fragile - what if we start pushing incomplete trip
164+
# data for efficiency reasons? Therefore, we also check to see if there
165+
# is a trip_end_detected in this timeframe after the last point. If so,
166+
# then we end the trip at the last point that we have.
167+
if not just_ended and len(self.transition_df) > 0:
168+
stopped_moving_after_last = self.transition_df[
169+
(self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2)
170+
]
171+
logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]])
172+
if len(stopped_moving_after_last) > 0:
173+
logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last))
174+
segmentation_points.append((curr_trip_start_point, currPoint))
175+
self.last_ts_processed = currPoint.metadata_write_ts
176+
else:
177+
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
240178

241179
return segmentation_points
242180

0 commit comments

Comments
 (0)