Reference: Op Filters¶
OpFilter Class¶
psiqworkbench.OpFilter ¶
Bases: object
Base class for handlers in the instruction stream chain.
OpFilter objects are used in the filter pipeline of a QPU to process, transform, or manipulate quantum instructions as they are emitted. Filters can be chained together to implement custom behavior or functionality.
What are Upstream and Downstream Filters? In a filter pipeline, filters process and pass data along in a specific order:
-
Upstream Filters: Filters that process data before the current filter. They send data downstream to the current filter.
-
Downstream Filters: Filters that process data after the current filter. They receive data from the current filter.
For example, in a pipeline like Filter A → Filter B → Filter C:
- For
Filter B,Filter Ais an upstream filter andFilter Cis a downstream filter. - For
Filter A, bothFilter BandFilter Care downstream filters. - For
Filter C, bothFilter AandFilter Bare upstream filters.
Example
Below is an example of defining a custom filter, registering it as a built-in filter, and using it in a QPU.
# Define the StreamDebugPrint filter
class StreamDebugPrint(UtilityFilter):
"""
A handler that prints each instruction in the stream for debugging purposes.
"""
def __init__(self, name="debug_print", qpu=None):
super().__init__(name=name, qpu=qpu, may_allocate_qubits=False)
self.format = 'asm' # Default format for printing instructions
def _put(self, instructions):
"""
Processes and prints instructions.
Args:
instructions (list): A list of instructions to process.
"""
instructions_py = convert_ops_to_py(instructions)
for inst in instructions_py:
print(f'{format_stream([inst], str_format=self.format, newline=False)}')
self.emit_downstream(instructions)
# Register the StreamDebugPrint filter
register_built_in_filter_type('>>print>>', StreamDebugPrint, """
Just print every op (used for debugging).
""")
# Initialize a QPU with the registered filter
qc = QPU(filters=['>>print>>'])
# Alternatively, directly use the StreamDebugPrint filter
qc = QPU(filters=[StreamDebugPrint()])
Attributes:
| Name | Type | Description |
|---|---|---|
name |
str
|
Name of the filter. If the name ends with |
verbose |
bool
|
Whether verbose output is enabled for debugging purposes. |
qpu |
QPU
|
The QPU instance associated with this filter, if any. |
may_allocate_qubits |
bool
|
Indicates whether this filter can allocate qubits. |
linked_handlers |
list
|
Downstream filters to which instructions will be sent. |
enabled |
bool
|
Whether this filter is enabled. |
_queue |
deque
|
Internal queue for buffering instructions. |
_qpu_thread_context |
QPUThreadContext
|
Thread-specific QPU context for managing resources. |
_native_handle |
object
|
Native handle for QPU interaction, if applicable. |
_time_log |
list
|
Logs timing information for instructions processed. |
_passthrough |
bool
|
Indicates whether instructions should be passed downstream. |
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
A name for this filter. Defaults to None. |
None
|
verbose
|
bool
|
Enables verbose terminal output for debugging. Defaults to False. |
False
|
qpu
|
QPU
|
The QPU instance this filter is attached to. Defaults to None. |
None
|
may_allocate_qubits
|
bool
|
Indicates if this filter might cause Qubit allocations requiring heap-tracking. Defaults to True. |
True
|
print_stream_setup ¶
Print the setup of the filter chain, showing linked downstream handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
indent
|
int
|
The level of indentation for nested handlers. Defaults to 0. |
0
|
get_filter ¶
Retrieve a downstream handler, or list of handlers by its name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
The name of the handler to retrieve. |
required |
first_only
|
bool
|
Whether to fetch the first handler in the chain if True, or a list of all matching handlers if False. |
True
|
result_list
|
list or None
|
List to append results as we go down the chain. |
None
|
Returns:
| Type | Description |
|---|---|
OpFilter or list of OpFilter or None
|
The handler instance or list of instances if found, otherwise None. |
get_filters_by_type ¶
Retrieve downstream handlers by their types.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_type
|
type
|
The types of the handlers to retrieve. |
required |
result_list
|
list or None
|
List to append results as we go down the chain. |
None
|
Returns:
| Type | Description |
|---|---|
OpFilter or None
|
The handler instance if found, otherwise None. |
get_read_result ¶
Retrieve a read result for a specified flag from downstream handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
start_flag
|
int
|
The starting flag to query. |
required |
num_flags
|
int
|
The number of flags to read. Defaults to 1. |
1
|
Returns:
| Type | Description |
|---|---|
object or None
|
The read result if available, otherwise None. |
put_instructions ¶
Add instructions to this filter for processing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
instructions
|
list
|
A list of instructions to process. |
required |
put ¶
Process incoming instructions. By default, passes instructions downstream.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
instructions
|
list
|
A list of instructions to process. |
required |
Returns:
| Type | Description |
|---|---|
list or None
|
Processed instructions, if applicable. |
emit_downstream ¶
Pass instructions to downstream handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
instructions
|
list
|
Instructions to pass downstream. |
required |
draw_timing_diagram ¶
Draw a timing diagram based on logged timing data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filename
|
str
|
Path to save the diagram. If None, returns the SVG as a byte string. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
bytes
|
The timing diagram in SVG format. |
flush ¶
Flush all pending instructions in the filter's queue, optionally waiting for a specific condition to be resolved.
The flush function is essential in systems where filters may buffer or delay instructions for optimization, batching, or dependency resolution. It ensures that all buffered instructions are processed and passed downstream, maintaining consistency and synchronization in the instruction pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
until_flag_resolved
|
int
|
A flag condition to wait for before completing the flush. Defaults to |
0
|
Raises:
| Type | Description |
|---|---|
AssertionError
|
If an error occurs during the flush operation in a native filter. |
Example
Consider a pipeline with a batch filter that groups instructions into batches of 10. If the total number of instructions is not divisible by 10, the remaining instructions might remain unprocessed unless flushed.
Pipeline setup:
Input → OpBatchFilter → StreamDebugPrint → Execution
# Define the pipeline
qc = QPU(filters=[OpBatchFilter(max_batch_length=10), StreamDebugPrint()])
# Simulate sending 23 instructions to the pipeline
for i in range(23):
qc.nop()
# Force any remaining instructions in the buffer to be processed
qc.flush()
Without flush, only 20 instructions would be processed because the last 3 instructions don't complete a batch.
With flush, all 23 instructions are processed, ensuring no data is lost.
get_instructions ¶
Retrieve instructions in the stream buffer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
str_format
|
str
|
Format for the output instructions. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
list
|
Instructions in the buffer, formatted if |