Getting Started with Chainlink Data Streams (Remix)

This guide shows you how to read data from a Data Streams stream, verify the answer onchain, and store it.

This example uses the Streams Trade implementation, with a Chainlink Automation Log Trigger to check for events that require data. For this example, the log trigger comes from a simple emitter contract. Chainlink Automation then uses StreamsLookup to retrieve a signed report from the Data Streams Aggregation Network, return the data in a callback, and run the performUpkeep function on your registered upkeep contract. The performUpkeep function calls the verify function on the verifier contract.

Note: To learn how to use the Streams Direct implementation of Data Streams, see the Fetch and decode reports via a REST API guide or the Stream and decode reports via WebSocket guide.

Before you begin

Tutorial

Deploy an upkeep contract that is enabled to retrieve data from Data Streams. For this example, you will read from the ETH/USD stream on Arbitrum Sepolia. This stream ID is 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782. See the Data Streams Crypto streams page for a complete list of available crypto assets.

  1. Open the StreamsUpkeep.sol contract in Remix.

  2. Select the StreamsUpkeep.sol contract in the Solidity Compiler tab.

  3. Compile the contract. You can ignore the warning messages for this example.

  4. Open MetaMask and set the network to Arbitrum Sepolia. If you need to add Arbitrum Sepolia to your wallet, you can find the chain ID and the LINK token contract address on the LINK Token Contracts page.

  5. On the Deploy & Run Transactions tab in Remix, select Injected Provider - MetaMask in the Environment list. Remix will use the MetaMask wallet to communicate with Arbitrum Sepolia.

  6. In the Contract section, select the StreamsUpkeep contract and fill in the Arbitrum Sepolia verifier proxy address: 0x2ff010DEbC1297f19579B4246cad07bd24F2488A. You can find the verifier proxy addresses on the Stream Addresses page.

  7. Click the Deploy button to deploy the contract. MetaMask prompts you to confirm the transaction. Check the transaction details to ensure you deploy the contract to Arbitrum Sepolia.

  8. After you confirm the transaction, the contract address appears under the Deployed Contracts list in Remix. Save this contract address for later.

Deploy the emitter contract

This contract emits logs that trigger the upkeep. This code can be part of your dApp. For example, you might emit log triggers when your users initiate a trade or other action requiring data retrieval. For this Getting Started guide, use a very simple emitter so you can test the upkeep and data retrieval.
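As a rough illustration of what such an emitter can look like, here is a minimal sketch. The contract name SimpleLogEmitter and the indexed msgSender parameter are assumptions made for illustration, and the LogEmitter.sol contract you open in the steps below may differ. The emitLog function and Log event names follow the ones this guide refers to.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Hypothetical emitter for illustration only; not audited and not the
// guide's LogEmitter.sol.
contract SimpleLogEmitter {
    // Assumption: the event carries the caller's address as an indexed topic.
    event Log(address indexed msgSender);

    // Emits one Log event. A Log Trigger upkeep that watches this contract
    // and this event would be triggered by the resulting log.
    function emitLog() public {
        emit Log(msg.sender);
    }
}
```

In a real dApp, the same kind of event could be emitted from the function that handles a user's trade instead of from a standalone contract.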

  1. Open the LogEmitter.sol contract in Remix.

  2. Under the Solidity Compiler tab, select the 0.8.19 Solidity compiler and click the Compile LogEmitter.sol button to compile the contract.

  3. Open MetaMask and make sure the network is still set to Arbitrum Sepolia.

  4. On the Deploy & Run Transactions tab in Remix, ensure the Environment is still set to Injected Provider - MetaMask.

  5. Click the Deploy button to deploy the contract. MetaMask prompts you to confirm the transaction. Check the transaction details to ensure you deploy the contract to Arbitrum Sepolia.

  6. After you confirm the transaction, the contract address appears in the Deployed Contracts list. Save this contract address for later.

Register the upkeep

Register a new Log Trigger upkeep. See Automation Log Triggers to learn more about how to register Log Trigger upkeeps.

  1. Go to the Chainlink Automation UI for Arbitrum Sepolia and connect your browser wallet.

  2. Click Register new Upkeep.

  3. Select the Log trigger upkeep type and click Next.

  4. Specify the upkeep contract address you saved earlier as the Contract to automate. In this example, you can ignore the warning about the Automation compatible contract verification. Click Next.

  5. Specify the emitter contract address that you saved earlier. This tells Chainlink Automation which contract to watch for log triggers. Then click Next.

  6. Provide the ABI if the contract is not validated. To find the ABI of your contract in Remix, navigate to the Solidity Compiler tab. Then, copy the ABI to your clipboard using the button at the bottom of the panel.

  7. Select the Log event as the triggering event in the Emitted log dropdown. Log index topic filters are optional filters to narrow the logs you want to trigger your upkeep. For this example, leave the field blank. Click Next.

  8. Specify a name for the upkeep.

  9. Specify a Starting balance of 1 testnet LINK for this example. You can retrieve unused LINK later.

  10. Leave the Check data value and other fields blank for now, and click Register Upkeep. MetaMask prompts you to confirm the transaction. Wait for the transaction to complete.

Fund the upkeep contract

In this example, the upkeep contract pays for onchain verification of reports from Data Streams. The Automation subscription does not cover the cost.

Open MetaMask and send 1 testnet LINK on Arbitrum Sepolia to the upkeep contract address you saved earlier.

Emit a log

You can use your emitter contract to emit a log and initiate the upkeep, which retrieves data for the specified stream ID.

  1. In Remix, on the Deploy & Run Transactions tab, expand your emitter contract under the Deployed Contracts section.

  2. Click the emitLog button to call the function and emit a log. MetaMask prompts you to accept the transaction.

After the transaction is complete, the log is emitted, and the upkeep is triggered. You can find the upkeep transaction hash in the Chainlink Automation UI. Check to make sure the transaction is successful.

View the retrieved price

The retrieved price is stored in the lastDecodedPrice storage variable.

  1. On the Deploy & Run Transactions tab in Remix, expand the details of your upkeep contract in the Deployed Contracts section.

  2. Click the lastDecodedPrice getter function to view the retrieved price. The answer on the ETH/USD stream uses 18 decimal places, so an answer of 2484121000000000000000 indicates an ETH/USD price of 2,484.121. Some streams may use a different number of decimal places for answers. See the Data Streams Crypto streams page for more information.
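
To make the decimal handling concrete, the following sketch splits an 18-decimal answer into its whole and fractional parts. The PriceFormat library, the PriceFormatExample contract, and the sample answer are all hypothetical and are not part of the deployed example. The sketch assumes a stream that reports 18 decimals and a non-negative price.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Hypothetical helper for illustration only; not audited.
library PriceFormat {
    // Assumption: the stream reports answers with 18 decimal places.
    uint256 internal constant SCALE = 1e18;

    // Splits a non-negative fixed-point price into whole units and the
    // remaining fraction (still scaled by 1e18).
    function split(
        int192 price
    ) internal pure returns (uint256 whole, uint256 fraction) {
        require(price >= 0, "negative price");
        uint256 raw = uint256(uint192(price));
        whole = raw / SCALE;
        fraction = raw % SCALE;
    }
}

contract PriceFormatExample {
    // Uses a made-up answer that represents 3,000.5 with 18 decimals.
    // Returns whole = 3000 and fraction = 500000000000000000 (0.5 * 1e18).
    function example() external pure returns (uint256 whole, uint256 fraction) {
        int192 answer = 3000_500_000_000_000_000_000;
        return PriceFormat.split(answer);
    }
}
```

For a stream that uses 8 decimal places, the same approach would apply with SCALE set to 1e8.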

Examine the code

The example code you deployed has all the interfaces and functions required to work with Chainlink Automation as an upkeep contract. It follows a similar flow to the trading flow in the Architecture documentation but uses a basic log emitter to simulate the client contract that would initiate a StreamsLookup. After the contract receives and verifies the report, performUpkeep stores the price from the report in the lastDecodedPrice variable. You could modify this to use the data in a way that works for your specific use case and application.

The code example uses revert with StreamsLookup to convey call information about what streams to retrieve. See the EIP-3668 rationale for more information about how to use revert in this way.

// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

import {Common} from "@chainlink/contracts/src/v0.8/llo-feeds/libraries/Common.sol";
import {StreamsLookupCompatibleInterface} from "@chainlink/contracts/src/v0.8/automation/interfaces/StreamsLookupCompatibleInterface.sol";
import {ILogAutomation, Log} from "@chainlink/contracts/src/v0.8/automation/interfaces/ILogAutomation.sol";
import {IRewardManager} from "@chainlink/contracts/src/v0.8/llo-feeds/interfaces/IRewardManager.sol";
import {IVerifierFeeManager} from "@chainlink/contracts/src/v0.8/llo-feeds/interfaces/IVerifierFeeManager.sol";
import {IERC20} from "@chainlink/contracts/src/v0.8/vendor/openzeppelin-solidity/v4.8.3/contracts/interfaces/IERC20.sol";

/**
 * THIS IS AN EXAMPLE CONTRACT THAT USES UN-AUDITED CODE FOR DEMONSTRATION PURPOSES.
 * DO NOT USE THIS CODE IN PRODUCTION.
 */

// Custom interfaces for IVerifierProxy and IFeeManager
interface IVerifierProxy {
    /**
     * @notice Verifies that the data encoded has been signed
     * correctly by routing to the correct verifier, and bills the user if applicable.
     * @param payload The encoded data to be verified, including the signed
     * report.
     * @param parameterPayload Fee metadata for billing. For the current implementation this is just the abi-encoded fee token ERC-20 address.
     * @return verifierResponse The encoded report from the verifier.
     */
    function verify(
        bytes calldata payload,
        bytes calldata parameterPayload
    ) external payable returns (bytes memory verifierResponse);

    function s_feeManager() external view returns (IVerifierFeeManager);
}

interface IFeeManager {
    /**
     * @notice Calculates the fee and reward associated with verifying a report, including discounts for subscribers.
     * This function assesses the fee and reward for report verification, applying a discount for recognized subscriber addresses.
     * @param subscriber The address attempting to verify the report. A discount is applied if this address
     * is recognized as a subscriber.
     * @param unverifiedReport The report data awaiting verification. The content of this report is used to
     * determine the base fee and reward, before considering subscriber discounts.
     * @param quoteAddress The payment token address used for quoting fees and rewards.
     * @return fee The fee assessed for verifying the report, with subscriber discounts applied where applicable.
     * @return reward The reward allocated to the caller for successfully verifying the report.
     * @return totalDiscount The total discount amount deducted from the fee for subscribers
     */
    function getFeeAndReward(
        address subscriber,
        bytes memory unverifiedReport,
        address quoteAddress
    ) external returns (Common.Asset memory, Common.Asset memory, uint256);

    function i_linkAddress() external view returns (address);

    function i_nativeAddress() external view returns (address);

    function i_rewardManager() external view returns (address);
}

contract StreamsUpkeep is ILogAutomation, StreamsLookupCompatibleInterface {
    error InvalidReportVersion(uint16 version); // Thrown when an unsupported report version is provided to verifyReport.

    /**
     * @dev Represents a data report from a Data Streams stream for v3 schema (crypto streams).
     * The `price`, `bid`, and `ask` values are carried to either 8 or 18 decimal places, depending on the stream.
     * For more information, see https://docs.chain.link/data-streams/crypto-streams and https://docs.chain.link/data-streams/reference/report-schema
     */
    struct ReportV3 {
        bytes32 feedId; // The stream ID the report has data for.
        uint32 validFromTimestamp; // Earliest timestamp for which price is applicable.
        uint32 observationsTimestamp; // Latest timestamp for which price is applicable.
        uint192 nativeFee; // Base cost to validate a transaction using the report, denominated in the chain’s native token (e.g., WETH/ETH).
        uint192 linkFee; // Base cost to validate a transaction using the report, denominated in LINK.
        uint32 expiresAt; // Latest timestamp where the report can be verified onchain.
        int192 price; // DON consensus median price (8 or 18 decimals).
        int192 bid; // Simulated price impact of a buy order up to the X% depth of liquidity utilisation (8 or 18 decimals).
        int192 ask; // Simulated price impact of a sell order up to the X% depth of liquidity utilisation (8 or 18 decimals).
    }

    /**
     * @dev Represents a data report from a Data Streams stream for v4 schema (RWA streams).
     * The `price` value is carried to either 8 or 18 decimal places, depending on the stream.
     * The `marketStatus` indicates whether the market is currently open. Possible values: `0` (`Unknown`), `1` (`Closed`), `2` (`Open`).
     * For more information, see https://docs.chain.link/data-streams/rwa-streams and https://docs.chain.link/data-streams/reference/report-schema-v4
     */
    struct ReportV4 {
        bytes32 feedId; // The stream ID the report has data for.
        uint32 validFromTimestamp; // Earliest timestamp for which price is applicable.
        uint32 observationsTimestamp; // Latest timestamp for which price is applicable.
        uint192 nativeFee; // Base cost to validate a transaction using the report, denominated in the chain’s native token (e.g., WETH/ETH).
        uint192 linkFee; // Base cost to validate a transaction using the report, denominated in LINK.
        uint32 expiresAt; // Latest timestamp where the report can be verified onchain.
        int192 price; // DON consensus median benchmark price (8 or 18 decimals).
        uint32 marketStatus; // The DON's consensus on whether the market is currently open.
    }

    struct Quote {
        address quoteAddress;
    }

    IVerifierProxy public verifier;

    address public FEE_ADDRESS;
    string public constant DATASTREAMS_FEEDLABEL = "feedIDs";
    string public constant DATASTREAMS_QUERYLABEL = "timestamp";
    int192 public lastDecodedPrice;

    // This example reads the ID for the ETH/USD report.
    // Find a complete list of IDs at https://docs.chain.link/data-streams/crypto-streams.
    string[] public feedIds = [
        "0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782"
    ];

    constructor(address _verifier) {
        verifier = IVerifierProxy(_verifier);
    }

    // This function uses revert to convey call information.
    // See https://eips.ethereum.org/EIPS/eip-3668#rationale for details.
    function checkLog(
        Log calldata log,
        bytes memory
    ) external returns (bool upkeepNeeded, bytes memory performData) {
        revert StreamsLookup(
            DATASTREAMS_FEEDLABEL,
            feedIds,
            DATASTREAMS_QUERYLABEL,
            log.timestamp,
            ""
        );
    }

    /**
     * @notice this is a new, optional function in streams lookup. It is meant to surface streams lookup errors.
     * @return upkeepNeeded boolean to indicate whether the keeper should call performUpkeep or not.
     * @return performData bytes that the keeper should call performUpkeep with, if
     * upkeep is needed. If you would like to encode data to decode later, try `abi.encode`.
     */
    function checkErrorHandler(
        uint256 /*errCode*/,
        bytes memory /*extraData*/
    ) external pure returns (bool upkeepNeeded, bytes memory performData) {
        // Hardcoded to always perform upkeep.
        // Read the StreamsLookup error handler guide for more information.
        // https://docs.chain.link/chainlink-automation/guides/streams-lookup-error-handler
        return (true, "0");
    }

    // The Data Streams report bytes are passed here.
    // extraData is context data from the StreamsLookup process.
    // Your contract may include logic to further process this data.
    // This method is intended only to be simulated offchain by Automation.
    // The data returned will then be passed by Automation into performUpkeep.
    function checkCallback(
        bytes[] calldata values,
        bytes calldata extraData
    ) external pure returns (bool, bytes memory) {
        return (true, abi.encode(values, extraData));
    }

    // This function is executed onchain.
    function performUpkeep(bytes calldata performData) external {
        // Decode the performData bytes passed in by CL Automation.
        // This contains the data returned by your implementation in checkCallback().
        (bytes[] memory signedReports, bytes memory extraData) = abi.decode(
            performData,
            (bytes[], bytes)
        );

        bytes memory unverifiedReport = signedReports[0];

        (, /* bytes32[3] reportContextData */ bytes memory reportData) = abi
            .decode(unverifiedReport, (bytes32[3], bytes));

        // Extract report version from reportData
        uint16 reportVersion = (uint16(uint8(reportData[0])) << 8) |
            uint16(uint8(reportData[1]));

        // Validate report version
        if (reportVersion != 3 && reportVersion != 4) {
            revert InvalidReportVersion(reportVersion);
        }

        // Report verification fees
        IFeeManager feeManager = IFeeManager(address(verifier.s_feeManager()));
        IRewardManager rewardManager = IRewardManager(
            address(feeManager.i_rewardManager())
        );

        address feeTokenAddress = feeManager.i_linkAddress();
        (Common.Asset memory fee, , ) = feeManager.getFeeAndReward(
            address(this),
            reportData,
            feeTokenAddress
        );

        // Approve rewardManager to spend this contract's balance in fees
        IERC20(feeTokenAddress).approve(address(rewardManager), fee.amount);

        // Verify the report
        bytes memory verifiedReportData = verifier.verify(
            unverifiedReport,
            abi.encode(feeTokenAddress)
        );

        // Decode verified report data into the appropriate Report struct based on reportVersion
        if (reportVersion == 3) {
            // v3 report schema
            ReportV3 memory verifiedReport = abi.decode(
                verifiedReportData,
                (ReportV3)
            );

            // Store the price from the report
            lastDecodedPrice = verifiedReport.price;
        } else if (reportVersion == 4) {
            // v4 report schema
            ReportV4 memory verifiedReport = abi.decode(
                verifiedReportData,
                (ReportV4)
            );

            // Store the price from the report
            lastDecodedPrice = verifiedReport.price;
        }
    }
}

Initializing the upkeep contract

When you deploy the contract, you define the verifier proxy address. You can find this address on the Stream Addresses page. The IVerifierProxy interface provides the following functions:

  • The s_feeManager function to get the fee manager contract, which is used to estimate the verification fees.
  • The verify function to verify the report onchain.

Emitting a log, retrieving, and verifying the report

After registering your upkeep contract with Chainlink Automation with a log trigger, you can emit a log with the emitLog function from your emitter contract.

  1. The emitted log triggers the Chainlink Automation upkeep.
  2. Chainlink Automation then uses StreamsLookup to retrieve a signed report from the Data Streams Aggregation Network, returns the data in a callback (checkCallback), and runs the performUpkeep function on your registered upkeep contract.
  3. The performUpkeep function calls the verify function on the verifier contract to verify the report onchain.
  4. In this example, the performUpkeep function also stores the price from the report in the lastDecodedPrice state variable.

Viewing the retrieved price

The lastDecodedPrice getter function of your upkeep contract returns the most recent price that the performUpkeep function stored in the lastDecodedPrice state variable.

Feed ID types and conversion

Chainlink Data Streams uses different data types for feed IDs at different stages of the process:

  • The StreamsLookup error requires feed IDs to be provided as an array of strings.
  • The decoded reports within the contract use bytes32 types for feed IDs (see the Report Schemas reference).

If your application needs to compare the feed ID(s) sent in the StreamsLookup with those received in the report(s), you must convert between string and bytes32 types.
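
One possible way to do the string-to-bytes32 direction onchain is sketched below. The FeedIdConversion library and its function names are hypothetical and are not part of the Chainlink contracts. The sketch assumes the feed ID string is "0x" followed by exactly 64 hexadecimal characters, as in the feedIds array of this example.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Hypothetical helper for illustration only; not audited.
library FeedIdConversion {
    error InvalidFeedIdString();

    // Parses a "0x"-prefixed, 64-character hex string into a bytes32 value.
    function toBytes32(
        string memory feedId
    ) internal pure returns (bytes32 result) {
        bytes memory s = bytes(feedId);
        // Expect '0' (0x30), then 'x' (0x78) or 'X' (0x58), then 64 hex digits.
        if (
            s.length != 66 ||
            uint8(s[0]) != 0x30 ||
            (uint8(s[1]) != 0x78 && uint8(s[1]) != 0x58)
        ) {
            revert InvalidFeedIdString();
        }
        uint256 value;
        for (uint256 i = 2; i < 66; i++) {
            // Shift in one hex digit (4 bits) per character.
            value = (value << 4) | _hexDigit(uint8(s[i]));
        }
        result = bytes32(value);
    }

    // Maps an ASCII hex character code to its numeric value (0 to 15).
    function _hexDigit(uint8 c) private pure returns (uint256) {
        if (c >= 48 && c <= 57) return c - 48; // '0' to '9'
        if (c >= 97 && c <= 102) return c - 87; // 'a' to 'f'
        if (c >= 65 && c <= 70) return c - 55; // 'A' to 'F'
        revert InvalidFeedIdString();
    }
}
```

With a helper like this, performUpkeep could compare FeedIdConversion.toBytes32(feedIds[0]) against verifiedReport.feedId before storing the price. Parsing strings onchain costs gas, so storing a bytes32 copy of each feed ID alongside the string array may be the cheaper design.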

Optional: Handle Data Streams fetching errors offchain with checkErrorHandler

When Automation detects the triggering event, it runs the checkLog function of your upkeep contract, which includes a StreamsLookup revert custom error. The StreamsLookup revert enables your upkeep to fetch a report from the Data Streams Aggregation Network. If the report is fetched successfully, the checkCallback function is evaluated offchain. Otherwise, the checkErrorHandler function is evaluated offchain to determine what Automation should do next.

In this example, the checkErrorHandler is set to always return true for upkeepNeeded. This implies that the upkeep is always triggered, even if the report fetching fails. You can modify the checkErrorHandler function to handle errors offchain in a way that works for your specific use case. Read more about using the StreamsLookup error handler.
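
For instance, if you would rather skip the upkeep when the report cannot be fetched, the handler could return false instead. The following is a minimal sketch under that assumption. The contract name SkipOnLookupError is hypothetical, and only the handler is shown, not the rest of the upkeep contract.

```solidity
// SPDX-License-Identifier: MIT
pragma solidity 0.8.19;

// Hypothetical fragment for illustration only; not audited.
contract SkipOnLookupError {
    // Same signature as checkErrorHandler in the example contract.
    // Returning false for upkeepNeeded means performUpkeep is not called
    // for this trigger when fetching the report fails.
    function checkErrorHandler(
        uint256 /* errCode */,
        bytes memory /* extraData */
    ) external pure returns (bool upkeepNeeded, bytes memory performData) {
        return (false, "");
    }
}
```

A more selective handler could branch on errCode, but the meaning of each code should be taken from the StreamsLookup error handler guide rather than assumed.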
