Skip to content

Overhaul workflow monad to use native threading #99

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open

Conversation

iand675
Copy link
Contributor

@iand675 iand675 commented Apr 29, 2025

This is a relatively experimental change, but it's intended to enhance our ability to add new code in the future. The Coroutine monad, and the Workflow monad implementation are currently fairly hard to add new instances for, hard to write interceptors for, and lead to a lot of questions from engineers about the semantics of threads and exceptions in the Temporal model (as it currently stands). This PR is a step in the direction of using Haskell-native threads + thin wrappers around the IO monad to run Workflow code. This should provide us with the ability to write instances that we more-or-less can't write at the moment. The crucial one is that we can't reliably handle spans from OTel due to not having a good bracket-like function in the face of continuations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pre-commit check fix.

@@ -26,4 +26,3 @@ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pre-commit check fix.

@@ -26,4 +26,3 @@ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pre-commit check fix.

@@ -26,4 +26,3 @@ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pre-commit check fix.

@@ -26,4 +26,3 @@ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pre-commit check fix.

@@ -63,4 +63,3 @@ library:
when:
- condition: os(darwin)
frameworks: [Security, CoreFoundation, SystemConfiguration]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pre-commit check fix.

@@ -211,7 +211,6 @@ pub fn connect_client(
runtime,
}),
Err(e) => {
eprintln!("Error: {:?}", e);
Copy link
Contributor Author

@iand675 iand675 Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

People occasionally see these and don't understand where they come from, and they should be reported via exceptions in Haskell land

in
shells // { default = shells.ghc910; }
);
outputs = inputs @ {self, ...}: let
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result of fixed pre-commit checks

