Overview
Workflows execute autonomously, but you often need to interact with them while they are running. Cadence provides two mechanisms for this:
- Signals: Send data to a running workflow (write operation)
- Queries: Read workflow state without modifying it (read operation)
Signals and queries enable real-time interaction with workflows without breaking determinism. They’re essential for building interactive, event-driven applications.
Signals
What is a Signal?
A signal is an asynchronous message sent to a running workflow execution. Signals are used for:
- External events: User actions, system events, webhooks
- State changes: Approval/rejection, cancellation requests
- Data updates: New information becoming available
- Coordination: Synchronizing between workflows
How Signals Work
Signal Type Definition
type SignalExternalWorkflowExecutionInitiatedEventAttributes struct {
DecisionTaskCompletedEventID int64
Domain string
WorkflowExecution *WorkflowExecution
SignalName string // Name of the signal
Input []byte // Signal payload
Control []byte
ChildWorkflowOnly bool
}
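The Input field in the struct above carries the signal payload as raw bytes. As a rough illustration of what that means for senders and receivers, the sketch below round-trips a payload through a byte slice. It assumes a JSON encoding; the approvalPayload type and the encodeSignalInput and decodeSignalInput helpers are hypothetical names invented for this example and are not part of the Cadence API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// approvalPayload is a hypothetical signal payload used only for illustration.
type approvalPayload struct {
	ApprovedBy string `json:"approvedBy"`
	Comments   string `json:"comments"`
}

// encodeSignalInput turns a payload into the raw bytes a signal would carry.
// Assumption: JSON is the wire encoding.
func encodeSignalInput(p approvalPayload) ([]byte, error) {
	return json.Marshal(p)
}

// decodeSignalInput restores the payload from raw signal bytes.
func decodeSignalInput(input []byte) (approvalPayload, error) {
	var p approvalPayload
	err := json.Unmarshal(input, &p)
	return p, err
}

func main() {
	input, err := encodeSignalInput(approvalPayload{
		ApprovedBy: "manager@example.com",
		Comments:   "Looks good",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(input))

	decoded, err := decodeSignalInput(input)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.ApprovedBy)
}
```

Keeping the payload a small, explicitly tagged struct makes it easier for senders written in other languages to produce compatible bytes.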
Code Examples
package main
import (
"context"
"time"
"go.uber.org/cadence/client"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)
// Workflow with signal handlers
func OrderWorkflow(ctx workflow.Context, orderID string) error {
logger := workflow.GetLogger(ctx)
state := "pending"
// Activities need timeouts before they can be scheduled; these values are illustrative
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
})
// Create signal channels
approvalChannel := workflow.GetSignalChannel(ctx, "approve-order")
rejectionChannel := workflow.GetSignalChannel(ctx, "reject-order")
// Wait for approval or rejection, whichever arrives first
selector := workflow.NewSelector(ctx)
selector.AddReceive(approvalChannel, func(c workflow.Channel, more bool) {
var approvalData ApprovalData
c.Receive(ctx, &approvalData)
state = "approved"
logger.Info("Order approved", zap.String("by", approvalData.ApprovedBy))
})
selector.AddReceive(rejectionChannel, func(c workflow.Channel, more bool) {
var reason string
c.Receive(ctx, &reason)
state = "rejected"
logger.Info("Order rejected", zap.String("reason", reason))
})
selector.Select(ctx)
if state == "approved" {
// Process approved order
return workflow.ExecuteActivity(ctx, ProcessOrder, orderID).Get(ctx, nil)
}
// Handle rejection
return workflow.ExecuteActivity(ctx, NotifyRejection, orderID).Get(ctx, nil)
}
type ApprovalData struct {
ApprovedBy string
Timestamp int64
Comments string
}
// c is a client.Client for the target domain, constructed during application startup
func sendApproval(c client.Client) {
// Send signal to workflow
err := c.SignalWorkflow(
context.Background(),
"order-12345", // Workflow ID
"", // Run ID (empty = current run)
"approve-order", // Signal name
ApprovalData{
ApprovedBy: "manager@example.com",
Timestamp: time.Now().Unix(),
Comments: "Looks good",
},
)
if err != nil {
panic(err)
}
}
// Start workflow if not running, or signal if already running
// c is a client.Client for the target domain, constructed during application startup
func signalWithStart(c client.Client) {
workflowOptions := client.StartWorkflowOptions{
ID: "order-12345",
TaskList: "order-processing",
ExecutionStartToCloseTimeout: time.Hour * 24,
}
signalName := "new-item"
signalArg := OrderItem{SKU: "ABC123", Quantity: 5}
// The returned execution is not needed here, so discard it
_, err := c.SignalWithStartWorkflow(
context.Background(),
"order-12345",
signalName,
signalArg,
workflowOptions,
OrderWorkflow,
"order-12345", // Workflow args
)
if err != nil {
panic(err)
}
}
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.SignalMethod;
import com.uber.cadence.workflow.WorkflowMethod;
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
void processOrder(String orderId);
@SignalMethod
void approveOrder(ApprovalData approval);
@SignalMethod
void rejectOrder(String reason);
}
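public class OrderWorkflowImpl implements OrderWorkflow {
// OrderActivities is a hypothetical activity interface; its timeouts are assumed to be configured elsewhere
private final OrderActivities activities = Workflow.newActivityStub(OrderActivities.class);
private String state = "pending";
private ApprovalData approval;
private String rejectionReason;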
@Override
public void processOrder(String orderId) {
// Wait for signal
Workflow.await(() -> !state.equals("pending"));
if (state.equals("approved")) {
// Process order
activities.processOrder(orderId);
} else {
// Handle rejection
activities.notifyRejection(orderId, rejectionReason);
}
}
@Override
public void approveOrder(ApprovalData approval) {
this.approval = approval;
this.state = "approved";
}
@Override
public void rejectOrder(String reason) {
this.rejectionReason = reason;
this.state = "rejected";
}
}
// Sending signal
WorkflowClient client = WorkflowClient.newInstance(...);
OrderWorkflow workflow = client.newWorkflowStub(
OrderWorkflow.class,
"order-12345"
);
workflow.approveOrder(new ApprovalData("manager", "Approved"));
Queries
What is a Query?
A query is a synchronous read operation on a workflow’s current state. Queries are used for:
- Status checks: Get current workflow state
- Progress monitoring: Check completion percentage
- Data retrieval: Read accumulated results
- Debugging: Inspect internal workflow state
How Queries Work
Queries are Read-Only: Query handlers cannot modify workflow state or schedule activities. They must be side-effect free.
Query Type Definition
type WorkflowQuery struct {
QueryType string // Name of the query
QueryArgs []byte // Query parameters
}
type WorkflowQueryResult struct {
ResultType *QueryResultType // ANSWERED or FAILED
Answer []byte // Query result data
ErrorMessage string // Error if failed
}
type QueryResultType int32
const (
QueryResultTypeAnswered QueryResultType = iota // Query succeeded
QueryResultTypeFailed // Query failed
)
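To make the ANSWERED and FAILED outcomes concrete, here is a small sketch of how a handler's return value could be mapped onto a result of this shape. The queryResult type, the answerQuery function, and errNotFound are local stand-ins written for this example rather than Cadence types, and JSON is assumed as the answer encoding.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type resultType int

const (
	answered resultType = iota // handler returned a value
	failed                     // handler returned an error
)

// queryResult is a local stand-in mirroring the shape of WorkflowQueryResult.
type queryResult struct {
	Type         resultType
	Answer       []byte
	ErrorMessage string
}

// errNotFound is a sample handler error used in the demonstration.
var errNotFound = errors.New("item not found")

// answerQuery runs a handler and maps its outcome onto a queryResult.
// Assumption: answers are JSON-encoded.
func answerQuery(handler func() (interface{}, error)) queryResult {
	value, err := handler()
	if err != nil {
		return queryResult{Type: failed, ErrorMessage: err.Error()}
	}
	answer, err := json.Marshal(value)
	if err != nil {
		return queryResult{Type: failed, ErrorMessage: err.Error()}
	}
	return queryResult{Type: answered, Answer: answer}
}

func main() {
	ok := answerQuery(func() (interface{}, error) { return "processing", nil })
	fmt.Println(ok.Type == answered, string(ok.Answer))

	bad := answerQuery(func() (interface{}, error) { return nil, errNotFound })
	fmt.Println(bad.Type == failed, bad.ErrorMessage)
}
```

A caller would typically check the result type first and only decode Answer when the query was answered.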
Query Consistency Levels
type QueryConsistencyLevel int32
const (
QueryConsistencyLevelEventual QueryConsistencyLevel = iota // Eventually consistent
QueryConsistencyLevelStrong // Strongly consistent
)
- Eventual: May return stale data, lower latency
- Strong: Returns latest data, higher latency
Code Examples
func OrderWorkflow(ctx workflow.Context, orderID string) error {
var totalAmount float64 = 0
var items []OrderItem
var status string = "processing"
// Register query handlers
err := workflow.SetQueryHandler(ctx, "get-status", func() (string, error) {
return status, nil
})
if err != nil {
return err
}
err = workflow.SetQueryHandler(ctx, "get-total", func() (float64, error) {
return totalAmount, nil
})
if err != nil {
return err
}
err = workflow.SetQueryHandler(ctx, "get-items", func() ([]OrderItem, error) {
return items, nil
})
if err != nil {
return err
}
// Parameterized query
err = workflow.SetQueryHandler(ctx, "get-item", func(sku string) (OrderItem, error) {
for _, item := range items {
if item.SKU == sku {
return item, nil
}
}
return OrderItem{}, fmt.Errorf("item not found: %s", sku)
})
if err != nil {
return err
}
// Workflow logic updates state
status = "completed"
totalAmount = 99.99
return nil
}
// c is a client.Client for the target domain, constructed during application startup
func queryWorkflow(c client.Client) {
// Query workflow status
var status string
value, err := c.QueryWorkflow(
context.Background(),
"order-12345", // Workflow ID
"", // Run ID
"get-status", // Query type
)
if err != nil {
panic(err)
}
err = value.Get(&status)
if err != nil {
panic(err)
}
fmt.Printf("Status: %s\n", status)
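// Query with strong consistency
// Assumption: the consistency level is a pointer to the generated enum in go.uber.org/cadence/.gen/go/shared
queryOptions := client.QueryWorkflowWithOptionsRequest{
WorkflowID: "order-12345",
QueryType: "get-total",
QueryConsistencyLevel: shared.QueryConsistencyLevelStrong.Ptr(),
}
var total float64
// The response wraps the encoded result in its QueryResult field
resp, err := c.QueryWorkflowWithOptions(context.Background(), &queryOptions)
if err != nil {
panic(err)
}
if err = resp.QueryResult.Get(&total); err != nil {
panic(err)
}
fmt.Printf("Total: $%.2f\n", total)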
// Parameterized query
var item OrderItem
value, err = c.QueryWorkflow(
context.Background(),
"order-12345",
"",
"get-item",
"ABC123", // Query parameter
)
if err != nil {
panic(err)
}
if err = value.Get(&item); err != nil {
panic(err)
}
fmt.Printf("Item: %+v\n", item)
}
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
void processOrder(String orderId);
@QueryMethod
String getStatus();
@QueryMethod
double getTotal();
@QueryMethod
List<OrderItem> getItems();
}
public class OrderWorkflowImpl implements OrderWorkflow {
private String status = "processing";
private double totalAmount = 0;
private List<OrderItem> items = new ArrayList<>();
@Override
public void processOrder(String orderId) {
// Workflow logic
status = "completed";
totalAmount = 99.99;
}
@Override
public String getStatus() {
return status;
}
@Override
public double getTotal() {
return totalAmount;
}
@Override
public List<OrderItem> getItems() {
return new ArrayList<>(items);
}
}
// Querying workflow
WorkflowClient client = WorkflowClient.newInstance(...);
OrderWorkflow workflow = client.newWorkflowStub(
OrderWorkflow.class,
"order-12345"
);
String status = workflow.getStatus();
double total = workflow.getTotal();
Signal and Query Event Attributes
WorkflowExecutionSignaledEventAttributes
type WorkflowExecutionSignaledEventAttributes struct {
SignalName string // Name of the signal
Input []byte // Signal payload
Identity string // Who sent the signal
RequestID string // Idempotency key
}
QueryWorkflowRequest
type QueryWorkflowRequest struct {
Domain string
Execution *WorkflowExecution
Query *WorkflowQuery
QueryRejectCondition *QueryRejectCondition
QueryConsistencyLevel *QueryConsistencyLevel
}
QueryRejectCondition
type QueryRejectCondition int32
const (
QueryRejectConditionNotOpen QueryRejectCondition = iota // Reject if workflow closed
QueryRejectConditionNotCompletedCleanly // Reject if not completed successfully
)
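The two reject conditions can be read as a small decision table over the workflow's close status. The sketch below models that table with local types; closeStatus, rejectCondition, and shouldReject are hypothetical names for this example, and it assumes that a workflow which is still open is never rejected under the "not completed cleanly" condition.

```go
package main

import "fmt"

// closeStatus is a simplified, local model of a workflow's state.
type closeStatus int

const (
	statusOpen closeStatus = iota
	statusCompleted
	statusFailed
	statusTimedOut
)

// rejectCondition mirrors the two conditions described above, plus "none".
type rejectCondition int

const (
	rejectNone rejectCondition = iota
	rejectNotOpen
	rejectNotCompletedCleanly
)

// shouldReject reports whether a query should be rejected for the given
// condition and workflow status.
func shouldReject(cond rejectCondition, status closeStatus) bool {
	switch cond {
	case rejectNotOpen:
		// Reject anything that has already closed.
		return status != statusOpen
	case rejectNotCompletedCleanly:
		// Assumption: open workflows pass; closed ones must have completed.
		return status != statusOpen && status != statusCompleted
	default:
		return false
	}
}

func main() {
	fmt.Println(shouldReject(rejectNotOpen, statusCompleted))
	fmt.Println(shouldReject(rejectNotCompletedCleanly, statusCompleted))
	fmt.Println(shouldReject(rejectNotCompletedCleanly, statusFailed))
}
```

Leaving the condition unset corresponds to rejectNone here: the query is attempted regardless of status.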
Advanced Patterns
1. Multi-Signal Coordination
func ApprovalWorkflow(ctx workflow.Context, docID string) error {
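approvals := make(map[string]bool)
requiredApprovers := []string{"manager", "finance", "legal"}
required := make(map[string]bool, len(requiredApprovers))
for _, name := range requiredApprovers {
required[name] = true
}
approvalCh := workflow.GetSignalChannel(ctx, "approve")
// Wait until every required approver has signaled; ignore unknown approvers
for len(approvals) < len(requiredApprovers) {
var approver string
approvalCh.Receive(ctx, &approver)
if required[approver] {
approvals[approver] = true
}
}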
// All approvals received
return workflow.ExecuteActivity(ctx, ProcessApprovedDocument, docID).Get(ctx, nil)
}
2. Cancellation via Signal
func LongRunningWorkflow(ctx workflow.Context) error {
cancelCh := workflow.GetSignalChannel(ctx, "cancel")
// Start long operation
activityCtx, cancelActivity := workflow.WithCancel(ctx)
future := workflow.ExecuteActivity(activityCtx, LongActivity)
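// Wait for completion or cancellation
var activityErr error
cancelled := false
selector := workflow.NewSelector(ctx)
selector.AddFuture(future, func(f workflow.Future) {
// Activity completed; capture its result
activityErr = f.Get(ctx, nil)
})
selector.AddReceive(cancelCh, func(c workflow.Channel, more bool) {
var reason string
c.Receive(ctx, &reason)
cancelled = true
cancelActivity() // Request cancellation of the activity
})
selector.Select(ctx)
if cancelled {
// Wait for the activity to finish cancelling; its cancellation error is expected
_ = future.Get(ctx, nil)
return nil
}
return activityErr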
}
3. Progress Tracking with Queries
func BatchProcessingWorkflow(ctx workflow.Context, items []string) error {
var processed int
var failed int
var currentItem string
// Register progress query
err := workflow.SetQueryHandler(ctx, "progress", func() (map[string]interface{}, error) {
total := len(items)
percentage := 0.0
if total > 0 {
// Guard against 0/0: NaN cannot be JSON-encoded in the query result
percentage = float64(processed) / float64(total) * 100
}
return map[string]interface{}{
"total": total,
"processed": processed,
"failed": failed,
"remaining": total - processed - failed,
"currentItem": currentItem,
"percentage": percentage,
}, nil
})
if err != nil {
return err
}
// Process items
for _, item := range items {
currentItem = item
err := workflow.ExecuteActivity(ctx, ProcessItem, item).Get(ctx, nil)
if err != nil {
failed++
} else {
processed++
}
}
return nil
}
4. Dynamic Signal Handlers
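// Signals are read from named channels, so this sketch listens on a caller-supplied list of names
func DynamicWorkflow(ctx workflow.Context, signalNames []string) error {
signals := make(map[string]interface{})
err := workflow.SetQueryHandler(ctx, "signals", func() (map[string]interface{}, error) {
return signals, nil
})
if err != nil {
return err
}
selector := workflow.NewSelector(ctx)
for _, name := range signalNames {
name := name // Capture the loop variable for the closure
selector.AddReceive(workflow.GetSignalChannel(ctx, name), func(c workflow.Channel, more bool) {
var input string
c.Receive(ctx, &input)
signals[name] = input
})
}
// Collect signals until the one-hour timer fires
done := false
selector.AddFuture(workflow.NewTimer(ctx, time.Hour), func(f workflow.Future) {
done = true
})
for !done {
selector.Select(ctx)
}
return nil
}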
5. Signal Buffering
func BufferedSignalWorkflow(ctx workflow.Context) error {
buffer := make([]SignalData, 0)
signalCh := workflow.GetSignalChannel(ctx, "data")
// Buffer signals
for {
selector := workflow.NewSelector(ctx)
selector.AddReceive(signalCh, func(c workflow.Channel, more bool) {
var data SignalData
c.Receive(ctx, &data)
buffer = append(buffer, data)
})
// Process buffer when it reaches threshold
if len(buffer) >= 100 {
err := workflow.ExecuteActivity(ctx, ProcessBatch, buffer).Get(ctx, nil)
if err != nil {
return err
}
buffer = buffer[:0] // Clear buffer
}
selector.Select(ctx)
}
}
Best Practices
Signals
// Use typed signal data
type ApprovalSignal struct {
ApprovedBy string
Timestamp time.Time
Comments string
}
// Descriptive signal names
workflow.GetSignalChannel(ctx, "approve-order")
workflow.GetSignalChannel(ctx, "reject-order")
workflow.GetSignalChannel(ctx, "cancel-order")
Queries
// Read-only query handlers
workflow.SetQueryHandler(ctx, "status", func() (string, error) {
return currentStatus, nil // ✓ Just returns state
})
// Return copies of mutable data
workflow.SetQueryHandler(ctx, "items", func() ([]Item, error) {
return append([]Item{}, items...), nil // ✓ Return copy
})
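The copy in the last handler matters because a Go slice shares its backing array with whoever holds it. The following standalone sketch shows the effect outside of any workflow; orderState, item, and itemsSnapshot are hypothetical names used only here.

```go
package main

import "fmt"

// item is a hypothetical order line used only for illustration.
type item struct {
	SKU      string
	Quantity int
}

// orderState stands in for the state a workflow keeps between events.
type orderState struct {
	items []item
}

// itemsSnapshot returns a copy of the items so that callers cannot
// alter the underlying state through the returned slice.
func (s *orderState) itemsSnapshot() []item {
	return append([]item{}, s.items...)
}

func main() {
	state := &orderState{items: []item{{SKU: "ABC123", Quantity: 5}}}

	snapshot := state.itemsSnapshot()
	snapshot[0].Quantity = 99 // changes the copy only

	fmt.Println(state.items[0].Quantity, snapshot[0].Quantity)
}
```

Note that this is a shallow copy: if the elements themselves held pointers, maps, or slices, those would still be shared and would need copying too.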
General
- Signal Naming: Use descriptive, action-oriented names
- Query Naming: Use noun-based or question-style names
- Idempotency: Design signal handlers to be idempotent
- Validation: Validate signal/query inputs
- Documentation: Document expected signal/query types
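The idempotency and validation points can be combined in one small state-update function. The sketch below is one possible shape, assuming each sender attaches its own request ID to the payload; approvalSignal, approvalState, apply, and errInvalidSignal are hypothetical names for this example and are not provided by Cadence.

```go
package main

import (
	"errors"
	"fmt"
)

// approvalSignal is a hypothetical payload; RequestID is supplied by the sender.
type approvalSignal struct {
	RequestID  string
	ApprovedBy string
}

// approvalState tracks which requests were already applied.
type approvalState struct {
	seen      map[string]bool
	approvals []string
}

func newApprovalState() *approvalState {
	return &approvalState{seen: make(map[string]bool)}
}

var errInvalidSignal = errors.New("invalid signal: RequestID and ApprovedBy are required")

// apply validates the signal and applies it at most once per RequestID.
// It reports whether the state changed.
func (s *approvalState) apply(sig approvalSignal) (bool, error) {
	if sig.RequestID == "" || sig.ApprovedBy == "" {
		return false, errInvalidSignal
	}
	if s.seen[sig.RequestID] {
		return false, nil // duplicate delivery, nothing to do
	}
	s.seen[sig.RequestID] = true
	s.approvals = append(s.approvals, sig.ApprovedBy)
	return true, nil
}

func main() {
	state := newApprovalState()
	sig := approvalSignal{RequestID: "req-1", ApprovedBy: "manager"}

	first, _ := state.apply(sig)
	second, _ := state.apply(sig) // same request sent again
	_, err := state.apply(approvalSignal{ApprovedBy: "finance"})

	fmt.Println(first, second, len(state.approvals), err != nil)
}
```

Inside a workflow, a function like this would be called from the signal receive callback, so that a retried send leaves the state unchanged.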
Common Pitfalls
Avoid These Mistakes:
- Modifying state in queries - Queries must be read-only
- Blocking on signals indefinitely - Use timeouts with selectors
- Large signal payloads - Use activities for large data transfers
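- Not handling signal ordering - Signals from concurrent senders can be recorded in any order relative to each other, so handlers should not assume a fixed sequence
- Querying closed workflows - Decide how closed workflows should be treated and set a QueryRejectCondition accordingly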
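One way to tolerate out-of-order arrival is to have senders number their signals and to release them to the business logic strictly in sequence. The sketch below shows such a reorder buffer in plain Go. It assumes a sender-assigned, gap-free sequence number; sequencedSignal, reorderBuffer, and accept are hypothetical names for this example, not Cadence features.

```go
package main

import "fmt"

// sequencedSignal is a hypothetical payload carrying a sender-assigned sequence number.
type sequencedSignal struct {
	Seq  int
	Data string
}

// reorderBuffer holds early arrivals until the missing predecessors show up.
type reorderBuffer struct {
	next    int
	pending map[int]sequencedSignal
}

func newReorderBuffer(first int) *reorderBuffer {
	return &reorderBuffer{next: first, pending: make(map[int]sequencedSignal)}
}

// accept stores a signal and returns every signal that is now ready, in order.
func (b *reorderBuffer) accept(sig sequencedSignal) []sequencedSignal {
	if sig.Seq < b.next {
		return nil // already released earlier; treat as a duplicate
	}
	b.pending[sig.Seq] = sig
	var ready []sequencedSignal
	for {
		nextSig, ok := b.pending[b.next]
		if !ok {
			break
		}
		ready = append(ready, nextSig)
		delete(b.pending, b.next)
		b.next++
	}
	return ready
}

func main() {
	buf := newReorderBuffer(1)
	arrivals := []sequencedSignal{{Seq: 2, Data: "b"}, {Seq: 3, Data: "c"}, {Seq: 1, Data: "a"}}
	for _, sig := range arrivals {
		for _, ready := range buf.accept(sig) {
			fmt.Println(ready.Seq, ready.Data)
		}
	}
}
```

The buffer only performs keyed lookups and never iterates over the map, which keeps its behavior deterministic.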
CLI Examples
# Send signal
cadence workflow signal \
--workflow_id order-12345 \
--name approve-order \
--input '{"approvedBy": "manager@example.com"}'
# Query workflow
cadence workflow query \
--workflow_id order-12345 \
--query_type get-status
# Query with consistency
cadence workflow query \
--workflow_id order-12345 \
--query_type get-total \
--query_consistency_level strong
# Signal with start
cadence workflow start \
--workflow_id order-12345 \
--tasklist order-processing \
--workflow_type OrderWorkflow \
--signal_name new-item \
--signal_input '{"sku": "ABC123"}'
Related Concepts
- Workflows - Host signal and query handlers
- Activities - Cannot be signaled or queried directly
- Workers - Execute signal and query handlers
- Domains - Scope for workflow execution