Skip to content

Commit 265dbe2

Browse files
committed
Merge branch 'master' of https://github.com/e-mission/e-mission-server into batch_overpass
2 parents 3e1def0 + 88610f4 commit 265dbe2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+937
-2185
lines changed

.github/workflows/test-with-manual-install.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ jobs:
2626
strategy:
2727
matrix:
2828
os: [ubuntu-latest]
29+
env:
30+
USE_HINTS: True
2931

3032
# Steps represent a sequence of tasks that will be executed as part of the job
3133
steps:
@@ -35,7 +37,7 @@ jobs:
3537
- name: Install and start MongoDB
3638
uses: supercharge/[email protected]
3739
with:
38-
mongodb-version: 4.4.0
40+
mongodb-version: 8.0.4
3941

4042
- name: Check existing version of miniconda
4143
shell: bash -l {0}

Timeseries_Sample.ipynb

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@
135135
"outputs": [],
136136
"source": [
137137
"# Get all cleaned trips for the first user\n",
138-
"entry_it = ts.find_entries([\"analysis/cleaned_trip\"], time_query=None)"
138+
"entries = ts.find_entries([\"analysis/cleaned_trip\"], time_query=None)"
139139
]
140140
},
141141
{
@@ -152,11 +152,11 @@
152152
"metadata": {},
153153
"outputs": [],
154154
"source": [
155-
"for ct in entry_it:\n",
155+
"for ct in entries:\n",
156156
" cte = ecwe.Entry(ct)\n",
157157
" print(\"=== Trip:\", cte.data.start_loc, \"->\", cte.data.end_loc)\n",
158-
" section_it = esdt.get_sections_for_trip(\"analysis/cleaned_section\", test_user_id, cte.get_id())\n",
159-
" for sec in section_it:\n",
158+
" sections = esdt.get_sections_for_trip(\"analysis/cleaned_section\", test_user_id, cte.get_id())\n",
159+
" for sec in sections:\n",
160160
" print(\" --- Section:\", sec.data.start_loc, \"->\", sec.data.end_loc, \" on \", sec.data.sensed_mode)"
161161
]
162162
},
@@ -169,7 +169,7 @@
169169
"outputs": [],
170170
"source": [
171171
"# Get all cleaned trips for the second user\n",
172-
"entry_it = ts_2.find_entries([\"analysis/cleaned_trip\"], time_query=None)"
172+
"entries = ts_2.find_entries([\"analysis/cleaned_trip\"], time_query=None)"
173173
]
174174
},
175175
{
@@ -178,11 +178,11 @@
178178
"metadata": {},
179179
"outputs": [],
180180
"source": [
181-
"for ct in entry_it:\n",
181+
"for ct in entries:\n",
182182
" cte = ecwe.Entry(ct)\n",
183183
" print(\"=== Trip:\", cte.data.start_loc, \"->\", cte.data.end_loc)\n",
184-
" section_it = esdt.get_sections_for_trip(\"analysis/cleaned_section\", test_user_id, cte.get_id())\n",
185-
" for sec in section_it:\n",
184+
" sections = esdt.get_sections_for_trip(\"analysis/cleaned_section\", test_user_id, cte.get_id())\n",
185+
" for sec in sections:\n",
186186
" print(\" --- Section:\", sec.data.start_loc, \"->\", sec.data.end_loc, \" on \", sec.data.sensed_mode)"
187187
]
188188
},
@@ -198,10 +198,10 @@
198198
"aug_1_tq = estt.TimeQuery(\"data.start_ts\",\n",
199199
" arrow.get(\"2017-08-01\").timestamp, # start of range\n",
200200
" arrow.get(\"2017-08-02\").timestamp) # end of range\n",
201-
"entry_it = ts.find_entries([\"analysis/cleaned_trip\"], time_query=aug_1_tq)\n",
202-
"entry_it_2 = ts_2.find_entries([\"analysis/cleaned_trip\"], time_query=aug_1_tq)\n",
201+
"entries = ts.find_entries([\"analysis/cleaned_trip\"], time_query=aug_1_tq)\n",
202+
"entries2 = ts_2.find_entries([\"analysis/cleaned_trip\"], time_query=aug_1_tq)\n",
203203
"print(\"From %s -> %s, user %s had %d trips and user %s had %d trips\" %\n",
204-
" (aug_1_tq.startTs, aug_1_tq.endTs, test_user_id, len(list(entry_it)), test_user_id_2, len(list(entry_it_2))))"
204+
" (aug_1_tq.startTs, aug_1_tq.endTs, test_user_id, len(entries), test_user_id_2, len(entries2)))"
205205
]
206206
},
207207
{

bin/debug/save_ground_truth.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def save_diary(args):
1818
def save_ct_list(args):
1919
print("Saving confirmed trip list for %s to file %s" % (args.sel_uuid, args.file_name))
2020
ts = esta.TimeSeries.get_time_series(args.sel_uuid)
21-
analysis_objects = list(ts.find_entries(args.key_list, None))
21+
analysis_objects = ts.find_entries(args.key_list, None)
2222
print("Retrieved object is of length %s" % len(analysis_objects))
2323
json.dump(analysis_objects, open(args.file_name, "w"), indent=4, default=esj.wrapped_default)
2424

bin/historical/migrations/_common.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,25 @@
2626
]
2727
print(f"PROD_LIST: {PROD_LIST}")
2828

