This commit is contained in:
Noa Aarts 2026-02-06 13:49:55 +01:00
parent 9a361417e8
commit 130a64755f
Signed by: noa
GPG key ID: 1850932741EFF672
4 changed files with 85 additions and 194 deletions

View file

@ -4,83 +4,20 @@
import random
import sys
from typing import Generator, Never
import matplotlib.pyplot as plt
from qas_flow import Stream
from quantum_circuit import (Gate, GateType, QuantumCircuit, circ_from_layers,
sample_random_generator, single_typ)
from sampling_hyperparams import sample_hyperspace
gate_set: list[GateType] = [
GateType.H,
GateType.RX,
GateType.RY,
GateType.RZ,
GateType.CRX,
GateType.CX,
]
def sample_hyperspace(
*args: tuple[int, int] | tuple[float, float], seed: int = 2010392991
) -> Generator[tuple[float | int, ...], None, Never]:
minimums: tuple[float | int, ...] = tuple(arg[0] for arg in args)
maximums: tuple[float | int, ...] = tuple(arg[1] for arg in args)
diffs: tuple[float | int, ...] = tuple(
ma - mi for mi, ma in zip(minimums, maximums)
)
idiffs: tuple[float, ...] = tuple(1.0 / diff for diff in diffs)
rng: random.Random = random.Random(seed)
previous_points: set[tuple[float | int, ...]] = set()
def dist(point: tuple[float | int, ...], other: tuple[float | int, ...]) -> float:
return sum(((p - o) * idiff) ** 2 for p, o, idiff in zip(point, other, idiffs))
while True:
sampled_points: list[tuple[float | int, ...]] = [
tuple(
min
+ (
rng.randint(min, min + diff)
if isinstance(min, int) and isinstance(diff, int)
else rng.uniform(min, min + diff)
)
for min, diff in zip(minimums, diffs)
)
for _ in range(10)
]
if len(previous_points) == 0:
previous_points.add(sampled_points[0])
yield sampled_points[0]
min_distances: list[float] = [
min((dist(point, other) for other in previous_points))
for point in sampled_points
]
mdist: float = max(min_distances)
point: tuple[float | int, ...] = sampled_points[
[i for i, j in enumerate(min_distances) if j == mdist][0]
]
previous_points.add(point)
yield point
gate_set: list[GateType] = [GateType.H, GateType.RX, GateType.RY, GateType.RZ, GateType.CRX, GateType.CX]
EXPRESSIBILITY_SAMPLES: int = 2000
def run_ga_qas(
depth: int,
qubits: int,
generations: int,
generation_size: int,
parent_amount: int,
mutation_rate: float,
seed: int,
depth: int, qubits: int, generations: int, generation_size: int, parent_amount: int, mutation_rate: float, seed: int
) -> list[tuple[int, int, int, int, int, float, float, float]]:
print(
@ -90,19 +27,8 @@ def run_ga_qas(
seed_rng = random.Random(seed)
initial_population: list[QuantumCircuit] = (
Stream(
sample_random_generator(
random.Random(seed_rng.randint(1000, 1000000000)),
qubits,
depth,
gate_set,
)
)
.apply(
lambda circ: circ.expressibility_estimate(
EXPRESSIBILITY_SAMPLES, seed_rng.randint(1000, 1000000000)
)
)
Stream(sample_random_generator(random.Random(seed_rng.randint(1000, 1000000000)), qubits, depth, gate_set))
.apply(lambda circ: circ.expressibility_estimate(EXPRESSIBILITY_SAMPLES, seed_rng.randint(1000, 1000000000)))
.take(generation_size)
.collect()
)
@ -129,17 +55,13 @@ def run_ga_qas(
if old_gate.single():
child_layers[layer_idx][gate_idx] = Gate(
main_rng.choice(
[gate for gate in gate_set if single_typ(gate)]
),
main_rng.choice([gate for gate in gate_set if single_typ(gate)]),
old_gate.qubits,
old_gate.param_idx,
)
else:
child_layers[layer_idx][gate_idx] = Gate(
old_gate.typ,
(old_gate.qubits[1], old_gate.qubits[0]),
old_gate.param_idx,
old_gate.typ, (old_gate.qubits[1], old_gate.qubits[0]), old_gate.param_idx
)
child = circ_from_layers(child_layers, qubits)
@ -164,35 +86,22 @@ def run_ga_qas(
else:
best_circuits.append(offspring[0])
population = offspring
print(f"finished seed {seed}", file=sys.stderr)
print(f"finished seed {seed}, data: {return_data}", file=sys.stderr)
return return_data
def run_from_point(pnt: tuple[tuple[int, int, int, int, float], int]):
(point, seed) = pnt
return run_ga_qas(
point[0],
point[1],
20,
point[2],
point[3],
point[4],
seed,
)
try:
return run_ga_qas(point[0], point[1], 20, point[2], point[3], point[4], seed)
except:
print(f"There was an error for {point}, {seed}, ignoring it")
return []
def print_ret(ret_data):
for dat in ret_data:
(
depth,
qubits,
generation,
generation_size,
parent_amount,
mutation_rate,
best_pop,
best_offspring,
) = dat
(depth, qubits, generation, generation_size, parent_amount, mutation_rate, best_pop, best_offspring) = dat
print(
f"{depth},{qubits},{generation},{generation_size},{parent_amount},{mutation_rate},{best_pop},{best_offspring}",
flush=True,
@ -201,19 +110,10 @@ def print_ret(ret_data):
def main() -> None:
rng = random.Random(123456789)
rng = random.Random()
results: list[QuantumCircuit] = (
Stream(
sample_hyperspace(
(1, 40),
(1, 10),
(1, 100),
(1, 20),
(0.0, 1.0),
seed=rng.randint(1000, 1000000000),
)
)
Stream(sample_hyperspace((1, 40), (1, 10), (1, 100), (1, 20), (0.0, 1.0), seed=rng.randint(1000, 1000000000)))
.map(lambda point: (point, rng.randint(1000, 1000000000)))
.par_map(run_from_point)
.apply(print_ret)

View file

@ -307,7 +307,7 @@ def sample_circuit_layers(
total_single += 1
else:
if loc[1] == qubits:
loc[1] == 0
loc: tuple[int, int] = (loc[0], 0)
layer.append(Gate(GateType(gate_type), loc, params))
total_double += 1
params += param_count(gate_type)

View file

@ -0,0 +1,46 @@
import random
from collections.abc import Generator
from typing import Never
def sample_hyperspace(
*args: tuple[int, int] | tuple[float, float], seed: int = 2010392991
) -> Generator[tuple[float | int, ...], None, Never]:
minimums: tuple[float | int, ...] = tuple(arg[0] for arg in args)
maximums: tuple[float | int, ...] = tuple(arg[1] for arg in args)
diffs: tuple[float | int, ...] = tuple(ma - mi for mi, ma in zip(minimums, maximums))
idiffs: tuple[float, ...] = tuple(1.0 / diff for diff in diffs)
rng: random.Random = random.Random(seed)
previous_points: set[tuple[float | int, ...]] = set()
def dist(point: tuple[float | int, ...], other: tuple[float | int, ...]) -> float:
return sum(((p - o) * idiff) ** 2 for p, o, idiff in zip(point, other, idiffs))
while True:
sampled_points: list[tuple[float | int, ...]] = [
tuple(
min
+ (
rng.randint(min, min + diff)
if isinstance(min, int) and isinstance(diff, int)
else rng.uniform(min, min + diff)
)
for min, diff in zip(minimums, diffs)
)
for _ in range(10)
]
if len(previous_points) == 0:
previous_points.add(sampled_points[0])
yield sampled_points[0]
min_distances: list[float] = [
min((dist(point, other) for other in previous_points)) for point in sampled_points
]
mdist: float = max(min_distances)
point: tuple[float | int, ...] = sampled_points[[i for i, j in enumerate(min_distances) if j == mdist][0]]
previous_points.add(point)
yield point

View file

@ -1,22 +1,25 @@
#!/usr/bin/env python
from __future__ import annotations
import argparse
from itertools import repeat
import json
import math
import random
from dataclasses import dataclass
from enum import IntEnum
from itertools import repeat
from multiprocessing import Pool
from typing import override
import numpy as np
from qiskit import QuantumCircuit as QiskitCircuit, transpile
from qiskit import QuantumCircuit as QiskitCircuit
from qiskit import transpile
from qiskit.circuit import ParameterVector, ParameterVectorElement
from qiskit_aer import AerSimulator
from tqdm import tqdm
from .qas_flow import Stream
from .quantum_circuit import QuantumCircuit, GateType, sample_layers_generator
from qas_flow import Stream
from quantum_circuit import GateType, QuantumCircuit, sample_layers_generator
# ----------------------------
# Paper hyperparameters/defaults
@ -95,9 +98,7 @@ def exact_ground_energy(H: np.ndarray) -> float:
return float(w[0])
def statevector_from_bound_circuit(
backend: AerSimulator, tqc: QiskitCircuit, bind: dict
) -> np.ndarray:
def statevector_from_bound_circuit(backend: AerSimulator, tqc: QiskitCircuit, bind: dict) -> np.ndarray:
res = backend.run([tqc], parameter_binds=[bind]).result()
sv = np.asarray(res.get_statevector(0), dtype=np.complex128)
return sv
@ -129,9 +130,7 @@ def adam_optimize_tfim_energy(
p = circuit.params
rng = np.random.default_rng(seed)
theta = rng.uniform(
-2 * math.pi, 2 * math.pi, size=p
) # paper init range :contentReference[oaicite:7]{index=7}
theta = rng.uniform(-2 * math.pi, 2 * math.pi, size=p) # paper init range :contentReference[oaicite:7]{index=7}
backend = AerSimulator(method="statevector", seed_simulator=seed)
tqc = transpile(qc, backend, optimization_level=0)
@ -160,10 +159,7 @@ def adam_optimize_tfim_energy(
energies = np.array(
[
energy_expectation_from_sv(H, sv)
for sv in [
np.asarray(res.get_statevector(k), dtype=np.complex128)
for k in range(2 * p)
]
for sv in [np.asarray(res.get_statevector(k), dtype=np.complex128) for k in range(2 * p)]
]
)
@ -199,17 +195,9 @@ def adam_optimize_tfim_energy(
def ground_truth_tfim(
circ: QuantumCircuit,
H: np.ndarray,
E0: float,
seed: int,
lr: float,
max_steps: int,
tol: float,
circ: QuantumCircuit, H: np.ndarray, E0: float, seed: int, lr: float, max_steps: int, tol: float
) -> QuantumCircuit:
best_E, steps = adam_optimize_tfim_energy(
circ, H, seed=seed, lr=lr, max_steps=max_steps, tol=tol
)
best_E, steps = adam_optimize_tfim_energy(circ, H, seed=seed, lr=lr, max_steps=max_steps, tol=tol)
err = best_E - E0
circ.gt_energy = best_E
circ.gt_error = err
@ -226,10 +214,7 @@ def ground_truth_tfim(
def parse_args():
ap = argparse.ArgumentParser()
ap.add_argument(
"--qubits",
type=int,
default=6,
help="TFIM qubit count (paper uses 6). :contentReference[oaicite:8]{index=8}",
"--qubits", type=int, default=6, help="TFIM qubit count (paper uses 6). :contentReference[oaicite:8]{index=8}"
)
ap.add_argument(
"--depth",
@ -255,34 +240,14 @@ def parse_args():
default=5000,
help="R: top circuits kept by paths (paper uses 5000). :contentReference[oaicite:12]{index=12}",
)
ap.add_argument(
"--topk",
type=int,
default=100,
help="K: how many top circuits to output/store.",
)
ap.add_argument(
"--seed", type=int, default=0, help="RNG seed for sampling/reproducibility."
)
ap.add_argument(
"--periodic",
action="store_true",
help="Use periodic TFIM boundary (default true if set).",
)
ap.add_argument(
"--no-periodic",
dest="periodic",
action="store_false",
help="Use open boundary TFIM.",
)
ap.add_argument("--topk", type=int, default=100, help="K: how many top circuits to output/store.")
ap.add_argument("--seed", type=int, default=0, help="RNG seed for sampling/reproducibility.")
ap.add_argument("--periodic", action="store_true", help="Use periodic TFIM boundary (default true if set).")
ap.add_argument("--no-periodic", dest="periodic", action="store_false", help="Use open boundary TFIM.")
ap.set_defaults(periodic=True)
# ground-truth options
ap.add_argument(
"--do_ground_truth",
action="store_true",
help="Also evaluate ground-truth TFIM VQE performance.",
)
ap.add_argument("--do_ground_truth", action="store_true", help="Also evaluate ground-truth TFIM VQE performance.")
ap.add_argument(
"--gt_budget",
type=int,
@ -295,18 +260,8 @@ def parse_args():
default=0.01,
help="Adam learning rate (paper uses 0.01). :contentReference[oaicite:14]{index=14}",
)
ap.add_argument(
"--gt_max_steps",
type=int,
default=2000,
help="Max Adam steps per circuit (practical cap).",
)
ap.add_argument(
"--gt_tol",
type=float,
default=1e-7,
help="Convergence tolerance for |E_t - E_{t-1}|.",
)
ap.add_argument("--gt_max_steps", type=int, default=2000, help="Max Adam steps per circuit (practical cap).")
ap.add_argument("--gt_tol", type=float, default=1e-7, help="Convergence tolerance for |E_t - E_{t-1}|.")
ap.add_argument("--dump", type=str, default="dump.json", help="Output JSON file.")
return ap.parse_args()
@ -341,9 +296,7 @@ def main():
final_circuits: list[QuantumCircuit] = []
with Pool() as p:
for circ in tqdm(
p.imap_unordered(
expr_worker, zip(candidates, seeds, repeat(args.expressibility_samples))
),
p.imap_unordered(expr_worker, zip(candidates, seeds, repeat(args.expressibility_samples))),
total=len(candidates),
desc="expressibility",
):
@ -366,15 +319,7 @@ def main():
if queried >= args.gt_budget:
break
seed = gt_seed_stream.randint(0, 1_000_000_000)
ground_truth_tfim(
circ,
H,
E0,
seed=seed,
lr=args.gt_lr,
max_steps=args.gt_max_steps,
tol=args.gt_tol,
)
ground_truth_tfim(circ, H, E0, seed=seed, lr=args.gt_lr, max_steps=args.gt_max_steps, tol=args.gt_tol)
if circ.gt_error is not None and circ.gt_error < min_error:
print(f"new best error for {queried}: {circ.gt_error}")
min_error = circ.gt_error