Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/cadence-workflow/cadence/llms.txt

Use this file to discover all available pages before exploring further.

Overview

While workflows execute autonomously, you often need to interact with them while they’re running. Cadence provides two mechanisms for this:
  • Signals: Send data to a running workflow (write operation)
  • Queries: Read workflow state without modifying it (read operation)
Signals and queries enable real-time interaction with workflows without breaking determinism. They’re essential for building interactive, event-driven applications.

Signals

What is a Signal?

A signal is an asynchronous message sent to a running workflow execution. Signals are used for:
  • External events: User actions, system events, webhooks
  • State changes: Approval/rejection, cancellation requests
  • Data updates: New information becoming available
  • Coordination: Synchronizing between workflows

How Signals Work

Signal Type Definition

type SignalExternalWorkflowExecutionInitiatedEventAttributes struct {
    DecisionTaskCompletedEventID int64
    Domain                       string
    WorkflowExecution            *WorkflowExecution
    SignalName                   string             // Name of the signal
    Input                        []byte             // Signal payload
    Control                      []byte
    ChildWorkflowOnly            bool
}

Code Examples

package main

import (
    "go.uber.org/cadence/workflow"
)

// Workflow with signal handler
func OrderWorkflow(ctx workflow.Context, orderID string) error {
    logger := workflow.GetLogger(ctx)
    var state string = "pending"

    // Create signal channel
    approvalChannel := workflow.GetSignalChannel(ctx, "approve-order")
    rejectionChannel := workflow.GetSignalChannel(ctx, "reject-order")

    // Wait for approval or rejection
    selector := workflow.NewSelector(ctx)

    selector.AddReceive(approvalChannel, func(c workflow.ReceiveChannel, more bool) {
        var approvalData ApprovalData
        c.Receive(ctx, &approvalData)
        state = "approved"
        logger.Info("Order approved", "by", approvalData.ApprovedBy)
    })

    selector.AddReceive(rejectionChannel, func(c workflow.ReceiveChannel, more bool) {
        var reason string
        c.Receive(ctx, &reason)
        state = "rejected"
        logger.Info("Order rejected", "reason", reason)
    })

    selector.Select(ctx)

    if state == "approved" {
        // Process approved order
        return workflow.ExecuteActivity(ctx, ProcessOrder, orderID).Get(ctx, nil)
    } else {
        // Handle rejection
        return workflow.ExecuteActivity(ctx, NotifyRejection, orderID).Get(ctx, nil)
    }
}

type ApprovalData struct {
    ApprovedBy string
    Timestamp  int64
    Comments   string
}

Queries

What is a Query?

A query is a synchronous read operation on a workflow’s current state. Queries are used for:
  • Status checks: Get current workflow state
  • Progress monitoring: Check completion percentage
  • Data retrieval: Read accumulated results
  • Debugging: Inspect internal workflow state

How Queries Work

Queries are Read-Only: Query handlers cannot modify workflow state or schedule activities. They must be side-effect free.

Query Type Definition

type WorkflowQuery struct {
    QueryType string  // Name of the query
    QueryArgs []byte  // Query parameters
}

type WorkflowQueryResult struct {
    ResultType   *QueryResultType  // ANSWERED or FAILED
    Answer       []byte            // Query result data
    ErrorMessage string            // Error if failed
}

type QueryResultType int32

const (
    QueryResultTypeAnswered QueryResultType = iota  // Query succeeded
    QueryResultTypeFailed                           // Query failed
)

Query Consistency Levels

type QueryConsistencyLevel int32

const (
    QueryConsistencyLevelEventual QueryConsistencyLevel = iota  // Eventually consistent
    QueryConsistencyLevelStrong                                 // Strongly consistent
)
  • Eventual: May return stale data, lower latency
  • Strong: Returns latest data, higher latency

Code Examples

func OrderWorkflow(ctx workflow.Context, orderID string) error {
    var totalAmount float64 = 0
    var items []OrderItem
    var status string = "processing"

    // Register query handlers
    err := workflow.SetQueryHandler(ctx, "get-status", func() (string, error) {
        return status, nil
    })
    if err != nil {
        return err
    }

    err = workflow.SetQueryHandler(ctx, "get-total", func() (float64, error) {
        return totalAmount, nil
    })
    if err != nil {
        return err
    }

    err = workflow.SetQueryHandler(ctx, "get-items", func() ([]OrderItem, error) {
        return items, nil
    })
    if err != nil {
        return err
    }

    // Parameterized query
    err = workflow.SetQueryHandler(ctx, "get-item", func(sku string) (OrderItem, error) {
        for _, item := range items {
            if item.SKU == sku {
                return item, nil
            }
        }
        return OrderItem{}, fmt.Errorf("item not found: %s", sku)
    })
    if err != nil {
        return err
    }

    // Workflow logic updates state
    status = "completed"
    totalAmount = 99.99

    return nil
}

Signal and Query Event Attributes

WorkflowExecutionSignaledEventAttributes

type WorkflowExecutionSignaledEventAttributes struct {
    SignalName string  // Name of the signal
    Input      []byte  // Signal payload
    Identity   string  // Who sent the signal
    RequestID  string  // Idempotency key
}

QueryWorkflowRequest

type QueryWorkflowRequest struct {
    Domain                string
    Execution             *WorkflowExecution
    Query                 *WorkflowQuery
    QueryRejectCondition  *QueryRejectCondition
    QueryConsistencyLevel *QueryConsistencyLevel
}

QueryRejectCondition

type QueryRejectCondition int32

const (
    QueryRejectConditionNotOpen           QueryRejectCondition = iota  // Reject if workflow closed
    QueryRejectConditionNotCompletedCleanly                            // Reject if not completed successfully
)

