基于强化学习的超导腔频率控制

这段代码实现了一个使用强化学习(Reinforcement Learning, RL)控制超导腔体的仿真环境,并利用 stable_baselines3 中的 PPO 算法进行训练。下面将逐步分析代码的结构和功能。

整体概述

代码的主要目的是:

  1. 定义一个超导腔体的仿真环境 SuperconductingCavityEnv,基于 gymnasium 框架,用于模拟腔体的动态行为。
  2. 实现一个单步仿真函数 sim_cav,用于计算腔体在给定输入下的状态更新。
  3. 使用 PPO 算法训练一个代理(agent),通过调整 Piezo 补偿来控制腔体的失谐量(detuning),以优化系统性能。
  4. 提供实时可视化功能,监控训练过程中的关键变量。

代码依赖以下库:

  • **numpy**:数值计算。
  • **gymnasium**:强化学习环境接口。
  • **stable_baselines3**:提供 PPO 算法实现。
  • **matplotlib.pyplot**:数据可视化。
  • **llrflibs.rf_simllrflibs.rf_control**:自定义射频仿真和控制库。

代码逐部分解析

导入库

1
2
3
4
5
6
7
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
import matplotlib.pyplot as plt
from llrflibs.rf_sim import *
from llrflibs.rf_control import *

这些库为代码提供了数值计算、强化学习环境、PPO 算法和可视化的支持。llrflibs 可能是自定义模块,包含超导腔体仿真相关的函数(如 sim_scav_stepcav_ss_mech 等)。

单步仿真函数 sim_cav

1
2
3
4
5
6
7
8
9
10
11
12
13
def sim_cav(half_bw, RL, dw_step0, detuning0, vf_step, state_vc, Ts, beta=1e4,
state_m0=0, Am=None, Bm=None, Cm=None, Dm=None,
pulsed=True, beam_pul=None, beam_cw=0, buf_id=0):
if pulsed:
vb = -RL * beam_pul[buf_id if buf_id < len(beam_pul) else -1]
else:
vb = beam_cw

status, vc, vr, dw, state_m = sim_scav_step(half_bw, dw_step0, detuning0, vf_step, vb, state_vc, Ts,
beta=beta, state_m0=state_m0, Am=Am, Bm=Bm, Cm=Cm, Dm=Dm,
mech_exe=True)
state_vc = vc
return vc, vr, dw, state_vc, state_m

功能

  • 模拟超导腔体在一个时间步内的动态变化。
  • 输入参数包括半带宽 half_bw、负载电阻 RL、初始失谐 dw_step0、激励电压 vf_step、当前腔体电压 state_vc 等。
  • 根据是否为脉冲模式(pulsed),选择束流电压 vb
    • 脉冲模式:从 beam_pul 中提取值。
    • 连续模式:使用常数 beam_cw
  • 调用 sim_scav_step(自定义函数)执行一步仿真,更新腔体电压 vc、反射电压 vr、失谐量 dw 和机械状态 state_m
  • 返回更新后的状态。

关键点

  • sim_scav_step 是核心仿真函数,可能是 llrflibs.rf_sim 中的实现。
  • 函数支持机械模式的模拟(mech_exe=True)。

环境类 SuperconductingCavityEnv

这是一个继承自 gym.Env 的强化学习环境类,用于模拟超导腔体的行为。

初始化 __init__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class SuperconductingCavityEnv(gym.Env):
def __init__(self, pulse_cycles=25):
super().__init__()
self.Ts = 1e-6 # 时间步长 1μs
self.pulse_cycles = pulse_cycles # 每个 episode 的脉冲周期数
self.current_cycle = 0 # 当前周期计数器

self._init_mechanical_modes() # 初始化机械模式
self._init_cavity_parameters() # 初始化腔体参数
self._init_simulation_parameters() # 初始化仿真参数

self.action_space = spaces.Box(low=-2 * np.pi * 100, high=2 * np.pi * 100, shape=(1,), dtype=np.float32)
self.observation_space = spaces.Box(low=np.array([0, -np.pi, -2 * np.pi * 500]),
high=np.array([20e6, np.pi, 2 * np.pi * 500]), dtype=np.float32)

