# Message Store

# Purpose and Functionality

The Message Store is a core component of the Nekuti Matching Engine that provides a sequenced record of all business-domain state-changing events. Each event is indexed so that consumers can deduplicate messages and identify gaps before persisting them into their storage of choice. Typical use cases include:

  • Audit Trail: persist the sequence of events in a timeseries database for auditing purposes
  • On-chain settlement: Use the output of the engine as input for on-chain settlement. The order of events is guaranteed to remain the same.
  • Database storage: For auditing or analysis, it can be useful to keep orders, positions, balances, instrument data, etc. in a database. We will walk through an example of how to use the message store for this purpose.

# Architecture Overview

The Message Store operates as a sequential, append-only log in which each message corresponds to the state changes triggered by a sequenced command. The engine retains each event until the consumer acknowledges it by purging up to the corresponding index. If the consumer falls behind or fails to purge for a long period, the engine could run out of memory. To prevent this, the engine suspends itself automatically until the consumer catches up.

There can be more than one message store in the system. This can be useful if there are multiple consumers with different processing cadences.

  • Indexing: Each message is indexed with a unique command index, allowing consumers to track their processing state and identify gaps.
  • Ordering Guarantee: Messages maintain strict sequential order based on command indices
  • Reliability: At-least-once delivery with explicit acknowledgment. If the consumer is offline for a while and misses events, it can subscribe to the feed from an earlier point.
  • Memory Efficiency: Purged messages are removed from memory
  • Fault Tolerance: Automatic suspension prevents memory exhaustion
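
The properties above can be pictured with a toy in-memory model. This is only a sketch for intuition, not the engine's implementation: the class name `ToyMessageStore` and its methods are hypothetical, and the real store is accessed through the gateway endpoints described later.

```python
from collections import deque


class ToyMessageStore:
    """Toy model of an append-only log that is trimmed by purging."""

    def __init__(self):
        self._items = deque()  # (index, payload) pairs in arrival order

    def append(self, index, payload):
        # Indices never decrease, but several messages may share one index.
        if self._items and index < self._items[-1][0]:
            raise ValueError("indices must not decrease")
        self._items.append((index, payload))

    def read_after(self, last_seen):
        # Everything strictly after the consumer's last processed index.
        return [item for item in self._items if item[0] > last_seen]

    def purge(self, up_to):
        # Drop every message with index <= up_to (inclusive), oldest first.
        while self._items and self._items[0][0] <= up_to:
            self._items.popleft()

    def first_index(self):
        return self._items[0][0] if self._items else None


store = ToyMessageStore()
store.append(10, "order accepted")
store.append(10, "order filled")  # same command, second message
store.append(13, "deposit")
remaining_before = store.read_after(0)
store.purge(10)  # acknowledge everything up to index 10
remaining_after = store.read_after(0)
```

After the purge only the message at index 13 is left, which mirrors how acknowledged messages are released from memory.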

# Command Index Behavior

Command indices exhibit specific characteristics:

  • Monotonic: Always increasing, never decreasing
  • Non-Dense: Gaps exist due to read-only commands or silent updates (e.g., clock ticks)
  • Multi-Message: A single command index may generate multiple messages (e.g., mark updates triggering executions and liquidations)
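
A consumer can use these characteristics to classify each incoming index relative to the last one it handled. The sketch below is a minimal illustration; the function name `classify_index` and the three labels are assumptions made for this example, not part of the engine's API.

```python
def classify_index(last_seen, index):
    """Compare an incoming index with the last one the consumer handled."""
    if last_seen is None or index > last_seen:
        return "new"
    if index == last_seen:
        return "same"  # may be another message for the same command
    return "stale"  # older than what was already processed: a duplicate


last_seen = None
labels = []
for index in [100, 100, 104, 103, 110]:
    labels.append(classify_index(last_seen, index))
    if last_seen is None or index > last_seen:
        last_seen = index
```

Because indices are non-dense, the jump from 100 to 104 is not treated as a loss. Only an index lower than the last one seen is flagged.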

# Message Types

The Message Store captures all business-domain state changes:

  • Executions: events affecting orders and positions such as new orders, amends, cancels, fills, liquidations, etc.
  • Account transactions: events affecting balances such as deposits and withdrawals, but also fees, funding payments, and PnL realization
  • Liquidations: summary of a liquidation event. Liquidations produce executions and transactions, which are contained in the two lists above
  • Funding charges: summary of charges for funding payments. This doesn't include the actual payments, as they are part of the transactions list
  • Liveness checks: heartbeat messages that can be used to synchronize the engine with external services. The producer of these would send a liveness check to the engine as a milestone, and can know which events have been processed by the engine before and after this milestone

# Suspension Mechanism

The automatic suspension feature protects system integrity:

Suspension triggers when the oldest unpurged message exceeds the configured age limit

Effects During Suspension:

  • New order placement blocked
  • Mark updates accepted but not processed
  • No liquidations occur
  • Orders not triggered

Unsuspension:

  • Automatic: When purging catches up and oldest message age falls below the threshold
  • Manual: Operators can control timing, especially important during volatile markets
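
On the consumer side, the trigger rule can be mirrored to raise an alert before the engine suspends. The sketch below assumes the consumer tracks the age of the oldest message it has not yet purged; the function name `purge_lag_status` and the 80% warning fraction are hypothetical choices for this example.

```python
def purge_lag_status(oldest_age_ms, max_age_ms, warn_fraction=0.8):
    """Classify purge lag against the configured age limit."""
    if oldest_age_ms > max_age_ms:
        return "over_limit"  # the engine would suspend at this point
    if oldest_age_ms >= warn_fraction * max_age_ms:
        return "warning"  # close to the limit, time to alert
    return "ok"


SIX_HOURS_MS = 21_600_000  # the documented default limit
status_fresh = purge_lag_status(1_000, SIX_HOURS_MS)
status_late = purge_lag_status(18_000_000, SIX_HOURS_MS)
status_over = purge_lag_status(21_600_001, SIX_HOURS_MS)
```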

# How-To Guides

# How to Subscribe to a Message Store

Subscribe to a specific message store via WebSocket by:

  1. Connecting to the WebSocket feed endpoint
  2. Sending a subscription request with the store name
  3. Including the last processed command index (optional)
wscat -c "ws://gateway:8181/joined?lastSeen=1234&storeId=store_name"

Parameters:

  • storeId: The message store identifier (use %22%22 for default store)
  • lastSeen: Message index of the last processed message. The feed will resume from the next message after this index.
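
The subscription URL can also be built programmatically. The helper below is a small sketch using only the standard library; the name `build_feed_url` is hypothetical, and the host is the placeholder used in the example above.

```python
from urllib.parse import urlencode


def build_feed_url(ws_base, store_id="", last_seen=None):
    """Build the feed URL for a store, optionally resuming after last_seen."""
    # The default store is written as two double quotes, which encode to %22%22.
    params = {"storeId": store_id if store_id else '""'}
    if last_seen is not None:
        params["lastSeen"] = last_seen
    return f"{ws_base}/joined?{urlencode(params)}"


named_url = build_feed_url("ws://gateway:8181", "store_name", 1234)
default_url = build_feed_url("ws://gateway:8181")
```

Letting `urlencode` handle the escaping avoids having to write `%22%22` by hand.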

# How to Acknowledge Message Consumption

Confirm message processing by sending a purge message:

  • Include the command index up to which messages have been processed
  • This frees memory in the engine by removing acknowledged messages
  • Regular purging prevents memory exhaustion and automatic suspension
http://gateway:8181/MessageStore/Purge?storeId=store_name&messageIndex=1234

Parameters:

  • storeId: The message store identifier (use %22%22 for default store)
  • messageIndex: Message index of the last processed message. The engine will purge messages up to this index (included).
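
As a sketch, the purge call can be prepared with the standard library. The HTTP method is assumed to be DELETE, following the walkthrough later in this document, which sends the purge through a `delete` helper. The function name `build_purge_request` is hypothetical.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_purge_request(http_base, store_id, message_index):
    """Prepare (but do not send) a purge request up to message_index."""
    query = urlencode({"storeId": store_id, "messageIndex": message_index})
    return Request(f"{http_base}/MessageStore/Purge?{query}", method="DELETE")


request = build_purge_request("http://gateway:8181", "store_name", 1234)
# urllib.request.urlopen(request) would send it; omitted so the sketch runs offline.
```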

# How to Configure Message Store Suspension

Protect against memory exhaustion by configuring the fail-safe suspension setting:

http://gateway:8181/MessageStore/Suspension?storeId=%22%22&maximumUnpurgedItemAgeMillis=3000000

Parameters:

  • storeId: The message store identifier (use %22%22 for default store)
  • maximumUnpurgedItemAgeMillis: Maximum age in milliseconds (default: 6 hours = 21,600,000ms)
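
Since the parameter is expressed in milliseconds, it is easy to misjudge the resulting duration. The conversion below is a small sketch (the helper name `to_millis` is hypothetical). It confirms the documented default and shows that the value 3000000 used in the example above corresponds to 50 minutes.

```python
from datetime import timedelta


def to_millis(duration):
    """Convert a timedelta to whole milliseconds."""
    return duration // timedelta(milliseconds=1)


default_limit_ms = to_millis(timedelta(hours=6))
example_limit = timedelta(milliseconds=3_000_000)
```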

# How to Handle Exchange Suspension

When automatic suspension occurs:

  1. During Market Volatility: Manually suspend the exchange before recovering failed services
  2. Recovery Steps:
    • Fix the consumer service issues
    • Resume message purging
    • Monitor until the oldest unpurged message age falls below the threshold
    • The exchange will automatically unsuspend, or manually unsuspend when market conditions are favorable

# Processing Messages and Storing in a Database

We will walk through an example of how to process messages from the Message Store and store them in a standard SQL database acting as a basic order management system. The full code for this example is available in this script.

# 1. Set up an SQL database

This sets up a simple SQLite database with three tables: MessageIndex, Orders and Trades. The MessageIndex table stores the last known message index. The Orders table stores the last known state of each order, keyed by orderId. The Trades table stores one row per trade execution, keyed by executionId.

import sqlite3
def initialize_tables(self, database_name):
    self.conn = sqlite3.connect(database_name)
    cursor = self.conn.cursor()

    # This creates a table that stores the last known message index. Since this script purges messages in the engine
    # as soon as they have been processed, earlier messages would be deleted. It uses this message index to subscribe
    # to the websocket feed from that index, which should be the first index still present in the engine.
    cursor.execute("""
                       CREATE TABLE IF NOT EXISTS MessageIndex (lastIndex BIGINT)
                       """)

    # Populate the first message index as 0 if the table is empty. This means we are resetting the database and have
    # to subscribe from message 0
    cursor.execute("SELECT COUNT(*) FROM MessageIndex")
    if cursor.fetchone()[0] == 0:
        cursor.execute("INSERT INTO MessageIndex (lastIndex) VALUES (?)", (0,))

    # Create an order table with some relevant fields. More fields are available in the execution messages.
    cursor.execute("""
                       CREATE TABLE IF NOT EXISTS Orders (
                           orderId         VARCHAR(36) PRIMARY KEY,
                           account         BIGINT,
                           symbol          VARCHAR(23),
                           price           DOUBLE PRECISION,
                           quantity        BIGINT,
                           orderState      VARCHAR(15),
                           filledQuantity  BIGINT,
                           filledCost      BIGINT,
                           averagePrice    DOUBLE PRECISION,
                           lastUpdateTime  VARCHAR(24),
                           rejectionType   VARCHAR(64)
                           )
                       """)


    # Create a trades table with some relevant fields. More fields are available in the execution messages.
    cursor.execute("""
                       CREATE TABLE IF NOT EXISTS Trades (
                           executionId          VARCHAR(36) PRIMARY KEY,
                           executionType        VARCHAR(36),
                           orderId              VARCHAR(36),
                           aggressingOrderId    VARCHAR(36),
                           symbol               VARCHAR(23),
                           quantity             BIGINT,
                           cost                 BIGINT,
                           price                DOUBLE PRECISION,
                           commission           DOUBLE PRECISION,
                           execCommission       BIGINT,
                           transactionTime      VARCHAR(24)
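                           )
                       """)

    # Commit so that the initial MessageIndex row is persisted even if no message is processed
    self.conn.commit()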

# 2. Create a message store

Since this consumer would be responsible for purging messages as it's reading them, it's better to have a dedicated message store. This code will first check if there is a message store that has this name. If yes, it will simply return the first message index in this store. If not, it will create a new store and return the first message index, which will be the current index in the engine. It's important to note that until a message store is created, the messages are not saved inside the engine, so messages prior to creating the message store cannot be retrieved.

import json
def init_message_store(self, name):
    response = self.get("/MessageStore/")
    stores = json.loads(response.text)
    store = next((s for s in stores if s["storeId"] == name), None)
    if store is None:
        print(f"Creating a new store: {name}")
        response = self.put(f"/MessageStore?storeId={name}")
        store = json.loads(response.text)

    print(f"Message store {name} starts at {store['firstItemIndex']}")
    return store["firstItemIndex"]

# 3. Find which index to start from

The previous step tells us which index we can start from in the engine. However, the index stored in the database might be more recent. To avoid duplication, we resume from the most recent index we know about. If the index stored in the database is lower than the engine's first index, then we are missing data. This is why it's important that nothing other than this consumer purges this message store. The code snippet below shows how to find the most recent index in the database.

def last_message_index(self):
    cursor = self.conn.cursor()
    cursor.execute("""
            SELECT lastIndex FROM MessageIndex
        """)
    return cursor.fetchone()[0]
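
The comparison described in this step can be sketched as a small pure function. The name `choose_start_index` and the returned flag are assumptions made for illustration. Note that a freshly initialized database (index 0) will also raise the flag, which is expected on a first run.

```python
def choose_start_index(engine_first_index, db_last_index):
    """Pick the resume point and report whether data may be missing."""
    # Resume from the most recent index known to either side.
    start = max(engine_first_index, db_last_index)
    # A database that is behind the store's first index may have missed messages.
    may_have_gap = db_last_index < engine_first_index
    return start, may_have_gap


fresh_database = choose_start_index(engine_first_index=5000, db_last_index=0)
caught_up = choose_start_index(engine_first_index=5000, db_last_index=5200)
```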

# 4. Subscribe to the websocket feed

This code will subscribe to the WebSocket feed from the last known index. The ws_url is the control gateway's WebSocket URL. The callback function will be called for each message.

import json
import websockets
message_store_name = "my_message_store"

async def subscribe_ws(self, callback, max_retry=0):
    retry = 0
    while True:
        try:
            websocket_url = f"{self.ws_url}/joined?storeId={message_store_name}&lastSeen={self.first_index}"
            print(websocket_url)
            async with websockets.connect(websocket_url, ping_interval=None) as websocket:
                while True:
                    response = await websocket.recv()
                    data = json.loads(response)
                    if isinstance(data, str):
                        # The engine doesn't know about this message index. Perhaps it has been purged and there is a
                        # gap in the data.
                        await websocket.close()
                        raise Exception(f"There seems to be a gap in message index. Last known message is {self.first_index} but it is already purged in the engine")

                    # Handle the message
                    callback(data)

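                    # Once the latest message is written to the database, we purge the engine message to free up its memory
                    message_index = data["messageIndex"]
                    self.purge_until(message_store_name, message_index)

                    # Remember the new position so that a reconnect resumes from here rather than from a purged index
                    self.first_index = message_index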

        except Exception as e:
            retry += 1
            print(f"Retry {retry}: Connection closed. {e}")
            if retry > max_retry:
                break
            print("Retrying...")
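
The retry loop above reconnects immediately. A common refinement, which is not part of the original script, is to wait between attempts with a capped exponential backoff. The sketch below only computes the delays; the name `backoff_delays` and its default values are hypothetical.

```python
def backoff_delays(max_retry, base_seconds=1.0, cap_seconds=30.0):
    """Delay before each retry: doubles every attempt, capped at cap_seconds."""
    return [min(cap_seconds, base_seconds * 2 ** attempt) for attempt in range(max_retry)]


delays = backoff_delays(6)
```

Inside an async loop, each delay could be applied with `await asyncio.sleep(delay)` before reconnecting.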

# 5. Process messages

This will extract the relevant executions from the message and store them in the database as updates to the Orders table and as new rows in the Trades table. It also needs to update the message index table every time a message is processed. For orders, we only look at executions that represent a change in order state.

import sqlite3
import json
def process_message(self, data):
   try:
      cursor = self.conn.cursor()
      execution_groups = data["executions"]
      for executions in execution_groups:
         # First, identify which of the executions in the execution group represents the taker side. 
         # If the taker fills at multiple prices within the same trade, there will be more than one execution for the taker.
         # This field can then be used to search and group all the executions that belong to the same trade.
         aggressor = next((execution for execution in executions if execution.get("lastLiquidityInd") == "RemovedLiquidity"), None)
         aggressor_order_id = None if aggressor is None else aggressor["orderId"]
         for execution in executions:
            # Only the following types of executions affect orders. Other execution types might affect positions.
            if execution["execType"] in ("New", "Trade", "Canceled"):
               # Updates the state of the order or create a new entry
               cursor.execute("""
                                REPLACE INTO Orders VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """, (
                  execution["orderId"],
                  execution["account"],
                  execution["symbol"],
                  execution["price"],
                  execution["orderQty"],
                  execution["ordStatus"],
                  execution["cumQty"],
                  execution["cumCost"],
                  execution["avgPx"],
                  execution["timestamp"],
                  execution.get("ordRejReason")))
            if execution["execType"] in ("Trade", "Liquidation", "Deleverage"):
               # Record the trade. REPLACE keeps the write idempotent if the same message is delivered again.
               cursor.execute("""
                                       REPLACE INTO Trades VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                                       """, (
                  execution["execId"],
                  execution["execType"],
                  execution["orderId"],
                  aggressor_order_id,
                  execution["symbol"],
                  execution["lastQty"],
                  execution["execCost"],
                  execution["lastPx"],
                  execution["commission"],
                  execution["execComm"],
                  execution["transactTime"]))
   
      # Update the message index table to the new index after the message has been processed
      message_index = data["messageIndex"]
      cursor.execute("""
                           UPDATE MessageIndex SET lastIndex = (?)
                           """, (message_index,))

      self.conn.commit()

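   except json.JSONDecodeError as e:
      print(f"Failed to parse message: {e}")
      raise
   except sqlite3.DatabaseError as e:
      # Roll back the partial write and re-raise so that the caller does not purge a message that was never stored
      print(f"Failed to store message: {e}")
      self.conn.rollback()
      raise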
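
Two properties of this step are worth seeing in isolation: `REPLACE INTO` keeps a single row per order, and the order update and the message index are committed together. The sketch below demonstrates both on an in-memory SQLite database. The reduced table layout, the `apply_update` helper, and the sample state names are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Orders (orderId TEXT PRIMARY KEY, orderState TEXT, filledQuantity INTEGER)")
conn.execute("CREATE TABLE MessageIndex (lastIndex INTEGER)")
conn.execute("INSERT INTO MessageIndex VALUES (0)")
conn.commit()


def apply_update(conn, message_index, order_id, state, filled):
    # `with conn` commits both statements together, or rolls both back on error.
    with conn:
        conn.execute("REPLACE INTO Orders VALUES (?, ?, ?)", (order_id, state, filled))
        conn.execute("UPDATE MessageIndex SET lastIndex = ?", (message_index,))


apply_update(conn, 7, "order-1", "New", 0)
apply_update(conn, 9, "order-1", "PartiallyFilled", 5)
apply_update(conn, 9, "order-1", "PartiallyFilled", 5)  # the same message delivered again

rows = conn.execute("SELECT * FROM Orders").fetchall()
last_index = conn.execute("SELECT lastIndex FROM MessageIndex").fetchone()[0]
```

Applying the same update twice leaves the database unchanged, which is what makes at-least-once delivery safe for this table.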

# 6. Purge messages

If the previous step is successful, we can purge the messages from the engine.

def purge_until(self, store, message_index):
    self.delete(f"/MessageStore/Purge?storeId={store}&messageIndex={message_index}")

The consumer is fully in charge of purging messages from the engine. So even if it has to stop momentarily, data will be retained until it can resubscribe. The engine will automatically suspend if the consumer fails to purge messages for a long period of time.

# Reference

# Configuration Parameters

| Parameter | Description | Default | Endpoint |
| --- | --- | --- | --- |
| maximumUnpurgedItemAgeMillis | Maximum age for unpurged messages before suspension | 21,600,000 (6 hours) | /MessageStore/Suspension |
| storeId | Message store identifier | "" (default store) | Various |

# WebSocket Subscription Parameters

| Field | Type | Description | Required |
| --- | --- | --- | --- |
| storeId | string | Name of the message store ("" for default) | Yes |
| lastSeen | integer | Index of the last processed message; the feed resumes from the next message | No |

# Error Conditions

| Condition | Response | Resolution |
| --- | --- | --- |
| Command index before purged index | Error returned | Use a more recent index or omit to receive all messages |
| Consumer failure to purge | Exchange suspension | Fix consumer and resume purging |
| Message age threshold exceeded | Automatic suspension | Increase purge frequency or suspension threshold |
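
In the walkthrough script, an error on the feed is recognized by the payload decoding to a plain JSON string instead of an object. The sketch below isolates that check. The function name `parse_feed_frame` and the sample error text are hypothetical, and the actual wording of engine errors is not specified here.

```python
import json


def parse_feed_frame(raw):
    """Decode one feed frame and tell errors apart from regular messages."""
    data = json.loads(raw)
    if isinstance(data, str):
        return "error", data  # e.g. the requested index was already purged
    return "message", data


error_frame = parse_feed_frame('"requested index is no longer available"')
message_frame = parse_feed_frame('{"messageIndex": 12, "executions": []}')
```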

# Best Practices

  1. Regular Purging: Implement consistent purge acknowledgments to prevent memory buildup
  2. Index Tracking: Maintain persistent storage of the last processed command index
  3. Suspension Monitoring: Alert on approaching suspension thresholds
  4. Market-Aware Recovery: During volatile markets, use manual suspension control for safer recovery
  5. Consumer Health Checks: Monitor consumer services to prevent purge failures

For implementation examples and detailed API specifications, consult the Swagger documentation at http://your-control-gateway:8181/docs/.