
Commit 0441401

Merge pull request #712 from shankari/gis-based-mode-detection
Create a new branch for the GIS based mode detection
2 parents (d81e0f2, 7afe46c); commit 0441401

28 files changed: +1812 lines, -232 lines

bin/analysis/remove_inferred_modes.py

Lines changed: 34 additions & 4 deletions
@@ -17,9 +17,39 @@
 import uuid
 import arrow

-import emission.analysis.classification.inference.mode.pipeline as eacimp
+import emission.analysis.classification.inference.mode.reset as eacimr
 import emission.core.get_database as edb
 import emission.storage.decorations.user_queries as esdu
+import emission.core.wrapper.user as ecwu
+
+def _get_user_list(args):
+    if args.all:
+        return _find_all_users()
+    elif args.platform:
+        return _find_platform_users(args.platform)
+    elif args.email_list:
+        return _email_2_user_list(args.email_list)
+    else:
+        assert args.user_list is not None
+        return [uuid.UUID(u) for u in args.user_list]
+
+def _find_platform_users(platform):
+    # Since all new clients register a profile with the server, we don't have
+    # to run a 'distinct' query over the entire contents of the timeseries.
+    # Instead, we can simply query from the profile users, which is
+    # significantly faster
+    # Use the commented out line instead for better performance.
+    # Soon, we can move to the more performant option, because there will be
+    # no users that don't have a profile
+    # return edb.get_timeseries_db().find({'metadata.platform': platform}).distinct(
+    #    'user_id')
+    return edb.get_profile_db().find({"curr_platform": platform}).distinct("user_id")
+
+def _find_all_users():
+    return esdu.get_all_uuids()
+
+def _email_2_user_list(email_list):
+    return [ecwu.User.fromEmail(e).uuid for e in email_list]

 if __name__ == '__main__':
     logging.basicConfig(level=logging.DEBUG)
@@ -47,14 +77,14 @@
     # Handle the first row in the table
     if args.date is None:
         if args.all:
-            eacimp.del_all_objects(args.dry_run)
+            eacimr.del_all_objects(args.dry_run)
         else:
             user_list = _get_user_list(args)
             logging.info("received list with %s users" % user_list)
             logging.info("first few entries are %s" % user_list[0:5])
             for user_id in user_list:
                 logging.info("resetting user %s to start" % user_id)
-                eacimp.del_objects_after(user_id, 0, args.dry_run)
+                eacimr.del_objects_after(user_id, 0, args.dry_run)
     else:
         # Handle the second row in the table
         day_dt = arrow.get(args.date, "YYYY-MM-DD")
@@ -66,5 +96,5 @@
         logging.info("first few entries are %s" % user_list[0:5])
         for user_id in user_list:
             logging.info("resetting user %s to ts %s" % (user_id, day_ts))
-            eacimp.del_objects_after(user_id, day_ts, args.dry_run)
+            eacimr.del_objects_after(user_id, day_ts, args.dry_run)

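For reference, each of the new selectors normalizes to a plain list of user UUIDs before the reset loop runs. The sketch below only restates those four code paths side by side; it is not part of the commit, and the example email address and UUID string are placeholders:

import uuid

import emission.core.get_database as edb
import emission.core.wrapper.user as ecwu
import emission.storage.decorations.user_queries as esdu

# args.all: every user known to the server
all_users = esdu.get_all_uuids()

# args.platform: read from the profile collection, which the comment above
# notes is significantly faster than a distinct() scan over the whole timeseries
android_users = edb.get_profile_db().find(
    {"curr_platform": "android"}).distinct("user_id")

# args.email_list: map registered emails to their UUIDs
listed_users = [ecwu.User.fromEmail(e).uuid for e in ["tester@example.com"]]

# args.user_list: parse explicit UUID strings passed on the command line
explicit_users = [uuid.UUID(u) for u in ["12345678-1234-5678-1234-567812345678"]]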

bin/reset_pipeline.py

Lines changed: 2 additions & 1 deletion
@@ -21,6 +21,7 @@
 import emission.pipeline.reset as epr
 import emission.core.get_database as edb
 import emission.storage.decorations.user_queries as esdu
+import emission.core.wrapper.user as ecwu

 def _get_user_list(args):
     if args.all:
@@ -49,7 +50,7 @@ def _find_all_users():
     return esdu.get_all_uuids()

 def _email_2_user_list(email_list):
-    return [ecwu.User.fromEmail(e) for e in email_list]
+    return [ecwu.User.fromEmail(e).uuid for e in email_list]

 if __name__ == '__main__':
     logging.basicConfig(level=logging.DEBUG)
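The one-line fix to _email_2_user_list matches the copy of the helper added to bin/analysis/remove_inferred_modes.py above: the comprehension now yields the raw uuid.UUID (via .uuid) rather than the User wrapper object, so the email selector returns the same type as the other selectors and the per-user reset calls receive an actual user id.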

conf/analysis/debug.conf.json.sample

Lines changed: 2 additions & 0 deletions
@@ -3,5 +3,7 @@
     "intake.cleaning.filter_accuracy.enable": false,
     "classification.inference.mode.useAdvancedFeatureIndices": true,
     "classification.inference.mode.useBusTrainFeatureIndices": true,
+    "section.startStopRadius": 150,
+    "section.endStopRadius": 150,
     "analysis.result.section.key": "analysis/inferred_section"
 }
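The two new radii sit alongside the other analysis tuning knobs in the sample debug config. A minimal sketch of reading them with plain JSON loading follows; the real e-mission config helpers may differ, and the reading of the values as matching radii in meters around section start/end points is inferred from the key names rather than stated in this diff:

import json

with open("conf/analysis/debug.conf.json.sample") as cfg_file:
    cfg = json.load(cfg_file)

# both default to 150 in the sample; presumably a matching radius in meters
start_stop_radius = cfg["section.startStopRadius"]
end_stop_radius = cfg["section.endStopRadius"]
print(start_stop_radius, end_stop_radius)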

Lines changed: 3 additions & 0 deletions (new file)

@@ -0,0 +1,3 @@
+{
+    "url": "server running the overpass API to query OSM (e.g. https://wiki.openstreetmap.org/wiki/Overpass_API)"
+}

Lines changed: 10 additions & 0 deletions (new file)

@@ -0,0 +1,10 @@
+[out:json][timeout:25];
+(
+  node["highway"="bus_stop"]({bbox});
+  node["railway"="station"]({bbox});
+  node["public_transport"]({bbox});
+  way["railway"="station"]({bbox});
+  relation["route"]({bbox});
+);
+out body;
+>;
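The two new files work together: the JSON sample above configures which Overpass server to talk to, and this template is standard Overpass QL that collects bus stops, railway stations, public-transport nodes, and route relations inside a bounding box, then recurses down to their member nodes. A rough sketch of how the {bbox} placeholder and the configured URL might be combined into a request; the requests call, the local file names, the public endpoint, and the coordinates are illustrative assumptions, not code from this commit:

import requests

# A commonly used public endpoint, standing in for the URL from the sample conf above.
OVERPASS_URL = "https://overpass-api.de/api/interpreter"

# Hypothetical local copy of the query template shown above.
with open("overpass_transit_query.ql") as template_file:
    template = template_file.read()

# Overpass bounding boxes are "south,west,north,east" in decimal degrees;
# these coordinates are an arbitrary example.
query = template.format(bbox="37.870,-122.280,37.880,-122.260")

response = requests.post(OVERPASS_URL, data=query)
elements = response.json()["elements"]
print("matched %d transit-related OSM elements" % len(elements))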

e-mission-py.bash

Lines changed: 1 addition & 1 deletion
@@ -5,4 +5,4 @@

 # Make sure that the python here is the anaconda python if that is not the one in the path

-PYTHONPATH=. python $*
+PYTHONPATH=. python "$@"
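Quoting matters here: an unquoted $* re-splits the wrapper's arguments on whitespace before handing them to python, while "$@" forwards each original argument as its own word, so script paths or option values containing spaces reach the Python interpreter intact.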

emission/analysis/classification/inference/mode/pipeline.py

Lines changed: 3 additions & 55 deletions
@@ -49,58 +49,6 @@ def predict_mode(user_id):
         logging.exception("Error while inferring modes, timestamp is unchanged")
         epq.mark_mode_inference_failed(user_id)

-# Delete the objects created by this pipeline step (across users)
-def del_all_objects(is_dry_run):
-    del_query = {}
-    del_query.update({"metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]}})
-    logging.info("About to delete %d entries"
-                 % edb.get_analysis_timeseries_db().find(del_query).count())
-    logging.info("About to delete entries with keys %s"
-                 % edb.get_analysis_timeseries_db().find(del_query).distinct("metadata.key"))
-
-    del_pipeline_query = {"pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
-    logging.info("About to delete pipeline entries for stage %s" %
-                 ecwp.PipelineStages.MODE_INFERENCE)
-
-    if is_dry_run:
-        logging.info("this is a dry-run, returning from del_objects_after without modifying anything")
-    else:
-        result = edb.get_analysis_timeseries_db().delete_many(del_query)
-        logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result.raw_result)
-        result = edb.get_pipeline_state_db().delete_many(del_pipeline_query)
-        logging.info("this is not a dry-run, result of deleting pipeline state is %s" % result.raw_result)
-
-# Delete the objects created by this pipeline step (for a particular user)
-def del_objects_after(user_id, reset_ts, is_dry_run):
-    del_query = {}
-    # handle the user
-    del_query.update({"user_id": user_id})
-
-    del_query.update({"metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]}})
-    # all objects inserted here have start_ts and end_ts and are trip-like
-    del_query.update({"data.start_ts": {"$gt": reset_ts}})
-    logging.debug("After all updates, del_query = %s" % del_query)
-
-    reset_pipeline_query = {"pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
-    # Fuzz the TRIP_SEGMENTATION stage 5 mins because of
-    # https://github.com/e-mission/e-mission-server/issues/333#issuecomment-312730217
-    FUZZ_FACTOR = 5 * 60
-    reset_pipeline_update = {'$set': {'last_processed_ts': reset_ts + FUZZ_FACTOR}}
-    logging.info("About to reset stage %s to %s"
-                 % (ecwp.PipelineStages.MODE_INFERENCE, reset_ts))
-
-
-    logging.info("About to delete %d entries"
-                 % edb.get_analysis_timeseries_db().find(del_query).count())
-    logging.info("About to delete entries with keys %s"
-                 % edb.get_analysis_timeseries_db().find(del_query).distinct("metadata.key"))
-
-    if is_dry_run:
-        logging.info("this is a dry-run, returning from del_objects_after without modifying anything")
-    else:
-        result = edb.get_analysis_timeseries_db().remove(del_query)
-        logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result)
-
 class ModeInferencePipeline:
     def __init__(self):
         self.featureLabels = ["distance", "duration", "first filter mode", "sectionId", "avg speed",
@@ -226,9 +174,6 @@ def updateFeatureMatrixRowWithSection(self, featureMatrix, i, section_entry):
         if (hasattr(self, "air_cluster")):
             featureMatrix[i, 21] = easf.mode_start_end_coverage(section, self.air_cluster,600)

-        if self.last_section_done is None or self.last_section_done.data.end_ts < section_entry.data.end_ts:
-            self.last_section_done = section_entry
-
         # Replace NaN and inf by zeros so that it doesn't crash later
         featureMatrix[i] = np.nan_to_num(featureMatrix[i])

@@ -344,6 +289,9 @@ def savePredictionsStep(self):
             logging.debug("Updating sensed mode for section = %s to %s" %
                           (currSectionEntry.get_id(), ise.data.sensed_mode))
             self.ts.insert(ise)
+        # Set last_section_done after saving because otherwise if there is an error
+        # during inference, we will not save results and never re-run
+        self.last_section_done = self.toPredictSections[-1]

 if __name__ == "__main__":
     import json
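As the new comment explains, last_section_done used to be advanced while the feature matrix was still being built (the block removed from updateFeatureMatrixRowWithSection above), so an error later in inference could leave those sections recorded as processed even though no predictions were ever saved, and they would never be re-run. Assigning it at the end of savePredictionsStep means the marker only moves forward once the inferred sections have actually been written.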

Lines changed: 61 additions & 0 deletions (new file)

@@ -0,0 +1,61 @@
+import logging
+import emission.core.get_database as edb
+import emission.core.wrapper.pipelinestate as ecwp
+
+import emission.core.get_database as edb
+import emission.core.wrapper.pipelinestate as ecwp
+
+# Delete the objects created by this pipeline step (across users)
+def del_all_objects(is_dry_run):
+    del_query = {}
+    del_query.update({"metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]}})
+    logging.info("About to delete %d entries"
+                 % edb.get_analysis_timeseries_db().find(del_query).count())
+    logging.info("About to delete entries with keys %s"
+                 % edb.get_analysis_timeseries_db().find(del_query).distinct("metadata.key"))
+
+    del_pipeline_query = {"pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
+    logging.info("About to delete pipeline entries for stage %s" %
+                 ecwp.PipelineStages.MODE_INFERENCE)
+
+    if is_dry_run:
+        logging.info("this is a dry-run, returning from del_objects_after without modifying anything")
+    else:
+        result = edb.get_analysis_timeseries_db().delete_many(del_query)
+        logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result.raw_result)
+        result = edb.get_pipeline_state_db().delete_many(del_pipeline_query)
+        logging.info("this is not a dry-run, result of deleting pipeline state is %s" % result.raw_result)
+
+# Delete the objects created by this pipeline step (for a particular user)
+def del_objects_after(user_id, reset_ts, is_dry_run):
+    del_query = {}
+    # handle the user
+    del_query.update({"user_id": user_id})
+
+    del_query.update({"metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]}})
+    # all objects inserted here have start_ts and end_ts and are trip-like
+    del_query.update({"data.start_ts": {"$gt": reset_ts}})
+    logging.debug("After all updates, del_query = %s" % del_query)
+
+    reset_pipeline_query = {"user_id": user_id, "pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
+    # Fuzz the TRIP_SEGMENTATION stage 5 mins because of
+    # https://github.com/e-mission/e-mission-server/issues/333#issuecomment-312730217
+    FUZZ_FACTOR = 5 * 60
+    reset_pipeline_update = {'$set': {'last_processed_ts': reset_ts + FUZZ_FACTOR}}
+    logging.info("About to reset stage %s to %s"
+                 % (ecwp.PipelineStages.MODE_INFERENCE, reset_ts))
+
+
+    logging.info("About to delete %d entries"
+                 % edb.get_analysis_timeseries_db().find(del_query).count())
+    logging.info("About to delete entries with keys %s"
+                 % edb.get_analysis_timeseries_db().find(del_query).distinct("metadata.key"))
+
+    if is_dry_run:
+        logging.info("this is a dry-run, returning from del_objects_after without modifying anything")
+    else:
+        result = edb.get_analysis_timeseries_db().remove(del_query)
+        logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result)
+        result = edb.get_pipeline_state_db().update_one(reset_pipeline_query, reset_pipeline_update)
+        logging.info("this is not a dry-run, result of updating pipeline state is %s" % result.raw_result)
+
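For completeness, this is how the scripts earlier in the commit drive the relocated helpers; the snippet below only restates those call sites in one place, with a placeholder UUID and dry-run flags chosen so that nothing is actually deleted:

import uuid
import emission.analysis.classification.inference.mode.reset as eacimr

# Across all users: log what would be deleted, but modify nothing (dry run).
eacimr.del_all_objects(True)

# Per user: remove inferred predictions/sections with start_ts > 0 and reset the
# MODE_INFERENCE pipeline state (dry run here, so it only logs). Real callers get
# the UUID from _get_user_list() in the scripts above; this one is a placeholder.
example_user = uuid.UUID("12345678-1234-5678-1234-567812345678")
eacimr.del_objects_after(example_user, 0, True)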