modules = [
./nix/devenv/temporal-bridge.nix
./nix/devenv/temporal-dev-server.nix
./nix/devenv/repo-wide-checks.nix
Copy link
Contributor Author

@iand675 iand675 Apr 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the key change to this file, repo-wide-checks accidentally were left out during some refactor in the past

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch, ty

case result of
Left err -> throwIO err
Left err -> throwM err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and elsewhere, I'm moving to throwM to just generalize towards any future monad changes we have to worry about in the future.

@@ -1,3 +1,5 @@
#!/usr/bin/env bash
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Precommit check fix

@@ -144,3 +152,107 @@ instance Monoid ScheduleClientInterceptors where
ScheduleClientInterceptors
{ scheduleWorkflowAction = \opts _ -> return opts
}


data ExecuteWorkflowInput = ExecuteWorkflowInput
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this stuff here from elsewhere to make the Temporal.Interceptor module the main interface for what goes in and out of Temporal actions

- Temporal.Workflow.Unsafe.Handle
- Temporal.Workflow.Internal.Instance
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving away from the internal namespace usage since we can just not expose the modules

@@ -200,9 +198,6 @@ module Temporal.Workflow (
ProvidedUpdate (..),
KnownUpdate (..),

-- * Other utilities
unsafeAsyncEffectSink,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used, not really needed any more.

@@ -1644,12 +1472,12 @@ returning the original data structure with the arguments replaced with the resul
This is actually a bit of a misnomer, since it's really 'traverseConcurrently', but this is copied to mimic the 'async' package's naming
to slightly ease adoption.
-}
mapConcurrently :: Traversable t => (a -> Workflow b) -> t a -> Workflow (t b)
mapConcurrently :: (Traversable t) => (a -> Workflow b) -> t a -> Workflow (t b)
mapConcurrently = traverseConcurrently
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a feeling that these folding / traversing versions of this would benefit from a specialized version that does a lot less blocking, but probably just need to come back to that later if it's an actual problem

, workflowRuntimeCancelRequested :: {-# UNPACK #-} !(IVar ())
, workflowRuntimeSequenceMaps :: {-# UNPACK #-} !SequenceMaps
, workflowRuntimeUnappliedJobs :: {-# UNPACK #-} !(TVar Int)
, signals :: {-# UNPACK #-} !SignalSupport
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an approach I'm taking on trying to decompose portions of workflow functionality into smaller units that are a bit more independently maintainable. Hopefully this unlocks some ability to test subsystems better in future PRs.

@@ -81,22 +81,6 @@ defaultActivityOptions :: StartActivityOptions
defaultActivityOptions = defaultStartActivityOptions $ StartToClose $ minutes 1


opts :: StartChildWorkflowOptions
opts =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to not be a global value to fix shadowing warnings

C.terminate h' C.TerminationOptions {terminationReason = "testing", terminationDetails = []}
)
`shouldThrow` \case
RpcError {} -> True
_ -> False
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary since RPCError only has one constructor

_ <- attachContext activationCtxt
handleActivation activation
link activator
handleActivation activation
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this extra bit of async calling, because creating the instances is cheap, and the RunningWorkflow struct is in charge of its own async-ness now.

@iand675 iand675 requested a review from jkachmar May 12, 2025 16:31
@iand675 iand675 changed the title Haxl be gone Overhaul workflow monad to use native threading May 12, 2025
Comment on lines -278 to -286
-- LMAO this is such a comical coercion
--
-- We just need to ignore payloads if the result type is unit to allow workflows to evolve
-- such that they can return something else in the future.
if typeRep h == typeOf ()
then pure $ unsafeCoerce ()
else case payloads of
(a : _) -> liftIO $ readResult a
_ -> error "Missing result payload"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

Comment on lines +566 to +574
runReaderT
( do
let r = RunId $ swer ^. WF.runId
getHandle k wfId' $
GetHandleOptions
(Just r)
(Just r)
)
c
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the world if Haskell had a runReaderT variant whose arguments were flipped by default

@@ -521,30 +512,28 @@ startFromPayloads
-> StartWorkflowOptions
-> V.Vector UnencodedPayload
-> m (WorkflowHandle result)
startFromPayloads k@(KnownWorkflow codec _) wfId opts payloads = do
startFromPayloads k wfId opts payloads = do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not something to change in code, but would it be useful to have a glossary for stuff like "k is commonly used to refer to the continuation function" type stuff (for newcomers)?

Comment on lines +733 to +747
-- startLocalActivity
-- :: forall act
-- . (RequireCallStack, ActivityRef act)
-- => act
-- -> StartLocalActivityOptions
-- -> (ActivityArgs act :->: Workflow (Task (ActivityResult act)))
-- startLocalActivity (activityRef -> KnownActivity codec n) opts = withWorkflowArgs @(ActivityArgs act) @(Task (ActivityResult act)) codec $ \typedPayloads -> Workflow do
-- updateCallStack
-- originalTime <- unWorkflow time
-- inst <- ask
-- runInIO <- askRunInIO
-- let ps = fmap convertToProtoPayload typedPayloads
-- s@(Sequence actSeq) <- nextActivitySequence
-- resultSlot <- newIVar
-- atomically $ modifyTVar'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also the module export up top

Comment on lines +533 to +535
-- TODO
-- & Command.headers .~ _
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is just a carryover, but what's the TODO here for? just setting the Command.headers to something if/when we want to support that?

import Data.Vector (Vector)
import Data.Word
import GHC.Conc (unsafeIOToSTM)
import GHC.Exts (Any)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this import actually used? I can't see Any referenced in the module at all


instance MonadReadStateVar Condition where
readStateVar ref = Condition do
tid <- unsafeIOToSTM myThreadId
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be worth documenting this the same way that Rust usage of unsafe has a convention of documenting the invariants (e.g. IO and STM are effectively identical with respect to RTS thread ID)

Comment on lines +477 to +479
When we get an activation, we apply the jobs, then

-}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was there more here, or was this meant to be covered in other docs?

eval = mask $ \restore -> do
tid <- myThreadId
let tidText = Text.pack (show tid)
threadWrapper _restore = id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the intent here the same as your previous threadWrapper? we could use this as a point to support propagating trace context?

Comment on lines +519 to +520
handleableSignals <- stateTVar runtime.signals.bufferedSignals $ \signals ->
span (\signal -> (Just (signal ^. Activation.signalName) `HashMap.member` signalHandlers) || (Nothing `HashMap.member` signalHandlers)) signals
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL stateTVar + span as a pattern for handling state transition

Comment on lines +539 to +540
-- TODO, do we need to count unapplied jobs here. I think so.
modifyTVar' runtime.workflowRuntimeUnappliedJobs (+ length goodQueries)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not 100% sure what this comment is referring to: do you want to count unapplied jobs in one of the query counter variables here?

Comment on lines +594 to +613
( do
liftIO $ join $ atomically $ applyActivations mainThread
atomically do
waitAllJobsHandled runtime
waitAllThreadsBlocked runtime
untilM_ do
(handleCount, m) <- atomically $ applyQueuedSignals mainThread
liftIO m
atomically do
waitAllJobsHandled runtime
waitAllThreadsBlocked runtime
pure (handleCount == 0)
-- By the time we've applied all the signals that we can,
-- all of the queries we can possibly handle must have been
-- defined.
liftIO . snd =<< atomically (applyQueuedQueries mainThread)
restore $ atomically do
waitAllJobsHandled runtime
waitAllThreadsBlocked runtime
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've mentioned this elsewhere, but it may be worth reiterating to the reader (either here or elsewhere in the documentation) that atomically is being called explicitly to ensure a deliberate transactional sequence.

I think a lot of people can end up with an impression that atomically is just the STM a -> IO a function — and most of the time that's no big deal — but in this case if someone were to merge some of these blocks then you could end up with re-ordering that breaks the semantics we want to enforce.

Comment on lines +645 to +649
case res of
Left err -> do
throwIO err
Right () -> do
pure ()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either throwM pure res? or is it worth deliberately case-ing & using throwIO

Comment on lines +775 to +776
-- TODO, if there's a huge amount of signals, this is algorithmically inefficient.
modifyTVar' runtime.signals.bufferedSignals (++ [sig])
Copy link
Contributor

@jkachmar jkachmar May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would the alternative be here, Vector or something not-list-like?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative would be to use a difference list, such as this, although they're not hard to implement by hand: https://hackage.haskell.org/package/dlist

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, so you mean the specific case of like... a lot of signals getting queued up before any of them get handled yea?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it would be an N^2 evaluation of the list. Hard to think that gRPC size limits would let this get too out of control though

& Command.completed .~ convertToProtoPayload payload
)

-- (validator, updateAction) <- do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dead code?

}


-- -- | Creates a new thread in the test runtime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- -- | Creates a new thread in the test runtime
-- | Creates a new thread in the test runtime

`finally` atomically (modifyTVar' runtime.testThreadManager $ HashMap.delete tid)


-- -- | Check if a thread is blocked in the runtime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- -- | Check if a thread is blocked in the runtime
-- | Check if a thread is blocked in the runtime

Copy link
Contributor

@jkachmar jkachmar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huge quality of life improvement; I wish we had some benchmarks to do a before/after comparison because I'd be pretty shocked if this didn't measurably improve overhead.

@iand675
Copy link
Contributor Author

iand675 commented May 13, 2025

@jkachmar @andrewsmith Informally benchmarking this, they're effectively identical performance wise.

Both test suites run in 26.5 seconds +/- .2 seconds

Comment on lines +629 to +634
let anyHandle :: forall h. ChildWorkflowHandle h -> ChildWorkflowHandle Any
anyHandle = unsafeCoerce
atomically $
modifyTVar'
runtime.workflowRuntimeSequenceMaps.childWorkflows
(HashMap.insert s $ anyHandle wfHandle)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay so I understand what unsafeCoerce is doing here, but I don't really follow what opposite side of this looks like.

you stash the ChildWorkflowHandle Any in the map, which gets used in handleResolveChildWorkflowExecution but how is the result type information handled here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants