Skip to content

👔 Reset the cached pipeline state in the profile as well #1067

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 18, 2025

Conversation

shankari
Copy link
Contributor

@shankari shankari commented May 6, 2025

Before this change, the pipeline reset script would change the get_pipeline_state_db() but not the cached pipeline start and end in the profile.

Since we now use the profile to decide whether to run the pipeline, this led to issues where the pipeline for a user would not re-run even after the pipeline was reset.
e-mission/e-mission-docs#1105 (comment)

I wonder if we should use only the pipeline state to decide whether or not to run the pipeline, instead of the profile. It seems like restructuring to use a single source of truth would be good.

Testing done:
Ran it to reset our test user back to 2024-07-30; verified that the value in the profile was reset
Verified that the pipeline was not skipped on the run after the reset

shankari added 2 commits May 5, 2025 15:58
Before this change, the pipeline reset script would change the
`get_pipeline_state_db()` but not the cached pipeline start and end in the
profile.

Since we now use the profile to decide whether to run the pipeline, this led to
issues where the pipeline for a user would not re-run even after the pipeline was reset.
e-mission/e-mission-docs#1105 (comment)

I wonder if we should use only the pipeline state to decide whether or not to
run the pipeline, instead of the profile. It seems like restructuring to use a
single source of truth would be good.

Testing done:
Ran it to reset our test user back to `2024-07-30`; verified that the value in
the profile was reset
When we reset entries, we attempt to print the result of the database operations.
However, in recent versions of pymongo, the results are objects, so we end up
with results like this.

```
DEBUG:root:this is not a dry run, result of updating trip_segmentation stage in reset_pipeline_state = <pymongo.results.UpdateResult object at 0x7f1bb9b43850>
DEBUG:root:this is not a dry run, result of updating all other stages in reset_pipeline_state = <pymongo.results.UpdateResult object at 0x7f1bb9b43af0>
DEBUG:root:this is not a dry run, result of updating the profile in reset_pipeline_state = <pymongo.results.UpdateResult object at 0x7f1bb9b43460>
```

We change all the logs to print the `raw_result` instead,
https://pymongo.readthedocs.io/en/stable/api/pymongo/results.html#pymongo.results.UpdateResult
and add a missing log from 31239b7 (👔 Reset the cached pipeline state in the profile as well)
@shankari
Copy link
Contributor Author

shankari commented May 6, 2025

@JGreenlee for visibility
@TeachMeTW can you please add the tests for this functionality?

Copilot suggests something like

class TestResetPipeline(unittest.TestCase):
    @patch("emission.pipeline.reset.edb.get_analysis_timeseries_db")
    @patch("emission.pipeline.reset.edb.get_pipeline_state_db")
    @patch("emission.pipeline.reset.edb.get_profile_db")
    def test_reset_user_to_start(self, mock_get_profile_db, mock_get_pipeline_state_db, mock_get_analysis_timeseries_db):
        # Mock the database methods
        mock_analysis_db = MagicMock()
        mock_pipeline_db = MagicMock()
        mock_profile_db = MagicMock()

        mock_get_analysis_timeseries_db.return_value = mock_analysis_db
        mock_get_pipeline_state_db.return_value = mock_pipeline_db
        mock_get_profile_db.return_value = mock_profile_db

        # Mock the count_documents and delete_many methods
        mock_analysis_db.count_documents.return_value = 10
        mock_pipeline_db.count_documents.return_value = 5
        mock_analysis_db.delete_many.return_value.raw_result = {"n": 10}
        mock_pipeline_db.delete_many.return_value.raw_result = {"n": 5}
        mock_profile_db.update_one.return_value.raw_result = {"nModified": 1}

        # Call the function
        reset.reset_user_to_start("test_user_id", is_dry_run=False)

        # Assertions
        mock_analysis_db.count_documents.assert_called_once_with({"user_id": "test_user_id"})
        mock_pipeline_db.count_documents.assert_called_once_with({"user_id": "test_user_id"})
        mock_analysis_db.delete_many.assert_called_once_with({"user_id": "test_user_id"})
        mock_pipeline_db.delete_many.assert_called_once_with({"user_id": "test_user_id"})
        mock_profile_db.update_one.assert_called_once_with(
            {"user_id": "test_user_id"},
            {"$set": {"pipeline_range.start_ts": None, "pipeline_range.end_ts": None}}
        )

Please make sure to use your best judgement in integrating it with the rest of the codebase; find the current test suite for pipeline resets; make sure that the checks are actually checking the new functionality here, etc.

@TeachMeTW
Copy link
Contributor

Added test in #1068
I'll fix the commits once 1067 is merged
I know copilot advised mocking but I believe using intake pipeline and real examples was more intuitive and easier to grasp

shankari added 2 commits May 18, 2025 11:24
So that we can verify the results if we need to
Since it can happen if the trip has been labeled before it is processed (aka in
draft mode)
@shankari
Copy link
Contributor Author

Also made some additional logging changes to clean up this branch before I am done

shankari and others added 2 commits May 18, 2025 11:33
When we first started e-mission-common, @JGreenlee hosted it in his own account
as an initial location. We then moved it to the "proper" location in the
e-mission org, and `JGreenlee/e-mission-common` redirected to
`e-mission/e-mission-common` for backwards compatibility.

But then @JGreenlee created a new fork to submit
e-mission/e-mission-common#15 so the redirect stopped.
And the new fork did not have any releases, so we couldn't pull the library any
more.

Fortunately, this was caught by our automated tests :)

All of this played out as expected, but it does provide a strong
hint/motivation to switch the URL over, which I am doing here.

Testing done:
Before this, the additional logging commits in the PR (pushed after the
failure) failed tests. After this, they passed.
introduces two new test cases in `TestPipelineReset.py` to ensure that the `pipeline_range` in user profiles is correctly reset during pipeline operations. The first test checks that the `end_ts` is updated after resetting the pipeline to a specific timestamp, while the second test verifies that both `start_ts` and `end_ts` are set to `None` when resetting to the start. These tests address previous issues where the profile was not updated correctly after pipeline resets
@shankari shankari merged commit ba58169 into e-mission:master May 18, 2025
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants