
Create a new branch for the GIS based mode detection #712

Merged
Changes from all commits (53 commits)
8d8a7fb
Change the expansion paramter to support arguments with spaces
shankari Mar 19, 2018
67c1464
Fix transition based trip end when switching platforms
shankari Mar 27, 2018
f622a7d
Real fix for transition based trip end when switching platforms
shankari Mar 28, 2018
ef8e025
Fix broken trips due to spurious points generated by iOS
shankari Mar 28, 2018
5690175
Move the resampling code out into its own module so that it can be re…
shankari Mar 29, 2018
4a0a415
Determine segmentation when large chunks of location points are missing
shankari Mar 29, 2018
c79722a
Mark location entries as invalid instead of deleting them
shankari Mar 29, 2018
d0293f4
Mark the newly inserted, inferred points as "inserted"
shankari Mar 29, 2018
a371e62
Fix slosh by starting the section at the beginning of the transition
shankari Mar 30, 2018
7678818
Handle most normal flip-flopping
shankari Mar 31, 2018
c557fd7
Remove the fake walk transition on the trip back
shankari Mar 31, 2018
abc21b0
Move the location reconstruction to a new file
shankari Mar 31, 2018
1aa77c4
Almost working
shankari Apr 1, 2018
ce4df07
Create the segments based on the transition
shankari Apr 1, 2018
defc6fd
Squish long "stops" by extending to the section that has good data
shankari Apr 2, 2018
c8edf28
Fix some more flip flopping by adding more sanity checks
shankari Apr 2, 2018
bd5e97d
Android fixes
shankari Apr 2, 2018
8466cf4
Add checks for fast biking and short drives
shankari Apr 3, 2018
337dfcb
Move out the reset code to a common module
shankari Apr 5, 2018
15feaa6
Change the assumptions on domain to take the mode type
shankari Apr 5, 2018
733fc9a
Integration point to read transit stop information from OSM
shankari Apr 5, 2018
424d862
Add a mode inference algorithm based on GIS
shankari Apr 5, 2018
2d52104
Switch the pipeline to use the new GIS based mode inference
shankari Apr 5, 2018
a4d5382
Revert change to reconstruct locations
shankari Apr 6, 2018
dd2fb11
Fix ZeroDivisionError found in automated testing
shankari Apr 6, 2018
e61ae24
Fix the merging for mixed walk/bike trips
shankari Apr 7, 2018
044eafb
Minor fix for zero division in one more place
shankari Apr 7, 2018
0da819a
Fix handling of small iOS walking trips
shankari Apr 7, 2018
f176109
Change the threshold from 80% to 90%
shankari Apr 7, 2018
114ea2b
Ignore RUNNING during segmentation to avoid short segments
shankari Apr 7, 2018
2471fec
Couple of minor fixes to the handling of small iOS walking trips
shankari Apr 8, 2018
f332e96
Support more types of rail routes for mapping into trains
shankari Apr 8, 2018
f6fb7df
Unify the invalid modes on android and iOS
shankari Apr 8, 2018
c227c24
Handle idx = 2 for bike trips
shankari Apr 8, 2018
0942774
Use sensed modes instead of speeds to decide which way to squish
shankari Apr 8, 2018
4c3604e
Relax other constraints to all support idx_diff = 2
shankari Apr 8, 2018
4e91287
Squish stops at the section segmentation level
shankari Apr 9, 2018
76e3529
Handle legacy bus stops
shankari Nov 1, 2018
1c359b7
Correctly check for UNKNOWN and AIR_OR_HSR
shankari Nov 2, 2018
ce2b757
Use the UNKNOWN section speeds if they exist
shankari Nov 2, 2018
1e9d46e
Fix the `-e` option for reset_pipeline
shankari Nov 2, 2018
ae781db
Fix code to remove inferred modes and reset the pipeline
shankari Nov 2, 2018
40e075e
Create "new" sections instead of merging if there is a long flip-flop…
shankari Nov 5, 2018
f1475af
Merge branch 'master' of https://github.com/e-mission/e-mission-serve…
shankari Nov 20, 2018
be19654
Merge branch 'master' of https://github.com/e-mission/e-mission-serve…
shankari Feb 27, 2019
c755280
Make ONE_FIFTY_KMPH actually represent 150 kmph
shankari Mar 1, 2019
784f197
Tighten the definition of simple bus stops to avoid false positives
shankari Mar 1, 2019
952a873
Merge pull request #1 from e-mission/master
shankari Mar 6, 2019
a374683
Finish pending merge so that we can fix `delete_user`
shankari Mar 24, 2019
db1978c
Merge branch 'ground_truth_matching' of https://github.com/shankari/e…
shankari Mar 24, 2019
dc60ccd
Merge some fixes from the tripware project
shankari May 13, 2019
351da86
Merge branch 'master' of https://github.com/e-mission/e-mission-serve…
shankari Jul 4, 2019
7afe46c
Merge pull request #578 from shankari/ground_truth_matching
shankari Jul 4, 2019
38 changes: 34 additions & 4 deletions bin/analysis/remove_inferred_modes.py
@@ -17,9 +17,39 @@
import uuid
import arrow

import emission.analysis.classification.inference.mode.pipeline as eacimp
import emission.analysis.classification.inference.mode.reset as eacimr
import emission.core.get_database as edb
import emission.storage.decorations.user_queries as esdu
import emission.core.wrapper.user as ecwu

def _get_user_list(args):
if args.all:
return _find_all_users()
elif args.platform:
return _find_platform_users(args.platform)
elif args.email_list:
return _email_2_user_list(args.email_list)
else:
assert args.user_list is not None
return [uuid.UUID(u) for u in args.user_list]

def _find_platform_users(platform):
# Since all new clients register a profile with the server, we don't have
# to run a 'distinct' query over the entire contents of the timeseries.
# Instead, we can simply query from the profile users, which is
# significantly faster
# Use the commented out line instead for better performance.
# Soon, we can move to the more performant option, because there will be
# no users that don't have a profile
# return edb.get_timeseries_db().find({'metadata.platform': platform}).distinct(
# 'user_id')
return edb.get_profile_db().find({"curr_platform": platform}).distinct("user_id")

def _find_all_users():
return esdu.get_all_uuids()

def _email_2_user_list(email_list):
return [ecwu.User.fromEmail(e).uuid for e in email_list]

if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
@@ -47,14 +77,14 @@
# Handle the first row in the table
if args.date is None:
if args.all:
eacimp.del_all_objects(args.dry_run)
eacimr.del_all_objects(args.dry_run)
else:
user_list = _get_user_list(args)
logging.info("received list with %s users" % user_list)
logging.info("first few entries are %s" % user_list[0:5])
for user_id in user_list:
logging.info("resetting user %s to start" % user_id)
eacimp.del_objects_after(user_id, 0, args.dry_run)
eacimr.del_objects_after(user_id, 0, args.dry_run)
else:
# Handle the second row in the table
day_dt = arrow.get(args.date, "YYYY-MM-DD")
@@ -66,5 +96,5 @@
logging.info("first few entries are %s" % user_list[0:5])
for user_id in user_list:
logging.info("resetting user %s to ts %s" % (user_id, day_ts))
eacimp.del_objects_after(user_id, day_ts, args.dry_run)
eacimr.del_objects_after(user_id, day_ts, args.dry_run)
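
The hunk above adds only the user-selection helpers; the argparse setup that produces `args.all`, `args.platform`, `args.email_list`, `args.user_list`, `args.date`, and `args.dry_run` sits outside the diff. A minimal sketch of a parser that would satisfy `_get_user_list` (the flag names below are assumptions, not necessarily the script's real options):

import argparse

# Hypothetical parser exposing the attributes _get_user_list expects.
parser = argparse.ArgumentParser(prog="remove_inferred_modes.py")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-a", "--all", action="store_true",
                   help="reset the mode inference stage for all users")
group.add_argument("-p", "--platform", choices=["android", "ios"],
                   help="reset for every user on one platform")
group.add_argument("-u", "--user_list", nargs="+",
                   help="reset for a list of user UUIDs")
group.add_argument("-e", "--email_list", nargs="+",
                   help="reset for a list of user emails")
parser.add_argument("-d", "--date",
                    help="reset to this date (YYYY-MM-DD); omit to reset to the start")
parser.add_argument("-n", "--dry_run", action="store_true",
                    help="log what would be deleted without modifying anything")
args = parser.parse_args()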

3 changes: 2 additions & 1 deletion bin/reset_pipeline.py
@@ -21,6 +21,7 @@
import emission.pipeline.reset as epr
import emission.core.get_database as edb
import emission.storage.decorations.user_queries as esdu
import emission.core.wrapper.user as ecwu

def _get_user_list(args):
if args.all:
@@ -49,7 +50,7 @@ def _find_all_users():
return esdu.get_all_uuids()

def _email_2_user_list(email_list):
return [ecwu.User.fromEmail(e) for e in email_list]
return [ecwu.User.fromEmail(e).uuid for e in email_list]

if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
2 changes: 2 additions & 0 deletions conf/analysis/debug.conf.json.sample
@@ -3,5 +3,7 @@
"intake.cleaning.filter_accuracy.enable": false,
"classification.inference.mode.useAdvancedFeatureIndices": true,
"classification.inference.mode.useBusTrainFeatureIndices": true,
"section.startStopRadius": 150,
"section.endStopRadius": 150,
"analysis.result.section.key": "analysis/inferred_section"
}
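
The two new radius keys are presumably the search radii, in meters, used when matching a section's start and end points against nearby transit stops, and `analysis.result.section.key` points downstream consumers at the new inferred sections. A minimal sketch of reading them (the module that actually consumes this config is not part of this diff):

import json

# Sketch: load the sample analysis config and pull out the new keys.
with open("conf/analysis/debug.conf.json.sample") as cf:
    conf = json.load(cf)

start_radius = conf["section.startStopRadius"]    # 150, assumed to be meters
end_radius = conf["section.endStopRadius"]        # 150
result_key = conf["analysis.result.section.key"]  # "analysis/inferred_section"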
3 changes: 3 additions & 0 deletions conf/net/ext_service/overpass_server.json.sample
@@ -0,0 +1,3 @@
{
"url": "server running the overpass API to query OSM (e.g. https://wiki.openstreetmap.org/wiki/Overpass_API)"
}
10 changes: 10 additions & 0 deletions conf/net/ext_service/overpass_transit_stops_query_template.sample
@@ -0,0 +1,10 @@
[out:json][timeout:25];
(
node["highway"="bus_stop"]({bbox});
node["railway"="station"]({bbox});
node["public_transport"]({bbox});
way["railway"="station"]({bbox});
relation["route"]({bbox});
);
out body;
>;
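
Taken together, the two new samples suggest the integration flow: read the Overpass endpoint from overpass_server.json, substitute a bounding box into the query template, and POST the query. The sketch below is not the PR's actual integration code; the non-`.sample` filenames and the use of `requests` are assumptions, and the configured URL is assumed to accept Overpass QL directly via POST. Overpass bounding boxes are given as south,west,north,east in decimal degrees.

import json
import requests

with open("conf/net/ext_service/overpass_server.json") as sf:
    overpass_url = json.load(sf)["url"]

with open("conf/net/ext_service/overpass_transit_stops_query_template") as qf:
    query_template = qf.read()

# Example bounding box (south, west, north, east) around downtown Berkeley.
bbox = "37.865,-122.275,37.875,-122.260"
query = query_template.format(bbox=bbox)

# Assumes the configured url points at the Overpass interpreter endpoint.
response = requests.post(overpass_url, data=query)
transit_stops = response.json()["elements"]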
2 changes: 1 addition & 1 deletion e-mission-py.bash
@@ -5,4 +5,4 @@

# Make sure that the python here is the anaconda python if that is not the one in the path

PYTHONPATH=. python $*
PYTHONPATH=. python "$@"
58 changes: 3 additions & 55 deletions emission/analysis/classification/inference/mode/pipeline.py
@@ -49,58 +49,6 @@ def predict_mode(user_id):
logging.exception("Error while inferring modes, timestamp is unchanged")
epq.mark_mode_inference_failed(user_id)

# Delete the objects created by this pipeline step (across users)
def del_all_objects(is_dry_run):
del_query = {}
del_query.update({"metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]}})
logging.info("About to delete %d entries"
% edb.get_analysis_timeseries_db().find(del_query).count())
logging.info("About to delete entries with keys %s"
% edb.get_analysis_timeseries_db().find(del_query).distinct("metadata.key"))

del_pipeline_query = {"pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
logging.info("About to delete pipeline entries for stage %s" %
ecwp.PipelineStages.MODE_INFERENCE)

if is_dry_run:
logging.info("this is a dry-run, returning from del_objects_after without modifying anything")
else:
result = edb.get_analysis_timeseries_db().delete_many(del_query)
logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result.raw_result)
result = edb.get_pipeline_state_db().delete_many(del_pipeline_query)
logging.info("this is not a dry-run, result of deleting pipeline state is %s" % result.raw_result)

# Delete the objects created by this pipeline step (for a particular user)
def del_objects_after(user_id, reset_ts, is_dry_run):
del_query = {}
# handle the user
del_query.update({"user_id": user_id})

del_query.update({"metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]}})
# all objects inserted here have start_ts and end_ts and are trip-like
del_query.update({"data.start_ts": {"$gt": reset_ts}})
logging.debug("After all updates, del_query = %s" % del_query)

reset_pipeline_query = {"pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
# Fuzz the TRIP_SEGMENTATION stage 5 mins because of
# https://github.com/e-mission/e-mission-server/issues/333#issuecomment-312730217
FUZZ_FACTOR = 5 * 60
reset_pipeline_update = {'$set': {'last_processed_ts': reset_ts + FUZZ_FACTOR}}
logging.info("About to reset stage %s to %s"
% (ecwp.PipelineStages.MODE_INFERENCE, reset_ts))


logging.info("About to delete %d entries"
% edb.get_analysis_timeseries_db().find(del_query).count())
logging.info("About to delete entries with keys %s"
% edb.get_analysis_timeseries_db().find(del_query).distinct("metadata.key"))

if is_dry_run:
logging.info("this is a dry-run, returning from del_objects_after without modifying anything")
else:
result = edb.get_analysis_timeseries_db().remove(del_query)
logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result)

class ModeInferencePipeline:
def __init__(self):
self.featureLabels = ["distance", "duration", "first filter mode", "sectionId", "avg speed",
@@ -226,9 +174,6 @@ def updateFeatureMatrixRowWithSection(self, featureMatrix, i, section_entry):
if (hasattr(self, "air_cluster")):
featureMatrix[i, 21] = easf.mode_start_end_coverage(section, self.air_cluster,600)

if self.last_section_done is None or self.last_section_done.data.end_ts < section_entry.data.end_ts:
self.last_section_done = section_entry

# Replace NaN and inf by zeros so that it doesn't crash later
featureMatrix[i] = np.nan_to_num(featureMatrix[i])

@@ -344,6 +289,9 @@ def savePredictionsStep(self):
logging.debug("Updating sensed mode for section = %s to %s" %
(currSectionEntry.get_id(), ise.data.sensed_mode))
self.ts.insert(ise)
# Set last_section_done after saving because otherwise if there is an error
# during inference, we will not save results and never re-run
self.last_section_done = self.toPredictSections[-1]

if __name__ == "__main__":
import json
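
The net effect of the two pipeline.py hunks above is that `last_section_done` now advances only after every prediction has been inserted, instead of being bumped while the feature matrix is still being built. Sketched with hypothetical names, the checkpoint-after-save pattern looks like this:

# Sketch of the checkpoint-after-save pattern (names are illustrative only).
def run_inference_stage(sections, predict, save, advance_checkpoint):
    predictions = [predict(s) for s in sections]  # may raise; nothing saved yet
    for p in predictions:
        save(p)                                   # may also raise partway through
    # Advance the checkpoint only once everything is persisted, so any failure
    # above leaves the stage marked as unfinished and it will be re-run.
    advance_checkpoint(sections[-1])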
61 changes: 61 additions & 0 deletions emission/analysis/classification/inference/mode/reset.py
@@ -0,0 +1,61 @@
import logging
import emission.core.get_database as edb
import emission.core.wrapper.pipelinestate as ecwp

import emission.core.get_database as edb
import emission.core.wrapper.pipelinestate as ecwp

# Delete the objects created by this pipeline step (across users)
def del_all_objects(is_dry_run):
del_query = {}
del_query.update({"metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]}})
logging.info("About to delete %d entries"
% edb.get_analysis_timeseries_db().find(del_query).count())
logging.info("About to delete entries with keys %s"
% edb.get_analysis_timeseries_db().find(del_query).distinct("metadata.key"))

del_pipeline_query = {"pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
logging.info("About to delete pipeline entries for stage %s" %
ecwp.PipelineStages.MODE_INFERENCE)

if is_dry_run:
logging.info("this is a dry-run, returning from del_objects_after without modifying anything")
else:
result = edb.get_analysis_timeseries_db().delete_many(del_query)
logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result.raw_result)
result = edb.get_pipeline_state_db().delete_many(del_pipeline_query)
logging.info("this is not a dry-run, result of deleting pipeline state is %s" % result.raw_result)

# Delete the objects created by this pipeline step (for a particular user)
def del_objects_after(user_id, reset_ts, is_dry_run):
del_query = {}
# handle the user
del_query.update({"user_id": user_id})

del_query.update({"metadata.key": {"$in": ["inference/prediction", "analysis/inferred_section"]}})
# all objects inserted here have start_ts and end_ts and are trip-like
del_query.update({"data.start_ts": {"$gt": reset_ts}})
logging.debug("After all updates, del_query = %s" % del_query)

reset_pipeline_query = {"user_id": user_id, "pipeline_stage": ecwp.PipelineStages.MODE_INFERENCE.value}
# Fuzz the TRIP_SEGMENTATION stage 5 mins because of
# https://github.com/e-mission/e-mission-server/issues/333#issuecomment-312730217
FUZZ_FACTOR = 5 * 60
reset_pipeline_update = {'$set': {'last_processed_ts': reset_ts + FUZZ_FACTOR}}
logging.info("About to reset stage %s to %s"
% (ecwp.PipelineStages.MODE_INFERENCE, reset_ts))


logging.info("About to delete %d entries"
% edb.get_analysis_timeseries_db().find(del_query).count())
logging.info("About to delete entries with keys %s"
% edb.get_analysis_timeseries_db().find(del_query).distinct("metadata.key"))

if is_dry_run:
logging.info("this is a dry-run, returning from del_objects_after without modifying anything")
else:
result = edb.get_analysis_timeseries_db().remove(del_query)
logging.info("this is not a dry-run, result of deleting analysis entries is %s" % result)
result = edb.get_pipeline_state_db().update_one(reset_pipeline_query, reset_pipeline_update)
logging.info("this is not a dry-run, result of updating pipeline state is %s" % result.raw_result)