self.history = {'vc_amp': [], 'vc_phase': [], 'detuning': [], 'actions': []}
  • 时间参数:时间步长 Ts = 1e-6(1微秒),每个 episode 有 pulse_cycles 个脉冲周期。
  • 初始化顺序:依次初始化机械模式、腔体参数和仿真参数。
  • 动作空间[-100 Hz, 100 Hz],表示 Piezo 调整的失谐量(弧度/秒)。
  • 观测空间:三维向量 [vc_amp, vc_phase, dw],分别表示腔体电压幅值(0到20MV)、相位(-π到π)和失谐量(-500 Hz到500 Hz)。
  • 历史记录:存储电压幅值、相位、失谐量和动作的数据。

初始化机械模式 _init_mechanical_modes

1
2
3
4
5
6
7
8
def _init_mechanical_modes(self):
self.mech_modes = {'f': [280, 341, 460, 487, 618], 'Q': [40, 20, 50, 80, 100], 'K': [2, 0.8, 2, 0.6, 0.2]}
status, Am, Bm, Cm, Dm = cav_ss_mech(self.mech_modes)
if not status:
raise RuntimeError("机械模式状态空间模型创建失败")
status, self.Ad, self.Bd, self.Cd, self.Dd, _ = ss_discrete(Am, Bm, Cm, Dm, Ts=self.Ts, method='zoh', plot=False)
if not status:
raise RuntimeError("离散化状态空间模型创建失败")
  • 定义机械振动的频率 f(Hz)、品质因数 Q 和耦合系数 K
  • 使用 cav_ss_mech 创建连续状态空间模型 (Am, Bm, Cm, Dm)
  • 使用 ss_discrete 将其离散化为 (Ad, Bd, Cd, Dd),以适应离散时间步长 Ts

初始化腔体参数 _init_cavity_parameters

1
2
3
4
5
6
7
8
def _init_cavity_parameters(self):
self.f0 = 1.3e9 # 共振频率 1.3 GHz
self.beta = 1e4 # 耦合因子
self.roQ = 1036 # 特性阻抗 (Ohm)
self.QL = 3e6 # 负载品质因数
self.RL = 0.5 * self.roQ * self.QL # 负载电阻
self.wh = np.pi * self.f0 / self.QL # 半带宽
self.dw0 = 0.0 # 初始失谐
  • 设置超导腔体的物理参数,包括共振频率、耦合因子等。
  • 计算负载电阻 RL 和半带宽 wh

初始化仿真参数 _init_simulation_parameters

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def _init_simulation_parameters(self):
self.t_fill = 510 # 填充时间
self.t_flat = 1300 # 平顶时间
self.buf_size = 2048 * 8 # 缓冲区大小
self.base_pul = np.zeros(self.buf_size, dtype=complex)
self.base_pul[:self.t_flat] = 1.0
self.beam_pul = np.zeros(self.buf_size, dtype=complex)
self.beam_pul[self.t_fill:self.t_flat] = 0.008

self.fsrc = -460 # RF 源频率 (Hz)
self.Asrc = 1.0 # RF 源幅度 (V)
self.pha_src = 0.0 # RF 源相位 (rad)
self.gain_dB = 20 * np.log10(12e6) # 增益 (dB)

self.buf_id = 0
self.state_vc = 0.0 + 0j # 初始腔体电压
self.state_m = np.matrix(np.zeros(self.Bd.shape)) # 初始机械状态
  • 定义脉冲形状:base_pul 表示激励脉冲,beam_pul 表示束流脉冲。
  • 设置 RF 源参数和初始仿真状态。

重置环境 reset

1
2
3
4
5
6
7
8
9
10
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.buf_id = 0
self.current_cycle = 0
self.state_vc = np.complex64(0.0)
self.state_m = np.matrix(np.zeros(self.Bd.shape))
self.pha_src = 0.0
self.dw_current = np.float32(0.0)
self.history = {k: [] for k in self.history}
return self._get_obs(), {}
  • 将仿真状态重置为初始值,清空历史数据,返回初始观测。

单步执行 step

