Skip to content

Commit dd9ec1c

Browse files
committed
export_data commit #2
Testing: The pipeline has been tested for multiple data sources stored within the tests/data file. By testing multiple files, we have demonstrated that it will update and store the data within a given file. This file, however, is still being overwritten, as the filename is based on the uuid. Changing the filename will be next. Keys included within the export data: jruzekow-32318s:e-mission-server jruzekow$ zgrep "key" export_5d5fc80b-c031-4e43-8d64-52fb29aefc94.gz | sort | uniq "key": "analysis/cleaned_place", "key": "analysis/cleaned_section", "key": "analysis/cleaned_stop", "key": "analysis/cleaned_trip", "key": "analysis/confirmed_trip", "key": "analysis/inferred_section", "key": "analysis/recreated_location", "key": "background/filtered_location", "key": "background/location", "key": "background/motion_activity", "key": "inference/prediction", "key": "segmentation/raw_place", "key": "segmentation/raw_section", "key": "segmentation/raw_stop", "key": "segmentation/raw_trip", "key": "statemachine/transition", "key": "stats/pipeline_error", "key": "stats/pipeline_time", These keys now account for the raw data and all data put together within the pipeline states.
1 parent 52d5ab5 commit dd9ec1c

File tree

3 files changed

+52
-73
lines changed

3 files changed

+52
-73
lines changed

bin/debug/extract_timeline_for_day_range_and_user.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@
2020
import argparse
2121

2222
import emission.core.wrapper.user as ecwu
23-
import emission.storage.timeseries.abstract_timeseries as esta
2423
import emission.storage.timeseries.timequery as estt
24+
import emission.storage.timeseries.abstract_timeseries as esta
2525
import emission.storage.decorations.user_queries as esdu
26-
import emission.storage.timeseries.cache_series as estcs
2726
# only needed to read the motion_activity
2827
# https://github.com/e-mission/e-mission-docs/issues/356#issuecomment-520630934
2928
import emission.net.usercache.abstract_usercache as enua
30-
29+
import emission.storage.timeseries.cache_series as estcs
3130
import emission.export.export as eee
3231

3332
def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name):
@@ -39,14 +38,27 @@ def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name):
3938
logging.debug("start_day_ts = %s (%s), end_day_ts = %s (%s)" %
4039
(start_day_ts, arrow.get(start_day_ts).to(timezone),
4140
end_day_ts, arrow.get(end_day_ts).to(timezone)))
42-
41+
ts = esta.TimeSeries.get_time_series(user_id)
4342
loc_time_query = estt.TimeQuery("data.ts", start_day_ts, end_day_ts)
43+
loc_entry_list = list(estcs.find_entries(user_id, key_list=None, time_query=loc_time_query))
4444
ma_time_query = estt.TimeQuery("metadata.write_ts", start_day_ts, end_day_ts)
4545
uc = enua.UserCache.getUserCache(user_id)
4646
ma_entry_list = uc.getMessage(["background/motion_activity"], ma_time_query)
4747
trip_time_query = estt.TimeQuery("data.start_ts", start_day_ts, end_day_ts)
48+
trip_entry_list = list(ts.find_entries(key_list=None, time_query=trip_time_query))
4849
place_time_query = estt.TimeQuery("data.enter_ts", start_day_ts, end_day_ts)
49-
eee.export(loc_time_query, trip_time_query, place_time_query, ma_entry_list, user_id, file_name)
50+
place_entry_list = list(ts.find_entries(key_list=None, time_query=place_time_query))
51+
eee.export(loc_entry_list, trip_entry_list, place_entry_list, ma_entry_list, user_id, file_name, ts)
52+
53+
import emission.core.get_database as edb
54+
pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
55+
logging.info("Found %d pipeline states %s" %
56+
(len(pipeline_state_list),
57+
list([ps["pipeline_stage"] for ps in pipeline_state_list])))
58+
pipeline_filename = "%s_pipelinestate_%s.gz" % (file_name, user_id)
59+
with gzip.open(pipeline_filename, "wt") as gpfd:
60+
json.dump(pipeline_state_list,
61+
gpfd, default=bju.default, allow_nan=False, indent=4)
5062

