Skip to content

Commit 8faafe1

Browse files
committed
Separated Get and Store into a separate file
Addressed comments, reduced overkill on refactor Forgot to add last_call_ts
1 parent f1d2eb2 commit 8faafe1

File tree

2 files changed

+115
-78
lines changed

2 files changed

+115
-78
lines changed

emission/analysis/result/user_stat.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# emission/analysis/result/user_stats.py
2+
3+
import logging
4+
import pymongo
5+
import arrow
6+
from typing import Optional, Dict, Any
7+
import emission.storage.timeseries.abstract_timeseries as esta
8+
import emission.core.wrapper.user as ecwu
9+
10+
TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss'
11+
12+
def count_trips(ts: esta.TimeSeries, key_list: list, extra_query_list: Optional[list] = None) -> int:
13+
"""
14+
Counts the number of trips based on the provided query.
15+
16+
:param ts: The time series object.
17+
:type ts: esta.TimeSeries
18+
:param key_list: List of keys to filter trips.
19+
:type key_list: list
20+
:param extra_query_list: Additional queries, defaults to None.
21+
:type extra_query_list: Optional[list], optional
22+
:return: The count of trips.
23+
:rtype: int
24+
"""
25+
count = ts.find_entries_count(key_list=key_list, extra_query_list=extra_query_list)
26+
logging.debug(f"Counted {len(key_list)} trips with additional queries {extra_query_list}: {count}")
27+
return count
28+
29+
30+
def get_last_call_timestamp(ts: esta.TimeSeries) -> Optional[int]:
31+
"""
32+
Retrieves the last API call timestamp.
33+
34+
:param ts: The time series object.
35+
:type ts: esta.TimeSeries
36+
:return: The last call timestamp or None if not found.
37+
:rtype: Optional[int]
38+
"""
39+
last_call_ts = ts.get_first_value_for_field(
40+
key='stats/server_api_time',
41+
field='data.ts',
42+
sort_order=pymongo.DESCENDING
43+
)
44+
logging.debug(f"Last call timestamp: {last_call_ts}")
45+
return None if last_call_ts == -1 else last_call_ts
46+
47+
48+
def update_user_profile(user_id: str, data: Dict[str, Any]) -> None:
49+
"""
50+
Updates the user profile with the provided data.
51+
52+
:param user_id: The UUID of the user.
53+
:type user_id: str
54+
:param data: The data to update in the user profile.
55+
:type data: Dict[str, Any]
56+
:return: None
57+
"""
58+
user = ecwu.User.fromUUID(user_id)
59+
user.update(data)
60+
logging.debug(f"User profile updated with data: {data}")
61+
logging.debug(f"New profile: {user.getProfile()}")
62+
63+
64+
def get_and_store_user_stats(user_id: str, trip_key: str) -> None:
65+
"""
66+
Aggregates and stores user statistics into the user profile.
67+
68+
:param user_id: The UUID of the user.
69+
:type user_id: str
70+
:param trip_key: The key representing the trip data in the time series.
71+
:type trip_key: str
72+
:return: None
73+
"""
74+
try:
75+
logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}")
76+
77+
ts = esta.TimeSeries.get_time_series(user_id)
78+
start_ts_result = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
79+
start_ts = None if start_ts_result == -1 else start_ts_result
80+
81+
end_ts_result = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
82+
end_ts = None if end_ts_result == -1 else end_ts_result
83+
84+
total_trips = count_trips(ts, key_list=["analysis/confirmed_trip"])
85+
labeled_trips = count_trips(
86+
ts,
87+
key_list=["analysis/confirmed_trip"],
88+
extra_query_list=[{'data.user_input': {'$ne': {}}}]
89+
)
90+
91+
logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
92+
logging.info(f"user_id type: {type(user_id)}")
93+
94+
last_call_ts = get_last_call_timestamp(ts)
95+
logging.info(f"Last call timestamp: {last_call_ts}")
96+
97+
update_data = {
98+
"pipeline_range": {
99+
"start_ts": start_ts,
100+
"end_ts": end_ts
101+
},
102+
"total_trips": total_trips,
103+
"labeled_trips": labeled_trips,
104+
"last_call_ts": last_call_ts
105+
}
106+
107+
update_user_profile(user_id, update_data)
108+
109+
logging.debug("User profile updated successfully.")
110+
111+
except Exception as e:
112+
logging.error(f"Error in get_and_store_user_stats for user_id {user_id}: {e}")

