Fourier-Neural-Operator für Turbulenzvorhersage.
Tiefer Einblick in den Fourier Neural Operator: Erklärung und Anwendung für eine einfache Turbulenzvorhersage In letzter Zeit habe ich mich intensiv mit dem Potenzial von Neuronalen Netzen beschäftigt, um das Verhalten numerischer Modelle nachzuahmen, aber ohne deren Einschränkungen. Eine dieser Einschränkungen ist die Notwendigkeit, alle relevanten Parameter zu berechnen, auch wenn wir nur einen Parameter interessiert sind. Zum Beispiel bei der Navier-Stokes-Gleichung: Wenn wir den Druck für einige Zeitschritte vorhersagen möchten, müssen wir auch andere Parameter wie Dichte, U, V, W usw. berechnen. Selbst für ein kleines Gebiet benötigen wir erhebliche Ressourcen, um dies zu erreichen. Ein weiterer Faktor, der die Berechnungsleistung erhöht, ist die Notwendigkeit, räumliche und zeitliche Auflösungen abzugleichen, um Instabilitäten in der Berechnung zu vermeiden. Dies wird als Courant-Friedrichs-Lewy (CFL)-Bedingung bezeichnet. In diesem Kontext bin ich auf den Fourier Neural Operator (FNO) gestoßen, der diese Herausforderungen adressieren soll. Der FNO versucht, die Vorhersage von physikalischen Feldern durch die Verwendung von Fourier-Transformationen zu verbessern, was die Berechnung von hochdimensionalen Daten effizienter machen kann. Grundlegender Aufbau des FNO Der grundlegende Workflow des FNO sieht wie folgt aus: Feature Extractor: Dieser Layer sorgt dafür, dass die Form des Eingabedatensatzes mit dem FNO-Block übereinstimmt. Meist ändert er die Gesamt-Tiefe (Anzahl der Zeitschritte mal Anzahl der Kanäle), entsprechend dem Hyperparameter "hidden depth". FNO-Block: Hier wird die Diskrete Fourier-Transformation (DFT) auf den Eingaberaum angewendet, um von physischem zu frequenzraum zu wechseln. Die effizienteste Methode zur Berechnung der DFT ist die Schnelle Fourier-Transformation (FFT). Im FNO wird dann ein Filter angewendet, um Rauschen zu entfernen. Dies geschieht durch die Multiplikation der relevanten Frequenzmodi mit Gewichten. Wenn bestimmte Frequenzen während des Trainingsphasen keinen Einfluss auf die Vorhersage haben, werden ihre Gewichtungen niedrig oder sogar Null sein. Inverse FFT: Nach der Filteringprozedur wird die inverse FFT angewendet, um den gefilterten Datensatz in den physischen Raum zurückzuführen. Hier ist das Kernstück des FNO-Blocks in Python-Code: ```python class SpectralConv2d_fast(nn.Module): def init(self, in_channels, out_channels, modes1, modes2): super(SpectralConv2d_fast, self).init() self.in_channels = in_channels self.out_channels = out_channels self.modes1 = modes1 self.modes2 = modes2 self.scale = (1 / (in_channels * out_channels)) self.weights1 = nn.Parameter(self.scale * torch.rand(in_channels, out_channels, self.modes1, self.modes2, dtype=torch.cfloat)) self.weights2 = nn.Parameter(self.scale * torch.rand(in_channels, out_channels, self.modes1, self.modes2, dtype=torch.cfloat)) def forward(self, x): batchsize = x.shape[0] x_ft = torch.fft.rfft2(x, norm='ortho') out_ft = torch.zeros(batchsize, self.out_channels, x.size(-2), x.size(-1) // 2 + 1, dtype=torch.cfloat, device=x.device) out_ft[:, :, :self.modes1, :self.modes2] = compl_mul2d(x_ft[:, :, :self.modes1, :self.modes2], self.weights1) out_ft[:, :, -self.modes1:, :self.modes2] = compl_mul2d(x_ft[:, :, -self.modes1:, :self.modes2], self.weights2) x = torch.fft.irfft2(out_ft, s=(x.size(-2), x.size(-1)), norm='ortho') return x ``` Turbulenzvorhersage-Fall Um die Leistung des FNO zu demonstrieren, habe ich ihn für eine einfache Turbulenzvorhersage verwendet. Das verwendete Dataset stammt von einem GitHub-Repository und umfasst Parameter wie Dichte, Druck und Geschwindigkeit über 1000 Zeitschritte. Dataset-Klasse Die Dataset-Klasse DensityTurbulenceDataset lädt die Daten und bereitet sie für die Vorhersage vor: ```python class DensityTurbulenceDataset(Dataset): def init(self, dataset_dir, seq_len_total): self.file_list = sorted(glob(os.path.join(dataset_dir, "*.npz"))) self.seq_len_total = seq_len_total self.data_cache = [] print(f"Lade Dataset aus {dataset_dir} in den Speicher...") for f_path in tqdm(self.file_list, desc=f"Lade {os.path.basename(dataset_dir)}"): self.data_cache.append(np.load(f_path)['arr_0']) print(f"Dataset aus {dataset_dir} geladen. Gesamtanzahl an Dateien: {len(self.data_cache)}") def __len__(self): return len(self.file_list) - self.seq_len_total + 1 def __getitem__(self, idx): sequence_data = np.stack([self.data_cache[idx + i] for i in range(self.seq_len_total)]) inputs_full_sequence = sequence_data[:SEQ_LEN_FEATURE, :, :, :] targets_density_sequence = sequence_data[SEQ_LEN_FEATURE:, 0, :, :] return torch.from_numpy(inputs_full_sequence).float(), torch.from_numpy(targets_density_sequence).float() ``` Trainingsschleife Das Modell wird in einer Schleife trainiert, wobei die Daten normalisiert und die Vorhersage über mehrere Zeitschritte erfolgt: ```python model = Net2d(16, 20).to(device) mean_std = np.load('density_stats.npz') mean_device = torch.from_numpy(mean_std['mean']).float().to(device) std_device = torch.from_numpy(mean_std['std']).float().to(device) optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4) scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.5) l2loss = LpLoss(d=2, p=2, reduce_dims=(0,1)) h1_loss = H1Loss(d=2, reduce_dims=(0,1)) dataset = DensityTurbulenceDataset(dataset_dir, seq_len_total=SEQ_LEN_FEATURE + SEQ_LEN_ROLLOUT) loader = DataLoader(dataset, batch_size=8, shuffle=True) Checkpoint laden if os.path.exists(CHECKPOINT_TO_LOAD): checkpoint = torch.load(CHECKPOINT_TO_LOAD, map_location=device) model.load_state_dict(checkpoint['model_state_dict']) print(f"Modell erfolgreich aus {CHECKPOINT_TO_LOAD} geladen.") else: print(f"Fehler: Trainiertes Modell-Checkpoint nicht gefunden unter {CHECKPOINT_TO_LOAD}. Bitte stellen Sie sicher, dass das Training abgeschlossen wurde und die Datei existiert.") exit() model.train() for epoch in range(TOTAL_EPOCHS): if epoch <= last_epoch: continue total_loss = 0 loop = tqdm(loader, desc=f"Epoch {epoch+1}/{TOTAL_EPOCHS}") for inputs_seq_raw, targets_seq_raw in loop: inputs_seq_raw = inputs_seq_raw.to(device) targets_seq_raw = targets_seq_raw.to(device) inputs_seq_norm = (inputs_seq_raw - mean_device) / std_device targets_seq_norm = (targets_seq_raw - mean_device) / std_device optimizer.zero_grad() current_input_sequence = inputs_seq_norm predicted_density_norm = [] ground_truth_density_norm = [] for r_step in range(SEQ_LEN_ROLLOUT): x, y = create_embd_dim(len(current_input_sequence)) used_input_sequence = torch.cat((current_input_sequence, x, y), dim=1).unsqueeze(2) used_input_sequence = used_input_sequence.permute(0, 2, 3, 4, 1) pred_norm = model(used_input_sequence) pred_norm = pred_norm.squeeze() predicted_density_norm.append(pred_norm) current_gt_frame_norm = targets_seq_norm[:, r_step:r_step+1, :, :] ground_truth_density_norm.append(current_gt_frame_norm) current_input_sequence = torch.cat([ current_input_sequence[:, 1:, :, :], pred_norm.unsqueeze(1) ], dim=1) predicted_density_norm = torch.cat(predicted_density_norm, dim=1) ground_truth_density_norm = torch.cat(ground_truth_density_norm, dim=1) loss = l2loss(predicted_density_norm, ground_truth_density_norm) + 0.2 * h1_loss(predicted_density_norm, ground_truth_density_norm) loss.backward() optimizer.step() total_loss += loss.item() loop.set_postfix(loss=loss.item()) scheduler.step() avg_loss = total_loss / len(loader) print(f"Epoch {epoch+1}, Durchschnittliche Verlust: {avg_loss:.6f}") if avg_loss < best_loss: best_loss = avg_loss torch.save({ 'epoch': epoch, 'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict(), 'loss': best_loss }, CHECKPOINT_PATH) print(f"Neues bestes Modell in Epoch {epoch+1} mit Verlust {best_loss:.6f} gespeichert.") ``` Testcode Nach dem Training wird das Modell getestet, um eine Vorhersage für die nächsten 100 Zeitschritte zu erstellen: ```python model.eval() file_list = sorted(glob(os.path.join(dataset_dir, "*.npz")))[-SEQ_LEN_FEATURE:] seqdata2d = [] for iter_f in range(len(file_list)): single_data = np.load(file_list[iter_f])['arr_0'] seqdata2d.append(single_data) seqdata2d = np.array(seqdata2d) feature = torch.from_numpy(seqdata2d).float().to(device) feature = (feature - mean_device) / std_device feature = feature.squeeze(1).unsqueeze(0) file_list_test = sorted(glob(os.path.join(dataset_test_dir, "*.npz"))) seqtruth = [] for iter_f in range(len(file_list_test)): single_data = np.load(file_list_test[iter_f])['arr_0'] seqtruth.append(single_data) seqtruth = np.array(seqtruth) seqpred = [] with torch.no_grad(): for iter_step in range(100): x, y = create_embd_dim(len(feature)) used_input_sequence = torch.cat((feature, x, y), dim=1).unsqueeze(2) used_input_sequence = used_input_sequence.permute(0, 2, 3, 4, 1) pred_norm = model(used_input_sequence) pred_norm = pred_norm.squeeze() pred_denorm = (pred_norm * std_device) + mean_device pred_denorm = pred_denorm.detach().cpu().numpy() seqpred.append(pred_denorm) feature = torch.cat([ feature[:, 1:, :, :], pred_norm.unsqueeze(1) ], dim=1) seqpred = np.array(seqpred) vmin = np.min(seqtruth) vmax = np.max(seqtruth) for iter_p in range(len(seqpred)): fig, axes = plt.subplots(1, 2, figsize=(10, 6)) im0 = axes[0].imshow(seqtruth[iter_p,0], cmap='viridis', vmin=vmin, vmax=vmax) axes[0].set_title(f"Grundwahrheit - t={iter_p}") axes[0].axis('off') im1 = axes[1].imshow(seqpred[iter_p], cmap='viridis', vmin=vmin, vmax=vmax) axes[1].set_title(f"Vorhersage - t={iter_p}") axes[1].axis('off') fig.colorbar(im1, ax=axes[1], fraction=0.046, pad=0.04) plt.suptitle(f"Zeitschritt {iter_p+1} - Grundwahrheit vs. Vorhersage (FNO)", fontsize=14) fig.savefig(f"res_turb_mgfno2/comparison_t{iter_p:03d}.png", dpi=150) plt.close(fig) ``` Ergebnisse Wie die Ergebnisse zeigen, fängt das FNO die Bewegungsmuster in der Turbulenz besser ein als das U-Net. Im FNO-Versionsbild gibt es keine zu schnellen Bewegungen, wodurch die Vorhersage realistischer und präziser erscheint. Bewertung durch Branchenexperten Industrieinsider loben die Effizienz und Genauigkeit des FNO, insbesondere in der Vorhersage komplexer physikalischer Felder. Das Modell ist signifikant kleiner (20 MB) als das U-Net (173 MB), was es für Anwendungen mit begrenzten Ressourcen geeigneter macht. Allerdings sollte beachtet werden, dass die Leistung des FNO stark von der Wahl der Hyperparameter abhängt. Unternehmensprofil Das Projekt basiert auf einer Reihe von Open-Source-Repositories, die von verschiedenen Forschern entwickelt wurden. Ein besonderer Dank gilt Guo, der wichtige Teile des Codes zur Verfügung gestellt hat. Die Verwendung von FNO zeigt die Potenziale von Deep Learning in der Vorhersage physikalischer Prozesse, insbesondere in Bereichen wie Fluidmechanik und Turbulenz.