cl

@contextmanager
def open( take_control: bool = True, wait_until_recordable: bool = True) -> Generator[Neurons]:

Open a connection to the device, optionally take and retain control, and attempt to start it if necessary. The device will not be stopped automatically. To minimise latency, Python garbage collection is disabled while the connection is open.

This is the preferred entry point for the CL API. Do not use cl.Neurons directly.

Arguments:
  • take_control: Take control of the device. Will raise a ControlRequestError if start is required and another process has control of the device.
  • wait_until_recordable: Wait (block) until the recording system is ready.

For example:

import cl

with cl.open() as neurons:
    # Your code here
    ...
def get_system_attributes() -> dict[str, typing.Any]:

Gets the system attributes that are included in each recording as a dictionary. This has the following structure:

{
    'project_id'   : str,
    'chip_id'      : str,
    'cell_batch_id': str,
    'plugin'       : dict[str, Any],   # plugin-specific attributes, with the top level keys being the plugin names
    'system_id'    : str,              # a unique identifier for the system, e.g. "cl1-0123-456"
    'hostname'     : str,              # the hostname of the system
}
def is_simulator() -> bool:

Returns True if running in the simulator environment, False if running on a real device.

class Neurons:

The Neurons class provides the main interface with the CL1 hardware. This should always be accessed via the cl.open() context manager and should not be used in isolation. This functionality includes:

If you are using the Simulator:

  • This simulates the behaviour of the CL API by either generating random data (default) or replaying data from a H5 recording (replay_file). The recording to use is controlled by the CL_SDK_REPLAY_PATH environment variable, which can be set by a .env file.
  • This operates on wall-clock time by default to maintain parity with the CL1 device. For advanced users, it is possible to switch to accelerated mode by setting the environment variable CL_SDK_ACCELERATED_TIME=1.
  • The starting position of the replay recording will be randomised every time cl.open() is called. This can be overridden by setting CL_SDK_REPLAY_START_OFFSET, where a value of 0 indicates the first frame of the recording.
def stim( self, channel_set: ChannelSet | int, stim_design: StimDesign | float, /, burst_design: BurstDesign | None = None, lead_time_us: int = 80) -> None:

Stimulate one or more channels.

Arguments:
  • channel_set : A ChannelSet object with one or more channels, or a single channel to stimulate.
  • stim_design : A StimDesign object or a scalar current in microamperes. Use of a StimDesign is preferred. A scalar current is the equivalent of a symmetric biphasic, negative-first pulse with a pulse width of 160 microseconds, i.e., StimDesign(160, -value, 160, value).
  • burst_design: An optional BurstDesign object specifying the burst count and frequency. If unspecified, a single pulse will be delivered.
  • lead_time_us: The lead time in microseconds before the stimulation starts.
Constraints:
  • The minimum lead_time_us is 80.
  • lead_time_us must be evenly divisible by 40.

For example:

import cl
from cl import ChannelSet, StimDesign, BurstDesign

with cl.open() as neurons:

    # Deliver a single biphasic stim with current of 1.0 uA, pulse width
    # of 160 us and negative leading edge on channels 8, 9, 10
    channel_set = ChannelSet(8, 9, 10)
    stim_design = StimDesign(160, -1.0, 160, 1.0)
    neurons.stim(channel_set, stim_design)

    # Deliver the same stim as a burst of 10 at 40 Hz
    burst_design = BurstDesign(10, 40)
    neurons.stim(channel_set, stim_design, burst_design)
def interrupt(self, channel_set: ChannelSet | int, /) -> None:

Interrupt existing and clear any pending stimulation for the specified channels.

Arguments:
  • channel_set: A ChannelSet object with one or more channels, or a single channel to interrupt.
def interrupt_then_stim( self, channel_set: ChannelSet | int, stim_design: StimDesign | float, /, burst_design: BurstDesign | None = None, lead_time_us: int = 80) -> None:

Interrupt existing and cancel queued stimulation, then send a stim burst. This is equivalent to calling interrupt() followed by stim(), on the same set of channels.

Constraints:
  • The minimum lead_time_us is 80.
  • lead_time_us must be evenly divisible by 40.
Arguments:
  • channel_set: A ChannelSet object with one or more channels, or a single channel to stimulate.
  • stim_design: A StimDesign object or a floating point current in microamperes.
  • burst_design: A BurstDesign object specifying the burst count and frequency.
  • lead_time_us: The lead time in microseconds before the stimulation starts.
def sync(self, channel_set: ChannelSet, /) -> None:

Cause all channel_set channels to wait until all are ready to continue together.

The sync operation allows you to ensure that subsequent operations on different channels begin at the same time - after all previously queued operations on those channels have completed.

Arguments:
  • channel_set: One or more channels to sync.

For example:

with cl.open() as neurons:

    stim_design    = StimDesign(160, -1.0, 160, 1.0)

    channel_set_1  = ChannelSet(8)
    burst_design_1 = BurstDesign(2, 100)    # Interval of 250 frames

    channel_set_2  = ChannelSet(10)
    burst_design_2 = BurstDesign(2, 20)     # Interval of 1250 frames

    group_1_stims = []
    for tick in neurons.loop(ticks_per_second=10, stop_after_ticks=11):

        if tick.iteration == 0:
            # Group 1
            neurons.stim(channel_set_1, stim_design, burst_design_1)
            neurons.stim(channel_set_2, stim_design, burst_design_2)

            # Group 2
            neurons.sync(channel_set_1 | channel_set_2)
            neurons.stim(channel_set_1, stim_design)

        for stim in tick.analysis.stims:
            if stim.channel == 8:
                group_1_stims.append(stim.timestamp)

    group_gap = group_1_stims[-1] - group_1_stims[0]
    # Group gap is expected to be > 1250 being the interval of the slowest frequency
def create_stim_plan(self) -> StimPlan:

Create a new StimPlan object to build a stimulation plan.

Stim plans are reusable stimulation instructions that can be created at the beginning of an application to run on demand, and they provide the same stimulation interface, such as StimPlan.stim(), etc.

For example:

