Cortical Labs SDK: Simulator-Only Components

Overview

This simulator-only module defines the pluggable data-source layer that feeds the SDK's in-process simulator. A data source produces sample frames and may also produce spikes. It may react to committed stimulation events, so closed-loop tests can model stim-to-spike responses without running the real data provider.

The SDK ships two built-in sources:

  • File recording replay: replays an HDF5 recording selected with CL_SDK_REPLAY_PATH.
  • Random source: generates deterministic synthetic frames and spikes on demand. This is the default when no replay file or custom source is set.

Custom sources can be backed by hardware, a network stream, a model, a test fixture, or a precomputed dataset. As long as the source exposes stable metadata and returns frames at the SDK sample rate, cl.neurons, recording, analysis, and WebSocket visualisation use it like any other simulator source.

Concepts

  • Frames are the atomic neural data unit: an int16 array shaped (frame_count, channel_count) in raw sample units. Use SimulatorDataSourceMetadata.uV_per_sample_unit to convert to microvolts.
  • Spikes are DataSourceSpike records. A source can return spikes inline with frames or emit them asynchronously from a live source.
  • Stims are committed Neurons.stim() events delivered to on_stim(stim) / on_stims(stims) in the producer subprocess.
  • Batches (DataSourceBatch) are returned by read(): frames plus any spikes whose timestamps fall in that read window.
  • Metadata (SimulatorDataSourceMetadata) describes channel count, sample rate, start timestamp, duration, seekability, real-time constraints, and accelerated-time support. The sample rate must currently be 25 kHz.
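
As a small illustration of the units above, the sketch below converts raw sample units to microvolts and frame timestamps to seconds. It assumes the documented metadata defaults (uV_per_sample_unit of 0.195 and 25,000 frames per second); the helper names are hypothetical and not part of the SDK.

```python
# Hypothetical helpers; the constants mirror the documented metadata defaults.
FRAMES_PER_SECOND = 25_000
UV_PER_SAMPLE_UNIT = 0.195


def to_microvolts(raw_samples, uV_per_sample_unit=UV_PER_SAMPLE_UNIT):
    """Scale raw int16 sample units to microvolts."""
    return [value * uV_per_sample_unit for value in raw_samples]


def timestamp_to_seconds(timestamp, start_timestamp=0):
    """Convert a frame timestamp to seconds since the source start."""
    return (timestamp - start_timestamp) / FRAMES_PER_SECOND


microvolts = to_microvolts([100, -200, 0])
seconds = timestamp_to_seconds(12_500)  # half a second of frames
```

With a real source, the scaling factor would come from the source's own metadata rather than a module constant.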

Source Types

There are two source styles:

  1. Pull sources subclass SimulatorDataSource and implement read(from_timestamp, frame_count). The simulator asks them for each block. Use these for seekable files, deterministic fixtures, generated data, or models that can produce arbitrary timestamp ranges.
  2. Push/live sources subclass LiveSimulatorDataSource and implement start(sink). They push frames and spikes into a LiveDataSink from their own thread, event loop, or external process. The base class buffers data and serves sequential read() calls. Live sources are forced non-seekable, real-time only, and not accelerated unless supports_accelerated=True.

Registering a Custom Source

Register custom sources by import path so the simulator subprocess can build them independently:

import cl

cl.sim.set_simulator_data_source(
    "my_package.my_module:my_source_factory",
    config={"param1": 42, "param2": "hello"},
    metadata=cl.sim.SimulatorDataSourceMetadata(channel_count=64),
)

The first argument is a "module:attribute" string, or an importable callable that the SDK can resolve to that form. The optional config dict is forwarded as keyword arguments to the factory in the simulator subprocess and must be JSON-serialisable.
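
To make the "module:attribute" form concrete, the following sketch shows one way such a path could be resolved and called with a JSON-decoded config. It is an illustration only, not the SDK's loader: resolve_factory and build_from_path are hypothetical names, and fractions:Fraction stands in for a real source factory because it is importable everywhere and accepts keyword arguments.

```python
import importlib
import json


def resolve_factory(path: str):
    """Resolve a "module:attribute" string to the object it names."""
    module_name, sep, attribute = path.partition(":")
    if not sep or not module_name or not attribute:
        raise ValueError(f"expected 'module:attribute', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


def build_from_path(path: str, config_json: str):
    """Decode a JSON config and forward it to the factory as keyword arguments."""
    config = json.loads(config_json)
    return resolve_factory(path)(**config)


# The config crosses the process boundary as JSON, so it must survive a round trip.
config_json = json.dumps({"numerator": 3, "denominator": 4})
result = build_from_path("fractions:Fraction", config_json)
```

Because only the path string and the JSON text need to be shared, the subprocess can rebuild the source without pickling any parent-process objects.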

Pass metadata= when it is cheap and static. This avoids constructing the source in the parent process just to inspect metadata. If metadata is omitted, the SDK constructs the source once in the parent, reads source.metadata, then passes that validated metadata to the producer process.

The producer always constructs the real source and validates its runtime metadata against the parent metadata before opening. If the two differ, opening fails. This catches sources whose metadata depends on process-local state, environment, random choices, or mutable config.

To revert to the default source, call:

cl.sim.clear_simulator_data_source()

The same configuration can be applied by environment variables:

  • CL_SDK_DATA_SOURCE: import path of the factory.
  • CL_SDK_DATA_SOURCE_CONFIG: JSON object of factory keyword arguments.
  • CL_SDK_DATA_SOURCE_METADATA: JSON object of SimulatorDataSourceMetadata fields. Optional, but avoids parent-process construction for metadata discovery.
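
A minimal sketch of preparing these variables from Python follows. The helper name data_source_env is hypothetical, and the sketch assumes the variables must be in place before the simulator is opened.

```python
import json
import os


def data_source_env(factory_path, config=None, metadata=None):
    """Build the environment entries for a custom data source."""
    env = {"CL_SDK_DATA_SOURCE": factory_path}
    if config is not None:
        env["CL_SDK_DATA_SOURCE_CONFIG"] = json.dumps(config)
    if metadata is not None:
        env["CL_SDK_DATA_SOURCE_METADATA"] = json.dumps(metadata)
    return env


env = data_source_env(
    "my_package.my_module:my_source_factory",
    config={"param1": 42, "param2": "hello"},
    metadata={"channel_count": 64},
)

# Apply to the current process (assumed to happen before the simulator opens).
os.environ.update(env)
```

The same dictionary can be passed as the env argument of subprocess.run() to configure a child process instead.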

Implementing a Pull Source

Pull sources are constructed in the parent process for metadata discovery unless metadata is supplied at registration time, and are always constructed in the simulator subprocess for real reads. Keep factories importable and avoid depending on non-importable parent objects.

import numpy as np

from cl.sim import (
    DataSourceBatch,
    DataSourceStim,
    SimulatorDataSource,
    SimulatorDataSourceMetadata,
)

class MyDataSource(SimulatorDataSource):
    def __init__(self, scale: float = 1.0):
        self._scale = scale

    @property
    def metadata(self) -> SimulatorDataSourceMetadata:
        return SimulatorDataSourceMetadata(
            channel_count   = 64,
            duration_frames = None,  # unbounded
            seekable        = True,
        )

    def open(self) -> None:
        # Acquire subprocess-local resources here.
        pass

    def close(self) -> None:
        pass

    def on_stim(self, stim: DataSourceStim) -> None:
        # Called when a stim has been committed to the simulator timeline.
        pass

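    def read(self, from_timestamp: int, frame_count: int) -> DataSourceBatch:
        # Gaussian noise in raw sample units, shaped (frame_count, channel_count).
        # Note: this noise is not reproducible across reads of the same timestamps.
        channel_count = self.metadata.channel_count
        frames = (np.random.randn(frame_count, channel_count) * self._scale).astype(np.int16)
        return DataSourceBatch(frames=frames)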

def my_source_factory(scale: float = 1.0) -> SimulatorDataSource:
    return MyDataSource(scale=scale)

Implementing a Push/Live Source

Live sources are useful when frames arrive on their own clock, e.g. hardware, a websocket, or another process. Implement start(sink), emit frames with sink.emit_frames() / sink.emit_batch(), and optionally emit reactive spikes with sink.emit_spikes().

Queued stims wake the simulator's stim publisher immediately. Outside neurons.loop() and accelerated time, publishing does not wait for the next 10 ms heartbeat tick: the publisher waits adaptively until just before the next due stim, and the heartbeat serves only as a fallback clock. A reactive live source can therefore achieve sub-5 ms simulator timestamp latency when it keeps a shallow buffer and places spikes at the earliest deliverable timestamp.
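
The latency figures can be checked with frame arithmetic: at 25 kHz, 5 ms corresponds to 125 frames. The sketch below is illustrative only; the helper names and the example timestamps are assumptions, not SDK API.

```python
FRAMES_PER_SECOND = 25_000


def frames_to_ms(frames: int) -> float:
    """Convert a frame count to milliseconds at the SDK sample rate."""
    return frames * 1000.0 / FRAMES_PER_SECOND


def reactive_spike_timestamp(stim_timestamp: int, read_timestamp: int) -> int:
    """Earliest deliverable timestamp strictly after the stim."""
    return max(stim_timestamp + 1, read_timestamp)


# Hypothetical situation: the read head is 40 frames past the stim
# by the time on_stim() runs.
stim_ts = 1_000_000
read_ts = stim_ts + 40
spike_ts = reactive_spike_timestamp(stim_ts, read_ts)

latency_ms = frames_to_ms(spike_ts - stim_ts)  # 40 frames
buffer_ms = frames_to_ms(64)                   # depth of a 64-frame buffer
budget_frames = int(5 * FRAMES_PER_SECOND / 1000)  # frames in 5 ms
```

In this example the spike lands 40 frames after the stim, well inside the 125-frame budget. Placing it at the write head instead would add up to the full buffer depth on top.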

import threading

import numpy as np

from cl.sim import (
    DataSourceSpike,
    DataSourceStim,
    LiveDataSink,
    LiveSimulatorDataSource,
    SimulatorDataSourceMetadata,
)

class MyLiveSource(LiveSimulatorDataSource):
    def __init__(self):
        super().__init__(
            metadata=SimulatorDataSourceMetadata(channel_count=64),
            max_buffer_frames=64,
            read_timeout_seconds=5.0,
        )
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self._sink: LiveDataSink | None = None

    def start(self, sink: LiveDataSink) -> None:
        self._sink = sink
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, args=(sink,), daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout=1.0)

    def on_stim(self, stim: DataSourceStim) -> None:
        if self._sink is None:
            return

        # Spikes before the live read head have already been passed and are
        # dropped. Clamping to read_timestamp keeps the response deliverable
        # while avoiding extra latency from the live write-head buffer depth.
        spike_ts = max(stim.timestamp + 1, self._sink.read_timestamp)
        self._sink.emit_spikes(
            [
                DataSourceSpike(
                    timestamp=spike_ts,
                    channel=stim.channel,
                    samples=np.zeros(75, dtype=np.float32),
                )
            ]
        )

    def _run(self, sink: LiveDataSink) -> None:
        try:
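            while not self._stop.is_set():
                # Emit 5 silent frames per call. emit_frames() blocks while the
                # buffer is full, so backpressure paces this loop.
                frames = np.zeros((5, self.metadata.channel_count), dtype=np.int16)
                sink.emit_frames(frames)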
        except RuntimeError:
            # The sink raises after the simulator closes the source.
            pass

def my_source_factory() -> LiveSimulatorDataSource:
    return MyLiveSource()

cl.sim

@dataclass(frozen=True)
class DataSourceBatch:

A frames + spikes payload returned by SimulatorDataSource.read().

A batch describes the data for a contiguous range of frame timestamps [from_timestamp, from_timestamp + frame_count). Either field may be omitted: a source may return only frames (the common case), only spikes (e.g. when push-emitting out-of-band spike events on a live source), or both.

Attributes:
  • frames: int16 array shaped (frame_count, channel_count) in raw sample units. None indicates no new frames for this batch. When present, frames.shape[0] must equal the frame_count requested by the caller of read().
  • spikes: Sequence of DataSourceSpike events whose timestamps fall within the batch's time range. Should be ordered by timestamp. Defaults to empty.
DataSourceBatch(frames: numpy.ndarray | None = None, spikes: Sequence[DataSourceSpike] = ())
frames: numpy.ndarray | None = None
spikes: Sequence[DataSourceSpike] = ()
@dataclass(frozen=True)
class DataSourceSpike:

A spike detection event produced by a simulator data source.

Sources may emit spikes alongside frames (as part of a DataSourceBatch) when they have ground-truth knowledge of spike events (e.g. when replaying a recording with annotated spikes, or generating synthetic data). Sources that don't know about spikes simply leave the spikes field of DataSourceBatch empty and let downstream consumers do the detection.

Attributes:
  • timestamp: Frame timestamp at which the spike occurred. Must fall within the time range of the batch the spike is emitted in.
  • channel: Zero-based channel index on which the spike was detected. Must be < metadata.channel_count.
  • samples: Optional waveform snippet centred on the spike, in microvolts (float32). Length is fixed by cl._sim._data_buffer.SPIKE_SAMPLES_TOTAL. None if no waveform is available.
  • channel_mean_sample: Optional baseline (mean) sample value for the channel at the time of the spike, used by some analyses for normalisation. None if not computed.
DataSourceSpike(timestamp: int, channel: int, samples: numpy.ndarray | None = None, channel_mean_sample: float | None = None)
timestamp: int
channel: int
samples: numpy.ndarray | None = None
channel_mean_sample: float | None = None
@dataclass(frozen=True)
class DataSourceStim:

A stimulation event observed by a simulator data source.

These timestamps use the same 25 kHz frame clock as read(). Bursts are delivered as individual stim events. Phase fields mirror the StimDesign used for that individual pulse.

DataSourceStim(timestamp: int, channel: int, intended_timestamp: int | None = None, phase_durations_us: tuple[int, ...] = (), phase_currents_uA: tuple[float, ...] = ())
timestamp: int
channel: int
intended_timestamp: int | None = None
phase_durations_us: tuple[int, ...] = ()
phase_currents_uA: tuple[float, ...] = ()
duration_us: int
stim_design

Reconstruct a cl.StimDesign, or None if phase data is absent.

class LiveDataSink:

Thread-safe handle for pushing data into a LiveSimulatorDataSource.

A LiveDataSink is passed to LiveSimulatorDataSource.start() and is the only way a live source should emit data. All methods are safe to call from background threads owned by the source implementation.

Backpressure: emit_batch / emit_frames will block if the source's internal buffer is full, until the simulator drains it (or the source is closed). This naturally throttles producers that run faster than the simulator can consume.

LiveDataSink(source: LiveSimulatorDataSource)
next_timestamp: int

Timestamp that will be assigned to the next emitted frame.

Useful when constructing spikes whose timestamp must fall inside the batch they are emitted with.

read_timestamp: int

Timestamp of the next frame the simulator will read (the read head).

This is the lowest timestamp an out-of-band spike (emit_spikes) can still be delivered with; earlier spikes have already been passed and are dropped. Clamp reactive spikes to max(desired_ts, read_timestamp) to minimise stim->spike latency without risking a silently dropped spike. Always <= next_timestamp (the write head runs ahead by the buffered frame depth).

def emit_batch(self, batch: DataSourceBatch) -> None:

Emit a frames + spikes batch atomically.

Any spikes in batch.spikes must have timestamps within [next_timestamp, next_timestamp + batch.frames.shape[0]). Blocks if the internal buffer is full.

Raises:
  • ValueError: if frames have the wrong shape/dtype or if a spike's timestamp falls outside the batch range.
  • RuntimeError: if the source has already been closed.
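
The spike range rule can be sketched as a half-open interval check. This is a model of the documented behaviour, not the sink's actual code; check_batch_spikes is a hypothetical helper that works on bare timestamps.

```python
def check_batch_spikes(next_timestamp, frame_count, spike_timestamps):
    """Raise ValueError if any spike falls outside the batch's half-open range."""
    end = next_timestamp + frame_count
    for ts in spike_timestamps:
        if not (next_timestamp <= ts < end):
            raise ValueError(
                f"spike timestamp {ts} outside [{next_timestamp}, {end})"
            )


# Both spikes fall inside [100, 105), so this passes silently.
check_batch_spikes(100, 5, [100, 104])

# The end of the range is exclusive, so a spike at 105 is rejected.
try:
    check_batch_spikes(100, 5, [105])
    rejected = False
except ValueError:
    rejected = True
```
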
def emit_frames(self, frames: numpy.ndarray) -> None:

Emit frames with no associated spikes.

Convenience wrapper around emit_batch(DataSourceBatch(frames=frames)).

def emit_spikes(self, spikes: Sequence[DataSourceSpike]) -> None:

Emit spikes out-of-band, without advancing the frame timeline.

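Use this when spike detection runs asynchronously to frame production and needs to be reported as soon as it's available. Spike timestamps need not fall inside any particular frame batch; spikes are sorted and merged into the pending queue. Spikes whose timestamp is earlier than read_timestamp have already been passed and are dropped.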
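
The merge behaviour can be modelled with a sorted list. The sketch below is an assumption about how such a queue might behave, not the SDK's implementation: merge_spikes is a hypothetical helper, it works on bare timestamps rather than DataSourceSpike records, and it treats the read head as the cut-off for deliverable spikes.

```python
import bisect


def merge_spikes(pending, new_timestamps, read_timestamp):
    """Merge new spike timestamps into a sorted pending list, in place.

    Returns the number of spikes skipped because they precede the read head.
    """
    dropped = 0
    for ts in new_timestamps:
        if ts < read_timestamp:
            dropped += 1
            continue
        # insort keeps the list ordered regardless of arrival order.
        bisect.insort(pending, ts)
    return dropped


pending = [120, 150]
dropped = merge_spikes(pending, [160, 90, 130], read_timestamp=100)
```

After the call, pending holds 120, 130, 150, 160 and one spike (timestamp 90) has been skipped.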

def close(self) -> None:

Signal that no more frames or spikes will arrive.

After this call, pending read()s on the simulator side will drain the buffer and then raise once data runs out. Any subsequent emit_* call will raise.

class LiveSimulatorDataSource(cl.sim.SimulatorDataSource):

Abstract base class for push-style (live) simulator data sources.

Use this base when data arrives on its own clock, e.g., from hardware capture, a network stream, or a child process, rather than being fetched on demand. Subclasses implement start(sink) (and optionally stop()) and push frames/spikes into the supplied LiveDataSink from a background thread or event loop. The base class buffers what's pushed and serves the simulator's read() calls sequentially.

Live sources are inherently non-seekable and real-time only: the base class overrides metadata.seekable to False and metadata.realtime_only to True regardless of what the supplied metadata says.

Lifecycle

  1. __init__(metadata, ...): subclass calls super().__init__() with optional metadata and buffering parameters.
  2. open(): called by the simulator subprocess; resets internal state and invokes start(sink).
  3. The subclass's start(sink) implementation begins producing data, typically by spawning a thread that calls sink.emit_* in a loop.
  4. The simulator drains the buffer via read(); emit calls block when the buffer is full.
  5. close(): calls stop(), marks the sink closed, and unblocks any waiting reader.

Backpressure & timeouts

  • max_buffer_frames caps the number of pending frames; emit calls block when full, throttling fast producers.
  • read_timeout_seconds bounds how long the simulator will wait for enough frames to satisfy a read before raising TimeoutError.
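
The interaction between the two parameters can be illustrated with a small bounded buffer built on threading.Condition. This is a simplified model, not the base class's implementation: BoundedFrameBuffer is a hypothetical name, frames are plain integers, and close handling is omitted.

```python
import threading
from collections import deque


class BoundedFrameBuffer:
    def __init__(self, max_buffer_frames, read_timeout_seconds):
        self._frames = deque()
        self._max = max_buffer_frames
        self._timeout = read_timeout_seconds
        self._cond = threading.Condition()

    def emit(self, frame):
        with self._cond:
            # Backpressure: block the producer while the buffer is full.
            while len(self._frames) >= self._max:
                self._cond.wait()
            self._frames.append(frame)
            self._cond.notify_all()

    def read(self, frame_count):
        with self._cond:
            # Wait until enough frames are buffered, or give up on timeout.
            ready = self._cond.wait_for(
                lambda: len(self._frames) >= frame_count,
                timeout=self._timeout,
            )
            if not ready:
                raise TimeoutError("not enough frames before the read timeout")
            out = [self._frames.popleft() for _ in range(frame_count)]
            self._cond.notify_all()  # wake any producer blocked in emit()
            return out


def produce(buffer, count):
    for i in range(count):
        buffer.emit(i)


buffer = BoundedFrameBuffer(max_buffer_frames=4, read_timeout_seconds=0.5)
producer = threading.Thread(target=produce, args=(buffer, 10))
producer.start()
first = buffer.read(4)
rest = buffer.read(4) + buffer.read(2)
producer.join()

# The producer has finished, so a further read runs into the timeout.
try:
    buffer.read(1)
    timed_out = False
except TimeoutError:
    timed_out = True
```

The producer never holds more than four pending frames, yet all ten arrive in order. A shallow buffer therefore trades throughput headroom for lower write-head latency.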

See the module doc (cl.sim) for a full worked example.

LiveSimulatorDataSource(metadata: SimulatorDataSourceMetadata | None = None, *, max_buffer_frames: int = 25000, read_timeout_seconds: float = 30.0, supports_accelerated: bool = False)
Arguments:
  • metadata: Static metadata for the source. The seekable and realtime_only fields are ignored and forced to False and True respectively. Defaults to SimulatorDataSourceMetadata().
  • max_buffer_frames: Maximum number of pending frames to buffer before emit_* calls block. Defaults to one second's worth of frames at the SDK's sample rate.
  • read_timeout_seconds: Maximum time the simulator will wait for a read() to be satisfied. Raises TimeoutError on expiry.
  • supports_accelerated: Whether the source can keep up with accelerated time. Defaults to False (live sources are normally locked to wall-clock).

metadata: SimulatorDataSourceMetadata

Return the static metadata describing this source.

Called once by the simulator subprocess immediately after construction, before open(). Must return the same value on every call; the simulator does not poll for metadata changes.

next_write_timestamp: int
current_read_timestamp: int

Timestamp of the next frame the simulator will read (the read head).

This is the earliest timestamp an out-of-band spike emitted via LiveDataSink.emit_spikes() can still be delivered with: spikes whose timestamp falls before the read head have already been passed and are discarded. Unlike next_write_timestamp (the source's write head, which runs ahead of the read head by the buffered frame depth), clamping a reactive spike to max(desired_ts, current_read_timestamp) places it as close as possible to the stimulus that triggered it — minimising the stim->spike latency while guaranteeing the spike is never dropped.

def open(self) -> None:

Prepare the source for read() calls in the simulator subprocess.

Use this hook (rather than __init__) to acquire process-local resources such as file handles, sockets, threads, or GPU contexts. The constructor runs in both the parent and child processes, while open() runs only in the simulator subprocess.

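In LiveSimulatorDataSource, open() resets internal state and invokes start(sink); subclasses normally implement start() rather than overriding open().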

def close(self) -> None:

Release any resources acquired in open().

Always paired with a successful open() call. Implementations should be idempotent and tolerant of being called even if open() raised.

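In LiveSimulatorDataSource, close() calls stop(), marks the sink closed, and unblocks any waiting reader; subclasses normally override stop() rather than close().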

@abstractmethod
def start(self, sink: LiveDataSink) -> None:

Begin producing live data into sink.

Called once per open(). Implementations typically spawn a daemon thread (or start an async task / external process) that calls sink.emit_batch, sink.emit_frames, or sink.emit_spikes as data becomes available. Must return promptly; do not block here.

def stop(self) -> None:

Stop producing live data. Called once per close().

Implementations should signal their background workers to exit and wait for them to do so. Exceptions raised here are swallowed by the base class to ensure cleanup proceeds.

Default implementation does nothing.

def read(self, from_timestamp: int, frame_count: int) -> DataSourceBatch:

Produce a batch of frames (and optionally spikes) for the simulator.

Returns a DataSourceBatch covering the half-open timestamp range [from_timestamp, from_timestamp + frame_count). If batch.frames is not None, it must have shape (frame_count, metadata.channel_count) and dtype int16. Any spikes in batch.spikes must have timestamps within the requested range.

Arguments:
  • from_timestamp: First frame timestamp to produce, relative to metadata.start_timestamp.
  • frame_count: Number of frames the simulator expects. Always > 0.

May block for live/streamed sources, but should return promptly for seekable sources to avoid stalling the simulator.

class SimulatorDataSource(abc.ABC):

Abstract base class for pull-style simulator data sources.

A data source is the component that produces neural sample frames (and optionally spikes) on behalf of the SDK's in-process simulator. Subclass this directly when your source can answer read(from_timestamp, frame_count) on demand, for example file-backed replay sources, deterministic generators, or any source that is randomly seekable. For push-style sources driven by their own clock (hardware capture, network streams, etc.), subclass LiveSimulatorDataSource instead.

Lifecycle

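Sources are registered with cl.sim.set_simulator_data_source("module:factory", config={...}). Unless metadata is supplied at registration, the SDK constructs the source once in the parent process to read its metadata; it always constructs the source again inside the simulator subprocess from the same factory. The subprocess then calls, in order: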

  1. metadata: read once to size the shared buffer and configure timing.
  2. open(): acquire any per-process resources (file handles, sockets, thread pools, etc.). Do not do this work in __init__; the constructor runs in both the parent and child processes.
  3. read(from_timestamp, frame_count): called repeatedly to drive the simulator. Must return a DataSourceBatch whose frames (if any) have the exact shape (frame_count, metadata.channel_count).
  4. close(): release the resources acquired in open().

Requirements for implementations

  • The class and its factory must be importable from the simulator subprocess (from module import factory). Lambdas, local classes, and __main__-defined symbols are rejected by set_simulator_data_source.
  • Any constructor config passed to set_simulator_data_source must be JSON-serialisable; it is forwarded as **kwargs to the factory in the subprocess.
  • metadata.frames_per_second must match the SDK's expected sample rate (currently 25,000).
  • If metadata.seekable is True, read() must accept any from_timestamp in [start_timestamp, start_timestamp + duration_frames). If False, the source may require sequential reads.
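
The importability requirement can be approximated with a check on a callable's module and qualified name. This is only a sketch of the idea under the assumption that such attributes are what matter; it is not the check that set_simulator_data_source performs, and is_importable_by_name is a hypothetical helper.

```python
import json


def is_importable_by_name(obj) -> bool:
    """Heuristic: can a fresh process re-import this object by name?"""
    module = getattr(obj, "__module__", None)
    qualname = getattr(obj, "__qualname__", "")
    if module in (None, "__main__"):
        return False  # defined in a script, REPL, or notebook
    if "<lambda>" in qualname or "<locals>" in qualname:
        return False  # lambdas and locally defined functions have no import path
    return True


def make_local_factory():
    def local_factory():
        return None

    return local_factory


checks = {
    "module-level function": is_importable_by_name(json.dumps),
    "lambda": is_importable_by_name(lambda: None),
    "local function": is_importable_by_name(make_local_factory()),
}
```

Only the module-level function passes. Moving a factory into a .py file inside an importable package gives it a stable module and qualified name.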

See the module doc (cl.sim) for a full worked example.

metadata: SimulatorDataSourceMetadata

Return the static metadata describing this source.

Called once by the simulator subprocess immediately after construction, before open(). Must return the same value on every call; the simulator does not poll for metadata changes.

def open(self) -> None:

Prepare the source for read() calls in the simulator subprocess.

Use this hook (rather than __init__) to acquire process-local resources such as file handles, sockets, threads, or GPU contexts. The constructor runs in both the parent and child processes, while open() runs only in the simulator subprocess.

Default implementation does nothing.

def close(self) -> None:

Release any resources acquired in open().

Always paired with a successful open() call. Implementations should be idempotent and tolerant of being called even if open() raised.

Default implementation does nothing.

def on_stim(self, stim: DataSourceStim) -> None:

Handle one committed stim event from the SDK simulator.

Called in the data producer subprocess after Neurons publishes stim events to the shared stim ring. Override this to let a simulated model react to stimulation. Default implementation does nothing.

def on_stims(self, stims: Sequence[DataSourceStim]) -> None:

Handle a batch of committed stim events from the SDK simulator.

Override this for batch handling; otherwise each event is forwarded to on_stim().
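
The relationship between the two hooks can be sketched with a stand-in base class. StimHandlerBase below is not the SDK class; it only mirrors the documented forwarding behaviour, and stims are modelled as (timestamp, channel) tuples for brevity.

```python
from typing import Sequence, Tuple

Stim = Tuple[int, int]  # (timestamp, channel), a stand-in for DataSourceStim


class StimHandlerBase:
    """Stand-in for the stim hooks on SimulatorDataSource."""

    def on_stim(self, stim: Stim) -> None:
        pass

    def on_stims(self, stims: Sequence[Stim]) -> None:
        # Documented default: forward each event to on_stim().
        for stim in stims:
            self.on_stim(stim)


class PerStimCounter(StimHandlerBase):
    """Overrides only on_stim(); batches reach it one event at a time."""

    def __init__(self) -> None:
        self.count = 0

    def on_stim(self, stim: Stim) -> None:
        self.count += 1


class PerChannelBatchCounter(StimHandlerBase):
    """Overrides on_stims() to process the whole batch in one pass."""

    def __init__(self) -> None:
        self.by_channel = {}

    def on_stims(self, stims: Sequence[Stim]) -> None:
        for _timestamp, channel in stims:
            self.by_channel[channel] = self.by_channel.get(channel, 0) + 1


stims = [(1000, 3), (1010, 3), (1020, 7)]
per_stim = PerStimCounter()
per_stim.on_stims(stims)
batch = PerChannelBatchCounter()
batch.on_stims(stims)
```

Overriding on_stims() is mainly useful when per-batch work, such as a single model update for many pulses, is cheaper than per-event work.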

@abstractmethod
def read(self, from_timestamp: int, frame_count: int) -> DataSourceBatch:

Produce a batch of frames (and optionally spikes) for the simulator.

Returns a DataSourceBatch covering the half-open timestamp range [from_timestamp, from_timestamp + frame_count). If batch.frames is not None, it must have shape (frame_count, metadata.channel_count) and dtype int16. Any spikes in batch.spikes must have timestamps within the requested range.

Arguments:
  • from_timestamp: First frame timestamp to produce, relative to metadata.start_timestamp.
  • frame_count: Number of frames the simulator expects. Always > 0.

May block for live/streamed sources, but should return promptly for seekable sources to avoid stalling the simulator.

@dataclass(frozen=True)
class SimulatorDataSourceMetadata:

Static description of a simulator data source.

Every SimulatorDataSource must publish a metadata instance describing its shape and capabilities. The simulator subprocess reads metadata once immediately after construction (before open()) to configure the shared buffer, timestamp accounting, and accelerated-time support.

Attributes:
  • channel_count: Number of channels per frame. Must be positive. Frames returned by read() must have shape (frame_count, channel_count).
  • frames_per_second: Sample rate in Hz. Must match the SDK's expected rate (currently 25,000); other values are rejected by validate_metadata.
  • uV_per_sample_unit: Scaling factor used to convert raw int16 sample units into microvolts. Used by downstream analysis but not by the simulator itself.
  • start_timestamp: Frame timestamp at which this source's data begins. Reads use timestamps relative to this value.
  • duration_frames: Total number of frames the source can produce, or None for unbounded sources. Bounded sources loop or stop at this length depending on the producer's policy.
  • seekable: True if read() can be called with arbitrary from_timestamp values within the source's range. False for sources that must be consumed sequentially (e.g. live streams).
  • realtime_only: True if the source can only produce data at real wall-clock speed. Set automatically for LiveSimulatorDataSource.
  • supports_accelerated: True if the source can produce frames faster than real time (used by CL_SDK_ACCELERATED_TIME). File-backed sources are accelerated; live sources usually are not.
SimulatorDataSourceMetadata(channel_count: int = 64, frames_per_second: int = 25000, uV_per_sample_unit: float = 0.195, start_timestamp: int = 0, duration_frames: int | None = None, seekable: bool = True, realtime_only: bool = False, supports_accelerated: bool = True)
channel_count: int = 64
frames_per_second: int = 25000
uV_per_sample_unit: float = 0.195
start_timestamp: int = 0
duration_frames: int | None = None
seekable: bool = True
realtime_only: bool = False
supports_accelerated: bool = True
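
The documented constraints on channel_count and frames_per_second can be sketched as a small validation routine. The code below uses a reduced stand-in dataclass, MetadataSketch, and a hypothetical check_metadata helper; the signature and exact behaviour of the SDK's own validate_metadata are not shown in this reference and may differ.

```python
from dataclasses import dataclass

EXPECTED_FRAMES_PER_SECOND = 25_000


@dataclass(frozen=True)
class MetadataSketch:
    """Reduced stand-in for SimulatorDataSourceMetadata."""

    channel_count: int = 64
    frames_per_second: int = 25_000


def check_metadata(meta: MetadataSketch) -> None:
    """Raise ValueError when a documented constraint is violated."""
    if meta.channel_count <= 0:
        raise ValueError("channel_count must be positive")
    if meta.frames_per_second != EXPECTED_FRAMES_PER_SECOND:
        raise ValueError(
            f"frames_per_second must be {EXPECTED_FRAMES_PER_SECOND}, "
            f"got {meta.frames_per_second}"
        )


def is_valid(meta: MetadataSketch) -> bool:
    try:
        check_metadata(meta)
    except ValueError:
        return False
    return True


default_ok = is_valid(MetadataSketch())
wrong_rate = is_valid(MetadataSketch(frames_per_second=30_000))
no_channels = is_valid(MetadataSketch(channel_count=0))

# Frozen dataclasses compare by value, which makes a parent/producer
# metadata comparison a plain equality test.
same = MetadataSketch(channel_count=32) == MetadataSketch(channel_count=32)
```
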
def clear_simulator_data_source() -> None:

Unregister any custom simulator data source previously set.

After this call, subsequent simulator opens fall back to the CL_SDK_DATA_SOURCE env-var configuration if set, otherwise to the default behaviour (replay CL_SDK_REPLAY_PATH if set, else use the built-in random on-the-fly simulator source). Also invalidates any cached Neurons instance so the change takes effect on next open.

No-op if no custom source was registered.
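
The fallback order described above can be summarised as a small decision function. This is a reading aid rather than SDK code: choose_source is a hypothetical helper, and the labels it returns are illustrative.

```python
def choose_source(registered, env):
    """Return (kind, value) for the source the simulator would use."""
    if registered is not None:
        return ("registered", registered)  # set_simulator_data_source() wins
    if env.get("CL_SDK_DATA_SOURCE"):
        return ("env", env["CL_SDK_DATA_SOURCE"])
    if env.get("CL_SDK_REPLAY_PATH"):
        return ("replay", env["CL_SDK_REPLAY_PATH"])
    return ("random", None)  # built-in on-the-fly source


env = {
    "CL_SDK_DATA_SOURCE": "pkg.mod:factory",
    "CL_SDK_REPLAY_PATH": "/data/rec.h5",
}

with_registration = choose_source("my_package.my_module:my_source_factory", env)
after_clear = choose_source(None, env)
replay_only = choose_source(None, {"CL_SDK_REPLAY_PATH": "/data/rec.h5"})
nothing_set = choose_source(None, {})
```

Note that clearing a registered source does not bypass CL_SDK_DATA_SOURCE: the environment configuration still applies if it is set.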

def set_simulator_data_source(factory: str | Callable[..., SimulatorDataSource], config: Mapping[str, typing.Any] | None = None, metadata: SimulatorDataSourceMetadata | Mapping[str, typing.Any] | None = None) -> None:

Register a custom simulator data source for subsequent simulator opens.

The next time the SDK opens the simulator (e.g. via cl.neurons()), the producer subprocess will import factory and call it with **config to construct its SimulatorDataSource. Sources registered here take precedence over the CL_SDK_DATA_SOURCE / CL_SDK_DATA_SOURCE_CONFIG environment variables.

Arguments:
  • factory: Either a "module:attribute" / "module.attribute" import path string, or a callable defined at module level in an importable package. Lambdas, local functions, and factories defined in __main__/REPL/notebooks are rejected because the simulator subprocess cannot re-import them by name; move such factories into a .py file inside an importable package.
  • config: Keyword arguments forwarded to factory() in the subprocess. Must be JSON-serialisable. Defaults to no arguments.
  • metadata: Optional static metadata for the source. Supplying this avoids constructing the source in the parent process before opening.
Raises:
  • ValueError: if factory is not importable by name.
  • TypeError: if config is not JSON-serialisable.

Calling this also invalidates any cached Neurons instance so the next open picks up the new source. Pair with clear_simulator_data_source() to revert to the default (random/file replay) source.