Stream and decode reports via WebSocket using the SDK (RWA Streams)

In this guide, you'll learn how to use Chainlink Data Streams with the Streams Direct implementation and the Data Streams SDK for Go to subscribe to real-time V4 reports for Real World Assets (RWAs) streams via a WebSocket connection. You'll set up your Go project, listen for real-time reports from the Data Streams Aggregation Network, decode the report data, and log their attributes to your terminal.

Requirements

  • Git: Make sure you have Git installed. You can check your current version by running git --version in your terminal and download the latest version from the official Git website if necessary.
  • Go Version: Make sure you have Go version 1.21 or higher. You can check your current version by running go version in your terminal and download the latest version from the official Go website if necessary.
  • API Credentials: Access to the Streams Direct implementation requires API credentials. If you haven't already, contact us to request mainnet or testnet early access.

Guide

Set up your Go project

  1. Create a new directory for your project and navigate to it:

    mkdir my-data-streams-project
    cd my-data-streams-project
    
  2. Initialize a new Go module:

    go mod init my-data-streams-project
    
  3. Install the Data Streams SDK:

    go get github.com/smartcontractkit/data-streams-sdk/go
    

Establish a WebSocket connection and listen for real-time reports

  1. Create a new Go file, stream.go, in your project directory:

    touch stream.go
    
  2. Insert the following code example and save your stream.go file:

     package main
    
     import (
       "context"
       "fmt"
       "os"
       "time"
    
       streams "github.com/smartcontractkit/data-streams-sdk/go"
       feed "github.com/smartcontractkit/data-streams-sdk/go/feed"
       report "github.com/smartcontractkit/data-streams-sdk/go/report"
       v4 "github.com/smartcontractkit/data-streams-sdk/go/report/v4" // Import the v4 report schema for RWA streams. For Crypto streams, use the v3 schema.
     )
    
     func main() {
       if len(os.Args) < 2 {
         fmt.Println("Usage: go run stream.go [StreamID1] [StreamID2] ...")
         os.Exit(1)
       }
    
       // Set up the SDK client configuration
       cfg := streams.Config{
         ApiKey:    os.Getenv("API_KEY"),
         ApiSecret: os.Getenv("API_SECRET"),
         WsURL: "wss://ws.testnet-dataengine.chain.link",
         Logger: streams.LogPrintf,
       }
    
       // Create a new client
       client, err := streams.New(cfg)
       if err != nil {
         cfg.Logger("Failed to create client: %v\n", err)
         os.Exit(1)
       }
    
       // Parse the feed IDs from the command line arguments
       var ids []feed.ID
       for _, arg := range os.Args[1:] {
         var fid feed.ID
         if err := fid.FromString(arg); err != nil {
           cfg.Logger("Invalid stream ID %s: %v\n", arg, err)
           os.Exit(1)
         }
         ids = append(ids, fid)
       }
    
       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
       defer cancel()
    
       // Subscribe to the feeds
       stream, err := client.Stream(ctx, ids)
       if err != nil {
         cfg.Logger("Failed to subscribe: %v\n", err)
         os.Exit(1)
       }
    
       defer stream.Close()
         for {
             reportResponse, err := stream.Read(context.Background())
             if err != nil {
                 cfg.Logger("Error reading from stream: %v\n", err)
                 continue
             }
    
             // Log the contents of the report before decoding
             cfg.Logger("Raw report data: %+v\n", reportResponse)
    
                 // Decode each report as it comes in
                 decodedReport, decodeErr := report.Decode[v4.Data](reportResponse.FullReport)
                 if decodeErr != nil {
                     cfg.Logger("Failed to decode report: %v\n", decodeErr)
                     continue
                 }
    
             // Log the decoded report
             cfg.Logger("\n--- Report Stream ID: %s ---\n" +
               "------------------------------------------\n" +
               "Observations Timestamp : %d\n" +
               "Benchmark Price        : %s\n" +
               "Valid From Timestamp   : %d\n" +
               "Expires At             : %d\n" +
               "Link Fee               : %s\n" +
               "Native Fee             : %s\n" +
               "Market Status          : %d\n" +
               "------------------------------------------\n",
               reportResponse.FeedID.String(),
               decodedReport.Data.ObservationsTimestamp,
               decodedReport.Data.BenchmarkPrice.String(),
               decodedReport.Data.ValidFromTimestamp,
               decodedReport.Data.ExpiresAt,
               decodedReport.Data.LinkFee.String(),
               decodedReport.Data.NativeFee.String(),
               decodedReport.Data.MarketStatus,
             )
    
             // Also, log the stream stats
             cfg.Logger("\n--- Stream Stats ---\n" +
             stream.Stats().String() + "\n" +
             "--------------------------------------------------------------------------------------------------------------------------------------------\n",
             )
         }
     }
    
  3. Download the required dependencies and update the go.mod and go.sum files:

    go mod tidy
    
  4. Set up the SDK client configuration within stream.go with your API credentials and the WebSocket URL:

    cfg := streams.Config{
        ApiKey:    os.Getenv("API_KEY"),
        ApiSecret: os.Getenv("API_SECRET"),
        WsURL: "wss://ws.testnet-dataengine.chain.link",
        Logger: streams.LogPrintf,
    }
    
    • Set your API credentials as environment variables:

      export API_KEY="<YOUR_API_KEY>"
      export API_SECRET="<YOUR_API_SECRET>"
      

      Replace <YOUR_API_KEY> and <YOUR_API_SECRET> with your API credentials.

    • WsURL is the WebSocket URL for the Data Streams Aggregation Network. Use wss://ws.testnet-dataengine.chain.link for the testnet environment.

    See the SDK Reference page for more configuration options.

  5. For this example, you'll subscribe to the AUD/USD RWA streams. This stream ID is 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea. See the RWA Streams page for a complete list of available Real World Assets.

    Execute your application:

    go run stream.go 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea
    

    Expect output similar to the following in your terminal:

     2024-10-24T12:52:50-05:00 Raw report data: {"fullReport":"0x0006aee203ef23a892e75b579f8c3f26fd933d9ca45de95c2f8ac470f4ddcd7600000000000000000000000000000000000000000000000000000000015de214000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000026001000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea00000000000000000000000000000000000000000000000000000000671a897200000000000000000000000000000000000000000000000000000000671a89720000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000671bdaf20000000000000000000000000000000000000000000000000936a5885261c000000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000029bb0fa88b2783870c74deae7db07a588344b1ef32a58dd00589c5ebc24b1b978075d1fa41ce8670633be03f555d297ff4b005b19c58577bc70e68803160b4120000000000000000000000000000000000000000000000000000000000000000238ea2b71402354ebabb4d9be738a83d13e4f67512d13bf8ee078b6a6f2851cdf1fc20d5eb054527ec82dadcd1206dd1d36009db1e95c9ac88defc1d54cab0826","feedID":"0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea","validFromTimestamp":1729792370,"observationsTimestamp":1729792370}
    
     2024-10-24T12:52:50-05:00
     --- Report Stream ID: 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea ---
     ------------------------------------------
     Observations Timestamp : 1729792370
     Benchmark Price        : 663900000000000000
     Valid From Timestamp   : 1729792370
     Expires At             : 1729878770
     Link Fee               : 0
     Native Fee             : 0
     Market Status          : 2
     ------------------------------------------
    
     2024-10-24T12:52:50-05:00
     --- Stream Stats ---
     accepted: 1, deduplicated: 0, total_received 1, partial_reconnects: 0, full_reconnects: 0, configured_connections: 1, active_connections 1
     --------------------------------------------------------------------------------------------------------------------------------------------
    
     2024-10-24T12:52:51-05:00 Raw report data: {"fullReport":"0x0006aee203ef23a892e75b579f8c3f26fd933d9ca45de95c2f8ac470f4ddcd7600000000000000000000000000000000000000000000000000000000015de218000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000026000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea00000000000000000000000000000000000000000000000000000000671a897300000000000000000000000000000000000000000000000000000000671a89730000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000671bdaf30000000000000000000000000000000000000000000000000936aa14799b100000000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002d8e3d2f04c9de178bc958eecdd747c975248fa2fd8745485ff05105d0f99abdc11caf5d40d80e6d832dce82748fb688ecc967654a09b5ba45bc6b62d7023aa9600000000000000000000000000000000000000000000000000000000000000021837177262094c1183fff139f494c2923f308851f722d80d7793a4d0ea855bec26f35f8112eae664484d16cccfacf15c8710bc44397f0bd1ca0869241a6156f9","feedID":"0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea","validFromTimestamp":1729792371,"observationsTimestamp":1729792371}
    
     2024-10-24T12:52:51-05:00
     --- Report Stream ID: 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea ---
     ------------------------------------------
     Observations Timestamp : 1729792371
     Benchmark Price        : 663905000000000000
     Valid From Timestamp   : 1729792371
     Expires At             : 1729878771
     Link Fee               : 0
     Native Fee             : 0
     Market Status          : 2
     ------------------------------------------
    
     2024-10-24T12:52:51-05:00
     --- Stream Stats ---
     accepted: 2, deduplicated: 0, total_received 2, partial_reconnects: 0, full_reconnects: 0, configured_connections: 1, active_connections 1
     --------------------------------------------------------------------------------------------------------------------------------------------
    
     2024-10-24T12:52:52-05:00 Raw report data: {"fullReport":"0x0006aee203ef23a892e75b579f8c3f26fd933d9ca45de95c2f8ac470f4ddcd7600000000000000000000000000000000000000000000000000000000015de303000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000026000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea00000000000000000000000000000000000000000000000000000000671a897400000000000000000000000000000000000000000000000000000000671a89740000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000671bdaf40000000000000000000000000000000000000000000000000936a5885261c0000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000286acd46943638d6999878133ce893f42720070ec0937b28cbd69c07b243f3f4130627d493403bbf6a3c914cd0bdf54d254490a084e23201d3bcd19797e94162e00000000000000000000000000000000000000000000000000000000000000022cac7476948bbda325c2e73ac86a7b66fd35daada10d7aa9d71f9d737376573407171b150dcc244d88ecc3f94e99668debdc046c564b2343f8e8590a9d8a92cc","feedID":"0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea","validFromTimestamp":1729792372,"observationsTimestamp":1729792372}
    
     2024-10-24T12:52:52-05:00
     --- Report Stream ID: 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea ---
     ------------------------------------------
     Observations Timestamp : 1729792372
     Benchmark Price        : 663900000000000000
     Valid From Timestamp   : 1729792372
     Expires At             : 1729878772
     Link Fee               : 0
     Native Fee             : 0
     Market Status          : 2
     ------------------------------------------
    
     2024-10-24T12:52:52-05:00
     --- Stream Stats ---
     accepted: 3, deduplicated: 0, total_received 3, partial_reconnects: 0, full_reconnects: 0, configured_connections: 1, active_connections 1
     --------------------------------------------------------------------------------------------------------------------------------------------
    
    [...]
    