import cl
from cl import ChannelSet, StimDesign, BurstDesign

with cl.open() as neurons:

    # Create a stim plan with a single biphasic stim with current of
    # 1.0 uA, pulse width of 160 us and negative leading edge on
    # two sets of channels
    my_stim_plan  = neurons.create_stim_plan()
    channel_set_1 = ChannelSet(8, 9)
    channel_set_2 = ChannelSet(10, 11)
    stim_design   = StimDesign(160, -1.0, 160, 1.0)
    my_stim_plan.stim(channel_set_1, stim_design)
    my_stim_plan.stim(channel_set_2, stim_design)

    # ... Do something else

    # Execute the stim plan at any stage of your script
    my_stim_plan.run()
def loop( self, ticks_per_second: float, stop_after_seconds: float | None = None, stop_after_ticks: int | None = None, ignore_jitter: bool = False, jitter_tolerance_frames: int = 0) -> Loop:

Periodically detect spikes and execute code. (Relates to Loop and LoopTick.)

Intended for use as an iterator:

TICKS_PER_SECOND = 100

with cl.open() as neurons:
    for tick in neurons.loop(TICKS_PER_SECOND):
        # tick                      is a `LoopTick` object
        # tick.iteration            is the count of this tick within the loop
        # tick.iteration_timestamp  is the timestamp of the loop body
        # tick.frames               is a numpy array of processed electrode samples
        # tick.analysis.spikes      is a list of any detected spikes
        # tick.analysis.stims       is a list of any stimulation
        # tick.loop                 is the running loop object

Or by passing a callback to Loop.run():

TICKS_PER_SECOND = 100

def handle_tick(tick: LoopTick):
    # Do something ...

    # When ready to stop ...
    tick.loop.stop()

neurons.loop(TICKS_PER_SECOND).run(handle_tick)

Jitter

As Loop is intended for realtime operation, by default it will raise a TimeoutError if the loop body does not finish before data beyond the next tick is available.

This can be relaxed by setting jitter_tolerance_frames to a non-zero value, or ignored entirely by setting ignore_jitter to True. We do not recommend the general use of these parameters to handle jitter. Instead consider explicit jitter recovery with Loop.recover_from_jitter().

Otherwise, the loop will continue indefinitely unless stop_after_seconds or stop_after_ticks is passed at loop creation time, LoopTick.loop.stop() is called during the tick, or a break statement is used to exit the for loop.

Timestamps

Since Loop operates in realtime, there are a few key considerations if precise timing is desired. This can be very important for executing synchronised stims and event logging.

import cl
from cl import ChannelSet, StimDesign

with cl.open() as neurons:
    stim_plan_A = neurons.create_stim_plan()
    stim_plan_A.stim(ChannelSet(8, 9), StimDesign(160, -1.0, 160, 1.0))

    stim_plan_B = neurons.create_stim_plan()
    stim_plan_B.stim(ChannelSet(16, 17), StimDesign(160, -1.0, 160, 1.0))

    data_stream = neurons.create_data_stream("stim_events")

    for tick in neurons.loop(ticks_per_second=10, stop_after_seconds=2):
        # The system timestamp will be slightly later than the
        # starting timestamp of the current loop body
        assert neurons.timestamp() >= tick.iteration_timestamp

        # Stim plans executed at the tick.iteration_timestamp will be
        # executed as soon as possible, as it is slightly in the past
        # and is not guaranteed to be at the same time
        stim_plan_A.run(at_timestamp=tick.iteration_timestamp)
        stim_plan_B.run(at_timestamp=tick.iteration_timestamp)

        # ... and will be equivalent to
        stim_plan_A.run()
        stim_plan_B.run()

        # Users seeking to execute synchronised stims could
        # take advantage of tick.iteration_next_timestamp
        stim_plan_A.run(at_timestamp=tick.iteration_next_timestamp)
        stim_plan_B.run(at_timestamp=tick.iteration_next_timestamp)

        # Using tick.iteration_next_timestamp is also helpful to ensure
        # that stim events are correctly aligned when logging events
        data_stream.append(tick.iteration_next_timestamp, "Stim Happened!")
Arguments:
  • ticks_per_second: How often the loop should return a result.
  • stop_after_seconds: How long to run the closed loop for in seconds. (default: None, i.e. loop indefinitely)
  • stop_after_ticks: How long to run the closed loop for in number of ticks. (default: None, i.e. loop indefinitely)
  • ignore_jitter: If True, the loop will not raise a TimeoutError.
  • jitter_tolerance_frames: How far the loop can fall behind (in frames) before it raises a TimeoutError.

Constraints:

  • ticks_per_second must not exceed the system sampling rate of 25,000 Hz.
def record( self, file_suffix: str | None = None, file_location: str | None = None, from_seconds_ago: float | None = None, from_frames_ago: int | None = None, from_timestamp: int | None = None, stop_after_seconds: float | None = None, stop_after_frames: int | None = None, attributes: dict[str, typing.Any] | None = None, include_spikes: bool = True, include_stims: bool = True, include_raw_samples: bool = True, include_data_streams: bool = True, exclude_data_streams: list[str] = []) -> Recording:

Start a new HDF5 recording.

Arguments:
  • file_suffix: The suffix to append to the filename, before the .h5 extension.
  • file_location: An absolute path to the directory where the file should be saved, or relative path (relative to the default recording location).
  • from_seconds_ago: The number of seconds ago to start recording from, if possible.
  • from_frames_ago: The number of frames ago to start recording from, if possible.
  • from_timestamp: The timestamp to start recording from, if possible.
  • stop_after_seconds: The number of seconds to record for.
  • stop_after_frames: The number of frames to record.
  • attributes: A dictionary of attributes to add to the recording.
  • include_spikes: Whether to include detected spikes in the recording.
  • include_stims: Whether to include stimulation events in the recording.
  • include_raw_samples: Whether to include frames of raw samples in the recording.
  • include_data_streams: Pass True to record all data streams, False to record no data streams, or a list of specific data stream names to record.
  • exclude_data_streams: A list of application data streams to exclude from the recording.

Specific to the Simulator:

  • Recording data is kept in system memory and only saved to disk when calling close().
  • Recording from the past is not supported: the from_* parameters are not used.
  • Recordings can be identified by the attribute file_format.version == "SDK".
  • The following attributes are included in the Simulator recording for completeness, but the values are empty: git_hash, git_branch, git_tags, and git_status.

Typical usage example:

with cl.open() as neurons:
    recording = neurons.record()
    # Your code here ...
    recording.stop()

Example for stopping recording after a duration of time:

with cl.open() as neurons:
    recording = neurons.record(stop_after_seconds=3)
    recording.wait_until_stopped()
def create_data_stream( self, name: str, attributes: dict[str, typing.Any] | None = None) -> DataStream:

Publish a named stream of (timestamp, serialised_data) for recordings and visualisation.

See RecordingView.data_streams for how to use data streams saved in a recording.

Arguments:
  • name: Datastream name.
  • attributes: A dictionary of attributes to add to the datastream.

For example:

with cl.open() as neurons:
    # Create a named data stream - by default, it will be added to any active or future recordings.
    data_stream = neurons.create_data_stream(
        name       = 'example_data_stream',
        attributes = { 'score': 0, 'another_attribute': [0, 1, 2, 3] }
        )

    # Start a recording
    recording = neurons.record(stop_after_seconds=1)

    timestamp = neurons.timestamp()

    # Add some data stream entries with unique, ascending timestamps:
    data_stream.append(timestamp + 0, { 'arbitrary': 'data' })
    data_stream.append(timestamp + 1, ['of', 'arbitrary', 'size'])
    data_stream.append(timestamp + 2, 'and type.')
    data_stream.append(timestamp + 3, numpy.array([2**64 - 1, 2**64 - 2, 2**64 - 3], dtype=numpy.uint64))

    # Update a single attribute
    data_stream.set_attribute('score', 1)

    # Update multiple attributes at once
    data_stream.update_attributes({ 'score': 2, 'new_attribute': 9.9 })

    recording.wait_until_stopped()
def get_channel_count(self) -> int:

Get the number of channels (electrodes) the device supports. A frame is a single sample from each channel.

def get_frames_per_second(self) -> int:

Get the number of frames per second the device is configured to produce. A frame is a single sample from each channel.

def get_frame_duration_us(self) -> float:

Get the duration of a frame in microseconds.

def timestamp(self) -> int:

Get the current timestamp of the device. The timestamp sequence resets when the device is restarted.

def read( self, frame_count: int, from_timestamp: int | None = None, /, *, analysis: bool = False) -> numpy.ndarray[tuple[int, int], numpy.dtype[numpy.int16]] | DetectionResult:

Read frame_count frames from the neurons, starting at from_timestamp if supplied.

This method will block until the requested frames are available. If from_timestamp is None, the current timestamp minus one will be used, which ensures that a single frame read will return without blocking.

Arguments:
  • frame_count: Number of frames to return (at 25kHz).
  • from_timestamp: Read from a specific timestamp (at 25kHz). If None, read from the current timestamp minus one.
  • analysis: When True, return DetectionResult instead of raw frames.
Returns:

Frames as an array with shape (frame_count, channel_count) if analysis=False or DetectionResult if analysis=True.

async def read_async( self, frame_count: int, from_timestamp: int | None = None, /, *, analysis: bool = False) -> numpy.ndarray[tuple[int, int], numpy.dtype[numpy.int16]] | DetectionResult:

Asynchronous version of read().

class Stim:

A Stim object is created for each stim delivered by the system.

This is accessible via LoopTick.analysis (which is a DetectionResult) when using Neurons.loop() (see DetectionResult for more details). Do not create instances of Stim directly.

For example:

import cl
from cl import ChannelSet, StimDesign
with cl.open() as neurons:
    for tick in neurons.loop(ticks_per_second=100, stop_after_ticks=2):
        if tick.iteration == 0:
            # In the first iteration, perform a stim
            neurons.stim(ChannelSet(8, 9), StimDesign(160, -1.0, 160, 1.0))

        for stim in tick.analysis.stims: # Loops through each stim object in the current tick
            print(stim)                  # Print out the stim object
timestamp: int

Timestamp the stim was delivered.

channel: int

Channel the stim was delivered on.

class Spike:

A Spike object is created for each spike detected by the system.

This is accessible via LoopTick.analysis (which is a DetectionResult) when using Neurons.loop() (see DetectionResult for more details). Do not create instances of Spike directly.

For example:

import cl
with cl.open() as neurons:
    for tick in neurons.loop(ticks_per_second=100, stop_after_ticks=2):
        for spike in tick.analysis.spikes: # Loops through each spike object in the current tick
            print(spike)                   # Print out the spike object
timestamp: int

Timestamp of the sample that triggered the detection of the spike.

channel: int

Which channel the spike was detected on.

channel_mean_sample: float

The rolling mean value of the channel at the time of the spike.

In the Simulator, this is the mean of samples.

samples: numpy.ndarray[tuple[int], numpy.dtype[numpy.float32]]

Numpy array of 75 floating point µV sample zero-centered values around timestamp. This involves 25 samples before the spike and 50 samples after the spike.

class DetectionResult:

A DetectionResult that holds spikes and stims at a given timestamp.

This is accessible via LoopTick.analysis when using Neurons.loop(). Do not create instances of DetectionResult directly.

start_timestamp: int

Timestamp of the first processed frame in this result.

stop_timestamp: int

Timestamp of the first frame not analysed after DetectionResult.start_timestamp. (i.e. DetectionResult.start_timestamp + len(LoopTick.frames).)

spikes: list[Spike]

List of detected spikes.

stims: list[Stim]

List of stims delivered.

class ChannelSet:

Stores a set of channels for stimulation.

Arguments:
  • *channels: One or more channels as int provided as separate arguments or as a sequence of ints.

For example:

# Select channels 8, 9 and 10
ChannelSet(8, 9, 10)

Supports convenient manipulation of channels, such as:

