HyperAI
Back to Headlines

Gamma-Spektroskopie: Maschinelles Lernen zur Erkennung radioaktiver Isotopen

vor 3 Tagen

Im zweiten Teil meiner Analyse der Gamma-Spektroskopie-Daten mit Python werde ich zeigen, wie man ein maschinelles Lernmodell zur Erkennung radioaktiver Elemente erstellt und trainiert. Dieses Projekt basiert auf der Verwendung eines modernen Zählrohrdetektors, wie zum Beispiel des Radiacode 103G, um Gamma-Spektren zu sammeln und zu analysieren. Gamma-Spektren Ein Gamma-Spektrum ist eine grafische Darstellung der Energie von Gammastrahlung, die von radioaktiven Materialien emittiert wird. Während ein Geigerzähler nur die Anzahl der erkannten Partikel angibt, liefert ein Zählrohrdetektor sowohl die Anzahl als auch die Energie der Partikel. Dies ermöglicht es, spezifische Isotope zu identifizieren, da jedes Isotop einen einzigartigen Energiemuster („Fingerabdruck“) hat. Als erstes Beispiel habe ich ein Schmuckstück aus einem chinesischen Laden gekauft, das als „Ionenerzeuger“ beworben wurde. Es zeigte eine Radioaktivität von etwa 1,20 µSv/h, was 12-mal höher als der Hintergrundwert (0,1 µSv/h) ist. Das Schmuckstück enthält Thorium-232, und das Zerfallskettenprodukt davon sind Radium und Actinium. Der Peak von Actinium-228 ist gut im Spektrum erkennbar. Als zweites Beispiel betrachten wir eine Uranmineralfunde, wie sie in bestimmten Gebieten Deutschlands, der Tschechischen Republik oder der USA vorkommen können. Ein Gamma-Spektrum zeigt, dass das Mineral Uran enthält, aber kein Thorium. Daten Sammeln Die erste Herausforderung besteht darin, geeignete Proben zu sammeln. Obwohl ich kein nuklearer Forschungsinstitut bin, sind einige Materialien legal erhältlich und können gekauft werden, wie zum Beispiel Americium in Rauchmeldern, Radium in alten Uhren oder Thorium in Tungstenstäben. Die Daten werden mit dem Radiacode-Detektor erfasst und in JSON-Dateien gespeichert. Der Prozess, ein Gamma-Spektrum zu erfassen, dauert 10-20 Minuten pro Probe. Um genügend Daten zu sammeln, müssen mehrere Proben über längere Zeiträume hinweg gemessen werden. Im besten Fall kann man 100-200 Dateien sammeln, was für unser Projekt ausreichend ist. Hier ist eine vereinfachte Version des Python-Skripts, das die Spektren erfasst: ```python from radiacode import RadiaCode, RawData, Spectrum import random import time import datetime import json import logging def read_forever(rc: RadiaCode): while True: interval_sec = random.randint(1060, 3060) read_spectrum(rc, interval_sec) def read_spectrum(rc: RadiaCode, interval: int): rc.spectrum_reset() dt = datetime.datetime.now() filename = dt.strftime("spectrum-%Y%m%d%H%M%S.json") logging.debug(f"Spektrum für {interval // 60} Min erzeugen") t_start = time.monotonic() while time.monotonic() - t_start < interval: show_device_data(rc) time.sleep(0.4) spectrum = rc.spectrum() spectrum_save(spectrum, filename) def show_device_data(rc: RadiaCode): data = rc.data_buf() for record in data: if isinstance(record, RawData): logging.debug(f"CPS: {int(record.count_rate)}") def spectrum_save(spectrum: Spectrum, filename: str): duration_sec = spectrum.duration.total_seconds() data = { "a0": spectrum.a0, "a1": spectrum.a1, "a2": spectrum.a2, "counts": spectrum.counts, "duration": duration_sec, } with open(filename, "w") as f_out: json.dump(data, f_out, indent=4) logging.debug(f"Datei '{filename}' gespeichert") rc = RadiaCode() read_forever(rc) ``` Modell Trainieren Nachdem die Daten gesammelt wurden, können wir das Lernmodell trainieren. Die Daten sind auf Kaggle verfügbar, sodass Leser ihre eigenen Modelle erstellen können. Daten laden und vorbereiten Zuerst laden wir die JSON-Dateien und extrahieren die relevanten Features. Hier ist ein Beispiel, wie man ein Spektrum aus einer JSON-Datei lädt: ```python @dataclass class Spectrum: duration: int a0: float a1: float a2: float counts: list[int] def channel_to_energy(self, ch: int) -> float: return self.a0 + self.a1 * ch + self.a2 * ch**2 def energy_to_channel(self, e: float): c = self.a0 - e return int((np.sqrt(self.a1**2 - 4 * self.a2 * c) - self.a1) / (2 * self.a2)) def load_spectrum_json(filename: str) -> Spectrum: with open(filename) as f_in: data = json.load(f_in) return Spectrum( a0=data["a0"], a1=data["a1"], a2=data["a2"], counts=data["counts"], duration=int(data["duration"]), ) ``` Daten transformieren Um die Daten zu bereinigen, wenden wir einen Savitzky-Golay-Filter an, um Rauschen zu reduzieren, und normalisieren die Werte auf den Bereich 0 bis 1: ```python from scipy.signal import savgol_filter def smooth_data(data: np.array) -> np.array: window_size = 10 data_out = savgol_filter( data, window_length=window_size, polyorder=2, ) return np.clip(data_out, a_min=0, a_max=None) def normalize(spectrum: Spectrum) -> Spectrum: counts = np.array(spectrum.counts).astype(np.float64) counts = smooth_data(counts) val_norm = counts.max() return Spectrum( duration=spectrum.duration, a0=spectrum.a0, a1=spectrum.a1, a2=spectrum.a2, counts=counts / val_norm ) ``` Daten augmentation Da unser Datensatz klein ist, fügen wir synthetische Proben hinzu, indem wir Rauschen zu den originalen Spektren hinzufügen: python def add_noise(spectrum: Spectrum) -> Spectrum: counts = np.array(spectrum.counts) ch_empty = spectrum.energy_to_channel(680.0) val_norm = counts[ch_empty] ampl = val_norm / 2 noise = np.random.normal(0, ampl, counts.shape) data_out = np.clip(counts + noise, min=0) return Spectrum( duration=spectrum.duration, a0=spectrum.a0, a1=spectrum.a1, a2=spectrum.a2, counts=data_out ) Features extrahieren Wir extrahieren die Spektrumwerte für spezifische Isotope, um die Datenmenge zu reduzieren: ```python isotopes = [ ("Am-241", 59.5), ("K-40", 1460.0), ("Ra-226", 186.2), ("Pb-214", 242.0), ("Pb-214", 295.2), ("Pb-214", 351.9), ("Bi-214", 609.3), ("Bi-214", 1120.3), ("Bi-214", 1764.5), ("Pb-212", 238.6), ("Ac-228", 338.2), ("TI-208", 583.2), ("AC-228", 911.2), ("AC-228", 969.0), ("Th-234", 63.3), ("Th-231", 84.2), ("Th-234", 92.4), ("Th-234", 92.8), ("U-235", 143.8), ("U-235", 185.7), ("U-235", 205.3), ("Pa-234m", 766.4), ("Pa-234m", 1000.9), ] def get_features(spectrum: Spectrum, isotopes: List) -> np.array: energies = [energy for _, energy in isotopes] data = [spectrum.counts[spectrum.energy_to_channel(energy)] for energy in energies] return np.array(data) ``` Modell Trainieren Wir kombinieren alle JSON-Dateien zu einem Datensatz und trainieren das Modell: ```python all_files = [ ("Americium", glob.glob("../data/train/americium.json")), ("Radium", glob.glob("../data/train/radium.json")), ("Thorium", glob.glob("../data/train/thorium.json")), ("Uranium Glass", glob.glob("../data/train/uraniumGlass.json")), ("Uranium Glaze", glob.glob("../data/train/uraniumGlaze.json")), ("Uraninite", glob.glob("../data/train/uraninite.json")), ("Background", glob.glob("../data/train/background*.json")), ] def prepare_data(augmentation: int) -> Tuple[np.array, np.array]: x, y = [], [] for name, files in all_files: for filename in files: logging.debug(f"Verarbeite {filename}...") sp = normalize(load_spectrum_json(filename)) for _ in range(augmentation): sp_out = add_noise(sp) x.append(get_features(sp_out, isotopes)) y.append(name) return np.array(x), np.array(y) X_train, y_train = prepare_data(augmentation=10) from sklearn.preprocessing import LabelEncoder le = LabelEncoder() le.fit(y_train) y_train = le.transform(y_train) from xgboost import XGBClassifier from sklearn.model_selection import GridSearchCV bst = XGBClassifier(n_estimators=10, max_depth=2, learning_rate=1) clf = GridSearchCV( bst, { "max_depth": [1, 2, 3, 4], "n_estimators": range(2, 20), "learning_rate": [0.001, 0.01, 0.1, 1.0, 10.0] }, verbose=1, n_jobs=1, cv=3, ) clf.fit(X_train, y_train) print("best_score:", clf.best_score_) > best_score: 0.99474 print("best_params:", clf.best_params_) > best_params: {'learning_rate': 1.0, 'max_depth': 1, 'n_estimators': 9} ``` Modell speichern Das trainierte Modell sowie die Liste der Isotope und Labels werden gespeichert: python isotopes_save("../models/V1/isotopes.json") bst.save_model("../models/V1/XGBClassifier.json") np.save("../models/V1/LabelEncoder.npy", le.classes_) Testen Um das Modell zu testen, verwenden wir XML-Dateien, die mit der Radiacode-App gesammelt wurden. Hier ist ein Beispiel, wie man ein Spektrum aus einer XML-Datei lädt: ```python import xmltodict def load_spectrum_xml(file_path: str) -> Spectrum: with open(file_path) as f_in: doc = xmltodict.parse(f_in.read()) result = doc["ResultDataFile"]["ResultDataList"]["ResultData"] spectrum = result["EnergySpectrum"] cal = spectrum["EnergyCalibration"]["Coefficients"]["Coefficient"] a0, a1, a2 = float(cal[0]), float(cal[1]), float(cal[2]) duration = int(spectrum["MeasurementTime"]) data = spectrum["Spectrum"]["DataPoint"] return Spectrum( duration=duration, a0=a0, a1=a1, a2=a2, counts=[int(x) for x in data], ) ``` Das Testen des Modells ist einfach: ```python bst = XGBClassifier() bst.load_model("../models/V1/XGBClassifier.json") isotopes = isotopes_load("../models/V1/isotopes.json") le = LabelEncoder() le.classes_ = np.load("../models/V1/LabelEncoder.npy") test_data = [ ["../data/test/background1.xml", "../data/test/background2.xml"], ["../data/test/thorium1.xml", "../data/test/thorium2.xml"], ["../data/test/uraniumGlass1.xml", "../data/test/uraniumGlass2.xml"], ... ] for group in test_data: data = [] for filename in group: spectrum = load_spectrum(filename) features = get_features(normalize(spectrum), isotopes) data.append(features) X_test = np.array(data) preds = bst.predict(X_test) preds = le.inverse_transform(preds) print(preds) > ['Background' 'Background'] > ['Thorium' 'Thorium'] > ['Uranium Glass' 'Uranium Glass'] > ... ``` Echtzeit-Test Schließlich testen wir das Modell im Echtbetrieb mit dem Radiacode-Detektor: ```python from radiacode import RadiaCode, RealTimeData, Spectrum import logging le = LabelEncoder() le.classes_ = np.load("../models/V1/LabelEncoder.npy") isotopes = isotopes_load("../models/V1/isotopes.json") bst = XGBClassifier() bst.load_model("../models/V1/XGBClassifier.json") def read_spectrum(rc: RadiaCode): spectrum = rc.spectrum() logging.debug(f"Spektrum: {spectrum.duration} Sekunden Sammlungszeit") result = predict_spectrum(spectrum) logging.debug(f"Vorhersage: {result}") def predict_spectrum(sp: Spectrum) -> str: features = get_features(normalize(sp), isotopes) preds = bst.predict([features]) return le.inverse_transform(preds)[0] def read_cps(rc: RadiaCode): data = rc.data_buf() for record in data: if isinstance(record, RealTimeData): logging.debug(f"CPS: {record.count_rate:.2f}") if name == 'main': logging.basicConfig(level=logging.DEBUG, format="[%(asctime)-15s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S") rc = RadiaCode() logging.debug("ML-Modell geladen") fw_version = rc.fw_version() logging.debug(f"Gerät verbunden, Firmware {fw_version[1]}") rc.spectrum_reset() while True: for _ in range(12): read_cps(rc) time.sleep(5.0) read_spectrum(rc) ``` Fazit In diesem Artikel habe ich den Prozess der Erstellung und des Trainings eines maschinellen Lernmodells zur Vorhersage radioaktiver Isotope beschrieben. Das Modell wurde erfolgreich mit verschiedenen Proben getestet, die legal erworben werden können. Leser, die keine Radiacode-Hardware besitzen, können die gesammelten Daten auf Kaggle nutzen. Für zukünftige Verbesserungen könnten mehr Daten, zusätzliche Features und andere Modelltypen berücksichtigt werden. Ich plane, im nächsten Teil einen interaktiven HTMX-Frontend für das Modell zu entwickeln, falls genügend Interesse vorhanden ist. Dieses Projekt verwendet den Radiacode-Detektor, der für interessante Experimente geeignet ist (Haftungsausschluss: Ich habe kein finanzielles oder kommerzielles Interesse an dessen Verkauf). Alle Quellcode-Dateien sind auf meiner Patreon-Seite frei verfügbar. Leser können mich auch über LinkedIn kontaktieren, wo ich regelmäßig kleinere Beiträge veröffentleiche. Vielen Dank fürs Lesen.

Related Links