5163
def export_timeline_for_users(user_id_list, args):
5264
for curr_uuid in user_id_list:

emission/export/export.py

Lines changed: 18 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,35 @@
1-
from __future__ import print_function
2-
from __future__ import unicode_literals
3-
from __future__ import division
4-
from __future__ import absolute_import
5-
# Exports all data for the particular user for the particular day
6-
# Used for debugging issues with trip and section generation
7-
from future import standard_library
8-
standard_library.install_aliases()
9-
from builtins import *
10-
import sys
111
import logging
122
logging.basicConfig(level=logging.DEBUG)
133
import gzip
144

155
import uuid
16-
import datetime as pydt
176
import json
187
import bson.json_util as bju
19-
import arrow
20-
import argparse
218

22-
import emission.core.wrapper.user as ecwu
239
import emission.storage.timeseries.abstract_timeseries as esta
24-
import emission.storage.timeseries.timequery as estt
25-
import emission.storage.decorations.user_queries as esdu
26-
import emission.storage.timeseries.cache_series as estcs
27-
# only needed to read the motion_activity
28-
# https://github.com/e-mission/e-mission-docs/issues/356#issuecomment-520630934
29-
import emission.net.usercache.abstract_usercache as enua
10+
#import emission.storage.timeseries.cache_series as estcs
3011

31-
32-
def export(loc_time_query, trip_time_query, place_time_query, ma_entry_list, user_id, file_name):
33-
ts = esta.TimeSeries.get_time_series(user_id)
34-
loc_entry_list = list(estcs.find_entries(user_id, key_list=None, time_query=loc_time_query))
35-
trip_entry_list = list(ts.find_entries(key_list=None, time_query=trip_time_query))
36-
place_entry_list = list(ts.find_entries(key_list=None, time_query=place_time_query))
37-
first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}},
38-
{'data.exit_ts': {'$exists': True}}]}
39-
first_place_entry_list = list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[first_place_extra_query]))
40-
logging.info("First place entry list = %s" % first_place_entry_list)
12+
def export(loc_entry_list, trip_entry_list, place_entry_list, ma_entry_list, user_id, file_name, ts):
13+
first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}},{'data.exit_ts': {'$exists': True}}]}
14+
first_place_entry_list = list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[first_place_extra_query]))
15+
logging.info("First place entry list = %s" % first_place_entry_list)
4116
combined_list = ma_entry_list + loc_entry_list + trip_entry_list + place_entry_list + first_place_entry_list
4217