print(ChannelSet(8, 9) | ChannelSet(9, 10)) # ChannelSet(8, 9, 10)
print(ChannelSet(8, 9) & ChannelSet(9, 10)) # ChannelSet(9)
print(ChannelSet(8, 9) ^ ChannelSet(9, 10)) # ChannelSet(8, 10)
print(~ChannelSet(8, 9))                    # All channels except 8, 9
ChannelSet(*channels: int | Iterable[int])

Constructor for ChannelSet.

class BurstDesign:

Stores the parameters of a stimulation burst.

Arguments:
  • burst_count: Number of stims to perform within this burst.
  • burst_hz : Frequency of stims within this burst.

Constraints:

  • burst_hz must not exceed 200 Hz.

For example:

# Burst containing 10 stims operating at 150 Hz
BurstDesign(10, 150)
BurstDesign(burst_count: int, burst_hz: float, /)

Constructor for BurstDesign.

class StimDesign:

Stores the parameters of a mono, bi, or triphasic stim design by specifying 1, 2 or 3 (duration_us, current_uA) pairs - i.e. 2, 4 or 6 arguments - respectively.

Arguments:
  • duration_us: Pulse width in microseconds (us).
  • current_uA : Current in microampere (uA).

Constraints:

  • duration_us must be positive and evenly divisible by 20 us.
  • current_uA must be less than or equal to 3.0 uA in absolute terms (i.e. range -3.0 to 3.0).
  • Total charge must not exceed 3.0 nanocoulombs (nC).

For example:

# Monophasic stim with current of -1.0 uA, pulse width of 160 us.
StimDesign(160, -1.0)
# Biphasic stim with current of 1.0 uA, pulse width of 160 us and negative leading edge.
StimDesign(160, -1.0, 160, 1.0)
# Triphasic stim with current of 1.0 uA, pulse width of 160 us and negative leading edge.
StimDesign(160, -1.0, 160, 1.0, 160, -1.0)
StimDesign(*args)

Constructor for StimDesign.

duration_us: int

Total stimulation duration in microseconds (us).

class StimPlan:

Allows building and executing a sequence of stim operations that can be run on demand. The StimPlan cannot be modified further after it has been run once. Stim plans are created with Neurons.create_stim_plan(), do not create StimPlan instances directly.

A RuntimeError will be raised if any modification method is called after the first run().

For example:

import cl

with cl.open() as neurons:
    # Create a stim plan
    stim_plan = neurons.create_stim_plan()
def stim( self, channel_set: ChannelSet | int, stim_design: StimDesign | float, /, burst_design: BurstDesign | None = None, lead_time_us: int = 80) -> None:

Enqueues the same operation as Neurons.stim() onto this StimPlan.

Arguments:
  • channel_set : A ChannelSet object with one or more channels, or a single channel to stimulate.
  • stim_design : A StimDesign object or a scalar current in microamperes. Use of a StimDesign is preferred. A scalar current is the equivalent of a symmetric biphasic, negative-first pulse with a pulse width of 160 microseconds, i.e., StimDesign(160, -value, 160, value).
  • burst_design: An optional BurstDesign object specifying the burst count and frequency. If unspecified, a single pulse will be delivered.
  • lead_time_us: The lead time in microseconds before the stimulation starts.
Constraints:
  • The minimum lead_time_us is 80.
  • lead_time_us must be evenly divisible by 40.

For example:

import cl
from cl import ChannelSet, StimDesign, BurstDesign

# Predefine stimulation parameters
channel_set_1 = ChannelSet(1, 2, 3)
channel_set_2 = ChannelSet(8, 9, 10)
stim_design   = StimDesign(160, -1.0, 160, 1.0)
burst_design  = BurstDesign(5, 100)

with cl.open() as neurons:
    stim_plan = neurons.create_stim_plan()

    # Queue a burst stimulation on channel_set_1
    stim_plan.stim(channel_set_1, stim_design, burst_design)

    # Queue a single stimulation on channel_set_2 with 0.5 µA current
    stim_plan.stim(channel_set_2, 0.5)  # Using scalar current

    # Optionally interrupt these channels when running this plan to ensure they are free
    stim_plan.channels_to_interrupt = channel_set_1 | channel_set_2

    # Finalise and run the stim plan
    # The above stimulations will run concurrently, due to no overlapping channels
    stim_plan.run()
def sync(self, channel_set: ChannelSet, /) -> None:

Enqueues the same operation as Neurons.sync() onto this StimPlan.

channels_to_interrupt: ChannelSet | None

Allows specification of channels to interrupt when this plan is run.

For example:

# Set multiple channels with a ChannelSet object
stim_plan.channels_to_interrupt = ChannelSet(8, 9, 10)
# Set a single channel with an integer.
stim_plan.channels_to_interrupt = 8
# Clear previously set channels.
stim_plan.channels_to_interrupt = None
def run(self, at_timestamp: int | None = None) -> None:

Execute the queued operations in the StimPlan. After this method is called, the StimPlan is frozen and cannot be modified.

If StimPlan.channels_to_interrupt has been set, interrupt will be called on the specified channels before executing enqueued commands.

Arguments:
  • at_timestamp: Optionally, execute this StimPlan at a specified timestamp. StimPlan will run immediately if timestamp is in the past.
class Loop:

Iterator that yields a LoopTick. (Relates to LoopTick and Neurons.loop().)

This is made available through the Neurons.loop() interface. Do not create instances of Loop directly.

start_timestamp: int

Return the timestamp of the first loop iteration.

duration_ticks: int

Return the current duration of the loop, in ticks.

duration_frames: int

Return the current duration of the loop, in frames.

frames_per_tick: int

Return the number of frames in each tick.

def approximate_duration_seconds(self) -> float:

Return an approximate duration of the closed loop in seconds.

def __iter__(self) -> Generator[LoopTick]:

For each tick, yield a LoopTick object containing spikes, stims and frames collected during the previous iteration.

def run( self, loop_body_callback: Callable[[LoopTick], None]) -> None:

Run the closed loop, calling loop_body_callback for each tick.

