Skip to content

Commit fee1bef

Browse files
authored
Merge pull request #993 from TeachMeTW/Extend-Get-Store-Range
Enhance `_get_and_store_range` Function to Include Trip Statistics and Last API Call Tracking
2 parents 91b4b57 + ae549f2 commit fee1bef

File tree

4 files changed

+220
-16
lines changed

4 files changed

+220
-16
lines changed

emission/analysis/result/user_stat.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# emission/analysis/result/user_stat.py
2+
3+
import logging
4+
import pymongo
5+
import arrow
6+
from typing import Optional, Dict, Any
7+
import emission.storage.timeseries.abstract_timeseries as esta
8+
import emission.core.wrapper.user as ecwu
9+
10+
def get_last_call_timestamp(ts: esta.TimeSeries) -> Optional[int]:
    """
    Look up the timestamp of the most recent recorded server API call.

    Queries ``ts`` for the ``data.ts`` field of the
    ``stats/server_api_time`` entries in descending order, so the first
    value returned is the latest one.

    :param ts: The time series object to query.
    :type ts: esta.TimeSeries
    :return: The latest ``data.ts`` value, or None when the lookup
        returns ``-1``.
    :rtype: Optional[int]
    """
    latest_ts = ts.get_first_value_for_field(
        key='stats/server_api_time',
        field='data.ts',
        sort_order=pymongo.DESCENDING
    )
    logging.debug(f"Last call timestamp: {latest_ts}")

    # A result of -1 is reported to callers as None.
    if latest_ts == -1:
        return None
    return latest_ts
26+
27+
28+
def update_user_profile(user_id: str, data: Dict[str, Any]) -> None:
    """
    Apply ``data`` to the profile of the given user and log the outcome.

    The user is resolved through ``ecwu.User.fromUUID``; both the applied
    data and the profile read back afterwards are logged at debug level.

    :param user_id: The UUID of the user.
    :type user_id: str
    :param data: The fields to update in the user profile.
    :type data: Dict[str, Any]
    :return: None
    """
    profile_owner = ecwu.User.fromUUID(user_id)
    profile_owner.update(data)
    logging.debug(f"User profile updated with data: {data}")

    # Read the profile back so the log reflects what was actually stored.
    refreshed_profile = profile_owner.getProfile()
    logging.debug(f"New profile: {refreshed_profile}")
42+
43+
44+
def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
    """
    Aggregates and stores user statistics into the user profile.

    The stored fields are:

    - ``pipeline_range``: earliest ``data.start_ts`` and latest
      ``data.end_ts`` of the ``trip_key`` entries (None when the lookup
      returns ``-1``).
    - ``total_trips`` / ``labeled_trips``: counts of
      ``analysis/confirmed_trip`` entries, the latter restricted to entries
      whose ``data.user_input`` is not an empty document. These counts do
      not depend on ``trip_key``.
    - ``last_call_ts``: result of :func:`get_last_call_timestamp`.

    This is a best-effort step: any exception is logged (with its
    traceback) and is not propagated to the caller.

    :param user_id: The UUID of the user.
    :type user_id: str
    :param trip_key: The key representing the trip data in the time series;
        used only to compute ``pipeline_range``.
    :type trip_key: str
    :return: None
    """
    try:
        logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}")

        ts = esta.TimeSeries.get_time_series(user_id)

        # -1 from get_first_value_for_field is stored as None in the profile.
        start_ts_result = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
        start_ts = None if start_ts_result == -1 else start_ts_result

        end_ts_result = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
        end_ts = None if end_ts_result == -1 else end_ts_result

        total_trips = ts.find_entries_count(key_list=["analysis/confirmed_trip"])
        labeled_trips = ts.find_entries_count(
            key_list=["analysis/confirmed_trip"],
            extra_query_list=[{'data.user_input': {'$ne': {}}}]
        )

        logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
        logging.info(f"user_id type: {type(user_id)}")

        last_call_ts = get_last_call_timestamp(ts)
        logging.info(f"Last call timestamp: {last_call_ts}")

        update_data = {
            "pipeline_range": {
                "start_ts": start_ts,
                "end_ts": end_ts
            },
            "total_trips": total_trips,
            "labeled_trips": labeled_trips,
            "last_call_ts": last_call_ts
        }

        update_user_profile(user_id, update_data)

        logging.debug("User profile updated successfully.")

    except Exception as e:
        # Still swallowed so the caller is not interrupted, but
        # logging.exception records the traceback alongside the message,
        # which logging.error with str(e) alone would lose.
        logging.exception(f"Error in get_and_store_user_stats for user_id {user_id}: {e}")

emission/pipeline/intake_stage.py

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from uuid import UUID
1414
import time
1515
import pymongo
16+
from datetime import datetime
1617

1718
import emission.core.get_database as edb
1819
import emission.core.timer as ect
@@ -39,6 +40,7 @@
3940
import emission.storage.decorations.stats_queries as esds
4041

4142
import emission.core.wrapper.user as ecwu
43+
import emission.analysis.result.user_stat as eaurs
4244