4318
logging.info("Found %d loc entries, %d motion entries, %d trip-like entries, %d place-like entries = %d total entries" %
4419
(len(loc_entry_list), len(ma_entry_list), len(trip_entry_list), len(place_entry_list), len(combined_list)))
45-
46-
validate_truncation(loc_entry_list, trip_entry_list, place_entry_list)
47-
48-
unique_key_list = set([e["metadata"]["key"] for e in combined_list])
49-
logging.info("timeline has unique keys = %s" % unique_key_list)
50-
if len(combined_list) == 0 or unique_key_list == set(['stats/pipeline_time']):
51-
logging.info("No entries found in range for user %s, skipping save" % user_id)
52-
else:
53-
# Also dump the pipeline state, since that's where we have analysis results upto
54-
# This allows us to copy data to a different *live system*, not just
55-
# duplicate for analysis
56-
combined_filename = "%s_%s.gz" % (file_name, user_id)
57-
with gzip.open(combined_filename, "wt") as gcfd:
58-
json.dump(combined_list,gcfd, default=bju.default, allow_nan=False, indent=4)
59-
60-
import emission.core.get_database as edb
61-
pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
62-
logging.info("Found %d pipeline states %s" %
63-
(len(pipeline_state_list),
64-
list([ps["pipeline_stage"] for ps in pipeline_state_list])))
65-
66-
pipeline_filename = "%s_pipelinestate_%s.gz" % (file_name, user_id)
67-
with gzip.open(pipeline_filename, "wt") as gpfd:
68-
json.dump(pipeline_state_list,
69-
gpfd, default=bju.default, allow_nan=False, indent=4)
20+
validate_truncation(loc_entry_list, trip_entry_list, place_entry_list)
21+
22+
unique_key_list = set([e["metadata"]["key"] for e in combined_list])
23+
logging.info("timeline has unique keys = %s" % unique_key_list)
24+
if len(combined_list) == 0 or unique_key_list == set(['stats/pipeline_time']):
25+
logging.info("No entries found in range for user %s, skipping save" % user_id)
26+
else:
27+
# Also dump the pipeline state, since that's where we have analysis results upto
28+
# This allows us to copy data to a different *live system*, not just
29+
# duplicate for analysis
30+
combined_filename = "%s_%s.gz" % (file_name, user_id)
31+
with gzip.open(combined_filename, "wt") as gcfd:
32+
json.dump(combined_list,gcfd, default=bju.default, allow_nan=False, indent=4)
7033

7134
def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
7235
MAX_LIMIT = 25 * 10000

emission/exportdata/export_data.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
import emission.storage.pipeline_queries as espq
66
import emission.storage.timeseries.abstract_timeseries as esta
77
import emission.storage.decorations.analysis_timeseries_queries as esda
8-
import emission.export.run_export_data_pipeline as eer
8+
import emission.storage.timeseries.cache_series as estcs
99
import emission.export.export as eee
10+
import emission.core.wrapper.pipelinestate as ps
1011
import gzip
1112
import json
1213
import bson.json_util as bju
1314

1415
def set_export_data(user_id):
15-
try:
16+
try:
1617
edp = ExportDataPipeline()
1718
edp.user_id = user_id
1819
edp.run_export_data_pipeline(user_id)
@@ -32,14 +33,17 @@ def last_trip_done(self):
3233
return self._last_trip_done
3334

3435
def run_export_data_pipeline(self, user_id):
35-
loc_time_query = epq.get_time_range_for_stage(user_id)
36-
loc_time_query.timeType = "data.ts"
37-
trip_time_query = epq.get_time_range_for_stage(user_id)
38-
trip_time_query.timeType = "data.start_ts"
39-
place_time_query = epq.get_time_range_for_stage(user_id)
40-
place_time_query.timeType = "data.enter_ts"
41-
ma_entry_list = []
42-
file_name = "export"
43-
eee.export(loc_time_query, trip_time_query, place_time_query, ma_entry_list, user_id, file_name)
44-
45-
36+
ts = esta.TimeSeries.get_time_series(user_id)
37+
time_query = espq.get_time_range_for_stage(user_id, ps.PipelineStages.EXPORT_DATA)
38+
loc_time_query = time_query
39+
loc_time_query.timeType = "data.ts"
40+
loc_entry_list = list(estcs.find_entries(user_id, key_list=None, time_query=loc_time_query))
41+
trip_time_query = time_query
42+
trip_time_query.timeType = "data.end_ts"
43+
trip_entry_list = list(ts.find_entries(key_list=None, time_query=trip_time_query))
44+
place_time_query = time_query
45+
place_time_query.timeType = "data.enter_ts"
46+
place_entry_list = list(ts.find_entries(key_list=None, time_query=place_time_query))
47+
ma_entry_list = []
48+
file_name = "export"
49+
eee.export(loc_entry_list, trip_entry_list, place_entry_list, ma_entry_list, user_id, file_name, ts)

0 commit comments

Comments
 (0)