The callback is passed a LoopTick object containing detected spikes and other relevant information. The loop body can stop the loop by calling LoopTick.loop.stop().

For example:

TICKS_PER_SECOND = 2

def loop_body_callback(tick: LoopTick):
    # Do something ...
    tick.loop.stop()

with cl.open() as neurons:
    loop = neurons.loop(TICKS_PER_SECOND)
    loop.run(loop_body_callback)
def stop(self) -> None:

Stop the Loop iterations.

Typically called via LoopTick.loop in a loop body in cases where a simple break is not convenient, such as when using the Loop.run() syntax.

def recover_from_jitter( self, handle_recovery_tick: Callable[[LoopTick], None] | None = None, timeout_seconds: float = 5.0) -> None:

Call to enable jitter recovery for potentially long running operations within a Loop iteration that otherwise might trigger a TimeoutError.

This effectively skips execution of code in the loop body until the loop catches up to the expected iteration in realtime. Data in the skipped iterations can be accessed through the handle_recovery_tick callback.

Arguments:
  • handle_recovery_tick: Optional callback function that accepts a LoopTick as the only argument.
  • timeout_seconds: Number of seconds to allow for the recovery. Defaults to 5 seconds.

Constraints:

  • Users need to be careful that the handle_recovery_tick callback does not take too long otherwise the loop will never catch up.
  • TimeoutError will be raised if loop has not caught up within timeout_seconds.

In the following example, the loop from iteration 2 will keep reading data but not yield any ticks until the loop catches up in iteration 7.

TICKS_PER_SECOND = 100
STOP_AFTER_TICKS = 10

def handle_recovery_tick(tick: LoopTick):
    # Optionally do something with tick data during recovery
    ...

with cl.open() as neurons:
    for tick in neurons.loop(TICKS_PER_SECOND, stop_after_ticks=STOP_AFTER_TICKS):
        print(f"{tick.iteration=}")
        if (tick.iteration == 1):
            tick.loop.recover_from_jitter(handle_recovery_tick)
            time.sleep(0.05)

# Expected output:
# tick.iteration=0
# tick.iteration=1
# tick.iteration=7
# tick.iteration=8
# tick.iteration=9
class LoopTick:

Contains spikes, stims and frames collected during a loop iteration. (Relates to Neurons.loop() and Loop.)

The tick object itself is only valid for the duration of the loop iteration. If you need to keep a reference to instance variables (such as analysis) beyond the end of the loop body, copy them to another variable.

This is accessible via Neurons.loop() and yielded by the Loop iterator. Do not create instances of LoopTick directly.

loop: Loop

A reference to the running Loop.

iteration: int

Iteration count of this LoopTick within the Loop.

iteration_timestamp: int

Timestamp of the loop body. This is equivalent to DetectionResult.stop_timestamp.

iteration_next_timestamp: int

Timestamp of the next loop body. This is equivalent to LoopTick.iteration_timestamp + len(LoopTick.frames).

analysis: DetectionResult

Contains the spikes and stims analysis of the frames read during the tick.

frames: numpy.ndarray[tuple[int, int], numpy.dtype[numpy.int16]]

The frames read during the tick period, with shape (duration_frames, channel_count).

class Recording:

Handles recording functionality by the CL1 system. This is returned when calling Neurons.record(). Do not create instances of Recording directly.

Note that:

  • In the Simulator, recording data is captured by a background subprocess that independently reads from the shared data buffer.
  • Each recording maintains its own read cursor, so multiple recordings can run concurrently without interfering with each other or with neurons.read() calls.
attributes: RecordingView.AttributesDict

Attributes that will be written to the recording file and available at Recording.file.root._v_attrs if using the raw PyTables interface. See RecordingView.attributes for details.

Note that:

  • Simulator recordings can be identified by file_format.version == "SDK".
  • The following attributes are included in the Simulator recording for completeness, but the values are empty: git_hash, git_branch, git_tags, and git_status.
file: dict[str, str]

dict containing information relating to the recording file.

Keys:

  • name: Recording file name.
  • path: Absolute path to the recording file.
  • uri_path: URL encoded file path.

start_timestamp: int

Timestamp of the first frame.

status: Literal['started', 'stopped']

Indicates the recording status.

def open(self):

Return a RecordingView of the recording file.

Constraints:

  • This can only be performed after the recording has stopped.
def set_attribute(self, key: str, value: Any):

Set a single application attribute on the recording. The application attribute refers to the attribute dictionary passed to Neurons.record(attributes).

Arguments:
  • key: Attribute key.
  • value: Attribute value.

Constraints:

  • This can only be performed before the recording is stopped.
def update_attributes(self, attributes: dict[str, typing.Any]):

Update multiple application attributes on the recording. The application attribute refers to the attribute dictionary originally passed to Neurons.record(attributes).

Arguments:
  • attributes: dict containing attribute keys and values to be updated.

Constraints:

  • This can only be performed before the recording is stopped.
def stop(self):

Stop the recording, if not already stopped.

def has_stopped(self):

Return True if the recording has stopped.

In process mode this also detects auto-stop (via stop_after_seconds or stop_after_frames) and finalises the main-thread state.

def wait_until_stopped(self):

Wait until the recording has stopped.

Raises RuntimeError if the recording was not scheduled to stop automatically.

class DataStream:

Manages a named stream of (timestamp, serialised_data) for recordings and visualisation. This is created using Neurons.create_data_stream(). Do not create instances of DataStream directly.

See RecordingView.data_streams for how to use data streams saved in a recording.

name: str

Name of this data stream.

def append(self, timestamp: int, data: Any):

Append a new data point to the stream.

Arguments:
  • timestamp: Timestamp that marks this data point.
  • data: Any type of serialisable data.

Constraints:

  • New data must have a timestamp greater than any existing timestamp in the data stream, otherwise a RuntimeError will be raised.
def set_attribute(self, key: str, value: Any):

Set a single attribute on the data stream. The attribute refers to the attribute dictionary passed to Neurons.create_data_stream(attributes).

Arguments:
  • key: Attribute key.
  • value: Attribute value.
