import emission.analysis.modelling.trip_model.trip_model as eamuu
from sklearn.cluster import DBSCAN
import logging
import numpy as np
import pandas as pd
import emission.analysis.modelling.trip_model.util as eamtu
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn import svm
from sklearn.metrics.pairwise import haversine_distances

EARTH_RADIUS = 6371000  # mean Earth radius in meters
| 13 | + |
| 14 | +class DBSCANSVMCluster(eamuu.TripModel): |
| 15 | + """ DBSCAN-based clustering algorithm that optionally implements SVM |
| 16 | + sub-clustering. |
| 17 | +
|
| 18 | + Args: |
| 19 | + loc_type (str): 'start' or 'end', the type of point to cluster |
| 20 | + radius (int): max distance between two points in each other's |
| 21 | + neighborhood, i.e. DBSCAN's eps value. does not strictly |
| 22 | + dictate final cluster size |
| 23 | + size_thresh (int): the min number of trips a cluster must have |
| 24 | + to be considered for SVM sub-division |
| 25 | + purity_thresh (float): the min purity a cluster must have |
| 26 | + to be sub-divided using SVM |
| 27 | + gamma (float): coefficient for the rbf kernel in SVM |
| 28 | + C (float): regularization hyperparameter for SVM |
| 29 | +
|
| 30 | + Attributes: |
| 31 | + loc_type (str) |
| 32 | + radius (int) |
| 33 | + size_thresh (int) |
| 34 | + purity_thresh (float) |
| 35 | + gamma (float) |
| 36 | + C (float) |
| 37 | + train_df (DataFrame) |
| 38 | + test_df (DataFrame) |
| 39 | + base_model (sklearn Estimator) |
| 40 | + """ |

    def __init__(self,
                 loc_type='end',
                 radius=100,
                 svm=True,  # note: shadows the sklearn `svm` module inside __init__ only
                 size_thresh=1,
                 purity_thresh=1.0,
                 gamma=0.05,
                 C=1):
        logging.info("PERF: Initializing DBSCANSVMCluster")
        self.loc_type = loc_type
        self.radius = radius
        self.svm = svm
        self.size_thresh = size_thresh
        self.purity_thresh = purity_thresh
        self.gamma = gamma
        self.C = C

    def set_params(self, params):
        if 'loc_type' in params: self.loc_type = params['loc_type']
        if 'radius' in params: self.radius = params['radius']
        if 'svm' in params: self.svm = params['svm']
        if 'size_thresh' in params:
            self.size_thresh = params['size_thresh']
        if 'purity_thresh' in params:
            self.purity_thresh = params['purity_thresh']
        if 'gamma' in params: self.gamma = params['gamma']
        if 'C' in params: self.C = params['C']

        return self

    def fit(self, train_df, ct_entry=None):
        """ Creates clusters of trip points.
            self.train_df will be updated with columns containing base and
            final clusters.

            TODO: perhaps move the loc_type argument to fit() so we can use a
            single class instance to cluster both start and end points. This
            will also help us reduce duplicate data.

            Args:
                train_df (dataframe): dataframe of labeled trips
                ct_entry (List): a list of Entry-type labeled and unlabeled trips
        """
        ##################
        ### clean data ###
        ##################
        logging.info("PERF: Fitting DBSCANSVMCluster")
        self.train_df = self._clean_data(train_df)

        # we can use all trips as long as they have purpose labels. it's ok if
        # they're missing mode/replaced-mode labels, because those aren't as
        # strongly correlated with location as purpose is
        # TODO: actually, we may want to rethink this. for example, it will
        # probably be helpful to include trips that are missing purpose labels
        # but still have mode labels.
        if self.train_df.purpose_true.isna().any():
            num_nan = self.train_df.purpose_true.isna().sum()
            logging.info(
                f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels'
            )
            self.train_df = self.train_df.dropna(
                subset=['purpose_true']).reset_index(drop=True)
        if len(self.train_df) == 0:
            # i.e. no valid trips after removing all nans
            raise Exception('no valid trips; nothing to fit')

        #########################
        ### get base clusters ###
        #########################
        dist_matrix_meters = eamtu.get_distance_matrix(self.train_df, self.loc_type)
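        # min_samples=1 means every training point becomes a core point, so
        # no trip is discarded as noise; eps is in meters because the
        # precomputed distance matrix is in meters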
        self.base_model = DBSCAN(eps=self.radius,
                                 metric="precomputed",
                                 min_samples=1).fit(dist_matrix_meters)
        base_clusters = self.base_model.labels_

        self.train_df.loc[:,
                          f'{self.loc_type}_base_cluster_idx'] = base_clusters

        ########################
        ### get sub-clusters ###
        ########################
        # copy base cluster column into final cluster column
        self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = self.train_df[
            f'{self.loc_type}_base_cluster_idx']

        if self.svm:
            c = 0  # count of how many clusters we have iterated over

            # iterate over all clusters and subdivide them with SVM. the while
            # loop is so we can do multiple iterations of subdividing if needed
            while c < self.train_df[f'{self.loc_type}_cluster_idx'].max():
                points_in_cluster = self.train_df[
                    self.train_df[f'{self.loc_type}_cluster_idx'] == c]

                # only do SVM if we have the minimum number of trips in the cluster
                if len(points_in_cluster) < self.size_thresh:
                    c += 1
                    continue

                # only do SVM if purity is below threshold
                purity = eamtu.single_cluster_purity(points_in_cluster,
                                                     label_col='purpose_true')
                if purity < self.purity_thresh:
                    X = points_in_cluster[[
                        f"{self.loc_type}_lon", f"{self.loc_type}_lat"
                    ]]
                    y = points_in_cluster.purpose_true.to_list()

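                    # standardize lon/lat before the RBF SVC so that gamma
                    # operates on unit-variance features rather than raw degrees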
                    svm_model = make_pipeline(
                        StandardScaler(),
                        svm.SVC(
                            kernel='rbf',
                            gamma=self.gamma,
                            C=self.C,
                        )).fit(X, y)
                    labels = svm_model.predict(X)
                    unique_labels = np.unique(labels)

                    # if the SVM predicts that all points in the cluster have
                    # the same label, just ignore it and don't reindex.
                    # this also helps us to handle the possibility that a
                    # cluster may be impure but inherently inseparable, e.g. an
                    # end cluster at a user's home, containing 50% trips from
                    # work to home and 50% round trips that start and end at
                    # home. we don't want to reindex, otherwise the low purity
                    # will trigger SVM again and we will attempt & fail to
                    # split the cluster ad infinitum
                    if len(unique_labels) > 1:
                        # map purpose labels to new cluster indices
                        # we offset indices by the max existing index so that we
                        # don't run into any duplicate indices
                        max_existing_idx = self.train_df[
                            f'{self.loc_type}_cluster_idx'].max()
                        label_to_cluster = {
                            unique_labels[i]: i + max_existing_idx + 1
                            for i in range(len(unique_labels))
                        }
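                        # e.g. if max_existing_idx is 9 and the SVM produced
                        # labels {'home', 'work'}, the mapping would be
                        # {'home': 10, 'work': 11}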
                        # update trips with their new cluster indices
                        indices = np.array(
                            [label_to_cluster[l] for l in labels])
                        self.train_df.loc[
                            self.train_df[f'{self.loc_type}_cluster_idx'] == c,
                            f'{self.loc_type}_cluster_idx'] = indices

                c += 1
        # TODO: make things categorical at the end? or maybe at the start of
        # the decision tree pipeline

        return self

    def fit_predict(self, train_df):
        """ Override to avoid unnecessary recomputation of the distance
            matrix; fits the model and returns the final cluster indices
            for the training trips.
        """
        self.fit(train_df)
        return self.train_df[[f'{self.loc_type}_cluster_idx']]

    def predict(self, test_df):
        logging.info("PERF: Predicting DBSCANSVMCluster")
        # TODO: store clusters as polygons so the prediction is faster
        # TODO: we probably don't want to store test_df in self, to be more
        # memory-efficient
        self.test_df = self._clean_data(test_df)
        pred_clusters = self._NN_predict(self.test_df)

        self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = pred_clusters

        return self.test_df[[f'{self.loc_type}_cluster_idx']]

    def _NN_predict(self, test_df):
        """ Generate base-cluster predictions for the test data using a
            nearest-neighbor approach.

            sklearn doesn't implement predict() for DBSCAN, which is why we
            need a custom method.
        """
        logging.info("PERF: NN_predicting DBSCANSVMCluster")
        n_samples = test_df.shape[0]
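        # initialize all predictions to -1, which matches sklearn's DBSCAN
        # convention for noise (i.e. no cluster found within self.radius)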
        labels = np.ones(shape=n_samples, dtype=int) * -1

        # get coordinates of core points (we can't use model.components_
        # because our input feature was a distance matrix and doesn't contain
        # info about the raw coordinates)
        # NOTE: with min_samples=1, every single point in a cluster is a core
        # point, since each point trivially has at least itself within its
        # radius
        train_coordinates = self.train_df[[
            f'{self.loc_type}_lat', f'{self.loc_type}_lon'
        ]]
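        # sklearn's haversine_distances expects [lat, lon] pairs in radians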
        train_radians = np.radians(train_coordinates)

        for idx, row in test_df.reset_index(drop=True).iterrows():
            # calculate the distances between the ith test point and all
            # training points, then find the index of the closest point. if
            # the ith test point is within epsilon of that point, assign its
            # cluster to the ith test point (otherwise, leave it as -1,
            # indicating noise).
            # unfortunately, pairwise_distances_argmin() does not support
            # haversine distance, so we have to reimplement it ourselves
            new_loc_radians = np.radians(
                row[[self.loc_type + "_lat", self.loc_type + "_lon"]].to_list())
            new_loc_radians = np.reshape(new_loc_radians, (1, 2))
            dist_matrix_meters = haversine_distances(
                new_loc_radians, train_radians) * EARTH_RADIUS
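            # haversine_distances returns central angles in radians;
            # multiplying by the Earth's radius (in meters) converts them to
            # great-circle distances in meters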

            shortest_dist_idx = np.argmin(dist_matrix_meters)
            if dist_matrix_meters[0, shortest_dist_idx] < self.radius:
                labels[idx] = self.train_df.reset_index(
                    drop=True).loc[shortest_dist_idx,
                                   f'{self.loc_type}_cluster_idx']

        return labels