Advanced Patterns

1. Multi-Signal Coordination

func ApprovalWorkflow(ctx workflow.Context, docID string) error {
    approvals := make(map[string]bool)
    requiredApprovers := []string{"manager", "finance", "legal"}

    approvalCh := workflow.GetSignalChannel(ctx, "approve")

    // Wait for all approvals
    for len(approvals) < len(requiredApprovers) {
        var approver string
        approvalCh.Receive(ctx, &approver)
        approvals[approver] = true
    }

    // All approvals received
    return workflow.ExecuteActivity(ctx, ProcessApprovedDocument, docID).Get(ctx, nil)
}

2. Cancellation via Signal

func LongRunningWorkflow(ctx workflow.Context) error {
    cancelCh := workflow.GetSignalChannel(ctx, "cancel")

    // Start long operation
    activityCtx, cancelActivity := workflow.WithCancel(ctx)
    future := workflow.ExecuteActivity(activityCtx, LongActivity)

    // Wait for completion or cancellation
    selector := workflow.NewSelector(ctx)

    selector.AddFuture(future, func(f workflow.Future) {
        // Activity completed
    })

    selector.AddReceive(cancelCh, func(c workflow.ReceiveChannel, more bool) {
        var reason string
        c.Receive(ctx, &reason)
        cancelActivity()  // Cancel the activity
    })

    selector.Select(ctx)
    return nil
}

3. Progress Tracking with Queries

func BatchProcessingWorkflow(ctx workflow.Context, items []string) error {
    var processed int
    var failed int
    var currentItem string

    // Register progress query
    workflow.SetQueryHandler(ctx, "progress", func() (map[string]interface{}, error) {
        total := len(items)
        return map[string]interface{}{
            "total":       total,
            "processed":   processed,
            "failed":      failed,
            "remaining":   total - processed - failed,
            "currentItem": currentItem,
            "percentage":  float64(processed) / float64(total) * 100,
        }, nil
    })

    // Process items
    for _, item := range items {
        currentItem = item
        err := workflow.ExecuteActivity(ctx, ProcessItem, item).Get(ctx, nil)
        if err != nil {
            failed++
        } else {
            processed++
        }
    }

    return nil
}

4. Dynamic Signal Handlers

func DynamicWorkflow(ctx workflow.Context) error {
    signals := make(map[string]interface{})

    // Register catch-all signal handler
    workflow.SetSignalHandler(ctx, func(signalName string, input []byte) {
        signals[signalName] = string(input)
    })

    // Query to see received signals
    workflow.SetQueryHandler(ctx, "signals", func() (map[string]interface{}, error) {
        return signals, nil
    })

    // Workflow logic
    workflow.Sleep(ctx, time.Hour)
    return nil
}

5. Signal Buffering

func BufferedSignalWorkflow(ctx workflow.Context) error {
    buffer := make([]SignalData, 0)
    signalCh := workflow.GetSignalChannel(ctx, "data")

    // Buffer signals
    for {
        selector := workflow.NewSelector(ctx)

        selector.AddReceive(signalCh, func(c workflow.ReceiveChannel, more bool) {
            var data SignalData
            c.Receive(ctx, &data)
            buffer = append(buffer, data)
        })

        // Process buffer when it reaches threshold
        if len(buffer) >= 100 {
            err := workflow.ExecuteActivity(ctx, ProcessBatch, buffer).Get(ctx, nil)
            if err != nil {
                return err
            }
            buffer = buffer[:0]  // Clear buffer
        }

        selector.Select(ctx)
    }
}

Best Practices

Signals

// Use typed signal data
type ApprovalSignal struct {
    ApprovedBy string
    Timestamp  time.Time
    Comments   string
}

// Descriptive signal names
workflow.GetSignalChannel(ctx, "approve-order")
workflow.GetSignalChannel(ctx, "reject-order")
workflow.GetSignalChannel(ctx, "cancel-order")

Queries

// Read-only query handlers
workflow.SetQueryHandler(ctx, "status", func() (string, error) {
    return currentStatus, nil  // ✓ Just returns state
})

// Return copies of mutable data
workflow.SetQueryHandler(ctx, "items", func() ([]Item, error) {
    return append([]Item{}, items...), nil  // ✓ Return copy
})

General

  1. Signal Naming: Use descriptive, action-oriented names
  2. Query Naming: Use noun-based or question-style names
  3. Idempotency: Design signal handlers to be idempotent
  4. Validation: Validate signal/query inputs
  5. Documentation: Document expected signal/query types

Common Pitfalls

Avoid These Mistakes:
  1. Modifying state in queries - Queries must be read-only
  2. Blocking on signals indefinitely - Use timeouts with selectors
  3. Large signal payloads - Use activities for large data transfers
  4. Not handling signal ordering - Signals may arrive in any order
  5. Querying closed workflows - Check workflow status first

CLI Examples

# Send signal
cadence workflow signal \
  --workflow_id order-12345 \
  --name approve-order \
  --input '{"approvedBy": "manager@example.com"}'

# Query workflow
cadence workflow query \
  --workflow_id order-12345 \
  --query_type get-status

# Query with consistency
cadence workflow query \
  --workflow_id order-12345 \
  --query_type get-total \
  --query_consistency_level strong

# Signal with start
cadence workflow start \
  --workflow_id order-12345 \
  --tasklist order-processing \
  --workflow_type OrderWorkflow \
  --signal_name new-item \
  --signal_input '{"sku": "ABC123"}'
  • Workflows - Host signal and query handlers
  • Activities - Cannot be signaled or queried directly
  • Workers - Execute signal and query handlers
  • Domains - Scope for workflow execution

Further Reading