def update_attributes(self, attributes: dict[str, typing.Any]):

Update multiple attributes on the data stream. The attribute refers to the attribute dictionary passed to Neurons.create_data_stream(attributes).

Arguments:
  • attributes: dict containing attribute keys and values to be updated.
class RecordingView:

Recording files are standard HDF5 files and can be opened with any HDF5 viewer or library. A RecordingView provides a more convenient way to access the data, providing ready access for:

Full access to the underlying HDF5 file (via the PyTables library) is provided through the file attribute. This allows access to the full range of PyTables functionality if needed.

For example:

from cl import RecordingView

file_path = "/path/to/recording.h5"
with RecordingView(file_path) as recording:
    # Do something ...
    ...
RecordingView(file_path: str)

Constructor for RecordingView.

Arguments:
  • file_path: Path to the recording (.h5) file to be opened.
file: tables.file.File

The underlying PyTables file.

attributes: RecordingView.AttributesDict

The file / root level attributes accessible as a dictionary.

Attribute Type Description
application dict[str, Any] Application attributes as a user provided dict from the attributes parameter when creating a recording.
hostname str Hostname of the CL1 system, managed through the CL1 dashboard.
project_id str Unique identifier for the ongoing project, managed through the CL1 dashboard.
cell_batch_id str Unique identifier of the cell batch, managed through the CL1 dashboard.
created_localtime str When the recording is created in ISO format in the local timezone.
created_utc str When the recording is created in ISO format in UTC timezone.
ended_localtime str When the recording ended in ISO format in the local timezone.
ended_utc str When the recording ended in ISO format in UTC timezone.
git_hash str Metadata relating to the software version.
git_branch str Metadata relating to the software version.
git_tags str Metadata relating to the software version.
git_status str Metadata relating to the software version.
channel_count int Number of channels.
sampling_frequency int Sampling frequency in Hz, same as frames_per_second (Note 2).
frames_per_second int Number of frames per second in Hz, same as sampling_frequency (Note 2).
uV_per_sample_unit float Multiply the recording sample values by this constant to obtain sample values as microvolts (uV).
duration_frames int Duration of this recording in frames.
duration_seconds float Duration of this recording in seconds.
start_timestamp int Timestamp of the first frame.
end_timestamp int Timestamp of the last frame.
file_format dict See below (Note 1).

Notes:

  1. The file_format attribute contains information relating to the format of the recording as a dict. This contains two attributes: version and stim_and_spike_timestamps_relative_to_start. The latter, when True, indicates that the timestamps included for stims and spikes are relative to the start_timestamp of the recording.
  2. Both frames_per_second and sampling_frequency give the same value. Use of sampling_frequency is discouraged as this attribute is flagged for deprecation and may be removed in a future version.

For example:

recording     = RecordingView(file_path)
channel_count = recording.attributes["channel_count"]
samples: tables.earray.EArray | None = None

Recorded raw samples (frames) as an int16 array with shape (duration_frames, channel_count). Sample values can be converted to microvolts (uV) by multiplying with the uV_per_sample_unit attribute. This is None if a recording is created with Neurons.record(include_raw_samples=False).

For example:

recording = RecordingView(file_path)

# Get a slice of the first 1000 samples from channel 8, 9, 10 and convert to uV
samples    = recording.samples[:1000, 8:11] # shape (1000, 3), dtype int16
samples_uV = samples * recording.attributes["uV_per_sample_unit"] # shape (1000, 3), dtype float64
spikes: tables.table.Table | None = None

Recorded detected spikes as a Table with columns channel, timestamp and samples. This is None if a recording is created with Neurons.record(include_spikes=False).

Spike timestamps are relative to the start of the recording by default and can be checked with the attribute stim_and_spike_timestamps_relative_to_start.

For example:

recording = RecordingView(file_path)

# Get a count of total spikes
spike_count = len(recording.spikes)

# Iterate through all spikes in the recording
for spike in recording.spikes:
    # spike["channel"],   equivalent to `Spike.channel`
    # spike["timestamp"], equivalent to `Spike.timestamp`
    # spike["samples"],   equivalent to `Spike.samples`

# Get spikes from channels 8, 9, occurring within the first 1000 frames
spike_indices = recording.spikes.get_where_list("((channel==8) | (channel==9)) & ((timestamp >= 0) & (timestamp < 1000))")
spikes        = recording.spikes[spike_indices]
stims: tables.table.Table | None = None

Recorded stimulation events as a Table with columns channel and timestamp. This is None if a recording is created with Neurons.record(include_stims=False).

Stim timestamps are relative to the start of the recording by default and can be checked with the attribute stim_and_spike_timestamps_relative_to_start.

For example:

recording = RecordingView(file_path)

# Get a count of total stims
stim_count = len(recording.stims)

# Iterate through all stims in the recording
for stim in recording.stims:
    # stim["channel"],   equivalent to `Stim.channel`
    # stim["timestamp"], equivalent to `Stim.timestamp`

# Get stims from channels 8, 9, occurring within the first 1000 frames
stim_indices = recording.stims.get_where_list("((channel==8) | (channel==9)) & ((timestamp >= 0) & (timestamp < 1000))")
stims        = recording.stims[stim_indices]
data_streams: RecordingView.DataStreamCollection | None = None

Recorded data streams with a dictionary-like interface. This is None if a recording is created with Neurons.record(include_data_streams=False).

Available data streams can be accessed as follows:

recording = RecordingView(file_path)

# Print a list of available data streams
print(recording.data_streams)

# Get available data stream names as a list
data_stream_names = list(recording.data_streams.keys())

Data within each named data stream can be accessed like a dictionary, where the keys are timestamps and values contain data, for example:

my_data_stream = recording.data_streams["my_data_stream"]
for timestamp, data in my_data_stream.items():
    print(timestamp, data)

It is also possible to get data using timestamps within a data stream:

my_data_stream = recording.data_streams["my_data_stream"]

# Single timestamp
data = my_data_stream[timestamp]

# Range of timestamps
data_list = my_data_stream[start_timestamp : end_timestamp]
def close(self, safe_mode: bool = False):