Decoded report details

The decoded report details include:

AttributeValueDescription
Stream ID0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7eaThe unique identifier for the stream. In this example, the stream is for AUD/USD.
Observations Timestamp1729792372The timestamp indicating when the data was captured.
Benchmark Price663900000000000000The observed price in the report, with 18 decimals. For readability: 0.6639 USD per EUR.
Valid From Timestamp1729792372The start validity timestamp for the report, indicating when the data becomes relevant.
Expires At1729878772The expiration timestamp of the report, indicating the point at which the data becomes outdated.
Link Fee0The fee to pay in LINK tokens for the onchain verification of the report data. With 18 decimals. Note: This example fee is not indicative of actual fees.
Native Fee0The fee to pay in the native blockchain token (e.g., ETH on Ethereum) for the onchain verification of the report data. With 18 decimals. Note: This example fee is not indicative of actual fees.
Market Status2The DON's consensus on whether the market is currently open. Possible values: 0 (Unknown), 1 (Closed), 2 (Open).

Payload for onchain verification

In this guide, you log and decode the fullReport payload to extract the report data. In a production environment, you should verify the data onchain to ensure its integrity and authenticity. Refer to the Verify report data onchain guide.

Explanation

Establishing a WebSocket connection and listening for reports

Your application uses the Stream function in the Data Streams SDK's client package to establish a real-time WebSocket connection with the Data Streams Aggregation Network.

Once the WebSocket connection is established, your application subscribes to one or more streams by passing an array of feed.IDs to the Stream function. This subscription lets the client receive real-time updates whenever new report data is available for the specified streams.

Decoding a report

As data reports arrive via the established WebSocket connection, they are processed in real-time:

  • Reading streams: The Read method on the returned Stream object is continuously called within a loop. This method blocks until new data is available, ensuring that all incoming reports are captured as soon as they are broadcasted.

  • Decoding reports: For each received report, the SDK's Decode function parses and transforms the raw data into a structured format (v4.Data for RWA streams). This decoded data includes data such as the observation timestamp, benchmark price, and market status from the report data.

Handling the decoded data

In this example, the application logs the structured report data to the terminal. However, this data can be used for further processing, analysis, or display in your own application.

What's next

Get the latest Chainlink content straight to your inbox.