do the ground-truth check as well

This commit is contained in:
Noa Aarts 2026-01-12 14:04:12 +01:00
parent cb9dc2679c
commit e011a14df5
Signed by: noa
GPG key ID: 1850932741EFF672

View file

@ -1,19 +1,32 @@
#!/usr/bin/env python #!/usr/bin/env python
from __future__ import annotations
import argparse
from itertools import repeat
import json import json
import math
import random
from dataclasses import dataclass from dataclasses import dataclass
from enum import IntEnum from enum import IntEnum
from multiprocessing import Pool from multiprocessing import Pool
import random
import numpy as np
from typing import override from typing import override
from qiskit.circuit import ParameterVector import numpy as np
from tqdm import tqdm
from qiskit import QuantumCircuit as QiskitCircuit, transpile from qiskit import QuantumCircuit as QiskitCircuit, transpile
from qiskit.circuit import ParameterVector, ParameterVectorElement
from qiskit_aer import AerSimulator from qiskit_aer import AerSimulator
from tqdm import tqdm
from qas_flow import Stream from qas_flow import Stream
# ----------------------------
# Paper hyperparameters/defaults
# ----------------------------
TWO_QUBIT_GATE_PROBABILITY = 0.5 TWO_QUBIT_GATE_PROBABILITY = 0.5
CHEMICAL_ACCURACY = 1.6e-3 # Ha, success if E - E0 <= this
# ----------------------------
# Circuit representation
# ----------------------------
class GateType(IntEnum): class GateType(IntEnum):
@ -32,7 +45,11 @@ class Gate:
param_idx: int param_idx: int
def to_json(self): def to_json(self):
return {"type": self.type, "qubits": self.qubits, "param_idx": self.param_idx} return {
"type": int(self.type),
"qubits": self.qubits,
"param_idx": self.param_idx,
}
def haar_fidelity_pdf(F: np.ndarray, d: int) -> np.ndarray: def haar_fidelity_pdf(F: np.ndarray, d: int) -> np.ndarray:
@ -55,16 +72,22 @@ class QuantumCircuit:
paths: int = 0 paths: int = 0
expressibility: float = float("-inf") expressibility: float = float("-inf")
# ground-truth fields (filled later)
gt_energy: float | None = None
gt_error: float | None = None
gt_steps: int | None = None
gt_success: bool | None = None
def calculate_paths(self): def calculate_paths(self):
path_counts = [1 for _ in range(self.qubits)] path_counts = [1 for _ in range(self.qubits)]
for layer in self.gates: for layer in self.gates:
for gate in layer: for gate in layer:
if gate.type <= 3 or type(gate.qubits) is int: if gate.type <= 3 or isinstance(gate.qubits, int):
continue continue
(q1, q2) = gate.qubits (q1, q2) = gate.qubits
# same logic as your existing file
path_counts[q1] = path_counts[q1] + path_counts[q2] path_counts[q1] = path_counts[q1] + path_counts[q2]
path_counts[q2] = path_counts[q1] path_counts[q2] = path_counts[q1]
self.paths = sum(path_counts) self.paths = sum(path_counts)
def to_json(self): def to_json(self):
@ -74,9 +97,14 @@ class QuantumCircuit:
"params": self.params, "params": self.params,
"paths": self.paths, "paths": self.paths,
"expressibility": self.expressibility, "expressibility": self.expressibility,
"gt_energy": self.gt_energy,
"gt_error": self.gt_error,
"gt_steps": self.gt_steps,
"gt_success": self.gt_success,
} }
def to_qiskit(self): def to_qiskit_ansatz(self) -> tuple[QiskitCircuit, ParameterVector]:
"""Return *just* the parametrized ansatz defined by this circuit."""
qc = QiskitCircuit(self.qubits) qc = QiskitCircuit(self.qubits)
thetas = ParameterVector("theta", self.params) thetas = ParameterVector("theta", self.params)
@ -95,27 +123,43 @@ class QuantumCircuit:
qc.ryy(theta, *gate.qubits) qc.ryy(theta, *gate.qubits)
elif gate.type == GateType.ZZ: elif gate.type == GateType.ZZ:
qc.rzz(theta, *gate.qubits) qc.rzz(theta, *gate.qubits)
else:
raise ValueError(f"Unknown gate type: {gate.type}")
return qc, thetas
def to_qiskit_for_expressibility(self) -> tuple[QiskitCircuit, ParameterVector]:
"""Expressibility uses |0...0> input (no TFIM-specific H layer)."""
qc, thetas = self.to_qiskit_ansatz()
qc.save_statevector()
return qc, thetas
def to_qiskit_for_tfim_vqe(self) -> tuple[QiskitCircuit, ParameterVector]:
"""TFIM VQE begins with a layer of Hadamards on all qubits."""
ans, thetas = self.to_qiskit_ansatz()
qc = QiskitCircuit(self.qubits)
qc.h(range(self.qubits))
qc.compose(ans, inplace=True)
qc.save_statevector() qc.save_statevector()
return qc, thetas return qc, thetas
def expressibility_estimate( def expressibility_estimate(
self, samples: int, seed: int, bins: int = 75, eps: float = 1e-12 self, samples: int, seed: int, bins: int = 75, eps: float = 1e-12
): ) -> "QuantumCircuit":
qc, thetas = self.to_qiskit() qc, thetas = self.to_qiskit_for_expressibility()
if self.params <= 0: if self.params <= 0:
return float("inf") self.expressibility = float("-inf")
return self
rng = random.Random(seed) rng = random.Random(seed)
d = 1 << self.qubits d = 1 << self.qubits
backend = AerSimulator(method="statevector", seed_simulator=seed) backend = AerSimulator(method="statevector", seed_simulator=seed)
tqc = transpile(qc, backend, optimization_level=0) tqc = transpile(qc, backend, optimization_level=0)
nexp = 2 * samples nexp = 2 * samples
# vectorized binds: one circuit, many parameter values
binds = [ binds: list[dict[ParameterVectorElement, list[float]]] = [
{param: [rng.random() for _ in range(nexp)] for param in thetas.params} {param: [rng.random() for _ in range(nexp)] for param in thetas.params}
] ]
@ -134,14 +178,10 @@ class QuantumCircuit:
) )
F = (inners.conjugate() * inners).real F = (inners.conjugate() * inners).real
# 4) empirical histogram (as a probability mass over bins)
hist, edges = np.histogram(F, bins=bins, range=(0.0, 1.0), density=False) hist, edges = np.histogram(F, bins=bins, range=(0.0, 1.0), density=False)
p = hist.astype(np.float64) p = hist.astype(np.float64) + eps
p = p + eps
p = p / p.sum() p = p / p.sum()
# 5) Haar distribution mass per bin (integrate PDF over each bin)
# approximate via midpoint rule
mids = 0.5 * (edges[:-1] + edges[1:]) mids = 0.5 * (edges[:-1] + edges[1:])
widths = edges[1:] - edges[:-1] widths = edges[1:] - edges[:-1]
q = haar_fidelity_pdf(mids, d) * widths q = haar_fidelity_pdf(mids, d) * widths
@ -160,7 +200,6 @@ class QuantumCircuit:
strs[i] += "---" strs[i] += "---"
idx = 0 idx = 0
for gate in layer: for gate in layer:
match gate.type: match gate.type:
case GateType.RX: case GateType.RX:
@ -184,12 +223,21 @@ class QuantumCircuit:
strs[q1] = strs[q1][:-2] + f"Z{idx}" strs[q1] = strs[q1][:-2] + f"Z{idx}"
strs[q2] = strs[q2][:-2] + f"Z{idx}" strs[q2] = strs[q2][:-2] + f"Z{idx}"
idx += 1 idx += 1
gt = ""
if self.gt_energy is not None:
gt = f"\nGT: E={self.gt_energy:.12f}, err={self.gt_error:.3e}, steps={self.gt_steps}, success={self.gt_success}"
return ( return (
"\n".join(strs) "\n".join(strs)
+ f"\npaths: {self.paths}, expressibility: {self.expressibility}" + f"\npaths: {self.paths}, expressibility: {self.expressibility}"
+ gt
) )
# ----------------------------
# Search space sampling (layerwise)
# ----------------------------
def even_parity(qubits: int): def even_parity(qubits: int):
return [(x, x + 1) for x in range(0, qubits, 2)] return [(x, x + 1) for x in range(0, qubits, 2)]
@ -211,6 +259,7 @@ def sample_circuit(rng: random.Random, qubits: int, depth: int) -> QuantumCircui
gate_type_offset = 3 if rng.random() < TWO_QUBIT_GATE_PROBABILITY else 0 gate_type_offset = 3 if rng.random() < TWO_QUBIT_GATE_PROBABILITY else 0
gate_type = rng.randint(1, 3) gate_type = rng.randint(1, 3)
gate_locations = even if rng.random() < 0.5 else odd gate_locations = even if rng.random() < 0.5 else odd
if gate_type_offset == 0: if gate_type_offset == 0:
gates.append( gates.append(
[Gate(GateType(gate_type), x, params) for (x, _) in gate_locations] [Gate(GateType(gate_type), x, params) for (x, _) in gate_locations]
@ -226,6 +275,7 @@ def sample_circuit(rng: random.Random, qubits: int, depth: int) -> QuantumCircui
) )
total_double += len(gate_locations) total_double += len(gate_locations)
params += 1 params += 1
return QuantumCircuit(qubits, gates, total_single, total_double, params) return QuantumCircuit(qubits, gates, total_single, total_double, params)
@ -238,43 +288,347 @@ def more_single_than_double(qc: QuantumCircuit) -> bool:
return qc.single_qubit_gates >= qc.two_qubit_gates return qc.single_qubit_gates >= qc.two_qubit_gates
if __name__ == "__main__": # ----------------------------
rng = random.Random() # TFIM ground-truth VQE
qubits = 4 # ----------------------------
depth = 15
sample_amount = 50000
expressibility_samples = 2000 def _pauli_x():
proxy_pass_amount = 5000 return np.array([[0.0, 1.0], [1.0, 0.0]], dtype=np.float64)
def _pauli_z():
return np.array([[1.0, 0.0], [0.0, -1.0]], dtype=np.float64)
def _eye():
return np.eye(2, dtype=np.float64)
def kron_n(ops: list[np.ndarray]) -> np.ndarray:
out = ops[0]
for op in ops[1:]:
out = np.kron(out, op)
return out
def tfim_hamiltonian_matrix(n: int, periodic: bool = True) -> np.ndarray:
"""
HTFIM = sum_{i=1}^n Z_i Z_{i+1} + X_i (paper definition) :contentReference[oaicite:5]{index=5}
with Z_{n+1}=Z_1 if periodic.
"""
X = _pauli_x()
Z = _pauli_z()
I = _eye()
dim = 1 << n
H = np.zeros((dim, dim), dtype=np.float64)
# Xi terms
for i in range(n):
ops = [I] * n
ops[i] = X
H += kron_n(ops)
# Zi Zi+1 terms
for i in range(n):
j = (i + 1) % n
if (not periodic) and (j == 0):
continue
ops = [I] * n
ops[i] = Z
ops[j] = Z
H += kron_n(ops)
return H
def exact_ground_energy(H: np.ndarray) -> float:
# H is real symmetric here
w = np.linalg.eigvalsh(H)
return float(w[0])
def statevector_from_bound_circuit(
backend: AerSimulator, tqc: QiskitCircuit, bind: dict
) -> np.ndarray:
res = backend.run([tqc], parameter_binds=[bind]).result()
sv = np.asarray(res.get_statevector(0), dtype=np.complex128)
return sv
def energy_expectation_from_sv(H: np.ndarray, sv: np.ndarray) -> float:
# E = <psi|H|psi>
# H real symmetric; sv complex
return float(np.real(np.vdot(sv, H @ sv)))
def adam_optimize_tfim_energy(
circuit: QuantumCircuit,
H: np.ndarray,
seed: int,
lr: float = 0.01, # paper :contentReference[oaicite:6]{index=6}
max_steps: int = 2000,
tol: float = 1e-7,
beta1: float = 0.9,
beta2: float = 0.999,
eps: float = 1e-8,
param_shift: float = math.pi / 2,
) -> tuple[float, int]:
"""
Optimize parameters with Adam using parameter-shift gradients.
Returns: (best_energy, steps_taken)
"""
qc, thetas = circuit.to_qiskit_for_tfim_vqe()
p = circuit.params
rng = np.random.default_rng(seed)
theta = rng.uniform(
-2 * math.pi, 2 * math.pi, size=p
) # paper init range :contentReference[oaicite:7]{index=7}
backend = AerSimulator(method="statevector", seed_simulator=seed)
tqc = transpile(qc, backend, optimization_level=0)
def E(th: np.ndarray) -> float:
bind = {param: [val] for param, val in zip(thetas.params, th)}
sv = statevector_from_bound_circuit(backend, tqc, bind)
return energy_expectation_from_sv(H, sv)
def grad(th: np.ndarray) -> np.ndarray:
# parameter-shift: d/dθ f(θ) = 0.5*(f(θ+π/2)-f(θ-π/2))
g = np.zeros_like(th)
for i in range(p):
plus = th.copy()
minus = th.copy()
plus[i] += param_shift
minus[i] -= param_shift
g[i] = 0.5 * (E(plus) - E(minus))
return g
m = np.zeros_like(theta)
v = np.zeros_like(theta)
best_E = float("inf")
prev_E = None
for t in range(1, max_steps + 1):
g = grad(theta)
m = beta1 * m + (1 - beta1) * g
v = beta2 * v + (1 - beta2) * (g * g)
mhat = m / (1 - beta1**t)
vhat = v / (1 - beta2**t)
theta = theta - lr * mhat / (np.sqrt(vhat) + eps)
cur_E = E(theta)
if cur_E < best_E:
best_E = cur_E
if prev_E is not None and abs(prev_E - cur_E) < tol:
return best_E, t
prev_E = cur_E
return best_E, max_steps
def ground_truth_tfim(
circ: QuantumCircuit,
H: np.ndarray,
E0: float,
seed: int,
lr: float,
max_steps: int,
tol: float,
) -> QuantumCircuit:
best_E, steps = adam_optimize_tfim_energy(
circ, H, seed=seed, lr=lr, max_steps=max_steps, tol=tol
)
err = best_E - E0
circ.gt_energy = best_E
circ.gt_error = err
circ.gt_steps = steps
circ.gt_success = err <= CHEMICAL_ACCURACY
return circ
# ----------------------------
# Main
# ----------------------------
def parse_args():
ap = argparse.ArgumentParser()
ap.add_argument(
"--qubits",
type=int,
default=6,
help="TFIM qubit count (paper uses 6). :contentReference[oaicite:8]{index=8}",
)
ap.add_argument(
"--depth",
type=int,
default=10,
help="Max depth for TFIM layerwise (paper max depth 10). :contentReference[oaicite:9]{index=9}",
)
ap.add_argument(
"--sample_amount",
type=int,
default=50000,
help="Number of sampled circuits (paper uses 50,000). :contentReference[oaicite:10]{index=10}",
)
ap.add_argument(
"--expressibility_samples",
type=int,
default=2000,
help="Fidelity samples for expressibility (paper uses 2000). :contentReference[oaicite:11]{index=11}",
)
ap.add_argument(
"--proxy_pass_amount",
type=int,
default=5000,
help="R: top circuits kept by paths (paper uses 5000). :contentReference[oaicite:12]{index=12}",
)
ap.add_argument(
"--topk",
type=int,
default=100,
help="K: how many top circuits to output/store.",
)
ap.add_argument(
"--seed", type=int, default=0, help="RNG seed for sampling/reproducibility."
)
ap.add_argument(
"--periodic",
action="store_true",
help="Use periodic TFIM boundary (default true if set).",
)
ap.add_argument(
"--no-periodic",
dest="periodic",
action="store_false",
help="Use open boundary TFIM.",
)
ap.set_defaults(periodic=True)
# ground-truth options
ap.add_argument(
"--do_ground_truth",
action="store_true",
help="Also evaluate ground-truth TFIM VQE performance.",
)
ap.add_argument(
"--gt_budget",
type=int,
default=100,
help="Max number of circuits to query for ground-truth (paper often reports within 100 queries). :contentReference[oaicite:13]{index=13}",
)
ap.add_argument(
"--gt_lr",
type=float,
default=0.01,
help="Adam learning rate (paper uses 0.01). :contentReference[oaicite:14]{index=14}",
)
ap.add_argument(
"--gt_max_steps",
type=int,
default=2000,
help="Max Adam steps per circuit (practical cap).",
)
ap.add_argument(
"--gt_tol",
type=float,
default=1e-7,
help="Convergence tolerance for |E_t - E_{t-1}|.",
)
ap.add_argument("--dump", type=str, default="dump.json", help="Output JSON file.")
return ap.parse_args()
def expr_worker(payload):
circ, seed, samples = payload
return circ.expressibility_estimate(samples, seed)
def main():
args = parse_args()
rng = random.Random(args.seed)
# --- Stage 0: sample circuits
circuits = ( circuits = (
Stream(sample_generator(rng, qubits, depth)) Stream(sample_generator(rng, args.qubits, args.depth))
.filter(more_single_than_double) .filter(more_single_than_double)
.take(sample_amount) .take(args.sample_amount)
.collect() .collect()
) )
for circuit in tqdm(circuits):
circuit.calculate_paths()
circuits.sort(key=lambda qc: qc.paths, reverse=True)
seeds = [rng.randint(0, 1000000000) for _ in range(proxy_pass_amount)]
def starmap_func(args): # --- Stage 1: paths
(circ, seed, idx) = args for c in tqdm(circuits, desc="paths"):
res = circ.expressibility_estimate(expressibility_samples, seed) c.calculate_paths()
return res circuits.sort(key=lambda qc: qc.paths, reverse=True)
candidates = circuits[: args.proxy_pass_amount]
# --- Stage 2: expressibility
seeds = [rng.randint(0, 1_000_000_000) for _ in range(len(candidates))]
final_circuits: list[QuantumCircuit] = [] final_circuits: list[QuantumCircuit] = []
with Pool() as p: with Pool() as p:
for circ in tqdm( for circ in tqdm(
p.imap_unordered( p.imap_unordered(
starmap_func, expr_worker, zip(candidates, seeds, repeat(args.expressibility_samples))
zip(circuits[:proxy_pass_amount], seeds, range(proxy_pass_amount)),
), ),
total=proxy_pass_amount, total=len(candidates),
desc="expressibility",
): ):
final_circuits.append(circ) final_circuits.append(circ)
final_circuits.sort(key=lambda qc: qc.expressibility, reverse=True) final_circuits.sort(key=lambda qc: qc.expressibility, reverse=True)
for i, circuit in enumerate(final_circuits[::-1]):
print(f"circuit {i}:\n{circuit}")
with open("dump.json", "w+") as fp: # --- Ground-truth TFIM (optional): query from best to worst until success or budget
json.dump([c.to_json() for c in final_circuits], fp, indent=4) success_idx = None
if args.do_ground_truth:
H = tfim_hamiltonian_matrix(args.qubits, periodic=args.periodic)
E0 = exact_ground_energy(H)
print(f"\nTFIM exact ground energy E0 = {E0:.12f} (periodic={args.periodic})")
gt_seed_stream = random.Random(args.seed + 1234567)
queried = 0
for i, circ in enumerate(tqdm(final_circuits, desc="ground-truth queries")):
if queried >= args.gt_budget:
break
seed = gt_seed_stream.randint(0, 1_000_000_000)
ground_truth_tfim(
circ,
H,
E0,
seed=seed,
lr=args.gt_lr,
max_steps=args.gt_max_steps,
tol=args.gt_tol,
)
queried += 1
if circ.gt_success:
success_idx = i
break
if success_idx is None:
print(f"\nNo circuit hit chemical accuracy within budget={args.gt_budget}.")
else:
print(f"\nFirst success at rank {success_idx} (0-based) within budget.")
print(final_circuits[success_idx])
# Print bottom few (worst) and top few (best) for sanity
print("\nTop circuits (best expressibility):")
for i, c in enumerate(final_circuits[: min(args.topk, 10)]):
print(f"\nrank {i}:\n{c}")
# Dump top-K (or all if you want)
out = final_circuits[: args.topk]
with open(args.dump, "w", encoding="utf-8") as fp:
json.dump([c.to_json() for c in out], fp, indent=2)
print(f"\nWrote {len(out)} circuits to {args.dump}.")
if __name__ == "__main__":
main()