Close the underlying PyTables file.

Arguments:
  • safe_mode: When True, suppress any exception raised while closing. Intended for best-effort cleanup paths (finalizers, interpreter shutdown, error rollback) where the caller cannot meaningfully react to a failure.
def analysis_timestamp_limit( self, minimum_timestamp: int | None = None, maximum_timestamp: int | None = None):

Sets the timestamp range for all analysis functions. When None, minimum timestamp will be 0 and maximum timestamp will be the maximum timestamp contained in the recording.

Arguments:
  • minimum_timestamp: Minimum timestamp (cannot be lower than 0).
  • maximum_timestamp: Maximum timestamp (cannot be greater than attributes["duration_frames"]).
def analyse_firing_stats( self, bin_size_sec: float = 1.0) -> cl.analysis.AnalysisResultFiringStats:

Compute firing statistics efficiently for a neural recording by binning spike activity into fixed-width time bins.

Arguments:
  • bin_size_sec: Size of each time bin in seconds used to aggregate spike counts. When this is set to 1.0 second, the results can be interpreted as Hertz.
Returns:

cl.analysis.AnalysisResultFiringStats: Returns per-channel and population-level firing statistics computed from binned spike activity, including firing counts, firing rates, inter-spike interval (ISI) distributions, and summary statistics across channels. The result provides both channel-wise measures and aggregated culture-level metrics for baseline activity characterisation.

def analyse_network_bursts( self, bin_size_sec: float, onset_freq_hz: float, offset_freq_hz: float, min_active_channels: int | None = None) -> cl.analysis.AnalysisResultNetworkBursts:

Detects network-level bursts from spike data using a spike rate thresholding method. This function identifies periods of high-frequency, synchronised spiking activity across multiple channels, classifying them as network bursts.

Arguments:
  • bin_size_sec: Size of each time bin in seconds.
  • onset_freq_hz: Per channel spike rate in Hz to mark a burst onset.
  • offset_freq_hz: Per channel spike rate in Hz to mark a burst offset.
  • min_active_channels: Scalar constant to apply to the onset / offset frequencies.
Returns:

cl.analysis.AnalysisResultNetworkBursts: Returns detected network-level burst events based on population spike-rate thresholding, including burst count, durations, and spike counts, along with the underlying binned firing rates used for detection. The result also stores burst timing information and per-bin activity.

def analyse_criticality( self, bin_size_sec: float, percentile_threshold: float, max_lags_branching_ratio: int = 40, duration_thresholds: tuple[int, int] = (2, 5), min_spike_count_threshold: int = 10, n_bootstraps: int = 100, random_seed: int = 42) -> cl.analysis.AnalysisResultCriticality:

Detects neuronal avalanches and computes criticality-related metrics such as avalanche size distributions, duration distributions, power-law statistics, deviation from criticality coefficient (DCC), shape collapse error, and branching ratio.

To find avalanches:

  1. Compute the total network activity by summing spike counts across channels for each time bin.
  2. Define a threshold based on the provided percentile.
  3. Identify avalanches where network activity exceeds the threshold in consecutive time bins.
  4. Calculate spike counts for each avalanche as well as durations in number of time bins.
Arguments:
  • bin_size_sec: Size of each time bin in seconds.
  • percentile_threshold: A percentile value (0 to 1) used to calculate a threshold for detecting bursts. If percentile > 0, the threshold is determined as the percentile of the summed network activity. If percentile == 0, the threshold is set to 0.
  • max_lags_branching_ratio: Maximum number of time lags to consider for slope estimation.
  • duration_thresholds: Thresholds (min, max) for avalanche durations (number of frame_bins). Recommend values between (3-6).
  • min_spike_count_threshold: Minimum threshold for spike counts in avalanches for calculating the size exponent. Recommend values between (8-20).
  • n_bootstraps: Number of random resampling iterations used to estimate the variability of the beta exponent.
Returns:

cl.analysis.AnalysisResultCriticality: Returns a comprehensive set of avalanche and criticality summaries derived from thresholded network activity, including avalanche sizes and durations, inter-avalanche intervals, power-law fit exponents and KS statistics, scaling-relation / shape-collapse measures (e.g., DCC and collapse error), and a branching-ratio estimate. All intermediate arrays used to compute these metrics (e.g., per-avalanche binned spike profiles and fitted parameter traces) are included in the result object for inspection and downstream re-analysis.

def analyse_information_entropy( self, bin_size_sec: float, fillna: float | None = 0.0, log_base: float | None = None) -> cl.analysis.AnalysisResultInformationEntropy:

Computes per-bin Bernoulli entropy of the fraction of channels that have >=1 spike in the bin.

Arguments:
  • bin_size_sec: Size of each time bin in seconds.
  • fillna: Value to fill for NaN (defaults to 0.).
  • log_base: Entropy units where None uses natural log (nats), 2 uses log2 (bits).
Returns:

cl.analysis.AnalysisResultInformationEntropy: Returns per-bin Bernoulli entropy computed from the fraction of active channels in each time bin, providing a population-level measure of activity variability over time. Entropy can be expressed in nats or bits.

def analyse_lempel_ziv_complexity( self, bin_size_sec: float, min_bin_count: int = 2, normalise: bool = True, use_binary: bool = True) -> cl.analysis.AnalysisResultComplexityLempelZiv:

Computes Lempel–Ziv complexity (LZ78) for each channel’s binned spike activity, measuring temporal complexity and structure.

For each channel, a symbol sequence is formed per time bin:

  • use_binary=True: binary sequence (spike present/absent per bin).
  • use_binary=False: integer spike-count sequence.

Complexity is the number of phrases added by an LZ78 dictionary-building procedure. If normalise=True, scores are length-normalised using c_norm = c(n) * log_k(n) / n, where n is the number of bins and k is the alphabet size (k=2 for binary).

Arguments:
  • bin_size_sec: Time-bin size in seconds.
  • min_bin_count: Minimum number of bins required; below this threshold complexity is set to zero.
  • normalise: Whether to return the length-normalised score.
  • use_binary: Whether to binarise spike counts before computing complexity or use raw spike counts.