29-
def run_on_all_deployments(fn_to_run, *args):
    """
    Run the given function on the database for each deployment by setting the
    DB_HOST environment variable in between each function call.
    The list of deployments (PROD_LIST) is retrieved from the
    nrel-openpath-deploy-configs repo upon initialization of this module.
    """
    print(f'About to run {fn_to_run.__name__}{args} on {len(PROD_LIST)} deployments. Proceed? [y/n]')
    if input() != 'y':
        print("Aborting")
        return
    for raw_name in PROD_LIST:
        # e-bikes-for-essentials has a typo; treat as special case
        prod = 'ebikes-for-essentials' if raw_name == 'e-bikes-for-essentials' else raw_name
        prod_db_name = prod.replace("-", "_")
        print(f"Running {fn_to_run.__name__} for {prod} on DB {prod_db_name}")
        os.environ['DB_HOST'] = DB_HOST_TEMPLATE.replace("REPLACEME", prod_db_name)
        # Re-import the DB module so the updated DB_HOST takes effect for
        # every query made inside fn_to_run
        importlib.reload(edb)
        fn_to_run(*args)

bin/historical/migrations/add_sections_and_summaries_to_trips.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ def add_sections_to_trips(process_number, uuid_list, skip_if_no_new_data):
3333

3434
def add_sections_to_trips_for_user(uuid):
3535
ts = esta.TimeSeries.get_time_series(uuid)
36-
cleaned_trips = list(ts.find_entries([esda.CLEANED_TRIP_KEY]))
37-
confirmed_trips = list(ts.find_entries([esda.CONFIRMED_TRIP_KEY]))
38-
composite_trips = list(ts.find_entries([esda.COMPOSITE_TRIP_KEY]))
36+
cleaned_trips = ts.find_entries([esda.CLEANED_TRIP_KEY])
37+
confirmed_trips = ts.find_entries([esda.CONFIRMED_TRIP_KEY])
38+
composite_trips = ts.find_entries([esda.COMPOSITE_TRIP_KEY])
3939
cleaned_trips_map = dict((t["_id"], t) for t in cleaned_trips)
4040
composite_trips_map = dict((t["data"]["confirmed_trip"], t) for t in composite_trips)
4141
# This script is slow due to DB queries