4345
def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False):
4446
"""
@@ -201,20 +203,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
201203
with ect.Timer() as gsr:
202204
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
203205
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
204-
_get_and_store_range(uuid, "analysis/composite_trip")
206+
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")
205207

206208
esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
207-
time.time(), gsr.elapsed)
208-
209-
def _get_and_store_range(user_id, trip_key):
210-
ts = esta.TimeSeries.get_time_series(user_id)
211-
start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
212-
if start_ts == -1:
213-
start_ts = None
214-
end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
215-
if end_ts == -1:
216-
end_ts = None
217-
218-
user = ecwu.User(user_id)
219-
user.update({"pipeline_range": {"start_ts": start_ts, "end_ts": end_ts}})
220-
logging.debug("After updating, new profiles is %s" % user.getProfile())
209+
time.time(), gsr.elapsed)
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
from __future__ import unicode_literals, print_function, division, absolute_import
2+
import unittest
3+
import uuid
4+
import logging
5+
import json
6+
import os
7+
import time
8+
import pandas as pd
9+
10+
from builtins import *
11+
from future import standard_library
12+
standard_library.install_aliases()
13+
14+
# Standard imports
15+
import emission.storage.json_wrappers as esj
16+
17+
# Our imports
18+
import emission.core.get_database as edb
19+
import emission.storage.timeseries.timequery as estt
20+
import emission.storage.timeseries.abstract_timeseries as esta
21+
import emission.storage.decorations.analysis_timeseries_queries as esda
22+
import emission.core.wrapper.user as ecwu
23+
import emission.net.api.stats as enac
24+
import emission.pipeline.intake_stage as epi
25+
26+
# Test imports
27+
import emission.tests.common as etc
28+
29+
30+
class TestUserStats(unittest.TestCase):
    """
    Checks that the user statistics written to the profile (trip counts,
    pipeline range and last API call timestamp) match the loaded test data.
    """

    def setUp(self):
        """
        Load the real example data under a fresh test UUID, make sure a
        profile document exists for it, and run the intake pipeline.
        """
        self.testUUID = uuid.uuid4()
        with open("emission/tests/data/real_examples/shankari_2015-aug-21") as fp:
            self.entries = json.load(fp, object_hook = esj.wrapped_object_hook)
        etc.setupRealExampleWithEntries(self)

        # Insert a bare profile document when none exists for the test user
        if self._stored_profile() is None:
            edb.get_profile_db().insert_one({"user_id": self.testUUID})

        epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False)
        logging.debug("UUID = %s" % (self.testUUID))

    def tearDown(self):
        """
        Delete the test user's entries from the timeseries, pipeline state
        and analysis timeseries databases, then delete the profile document.
        """
        user_query = {"user_id": self.testUUID}
        for get_collection in (edb.get_timeseries_db,
                               edb.get_pipeline_state_db,
                               edb.get_analysis_timeseries_db):
            get_collection().delete_many({"user_id": self.testUUID})
        edb.get_profile_db().delete_one(user_query)

    def _stored_profile(self):
        """
        Return the profile document stored for the test user, or None.
        """
        return edb.get_profile_db().find_one({"user_id": self.testUUID})

    def testGetAndStoreUserStats(self):
        """
        After the pipeline run in setUp, the profile must contain the
        expected trip counts and pipeline range.
        """
        profile = self._stored_profile()
        self.assertIsNotNone(profile, "User profile should exist after storing stats.")

        # All statistics fields must be present before their values are checked
        for field in ("total_trips", "labeled_trips", "pipeline_range", "last_call_ts"):
            self.assertIn(field, profile, f"User profile should contain '{field}'.")

        expected_counts = (("total_trips", 5), ("labeled_trips", 0))
        for field, expected in expected_counts:
            self.assertEqual(profile[field], expected,
                             f"Expected {field} to be {expected}, got {profile[field]}")

        # Pipeline range: check presence of both bounds first, then values
        pipeline_range = profile.get("pipeline_range", {})
        expected_bounds = (("start_ts", 1440168891.095), ("end_ts", 1440209488.817))
        for bound, _ in expected_bounds:
            self.assertIn(bound, pipeline_range, f"Pipeline range should contain '{bound}'.")
        for bound, expected in expected_bounds:
            self.assertEqual(pipeline_range[bound], expected,
                             f"Expected {bound} to be {expected}, got {pipeline_range[bound]}")

    def testLastCall(self):
        """
        A stored server API call time must show up as ``last_call_ts`` in
        the profile once the intake pipeline has been run again.
        """
        test_call_ts = time.time()
        enac.store_server_api_time(self.testUUID, "test_call_ts", test_call_ts, 69420)
        etc.runIntakePipeline(self.testUUID)

        profile = self._stored_profile()
        actual_last_call_ts = profile.get("last_call_ts")

        self.assertEqual(
            actual_last_call_ts,
            test_call_ts,
            f"Expected last_call_ts to be {test_call_ts}, got {actual_last_call_ts}"
        )
117+
118+
if __name__ == '__main__':
119+
# Configure logging for the test
120+
etc.configLogging()
121+
unittest.main()

emission/tests/common.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def getRealExampleEmail(testObj):
107107
def fillExistingUUID(testObj):
108108
userObj = ecwu.User.fromEmail(getRealExampleEmail(testObj))
109109
print("Setting testUUID to %s" % userObj.uuid)
110-
testObj.testUUID = userObj.uuir
110+
testObj.testUUID = userObj.uuid
111111

112112
def getRegEmailIfPresent(testObj):
113113
if hasattr(testObj, "evaluation") and testObj.evaluation:
@@ -193,6 +193,7 @@ def runIntakePipeline(uuid):
193193
import emission.analysis.userinput.expectations as eaue
194194
import emission.analysis.classification.inference.labels.pipeline as eacilp
195195
import emission.analysis.plotting.composite_trip_creation as eapcc
196+
import emission.analysis.result.user_stat as eaurs
196197

197198
eaum.match_incoming_user_inputs(uuid)
198199
eaicf.filter_accuracy(uuid)
@@ -205,6 +206,8 @@ def runIntakePipeline(uuid):
205206
eaue.populate_expectations(uuid)
206207
eaum.create_confirmed_objects(uuid)
207208
eapcc.create_composite_objects(uuid)
209+
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")
210+
208211

209212
def configLogging():
210213
"""

0 commit comments

Comments
 (0)