Workflow message passing - Temporal Go SDK feature guide

This page shows how to use Signals, Queries, and Updates with the Go SDK.

Signals

A Signal is a message sent to a running Workflow Execution.

Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.

Define a Signal

How to define a Signal using the Go SDK.

A Signal has a name and can have arguments.

  • The name, also called a Signal type, is a string.
  • The arguments must be serializable.

Use structs to define Signals and carry data, as long as the struct is serializable via the Data Converter. The Receive() method on the Signal Channel uses the Data Converter to decode the data into the struct within the Workflow. Only exported (public) fields are serializable.

type MySignal struct {
Message string // serializable
message string // not serializable
}
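To see why only exported fields survive, the following sketch round-trips the struct through encoding/json. It assumes the default Data Converter, which encodes plain structs as JSON; a custom Data Converter may behave differently. The roundTrip helper is hypothetical and only approximates the encode and decode steps that happen between the sender and the Workflow.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MySignal mirrors the struct above: one exported and one unexported field.
type MySignal struct {
	Message string // exported, so it is serialized
	message string // unexported, so it is silently dropped
}

// roundTrip is a hypothetical helper that encodes a Signal payload to JSON
// and decodes it again. It does not call the Temporal SDK.
func roundTrip(in MySignal) (MySignal, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return MySignal{}, err
	}
	var out MySignal
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(MySignal{Message: "public", message: "private"})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	// Only the exported field keeps its value after the round trip.
	fmt.Printf("Message=%q message=%q\n", out.Message, out.message)
}
```

The unexported field is dropped without an error, so a missing value on the receiving side is often the only symptom.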

Handle a Signal

How to handle a Signal using the Go SDK.

Workflows listen for Signals by the Signal's name.

Use the GetSignalChannel() API from the go.temporal.io/sdk/workflow package to get the Signal Channel.

A common use-case is to block a Workflow while waiting for a Signal, like in the following snippet:

func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
// ...
var signal MySignal
signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
signalChan.Receive(ctx, &signal)
if len(signal.Message) > 0 && signal.Message != "SOME_VALUE" {
return errors.New("signal")
}
// ...
}

Alternatively, you might want the Workflow to proceed and still be capable of handling external Signals.

func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
var signal MySignal
signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
workflow.Go(ctx, func(ctx workflow.Context) {
for {
selector := workflow.NewSelector(ctx)
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &signal)
})
selector.Select(ctx)
}
})
// submit activity one
// signal can be received while activity one is pending
// ...
return nil
}

In the example above, the Workflow code uses workflow.GetSignalChannel to open a workflow.Channel for the Signal type (identified by the Signal name).

Before completing the Workflow or using Continue-As-New, make sure to do an asynchronous drain on the Signal channel. Otherwise, the Signals will be lost.
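One way to picture the drain is a non-blocking loop that reads until the channel reports that it is empty. The sketch below is written against a hypothetical asyncReceiver interface that lists only ReceiveAsync, a method that workflow.ReceiveChannel also provides. The drainSignals helper and the in-memory fakeChannel are assumptions made for illustration so that the example runs without a Temporal Service.

```go
package main

import "fmt"

// MySignal matches the Signal payload used in this guide.
type MySignal struct {
	Message string
}

// asyncReceiver is a hypothetical, minimal interface. It lists only the
// ReceiveAsync method, which workflow.ReceiveChannel also provides.
type asyncReceiver interface {
	ReceiveAsync(valuePtr interface{}) (ok bool)
}

// drainSignals reads every Signal that is already buffered, without
// blocking, and returns them in arrival order.
func drainSignals(ch asyncReceiver) []MySignal {
	var pending []MySignal
	for {
		var s MySignal
		if !ch.ReceiveAsync(&s) {
			return pending // nothing left in the buffer
		}
		pending = append(pending, s)
	}
}

// fakeChannel is an in-memory stand-in used only to exercise drainSignals.
type fakeChannel struct {
	buffered []MySignal
}

// ReceiveAsync pops the oldest buffered Signal, or reports false when empty.
func (f *fakeChannel) ReceiveAsync(valuePtr interface{}) bool {
	if len(f.buffered) == 0 {
		return false
	}
	*(valuePtr.(*MySignal)) = f.buffered[0]
	f.buffered = f.buffered[1:]
	return true
}

func main() {
	ch := &fakeChannel{buffered: []MySignal{{Message: "a"}, {Message: "b"}}}
	for _, s := range drainSignals(ch) {
		fmt.Println("drained:", s.Message)
	}
}
```

In a Workflow, the channel returned by workflow.GetSignalChannel could be passed to such a helper just before returning, and the drained Signals could then be processed or carried over as input to the next run.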

Send a Signal from a Temporal Client

How to send a Signal from a Temporal Client using the Go SDK.

When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.

Use the SignalWorkflow() method on an instance of the Go SDK Temporal Client to send a Signal to a Workflow Execution.

Pass in both the Workflow Id and Run Id to uniquely identify the Workflow Execution. If only the Workflow Id is supplied (provide an empty string as the Run Id param), the Workflow Execution that is Running receives the Signal.

// ...
signal := MySignal {
Message: "Some important data",
}
err = temporalClient.SignalWorkflow(context.Background(), "your-workflow-id", runID, "your-signal-name", signal)
if err != nil {
log.Fatalln("Error sending the Signal", err)
return
}
// ...

Possible errors:

  • serviceerror.NotFound
  • serviceerror.Internal
  • serviceerror.Unavailable

Send a Signal from a Workflow

How to send a Signal from a Workflow using the Go SDK.

A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.

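When an External Signal is sent:

  • A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
  • A WorkflowExecutionSignaled Event appears in the recipient's Event History.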

A Signal can be sent from within a Workflow to a different Workflow Execution using the SignalExternalWorkflow API from the go.temporal.io/sdk/workflow package.

// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
//...
signal := MySignal {
Message: "Some important data",
}
err := workflow.SignalExternalWorkflow(ctx, "some-workflow-id", "", "your-signal-name", signal).Get(ctx, nil)
if err != nil {
// ...
}
// ...
}

Signal-With-Start

How to use Signal-With-Start using the Go SDK.

Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.

If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.

Use the SignalWithStartWorkflow() API on the Go SDK Temporal Client to start a Workflow Execution (if not already running) and pass it the Signal at the same time.

Because the Workflow Execution might not exist, this API does not take a Run Id as a parameter.

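// ...
signal := MySignal{
Message: "Some important data",
}
// The Task Queue name and workflowParam are placeholders for your own values.
workflowOptions := client.StartWorkflowOptions{
TaskQueue: "your-task-queue",
}
// SignalWithStartWorkflow also needs the start options, the Workflow, and its arguments.
_, err = temporalClient.SignalWithStartWorkflow(context.Background(), "your-workflow-id", "your-signal-name", signal, workflowOptions, YourWorkflowDefinition, workflowParam)
if err != nil {
log.Fatalln("Error sending the Signal", err)
}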

Queries

A Query is a synchronous operation that is used to get the state of a Workflow Execution.

Define a Query

How to define a Query using the Go SDK.

A Query has a name and can have arguments.

  • The name, also called a Query type, is a string.
  • The arguments must be serializable.

In Go, a Query type, also called a Query name, is a string value.

queryType := "your_query_name"

Handle a Query

How to handle a Query using the Go SDK.

Queries are handled by your Workflow.

Don't include any logic that causes Command generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.

Use the SetQueryHandler API from the go.temporal.io/sdk/workflow package to set a Query Handler that listens for a Query by name.

The handler must be a function that returns two values:

  1. A serializable result
  2. An error

The handler function can receive any number of input parameters, but all input parameters must be serializable. The following sample code sets up a Query Handler that handles the current_state Query type:

func YourWorkflow(ctx workflow.Context, input string) error {
currentState := "started" // This could be any serializable struct.
queryType := "current_state"
err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) {
return currentState, nil
})
if err != nil {
currentState = "failed to register query handler"
return err
}
// Your normal Workflow code begins here, and you update the currentState as the code makes progress.
currentState = "waiting timer"
err = workflow.NewTimer(ctx, time.Hour).Get(ctx, nil)
if err != nil {
currentState = "timer failed"
return err
}
currentState = "waiting activity"
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
})
err = workflow.ExecuteActivity(ctx, YourActivity, "your_input").Get(ctx, nil)
if err != nil {
currentState = "activity failed"
return err
}
currentState = "done"
return nil
}

For example, suppose your query handler function takes two parameters:

err := workflow.SetQueryHandler(ctx, "current_state", func(prefix string, suffix string) (string, error) {
return prefix + currentState + suffix, nil
})

Send a Query

How to send a Query using the Go SDK.

Queries are sent from a Temporal Client.

Use the QueryWorkflow() API or the QueryWorkflowWithOptions API on the Temporal Client to send a Query to a Workflow Execution.

// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType)
if err != nil {
// ...
}
// ...

You can pass an arbitrary number of arguments to the QueryWorkflow() function.

// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType, "foo", "baz")
if err != nil {
// ...
}
// ...

The response returned by the Query is an encoded value that must be decoded. Use Get() on response to decode it into result, such as a string in this example.

var result string
if err := response.Get(&result); err != nil {
// ...
}
log.Println("Received Query result. Result: " + result)
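The decode step can be wrapped in a small helper that also guards against an empty response. The sketch below is written against a hypothetical encodedValue interface listing HasValue and Get, two methods that converter.EncodedValue also provides. The decodeState helper and the JSON-backed jsonValue stand-in are assumptions made so that the example runs without a Temporal Service.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// encodedValue is a hypothetical, minimal interface listing the two methods
// that converter.EncodedValue also provides.
type encodedValue interface {
	HasValue() bool
	Get(valuePtr interface{}) error
}

// decodeState decodes a Query response into a string. An empty response is
// reported as an error instead of being returned as a zero value.
func decodeState(v encodedValue) (string, error) {
	if !v.HasValue() {
		return "", errors.New("query returned no value")
	}
	var state string
	if err := v.Get(&state); err != nil {
		return "", err
	}
	return state, nil
}

// jsonValue is an in-memory stand-in that holds a JSON payload.
type jsonValue struct {
	payload []byte
}

// HasValue reports whether a payload is present.
func (j jsonValue) HasValue() bool { return len(j.payload) > 0 }

// Get decodes the JSON payload into valuePtr.
func (j jsonValue) Get(valuePtr interface{}) error {
	return json.Unmarshal(j.payload, valuePtr)
}

func main() {
	state, err := decodeState(jsonValue{payload: []byte(`"waiting timer"`)})
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println("state:", state)
}
```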

Updates

An Update is an operation that can mutate the state of a Workflow Execution and return a response.

Define an Update

How to define an Update using the Go SDK.

In Go, you define an Update type, also known as an Update name, as a string value. You must ensure the arguments and result are serializable. When sending and receiving the Update, use the Update name as an identifier. The name does not link to the data type(s) sent with the Update. Ensure that every Workflow listening to the same Update name can handle the same Update arguments.

// YourUpdateName holds a string value used to correlate Updates.
const YourUpdateName = "your_update_name"
// ...
func YourUpdatableWorkflow(ctx workflow.Context, param WFParam) (WFResult, error) {
// ...
err := workflow.SetUpdateHandler(ctx, YourUpdateName, func(ctx workflow.Context, arg YourUpdateArg) (YourUpdateResult, error) {
// ...
})
if err != nil {
return WFResult{}, err
}
// ...
}

Handle an Update

How to handle an Update using the Go SDK.

Register an Update handler for a given name using the SetUpdateHandler API from the go.temporal.io/sdk/workflow package. The handler function can accept multiple serializable input parameters, but we recommend using only a single parameter. This practice enables you to add fields in future versions while maintaining backward compatibility. You must include a workflow.Context parameter in the first position of the function. The function can return either a serializable value with an error or just an error. The Workflow's WorkflowPanicPolicy configuration determines how panics are handled inside the Handler function. WorkflowPanicPolicy is set in the Worker Options.

Update handlers, unlike Query handlers, can block and change Workflow state.

// ...
func YourUpdatableWorkflow(ctx workflow.Context, param WFParam) (WFResult, error) {
counter := param.StartCount
err := workflow.SetUpdateHandler(ctx, YourUpdateName, func(ctx workflow.Context, arg YourUpdateArg) (YourUpdateResult, error) {
counter += arg.Add
result := YourUpdateResult{
Total: counter,
}
return result, nil
})
if err != nil {
return WFResult{}, err
}
// ...
}

Set an Update validator function

How to set an Update validator function using the Go SDK.

Validate certain aspects of the data sent to the Workflow using an Update validator function. For instance, a counter Workflow might never want to accept a non-positive number. Invoke the SetUpdateHandlerWithOptions API and define a validator function as one of the options.

When you use a Validator function, the Worker receives the Update first, before any Events are written to the Event History. If the Update is rejected, it's not recorded in the Event History. If it's accepted, the WorkflowExecutionUpdateAccepted Event occurs. Afterwards, the Worker executes the accepted Update and, upon completion, a WorkflowExecutionUpdateCompleted Event gets written into the Event History. The Validator function, unlike the Update Handler, cannot change the state of the Workflow.

The platform treats a panic in the Validator function as a rejection of the Update.
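The accept-or-reject flow described above can be modeled in plain Go. The sketch below does not use the Temporal SDK: counterModel, validatePositive, and applyUpdate are hypothetical names, and the history slice is a simplified stand-in for the Event History. It only illustrates that a rejected Update leaves neither a state change nor a record, while an accepted Update is recorded as accepted and then completed.

```go
package main

import (
	"errors"
	"fmt"
)

// counterModel is a hypothetical stand-in for Workflow state plus a
// simplified Event History. It does not use the Temporal SDK.
type counterModel struct {
	total   int
	history []string
}

// validatePositive plays the role of the validator: it only reads its
// argument and returns an error to reject the Update.
func validatePositive(add int) error {
	if add < 1 {
		return errors.New("addend must be a positive integer")
	}
	return nil
}

// applyUpdate runs the validator first and the state-changing handler second.
func (c *counterModel) applyUpdate(add int) (int, error) {
	if err := validatePositive(add); err != nil {
		return c.total, err // rejected: no state change, nothing recorded
	}
	c.history = append(c.history, "WorkflowExecutionUpdateAccepted")
	c.total += add // the handler is allowed to change state
	c.history = append(c.history, "WorkflowExecutionUpdateCompleted")
	return c.total, nil
}

func main() {
	c := &counterModel{}
	_, err := c.applyUpdate(-3)
	fmt.Println("rejected:", err != nil, "events recorded:", len(c.history))
	total, _ := c.applyUpdate(5)
	fmt.Println("total:", total, "events:", c.history)
}
```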

// UpdatableWorkflowWithValidator is a Workflow Definition.
// This Workflow Definition has an Update handler that uses the isPositive() validator function.
// After setting the Update handler it sleeps for 1 minute.
// Updates can be sent to the Workflow during this time.
func UpdatableWorkflowWithValidator(ctx workflow.Context, param WFParam) (WFResult, error) {
counter := param.StartCount
err := workflow.SetUpdateHandlerWithOptions(
ctx, YourValidatedUpdateName,
func(ctx workflow.Context, arg YourUpdateArg) (YourUpdateResult, error) {
counter += arg.Add
result := YourUpdateResult{
Total: counter,
}
return result, nil
},
// Set the isPositive validator.
workflow.UpdateHandlerOptions{Validator: isPositive},
)
if err != nil {
return WFResult{}, err
}
if err := workflow.Sleep(ctx, time.Minute); err != nil {
return WFResult{}, err
}
return WFResult{Total: counter}, nil
}

// isPositive is a validator function.
// It returns an error if the int value is below 1.
// This function cannot change the state of the Workflow.
// workflow.Context can be used to log.
func isPositive(ctx workflow.Context, u YourUpdateArg) error {
log := workflow.GetLogger(ctx)
if u.Add < 1 {
log.Debug("Rejecting non-positive number, positive integers only", "UpdateValue", u.Add)
return fmt.Errorf("addend must be a positive integer (%v)", u.Add)
}
log.Debug("Accepting Update", "UpdateValue", u.Add)
return nil
}

Send an Update from a Temporal Client

How to send an Update from a Temporal Client using the Go SDK.

Invoke the UpdateWorkflow() method on an instance of the Go SDK Temporal Client to dispatch an Update to a Workflow Execution.

You must provide the Workflow Id, but specifying a Run Id is optional. If you supply only the Workflow Id (and provide an empty string as the Run Id param), the currently running Workflow Execution receives the Update.

You must provide a WaitForStage when calling UpdateWorkflow(). This parameter controls what stage the Update must reach before a handle is returned to the caller. If WaitForStage is set to WorkflowUpdateStageCompleted, the handle is returned after the Update completes. If WaitForStage is set to WorkflowUpdateStageAccepted, the handle is returned after the Update is accepted (that is, after the validator has run).

func main() {
// ...
// Set the Update argument values.
updateArg := YourUpdateArg{
Add: 5,
}
// Call the UpdateWorkflow API.
// A blank RunID means that the Update is routed to the most recent Workflow Run of the specified Workflow ID.
updateHandle, err := temporalClient.UpdateWorkflow(context.Background(), client.UpdateWorkflowOptions{
WorkflowID: "your-workflow-id",
UpdateName: YourUpdateName,
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []interface{}{updateArg},
})
if err != nil {
log.Fatalln("Error issuing Update request", err)
}
// Get the result of the Update.
var updateResult YourUpdateResult
err = updateHandle.Get(context.Background(), &updateResult)
if err != nil {
log.Fatalln("Update encountered an error", err)
}
log.Println("Update succeeded, new total: ", updateResult.Total)
}