Returns:

cl.analysis.AnalysisResultComplexityLempelZiv: Returns per-channel Lempel–Ziv (LZ78) complexity scores computed from binned spike activity, using either binary or count-based symbol sequences. Scores can be returned raw or length-normalised based on the sequence length and alphabet size, providing a measure of temporal complexity for each channel.

def analyse_dct_features( self, k: int) -> cl.analysis.AnalysisResultDctFeatures:

Calculates the Discrete Cosine Transform (DCT) features based on channel spike counts.

Arguments:
  • k: The frequency index (coefficient index) of the DCT.
Returns:

cl.analysis.AnalysisResultDctFeatures: Returns spatial Discrete Cosine Transform (DCT) features computed from per-channel spike counts arranged in the common MEA layout, capturing low-frequency spatial structure in neural activity. The result includes the DCT basis coefficients for both MEA dimensions and a dictionary of DCT feature values indexed by spatial frequency components.

def analyse_spike_triggered_histogram( self, bin_size_sec: float, start_sec: float, end_sec: float, num_channels: int, min_firing_rate_threshold_hz: float = 0.1) -> cl.analysis.AnalysisResultSpikeTriggeredHistogram:

Generates spike-triggered histograms using the most active channels as triggers, quantifying population responses around trigger spikes.

Arguments:
  • bin_size_sec: Bin size in seconds for the histogram.
  • start_sec: Time in seconds to include before the trigger spike.
  • end_sec: Time in seconds to include after the trigger spike.
  • num_channels: How many of the most active channels to use as triggers.
  • min_firing_rate_threshold_hz: Threshold (in Hz) for minimum firing rate. Only channels with firing rates above this value are considered.
Returns:

cl.analysis.AnalysisResultSpikeTriggeredHistogram: Contains spike-triggered histograms computed for ordered pairs of the most active channels, where each histogram represents the timing of target-channel spikes relative to trigger-channel spikes within a fixed temporal window. The result includes the common time bins and a dictionary mapping channel pairs to their corresponding histograms.

def analyse_functional_connectivity( self, bin_size_sec: float, correlation_threshold: float = 0.6) -> cl.analysis.AnalysisResultsFunctionalConnectivity:

Compute functional connectivity (based on Pearson correlation) and summary graph metrics from spike data.

Arguments:
  • bin_size_sec: Size of each time bin in seconds.
  • correlation_threshold: Absolute correlation threshold in [0, 1]. Only connections where Pearson correlation >= correlation_threshold are kept as graph edges. Use 0.0 to keep the full weighted correlation matrix.
Returns:

cl.analysis.AnalysisResultsFunctionalConnectivity: Returns a weighted functional connectivity matrix computed using Pearson correlation between binned channel spike counts, along with basic graph-level metrics including total and average edge weights, clustering coefficient, Louvain community structure, modularity index, and maximum betweenness centrality. These network metrics are provided as baseline summaries using default parameters; users requiring fine-tuned or domain-specific network analysis are encouraged to directly use the returned connectivity matrix to recompute graph metrics with custom methods and parameter settings.

@staticmethod
def plot_spike( spike: tuple[int, int, numpy.ndarray[tuple[int], numpy.dtype[numpy.float64]]] | Spike, figsize: tuple[int, int] = (6, 2), title: str | None = None, save_path: str | None = None, ax: matplotlib.axes._axes.Axes | None = None):

Creates a plot of a single spike.

Arguments:
  • spike: Either a Spike object or a tuple of (timestamp, channel, samples).
  • figsize: Size of the plot figure.
  • title: Title for the plot, if not provided, a default will be used.
  • save_path: Path to save the plot to, instead of showing it.
  • ax: Axes on which to draw the plot (defaults to None).
Returns:

Axes: Axes on which the plot was drawn.

For example:

from cl import RecordingView
recording = RecordingView(file_path)

# Plot the largest N spikes
N = 5
largest_spikes = sorted(
    recording.spikes[:],
    key     = lambda spike: max(spike["samples"]) - min(spike["samples"]),
    reverse = True
    )[:N]

for spike in largest_spikes:
    recording.plot_spike(spike)
def plot_spikes_and_stims( self, figsize: tuple[int, int] = (12, 8), title: str | None = None, save_path: str | None = None, limit_to_time_range_secs: tuple[float, float] | None = None, limit_to_channels: list[int] | None = None):

Creates a raster plot of spikes and stims in the recording.

Arguments:
  • figsize: Size of the plot figure.
  • title: Title for the plot, if not provided, a default will be used.
  • save_path: Path to save the plot to, instead of showing it.
  • limit_to_time_range_secs: If provided, limit the time axis as a tuple of (start_time_secs, end_time_secs).
  • limit_to_channels: If provided, limit the plot to the given channels.
class ControlRequestError(builtins.RuntimeError):

Raised when an attempt to take control is rejected. This implies that another process has control.

class ControlRequiredError(builtins.RuntimeError):

Raised when a method is called that requires control, but control has not been taken.

class TransactionRejected(builtins.RuntimeError):

Raised when a stimulation plan is rejected by the system.

class ChannelQueueFull(cl.TransactionRejected):

Raised when a stimulation plan is rejected because a channel has too many operations queued.

class SyncLimitExceeded(cl.TransactionRejected):

Raised when a stimulation plan is rejected because too many sync operations are in flight.

class RunTimestampOrderError(cl.TransactionRejected):

Raised when a stimulation plan is rejected because the supplied timestamp to run at is not newer than an already queued plan.

class DeferredInterruptLimitExceeded(cl.TransactionRejected):

Raised when a stimulation plan is rejected because too many deferred channel interruptions are in flight.

class RecordingFailedError(builtins.RuntimeError):

Raised when a recording unexpectedly fails.

class WsApiError(builtins.RuntimeError):

Raised when an error occurs in the generic WebSocket API client.

class UnsafeOperationError(builtins.Exception):

Raised when opening a recording containing potentially unsafe objects.