Cortical Labs SDK: Simulator-Only Components
Overview
This simulator-only module defines the pluggable data-source layer that feeds the SDK's in-process simulator. A data source produces sample frames and may also produce spikes. It may react to committed stimulation events, so closed-loop tests can model stim-to-spike responses without running the real data provider.
The SDK ships two built-in sources:
- File recording replay: replays an HDF5 recording selected with CL_SDK_REPLAY_PATH.
- Random source: generates deterministic synthetic frames and spikes on demand. This is the default when no replay file or custom source is set.
Custom sources can be backed by hardware, a network stream, a model, a test
fixture, or a precomputed dataset. As long as the source exposes stable
metadata and returns frames at the SDK sample rate, cl.neurons, recording,
analysis, and WebSocket visualisation use it like any other simulator source.
Concepts
- Frames are the atomic neural data unit: an int16 array shaped (frame_count, channel_count) in raw sample units. Use SimulatorDataSourceMetadata.uV_per_sample_unit to convert to microvolts.
- Spikes are DataSourceSpike records. A source can return spikes inline with frames or emit them asynchronously from a live source.
- Stims are committed Neurons.stim() events delivered to on_stim(stim) / on_stims(stims) in the producer subprocess.
- Batches (DataSourceBatch) are returned by read(): frames plus any spikes whose timestamps fall in that read window.
- Metadata (SimulatorDataSourceMetadata) describes channel count, sample rate, start timestamp, duration, seekability, real-time constraints, and accelerated-time support. The sample rate must currently be 25 kHz.
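The following sketch shows how these units relate. It converts a small block of raw samples to microvolts and converts a frame timestamp to seconds at the 25 kHz rate. It uses plain Python lists instead of NumPy arrays, and the scale factor of 0.5 is an arbitrary placeholder, not a value from any real source's metadata.

```python
FRAMES_PER_SECOND = 25_000  # SDK sample rate (currently fixed at 25 kHz)


def frames_to_microvolts(frames: list[list[int]], uv_per_sample_unit: float) -> list[list[float]]:
    """Scale raw samples shaped (frame_count, channel_count) to microvolts."""
    return [[sample * uv_per_sample_unit for sample in frame] for frame in frames]


def timestamp_to_seconds(timestamp: int, start_timestamp: int = 0) -> float:
    """Convert a frame timestamp to seconds since the source's start_timestamp."""
    return (timestamp - start_timestamp) / FRAMES_PER_SECOND


# Two frames of three channels in raw sample units.
raw = [[10, -20, 0], [100, 5, -1]]
uv = frames_to_microvolts(raw, uv_per_sample_unit=0.5)  # 0.5 is a placeholder scale
print(uv)                            # [[5.0, -10.0, 0.0], [50.0, 2.5, -0.5]]
print(timestamp_to_seconds(50_000))  # 2.0
```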
Source Types
There are two source styles:
- Pull sources subclass SimulatorDataSource and implement read(from_timestamp, frame_count). The simulator asks them for each block. Use these for seekable files, deterministic fixtures, generated data, or models that can produce arbitrary timestamp ranges.
- Push/live sources subclass LiveSimulatorDataSource and implement start(sink). They push frames and spikes into a LiveDataSink from their own thread, event loop, or external process. The base class buffers data and serves sequential read() calls. Live sources are forced to be non-seekable and real-time only, and they are not accelerated unless supports_accelerated=True.
Registering a Custom Source
Register custom sources by import path so the simulator subprocess can build them independently:
import cl

cl.sim.set_simulator_data_source(
    "my_package.my_module:my_source_factory",
    config={"param1": 42, "param2": "hello"},
    metadata=cl.sim.SimulatorDataSourceMetadata(channel_count=64),
)
The first argument is a "module:attribute" string, or an importable callable
that the SDK can resolve to that form. The optional config dict is forwarded
as keyword arguments to the factory in the simulator subprocess and must be
JSON-serialisable.
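The following hypothetical resolver shows how a "module:attribute" (or "module.attribute") path can be turned back into a callable with importlib and then invoked with a JSON config as keyword arguments. It is a sketch of the general technique, not the SDK's internal code. The names resolve_factory and build_source are illustrative, and the stdlib json.dumps stands in for a real source factory.

```python
import importlib
import json
from typing import Any, Callable


def resolve_factory(path: str) -> Callable[..., Any]:
    """Resolve "module:attribute" or "module.attribute" to the named object."""
    if ":" in path:
        module_name, _, attr_path = path.partition(":")
    else:
        module_name, _, attr_path = path.rpartition(".")
    if not module_name or not attr_path:
        raise ValueError(f"not an import path: {path!r}")
    obj: Any = importlib.import_module(module_name)
    for part in attr_path.split("."):  # allows nested names such as "mod:Class.create"
        obj = getattr(obj, part)
    return obj


def build_source(path: str, config_json: str = "{}") -> Any:
    """Import the factory by name and call it with the JSON config as **kwargs."""
    config = json.loads(config_json)
    if not isinstance(config, dict):
        raise TypeError("config must be a JSON object")
    return resolve_factory(path)(**config)


# Stand-in factory from the stdlib, called as json.dumps(obj=..., sort_keys=True).
print(build_source("json:dumps", '{"obj": {"b": 1, "a": 2}, "sort_keys": true}'))
# {"a": 2, "b": 1}
```

Because the subprocess only receives a string and a JSON document, nothing about the parent's in-memory objects needs to be pickled or shared.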
Pass metadata= when it is cheap and static. This avoids constructing the
source in the parent process just to inspect metadata. If metadata is omitted,
the SDK constructs the source once in the parent, reads source.metadata, then
passes that validated metadata to the producer process.
The producer always constructs the real source and validates its runtime metadata against the parent metadata before opening. If the two differ, opening fails. This catches sources whose metadata depends on process-local state, environment, random choices, or mutable config.
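The parent/producer consistency rule amounts to an equality check on two metadata snapshots. The following sketch models that check with a frozen dataclass. FakeMetadata, metadata_differences, and check_runtime_metadata are hypothetical stand-ins, not the SDK's SimulatorDataSourceMetadata or its validation code. Only a subset of fields is mirrored, and the exception type is an assumption.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class FakeMetadata:
    # Hypothetical subset of SimulatorDataSourceMetadata's fields.
    channel_count: int = 64
    frames_per_second: int = 25_000
    start_timestamp: int = 0
    duration_frames: Optional[int] = None
    seekable: bool = True


def metadata_differences(parent: FakeMetadata, runtime: FakeMetadata) -> list[str]:
    """List the fields whose values differ between the two snapshots."""
    return [
        f.name for f in fields(parent)
        if getattr(parent, f.name) != getattr(runtime, f.name)
    ]


def check_runtime_metadata(parent: FakeMetadata, runtime: FakeMetadata) -> None:
    """Refuse to open when producer-side metadata differs (RuntimeError is assumed)."""
    diff = metadata_differences(parent, runtime)
    if diff:
        raise RuntimeError(f"source metadata changed between processes: {diff}")


check_runtime_metadata(FakeMetadata(), FakeMetadata())  # identical snapshots: opens
# A source that picks its channel count from process-local state would fail here:
print(metadata_differences(FakeMetadata(), FakeMetadata(channel_count=32)))
# ['channel_count']
```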
To revert to the default source, call:
cl.sim.clear_simulator_data_source()
The same configuration can also be supplied through environment variables:
- CL_SDK_DATA_SOURCE: import path of the factory.
- CL_SDK_DATA_SOURCE_CONFIG: JSON object of factory keyword arguments.
- CL_SDK_DATA_SOURCE_METADATA: JSON object of SimulatorDataSourceMetadata fields. Optional, but avoids parent-process construction for metadata discovery.
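One way to use these variables is to build an environment for a child process that runs an experiment script. The following sketch only prepares the mapping. The factory path and config values are placeholders, and the metadata JSON assumes its keys match the SimulatorDataSourceMetadata attribute names listed in the API reference.

```python
import json
import os
from typing import Optional


def data_source_env(factory: str, config: dict, metadata: Optional[dict] = None) -> dict[str, str]:
    """Return a copy of os.environ with the CL_SDK_DATA_SOURCE* variables set."""
    env = dict(os.environ)
    env["CL_SDK_DATA_SOURCE"] = factory
    env["CL_SDK_DATA_SOURCE_CONFIG"] = json.dumps(config)  # must be a JSON object
    if metadata is not None:
        # Optional: lets the SDK skip constructing the source in the parent process.
        env["CL_SDK_DATA_SOURCE_METADATA"] = json.dumps(metadata)
    return env


env = data_source_env(
    "my_package.my_module:my_source_factory",  # placeholder import path
    config={"scale": 2.0},
    metadata={"channel_count": 64},
)
# e.g. subprocess.run([sys.executable, "experiment.py"], env=env)
```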
Implementing a Pull Source
Pull sources are constructed in the parent process for metadata discovery unless metadata is supplied at registration time, and are always constructed in the simulator subprocess for real reads. Keep factories importable and avoid depending on non-importable parent objects.
import numpy as np
from cl.sim import (
    DataSourceBatch,
    DataSourceStim,
    SimulatorDataSource,
    SimulatorDataSourceMetadata,
)


class MyDataSource(SimulatorDataSource):
    def __init__(self, scale: float = 1.0):
        self._scale = scale

    @property
    def metadata(self) -> SimulatorDataSourceMetadata:
        return SimulatorDataSourceMetadata(
            channel_count=64,
            duration_frames=None,  # unbounded
            seekable=True,
        )

    def open(self) -> None:
        # Acquire subprocess-local resources here.
        pass

    def close(self) -> None:
        pass

    def on_stim(self, stim: DataSourceStim) -> None:
        # Called when a stim has been committed to the simulator timeline.
        pass

    def read(self, from_timestamp: int, frame_count: int) -> DataSourceBatch:
        # Fresh Gaussian noise on every call; from_timestamp is not used, so
        # re-reading the same range returns different samples.
        frames = (np.random.randn(frame_count, 64) * self._scale).astype(np.int16)
        return DataSourceBatch(frames=frames)


def my_source_factory(scale: float = 1.0) -> SimulatorDataSource:
    return MyDataSource(scale=scale)
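The pull example above leaves on_stim() empty. The following toy shows how a model might react to stimulation: it records each committed stim and returns a spike a fixed delay later, in whichever read() window contains that timestamp. The sketch deliberately avoids importing cl. ToyStim, ToySpike, ToyBatch, and ReactiveToySource are hypothetical stand-ins for DataSourceStim, DataSourceSpike, DataSourceBatch, and a SimulatorDataSource subclass. The 50-frame (2 ms) delay is an arbitrary model parameter.

```python
from dataclasses import dataclass, field

CHANNEL_COUNT = 4
RESPONSE_DELAY_FRAMES = 50  # 2 ms at 25 kHz; arbitrary model parameter


@dataclass
class ToyStim:  # stand-in for DataSourceStim
    timestamp: int
    channel: int


@dataclass
class ToySpike:  # stand-in for DataSourceSpike
    timestamp: int
    channel: int


@dataclass
class ToyBatch:  # stand-in for DataSourceBatch
    frames: list
    spikes: list = field(default_factory=list)


class ReactiveToySource:
    def __init__(self) -> None:
        self._pending: list[ToySpike] = []

    def on_stim(self, stim: ToyStim) -> None:
        # Schedule a response spike on the stimulated channel.
        self._pending.append(ToySpike(stim.timestamp + RESPONSE_DELAY_FRAMES, stim.channel))

    def read(self, from_timestamp: int, frame_count: int) -> ToyBatch:
        end = from_timestamp + frame_count
        # Only spikes inside [from_timestamp, end) belong to this batch.
        due = sorted(
            (s for s in self._pending if from_timestamp <= s.timestamp < end),
            key=lambda s: s.timestamp,
        )
        # Keep only future spikes: delivered ones and any before this window are dropped.
        self._pending = [s for s in self._pending if s.timestamp >= end]
        frames = [[0] * CHANNEL_COUNT for _ in range(frame_count)]  # flat baseline
        return ToyBatch(frames=frames, spikes=due)


source = ReactiveToySource()
source.on_stim(ToyStim(timestamp=1_000, channel=2))
print(source.read(0, 1_000).spikes)    # []
print(source.read(1_000, 100).spikes)  # [ToySpike(timestamp=1050, channel=2)]
```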
Implementing a Push/Live Source
Live sources are useful when frames arrive on their own clock, e.g. hardware,
a websocket, or another process. Implement start(sink), emit frames with
sink.emit_frames() / sink.emit_batch(), and optionally emit reactive spikes
with sink.emit_spikes().
Queued stims wake the simulator's stim publisher immediately. Outside
neurons.loop() and accelerated time, publishing does not wait for the next
10 ms heartbeat tick: the heartbeat acts only as a fallback clock, while stim
publishing waits adaptively near the next due stim. A reactive live source can
therefore achieve sub-5 ms simulator timestamp latency when it keeps a shallow
buffer and places spikes at the earliest deliverable timestamp.
import threading

import numpy as np
from cl.sim import (
    DataSourceSpike,
    DataSourceStim,
    LiveDataSink,
    LiveSimulatorDataSource,
    SimulatorDataSourceMetadata,
)


class MyLiveSource(LiveSimulatorDataSource):
    def __init__(self):
        super().__init__(
            metadata=SimulatorDataSourceMetadata(channel_count=64),
            max_buffer_frames=64,
            read_timeout_seconds=5.0,
        )
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self._sink: LiveDataSink | None = None

    def start(self, sink: LiveDataSink) -> None:
        self._sink = sink
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, args=(sink,), daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout=1.0)

    def on_stim(self, stim: DataSourceStim) -> None:
        if self._sink is None:
            return
        # Spikes before the live read head have already been passed and are
        # dropped. Clamping to read_timestamp keeps the response deliverable
        # while avoiding extra latency from the live write-head buffer depth.
        spike_ts = max(stim.timestamp + 1, self._sink.read_timestamp)
        self._sink.emit_spikes(
            [
                DataSourceSpike(
                    timestamp=spike_ts,
                    channel=stim.channel,
                    # Length must equal cl._sim._data_buffer.SPIKE_SAMPLES_TOTAL.
                    samples=np.zeros(75, dtype=np.float32),
                )
            ]
        )

    def _run(self, sink: LiveDataSink) -> None:
        try:
            while not self._stop.is_set():
                # emit_frames blocks while the buffer is full (backpressure).
                frames = np.zeros((5, self.metadata.channel_count), dtype=np.int16)
                sink.emit_frames(frames)
        except RuntimeError:
            # The sink raises after the simulator closes the source.
            pass


def my_source_factory() -> LiveSimulatorDataSource:
    return MyLiveSource()
cl.sim
DataSourceBatch
A frames + spikes payload returned by SimulatorDataSource.read().
A batch describes the data for a contiguous range of frame timestamps
[from_timestamp, from_timestamp + frame_count). Either field may be
omitted: a source may return only frames (the common case), only spikes
(e.g. when push-emitting out-of-band spike events on a live source), or
both.
Attributes:
- frames: int16 array shaped (frame_count, channel_count) in raw sample units. None indicates no new frames for this batch. When present, frame_count must equal the value requested by the caller of read().
- spikes: Sequence of DataSourceSpike events whose timestamps fall within the batch's time range. Should be ordered by timestamp. Defaults to empty.
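The batch contract can be expressed as a small validator. The following sketch works on plain lists and a hypothetical ToySpike record rather than on NumPy arrays and the SDK's DataSourceSpike, so it cannot check the int16 dtype. It is not the check the simulator runs. It encodes the rules stated above plus the channel bound from DataSourceSpike, and it treats the "should be ordered" recommendation as an error for strictness.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class ToySpike:  # hypothetical stand-in for DataSourceSpike
    timestamp: int
    channel: int


def validate_batch(
    frames: Optional[Sequence[Sequence[int]]],
    spikes: Sequence[ToySpike],
    from_timestamp: int,
    frame_count: int,
    channel_count: int,
) -> None:
    """Raise ValueError if a batch breaks the documented read() contract."""
    if frames is not None:
        if len(frames) != frame_count:
            raise ValueError(f"expected {frame_count} frames, got {len(frames)}")
        if any(len(frame) != channel_count for frame in frames):
            raise ValueError(f"every frame must have {channel_count} channels")
    end = from_timestamp + frame_count
    for spike in spikes:
        if not from_timestamp <= spike.timestamp < end:
            raise ValueError(f"spike at {spike.timestamp} outside [{from_timestamp}, {end})")
        if not 0 <= spike.channel < channel_count:
            raise ValueError(f"spike channel {spike.channel} out of range")
    stamps = [s.timestamp for s in spikes]
    if stamps != sorted(stamps):
        raise ValueError("spikes should be ordered by timestamp")


# A spikes-only batch (frames=None) for the window [100, 110) on 4 channels.
validate_batch(None, [ToySpike(100, 0), ToySpike(109, 3)], 100, 10, 4)
```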
DataSourceSpike
A spike detection event produced by a simulator data source.
Sources may emit spikes alongside frames (as part of a DataSourceBatch)
when they have ground-truth knowledge of spike events (e.g. when replaying
a recording with annotated spikes, or generating synthetic data). Sources
that don't know about spikes simply leave the spikes field of
DataSourceBatch empty and let downstream consumers do the detection.
Attributes:
- timestamp: Frame timestamp at which the spike occurred. Must fall within the time range of the batch the spike is emitted in.
- channel: Zero-based channel index on which the spike was detected. Must be < metadata.channel_count.
- samples: Optional waveform snippet centred on the spike, in microvolts (float32). Length is fixed by cl._sim._data_buffer.SPIKE_SAMPLES_TOTAL. None if no waveform is available.
- channel_mean_sample: Optional baseline (mean) sample value for the channel at the time of the spike, used by some analyses for normalisation. None if not computed.
DataSourceStim
A stimulation event observed by a simulator data source.
Stim timestamps use the same 25 kHz frame clock as read(). Bursts are
delivered as individual stim events. Phase fields mirror the StimDesign
used for that individual pulse.
LiveDataSink
Thread-safe handle for pushing data into a LiveSimulatorDataSource.
A LiveDataSink is passed to LiveSimulatorDataSource.start() and is
the only way a live source should emit data. All methods are safe to call
from background threads owned by the source implementation.
Backpressure: emit_batch / emit_frames will block if the source's
internal buffer is full, until the simulator drains it (or the source is
closed). This naturally throttles producers that run faster than the
simulator can consume.
LiveDataSink.next_timestamp
Timestamp that will be assigned to the next emitted frame (the write head).
Useful when constructing spikes whose timestamp must fall inside
the batch they are emitted with.
LiveDataSink.read_timestamp
Timestamp of the next frame the simulator will read (the read head).
This is the lowest timestamp an out-of-band spike (emit_spikes) can
still be delivered with; earlier spikes have already been passed and are
dropped. Clamp reactive spikes to max(desired_ts, read_timestamp) to
minimise stim->spike latency without risking a silently dropped spike.
Always <= next_timestamp (the write head runs ahead by the buffered
frame depth).
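The clamping rule above can be checked with a little arithmetic. At 25 kHz, one frame lasts 40 µs. A write head running 64 frames ahead of the read head (the max_buffer_frames=64 used in the live example) can therefore add up to 2.56 ms if a spike is timestamped at the write head instead of the read head. The helper names below are illustrative, not SDK functions, and the head positions are made-up numbers.

```python
FRAMES_PER_SECOND = 25_000


def frames_to_ms(frames: int) -> float:
    """Duration of `frames` samples in milliseconds at the SDK sample rate."""
    return frames * 1000 / FRAMES_PER_SECOND


def clamp_spike_timestamp(desired_ts: int, read_timestamp: int) -> int:
    """Earliest deliverable timestamp: spikes before the read head are dropped."""
    return max(desired_ts, read_timestamp)


stim_ts, read_head, write_head = 10_000, 10_020, 10_084  # write head 64 frames ahead
clamped = clamp_spike_timestamp(stim_ts + 1, read_head)
print(clamped, frames_to_ms(clamped - stim_ts))        # 10020 0.8
print(write_head, frames_to_ms(write_head - stim_ts))  # 10084 3.36
```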
LiveDataSink.emit_batch(batch)
Emit a frames + spikes batch atomically.
Any spikes in batch.spikes must have timestamps within
[next_timestamp, next_timestamp + batch.frames.shape[0]). Blocks
if the internal buffer is full.
Raises:
- ValueError: if frames have the wrong shape/dtype or if a spike's timestamp falls outside the batch range.
- RuntimeError: if the source has already been closed.
LiveDataSink.emit_frames(frames)
Emit frames with no associated spikes.
Convenience wrapper around emit_batch(DataSourceBatch(frames=frames)).
LiveSimulatorDataSource
Abstract base class for push-style (live) simulator data sources.
Use this base when data arrives on its own clock, e.g., from hardware
capture, a network stream, or a child process, rather than being
fetched on demand. Subclasses implement start(sink) (and optionally
stop()) and push frames/spikes into the supplied LiveDataSink from a
background thread or event loop. The base class buffers what's pushed and
serves the simulator's read() calls sequentially.
Live sources are inherently non-seekable and real-time only: the base
class overrides metadata.seekable to False and metadata.realtime_only
to True regardless of what the supplied metadata says.
Lifecycle
- __init__(metadata, ...): the subclass calls super().__init__() with optional metadata and buffering parameters.
- open(): called by the simulator subprocess; resets internal state and invokes start(sink).
- The subclass's start(sink) implementation begins producing data, typically by spawning a thread that calls sink.emit_* in a loop.
- The simulator drains the buffer via read(); emit calls block when the buffer is full.
- close(): calls stop(), marks the sink closed, and unblocks any waiting reader.
Backpressure & timeouts
- max_buffer_frames caps the number of pending frames; emit calls block when the buffer is full, throttling fast producers.
- read_timeout_seconds bounds how long the simulator will wait for enough frames to satisfy a read before raising TimeoutError.
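To see how these two settings interact, here is a toy bounded frame buffer built on threading.Condition. In this sketch, emit() blocks while max_buffer_frames frames are pending, read() waits up to read_timeout_seconds for enough frames and then raises TimeoutError, and close() wakes everyone. ToyLiveBuffer is a hypothetical illustration of the behaviour described above, not the base class's implementation. Frames are plain lists, and the sketch assumes each emit() and read() asks for no more than max_buffer_frames frames.

```python
import threading
from collections import deque


class ToyLiveBuffer:
    """Hypothetical bounded frame buffer with blocking emit() and timed read()."""

    def __init__(self, max_buffer_frames: int, read_timeout_seconds: float) -> None:
        self._max = max_buffer_frames
        self._timeout = read_timeout_seconds
        self._frames: deque = deque()
        self._closed = False
        self._cond = threading.Condition()

    def emit(self, frames: list) -> None:
        """Block while adding these frames would exceed max_buffer_frames."""
        with self._cond:
            while not self._closed and len(self._frames) + len(frames) > self._max:
                self._cond.wait()
            if self._closed:
                raise RuntimeError("source closed")
            self._frames.extend(frames)
            self._cond.notify_all()  # wake a reader waiting for frames

    def read(self, frame_count: int) -> list:
        """Wait up to read_timeout_seconds for frame_count frames."""
        with self._cond:
            self._cond.wait_for(
                lambda: self._closed or len(self._frames) >= frame_count,
                timeout=self._timeout,
            )
            if len(self._frames) < frame_count:
                if self._closed:
                    raise RuntimeError("source closed")
                raise TimeoutError(f"fewer than {frame_count} frames after {self._timeout}s")
            out = [self._frames.popleft() for _ in range(frame_count)]
            self._cond.notify_all()  # wake producers blocked on a full buffer
            return out

    def close(self) -> None:
        """Mark closed and wake every blocked producer and reader."""
        with self._cond:
            self._closed = True
            self._cond.notify_all()


buf = ToyLiveBuffer(max_buffer_frames=8, read_timeout_seconds=1.0)


def produce() -> None:
    for i in range(20):
        buf.emit([[i]])  # one frame with one channel


producer = threading.Thread(target=produce, daemon=True)
producer.start()
chunks = [buf.read(4) for _ in range(5)]  # 20 frames, never more than 8 pending
producer.join()
print(chunks[-1])  # [[16], [17], [18], [19]]
```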
See the module doc (cl.sim) for a full worked example.
Arguments:
- metadata: Static metadata for the source. The seekable and realtime_only fields are ignored and forced to False and True respectively. Defaults to SimulatorDataSourceMetadata().
- max_buffer_frames: Maximum number of pending frames to buffer before emit_* calls block. Defaults to one second's worth of frames at the SDK's sample rate (25,000 frames at the current 25 kHz).
- read_timeout_seconds: Maximum time the simulator will wait for a read() to be satisfied. Raises TimeoutError on expiry.
- supports_accelerated: Whether the source can keep up with accelerated time. Defaults to False (live sources are normally locked to wall-clock time).
LiveSimulatorDataSource.metadata
Return the static metadata describing this source.
Called once by the simulator subprocess immediately after
construction, before open(). Must return the same value on every
call; the simulator does not poll for metadata changes.
LiveSimulatorDataSource.current_read_timestamp
Timestamp of the next frame the simulator will read (the read head).
This is the earliest timestamp an out-of-band spike emitted via
LiveDataSink.emit_spikes() can still be delivered with: spikes whose
timestamp falls before the read head have already been passed and are
discarded. Unlike next_write_timestamp (the source's write head, which
runs ahead of the read head by the buffered frame depth), clamping a
reactive spike to max(desired_ts, current_read_timestamp) places it as
close as possible to the stimulus that triggered it — minimising the
stim->spike latency while guaranteeing the spike is never dropped.
LiveSimulatorDataSource.open()
Prepare the source for read() calls in the simulator subprocess.
Use this hook (rather than __init__) to acquire process-local
resources such as file handles, sockets, threads, or GPU contexts.
The constructor may run in both the parent and child processes (in the
parent only when metadata is not supplied at registration), while
open() runs only in the simulator subprocess.
Default implementation does nothing.
LiveSimulatorDataSource.start(sink)
Begin producing live data into sink.
Called once per open(). Implementations typically spawn a daemon
thread (or start an async task / external process) that calls
sink.emit_batch, sink.emit_frames, or sink.emit_spikes as data
becomes available. Must return promptly; do not block here.
LiveSimulatorDataSource.stop()
Stop producing live data. Called once per close().
Implementations should signal their background workers to exit and wait for them to do so. Exceptions raised here are swallowed by the base class to ensure cleanup proceeds.
Default implementation does nothing.
LiveSimulatorDataSource.read(from_timestamp, frame_count)
Produce a batch of frames (and optionally spikes) for the simulator.
Returns a DataSourceBatch covering the half-open timestamp range
[from_timestamp, from_timestamp + frame_count). If batch.frames
is not None, it must have shape
(frame_count, metadata.channel_count) and dtype int16. Any
spikes in batch.spikes must have timestamps within the requested
range.
Arguments:
- from_timestamp: First frame timestamp to produce, relative to metadata.start_timestamp.
- frame_count: Number of frames the simulator expects. Always > 0.
May block for live/streamed sources, but should return promptly for seekable sources to avoid stalling the simulator.
SimulatorDataSource
Abstract base class for pull-style simulator data sources.
A data source is the component that produces neural sample frames (and
optionally spikes) on behalf of the SDK's in-process simulator. Subclass
this directly when your source can answer read(from_timestamp,
frame_count) on demand, for example file-backed replay sources,
deterministic generators, or any other randomly seekable source. For
push-style sources driven by their own clock (hardware capture, network
streams, etc.) subclass LiveSimulatorDataSource instead.
Lifecycle
Sources are registered with
cl.sim.set_simulator_data_source("module:factory", config={...}). Unless
metadata is supplied at registration, the factory is first called in the
parent process for metadata discovery; the source is always constructed
inside the simulator subprocess from the same factory.
The subprocess then calls, in order:
- metadata: read once to size the shared buffer and configure timing.
- open(): acquire any per-process resources (file handles, sockets, thread pools, etc.). Do not do this work in __init__; the constructor may run in both the parent and child processes.
- read(from_timestamp, frame_count): called repeatedly to drive the simulator. Must return a DataSourceBatch whose frames (if any) have the exact shape (frame_count, metadata.channel_count).
- close(): release the resources acquired in open().
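The call order above can be pictured as a small driver loop. This is a hypothetical sketch, not the simulator's producer loop. RecordingSource is a toy class that logs each call it receives and uses a plain dict in place of SimulatorDataSourceMetadata. drive() reads fixed-size blocks and calls close() from a finally clause, so resources acquired in open() are released even if a read fails.

```python
from typing import Any


class RecordingSource:
    """Toy pull source that records the lifecycle calls it receives."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    @property
    def metadata(self) -> dict[str, Any]:
        self.calls.append("metadata")
        return {"channel_count": 2, "duration_frames": 6}

    def open(self) -> None:
        self.calls.append("open")

    def read(self, from_timestamp: int, frame_count: int) -> list[list[int]]:
        self.calls.append(f"read({from_timestamp}, {frame_count})")
        return [[from_timestamp + i] * 2 for i in range(frame_count)]

    def close(self) -> None:
        self.calls.append("close")


def drive(source: RecordingSource, block: int) -> list[list[int]]:
    """Hypothetical producer: metadata once, open, sequential reads, close."""
    meta = source.metadata
    total = meta["duration_frames"]
    source.open()
    try:
        out: list[list[int]] = []
        for ts in range(0, total, block):
            out.extend(source.read(ts, min(block, total - ts)))
        return out
    finally:
        source.close()


src = RecordingSource()
drive(src, block=4)
print(src.calls)  # ['metadata', 'open', 'read(0, 4)', 'read(4, 2)', 'close']
```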
Requirements for implementations
- The class and its factory must be importable from the simulator subprocess (from module import factory). Lambdas, local classes, and __main__-defined symbols are rejected by set_simulator_data_source.
- Any constructor config passed to set_simulator_data_source must be JSON-serialisable; it is forwarded as **kwargs to the factory in the subprocess.
- metadata.frames_per_second must match the SDK's expected sample rate (currently 25,000).
- If metadata.seekable is True, read() must accept any from_timestamp in [start_timestamp, start_timestamp + duration_frames). If False, the source may require sequential reads.
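One simple way to honour the seekable contract is to make every sample a pure function of (timestamp, channel). Then any from_timestamp in range returns the same data no matter how reads are chunked. The following sketch uses a hypothetical multiplicative integer hash in place of a real signal model and keeps values inside the int16 range. It sets start_timestamp to 0 and, as a choice of this sketch, rejects reads that would run past duration_frames.

```python
START_TIMESTAMP = 0
DURATION_FRAMES = 25_000  # one second at 25 kHz
CHANNEL_COUNT = 4


def sample(timestamp: int, channel: int) -> int:
    """Deterministic pseudo-random int16 value for one (timestamp, channel) cell."""
    x = (timestamp * 2_654_435_761 + channel * 40_503) & 0xFFFFFFFF  # cheap integer hash
    return (x % 65_536) - 32_768  # map into [-32768, 32767]


def read(from_timestamp: int, frame_count: int) -> list[list[int]]:
    end = from_timestamp + frame_count
    if not (START_TIMESTAMP <= from_timestamp and end <= START_TIMESTAMP + DURATION_FRAMES):
        raise ValueError(f"[{from_timestamp}, {end}) is outside the source's range")
    return [[sample(ts, ch) for ch in range(CHANNEL_COUNT)] for ts in range(from_timestamp, end)]


# Reading [0, 10) in one call or in two chunks yields identical frames.
print(read(0, 10) == read(0, 6) + read(6, 4))  # True
```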
See the module doc (cl.sim) for a full worked example.
SimulatorDataSource.metadata
Return the static metadata describing this source.
Called once by the simulator subprocess immediately after
construction, before open(). Must return the same value on every
call; the simulator does not poll for metadata changes.
SimulatorDataSource.open()
Prepare the source for read() calls in the simulator subprocess.
Use this hook (rather than __init__) to acquire process-local
resources such as file handles, sockets, threads, or GPU contexts.
The constructor may run in both the parent and child processes (in the
parent only when metadata is not supplied at registration), while
open() runs only in the simulator subprocess.
Default implementation does nothing.
SimulatorDataSource.on_stim(stim)
Handle one committed stim event from the SDK simulator.
Called in the data producer subprocess after Neurons publishes stim
events to the shared stim ring. Override this to let a simulated model
react to stimulation. Default implementation does nothing.
SimulatorDataSource.on_stims(stims)
Handle a batch of committed stim events from the SDK simulator.
Override this for batch handling; otherwise each event is forwarded to on_stim().
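The following sketch illustrates the default forwarding behaviour. ToyBase mirrors the documented default, in which on_stims() calls on_stim() once per event. CountingSource relies on that default, while BatchingSource overrides on_stims() to handle the whole batch at once, here by grouping stims by channel. All classes and the ToyStim record are hypothetical stand-ins, not the SDK's classes.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Sequence


@dataclass
class ToyStim:  # stand-in for DataSourceStim
    timestamp: int
    channel: int


class ToyBase:
    def on_stim(self, stim: ToyStim) -> None:
        pass  # default: ignore stimulation

    def on_stims(self, stims: Sequence[ToyStim]) -> None:
        for stim in stims:  # default: forward each event individually
            self.on_stim(stim)


class CountingSource(ToyBase):
    """Relies on the default on_stims(); only overrides on_stim()."""

    def __init__(self) -> None:
        self.seen: list[int] = []

    def on_stim(self, stim: ToyStim) -> None:
        self.seen.append(stim.timestamp)


class BatchingSource(ToyBase):
    """Overrides on_stims() to process a whole batch in one pass."""

    def __init__(self) -> None:
        self.by_channel: dict[int, list[int]] = defaultdict(list)

    def on_stims(self, stims: Sequence[ToyStim]) -> None:
        for stim in stims:
            self.by_channel[stim.channel].append(stim.timestamp)


stims = [ToyStim(100, 3), ToyStim(150, 7), ToyStim(200, 3)]
counting, batching = CountingSource(), BatchingSource()
counting.on_stims(stims)
batching.on_stims(stims)
print(counting.seen)              # [100, 150, 200]
print(dict(batching.by_channel))  # {3: [100, 200], 7: [150]}
```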
SimulatorDataSource.read(from_timestamp, frame_count)
Produce a batch of frames (and optionally spikes) for the simulator.
Returns a DataSourceBatch covering the half-open timestamp range
[from_timestamp, from_timestamp + frame_count). If batch.frames
is not None, it must have shape
(frame_count, metadata.channel_count) and dtype int16. Any
spikes in batch.spikes must have timestamps within the requested
range.
Arguments:
- from_timestamp: First frame timestamp to produce, relative to metadata.start_timestamp.
- frame_count: Number of frames the simulator expects. Always > 0.
May block for live/streamed sources, but should return promptly for seekable sources to avoid stalling the simulator.
SimulatorDataSourceMetadata
Static description of a simulator data source.
Every SimulatorDataSource must publish a metadata instance describing
its shape and capabilities. The simulator subprocess reads metadata once
immediately after construction (before open()) to configure the shared
buffer, timestamp accounting, and accelerated-time support.
Attributes:
- channel_count: Number of channels per frame. Must be positive. Frames returned by read() must have shape (frame_count, channel_count).
- frames_per_second: Sample rate in Hz. Must match the SDK's expected rate (currently 25,000); other values are rejected by validate_metadata.
- uV_per_sample_unit: Scaling factor used to convert raw int16 sample units into microvolts. Used by downstream analysis but not by the simulator itself.
- start_timestamp: Frame timestamp at which this source's data begins. Reads use timestamps relative to this value.
- duration_frames: Total number of frames the source can produce, or None for unbounded sources. Bounded sources loop or stop at this length depending on the producer's policy.
- seekable: True if read() can be called with arbitrary from_timestamp values within the source's range. False for sources that must be consumed sequentially (e.g. live streams).
- realtime_only: True if the source can only produce data at real wall-clock speed. Set automatically for LiveSimulatorDataSource.
- supports_accelerated: True if the source can produce frames faster than real time (used by CL_SDK_ACCELERATED_TIME). File-backed sources support accelerated time; live sources usually do not.
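The following sketch collects the two hard constraints stated above (positive channel_count and a 25,000 Hz frames_per_second) into one check. ToyMetadata and validate are hypothetical mirrors of a subset of SimulatorDataSourceMetadata and of what validate_metadata is described as rejecting. The real function may check more fields or report errors differently, and the 1.0 default for uV_per_sample_unit is a placeholder, not the SDK's default.

```python
from dataclasses import dataclass
from typing import Optional

SDK_FRAMES_PER_SECOND = 25_000


@dataclass(frozen=True)
class ToyMetadata:
    channel_count: int
    frames_per_second: int = SDK_FRAMES_PER_SECOND
    uV_per_sample_unit: float = 1.0  # placeholder scale
    start_timestamp: int = 0
    duration_frames: Optional[int] = None  # None means unbounded
    seekable: bool = True
    realtime_only: bool = False
    supports_accelerated: bool = True


def validate(meta: ToyMetadata) -> list[str]:
    """Return the problems found; an empty list means the metadata looks valid."""
    problems = []
    if meta.channel_count <= 0:
        problems.append("channel_count must be positive")
    if meta.frames_per_second != SDK_FRAMES_PER_SECOND:
        problems.append(f"frames_per_second must be {SDK_FRAMES_PER_SECOND}")
    return problems


print(validate(ToyMetadata(channel_count=64)))  # []
print(validate(ToyMetadata(channel_count=0, frames_per_second=30_000)))
# ['channel_count must be positive', 'frames_per_second must be 25000']
```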
cl.sim.clear_simulator_data_source()
Unregister any custom simulator data source previously set.
After this call, subsequent simulator opens fall back to the
CL_SDK_DATA_SOURCE env-var configuration if set, otherwise to the
default behaviour (replay CL_SDK_REPLAY_PATH if set, else use the built-in
random on-the-fly simulator source). Also invalidates any cached Neurons
instance so the change takes effect on next open.
No-op if no custom source was registered.
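The fallback order described here can be summarised as a small decision function, together with the precedence rule stated for set_simulator_data_source (a registered source beats the environment variables). This is an illustrative model of the documented order, not SDK code. The returned labels are made up for the example.

```python
from typing import Mapping, Optional


def effective_source(registered: Optional[str], env: Mapping[str, str]) -> str:
    """Describe which data source the next simulator open would use."""
    if registered:  # set_simulator_data_source() wins over environment variables
        return f"custom:{registered}"
    if env.get("CL_SDK_DATA_SOURCE"):
        return f"custom:{env['CL_SDK_DATA_SOURCE']}"
    if env.get("CL_SDK_REPLAY_PATH"):
        return f"replay:{env['CL_SDK_REPLAY_PATH']}"
    return "random"


env = {"CL_SDK_DATA_SOURCE": "pkg.mod:factory", "CL_SDK_REPLAY_PATH": "rec.h5"}
print(effective_source("other.mod:factory", env))                # custom:other.mod:factory
print(effective_source(None, env))                               # custom:pkg.mod:factory
print(effective_source(None, {"CL_SDK_REPLAY_PATH": "rec.h5"}))  # replay:rec.h5
print(effective_source(None, {}))                                # random
```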
cl.sim.set_simulator_data_source(factory, config=..., metadata=...)
Register a custom simulator data source for subsequent simulator opens.
The next time the SDK opens the simulator (e.g. via cl.neurons()), the
producer subprocess will import factory and call it with **config to
construct its SimulatorDataSource. Sources registered here take
precedence over the CL_SDK_DATA_SOURCE / CL_SDK_DATA_SOURCE_CONFIG
environment variables.
Arguments:
- factory: Either a "module:attribute" / "module.attribute" import path string, or a callable defined at module level in an importable package. Lambdas, local functions, and factories defined in __main__/REPL/notebooks are rejected because the simulator subprocess cannot re-import them by name; move such factories into a .py file inside an importable package.
- config: Keyword arguments forwarded to factory() in the subprocess. Must be JSON-serialisable. Defaults to no arguments.
- metadata: Optional static metadata for the source. Supplying this avoids constructing the source in the parent process before opening.
Raises:
- ValueError: if factory is not importable by name.
- TypeError: if config is not JSON-serialisable.
Calling this also invalidates any cached Neurons instance so the next
open picks up the new source. Pair with clear_simulator_data_source()
to revert to the default (random/file replay) source.
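The two documented failure modes can be approximated with standard-library checks before anything reaches the subprocess. preflight is a hypothetical helper, not part of cl.sim. It rejects callables that could not be re-imported by name (lambdas, nested functions, __main__ definitions) with ValueError and rejects a non-JSON config with TypeError, matching the Raises section above. It returns the "module:qualname" path a callable would be registered under. String paths are passed through unchecked here, whereas the SDK is documented to reject non-importable ones.

```python
import json
from typing import Any, Callable, Optional, Union


def preflight(
    factory: Union[str, Callable[..., Any]], config: Optional[dict] = None
) -> str:
    """Return an import path for factory, or raise like the documented checks."""
    if callable(factory):
        module = getattr(factory, "__module__", None)
        qualname = getattr(factory, "__qualname__", "")
        # "<lambda>" and "outer.<locals>.inner" cannot be re-imported by name.
        if not module or module == "__main__" or "<" in qualname:
            raise ValueError(f"{factory!r} is not importable by name")
        factory = f"{module}:{qualname}"
    try:
        json.dumps(config or {})
    except TypeError as exc:  # e.g. sets or arbitrary objects in the config
        raise TypeError(f"config is not JSON-serialisable: {exc}") from exc
    return factory


print(preflight(json.dumps))                            # json:dumps
print(preflight("pkg.mod:factory", {"scale": 2.0}))     # pkg.mod:factory
```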