Skip to content

Commit 60ad71d

Browse files
committed
Merge remote-tracking branch 'origin/main' into haxl-be-gone
2 parents b8a1516 + 73108ce commit 60ad71d

File tree

6 files changed

+230
-13
lines changed

6 files changed

+230
-13
lines changed

sdk/package.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ tests:
156156
DurationSpec,
157157
IntegrationSpec,
158158
IntegrationSpec.HangingWorkflow,
159+
IntegrationSpec.NoOpWorkflow,
159160
IntegrationSpec.TimeoutsInWorkflows,
160161
Temporal.Workflow.IVarSpec,
161162
Common,

sdk/src/Temporal/Client.hs

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ module Temporal.Client (
3333
startWorkflowOptions,
3434
Temporal.Client.start,
3535
startFromPayloads,
36-
WorkflowHandle,
36+
WorkflowHandle (..),
3737
execute,
3838
waitWorkflowResult,
3939

@@ -58,6 +58,7 @@ module Temporal.Client (
5858

5959
-- * Producing handles for existing workflows
6060
getHandle,
61+
GetHandleOptions (..),
6162

6263
-- * Workflow history utilities
6364
fetchHistory,
@@ -254,7 +255,7 @@ This function will block until the workflow completes, and will return the resul
254255
or throw a 'WorkflowExecutionClosed' exception if the workflow was closed without returning a result.
255256
-}
256257
waitWorkflowResult :: (Typeable a, MonadIO m) => WorkflowHandle a -> m a
257-
waitWorkflowResult h@(WorkflowHandle readResult _ c wf r) = do
258+
waitWorkflowResult h@(WorkflowHandle readResult _ c wf r _) = do
258259
mev <- runReaderT (waitResult wf r c.clientConfig.namespace) c
259260
case mev of
260261
Nothing -> error "Unexpected empty history"
@@ -317,7 +318,7 @@ signal
317318
-> sig
318319
-> SignalOptions
319320
-> (SignalArgs sig :->: m ())
320-
signal (WorkflowHandle _ _t c wf r) (signalRef -> (KnownSignal sName sCodec)) opts = withArgs @(SignalArgs sig) @(m ()) sCodec $ \inputs -> liftIO $ do
321+
signal (WorkflowHandle _ _t c wf r _) (signalRef -> (KnownSignal sName sCodec)) opts = withArgs @(SignalArgs sig) @(m ()) sCodec $ \inputs -> liftIO $ do
321322
inputs' <- processorEncodePayloads c.clientConfig.payloadProcessor =<< liftIO (sequence inputs)
322323
hdrs <- processorEncodePayloads c.clientConfig.payloadProcessor opts.headers
323324
result <-
@@ -440,6 +441,16 @@ query h (queryRef -> KnownQuery qn codec) opts = withArgs @(QueryArgs query) @(m
440441
WorkflowExecutionStatus'Unrecognized _ -> UnknownStatus
441442

442443

444+
data GetHandleOptions = GetHandleOptions
445+
{ runId :: Maybe RunId
446+
, firstExecutionRunId :: Maybe RunId
447+
-- ^ If specified, later calls referencing the handle may error if the (most recent | specified workflow
448+
-- run id) is not part of the same execution chain as this id. Only some operations (namely terminate)
449+
-- respect firstExecutionRunId, while others (e.g. signal and query) ignore it.
450+
}
451+
deriving stock (Eq, Show)
452+
453+
443454
{- | Sometimes you know that a Workflow exists or existed, but you didn't create the workflow from
444455
the current process or code path. In this case, you can use 'getHandle' to get a handle to the
445456
workflow so that you can interact with it.
@@ -451,9 +462,9 @@ getHandle
451462
:: (HasWorkflowClient m, MonadIO m)
452463
=> KnownWorkflow args a
453464
-> WorkflowId
454-
-> Maybe RunId
465+
-> GetHandleOptions
455466
-> m (WorkflowHandle a)
456-
getHandle (KnownWorkflow {knownWorkflowCodec, knownWorkflowName}) wfId runId = do
467+
getHandle (KnownWorkflow {knownWorkflowCodec, knownWorkflowName}) wfId opts = do
457468
c <- askWorkflowClient
458469
pure $
459470
WorkflowHandle
@@ -462,8 +473,9 @@ getHandle (KnownWorkflow {knownWorkflowCodec, knownWorkflowName}) wfId runId = d
462473
either (throwIO . ValueError) pure result
463474
, workflowHandleClient = c
464475
, workflowHandleWorkflowId = wfId
465-
, workflowHandleRunId = runId
476+
, workflowHandleRunId = opts.runId
466477
, workflowHandleType = WorkflowType knownWorkflowName
478+
, workflowHandleFirstExecutionRunId = opts.firstExecutionRunId
467479
}
468480

469481

@@ -544,7 +556,15 @@ startFromPayloads k wfId opts payloads = do
544556
res <- startWorkflowExecution c.clientCore req
545557
case res of
546558
Left err -> throwIO err
547-
Right swer -> runReaderT (getHandle k wfId' (Just (RunId $ swer ^. WF.runId))) c
559+
Right swer ->
560+
runReaderT
561+
( do
562+
getHandle k wfId' $
563+
GetHandleOptions
564+
(Just (RunId $ swer ^. WF.runId))
565+
Nothing
566+
)
567+
c
548568

549569

550570
{- | Begin a new Workflow Execution.
@@ -657,15 +677,22 @@ signalWithStart (workflowRef -> k@(KnownWorkflow codec _)) wfId opts (signalRef
657677
msg
658678
case res of
659679
Left err -> throwIO err
660-
Right swer -> runReaderT (getHandle k opts'.signalWithStartWorkflowId (Just (RunId $ swer ^. WF.runId))) c
680+
Right swer ->
681+
runReaderT
682+
( getHandle
683+
k
684+
opts'.signalWithStartWorkflowId
685+
( GetHandleOptions
686+
(Just (RunId $ swer ^. WF.runId))
687+
Nothing
688+
)
689+
)
690+
c
661691

662692

663693
data TerminationOptions = TerminationOptions
664694
{ terminationReason :: Text
665695
, terminationDetails :: [Payload]
666-
, firstExecutionRunId :: Maybe RunId
667-
-- ^ If set, this call will error if the (most recent | specified workflow run id in the WorkflowHandle) is not part of the same
668-
-- execution chain as this id.
669696
}
670697

671698

@@ -692,7 +719,7 @@ terminate h req =
692719
& RR.details
693720
.~ (defMessage & Common.payloads .~ fmap convertToProtoPayload req.terminationDetails)
694721
& RR.identity .~ Core.identity (Core.clientConfig h.workflowHandleClient.clientCore)
695-
& RR.firstExecutionRunId .~ maybe "" rawRunId req.firstExecutionRunId
722+
& RR.firstExecutionRunId .~ maybe "" rawRunId h.workflowHandleFirstExecutionRunId
696723

697724

698725
data FollowOption = FollowRuns | ThisRunOnly

sdk/src/Temporal/Client/Types.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ data WorkflowHandle a = WorkflowHandle
144144
, workflowHandleClient :: WorkflowClient
145145
, workflowHandleWorkflowId :: WorkflowId
146146
, workflowHandleRunId :: Maybe RunId
147+
, workflowHandleFirstExecutionRunId :: Maybe RunId
147148
}
148149

149150

sdk/temporal-sdk.cabal

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ test-suite temporal-sdk-tests
149149
type: exitcode-stdio-1.0
150150
main-is: Main.hs
151151
other-modules:
152-
Spec, ConcurrentAccessSpec, AsyncCompletionSpec, DurationSpec, IntegrationSpec, IntegrationSpec.HangingWorkflow, IntegrationSpec.TimeoutsInWorkflows, Temporal.Workflow.IVarSpec, Common, THCompiles
152+
Spec, ConcurrentAccessSpec, AsyncCompletionSpec, DurationSpec, IntegrationSpec, IntegrationSpec.HangingWorkflow, IntegrationSpec.NoOpWorkflow, IntegrationSpec.TimeoutsInWorkflows, Temporal.Workflow.IVarSpec, Common, THCompiles
153153
hs-source-dirs:
154154
test
155155
default-extensions:

sdk/test/IntegrationSpec.hs

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import Data.Functor
3434
import Data.Int
3535
import Data.Map.Strict (Map)
3636
import qualified Data.Map.Strict as Map
37+
import qualified Data.Maybe as M
3738
import Data.Text (Text)
3839
import qualified Data.Text as Text
3940
import Data.Time (Day (ModifiedJulianDay))
@@ -44,6 +45,7 @@ import DiscoverInstances (discoverInstances)
4445
import GHC.Generics
4546
import GHC.Stack (SrcLoc (..), callStack, fromCallSiteList)
4647
import IntegrationSpec.HangingWorkflow
48+
import IntegrationSpec.NoOpWorkflow
4749
import IntegrationSpec.TimeoutsInWorkflows
4850
import OpenTelemetry.Trace
4951
import RequireCallStack
@@ -315,6 +317,7 @@ spec = do
315317
(Just $ seconds 2)
316318

317319
aroundAll setup needsClient
320+
aroundAll setup terminateTests
318321
where
319322
setup :: (TestEnv -> IO ()) -> IO ()
320323
setup go = do
@@ -1540,6 +1543,166 @@ needsClient = do
15401543
incompatibleReplayResult <- runReplayHistory globalRuntime incompatibleConf history
15411544
incompatibleReplayResult `shouldSatisfy` isLeft
15421545

1546+
1547+
terminateTests :: SpecWith TestEnv
1548+
terminateTests = do
1549+
describe "Terminate" $ do
1550+
describe "when neither runId nor firstExecutionRunId is provided" $ do
1551+
it "returns" $ \TestEnv {..} -> do
1552+
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do
1553+
baseConf
1554+
withWorker conf $ do
1555+
let opts =
1556+
(C.startWorkflowOptions taskQueue)
1557+
{ C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate
1558+
}
1559+
useClient $ do
1560+
C.start NoOpWorkflow "test-terminate-works-without-run-ids" opts
1561+
h <- C.getHandle (workflowRef NoOpWorkflow) "test-terminate-works-without-run-ids" Nothing Nothing
1562+
C.terminate
1563+
h {C.workflowHandleRunId = Nothing, C.workflowHandleFirstExecutionRunId = Nothing}
1564+
C.TerminationOptions {terminationReason = "testing", terminationDetails = []}
1565+
describe "when runId is provided without firstExecutionRunId" $ do
1566+
it "returns if runId matches a workflow" $ \TestEnv {..} -> do
1567+
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do
1568+
baseConf
1569+
withWorker conf $ do
1570+
let opts =
1571+
(C.startWorkflowOptions taskQueue)
1572+
{ C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate
1573+
}
1574+
useClient $ do
1575+
h <- C.start NoOpWorkflow "test-terminate-works-with-good-run-id" opts
1576+
h' <- C.getHandle (workflowRef NoOpWorkflow) "test-terminate-works-with-good-run-id" h.workflowHandleRunId Nothing
1577+
C.terminate h' C.TerminationOptions {terminationReason = "testing", terminationDetails = []}
1578+
it "throws if runId does not match a workflow" $ \TestEnv {..} -> do
1579+
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do
1580+
baseConf
1581+
withWorker conf $
1582+
do
1583+
let opts =
1584+
(C.startWorkflowOptions taskQueue)
1585+
{ C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate
1586+
}
1587+
( useClient $ do
1588+
h <- C.start NoOpWorkflow "test-terminate-throws-with-bad-run-id" opts
1589+
h' <- C.getHandle (workflowRef NoOpWorkflow) "test-terminate-throws-with-bad-run-id" (Just "bad-run-id") Nothing
1590+
C.terminate h' C.TerminationOptions {terminationReason = "testing", terminationDetails = []}
1591+
)
1592+
`shouldThrow` \case
1593+
RpcError {} -> True
1594+
_ -> False
1595+
describe "when firstExecutionRunId is provided without runId" $ do
1596+
it "returns if firstExecutionRunId matches a workflow" $ \TestEnv {..} -> do
1597+
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do
1598+
baseConf
1599+
withWorker conf $ do
1600+
let opts =
1601+
(C.startWorkflowOptions taskQueue)
1602+
{ C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate
1603+
}
1604+
useClient $ do
1605+
h <- C.start NoOpWorkflow "test-terminate-works-with-good-first-execution-run-id" opts
1606+
h' <-
1607+
C.getHandle
1608+
(workflowRef NoOpWorkflow)
1609+
"test-terminate-works-with-good-first-execution-run-id"
1610+
Nothing
1611+
$ Just C.GetHandleOptions {C.firstExecutionRunId = Just $ M.fromJust h.workflowHandleFirstExecutionRunId}
1612+
C.terminate h' C.TerminationOptions {terminationReason = "testing", terminationDetails = []}
1613+
it "throws if firstExecutionRunId does not match a workflow" $ \TestEnv {..} -> do
1614+
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do
1615+
baseConf
1616+
withWorker conf $
1617+
do
1618+
let opts =
1619+
(C.startWorkflowOptions taskQueue)
1620+
{ C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate
1621+
}
1622+
( useClient $ do
1623+
h <- C.start NoOpWorkflow "test-terminate-throws-with-bad-first-execution-run-id" opts
1624+
h' <-
1625+
C.getHandle
1626+
(workflowRef NoOpWorkflow)
1627+
"test-terminate-throws-with-bad-first-execution-run-id"
1628+
Nothing
1629+
$ Just C.GetHandleOptions {C.firstExecutionRunId = Just "bad-first-execution-run-id"}
1630+
C.terminate h' C.TerminationOptions {terminationReason = "testing", terminationDetails = []}
1631+
)
1632+
`shouldThrow` \case
1633+
RpcError {} -> True
1634+
_ -> False
1635+
describe "when both runId and firstExecutionRunId are provided" $ do
1636+
it "returns if both runId and firstExecutionRunId match a workflow" $ \TestEnv {..} -> do
1637+
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do
1638+
baseConf
1639+
withWorker conf $ do
1640+
let opts =
1641+
(C.startWorkflowOptions taskQueue)
1642+
{ C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate
1643+
}
1644+
useClient $ do
1645+
h <- C.start NoOpWorkflow "test-terminate-works-with-good-run-id-bad-first-execution-run-id" opts
1646+
h' <-
1647+
C.getHandle
1648+
(workflowRef NoOpWorkflow)
1649+
"test-terminate-works-with-good-run-id-bad-first-execution-run-id"
1650+
h.workflowHandleRunId
1651+
$ Just
1652+
C.GetHandleOptions
1653+
{ C.firstExecutionRunId = Just $ M.fromJust h.workflowHandleFirstExecutionRunId
1654+
}
1655+
C.terminate h' C.TerminationOptions {terminationReason = "testing", terminationDetails = []}
1656+
it "throws if runId does not match a workflow" $ \TestEnv {..} -> do
1657+
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do
1658+
baseConf
1659+
withWorker conf $ do
1660+
let opts =
1661+
(C.startWorkflowOptions taskQueue)
1662+
{ C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate
1663+
}
1664+
useClient
1665+
( do
1666+
h <- C.start NoOpWorkflow "test-terminate-works-with-bad-run-id-good-first-execution-run-id" opts
1667+
h' <-
1668+
C.getHandle
1669+
(workflowRef NoOpWorkflow)
1670+
"test-terminate-works-with-bad-run-id-good-first-execution-run-id"
1671+
(Just "bad-run-id")
1672+
( Just
1673+
C.GetHandleOptions
1674+
{ C.firstExecutionRunId = Just $ M.fromJust h.workflowHandleFirstExecutionRunId
1675+
}
1676+
)
1677+
C.terminate h' C.TerminationOptions {terminationReason = "testing", terminationDetails = []}
1678+
)
1679+
`shouldThrow` \case
1680+
RpcError {} -> True
1681+
_ -> False
1682+
it "throws if firstExecutionRunId does not match a workflow" $ \TestEnv {..} -> do
1683+
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do
1684+
baseConf
1685+
withWorker conf $
1686+
do
1687+
let opts =
1688+
(C.startWorkflowOptions taskQueue)
1689+
{ C.workflowIdReusePolicy = Just W.WorkflowIdReusePolicyAllowDuplicate
1690+
}
1691+
useClient
1692+
( do
1693+
h <- C.start NoOpWorkflow "test-terminate-throws-good-run-id-bad-first-execution-run-id" opts
1694+
h' <-
1695+
C.getHandle
1696+
(workflowRef NoOpWorkflow)
1697+
"test-terminate-throws-with-good-run-id-bad-first-execution-run-id"
1698+
(Just $ M.fromJust h.workflowHandleRunId)
1699+
(Just C.GetHandleOptions {C.firstExecutionRunId = Just "bad-first-execution-run-id"})
1700+
C.terminate h' C.TerminationOptions {terminationReason = "testing", terminationDetails = []}
1701+
)
1702+
`shouldThrow` \case
1703+
RpcError {} -> True
1704+
_ -> False
1705+
15431706
-- describe "WorkflowClient" $ do
15441707
-- specify "WorkflowExecutionAlreadyStartedError" pending
15451708
-- specify "follows only own execution chain" pending
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{-# LANGUAGE DerivingStrategies #-}
2+
{-# LANGUAGE DerivingVia #-}
3+
{-# LANGUAGE DuplicateRecordFields #-}
4+
{-# LANGUAGE QuasiQuotes #-}
5+
{-# LANGUAGE TemplateHaskell #-}
6+
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
7+
8+
{-# HLINT ignore "Unused LANGUAGE pragma" #-}
9+
10+
module IntegrationSpec.NoOpWorkflow where
11+
12+
import Control.Exception
13+
import Control.Monad
14+
import RequireCallStack (provideCallStack)
15+
import Temporal.Duration
16+
import Temporal.Payload
17+
import Temporal.TH
18+
import Temporal.Workflow
19+
20+
21+
noOpWorkflow :: Workflow ()
22+
noOpWorkflow = provideCallStack $ pure ()
23+
24+
25+
registerWorkflow 'noOpWorkflow

0 commit comments

Comments
 (0)