bin/historical/migrations/inactive.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import arrow
2+
import pymongo
3+
import emission.core.get_database as edb
4+
import emission.storage.timeseries.abstract_timeseries as esta
5+
import bin.debug.common as common
6+
from _common import run_on_all_deployments
7+
8+
NOW_SECONDS = arrow.now().timestamp()
9+
10+
def find_inactive_uuids(uuids_entries, threshold):
    """
    Return the list of UUIDs considered inactive.

    A user is inactive when neither their last server API call nor their last
    recorded location is more recent than `threshold` seconds before NOW_SECONDS.

    :param uuids_entries: iterable of uuid-db documents (each has a 'uuid' key)
    :param threshold: inactivity window in seconds
    :return: list of inactive user UUIDs
    """
    inactive_uuids = []
    for u in uuids_entries:
        print(f'Checking activity for user {u["uuid"]}')
        # Query the profile DB by the actual uuid, not the whole uuid-db
        # document; the original {'user_id': u} filter could never match,
        # forcing the slow timeseries fallback for every user.
        profile_data = edb.get_profile_db().find_one({'user_id': u['uuid']})
        ts = esta.TimeSeries.get_time_series(u['uuid'])

        if profile_data:
            last_call_ts = profile_data.get('last_call_ts')
        else:
            last_call_ts = ts.get_first_value_for_field(
                key='stats/server_api_time',
                field='data.ts',
                sort_order=pymongo.DESCENDING
            )

        print(f'for user {u["uuid"]}, last call was {last_call_ts}')
        # A missing timestamp (None) means no recorded activity, which counts
        # as inactive; guard so we never compare None against a number.
        if last_call_ts is not None and last_call_ts > NOW_SECONDS - threshold:
            continue

        if profile_data:
            last_loc_ts = profile_data.get('last_loc_ts')
        else:
            last_loc_ts = ts.get_first_value_for_field(
                key='background/location',
                field='data.ts',
                sort_order=pymongo.DESCENDING
            )

        print(f'for user {u["uuid"]}, last location was {last_loc_ts}')
        if last_loc_ts is not None and last_loc_ts > NOW_SECONDS - threshold:
            continue

        print(f'User {u["uuid"]} is inactive')
        inactive_uuids.append(u['uuid'])

    return inactive_uuids
47+
48+
def purge_users(uuids):
    """
    Interactively confirm, then purge all data for each of the given UUIDs
    via common.purge_entries_for_user. Aborts without changes unless the
    operator answers 'y'.
    """
    print(f'About to remove {len(uuids)} users. Proceed? [y/n]')
    if input() == 'y':
        for user_id in uuids:
            print(f'Purging user {user_id}')
            common.purge_entries_for_user(user_id, True)
    else:
        print("Aborting")
56+
57+
def start_inactive(threshold_s, purge):
    """
    Per-deployment entry point: report all users inactive for longer than
    threshold_s seconds, and optionally purge them when `purge` is truthy.
    """
    uuid_db = edb.get_uuid_db()
    total_users = uuid_db.count_documents({})
    print(f'Total users: {total_users}')
    print('Finding inactive users...')
    inactive_uuids = find_inactive_uuids(uuid_db.find(), threshold_s)
    print(f'Of {total_users} users, found {len(inactive_uuids)} inactive users:')
    print(inactive_uuids)

    if purge:
        purge_users(inactive_uuids)
68+
69+
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(
        prog='inactive_users',
        description='Identify and perform actions on inactive users'
    )
    # type=int + required: the original accepted a missing/non-numeric value
    # and crashed later on int(args.threshold) with a TypeError/ValueError;
    # argparse now rejects bad input with a proper usage message instead.
    parser.add_argument('-t', '--threshold', type=int, required=True,
                        help='amount of time in days that defines an inactive user')
    parser.add_argument('-p', '--purge', action='store_true', help='purge inactive users')
    args = parser.parse_args()

    # Convert the day-based threshold to seconds for timestamp comparisons
    threshold_s = 60 * 60 * 24 * args.threshold

    run_on_all_deployments(start_inactive, threshold_s, args.purge)
82+
83+

0 commit comments

Comments
 (0)