# Message Store

## Purpose and Functionality
The Message Store is a core component of the Nekuti Matching Engine that provides a sequenced record of all business-domain state-changing events. It serves as an audit trail, indexed so that consumers can deduplicate messages and identify gaps before persisting them into their storage of choice. Typical use cases include:
- Audit trail: Persist the sequence of events in a time-series database for auditing purposes.
- On-chain settlement: Use the output of the engine as input for on-chain settlement; the order of events is guaranteed to remain the same.
- Database storage: Whether for auditing or analysis, it can be useful to keep orders, positions, balances, instrument data, and so on in a database. We will walk through some examples of how to use the Message Store for this purpose.
## Architecture Overview
The Message Store operates as a sequential, append-only log in which each message corresponds to state changes triggered by a sequenced command. The engine keeps a record of each event until the consumer acknowledges it by purging the corresponding index. If the consumer falls behind or fails to purge messages for a long period, the backlog could exhaust the engine's memory. To prevent this, the engine automatically suspends itself until the consumer catches up.
There can be more than one message store in the system. This can be useful if there are multiple consumers with different processing cadences.
- Indexing: Each message is indexed with a unique command index, allowing consumers to track their processing state and identify gaps.
- Ordering Guarantee: Messages maintain strict sequential order based on command indices
- Reliability: At-least-once delivery with explicit acknowledgment. If the consumer is offline for a while and misses events, it can subscribe to the feed from an earlier point.
- Memory Efficiency: Purged messages are removed from memory
- Fault Tolerance: Automatic suspension prevents memory exhaustion
## Command Index Behavior
Command indices exhibit specific characteristics:
- Monotonic: Always increasing, never decreasing
- Non-Dense: Gaps exist due to read-only commands or silent updates (e.g., clock ticks)
- Multi-Message: A single command index may generate multiple messages (e.g., mark updates triggering executions and liquidations)
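Because delivery is at-least-once, a consumer may see the same message again after a reconnect. Since indices only increase, redeliveries can be filtered by remembering the highest index processed so far, and a gap between consecutive indices is expected rather than a sign of loss. A minimal consumer-side sketch (the `dedupe_messages` helper is ours, not part of the engine API):

```python
def dedupe_messages(stream, last_seen=-1):
    """Yield each message at most once, in order.

    stream: an iterable of decoded feed messages carrying a "messageIndex".
    last_seen: the highest index already processed (e.g. loaded from storage).
    """
    for message in stream:
        idx = message["messageIndex"]
        if idx <= last_seen:
            continue  # redelivery after a reconnect; already processed
        last_seen = idx
        yield message
```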
## Message Types
The Message Store captures all business-domain state changes:
- Executions: Events affecting orders and positions, such as new orders, amends, cancels, fills, and liquidations.
- Account transactions: Events affecting balances, such as deposits and withdrawals, but also fees, funding payments, and PnL realization.
- Liquidations: A summary of each liquidation event. Liquidations produce executions and transactions, which appear in the two lists above.
- Funding charges: A summary of charges for funding payments. This doesn't include the actual payments, as they are part of the transactions list.
- Liveness checks: Heartbeat messages that can be used to synchronize the engine with external services. The producer sends a liveness check to the engine as a milestone and can then determine which events the engine processed before and after that milestone.
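A consumer typically routes each of these sections to its own handler. The sketch below assumes the sections arrive as keys of one decoded message; only `executions` is confirmed by the worked example later on this page, and the other key names are illustrative assumptions:

```python
def dispatch(message, handlers):
    # Key names other than "executions" are assumptions for illustration.
    sections = ("executions", "accountTransactions", "liquidations",
                "fundingCharges", "livenessChecks")
    handled = []
    for section in sections:
        items = message.get(section)
        if items:  # skip absent or empty sections
            handlers.get(section, lambda _items: None)(items)
            handled.append(section)
    return handled
```

A consumer registers handlers only for the sections it cares about; the rest are silently skipped.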
## Suspension Mechanism
The automatic suspension feature protects system integrity: suspension triggers when the oldest unpurged message exceeds the configured age limit.

Effects during suspension:
- New order placement blocked
- Mark updates accepted but not processed
- No liquidations occur
- Orders not triggered

Unsuspension:
- Automatic: When purging catches up and the oldest message age falls below the threshold
- Manual: Operators can control timing, which is especially important during volatile markets
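A consumer can alert before suspension actually triggers by watching the age of the oldest unpurged message. A hedged sketch (the function and the 80% warning fraction are ours; the 21,600,000 ms default comes from the configuration parameters on this page):

```python
def suspension_risk(oldest_unpurged_age_ms, limit_ms=21_600_000, warn_fraction=0.8):
    """Classify proximity to automatic suspension.

    oldest_unpurged_age_ms: age of the oldest unpurged message, in ms.
    limit_ms: the store's maximumUnpurgedItemAgeMillis setting.
    """
    if oldest_unpurged_age_ms >= limit_ms:
        return "suspended"
    if oldest_unpurged_age_ms >= warn_fraction * limit_ms:
        return "warning"
    return "ok"
```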
## How-To Guides

### How to Subscribe to a Message Store
Subscribe to a specific message store via WebSocket by:
- Connecting to the WebSocket feed endpoint
- Sending a subscription request with the store name
- Optionally including the last processed command index

```shell
wscat -c "ws://gateway:8181/joined?lastSeen=1234&storeId=store_name"
```
Parameters:
- storeId: The message store identifier (use "" for the default store)
- lastSeen: Message index of the last processed message. The feed will resume from the next message after this index.
### How to Acknowledge Message Consumption
Confirm message processing by sending a purge request:
- Include the message index up to which messages have been processed
- This frees memory in the engine by removing acknowledged messages
- Regular purging prevents memory exhaustion and automatic suspension

```shell
curl -X DELETE "http://gateway:8181/MessageStore/Purge?storeId=store_name&messageIndex=1234"
```
Parameters:
- storeId: The message store identifier (use "" for the default store)
- messageIndex: Message index of the last processed message. The engine will purge messages up to and including this index.
### How to Configure Message Store Suspension
Protect against memory exhaustion by configuring the fail-safe suspension setting:

```
http://gateway:8181/MessageStore/Suspension?storeId=%22%22&maximumUnpurgedItemAgeMillis=3000000
```
Parameters:
- storeId: The message store identifier (use "" for the default store)
- maximumUnpurgedItemAgeMillis: Maximum age in milliseconds (default: 6 hours = 21,600,000 ms)
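When building these URLs programmatically, the default store's identifier, the quoted empty string `""`, must be URL-encoded, which is where the `%22%22` above comes from. A sketch using only the standard library (the helper name is ours):

```python
from urllib.parse import quote

def suspension_url(base, store_id, max_age_ms):
    # quote('""') yields '%22%22', the encoding used for the default store
    return (f"{base}/MessageStore/Suspension"
            f"?storeId={quote(store_id)}"
            f"&maximumUnpurgedItemAgeMillis={max_age_ms}")
```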
### How to Handle Exchange Suspension
When automatic suspension occurs:
- During market volatility: Manually suspend the exchange before recovering failed services
- Recovery steps:
  - Fix the consumer service issues
  - Resume message purging
  - Monitor until the oldest unpurged message age falls below the threshold
  - The exchange will automatically unsuspend, or it can be manually unsuspended when market conditions are favorable
## Processing Messages and Storing in a Database
We will walk through an example of how to process messages from the Message Store and store them in a standard SQL database, acting as a basic order management system. The full code for this example is available in this script.
### 1. Set up an SQL database
This sets up a simple SQLite database with three tables: MessageIndex, Orders, and Trades. The MessageIndex table stores the last known message index. Orders stores the last known state of each order, keyed by orderId. Trades stores one row per execution, keyed by executionId.
```python
import sqlite3

def initialize_tables(self, database_name):
    self.conn = sqlite3.connect(database_name)
    cursor = self.conn.cursor()
    # This creates a table that stores the last known message index. Since this script purges messages
    # in the engine as soon as they have been processed, earlier messages would be deleted. It uses this
    # message index to subscribe to the websocket feed from that index, which should be the first index
    # still present in the engine.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS MessageIndex (lastIndex BIGINT)
    """)
    # Populate the first message index as 0 if the table is empty. This means we are resetting the
    # database and have to subscribe from message 0.
    cursor.execute("SELECT COUNT(*) FROM MessageIndex")
    if cursor.fetchone()[0] == 0:
        cursor.execute("INSERT INTO MessageIndex (lastIndex) VALUES (?)", (0,))
    # Create an orders table with some relevant fields. More fields are available in the execution messages.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS Orders (
            orderId VARCHAR(36) PRIMARY KEY,
            account BIGINT,
            symbol VARCHAR(23),
            price DOUBLE PRECISION,
            quantity BIGINT,
            orderState VARCHAR(15),
            filledQuantity BIGINT,
            filledCost BIGINT,
            averagePrice DOUBLE PRECISION,
            lastUpdateTime VARCHAR(24),
            rejectionType VARCHAR(64)
        )
    """)
    # Create a trades table with some relevant fields. More fields are available in the execution messages.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS Trades (
            executionId VARCHAR(36) PRIMARY KEY,
            executionType VARCHAR(36),
            orderId VARCHAR(36),
            aggressingOrderId BIGINT,
            symbol VARCHAR(23),
            quantity BIGINT,
            cost BIGINT,
            price DOUBLE PRECISION,
            commission DOUBLE PRECISION,
            execCommission BIGINT,
            transactionTime VARCHAR(24)
        )
    """)
```
### 2. Create a message store
Since this consumer is responsible for purging messages as it reads them, it's better to give it a dedicated message store. This code first checks whether a message store with this name exists. If so, it simply returns the first message index in that store. If not, it creates a new store and returns the first message index, which will be the current index in the engine. Note that until a message store is created, messages are not retained inside the engine, so messages from before the store's creation cannot be retrieved.
```python
import json

def init_message_store(self, name):
    response = self.get("/MessageStore/")
    data = json.loads(response.text)
    store = next((store for store in data if store["storeId"] == name), None)
    if store is None:
        print(f"Creating a new store: {name}")
        response = self.put(f"/MessageStore?storeId={name}")
        store = json.loads(response.text)
    print(f"Message store {name} starts at {store['firstItemIndex']}")
    return store["firstItemIndex"]
```
### 3. Find which index to start from
The previous step tells us which index we can start from in the engine. However, the index stored in the database might be more recent. To avoid duplication, we resume from the most recent index we know about. If the database's last known index is lower than the engine's first known index, we are missing data; this is why it's important that nothing other than this consumer can purge this message store. The code snippet below finds the most recent index in the database.
```python
def last_message_index(self):
    cursor = self.conn.cursor()
    cursor.execute("""
        SELECT lastIndex FROM MessageIndex
    """)
    return cursor.fetchone()[0]
```
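Putting steps 2 and 3 together, the subscription point is the larger of the engine's first retained index and the database's last recorded index, and anything in between having been purged is an unrecoverable gap. A sketch, assuming a stored `lastIndex` of 0 means a freshly reset database (the helper and its argument names are ours):

```python
def resolve_start_index(engine_first_index, db_last_index):
    """Return the index to pass as lastSeen when subscribing."""
    if 0 < db_last_index < engine_first_index:
        # Messages between the two indices were purged by someone else:
        # the database can no longer be brought up to date gap-free.
        raise RuntimeError(
            f"Gap: database stopped at {db_last_index} but the engine's "
            f"first retained index is {engine_first_index}")
    return max(engine_first_index, db_last_index)
```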
### 4. Subscribe to the websocket feed
This code subscribes to the websocket feed from the last known index. The ws_url is the control gateway's websocket URL. The callback function is called for each message.
```python
import json
import websockets

message_store_name = "my_message_store"

async def subscribe_ws(self, callback, max_retry=0):
    retry = 0
    while True:
        try:
            websocket_url = f"{self.ws_url}/joined?storeId={message_store_name}&lastSeen={self.first_index}"
            print(websocket_url)
            async with websockets.connect(websocket_url, ping_interval=None) as websocket:
                while True:
                    response = await websocket.recv()
                    data = json.loads(response)
                    if isinstance(data, str):
                        # The engine doesn't know about this message index. Perhaps it has been
                        # purged and there is a gap in the data.
                        await websocket.close()
                        raise Exception(f"There seems to be a gap in the message index. Last known message is {self.first_index} but it is already purged in the engine")
                    # Handle the message
                    callback(data)
                    # Once the latest message is written to the database, purge the engine message
                    # to free up its memory
                    message_index = data["messageIndex"]
                    self.purge_until(message_store_name, message_index)
                    # Remember the last processed index so a reconnect resumes after it
                    self.first_index = message_index
        except Exception as e:
            retry += 1
            print(f"Retry {retry}: Connection closed. {e}")
            if retry > max_retry:
                break
            print("Retrying...")
```
### 5. Process messages
This extracts the relevant executions from the message and stores them in the database as updates to the Orders and Trades tables. It also updates the message index table every time a message is processed. In our example we only care about orders and trades, so we only look at executions that represent a change in order state or a fill.
```python
import sqlite3
import json

def process_message(self, data):
    try:
        cursor = self.conn.cursor()
        execution_groups = data["executions"]
        for executions in execution_groups:
            # First, identify which of the executions in the execution group represents the taker side.
            # If the taker fills at multiple prices within the same trade, there will be more than one
            # execution for the taker. This field can then be used to search and group all the executions
            # that belong to the same trade.
            aggressor = next((execution for execution in executions
                              if execution.get("lastLiquidityInd") == "RemovedLiquidity"), None)
            aggressor_order_id = None if aggressor is None else aggressor["orderId"]
            for execution in executions:
                # Only the following types of executions affect orders. Other execution types might
                # affect positions.
                if execution["execType"] in ("New", "Trade", "Canceled"):
                    # Update the state of the order or create a new entry
                    cursor.execute("""
                        REPLACE INTO Orders VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        execution["orderId"],
                        execution["account"],
                        execution["symbol"],
                        execution["price"],
                        execution["orderQty"],
                        execution["ordStatus"],
                        execution["cumQty"],
                        execution["cumCost"],
                        execution["avgPx"],
                        execution["timestamp"],
                        execution.get("ordRejReason")))
                if execution["execType"] in ("Trade", "Liquidation", "Deleverage"):
                    # Record the execution. OR IGNORE keeps the insert idempotent if the
                    # message is redelivered under at-least-once delivery.
                    cursor.execute("""
                        INSERT OR IGNORE INTO Trades VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        execution["execId"],
                        execution["execType"],
                        execution["orderId"],
                        aggressor_order_id,
                        execution["symbol"],
                        execution["lastQty"],
                        execution["execCost"],
                        execution["lastPx"],
                        execution["commission"],
                        execution["execComm"],
                        execution["transactTime"]))
        # Update the message index table to the new index after the message has been processed
        message_index = data["messageIndex"]
        cursor.execute("""
            UPDATE MessageIndex SET lastIndex = (?)
        """, (message_index,))
        self.conn.commit()
    except json.JSONDecodeError as e:
        print(f"Failed to parse message: {e}")
    except sqlite3.DatabaseError as e:
        print(f"Failed to store message: {e}")
```
### 6. Purge messages
If the previous step is successful, we can purge the messages from the engine.

```python
def purge_until(self, store, message_index):
    self.delete(f"/MessageStore/Purge?storeId={store}&messageIndex={message_index}")
```
The consumer is fully in charge of purging messages from the engine, so even if it has to stop momentarily, data will be retained until it can resubscribe. The engine will automatically suspend if the consumer fails to purge messages for a long period of time.
## Reference

### Configuration Parameters

- maximumUnpurgedItemAgeMillis: Maximum age of the oldest unpurged message before automatic suspension (default: 21,600,000 ms, i.e. 6 hours)

### WebSocket Subscription Parameters

- storeId: The message store identifier ("" for the default store)
- lastSeen: Index of the last processed message; the feed resumes from the next message after it

### Error Conditions

- Gap detected: The feed returns a string instead of a JSON message when the requested index has already been purged
- Automatic suspension: Triggered when the oldest unpurged message exceeds maximumUnpurgedItemAgeMillis

## Best Practices
- Regular Purging: Implement consistent purge acknowledgments to prevent memory buildup
- Index Tracking: Maintain persistent storage of the last processed command index
- Suspension Monitoring: Alert on approaching suspension thresholds
- Market-Aware Recovery: During volatile markets, use manual suspension control for safer recovery
- Consumer Health Checks: Monitor consumer services to prevent purge failures
For implementation examples and detailed API specifications, consult the Swagger documentation at http://your-control-gateway:8181/docs/.