|
| 1 | +# Standard imports |
| 2 | +from pymongo import MongoClient |
| 3 | +import logging |
| 4 | +import sys |
| 5 | +import os |
| 6 | +import numpy as np |
| 7 | +import scipy as sp |
| 8 | +import time |
| 9 | +import json |
| 10 | +import copy |
| 11 | + |
| 12 | +# Get the configuration for the classifier |
| 13 | +import emission.analysis.config as eac |
| 14 | + |
| 15 | +# Our imports |
| 16 | +import emission.storage.timeseries.abstract_timeseries as esta |
| 17 | +import emission.storage.decorations.analysis_timeseries_queries as esda |
| 18 | +import emission.storage.decorations.trip_queries as esdt |
| 19 | +import emission.storage.pipeline_queries as epq |
| 20 | +import emission.analysis.section_features as easf |
| 21 | + |
| 22 | +import emission.core.get_database as edb |
| 23 | +import emission.core.wrapper.entry as ecwe |
| 24 | +import emission.core.wrapper.modeprediction as ecwm |
| 25 | +import emission.core.wrapper.motionactivity as ecwma |
| 26 | + |
| 27 | +from uuid import UUID |
| 28 | + |
| 29 | +# We are not going to use the feature matrix for analysis unless we have at |
| 30 | +# least 50 points in the training set. 50 is arbitrary. We could also consider |
| 31 | +# combining the old and new training data, but this is really a bootstrapping |
| 32 | +# problem, so we don't need to solve it right now. |
| 33 | +minTrainingSetSize = 1000 |
| 34 | + |
| 35 | +def predictMode(user_id): |
| 36 | + time_query = epq.get_time_range_for_segmentation(user_id) |
| 37 | + try: |
| 38 | + mip = ModeInferencePipeline() |
| 39 | + mip.user_id = user_id |
| 40 | + mip.runPredictionPipeline(user_id, time_query) |
| 41 | + if mip.getLastTimestamp() == 0: |
| 42 | + logging.debug("after, run, last_timestamp == 0, must be early return") |
| 43 | + epq.mark_mode_inference_done(user_id, None) |
| 44 | + return |
| 45 | + else: |
| 46 | + epq.mark_mode_inference_done(user_id, mip.getLastTimestamp()) |
| 47 | + except: |
| 48 | + epq.mark_mode_inference_failed(user_id) |
| 49 | + |
| 50 | +class ModeInferencePipeline: |
| 51 | + def __init__(self): |
| 52 | + self.featureLabels = ["distance", "duration", "first filter mode", "sectionId", "avg speed", |
| 53 | + "speed EV", "speed variance", "max speed", "max accel", "isCommute", |
| 54 | + "heading change rate", "stop rate", "velocity change rate", |
| 55 | + "start lat", "start lng", "stop lat", "stop lng", |
| 56 | + "start hour", "end hour", "close to bus stop", "close to train stop", |
| 57 | + "close to airport"] |
| 58 | + self.last_timestamp = 0 |
| 59 | + with open("emission/analysis/classification/inference/mode/mode_id_old2new.txt") as fp: |
| 60 | + self.seed_modes_mapping = json.load(fp) |
| 61 | + logging.debug("Loaded modes %s" % self.seed_modes_mapping) |
| 62 | + |
| 63 | + def getLastTimestamp(self): |
| 64 | + return self.last_timestamp |
| 65 | + |
| 66 | + # At this point, none of the clients except for CCI are supporting ground |
| 67 | + # truth, and even cci is only supporting trip-level ground truth. So this |
| 68 | + # version of the pipeline will just load a previously created model, that was |
| 69 | + # created from the small store of data that we do have ground truth for, and |
| 70 | + # we documented to have ~ 70% accuracy in the 2014 e-mission paper. |
| 71 | + |
| 72 | + def runPredictionPipeline(self, user_id, timerange): |
| 73 | + self.ts = esta.TimeSeries.get_time_series(user_id) |
| 74 | + self.toPredictSections = esda.get_entries(esda.CLEANED_SECTION_KEY, user_id, |
| 75 | + time_query=None) |
| 76 | + if (len(self.toPredictSections) == 0): |
| 77 | + logging.debug("len(toPredictSections) == 0, early return") |
| 78 | + assert(self.last_timestamp == 0) |
| 79 | + return None |
| 80 | + |
| 81 | + self.loadModelStage() |
| 82 | + logging.info("loadModelStage DONE") |
| 83 | + self.selFeatureIndices = self.selectFeatureIndicesStep() |
| 84 | + logging.info("selectFeatureIndicesStep DONE") |
| 85 | + (self.toPredictFeatureMatrix, self.tripIds, self.sectionIds) = \ |
| 86 | + self.generateFeatureMatrixAndIDsStep(self.toPredictSections) |
| 87 | + logging.info("generateFeatureMatrixAndIDsStep DONE") |
| 88 | + self.predictedProb = self.predictModesStep() |
| 89 | + #This is a matrix of the entries and their corresponding probabilities for each classification |
| 90 | + logging.info("predictModesStep DONE") |
| 91 | + self.savePredictionsStep() |
| 92 | + logging.info("savePredictionsStep DONE") |
| 93 | + |
| 94 | + def loadModelStage(self): |
| 95 | + # TODO: Consider removing this import by moving the model save/load code to |
| 96 | + # its own module so that we can eventually remove the old pipeline code |
| 97 | + import emission.analysis.classification.inference.mode.seed.pipeline as seedp |
| 98 | + self.model = seedp.ModeInferencePipelineMovesFormat.loadModel() |
| 99 | + |
| 100 | +# Features are: |
| 101 | +# 0. distance |
| 102 | +# 1. duration |
| 103 | +# 2. first filter mode |
| 104 | +# 3. sectionId |
| 105 | +# 4. avg speed |
| 106 | +# 5. speed EV |
| 107 | +# 6. speed variance |
| 108 | +# 7. max speed |
| 109 | +# 8. max accel |
| 110 | +# 9. isCommute |
| 111 | +# 10. heading change rate (currently unfilled) |
| 112 | +# 11. stop rate (currently unfilled) |
| 113 | +# 12. velocity change rate (currently unfilled) |
| 114 | +# 13. start lat |
| 115 | +# 14. start lng |
| 116 | +# 15. stop lat |
| 117 | +# 16. stop lng |
| 118 | +# 17. start hour |
| 119 | +# 18. end hour |
| 120 | +# 19. both start and end close to bus stop |
| 121 | +# 20. both start and end close to train station |
| 122 | +# 21. both start and end close to airport |
| 123 | + def updateFeatureMatrixRowWithSection(self, featureMatrix, i, section_entry): |
| 124 | + section = section_entry.data |
| 125 | + featureMatrix[i, 0] = section.distance |
| 126 | + featureMatrix[i, 1] = section.duration |
| 127 | + |
| 128 | + featureMatrix[i, 2] = section.sensed_mode.value |
| 129 | + # TODO: Figure out if I can get the section id from the new style sections |
| 130 | + # featureMatrix[i, 3] = section['_id'] |
| 131 | + featureMatrix[i, 4] = easf.calOverallSectionSpeed(section) |
| 132 | + |
| 133 | + speeds = section['speeds'] |
| 134 | + |
| 135 | + if speeds != None and len(speeds) > 0: |
| 136 | + featureMatrix[i, 5] = np.mean(speeds) |
| 137 | + featureMatrix[i, 6] = np.std(speeds) |
| 138 | + featureMatrix[i, 7] = np.max(speeds) |
| 139 | + else: |
| 140 | + # They will remain zero |
| 141 | + pass |
| 142 | + |
| 143 | + accels = easf.calAccels(section) |
| 144 | + if accels != None and len(accels) > 0: |
| 145 | + featureMatrix[i, 8] = np.max(accels) |
| 146 | + else: |
| 147 | + # They will remain zero |
| 148 | + pass |
| 149 | + |
| 150 | + featureMatrix[i, 9] = False |
| 151 | + featureMatrix[i, 10] = easf.calHCR(section_entry) |
| 152 | + featureMatrix[i, 11] = easf.calSR(section_entry) |
| 153 | + featureMatrix[i, 12] = easf.calVCR(section_entry) |
| 154 | + if 'start_loc' in section and section['end_loc'] != None: |
| 155 | + startCoords = section['start_loc']['coordinates'] |
| 156 | + featureMatrix[i, 13] = startCoords[0] |
| 157 | + featureMatrix[i, 14] = startCoords[1] |
| 158 | + |
| 159 | + if 'end_loc' in section and section['end_loc'] != None: |
| 160 | + endCoords = section['end_loc']['coordinates'] |
| 161 | + featureMatrix[i, 15] = endCoords[0] |
| 162 | + featureMatrix[i, 16] = endCoords[1] |
| 163 | + |
| 164 | + featureMatrix[i, 17] = section['start_local_dt']['hour'] |
| 165 | + featureMatrix[i, 18] = section['end_local_dt']['hour'] |
| 166 | + |
| 167 | + if (hasattr(self, "bus_cluster")): |
| 168 | + featureMatrix[i, 19] = easf.mode_start_end_coverage(section, self.bus_cluster,105) |
| 169 | + if (hasattr(self, "train_cluster")): |
| 170 | + featureMatrix[i, 20] = easf.mode_start_end_coverage(section, self.train_cluster,600) |
| 171 | + if (hasattr(self, "air_cluster")): |
| 172 | + featureMatrix[i, 21] = easf.mode_start_end_coverage(section, self.air_cluster,600) |
| 173 | + |
| 174 | + if self.last_timestamp < section.end_ts: |
| 175 | + self.last_timestamp = section.end_ts |
| 176 | + |
| 177 | + # Replace NaN and inf by zeros so that it doesn't crash later |
| 178 | + featureMatrix[i] = np.nan_to_num(featureMatrix[i]) |
| 179 | + |
| 180 | +# Feature Indices |
| 181 | + def selectFeatureIndicesStep(self): |
| 182 | + genericFeatureIndices = list(range(0,2)) + list(range(4,9)) |
| 183 | + AdvancedFeatureIndices = list(range(10,13)) |
| 184 | + LocationFeatureIndices = list(range(13,17)) |
| 185 | + TimeFeatureIndices = list(range(17,19)) |
| 186 | + BusTrainFeatureIndices = list(range(19,22)) |
| 187 | + logging.debug("generic features = %s" % genericFeatureIndices) |
| 188 | + logging.debug("advanced features = %s" % AdvancedFeatureIndices) |
| 189 | + logging.debug("location features = %s" % LocationFeatureIndices) |
| 190 | + logging.debug("time features = %s" % TimeFeatureIndices) |
| 191 | + logging.debug("bus train features = %s" % BusTrainFeatureIndices) |
| 192 | + if eac.get_config()["classification.inference.mode.useAdvancedFeatureIndices"]: |
| 193 | + return genericFeatureIndices + AdvancedFeatureIndices + BusTrainFeatureIndices |
| 194 | + else: |
| 195 | + return genericFeatureIndices + BusTrainFeatureIndices |
| 196 | + |
| 197 | + def generateFeatureMatrixAndIDsStep(self, toPredictSections): |
| 198 | + toPredictSections = toPredictSections |
| 199 | + numsections = len(toPredictSections) |
| 200 | + |
| 201 | + logging.debug("Predicting values for %d sections" % numsections) |
| 202 | + featureMatrix = np.zeros([numsections, len(self.featureLabels)]) |
| 203 | + sectionIds = [] |
| 204 | + tripIds = [] |
| 205 | + for (i, section) in enumerate(toPredictSections): |
| 206 | + if i % 50 == 0: |
| 207 | + logging.debug("Processing test record %s " % i) |
| 208 | + self.updateFeatureMatrixRowWithSection(featureMatrix, i, section) |
| 209 | + sectionIds.append(section['_id']) |
| 210 | + tripIds.append(section.data.trip_id) |
| 211 | + |
| 212 | + return (featureMatrix[:,self.selFeatureIndices], tripIds, sectionIds) |
| 213 | + |
| 214 | + def predictModesStep(self): |
| 215 | + return self.model.predict_proba(self.toPredictFeatureMatrix) |
| 216 | + |
| 217 | + # The current probability will only have results for values from the set of |
| 218 | + # unique values in the resultVector. This means that the location of the |
| 219 | + # highest probability is not a 1:1 mapping to the mode, which will probably |
| 220 | + # have issues down the road. We are going to fix this here by storing the |
| 221 | + # non-zero probabilities in a map instead of in a list. We used to have an |
| 222 | + # list here, but we move to a map instead because we plan to support lots of |
| 223 | + # different modes, and having an giant array consisting primarily of zeros |
| 224 | + # doesn't sound like a great option. |
| 225 | + # In other words, uniqueModes = [1, 5] |
| 226 | + # predictedProb = [[1,0], [0,1]] |
| 227 | + # allModes has length 8 |
| 228 | + # returns [{'walking': 1}, {'bus': 1}] |
| 229 | + def convertPredictedProbToMap(self, uniqueModes, predictedProbArr): |
| 230 | + currProbMap = {} |
| 231 | + uniqueModesInt = [int(um) for um in uniqueModes] |
| 232 | + logging.debug("predictedProbArr has %s non-zero elements" % np.count_nonzero(predictedProbArr)) |
| 233 | + logging.debug("uniqueModes are %s " % uniqueModesInt) |
| 234 | + logging.debug("predictedProbArr = %s" % predictedProbArr) |
| 235 | + for (j, oldMode) in enumerate(uniqueModesInt): |
| 236 | + if predictedProbArr[j] != 0: |
| 237 | + uniqueMode = self.seed_modes_mapping[str(oldMode)] |
| 238 | + modeName = ecwm.PredictedModeTypes(uniqueMode).name |
| 239 | + logging.debug("Setting probability of mode %s (%s) to %s" % |
| 240 | + (uniqueMode, modeName, predictedProbArr[j])) |
| 241 | + currProbMap[modeName] = predictedProbArr[j] |
| 242 | + # logging.debug("after setting value = %s" % currProbMap[modeName]) |
| 243 | + # logging.debug("after setting map = %s" % currProbMap) |
| 244 | + # logging.debug("Returning map %s" % currProbMap) |
| 245 | + return currProbMap |
| 246 | + |
| 247 | + def savePredictionsStep(self): |
| 248 | + from emission.core.wrapper.user import User |
| 249 | + from emission.core.wrapper.client import Client |
| 250 | + |
| 251 | + uniqueModes = self.model.classes_ |
| 252 | + |
| 253 | + for i in range(self.predictedProb.shape[0]): |
| 254 | + currSectionEntry = self.toPredictSections[i] |
| 255 | + currSection = currSectionEntry.data |
| 256 | + currProb = self.convertPredictedProbToMap(uniqueModes, self.predictedProb[i]) |
| 257 | + |
| 258 | + # Special handling for the AIR mode |
| 259 | + # AIR is not a mode that is sensed from the phone, but it is inferred |
| 260 | + # through some heuristics in cleanAndResample instead of through the |
| 261 | + # decision tree. Ideally those heurstics should be replaced by the |
| 262 | + # inference through the decision tree, or through a separate heuristic |
| 263 | + # step. But we are out of time for a bigger refactor here. |
| 264 | + # so we say that if the sensed mode == AIR, we are going to use it |
| 265 | + # directly and ignore the inferred mode |
| 266 | + if currSection.sensed_mode == ecwma.MotionTypes.AIR_OR_HSR: |
| 267 | + currProb = {'AIR_OR_HSR': 1.0} |
| 268 | + |
| 269 | + # Insert the prediction |
| 270 | + mp = ecwm.Modeprediction() |
| 271 | + mp.trip_id = currSection.trip_id |
| 272 | + mp.section_id = currSectionEntry.get_id() |
| 273 | + mp.algorithm_id = ecwm.AlgorithmTypes.SEED_RANDOM_FOREST |
| 274 | + mp.predicted_mode_map = currProb |
| 275 | + mp.start_ts = currSection.start_ts |
| 276 | + mp.end_ts = currSection.end_ts |
| 277 | + self.ts.insert_data(self.user_id, "inference/prediction", mp) |
| 278 | + |
| 279 | + # Since there is currently only one prediction, create the inferred |
| 280 | + # section object right here |
| 281 | + is_dict = copy.copy(currSectionEntry) |
| 282 | + del is_dict["_id"] |
| 283 | + is_dict["metadata"]["key"] = "analysis/inferred_section" |
| 284 | + is_dict["data"]["sensed_mode"] = ecwm.PredictedModeTypes[easf.select_inferred_mode([mp])].value |
| 285 | + is_dict["data"]["cleaned_section"] = currSectionEntry.get_id() |
| 286 | + ise = ecwe.Entry(is_dict) |
| 287 | + logging.debug("Updating sensed mode for section = %s to %s" % |
| 288 | + (currSectionEntry.get_id(), ise.data.sensed_mode)) |
| 289 | + self.ts.insert(ise) |
| 290 | + |
| 291 | +if __name__ == "__main__": |
| 292 | + import json |
| 293 | + |
| 294 | + config_data = json.load(open('config.json')) |
| 295 | + log_base_dir = config_data['paths']['log_base_dir'] |
| 296 | + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', |
| 297 | + filename="%s/pipeline.log" % log_base_dir, level=logging.DEBUG) |
| 298 | + modeInferPipeline = ModeInferencePipeline() |
| 299 | + modeInferPipeline.runPipeline() |
0 commit comments