Workflows automate intelligence on your onchain data. Define multi-step tasks that trigger on blockchain events, run on a schedule, or fire on demand — secondlayer handles execution, retries, and observability. Each step runs independently and retries on failure without re-running earlier steps.

Install with bun add @secondlayer/workflows.

Set up workflows with your agent.
Workflows automate intelligence on your onchain data. Define multi-step tasks that trigger on blockchain events, run on a schedule, or fire on demand. Each step runs independently with automatic retries. Available step methods: step.ai() for LLM analysis, step.query() to read subgraph data, step.mcp() to call MCP server tools, step.deliver() to send to webhooks/Slack/Discord/Telegram/email, step.invoke() to chain workflows, and step.sleep() for delays.

Ensure setup (skip any step already done, use the project's package manager):
- Skill: run the `skills` npm package to install — e.g. `npx|bunx|pnpm dlx skills add ryanwaits/secondlayer --skill secondlayer -y`
- CLI: `@secondlayer/cli` installed globally
- Auth: `secondlayer auth login`

/secondlayer Help me create a workflow. Ask me:
1. What should trigger this workflow? (blockchain event, schedule, or manual)
2. What data do I need to read or analyze?
3. What action should it take? (webhook, Slack, email)

Create the workflow and deploy it.

Getting started

A workflow has two parts: a trigger (when to run) and a handler (what to do). Handlers are built from steps — isolated units of work that retry independently and memoize results across failures.

import { defineWorkflow } from "@secondlayer/workflows"

export default defineWorkflow({
  name: "whale-alert",
  trigger: {
    type: "event",
    filter: { type: "stx_transfer", minAmount: 100_000_000_000 },
  },
  handler: async ({ event, step }) => {
    const context = await step.run("enrich", async () => {
      const sender = await step.query("accounts", "balances", {
        where: { address: { eq: event.sender } },
      })
      return { ...event, senderBalance: sender[0]?.balance }
    })

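    const analysis = await step.ai("assess-risk", {
      prompt: `Whale transfer of ${event.amount} microSTX from ${event.sender}. Sender balance: ${context.senderBalance}. Is this unusual?`,
      // Request structured output so analysis.riskScore is defined below
      schema: {
        riskScore: { type: "number", description: "0-1 risk score" },
      },
    })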

    if (analysis.riskScore > 0.7) {
      await step.deliver("alert", {
        type: "webhook",
        url: "https://api.example.com/alerts",
        body: { transfer: context, analysis },
      })
    }
  },
})
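
The threshold above looks large because amounts are expressed in microSTX (1 STX = 1,000,000 microSTX), so `100_000_000_000` corresponds to 100,000 STX. The following is a minimal sketch of conversion helpers, assuming `minAmount` uses the same microSTX unit as `event.amount`. The names `stxToMicroStx` and `formatMicroStx` are hypothetical and not part of `@secondlayer/workflows`.

```typescript
// Hypothetical helpers for illustration; not part of @secondlayer/workflows.
const MICRO_STX_PER_STX = 1_000_000

// Convert an STX amount to microSTX (rounded to the nearest microSTX).
function stxToMicroStx(stx: number): number {
  return Math.round(stx * MICRO_STX_PER_STX)
}

// Render a non-negative microSTX amount as a decimal STX string for alert text.
function formatMicroStx(micro: number): string {
  const whole = Math.floor(micro / MICRO_STX_PER_STX)
  const frac = micro % MICRO_STX_PER_STX
  if (frac === 0) return `${whole} STX`
  // Left-pad the fractional part to 6 digits, then drop trailing zeros.
  const fracStr = ("000000" + frac).slice(-6).replace(/0+$/, "")
  return `${whole}.${fracStr} STX`
}
```

With these, a filter could be written as `minAmount: stxToMicroStx(100_000)`, which keeps the unit visible at the call site.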

Triggers

Four trigger types. Event and stream triggers use the same filter types as streams and subgraphs. Stream triggers fire the workflow directly when a block matches — no external webhook needed. Schedule triggers use cron expressions. Manual triggers accept typed input via the API or dashboard.

// Trigger on blockchain events — same filters as streams
trigger: {
  type: "event",
  filter: { type: "stx_transfer", minAmount: 50_000_000_000 },
}

// Stream trigger — fires directly when a block matches, no webhook
trigger: {
  type: "stream",
  filter: { type: "contract_call", contractId: "SP1234...::dex", functionName: "swap" },
}

// Trigger on a schedule
trigger: {
  type: "schedule",
  cron: "0 8 * * *",        // 8 AM daily, in the timezone below
  timezone: "America/Chicago",
}

// Trigger manually via API or dashboard
trigger: {
  type: "manual",
  input: {
    contractId: { type: "string", required: true },
    depth: { type: "string", default: "shallow" },
  },
}
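
The manual trigger's `input` spec marks fields as required or gives them defaults. The sketch below shows one plausible way such a spec could be resolved against caller-supplied input. It is an assumption about the semantics, not the platform's actual validation code; `InputField`, `InputSpec`, and `resolveInput` are hypothetical names, and only the `string` field type shown above is modeled.

```typescript
// Hypothetical types mirroring the `input` spec in the manual trigger example.
type InputField = { type: "string"; required?: boolean; default?: string }
type InputSpec = Record<string, InputField>

// Apply defaults and enforce required fields; unknown extra keys are ignored.
function resolveInput(
  spec: InputSpec,
  provided: Record<string, unknown>,
): Record<string, string> {
  const resolved: Record<string, string> = {}
  for (const name of Object.keys(spec)) {
    const field = spec[name]
    const value = provided[name]
    if (value === undefined) {
      if (field.default !== undefined) resolved[name] = field.default
      else if (field.required) throw new Error(`Missing required input: ${name}`)
      continue
    }
    if (typeof value !== "string") throw new Error(`Input ${name} must be a string`)
    resolved[name] = value
  }
  return resolved
}
```

Under these assumptions, triggering with only `contractId` would resolve `depth` to `"shallow"`, while omitting `contractId` would be rejected.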

Steps

Steps are the building blocks of a workflow. Each step.run() call is isolated — it retries on failure without re-running completed steps. Use Promise.all() for parallel execution.

handler: async ({ event, step }) => {
  // Sequential steps
  const data = await step.run("fetch-data", async () => {
    return await fetchFromAPI(event.contractId)
  })

  const enriched = await step.run("enrich", async () => {
    return await enrichWithMetadata(data)
  })

  // Parallel steps
  const [analysis, history] = await Promise.all([
    step.run("analyze", async () => analyzePatterns(enriched)),
    step.run("get-history", async () => getHistoricalData(enriched)),
  ])

  // Sleep between steps
  await step.sleep("wait-for-settlement", 60_000) // 60 seconds

  // Invoke another workflow
  const result = await step.invoke("deep-analysis", {
    workflow: "contract-analyzer",
    input: { contractId: event.contractId },
  })
}
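
To make the isolation guarantee concrete, here is a minimal in-memory sketch of how a step runner can retry a failing step and skip steps that already completed. This is an illustration only, not the secondlayer runtime: `MemoizedSteps` is a hypothetical class, results are kept in a `Map` keyed by step id, and backoff and durable storage are omitted.

```typescript
// Illustrative sketch of step memoization; NOT the secondlayer implementation.
type StepFn<T> = () => Promise<T>

class MemoizedSteps {
  // Results of completed steps, keyed by step id.
  private results = new Map<string, unknown>()
  private maxAttempts: number

  constructor(maxAttempts: number = 3) {
    this.maxAttempts = maxAttempts
  }

  async run<T>(id: string, fn: StepFn<T>): Promise<T> {
    // A step that already completed returns its stored result without re-running.
    if (this.results.has(id)) return this.results.get(id) as T

    let lastError: unknown
    for (let attempt = 1; attempt <= this.maxAttempts; attempt++) {
      try {
        const value = await fn()
        this.results.set(id, value) // memoize only on success
        return value
      } catch (err) {
        lastError = err // retry until attempts are exhausted
      }
    }
    throw lastError
  }
}
```

If a handler fails at its second step and is re-run against the same `MemoizedSteps` instance, the first step's function is not called again; only the failed step executes.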

A complete pipeline — detect large swaps, enrich with subgraph data, run AI analysis, and alert via Slack:

export default defineWorkflow({
  name: "large-swap-monitor",
  trigger: {
    type: "stream",
    filter: { type: "contract_call", contractId: "SP1234...::amm-pool", functionName: "swap-exact-*" },
  },
  handler: async ({ event, step }) => {
    // 1. Enrich — query subgraph for context
    const context = await step.run("enrich", async () => {
      const [recentSwaps, pool] = await Promise.all([
        step.query("dex-swaps", "swaps", {
          where: { sender: { eq: event.sender }, _blockHeight: { gte: event.block.height - 500 } },
          orderBy: { _blockHeight: "desc" },
          limit: 20,
        }),
        step.query("dex-pools", "pools", {
          where: { contractId: { eq: event.contractId } },
          limit: 1,
        }),
      ])
      return { recentSwaps, pool: pool[0], swapAmount: event.args.amount }
    })

    // 2. Analyze — AI evaluates the pattern
    const analysis = await step.ai("assess-pattern", {
      prompt: `Large swap of ${context.swapAmount} on pool ${context.pool?.name}. Sender has ${context.recentSwaps.length} swaps in last 500 blocks. Is this unusual activity?`,
      model: "haiku",
      schema: {
        riskScore: { type: "number", description: "0-1 risk score" },
        pattern: { type: "string", description: "detected pattern name" },
        summary: { type: "string", description: "one-line summary" },
      },
    })

    // 3. Alert — deliver if risk is elevated
    if (analysis.riskScore > 0.5) {
      await step.deliver("notify-team", {
        type: "slack",
        channel: "#dex-alerts",
        text: `[${analysis.pattern}] ${analysis.summary} (risk: ${analysis.riskScore})`,
      })
    }
  },
})

AI analysis

step.ai() runs an LLM analysis as a discrete step. It retries independently, tracks token usage, and returns free text by default or a typed object when you pass a schema. AI evaluations count toward your tier's daily throughput. When the cap is reached, AI steps are skipped and the workflow continues with condition-only logic.

// Basic analysis — returns unstructured text
const insight = await step.ai("summarize-activity", {
  prompt: `Summarize the trading activity for ${contractId} over the last 24 hours: ${JSON.stringify(trades)}`,
})
// insight.text → "Trading volume increased 340% driven by..."

// Structured output — returns typed object
const assessment = await step.ai("risk-assessment", {
  prompt: `Analyze this transfer pattern for anomalies: ${JSON.stringify(transfers)}`,
  schema: {
    riskScore: { type: "number", description: "0-1 risk score" },
    flags: { type: "array", items: "string" },
    recommendation: { type: "string" },
  },
})
// assessment.riskScore → 0.82
// assessment.flags → ["unusual_volume", "new_recipient"]

// Model selection — defaults to haiku, use sonnet for complex analysis
const deepAnalysis = await step.ai("deep-analysis", {
  prompt: "...",
  model: "sonnet",
  schema: { ... },
})
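
Because AI steps can be skipped once the daily cap is reached, handlers may want a condition-only fallback. The sketch below shows one way to structure that decision. It assumes a skipped `step.ai()` call yields `null` or `undefined`, which the text above does not specify; `shouldAlert` and its parameters are hypothetical.

```typescript
// Assumption: a skipped AI step produces null or undefined instead of a result.
type RiskAssessment = { riskScore: number }

function shouldAlert(
  analysis: RiskAssessment | null | undefined,
  amount: number,
  fallbackMinAmount: number,
  riskThreshold: number = 0.7,
): boolean {
  // Prefer the AI risk score when one is available and well-formed.
  if (analysis && typeof analysis.riskScore === "number" && isFinite(analysis.riskScore)) {
    return analysis.riskScore > riskThreshold
  }
  // Condition-only path: no AI result, so fall back to a plain amount check.
  return amount >= fallbackMinAmount
}
```

Keeping the fallback in a pure function makes the alerting rule easy to unit test independently of the workflow runtime.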

MCP tools

step.mcp() calls tools on external MCP servers — GitHub, Slack, Notion, or any server in the MCP ecosystem. Configure servers via environment variables and call any tool from your workflow.

// Configure MCP servers via environment variables:
// MCP_SERVER_GITHUB=npx @modelcontextprotocol/server-github
// MCP_SERVER_FILESYSTEM=npx @modelcontextprotocol/server-filesystem /data

// Call any tool on a configured MCP server
const files = await step.mcp("list-files", {
  server: "filesystem",
  tool: "list_directory",
  args: { path: "/data/reports" },
})

// Create a GitHub issue from workflow analysis
await step.mcp("file-issue", {
  server: "github",
  tool: "create_issue",
  args: {
    repo: "myorg/myrepo",
    title: "Anomaly detected in swap volume",
    body: analysis.summary,
  },
})

// MCP results include content array and error flag
// result.content → [{ type: "text", text: "..." }]
// result.isError → false
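
Since MCP results arrive as a content array plus an error flag, a small helper can flatten them before passing the text to later steps. This is a sketch based only on the result shape shown in the comments above; `McpResult` and `mcpText` are hypothetical names, and content items other than `text` are simply ignored.

```typescript
// Shape taken from the comments above; anything beyond these fields is an assumption.
type McpContent = { type: string; text?: string }
type McpResult = { content: McpContent[]; isError: boolean }

// Join all text items into one string, throwing if the tool reported an error.
function mcpText(result: McpResult): string {
  const text = result.content
    .filter((item) => item.type === "text" && typeof item.text === "string")
    .map((item) => item.text as string)
    .join("\n")
  if (result.isError) {
    throw new Error(`MCP tool failed: ${text || "no details"}`)
  }
  return text
}
```

Throwing on `isError` inside a `step.run()` callback would let the step's normal retry behavior apply, if that is the desired handling.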

Querying subgraphs

step.query() reads from your deployed subgraph tables directly. No API overhead — workflows run co-located with your data and query Postgres directly.

// Query a subgraph table
const largeSwaps = await step.query("dex-swaps", "swaps", {
  where: {
    amount: { gte: "1000000000" },
    _blockHeight: { gte: event.block.height - 100 },
  },
  orderBy: { amount: "desc" },
  limit: 50,
})

// Row counts (step.count takes the where clause directly)
const volume = await step.count("dex-swaps", "swaps", {
  timestamp: { gte: oneDayAgo },
})

// Cross-subgraph correlation
const positions = await step.query("lending-positions", "borrows", {
  where: { borrower: { eq: event.sender } },
})
const prices = await step.query("price-feeds", "prices", {
  where: { token: { eq: positions[0]?.token } },
  orderBy: { _blockHeight: "desc" },
  limit: 1,
})
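
Several queries above restrict results to a recent window by subtracting a block count from the current height. A small sketch of a helper that builds that clause and clamps it at zero follows; `lastBlocks` is a hypothetical name, and it assumes `gte` accepts a plain number as in the examples.

```typescript
// Hypothetical helper: builds the `_blockHeight` clause used in the queries above.
type BlockWindow = { _blockHeight: { gte: number } }

function lastBlocks(currentHeight: number, windowSize: number): BlockWindow {
  // Clamp at 0 so early chain heights never produce a negative bound.
  return { _blockHeight: { gte: Math.max(0, currentHeight - windowSize) } }
}
```

The result can be spread into a larger filter, for example `where: { sender: { eq: event.sender }, ...lastBlocks(event.block.height, 500) }`.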

Delivering results

step.deliver() sends results to external systems. Supports webhook, Slack, Discord, Telegram, and email. Deliveries are retried on failure and tracked in the run log.

// Webhook delivery
await step.deliver("notify-backend", {
  type: "webhook",
  url: "https://api.example.com/events",
  body: { event: "whale_alert", data: analysis },
  headers: { "X-API-Key": process.env.API_KEY },
})

// Slack notification
await step.deliver("alert-team", {
  type: "slack",
  channel: "#alerts",
  text: `Whale transfer detected: ${event.amount} microSTX from ${event.sender}`,
})

// Email summary
await step.deliver("daily-report", {
  type: "email",
  to: "team@example.com",
  subject: "Daily DEX Volume Report",
  body: reportHtml,
})

// Discord notification
await step.deliver("notify-discord", {
  type: "discord",
  webhookUrl: "https://discord.com/api/webhooks/YOUR/WEBHOOK",
  content: "Whale transfer detected!",
  username: "Secondlayer Bot",
})

// Telegram message
await step.deliver("alert-telegram", {
  type: "telegram",
  botToken: process.env.TELEGRAM_BOT_TOKEN,
  chatId: "-1001234567890",
  text: "⚠️ Large swap detected on DEX",
  parseMode: "HTML",
})
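
When a Telegram message uses HTML parse mode, Telegram requires `<`, `>`, and `&` in dynamic values to be escaped as entities. The sketch below shows one way to do that before building the message text. It assumes `step.deliver()` forwards `text` to Telegram unchanged when `parseMode` is `"HTML"`; both function names are hypothetical.

```typescript
// Escape the three characters Telegram's HTML parse mode treats as markup.
function escapeTelegramHtml(value: string): string {
  return value
    .replace(/&/g, "&amp;") // must run first so later entities are not double-escaped
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
}

// Hypothetical message builder: static tags stay as markup, dynamic values are escaped.
function swapAlertHtml(sender: string, summary: string): string {
  return `<b>Large swap</b> from <code>${escapeTelegramHtml(sender)}</code>: ${escapeTelegramHtml(summary)}`
}
```

This matters most when the text includes LLM output or onchain strings, which may contain characters that would otherwise make Telegram reject the message.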

Deploy

Deploy workflows via the CLI. The CLI bundles your handler code and registers triggers with the platform. Workflows start running immediately after deploy.

# Deploy a workflow
sl workflows deploy workflows/whale-alert.ts

# Dev mode — watches for changes, auto-redeploys
sl workflows dev workflows/whale-alert.ts

# Deploy all workflows in a directory
sl workflows deploy workflows/

Management

Manage workflows via the SDK or CLI. Each run is tracked with full step-level logs, timing, and AI token usage.

import { SecondLayer } from "@secondlayer/sdk"

const client = new SecondLayer({ apiKey: "sk-sl_..." })

// Deploy a workflow
const result = await client.workflows.deploy({
  name: "whale-alert",
  trigger: { type: "stream", filter: { type: "stx_transfer", minAmount: 100_000_000_000 } },
  handlerCode: bundledCode,
})

// List workflows
const { workflows } = await client.workflows.list()

// Get workflow detail
const workflow = await client.workflows.get("whale-alert")

// List runs
const { runs } = await client.workflows.listRuns("whale-alert", {
  status: "completed",
  limit: 25,
})

// Trigger a manual workflow
const { runId } = await client.workflows.trigger("contract-analyzer", {
  contractId: "SP1234...::my-contract",
  depth: "deep",
})

// Get run detail, including step-level logs and timing
const run = await client.workflows.getRun(runId)

// Pause / resume
await client.workflows.pause("whale-alert")
await client.workflows.resume("whale-alert")

// Delete
await client.workflows.delete("whale-alert")

# CLI equivalents
sl workflows ls
sl workflows get whale-alert
sl workflows runs whale-alert --status completed
sl workflows trigger contract-analyzer --input '{"contractId": "SP1234..."}'
sl workflows pause whale-alert
sl workflows delete whale-alert
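
Since each run carries status, duration, and AI token usage, run lists can be reduced to simple health metrics. The sketch below assumes the objects returned by `listRuns` carry the `status`, `duration`, and `aiTokensUsed` fields listed under WorkflowRun in Props; `RunLike`, `RunSummary`, and `summarizeRuns` are hypothetical names.

```typescript
// Field names follow the WorkflowRun shape in Props; the summary shape is hypothetical.
type RunStatus = "running" | "completed" | "failed" | "cancelled"
type RunLike = { status: RunStatus; duration: number; aiTokensUsed: number }

type RunSummary = {
  total: number
  failed: number
  avgCompletedMs: number | null // null when no run has completed
  aiTokensUsed: number
}

function summarizeRuns(runs: RunLike[]): RunSummary {
  const completed = runs.filter((r) => r.status === "completed")
  const totalDuration = completed.reduce((sum, r) => sum + r.duration, 0)
  return {
    total: runs.length,
    failed: runs.filter((r) => r.status === "failed").length,
    avgCompletedMs: completed.length > 0 ? totalDuration / completed.length : null,
    aiTokensUsed: runs.reduce((sum, r) => sum + r.aiTokensUsed, 0),
  }
}
```

A scheduled workflow could compute such a summary and send it with `step.deliver()` as a daily report.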

Props

defineWorkflow
Option   Type                                                            Required / default
name     string                                                          required
trigger  EventTrigger | StreamTrigger | ScheduleTrigger | ManualTrigger  required
handler  (ctx: WorkflowContext) => Promise<any>                          required
retries  RetryConfig                                                     3 attempts, 1s backoff
timeout  number                                                          300000 ms (5 min)

Trigger types
Type      Fields
event     filter: SubgraphFilter (same 13 types as streams)
stream    filter: SubgraphFilter (fires workflow directly, no webhook)
schedule  cron: string, timezone?: string
manual    input?: Record<string, InputField>

Step primitives
Method                              Description
step.run(id, fn)                    Isolated unit of work. Retries independently.
step.ai(id, opts)                   LLM analysis. Supports schema for structured output.
step.query(subgraph, table, opts)   Direct Postgres query against subgraph tables.
step.count(subgraph, table, where)  Row count against subgraph tables.
step.deliver(id, opts)              Send results via webhook, Slack, Discord, Telegram, or email.
step.sleep(id, ms)                  Pause execution for a duration.
step.invoke(id, opts)               Trigger another workflow and await its result.
step.mcp(id, opts)                  Call a tool on an external MCP server.

step.ai options
Option  Type                         Required / default
prompt  string                       required
model   'haiku' | 'sonnet'           haiku
schema  Record<string, SchemaField>

step.mcp options
Option  Type                     Required / default
server  string                   required
tool    string                   required
args    Record<string, unknown>

step.deliver targets
Target    Fields
webhook   url, body, headers?
slack     channel, text
email     to, subject, body
discord   webhookUrl, content, username?, avatarUrl?
telegram  botToken, chatId, text, parseMode?

WorkflowRun
Field         Type
id            string
workflowName  string
status        'running' | 'completed' | 'failed' | 'cancelled'
steps         StepResult[]
duration      number (ms)
aiTokensUsed  number
triggeredAt   string
completedAt   string | null