cl
Open a connection to the device, optionally take and retain control, and attempt to start it if necessary. The device will not be stopped automatically. To minimise latency, Python garbage collection is disabled while the connection is open.
This is the preferred entry point for the CL API. Do not use cl.Neurons directly.
Arguments:
- take_control: Take control of the device. Will raise a ControlRequestError if start is required and another process has control of the device.
- wait_until_recordable: Wait (block) until the recording system is ready.
For example:
import cl

with cl.open() as neurons:
    # Your code here
    ...
Gets the system attributes that are included in each recording as a dictionary. This has the following structure:
{
    'project_id'   : str,
    'chip_id'      : str,
    'cell_batch_id': str,
    'plugin'       : dict[str, Any],  # plugin-specific attributes, with the top level keys being the plugin names
    'system_id'    : str,             # a unique identifier for the system, e.g. "cl1-0123-456"
    'hostname'     : str,             # the hostname of the system
}
Returns True if running in the simulator environment, False if running on a real device.
The Neurons class provides the main interface with the CL1 hardware. This should
always be accessed via the cl.open() context manager and should not be
used in isolation. This functionality includes:
- Perform stim() and create_stim_plan(),
- Access to device information such as timestamp(),
- Create a tightly timed loop() to detect spikes and execute code,
- record() data to file,
- read() data from the MEA,
- and more.
If you are using the Simulator:
- This simulates the behaviour of the CL API by either generating random data (default) or replaying data from an H5 recording (replay_file). The recording to use is controlled by the CL_SDK_REPLAY_PATH environment variable, which can be set by a .env file.
- This operates on wall-clock time by default to maintain parity with the CL1 device. For advanced users, it is possible to switch to accelerated mode by setting the environment variable CL_SDK_ACCELERATED_TIME=1.
- The starting position of the replay recording will be randomised every time cl.open() is called. This can be overridden by setting CL_SDK_REPLAY_START_OFFSET, where a value of 0 indicates the first frame of the recording.
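As an alternative to a .env file, the environment variables above could be set from Python. This is a minimal sketch: the recording path is a placeholder, and it assumes the Simulator reads these variables when cl.open() is called, so they are set beforehand.

```python
import os

# Placeholder path to a previously saved H5 recording; replace with your own file.
os.environ["CL_SDK_REPLAY_PATH"] = "/path/to/recording.h5"

# Start the replay at the first frame instead of a randomised position.
os.environ["CL_SDK_REPLAY_START_OFFSET"] = "0"

# Opt in to accelerated time instead of wall-clock time.
os.environ["CL_SDK_ACCELERATED_TIME"] = "1"

# Collect the Simulator-related settings for a quick sanity check.
simulator_settings = {
    name: value for name, value in os.environ.items() if name.startswith("CL_SDK_")
}
print(simulator_settings)
```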
Stimulate one or more channels.
Arguments:
- channels : A ChannelSet object with one or more channels, or a single channel to stimulate.
- stim_design : A StimDesign object or a scalar current in microamperes. Use of a StimDesign is preferred. A scalar current is the equivalent of a symmetric biphasic, negative-first pulse with a pulse width of 160 microseconds, i.e., StimDesign(160, -value, 160, value).
- burst_design: An optional BurstDesign object specifying the burst count and frequency. If unspecified, a single pulse will be delivered.
- lead_time_us: The lead time in microseconds before the stimulation starts.
Constraints:
- The minimum lead_time_us is 80.
- lead_time_us must be evenly divisible by 40.
For example:
import cl
from cl import ChannelSet, StimDesign, BurstDesign

with cl.open() as neurons:
    # Deliver a single biphasic stim with current of 1.0 uA, pulse width
    # of 160 us and negative leading edge on channels 8, 9, 10
    channel_set = ChannelSet(8, 9, 10)
    stim_design = StimDesign(160, -1.0, 160, 1.0)
    neurons.stim(channel_set, stim_design)

    # Deliver the same stim as a burst of 10 at 40 Hz
    burst_design = BurstDesign(10, 40)
    neurons.stim(channel_set, stim_design, burst_design)
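The two lead_time_us constraints listed for stim() can be checked before calling it. The helpers below are hypothetical and not part of the CL API; they only restate the documented rules (minimum of 80, evenly divisible by 40).

```python
MIN_LEAD_TIME_US = 80
LEAD_TIME_STEP_US = 40


def is_valid_lead_time_us(lead_time_us: int) -> bool:
    """Return True if lead_time_us meets both documented constraints."""
    return lead_time_us >= MIN_LEAD_TIME_US and lead_time_us % LEAD_TIME_STEP_US == 0


def next_valid_lead_time_us(lead_time_us: int) -> int:
    """Round up to the nearest value that satisfies the constraints."""
    clamped = max(lead_time_us, MIN_LEAD_TIME_US)
    # Ceiling division to the next multiple of the step.
    return -(-clamped // LEAD_TIME_STEP_US) * LEAD_TIME_STEP_US


print(is_valid_lead_time_us(100), next_valid_lead_time_us(100))
```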
Interrupt existing and clear any pending stimulation for the specified channels.
Arguments:
- channels: A ChannelSet object with one or more channels, or a single channel to interrupt.
Interrupt existing and cancel queued stimulation, then send a stim burst. This is equivalent to
calling interrupt() followed by stim(), on the same set of channels.
Constraints:
- The minimum lead_time_us is 80.
- lead_time_us must be evenly divisible by 40.
Arguments:
- channel_set: A ChannelSet object with one or more channels, or a single channel to stimulate.
- stim_design: A StimDesign object or a floating point current in microamperes.
- burst_design: A BurstDesign object specifying the burst count and frequency.
- lead_time_us: The lead time in microseconds before the stimulation starts.
Cause all channel_set channels to wait until all are ready to continue together.
The sync operation allows you to ensure that subsequent operations on different channels begin at the same time, after all previously queued operations on those channels have completed.
Arguments:
- channel_set: One or more channels to sync.
For example:
import cl
from cl import ChannelSet, StimDesign, BurstDesign

with cl.open() as neurons:
    stim_design = StimDesign(160, -1.0, 160, 1.0)
    channel_set_1 = ChannelSet(8)
    burst_design_1 = BurstDesign(2, 100)  # Interval of 250 frames
    channel_set_2 = ChannelSet(10)
    burst_design_2 = BurstDesign(2, 20)  # Interval of 1250 frames
    group_1_stims = []
    for tick in neurons.loop(ticks_per_second=10, stop_after_ticks=11):
        if tick.iteration == 0:
            # Group 1
            neurons.stim(channel_set_1, stim_design, burst_design_1)
            neurons.stim(channel_set_2, stim_design, burst_design_2)
            # Group 2
            neurons.sync(channel_set_1 | channel_set_2)
            neurons.stim(channel_set_1, stim_design)
        for stim in tick.analysis.stims:
            if stim.channel == 8:
                group_1_stims.append(stim.timestamp)
    group_gap = group_1_stims[-1] - group_1_stims[0]
    # Group gap is expected to be > 1250 being the interval of the slowest frequency
Create a new StimPlan object to build a stimulation plan.
Stim plans are reusable stimulation instructions that can be created
at the beginning of an application and run on demand. They expose the same
stimulation interface, such as StimPlan.stim(), etc.
For example:
import cl
from cl import ChannelSet, StimDesign, BurstDesign

with cl.open() as neurons:
    # Create a stim plan with a single biphasic stim with current of
    # 1.0 uA, pulse width of 160 us and negative leading edge on
    # two sets of channels
    my_stim_plan = neurons.create_stim_plan()
    channel_set_1 = ChannelSet(8, 9)
    channel_set_2 = ChannelSet(10, 11)
    stim_design = StimDesign(160, -1.0, 160, 1.0)
    my_stim_plan.stim(channel_set_1, stim_design)
    my_stim_plan.stim(channel_set_2, stim_design)

    # ... Do something else

    # Execute the stim plan at any stage of your script
    my_stim_plan.run()
Periodically detect spikes and execute code. (Relates to Loop and LoopTick.)
Intended for use as an iterator:
import cl

TICKS_PER_SECOND = 100

with cl.open() as neurons:
    for tick in neurons.loop(TICKS_PER_SECOND):
        # tick is a `LoopTick` object
        # tick.iteration is the count of this tick within the loop
        # tick.iteration_timestamp is the timestamp of the loop body
        # tick.frames is a numpy array of processed electrode samples
        # tick.analysis.spikes is a list of any detected spikes
        # tick.analysis.stims is a list of any stimulation
        # tick.loop is the running loop object
        ...
Or by passing a callback to Loop.run():
import cl
from cl import LoopTick

TICKS_PER_SECOND = 100

def handle_tick(tick: LoopTick):
    # Do something ...
    # When ready to stop ...
    tick.loop.stop()

with cl.open() as neurons:
    neurons.loop(TICKS_PER_SECOND).run(handle_tick)
Jitter
As Loop is intended for realtime operation, by default it will raise a
TimeoutError if the loop body does not finish before data beyond the next
tick is available.
This can be relaxed by setting jitter_tolerance_frames to a non-zero value,
or ignored entirely by setting ignore_jitter to True. We do not recommend
the general use of these parameters to handle jitter. Instead consider
explicit jitter recovery with Loop.recover_from_jitter().
Otherwise, the loop will continue indefinitely unless stop_after_seconds
or stop_after_ticks is passed at loop creation time, LoopTick.loop.stop()
is called during the tick, or a break statement is used to exit the for loop.
Timestamps
Since Loop operates in realtime, there are a few key considerations if
precise timing is desired. This can be very important for executing synchronised
stims and event logging.
- Data accessible during each loop tick via tick.analysis (which is a type of DetectionResult) is collected in the previous tick and is bounded by DetectionResult.start_timestamp and DetectionResult.stop_timestamp.
- The system timestamp when entering the loop body is accessible via LoopTick.iteration_timestamp, and is equivalent to the end of the data collection period (i.e. LoopTick.iteration_timestamp == DetectionResult.stop_timestamp).
import cl
from cl import ChannelSet, StimDesign

with cl.open() as neurons:
    stim_plan_A = neurons.create_stim_plan()
    stim_plan_A.stim(ChannelSet(8, 9), StimDesign(160, -1.0, 160, 1.0))
    stim_plan_B = neurons.create_stim_plan()
    stim_plan_B.stim(ChannelSet(16, 17), StimDesign(160, -1.0, 160, 1.0))
    data_stream = neurons.create_data_stream("stim_events")

    for tick in neurons.loop(ticks_per_second=10, stop_after_seconds=2):
        # The system timestamp will be slightly later than the
        # starting timestamp of the current loop body
        assert neurons.timestamp() >= tick.iteration_timestamp

        # Stim plans executed at the tick.iteration_timestamp will be
        # executed as soon as possible, as it is slightly in the past
        # and is not guaranteed to be at the same time
        stim_plan_A.run(at_timestamp=tick.iteration_timestamp)
        stim_plan_B.run(at_timestamp=tick.iteration_timestamp)

        # ... and will be equivalent to
        stim_plan_A.run()
        stim_plan_B.run()

        # Users seeking to execute synchronised stims could
        # take advantage of tick.iteration_next_timestamp
        stim_plan_A.run(at_timestamp=tick.iteration_next_timestamp)
        stim_plan_B.run(at_timestamp=tick.iteration_next_timestamp)

        # Using tick.iteration_next_timestamp is also helpful to ensure
        # that stim events are correctly aligned when logging events
        data_stream.append(tick.iteration_next_timestamp, "Stim Happened!")
Arguments:
- ticks_per_second: How often the loop should return a result.
- stop_after_seconds: How long to run the closed loop for in seconds. (default: None, i.e. loop indefinitely)
- stop_after_ticks: How long to run the closed loop for in number of ticks. (default: None, i.e. loop indefinitely)
- ignore_jitter: If True, the loop will not raise a TimeoutError.
- jitter_tolerance_frames: How far the loop can fall behind (in frames) before it raises a TimeoutError.
Constraints:
- ticks_per_second must not exceed the system sampling rate of 25,000 Hz.
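To see why ticks_per_second is bounded by the sampling rate, it can help to work out how many frames each tick covers. The helper below is a hypothetical sketch, not part of the CL API, and assumes every tick covers an equal share of the 25,000 frames produced per second.

```python
FRAMES_PER_SECOND = 25_000  # system sampling rate stated in the constraint


def frames_per_tick(ticks_per_second: int) -> float:
    """Return the number of frames covered by one loop tick."""
    if not 0 < ticks_per_second <= FRAMES_PER_SECOND:
        raise ValueError(
            f"ticks_per_second must be between 1 and {FRAMES_PER_SECOND}"
        )
    return FRAMES_PER_SECOND / ticks_per_second


for rate in (10, 100, 25_000):
    print(rate, frames_per_tick(rate))
```

At the upper limit each tick covers a single frame, so a faster loop would have no new data to return.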
Start a new HDF5 recording.
Arguments:
- file_suffix: The suffix to append to the filename, before the .h5 extension.
- file_location: An absolute path to the directory where the file should be saved, or relative path (relative to the default recording location).
- from_seconds_ago: The number of seconds ago to start recording from, if possible.
- from_frames_ago: The number of frames ago to start recording from, if possible.
- from_timestamp: The timestamp to start recording from, if possible.
- stop_after_seconds: The number of seconds to record for.
- stop_after_frames: The number of frames to record.
- attributes: A dictionary of attributes to add to the recording.
- include_spikes: Whether to include detected spikes in the recording.
- include_stims: Whether to include stimulation events in the recording.
- include_raw_samples: Whether to include frames of raw samples in the recording.
- include_data_streams: Pass True to record all data streams, False to record no data streams, or a list of specific data stream names to record.
- exclude_data_streams: A list of application data streams to exclude from the recording.
Specific to the Simulator:
- Recording data is kept in system memory and only saved to disk when calling close().
- The from_* parameters for recording from the past are not used.
- Recordings can be identified by the attribute file_format.version == "SDK".
- The following attributes are included in the Simulator recording for completeness, but the values are empty: git_hash, git_branch, git_tags, and git_status.
Typical usage example:
with cl.open() as neurons:
    recording = neurons.record()
    # Your code here ...
    recording.stop()
Example for stopping recording after a duration of time:
with cl.open() as neurons:
    recording = neurons.record(stop_after_seconds=3)
    recording.wait_until_stopped()
Publish a named stream of (timestamp, serialised_data) for recordings and visualisation.
See RecordingView.data_streams for how to use data streams saved in a recording.
Arguments:
- name: Datastream name.
- attributes: A dictionary of attributes to add to the datastream.
For example:
import numpy
import cl

with cl.open() as neurons:
    # Create a named data stream - by default, it will be added to any active or future recordings.
    data_stream = neurons.create_data_stream(
        name = 'example_data_stream',
        attributes = { 'score': 0, 'another_attribute': [0, 1, 2, 3] }
    )

    # Start a recording
    recording = neurons.record(stop_after_seconds=1)
    timestamp = neurons.timestamp()

    # Add some data stream entries with unique, ascending timestamps:
    data_stream.append(timestamp + 0, { 'arbitrary': 'data' })
    data_stream.append(timestamp + 1, ['of', 'arbitrary', 'size'])
    data_stream.append(timestamp + 2, 'and type.')
    data_stream.append(timestamp + 3, numpy.array([2**64 - 1, 2**64 - 2, 2**64 - 3], dtype=numpy.uint64))

    # Update a single attribute
    data_stream.set_attribute('score', 1)

    # Update multiple attributes at once
    data_stream.update_attributes({ 'score': 2, 'new_attribute': 9.9 })

    recording.wait_until_stopped()
Get the number of channels (electrodes) the device supports. A frame is a single sample from each channel.
Get the number of frames per second the device is configured to produce. A frame is a single sample from each channel.
Get the current timestamp of the device. The timestamp sequence resets when the device is restarted.
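Since a frame is a single sample from each channel, the difference between two device timestamps can be converted to seconds using the frame rate. The helper below is a hypothetical sketch; it assumes timestamps advance by one per frame and uses the 25,000 frames per second mentioned elsewhere in this reference as a default.

```python
def timestamp_delta_to_seconds(
    start_timestamp: int,
    stop_timestamp: int,
    frames_per_second: int = 25_000,
) -> float:
    """Convert the gap between two device timestamps into seconds."""
    return (stop_timestamp - start_timestamp) / frames_per_second


# 25,000 frames apart corresponds to one second at the default rate.
print(timestamp_delta_to_seconds(1_000, 26_000))
```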
Read frame_count frames from the neurons, starting at from_timestamp
if supplied.
This method will block until the requested frames are available.
If from_timestamp is None, the current timestamp minus one will be
used, which ensures that a single frame read will return without
blocking.
Arguments:
- frame_count: Number of frames to return (at 25kHz).
- from_timestamp: Read from a specific timestamp (at 25kHz). If None, return from the current timestamp.
- analysis: When True, return DetectionResult instead of raw frames.
Returns:
Frames as an array with shape (frame_count, channel_count) if analysis=False, or a DetectionResult if analysis=True.
Asynchronous version of read().
A Stim object is created for each stim delivered by the system.
This is accessible via LoopTick.analysis (which is a DetectionResult) when using Neurons.loop()
(see DetectionResult for more details). Do not create instances of Stim directly.
For example:
import cl
from cl import ChannelSet, StimDesign

with cl.open() as neurons:
    for tick in neurons.loop(ticks_per_second=100, stop_after_ticks=2):
        if tick.iteration == 0:
            # In the first iteration, perform a stim
            neurons.stim(ChannelSet(8, 9), StimDesign(160, -1.0, 160, 1.0))
        for stim in tick.analysis.stims:  # Loops through each stim object in the current tick
            print(stim)  # Print out the stim object
A Spike object is created for each spike detected by the system.
This is accessible via LoopTick.analysis (which is a DetectionResult) when using Neurons.loop()
(see DetectionResult for more details). Do not create instances of Spike directly.
For example:
import cl

with cl.open() as neurons:
    for tick in neurons.loop(ticks_per_second=100, stop_after_ticks=2):
        for spike in tick.analysis.spikes:  # Loops through each spike object in the current tick
            print(spike)  # Print out the spike object
The rolling mean value of the channel at the time of the spike.
In the Simulator, this is the mean of samples.
A DetectionResult that holds spikes and stims at a given timestamp.
This is accessible via LoopTick.analysis when using Neurons.loop().
Do not create instances of DetectionResult directly.
Timestamp of the first not analysed frame after DetectionResult.start_timestamp.
(i.e. DetectionResult.start_timestamp + len(LoopTick.frames).)
Stores a set of channels for stimulation.
Arguments:
- *channels: One or more channels as int provided as separate arguments or as a sequence of ints.
For example:
# Select channels 8, 9 and 10
ChannelSet(8, 9, 10)
Supports convenient manipulation of channels, such as:
print(ChannelSet(8, 9) | ChannelSet(9, 10)) # ChannelSet(8, 9, 10)
print(ChannelSet(8, 9) & ChannelSet(9, 10)) # ChannelSet(9)
print(ChannelSet(8, 9) ^ ChannelSet(9, 10)) # ChannelSet(8, 10)
print(~ChannelSet(8, 9)) # All channels except 8, 9
Stores the parameters of a stimulation burst.
Arguments:
- burst_count: Number of stims to perform within this burst.
- burst_hz : Frequency of stims within this burst.
Constraints:
- burst_hz must not exceed 200 Hz.
For example:
# Burst containing 10 stims operating at 150 Hz
BurstDesign(10, 150)
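The burst frequency translates directly into a spacing between stims. The helpers below are hypothetical and not part of the CL API; they assume burst_hz defines the onset-to-onset interval between consecutive stims and that the device runs at 25,000 frames per second.

```python
MAX_BURST_HZ = 200  # documented upper limit for burst_hz


def burst_interval_frames(burst_hz: float, frames_per_second: int = 25_000) -> float:
    """Frames between consecutive stim onsets within a burst."""
    if not 0 < burst_hz <= MAX_BURST_HZ:
        raise ValueError(f"burst_hz must be in the range (0, {MAX_BURST_HZ}]")
    return frames_per_second / burst_hz


def burst_span_frames(
    burst_count: int, burst_hz: float, frames_per_second: int = 25_000
) -> float:
    """Frames from the first stim onset to the last stim onset in a burst."""
    return (burst_count - 1) * burst_interval_frames(burst_hz, frames_per_second)


print(burst_interval_frames(100))  # 100 Hz burst
print(burst_interval_frames(20))   # 20 Hz burst
```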
Stores the parameters of a mono, bi, or triphasic stim design by specifying 1, 2 or 3 pairs of (duration_us, current_uA) arguments respectively, i.e. 2, 4 or 6 arguments in total.
Arguments:
- duration_us: Pulse width in microseconds (us).
- current_uA : Current in microampere (uA).
Constraints:
- duration_us must be positive and evenly divisible by 20 us.
- current_uA must be less than or equal to 3.0 uA in absolute terms (i.e. range -3.0 to 3.0).
- Total charge must not exceed 3.0 nanocoulombs (nC).
For example:
# Monophasic stim with current of -1.0 uA, pulse width of 160 us.
StimDesign(160, -1.0)
# Biphasic stim with current of 1.0 uA, pulse width of 160 us and negative leading edge.
StimDesign(160, -1.0, 160, 1.0)
# Triphasic stim with current of 1.0 uA, pulse width of 160 us and negative leading edge.
StimDesign(160, -1.0, 160, 1.0, 160, -1.0)
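The StimDesign constraints can be checked ahead of time with plain arithmetic. The function below is a hypothetical sketch, not the library's validation logic. In particular, it assumes that "total charge" means the sum of the absolute charge of every phase; the reference does not say whether the limit applies per phase or in total.

```python
MAX_CURRENT_UA = 3.0
MAX_CHARGE_NC = 3.0
DURATION_STEP_US = 20


def check_stim_design(*args: float) -> list[str]:
    """Return constraint violations for (duration_us, current_uA) pairs."""
    if len(args) not in (2, 4, 6):
        return ["expected 2, 4 or 6 arguments"]
    problems = []
    total_charge_nc = 0.0
    for duration_us, current_ua in zip(args[0::2], args[1::2]):
        if duration_us <= 0 or duration_us % DURATION_STEP_US != 0:
            problems.append(f"duration {duration_us} us is not a positive multiple of 20")
        if abs(current_ua) > MAX_CURRENT_UA:
            problems.append(f"current {current_ua} uA is outside -3.0 to 3.0")
        # 1 uA flowing for 1 us carries 1 pC, i.e. 0.001 nC.
        # Assumption: phases are summed by absolute charge.
        total_charge_nc += abs(current_ua) * duration_us / 1000.0
    if total_charge_nc > MAX_CHARGE_NC:
        problems.append(f"total charge {total_charge_nc} nC exceeds 3.0 nC")
    return problems


print(check_stim_design(160, -1.0, 160, 1.0))
print(check_stim_design(150, -1.0))
```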
Allows building and executing a sequence of stim operations that can be run on demand. The StimPlan
cannot be modified further after it has been run once. Stim plans are created with Neurons.create_stim_plan();
do not create StimPlan instances directly.
A RuntimeError will be raised if any modification method is called after the first run().
For example:
import cl

with cl.open() as neurons:
    # Create a stim plan
    stim_plan = neurons.create_stim_plan()
Enqueues the same operation as Neurons.stim() onto this StimPlan.
Arguments:
- channel_set : A ChannelSet object with one or more channels, or a single channel to stimulate.
- stim_design : A StimDesign object or a scalar current in microamperes. Use of a StimDesign is preferred. A scalar current is the equivalent of a symmetric biphasic, negative-first pulse with a pulse width of 160 microseconds, i.e., StimDesign(160, -value, 160, value).
- burst_design: An optional BurstDesign object specifying the burst count and frequency. If unspecified, a single pulse will be delivered.
- lead_time_us: The lead time in microseconds before the stimulation starts.
Constraints:
- The minimum lead_time_us is 80.
- lead_time_us must be evenly divisible by 40.
For example:
import cl
from cl import ChannelSet, StimDesign, BurstDesign

# Predefine stimulation parameters
channel_set_1 = ChannelSet(1, 2, 3)
channel_set_2 = ChannelSet(8, 9, 10)
stim_design = StimDesign(160, -1.0, 160, 1.0)
burst_design = BurstDesign(5, 100)

with cl.open() as neurons:
    stim_plan = neurons.create_stim_plan()

    # Queue a burst stimulation on channel_set_1
    stim_plan.stim(channel_set_1, stim_design, burst_design)

    # Queue a single stimulation on channel_set_2 with 0.5 µA current
    stim_plan.stim(channel_set_2, 0.5)  # Using scalar current

    # Optionally interrupt these channels when running this plan to ensure they are free
    stim_plan.channels_to_interrupt = channel_set_1 | channel_set_2

    # Finalise and run the stim plan
    # The above stimulations will run concurrently, due to no overlapping channels
    stim_plan.run()
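The scalar current shorthand used above expands to a symmetric, negative-first biphasic pulse. The helpers below are hypothetical and only illustrate the documented equivalence StimDesign(160, -value, 160, value), together with the fact that such a pulse carries zero net charge.

```python
def scalar_to_biphasic_args(current_ua: float) -> tuple[int, float, int, float]:
    """Expand a scalar current into (duration_us, current_uA) pairs."""
    pulse_width_us = 160  # pulse width documented for the scalar shorthand
    return (pulse_width_us, -current_ua, pulse_width_us, current_ua)


def net_charge_pc(*args: float) -> float:
    """Signed charge over all phases in picocoulombs (1 uA * 1 us = 1 pC)."""
    return sum(duration * current for duration, current in zip(args[0::2], args[1::2]))


biphasic_args = scalar_to_biphasic_args(0.5)
print(biphasic_args, net_charge_pc(*biphasic_args))
```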
Enqueues the same operation as Neurons.sync() onto this StimPlan.
Allows specification of channels to interrupt when this plan is run.
For example:
# Set multiple channels with a ChannelSet object
stim_plan.channels_to_interrupt = ChannelSet(8, 9, 10)
# Set a single channel with an integer.
stim_plan.channels_to_interrupt = 8
# Clear previously set channels.
stim_plan.channels_to_interrupt = None
Execute the queued operations in the StimPlan. After this method is called,
the StimPlan is frozen and cannot be modified.
If StimPlan.channels_to_interrupt has been set, interrupt will be called
on the specified channels before executing enqueued commands.
Arguments:
Iterator that yields a LoopTick. (Relates to LoopTick and Neurons.loop().)
This is made available through the Neurons.loop() interface.
Do not create instances of Loop directly.
Return an approximate duration of the closed loop in seconds.
For each tick, yield a LoopTick object containing spikes, stims and frames collected during
the previous iteration.
Run the closed loop, calling loop_body_callback for each tick.
The callback is passed a LoopTick object containing detected
spikes and other relevant information. The loop body can stop the loop
by calling LoopTick.loop.stop().
For example:
TICKS_PER_SECOND = 2

def loop_body_callback(tick: LoopTick):
    # Do something ...
    tick.loop.stop()

with cl.open() as neurons:
    loop = neurons.loop(TICKS_PER_SECOND)
    loop.run(loop_body_callback)
Stop the Loop iterations.
Typically called via LoopTick.loop in a loop body in cases where a simple
break is not convenient, such as when using the Loop.run() syntax.
Call to enable jitter recovery for potentially long running operations within
a Loop iteration that otherwise might trigger a TimeoutError.
This effectively skips execution of code in the loop body until the iterations
catch up to the expected iteration in realtime. Data in the skipped iterations
can be accessed through the handle_recovery_tick callback.
Arguments:
- handle_recovery_tick: Optional callback function that accepts a LoopTick as the only argument.
- timeout_seconds: Number of seconds to allow for the recovery, defaults to 5 seconds if None.
Constraints:
- Users need to be careful that the handle_recovery_tick callback does not take too long, otherwise the loop will never catch up.
- TimeoutError will be raised if the loop has not caught up within timeout_seconds.
In the following example, the loop from iteration 2 will keep reading data but
not yield any ticks until the loop catches up in iteration 7.
import time

import cl
from cl import LoopTick

TICKS_PER_SECOND = 100
STOP_AFTER_TICKS = 10

def handle_recovery_tick(tick: LoopTick):
    # Optionally do something with tick data during recovery
    ...

with cl.open() as neurons:
    for tick in neurons.loop(TICKS_PER_SECOND, stop_after_ticks=STOP_AFTER_TICKS):
        print(f"{tick.iteration=}")
        if tick.iteration == 1:
            tick.loop.recover_from_jitter(handle_recovery_tick)
            time.sleep(0.05)

# Expected output:
# tick.iteration=0
# tick.iteration=1
# tick.iteration=7
# tick.iteration=8
# tick.iteration=9
Contains spikes, stims and frames collected during a loop iteration.
(Relates to Neurons.loop() and Loop.)
The tick object itself is only valid for the duration of the loop iteration. If you need to keep a reference to instance variables (such as analysis) beyond the end of the loop body, copy them to another variable.
This is accessible via Neurons.loop() and yielded by the Loop iterator.
Do not create instances of LoopTick directly.
Timestamp of the loop body. This is equivalent to DetectionResult.stop_timestamp.
Timestamp of the next loop body.
This is equivalent to LoopTick.iteration_timestamp + len(LoopTick.frames).
Contains the spikes and stims analysis of the frames read during the tick.
Handles recording functionality by the CL1 system. This is returned when
calling Neurons.record(). Do not create instances of Recording directly.
Note that:
- In the Simulator, recording data is captured by a background subprocess that independently reads from the shared data buffer.
- Each recording maintains its own read cursor, so multiple recordings can run concurrently without interfering with each other or with neurons.read() calls.
Attributes that will be written to the recording file and available at
Recording.file.root._v_attrs if using the raw PyTables interface.
See RecordingView.attributes for details.
Note that:
- Simulator recordings can be identified by file_format.version == "SDK".
- The following attributes are included in the Simulator recording for completeness, but the values are empty: git_hash, git_branch, git_tags, and git_status.
dict containing information relating to the recording file.
Keys:
- name: Recording file name.
- path: Absolute path to the recording file.
- uri_path: URL encoded file path.
Return a RecordingView of the recording file.
Constraints:
- This can only be performed after the recording has stopped.
Set a single application attribute on the recording. The application attribute
refers to the attribute dictionary passed to Neurons.record(attributes).
Arguments:
- key: Attribute key.
- value: Attribute value.
Constraints:
- This can only be performed before the recording is stopped.
Update multiple application attributes on the recording. The application attribute
refers to the attribute dictionary originally passed to Neurons.record(attributes).
Arguments:
- attributes: dict containing attribute keys and values to be updated.
Constraints:
- This can only be performed before the recording is stopped.
Manages a named stream of (timestamp, serialised_data) for recordings and visualisation.
This is created using Neurons.create_data_stream(). Do not create instances
of DataStream directly.
See RecordingView.data_streams for how to use data streams saved in a recording.
Append a new data point to the stream.
Arguments:
- timestamp: Timestamp that marks this data point.
- data: Any type of serialisable data.
Constraints:
- New data must have a timestamp greater than any existing timestamp in the data stream, otherwise a RuntimeError will be raised.
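The ordering constraint on append() can be illustrated with a small stand-in. The class below is hypothetical and is not the DataStream implementation; it only mimics the documented behaviour of rejecting a timestamp that is not greater than the last one appended.

```python
class InMemoryStream:
    """Illustrative stand-in that enforces strictly ascending timestamps."""

    def __init__(self) -> None:
        self.entries: list[tuple[int, object]] = []

    def append(self, timestamp: int, data: object) -> None:
        if self.entries and timestamp <= self.entries[-1][0]:
            raise RuntimeError(
                f"timestamp {timestamp} is not greater than {self.entries[-1][0]}"
            )
        self.entries.append((timestamp, data))


stream = InMemoryStream()
stream.append(10, {"event": "start"})
stream.append(11, {"event": "stim"})

try:
    stream.append(11, {"event": "duplicate timestamp"})
    rejected = False
except RuntimeError:
    rejected = True

print(len(stream.entries), rejected)
```

When several events occur in the same tick, one option is to offset each timestamp by one frame, as in the create_data_stream() example.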
Set a single attribute on the data stream. The attribute refers to the
attribute dictionary passed to Neurons.create_data_stream(attributes).
Arguments:
- key: Attribute key.
- value: Attribute value.
Update multiple attributes on the data stream. The attribute refers to the
attribute dictionary passed to Neurons.create_data_stream(attributes).
Arguments:
- attributes: dict containing attribute keys and values to be updated.
Recording files are standard HDF5 files and can be opened with any
HDF5 viewer or library. A RecordingView provides a more convenient
way to access the data, providing ready access for:
- attributes,
- samples,
- spikes,
- stims,
- data_streams, and
- A range of analysis functions (coming soon).
Full access to the underlying HDF5 file (via the PyTables library)
is provided through the file attribute. This allows access to the full range
of PyTables functionality if needed.
For example:
from cl import RecordingView

file_path = "/path/to/recording.h5"
with RecordingView(file_path) as recording:
    # Do something ...
    ...
Constructor for RecordingView.
Arguments:
- file_path: Path to the recording (.h5) file to be opened.
The file / root level attributes accessible as a dictionary.
| Attribute | Type | Description |
|---|---|---|
| application | dict[str, Any] | Application attributes as a user provided dict from the attributes parameter when creating a recording. |
| hostname | str | Hostname of the CL1 system, managed through the CL1 dashboard. |
| project_id | str | Unique identifier for the undergoing project, managed through the CL1 dashboard. |
| cell_batch_id | str | Unique identifier of the cell batch, managed through the CL1 dashboard. |
| created_localtime | str | When the recording is created in ISO format in the local timezone. |
| created_utc | str | When the recording is created in ISO format in UTC timezone. |
| ended_localtime | str | When the recording ended in ISO format in the local timezone. |
| ended_utc | str | When the recording ended in ISO format in UTC timezone. |
| git_hash | str | Metadata relating to the software version. |
| git_branch | str | Metadata relating to the software version. |
| git_tags | str | Metadata relating to the software version. |
| git_status | str | Metadata relating to the software version. |
| channel_count | int | Number of channels. |
| sampling_frequency | int | Sampling frequency in Hz, same as frames_per_second (Note 2). |
| frames_per_second | int | Number of frames per second in Hz, same as sampling_frequency (Note 2). |
| uV_per_sample_unit | float | Multiply the recording sample values by this constant to obtain sample values as microvolts (uV). |
| duration_frames | int | Duration of this recording in frames. |
| duration_seconds | float | Duration of this recording in seconds. |
| start_timestamp | int | Timestamp of the first frame. |
| end_timestamp | int | Timestamp of the last frame. |
| file_format | dict | See below (Note 1). |
Notes:
1. The file_format attribute contains information relating to the format of the recording as a dict. This contains two attributes, version and stim_and_spike_timestamps_relative_to_start. The latter, when True, indicates that the timestamps included for stims and spikes are relative to the start_timestamp of the recording.
2. Both frames_per_second and sampling_frequency give the same value. Use of sampling_frequency is discouraged as this attribute is flagged for deprecation and may be removed in a future version.
For example:
recording = RecordingView(file_path)
channel_count = recording.attributes["channel_count"]
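When stim_and_spike_timestamps_relative_to_start is True, stim and spike timestamps can be mapped back to device timestamps by adding start_timestamp. The sketch below uses a plain dict as a stand-in for RecordingView.attributes with made-up values; the helper name is hypothetical.

```python
def to_absolute_timestamps(relative_timestamps: list[int], attributes: dict) -> list[int]:
    """Shift timestamps by start_timestamp when they are stored relative to it."""
    if attributes["file_format"]["stim_and_spike_timestamps_relative_to_start"]:
        start = attributes["start_timestamp"]
        return [start + timestamp for timestamp in relative_timestamps]
    # Timestamps are already absolute, so return them unchanged.
    return list(relative_timestamps)


# Stand-in for recording.attributes, with illustrative values only.
example_attributes = {
    "start_timestamp": 1_000_000,
    "file_format": {
        "version": "SDK",
        "stim_and_spike_timestamps_relative_to_start": True,
    },
}

print(to_absolute_timestamps([0, 250, 1_250], example_attributes))
```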
Recorded raw samples (frames) as an int16 array with shape (duration_frames, channel_count).
Sample values can be converted to microvolts (uV) by multiplying with the uV_per_sample_unit attribute.
This is None if a recording is created with Neurons.record(include_raw_samples=False).
For example:
recording = RecordingView(file_path)
# Get a slice of the first 1000 samples from channel 8, 9, 10 and convert to uV
samples = recording.samples[:1000, 8:11] # shape (1000, 3), dtype int16
samples_uV = samples * recording.attributes["uV_per_sample_unit"] # shape (1000, 3), dtype float64
Recorded detected spikes as a Table with columns channel, timestamp and samples.
This is None if a recording is created with Neurons.record(include_spikes=False).
Spike timestamps are relative to the start of the recording by default and can be checked
with the attribute stim_and_spike_timestamps_relative_to_start.
For example:
recording = RecordingView(file_path)

# Get a count of total spikes
spike_count = len(recording.spikes)

# Iterate through all spikes in the recording
for spike in recording.spikes:
    # spike["channel"], equivalent to `Spike.channel`
    # spike["timestamp"], equivalent to `Spike.timestamp`
    # spike["samples"], equivalent to `Spike.samples`
    ...

# Get spikes from channels 8, 9, occurring within the first 1000 frames
spike_indices = recording.spikes.get_where_list("((channel==8) | (channel==9)) & ((timestamp >= 0) & (timestamp < 1000))")
spikes = recording.spikes[spike_indices]
Recorded stimulation events as a Table with columns channel and timestamp.
This is None if a recording is created with Neurons.record(include_stims=False).
Stim timestamps are relative to the start of the recording by default and can be checked
with the attribute stim_and_spike_timestamps_relative_to_start.
For example:
recording = RecordingView(file_path)

# Get a count of total stims
stim_count = len(recording.stims)

# Iterate through all stims in the recording
for stim in recording.stims:
    # stim["channel"], equivalent to `Stim.channel`
    # stim["timestamp"], equivalent to `Stim.timestamp`
    ...

# Get stims from channels 8, 9, occurring within the first 1000 frames
stim_indices = recording.stims.get_where_list("((channel==8) | (channel==9)) & ((timestamp >= 0) & (timestamp < 1000))")
stims = recording.stims[stim_indices]
Recorded data streams with a dictionary-like interface.
This is None if a recording is created with Neurons.record(include_data_streams=False).
Available data streams can be accessed as follows:
recording = RecordingView(file_path)
# Print a list of available data streams
print(recording.data_streams)
# Get available data stream names as a list
data_stream_names = list(recording.data_streams.keys())
Data within each named data stream can be accessed like a dictionary, where the keys are timestamps and values contain data, for example:
my_data_stream = recording.data_streams["my_data_stream"]
for timestamp, data in my_data_stream.items():
    print(timestamp, data)
It is also possible to get data using timestamps within a data stream:
my_data_stream = recording.data_streams["my_data_stream"]
# Single timestamp
data = my_data_stream[timestamp]
# Range of timestamps
data_list = my_data_stream[start_timestamp : end_timestamp]
Close the underlying PyTables file.
Arguments:
- safe_mode: When True, suppress any exception raised while closing. Intended for best-effort cleanup paths (finalizers, interpreter shutdown, error rollback) where the caller cannot meaningfully react to a failure.
Sets the timestamp range for all analysis functions. When None, the minimum
timestamp will be 0 and the maximum timestamp will be the maximum timestamp
contained in the recording.
Arguments:
- minimum_timestamp: Minimum timestamp (cannot be lower than 0).
- maximum_timestamp: Maximum timestamp (cannot be greater than attributes["duration_frames"]).
Compute firing statistics efficiently for a neural recording by binning spike activity into fixed-width time bins.
Arguments:
- bin_size_sec: Size of each time bin in seconds used to aggregate spike counts. When this is set to 1.0 second, the results can be interpreted as Hertz.
Returns:
cl.analysis.AnalysisResultFiringStats: Returns per-channel and population-level firing statistics computed from binned spike activity, including firing counts, firing rates, inter-spike interval (ISI) distributions, and summary statistics across channels. The result provides both channel-wise measures and aggregated culture-level metrics for baseline activity characterisation.
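The binning idea can be sketched in a few lines of plain Python. This is a minimal illustration and not the library's implementation: `binned_firing_rates` is a hypothetical helper, and the sampling rate of 25000 frames per second is an assumed value used only to convert frame timestamps to seconds.

```python
from collections import Counter


def binned_firing_rates(spikes, duration_frames, frames_per_second, bin_size_sec=1.0):
    """Count spikes per (channel, bin) and convert the counts to rates in Hz.

    spikes: iterable of (timestamp_in_frames, channel) pairs.
    frames_per_second: sampling rate, supplied by the caller (assumed, not read from a file).
    """
    spikes = list(spikes)
    bin_size_frames = int(round(bin_size_sec * frames_per_second))
    n_bins = -(-duration_frames // bin_size_frames)  # ceiling division
    counts = Counter((channel, timestamp // bin_size_frames) for timestamp, channel in spikes)
    channels = sorted({channel for _, channel in spikes})
    # Dividing a count by the bin width in seconds gives spikes per second (Hz)
    return {
        channel: [counts[(channel, b)] / bin_size_sec for b in range(n_bins)]
        for channel in channels
    }


# Made-up spikes: (timestamp in frames, channel)
example_spikes = [(100, 3), (200, 3), (30000, 3), (26000, 5)]
rates = binned_firing_rates(example_spikes, duration_frames=50000, frames_per_second=25000)
print(rates)
```

With a 1.0 second bin the per-bin count and the rate in Hz are the same number, which is why the results can be read directly as Hertz in that case.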
Detects network-level bursts from spike data using a spike rate thresholding method. This function identifies periods of high-frequency, synchronised spiking activity across multiple channels, classifying them as network bursts.
Arguments:
- bin_size_sec: Size of each time bin in seconds.
- onset_freq_hz: Per channel spike rate in Hz to mark a burst onset.
- offset_freq_hz: Per channel spike rate in Hz to mark a burst offset.
- min_active_channels: Scalar constant to apply to the onset / offset frequencies.
Returns:
cl.analysis.AnalysisResultNetworkBursts: Returns detected network-level burst events based on population spike-rate thresholding, including burst count, durations, and spike counts, along with the underlying binned firing rates used for detection. The result also stores burst timing information and per-bin activity.
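A minimal sketch of rate thresholding with separate onset and offset levels is shown below. It is not the library's implementation. It assumes one plausible reading of the arguments, namely that the population thresholds are the per-channel frequencies multiplied by min_active_channels; `detect_bursts` and the example rates are hypothetical.

```python
def detect_bursts(population_rate_hz, onset_hz, offset_hz):
    """Return (start_bin, end_bin) pairs, with end_bin exclusive.

    A burst starts when the rate reaches onset_hz and ends at the first
    later bin whose rate drops below offset_hz.
    """
    bursts = []
    start = None
    for i, rate in enumerate(population_rate_hz):
        if start is None and rate >= onset_hz:
            start = i
        elif start is not None and rate < offset_hz:
            bursts.append((start, i))
            start = None
    if start is not None:
        # Burst still in progress at the end of the recording
        bursts.append((start, len(population_rate_hz)))
    return bursts


# Assumed scaling: population threshold = per-channel frequency * min_active_channels
onset_freq_hz, offset_freq_hz, min_active_channels = 10, 5, 4
example_rates = [0, 5, 40, 60, 30, 10, 0, 50, 55]
bursts = detect_bursts(
    example_rates,
    onset_hz=onset_freq_hz * min_active_channels,
    offset_hz=offset_freq_hz * min_active_channels,
)
print(bursts)
```

Using a lower offset than onset threshold keeps a burst from being split when the rate dips briefly below the onset level.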
Detects neuronal avalanches and computes criticality-related metrics such as avalanche size distributions, duration distributions, power-law statistics, deviation from criticality coefficient (DCC), shape collapse error, and branching ratio.
To find avalanches:
- Compute the total network activity by summing spike counts across channels for each time bin.
- Define a threshold based on the provided percentile.
- Identify avalanches where network activity exceeds the threshold in consecutive time bins.
- Calculate the spike count of each avalanche and its duration in number of time bins.
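The steps above can be sketched in plain Python as follows. This is an illustration under stated assumptions and not the library's implementation: `percentile` uses linear interpolation between sorted values, which may differ from the method the library uses, and `find_avalanches` and the example counts are hypothetical.

```python
def percentile(values, fraction):
    """Percentile with linear interpolation; fraction is in [0, 1]."""
    ordered = sorted(values)
    position = fraction * (len(ordered) - 1)
    lower = int(position)
    upper = min(lower + 1, len(ordered) - 1)
    weight = position - lower
    return ordered[lower] * (1 - weight) + ordered[upper] * weight


def find_avalanches(counts_per_channel, percentile_threshold):
    """Return (sizes, durations) of runs of bins whose summed activity exceeds the threshold."""
    # Step 1: total network activity per time bin
    network_activity = [sum(bin_counts) for bin_counts in zip(*counts_per_channel)]
    # Step 2: threshold from the percentile (0 means a threshold of 0)
    threshold = percentile(network_activity, percentile_threshold) if percentile_threshold > 0 else 0
    # Steps 3 and 4: collect consecutive supra-threshold bins
    sizes, durations = [], []
    size = duration = 0
    for activity in network_activity:
        if activity > threshold:
            size += activity
            duration += 1
        elif duration:
            sizes.append(size)
            durations.append(duration)
            size = duration = 0
    if duration:
        sizes.append(size)
        durations.append(duration)
    return sizes, durations


# Made-up binned spike counts for two channels
example_counts = [
    [0, 1, 2, 0, 0, 2, 0, 3, 1, 1],
    [0, 2, 3, 0, 0, 0, 0, 1, 3, 0],
]
sizes, durations = find_avalanches(example_counts, percentile_threshold=0)
print(sizes, durations)
```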
Arguments:
- bin_size_sec: Size of each time bin in seconds.
- percentile_threshold: A percentile value (0 to 1) used to calculate a threshold for detecting bursts. If percentile > 0, the threshold is determined as the percentile of the summed network activity. If percentile == 0, the threshold is set to 0.
- max_lags_branching_ratio: Maximum number of time lags to consider for slope estimation.
- duration_thresholds: Thresholds (min, max) for avalanche durations (number of frame_bins). Recommended values are between 3 and 6.
- min_spike_count_threshold: Minimum threshold for spike counts in avalanches for calculating the size exponent. Recommended values are between 8 and 20.
- n_boostraps: Number of random resampling iterations used to estimate the variability of the beta exponent.
Returns:
cl.analysis.AnalysisResultCriticality: Returns a comprehensive set of avalanche and criticality summaries derived from thresholded network activity, including avalanche sizes and durations, inter-avalanche intervals, power-law fit exponents and KS statistics, scaling-relation / shape-collapse measures (e.g., DCC and collapse error), and a branching-ratio estimate. All intermediate arrays used to compute these metrics (e.g., per-avalanche binned spike profiles and fitted parameter traces) are included in the result object for inspection and downstream re-analysis.
Computes per-bin Bernoulli entropy of the fraction of channels that have >=1 spike in the bin.
Arguments:
- bin_size_sec: Size of each time bin in seconds.
- fillna: Value to fill for NaN (defaults to 0.).
- log_base: Entropy units where None uses natural log (nats), 2 uses log2 (bits).
Returns:
cl.analysis.AnalysisResultInformationEntropy: Returns per-bin Bernoulli entropy computed from the fraction of active channels in each time bin, providing a population-level measure of activity variability over time. Entropy can be expressed in nats or bits.
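For a bin in which a fraction p of channels is active, the Bernoulli entropy is H(p) = -p log(p) - (1 - p) log(1 - p). A minimal sketch follows; it is not the library's implementation. The helpers are hypothetical, the channel count of 64 is an assumed value, and the endpoints p = 0 and p = 1 are set to 0 here, which is the limiting value of the formula.

```python
import math


def bernoulli_entropy(p, log_base=None):
    """Entropy of a Bernoulli variable with success probability p."""
    if p <= 0.0 or p >= 1.0:
        return 0.0  # limiting value; the formula itself is undefined at the endpoints
    h = -(p * math.log(p) + (1 - p) * math.log(1 - p))
    # Natural log gives nats; dividing by log(base) converts to that base
    return h if log_base is None else h / math.log(log_base)


def per_bin_entropy(active_channel_counts, n_channels, log_base=None):
    """Entropy per bin from the number of channels with at least one spike in that bin."""
    return [bernoulli_entropy(count / n_channels, log_base) for count in active_channel_counts]


# Made-up counts of active channels per bin, out of an assumed 64 channels
entropy_bits = per_bin_entropy([0, 32, 16, 64], n_channels=64, log_base=2)
print(entropy_bits)
```

Entropy peaks at 1 bit when exactly half of the channels are active and falls to 0 when all or none are.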
Computes Lempel–Ziv complexity (LZ78) for each channel’s binned spike activity, measuring temporal complexity and structure.
For each channel, a symbol sequence is formed per time bin:
- use_binary=True: binary sequence (spike present/absent per bin).
- use_binary=False: integer spike-count sequence.
Complexity is the number of phrases added by an LZ78 dictionary-building procedure. If normalise=True, scores are length-normalised using c_norm = c(n) * log_k(n) / n, where n is the number of bins and k is the alphabet size (k=2 for binary).
Arguments:
- bin_size_sec: Time-bin size in seconds.
- min_bin_count: Minimum number of bins required; below this threshold complexity is set to zero.
- normalise: Whether to return the length-normalised score.
- use_binary: Whether to binarise spike counts before computing complexity or use raw spike counts.
Returns:
cl.analysis.AnalysisResultComplexityLempelZiv: Returns per-channel Lempel–Ziv (LZ78) complexity scores computed from binned spike activity, using either binary or count-based symbol sequences. Scores can be returned raw or length-normalised based on the sequence length and alphabet size, providing a measure of temporal complexity for each channel.
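The phrase-counting procedure and the normalisation formula can be sketched as below. This is a minimal illustration and not the library's implementation. The helpers are hypothetical, and counting a trailing incomplete phrase as one extra phrase is an assumption, since conventions differ.

```python
import math


def lz78_phrase_count(sequence):
    """Number of phrases produced by LZ78 dictionary parsing of the sequence."""
    phrases = set()
    current = ()
    for symbol in sequence:
        current += (symbol,)
        if current not in phrases:
            # Shortest unseen phrase found: store it and start a new one
            phrases.add(current)
            current = ()
    # Assumption: a trailing, already-seen partial phrase counts as one more phrase
    return len(phrases) + (1 if current else 0)


def normalised_complexity(sequence, alphabet_size=2, min_bin_count=2):
    """c_norm = c(n) * log_k(n) / n, or 0.0 when the sequence is too short."""
    n = len(sequence)
    if n < max(min_bin_count, 2):
        return 0.0
    return lz78_phrase_count(sequence) * math.log(n, alphabet_size) / n


# Made-up binary sequence (spike present/absent per bin)
example_sequence = [1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0]
phrase_count = lz78_phrase_count(example_sequence)
print(phrase_count, normalised_complexity(example_sequence))
```

The example parses into the phrases 1, 0, 11, 01, 010, 00 and 10.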
Calculates the Discrete Cosine Transform (DCT) features based on channel spike counts.
Arguments:
- k: The frequency index (coefficient index) of the DCT.
Returns:
cl.analysis.AnalysisResultDctFeatures: Returns spatial Discrete Cosine Transform (DCT) features computed from per-channel spike counts arranged in the common MEA layout, capturing low-frequency spatial structure in neural activity. The result includes the DCT basis coefficients for both MEA dimensions and a dictionary of DCT feature values indexed by spatial frequency components.
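To give a feel for a spatial DCT feature, the sketch below projects a small grid of spike counts onto a pair of unnormalised DCT-II cosine basis vectors. It is an illustration only: the grid, the helper names, and the choice of the unnormalised DCT-II are all assumptions, and the normalisation and layout used by the library may differ.

```python
import math


def dct_basis(n_points, k):
    """Unnormalised DCT-II basis vector of frequency index k over n_points positions."""
    return [math.cos(math.pi * (i + 0.5) * k / n_points) for i in range(n_points)]


def dct_feature(grid, k_row, k_col):
    """Project a 2D grid of counts onto the (k_row, k_col) spatial frequency component."""
    row_basis = dct_basis(len(grid), k_row)
    col_basis = dct_basis(len(grid[0]), k_col)
    return sum(
        grid[r][c] * row_basis[r] * col_basis[c]
        for r in range(len(grid))
        for c in range(len(grid[0]))
    )


# Made-up 2 x 2 grid of per-channel spike counts
example_grid = [[1, 2], [3, 4]]
features = {(u, v): dct_feature(example_grid, u, v) for u in range(2) for v in range(2)}
print(features)
```

The (0, 0) component is the total spike count, and higher indices capture progressively finer spatial variation across the grid.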
Generates spike-triggered histograms using the most active channels as triggers, quantifying population responses around trigger spikes.
Arguments:
- bin_size_sec: Bin size in seconds for the histogram.
- start_sec: Time in seconds to include before the trigger spike.
- end_sec: Time in seconds to include after the trigger spike.
- num_channels: How many of the most active channels to use as triggers.
- min_firing_rate_threshold_hz: Threshold (in Hz) for minimum firing rate. Only channels with firing rates above this value are considered.
Returns:
cl.analysis.AnalysisResultSpikeTriggeredHistogram: Contains spike-triggered histograms computed for ordered pairs of the most active channels, where each histogram represents the timing of target-channel spikes relative to trigger-channel spikes within a fixed temporal window. The result includes the common time bins and a dictionary mapping channel pairs to their corresponding histograms.
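For a single trigger and target channel pair, a spike-triggered histogram can be sketched as follows. This is a minimal illustration with spike times in seconds and is not the library's implementation; the helper name and the example spike times are hypothetical.

```python
import math


def spike_triggered_histogram(trigger_times, target_times, start_sec, end_sec, bin_size_sec):
    """Histogram of target spike times relative to each trigger spike.

    The window spans start_sec before to end_sec after each trigger spike.
    """
    n_before = int(round(start_sec / bin_size_sec))
    n_after = int(round(end_sec / bin_size_sec))
    histogram = [0] * (n_before + n_after)
    window_start = -n_before * bin_size_sec
    for trigger in trigger_times:
        for target in target_times:
            offset = target - trigger
            index = math.floor((offset - window_start) / bin_size_sec)
            if 0 <= index < len(histogram):
                histogram[index] += 1
    return histogram


# Made-up spike times in seconds
histogram = spike_triggered_histogram(
    trigger_times=[1.0, 2.0],
    target_times=[1.012, 1.955, 2.031],
    start_sec=0.05,
    end_sec=0.05,
    bin_size_sec=0.01,
)
print(histogram)
```

A peak shortly after offset zero would suggest that the target channel tends to fire just after the trigger channel.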
Compute functional connectivity (based on Pearson correlation) and summary graph metrics from spike data.
Arguments:
- bin_size_sec: Size of each time bin in seconds.
- correlation_threshold: Absolute correlation threshold in [0, 1]. Only connections where Pearson correlation >= correlation_threshold are kept as graph edges. Use 0.0 to keep the full weighted correlation matrix.
Returns:
cl.analysis.AnalysisResultsFunctionalConnectivity: Returns a weighted functional connectivity matrix computed using Pearson correlation between binned channel spike counts, along with basic graph-level metrics including total and average edge weights, clustering coefficient, Louvain community structure, modularity index, and maximum betweenness centrality. These network metrics are provided as baseline summaries using default parameters; users requiring fine-tuned or domain-specific network analysis are encouraged to directly use the returned connectivity matrix to recompute graph metrics with custom methods and parameter settings.
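A thresholded Pearson correlation matrix can be sketched in plain Python as below. It is not the library's implementation. The helpers are hypothetical, and two choices are assumptions: the threshold is applied to the absolute value of the correlation, and the diagonal is left at 0 so that channels have no self-connections.

```python
import math


def pearson(x, y):
    """Pearson correlation of two equal-length sequences; 0.0 if either is constant."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    if var_x == 0 or var_y == 0:
        return 0.0
    return cov / math.sqrt(var_x * var_y)


def connectivity_matrix(binned_counts, correlation_threshold=0.0):
    """Symmetric matrix of correlations; entries below the threshold are set to 0."""
    n = len(binned_counts)
    matrix = [[0.0] * n for _ in range(n)]
    for i in range(n):
        for j in range(i + 1, n):
            r = pearson(binned_counts[i], binned_counts[j])
            # Assumption: compare the absolute correlation with the threshold
            if abs(r) >= correlation_threshold:
                matrix[i][j] = matrix[j][i] = r
    return matrix


# Made-up binned spike counts, one row per channel
example_counts = [[1, 2, 3, 4], [2, 4, 6, 8], [4, 3, 2, 1], [1, 0, 1, 0]]
matrix = connectivity_matrix(example_counts, correlation_threshold=0.5)
for row in matrix:
    print(row)
```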
Creates a plot of a single spike.
Arguments:
- spike: Either a Spike object or a tuple of (timestamp, channel, samples).
- figsize: Size of the plot figure.
- title: Title for the plot, if not provided, a default will be used.
- save_path: Path to save the plot to instead of showing it.
- ax: Axes on which to draw the plot (defaults to None).
Returns:
Axes: Axes on which the plot was drawn.
For example:
from cl import RecordingView
recording = RecordingView(file_path)
# Plot the largest N spikes
N = 5
largest_spikes = sorted(
    recording.spikes[:],
    key = lambda spike: max(spike["samples"]) - min(spike["samples"]),
    reverse = True
)[:N]
for spike in largest_spikes:
    recording.plot_spike(spike)
Creates a raster plot of spikes and stims in the recording.
Arguments:
- figsize: Size of the plot figure.
- title: Title for the plot, if not provided, a default will be used.
- save_path: Path to save the plot to instead of showing it.
- limit_to_time_range_secs: If provided, limit the time axis to a tuple of (start_time_secs, end_time_secs).
- limit_to_channels: If provided, limit the plot to the given channels.
Raised when an attempt to take control is rejected. This implies that another process has control.
Raised when a method is called that requires control, but control has not been taken.
Raised when a stimulation plan is rejected by the system.
Raised when a stimulation plan is rejected because a channel has too many operations queued.
Raised when a stimulation plan is rejected because too many sync operations are in flight.
Raised when a stimulation plan is rejected because the supplied timestamp to run at is not newer than an already queued plan.
Raised when a stimulation plan is rejected because too many deferred channel interruptions are in flight.
Raised when a recording unexpectedly fails.
Raised when an error occurs in the generic WebSocket API client.
Raised when opening a recording containing potentially unsafe objects.