emission/pipeline/intake_stage.py

Lines changed: 3 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import emission.storage.decorations.stats_queries as esds
4141

4242
import emission.core.wrapper.user as ecwu
43+
import emission.analysis.result.user_stat as eaurs
4344

4445
def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False):
4546
"""
@@ -202,83 +203,7 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
202203
with ect.Timer() as gsr:
203204
logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
204205
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10)
205-
_get_and_store_range(uuid, "analysis/composite_trip")
206+
eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip")
206207

207208
esds.store_pipeline_time(uuid, 'STORE_USER_STATS',
208-
time.time(), gsr.elapsed)
209-
210-
def _get_and_store_range(user_id, trip_key):
211-
"""
212-
Extends the user profile with pipeline_range, total_trips, labeled_trips, and last_call.
213-
214-
Parameters:
215-
- user_id (str): The UUID of the user.
216-
- trip_key (str): The key representing the trip data in the time series.
217-
"""
218-
time_format = 'YYYY-MM-DD HH:mm:ss'
219-
try:
220-
logging.info(f"Starting _get_and_store_range for user_id: {user_id}, trip_key: {trip_key}")
221-
222-
# Fetch the time series for the user
223-
ts = esta.TimeSeries.get_time_series(user_id)
224-
logging.debug("Fetched time series data.")
225-
226-
# Get start timestamp
227-
start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
228-
start_ts = None if start_ts == -1 else start_ts
229-
logging.debug(f"Start timestamp: {start_ts}")
230-
231-
# Get end timestamp
232-
end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
233-
end_ts = None if end_ts == -1 else end_ts
234-
logging.debug(f"End timestamp: {end_ts}")
235-
236-
# Retrieve trip entries
237-
total_trips = ts.find_entries_count(
238-
key_list=["analysis/confirmed_trip"],
239-
)
240-
241-
labeled_trips = ts.find_entries_count(
242-
key_list=["analysis/confirmed_trip"],
243-
extra_query_list=[{'data.user_input': {'$ne': {}}}]
244-
)
245-
246-
logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}")
247-
logging.info(type(user_id))
248-
logging.debug("Fetched API call statistics.")
249-
250-
last_call_ts = ts.get_first_value_for_field(
251-
key='stats/server_api_time',
252-
field='data.ts',
253-
sort_order=pymongo.DESCENDING
254-
)
255-
256-
logging.info(f"Last call timestamp: {last_call_ts}")
257-
258-
# Update the user profile with pipeline_range, total_trips, labeled_trips, and last_call
259-
user = ecwu.User.fromUUID(user_id)
260-
if last_call_ts != -1:
261-
# Format the timestamp using arrow
262-
formatted_last_call = arrow.get(last_call_ts).format(time_format)
263-
# Assign using attribute access or the update method
264-
# Option 1: Attribute Assignment (if supported)
265-
# user.last_call = formatted_last_call
266-
267-
# Option 2: Using the update method
268-
user.update({
269-
"last_call": formatted_last_call
270-
})
271-
user.update({
272-
"pipeline_range": {
273-
"start_ts": start_ts,
274-
"end_ts": end_ts
275-
},
276-
"total_trips": total_trips,
277-
"labeled_trips": labeled_trips,
278-
"last_call": last_call_ts
279-
})
280-
logging.debug("User profile updated successfully.")
281-
logging.debug("After updating, new profile is %s", user.getProfile())
282-
283-
except Exception as e:
284-
logging.error(f"Error in _get_and_store_range for user_id {user_id}: {e}")
209+
time.time(), gsr.elapsed)

0 commit comments

Comments
 (0)