1
2
3
4
5
6
7
8
9
10
11
12
def step(self, action):
dw_piezo = action[0]
cycle_rewards = []
for _ in range(self.buf_size):
self._simulation_step(dw_piezo)
self._record_data()
cycle_rewards.append(-abs(self.dw_current / (2 * np.pi)))
self.current_cycle += 1
reward = np.mean(cycle_rewards)
terminated = self.current_cycle >= self.pulse_cycles
truncated = False
return self._get_obs().flatten(), reward, terminated, truncated, {}
  • 应用 Piezo 调整 dw_piezo
  • 执行一个完整脉冲周期(buf_size 个时间步),计算每个时间步的奖励(负失谐量绝对值)。
  • 返回平均奖励、是否终止等信息。

单步仿真 _simulation_step

1
2
3
4
5
6
7
8
9
10
11
def _simulation_step(self, dw_piezo):
dw_micr = 2.0 * np.pi * self.np_random.normal(0, 10)
S0, self.pha_src = self._sim_rfsrc()
S1 = self._sim_iqmod(S0)
S2 = self._sim_amp(S1)
self.vc, vr, self.dw_current, self.state_vc, self.state_m = sim_cav(
self.wh, self.RL, self.dw_current, self.dw0 + dw_micr + dw_piezo, S2, self.state_vc, self.Ts,
beta=self.beta, state_m0=self.state_m, Am=self.Ad, Bm=self.Bd, Cm=self.Cd, Dm=self.Dd,
pulsed=True, beam_pul=self.beam_pul, buf_id=self.buf_id
)
self.buf_id = (self.buf_id + 1) % self.buf_size
  • 生成随机微扰 dw_micr
  • 模拟 RF 信号链(源、IQ 调制、放大)。
  • 调用 sim_cav 更新腔体状态。

获取观测 _get_obs

1
2
3
4
5
def _get_obs(self):
vc_amp = np.abs(np.array([self.state_vc], dtype=complex))[0]
vc_phase = np.angle(np.array([self.state_vc], dtype=complex))[0]
dw = self.dw_current / (2 * np.pi)
return np.array([vc_amp, vc_phase, dw], dtype=np.float32)
  • 返回腔体电压幅值、相位和失谐量(Hz)。

数据记录 _record_data

1
2
3
4
5
def _record_data(self):
self.history['vc_amp'].append(abs(self.state_vc) * 1e-6)
self.history['vc_phase'].append(np.angle(self.state_vc) * 180 / np.pi)
self.history['detuning'].append(self.dw_current / (2 * np.pi))
self.history['actions'].append(self.dw_current / (2 * np.pi))
  • 记录电压幅值(MV)、相位(度)、失谐量(Hz)和动作(Hz)。

可视化 render

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def render(self, mode='human'):
if not hasattr(self, 'fig'):
self.fig, self.axs = plt.subplots(4, 1, figsize=(10, 8))
plt.ion()
plt.show()
titles = ['Cavity Voltage (MV)', 'Phase (deg)', 'Detuning (Hz)', 'Piezo Adjustment (Hz)']
for i, ax in enumerate(self.axs):
ax.clear()
ax.set_title(titles[i])
ax.grid(True)
if i < 3:
data = list(self.history.values())[i]
ax.plot(data[-1000:])
else:
ax.plot(self.history['actions'][-1000:])
plt.pause(0.001)
  • 实时绘制四个子图,显示最近 1000 个数据点。

训练配置

1
2
3
4
5
6
7
8
9
if __name__ == "__main__":
env = SuperconductingCavityEnv()
model = PPO("MlpPolicy", env, verbose=1, learning_rate=3e-4, n_steps=2048, batch_size=64, tensorboard_log="./tb_logs/")
try:
model.learn(total_timesteps=1e6, progress_bar=True)
except KeyboardInterrupt:
pass
model.save("sc_cavity_control")
env.close()
  • 创建环境实例。
  • 配置 PPO 模型:使用 MLP 策略,学习率 3e-4,每 2048 步更新,批次大小 64。
  • 训练 100 万步,保存模型为 sc_cavity_control

总结

这段代码实现了一个超导腔体的强化学习控制系统:

  • 环境SuperconductingCavityEnv 模拟腔体动态,包括机械振动、RF 信号链和束流影响。
  • 目标:通过 Piezo 调整失谐量,最小化腔体的失谐(奖励为负失谐量绝对值)。
  • 训练:使用 PPO 算法优化控制策略。
  • 可视化:实时监控电压、相位、失谐量和动作。