拡散モデルによる画像生成の周波数特性とモータ設計画像

AI

MotorAIの清水です。

画像生成における拡散モデルの周波数特性に関して、以下の記事が面白かったので、モータ設計画像との関係も含めてメモ。

Diffusion is spectral autoregression
A deep dive into spectral analysis of diffusion models of images, revealing how they implicitly perform a form of autoregression in the frequency domain.

1. 参考先記事が何をしているか

画像生成における拡散モデル(diffusion models)では、通常、低周波から順に高周波を再構成していくことが知られています。拡散モデルでは通常、画像に徐々にガウシアンノイズを加えるcorruption processを用いますが、一般画像は高周波成分が少ないため、この過程で周波数ごとに均一なガウシアンノイズを加えると、高周波成分から先に失われていきます。画像生成時はこの逆のデノイジング過程なので、高周波成分が最後に生成されます。

それをプログラム付きで可視化したのが参考先の記事です。以下は、imagenetの一般画像びスペクトラムを可視化したものです。

このように高周波ほど減衰する情報に対して、周波数ごとに均一なノイズを加えると、高周波情報から失われるよね、ということを解説しています。

(記事内では、音声生成の文脈の話もしています)

2. モータ画像

私は2021年くらいにモータ設計を画像に変換して、GANで画像生成的にモータを設計する、みたいな研究をしていました。

深層生成モデルを活用した埋込磁石同期モータの自動設計システムを提案しました!【セルフ論文解説】
立命館大学の清水です。論文が IEEE Trans. Energy Convers. に採択されましたので、日本語で解説します。原論文はこちら(オープンアクセス)からどうぞ。 深層生成モデルを活用したIPMSMの自動設計 2022年は ...
Automatic Design System With Generative Adversarial Network and Convolutional Neural Network for Optimization Design of Interior Permanent Magnet Synchronous Motor
The optimal design of interior permanent magnet synchronous motors requires a long time because finite element analysis (FEA) is performed repeatedly. To solve ...
Automatic Design System With Generative Adversarial Network and Vision Transformer for Efficiency Optimization of Interior Permanent Magnet Synchronous Motor
Interior permanent magnet synchronous motors are becoming increasingly popular as traction motors in environmentally friendly vehicles. These motors, which offe...

その時のモータ画像はどうなんだろうと思ったので、スペクトラムを見てみます。当時使っていたモータ画像は以下の通り。V字(3rd Prius相当)、2D(5th Prius相当)、Nabla(Leaf相当)の典型形状を扱っていました。

これに対して、スペクトラム特性を計算してみると、以下の通り。(プログラムは下の方に)

imagenetの代表画像と重ねてみるとこんな感じ。

一般画像と、スペクトラムで見るとそんなに大差がないことがわかりました。すなわち、モータ設計に関しても、拡散モデルが有効かもね、という仮説が立てられます。多分まだ誰もやっていないはずです。

3. プログラム

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os
import glob
import itertools

## 実行ディレクトリにこのプログラムの場所を追加
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)

# TensorFlow Datasets と OpenCV のインポート(オプション)
try:
import tensorflow_datasets as tfds
import cv2
TFDS_AVAILABLE = True
print("TensorFlow Datasets利用可能")
except ImportError:
TFDS_AVAILABLE = False
print("TensorFlow Datasets未インストール - ローカル画像のみ使用")

# matplotlib バックエンド設定(GUI不要)
plt.switch_backend('Agg')

# 出力ディレクトリ作成
output_dir = 'spectral_analysis_results'
output_dir = os.path.join(current_dir, output_dir)
os.makedirs(output_dir, exist_ok=True)

# 処理する画像のファイルパス
# motor_image_paths = ['20250608/image_V.png', '20250608/image_2D.png', '20250608/image_Nabla.png']
motor_image_paths = ['image_V.png', 'image_2D.png', 'image_Nabla.png']
motor_image_paths = [os.path.join(current_dir, path) for path in motor_image_paths]
motor_image_labels = ['image_V', 'image_2D', 'image_Nabla']

# ImageNetの画像パス(パターンマッチングで検索)
imagenet_patterns = ['imagenet*.png', 'imagenet*.jpg', 'imagenet*.jpeg',
'natural*.png', 'natural*.jpg', 'natural*.jpeg',
'sample*.png', 'sample*.jpg', 'sample*.jpeg']
# 絶対パスでのパターン検索用
imagenet_patterns = [os.path.join(current_dir, pattern) for pattern in imagenet_patterns]

def resize_and_crop(image, smallest_side):
"""画像のリサイズと中央クロップ"""
height, width, num_channels = image.shape

# リサイズ
if height <= width:
new_height = smallest_side
new_width = int(width * (new_height / height))
else:
new_width = smallest_side
new_height = int(height * (new_width / width))

image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_LINEAR)

# 中央クロップ
offset_h = (new_height - smallest_side) // 2
offset_w = (new_width - smallest_side) // 2

return image[offset_h:offset_h + smallest_side, offset_w:offset_w + smallest_side, :]

def download_imagenette_dataset(num_examples=32, image_size=256):
"""Imagenetteデータセットのダウンロードと前処理"""
try:
print(f" Imagenetteデータセットをダウンロード中... (初回は時間がかかります)")
dataset_builder = tfds.builder('imagenette/320px-v2')
dataset_builder.download_and_prepare()
ds = dataset_builder.as_dataset('train')

print(f" {num_examples}枚の画像を前処理中...")
images = []
labels = []

for i, example in enumerate(itertools.islice(ds, num_examples)):
arr = example['image'].numpy()
arr = resize_and_crop(arr, image_size)
images.append(arr)
labels.append(f"Imagenette_{i+1}")

if (i + 1) % 10 == 0:
print(f" 処理済み: {i+1}/{num_examples}")

images = np.array(images)
# [-1, 1]の範囲にスケール
images = (images.astype(np.float32) / 255.0) * 2 - 1

print(f" Imagenette画像取得完了: {len(images)}枚")
return images, labels

except Exception as e:
print(f" Imagenetteダウンロードエラー: {e}")
return None, None

def rapsd(image, fft_method=np.fft, return_freq=True):
"""
Radially Averaged Power Spectral Density (RAPSD) の計算
"""
# 2D FFT計算
fft2d = fft_method.fft2(image)
# パワースペクトラム密度
psd2d = np.abs(fft2d) ** 2

# 画像の中心座標
h, w = image.shape
center_h, center_w = h // 2, w // 2

# 周波数グリッドの作成
freq_h = fft_method.fftfreq(h)
freq_w = fft_method.fftfreq(w)
freq_h_grid, freq_w_grid = np.meshgrid(freq_h, freq_w, indexing='ij')

# 各点での半径(周波数の大きさ)
freq_radius = np.sqrt(freq_h_grid**2 + freq_w_grid**2)

# 半径方向での平均化
max_radius = np.max(freq_radius)
num_bins = min(h, w) // 2
radius_bins = np.linspace(0, max_radius, num_bins)

rapsd_values = []
frequencies = []

for i in range(len(radius_bins) - 1):
# 各半径範囲でのマスク
mask = (freq_radius >= radius_bins[i]) & (freq_radius < radius_bins[i + 1])
if np.sum(mask) > 0:
# その半径でのパワーの平均
avg_power = np.mean(psd2d[mask])
rapsd_values.append(avg_power)
frequencies.append((radius_bins[i] + radius_bins[i + 1]) / 2)

rapsd_values = np.array(rapsd_values)
frequencies = np.array(frequencies)

if return_freq:
return rapsd_values, frequencies
else:
return rapsd_values

def calc_mean_log_rapsd(images_list, label=""):
"""複数画像のRAPSDの対数平均を計算"""
spectra = []
frequencies_list = []

for img_data in images_list:
# 緑チャンネルを使用(グレースケールの場合は同じチャンネルを3回重複)
if len(img_data.shape) == 3:
green_channel = img_data[:, :, 1]
else:
green_channel = img_data
rapsd_vals, freqs = rapsd(green_channel, fft_method=np.fft, return_freq=True)
spectra.append(rapsd_vals)
frequencies_list.append(freqs)

if not spectra:
return None, None

# 最短の長さに合わせる
min_len = min(len(s) for s in spectra)
spectra = [s[:min_len] for s in spectra]
frequencies = frequencies_list[0][:min_len] # 最初の画像の周波数を使用

# 対数平均
mean_log_rapsd = np.mean(np.array([np.log(s + 1e-30) for s in spectra]), axis=0)
print(f" {label}: {len(spectra)}枚の画像から平均RAPSD計算完了")
return mean_log_rapsd, frequencies

def process_single_image(image_path, label):
"""単一画像の処理とデータ返却"""
try:
# PILで画像を読み込み
img = Image.open(image_path)
print(f"{label} - Format: {img.format}, Size: {img.size}")

# PIL画像をnumpy配列に変換
img_array = np.array(img)

# グレースケール画像の場合は3チャンネルに拡張
if len(img_array.shape) == 2:
img_array = np.stack([img_array] * 3, axis=-1)

# RGBAの場合はRGBに変換
if img_array.shape[2] == 4:
img_array = img_array[:, :, :3]

# uint8 (0-255) から float32 (-1 to 1) に変換
img_array_processed = (img_array.astype(np.float32) / 255.0) * 2 - 1

return img_array_processed, True

except FileNotFoundError:
print(f"Error: File not found at {image_path}")
return None, False
except Exception as e:
print(f"Error processing {image_path}: {e}")
return None, False

def find_local_imagenet_images():
"""ローカルImageNet画像ファイルを検索"""
imagenet_files = []
for pattern in imagenet_patterns:
imagenet_files.extend(glob.glob(pattern))

# 重複除去とソート
imagenet_files = sorted(list(set(imagenet_files)))
print(f" 検索パターン: {[os.path.basename(p) for p in imagenet_patterns]}")
print(f" 検索ディレクトリ: {current_dir}")
return imagenet_files

print("=== 画像スペクトラム解析開始(モーター画像 vs ImageNet比較) ===")
print(f"実行ディレクトリ: {current_dir}")
print(f"出力ディレクトリ: {output_dir}")

# モーター画像を処理
print("\n--- モーター画像の処理 ---")
motor_images = []
motor_labels = []

for path, label in zip(motor_image_paths, motor_image_labels):
img_data, success = process_single_image(path, label)
if success:
motor_images.append(img_data)
motor_labels.append(label)

# ImageNet画像を取得(TFDS優先、フォールバックでローカル)
print("\n--- ImageNet画像の取得・処理 ---")
imagenet_images = []
imagenet_labels = []

if TFDS_AVAILABLE:
print("方法1: TensorFlow Datasetsを使用してImagenetteデータセットを取得")
tfds_images, tfds_labels = download_imagenette_dataset(num_examples=32, image_size=256)

if tfds_images is not None:
# numpy配列をリストに変換
for img, label in zip(tfds_images, tfds_labels):
imagenet_images.append(img)
imagenet_labels.append(label)
print(f" Imagenette取得成功: {len(imagenet_images)}枚")
else:
print(" Imagenette取得失敗 - ローカル画像を検索")

# フォールバックまたは追加でローカル画像を検索
if not imagenet_images:
print("方法2: ローカルImageNet画像を検索")
local_files = find_local_imagenet_images()
print(f"検出されたローカル画像: {len(local_files)}枚")

for i, path in enumerate(local_files):
label = f"Local_ImageNet_{i+1}"
img_data, success = process_single_image(path, label)
if success:
imagenet_images.append(img_data)
imagenet_labels.append(label)

print(f"\n処理結果:")
print(f" モーター画像: {len(motor_images)}枚")
print(f" ImageNet画像: {len(imagenet_images)}枚")

if not motor_images:
print("モーター画像が見つかりませんでした。")
exit(1)

# 1. モーター画像の比較表示
print("\n1. モーター画像比較図を生成中...")
if motor_images:
fig, axes = plt.subplots(3, len(motor_images), figsize=(5*len(motor_images), 12))

if len(motor_images) == 1:
axes = axes.reshape(-1, 1)

for i, (img_data, label) in enumerate(zip(motor_images, motor_labels)):
green_channel = img_data[:, :, 1]
spec = np.fft.fft2(green_channel)
spec_shifted = np.fft.fftshift(spec)

# 元画像
axes[0, i].imshow((img_data + 1) / 2)
axes[0, i].set_title(f'{label}\nOriginal Image')
axes[0, i].axis('off')

# マグニチュード・スペクトラム
axes[1, i].imshow(np.log(np.abs(spec_shifted) + 1e-10), vmin=-5, vmax=11, cmap=plt.cm.viridis)
axes[1, i].set_title(f'{label}\nMagnitude Spectrum')
axes[1, i].axis('off')

# 位相スペクトラム
axes[2, i].imshow(np.angle(spec_shifted), vmin=-np.pi, vmax=np.pi, cmap=plt.cm.RdBu)
axes[2, i].set_title(f'{label}\nPhase Spectrum')
axes[2, i].axis('off')

plt.tight_layout()
motor_comparison_file = os.path.join(output_dir, 'motor_images_comparison.png')
plt.savefig(motor_comparison_file, dpi=300, bbox_inches='tight')
plt.close()
print(f" 保存完了: {motor_comparison_file}")

# 2. ImageNet画像のサンプル表示
if imagenet_images:
print("\n2. ImageNet画像サンプル表示を生成中...")
display_count = min(8, len(imagenet_images))

fig = plt.figure(figsize=(16, 12))

# 上段:元画像
for k in range(min(4, display_count)):
plt.subplot(3, 4, k + 1)
plt.imshow((imagenet_images[k] + 1) / 2)
plt.axis('off')
plt.title(f'{imagenet_labels[k]}\nOriginal')

# 中段:マグニチュード・スペクトラム
for k in range(min(4, display_count)):
spec = np.fft.fft2(imagenet_images[k][:, :, 1])
spec = np.fft.fftshift(spec)
plt.subplot(3, 4, 4 + k + 1)
plt.imshow(np.log(np.abs(spec) + 1e-10), vmin=-5, vmax=11, cmap=plt.cm.viridis)
plt.axis('off')
plt.title('Magnitude Spectrum')

# 下段:位相スペクトラム
for k in range(min(4, display_count)):
spec = np.fft.fft2(imagenet_images[k][:, :, 1])
spec = np.fft.fftshift(spec)
plt.subplot(3, 4, 8 + k + 1)
plt.imshow(np.angle(spec), vmin=-np.pi, vmax=np.pi, cmap=plt.cm.RdBu)
plt.axis('off')
plt.title('Phase Spectrum')

plt.tight_layout()
imagenet_sample_file = os.path.join(output_dir, 'imagenet_samples_with_spectra.png')
plt.savefig(imagenet_sample_file, dpi=300, bbox_inches='tight')
plt.close()
print(f" 保存完了: {imagenet_sample_file}")

# 3. モーター画像のRAPSD解析
print("\n3. モーター画像RAPSD解析を生成中...")
if motor_images:
fig = plt.figure(figsize=(14, 7))

# 上段:元画像
for k in range(len(motor_images)):
plt.subplot(2, len(motor_images), k + 1)
plt.imshow((motor_images[k] + 1) / 2)
plt.axis('off')
plt.title(f'{motor_labels[k]}\nOriginal')

# 下段:RAPSDプロット
for k in range(len(motor_images)):
rapsd_vals, frequencies = rapsd(motor_images[k][:, :, 1], fft_method=np.fft, return_freq=True)
plt.subplot(2, len(motor_images), len(motor_images) + k + 1)
plt.plot(frequencies[1:], rapsd_vals[1:], c='red', marker='o', markersize=3)
plt.xscale('log')
plt.yscale('log')
plt.xlabel('frequency')
plt.title(f'{motor_labels[k]}\nRAPSD')
if k == 0:
plt.ylabel('power')

plt.tight_layout()
motor_rapsd_file = os.path.join(output_dir, 'motor_images_and_rapsd.png')
plt.savefig(motor_rapsd_file, dpi=300, bbox_inches='tight')
plt.close()
print(f" 保存完了: {motor_rapsd_file}")

# 4. 平均RAPSD比較(モーター vs ImageNet)
print("\n4. 平均RAPSD比較解析を生成中...")

# モーター画像の平均RAPSD
motor_mean_log_rapsd, motor_frequencies = calc_mean_log_rapsd(motor_images, "モーター画像")

# ImageNet画像の平均RAPSD
if imagenet_images:
imagenet_mean_log_rapsd, imagenet_frequencies = calc_mean_log_rapsd(imagenet_images, "ImageNet画像")
else:
imagenet_mean_log_rapsd, imagenet_frequencies = None, None

# 比較プロット
plt.figure(figsize=(10, 6))

if motor_mean_log_rapsd is not None:
plt.plot(motor_frequencies[1:], np.exp(motor_mean_log_rapsd)[1:],
c='red', marker='o', markersize=4, linewidth=2,
label=f'Motor Images (n={len(motor_images)})', alpha=0.8)

if imagenet_mean_log_rapsd is not None:
plt.plot(imagenet_frequencies[1:], np.exp(imagenet_mean_log_rapsd)[1:],
c='blue', marker='s', markersize=4, linewidth=2,
label=f'ImageNet/Imagenette (n={len(imagenet_images)})', alpha=0.8)

plt.xscale('log')
plt.yscale('log')
plt.xlabel('Frequency', fontsize=12)
plt.ylabel('Power', fontsize=12)
plt.title('Mean RAPSD Comparison: Motor Images vs ImageNet/Imagenette', fontsize=14)
plt.legend(fontsize=12)
plt.grid(True, alpha=0.3)

mean_comparison_file = os.path.join(output_dir, 'mean_rapsd_comparison.png')
plt.savefig(mean_comparison_file, dpi=300, bbox_inches='tight')
plt.close()
print(f" 保存完了: {mean_comparison_file}")

# 5. 個別RAPSD重ね合わせ(全画像)
print("\n5. 全画像RAPSD重ね合わせを生成中...")
plt.figure(figsize=(12, 8))

# モーター画像の個別RAPSD
colors_motor = ['red', 'darkred', 'crimson']
for i, (img_data, label) in enumerate(zip(motor_images, motor_labels)):
rapsd_vals, frequencies = rapsd(img_data[:, :, 1], fft_method=np.fft, return_freq=True)
color = colors_motor[i % len(colors_motor)]
plt.plot(frequencies[1:], rapsd_vals[1:], c=color, linewidth=2, alpha=0.7, label=f'{label}')

# ImageNet画像の個別RAPSD(最大8個まで)
colors_imagenet = ['blue', 'navy', 'deepskyblue', 'steelblue', 'dodgerblue', 'lightblue', 'royalblue', 'mediumblue']
for i, (img_data, label) in enumerate(zip(imagenet_images[:8], imagenet_labels[:8])):
rapsd_vals, frequencies = rapsd(img_data[:, :, 1], fft_method=np.fft, return_freq=True)
color = colors_imagenet[i % len(colors_imagenet)]
plt.plot(frequencies[1:], rapsd_vals[1:], c=color, linewidth=1, alpha=0.5, label=f'{label}')

plt.xscale('log')
plt.yscale('log')
plt.xlabel('Frequency', fontsize=12)
plt.ylabel('Power', fontsize=12)
plt.title('Individual RAPSD Comparison: Motor Images vs ImageNet/Imagenette', fontsize=14)
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)

individual_comparison_file = os.path.join(output_dir, 'individual_rapsd_comparison.png')
plt.savefig(individual_comparison_file, dpi=300, bbox_inches='tight')
plt.close()
print(f" 保存完了: {individual_comparison_file}")

# 6. 数値的比較レポート
print("\n6. 数値解析結果を保存中...")
analysis_file = os.path.join(output_dir, 'spectral_analysis_comparison_report.txt')

with open(analysis_file, 'w', encoding='utf-8') as f:
f.write("=== スペクトラム解析比較レポート ===\n")
f.write(f"処理日時: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"TensorFlow Datasets使用: {'Yes' if TFDS_AVAILABLE else 'No'}\n\n")

f.write("=== モーター画像解析結果 ===\n")
for img_data, label in zip(motor_images, motor_labels):
green_channel = img_data[:, :, 1]
spec = np.fft.fft2(green_channel)
magnitude = np.abs(spec)
rapsd_vals, freqs = rapsd(green_channel, fft_method=np.fft, return_freq=True)

f.write(f"\n{label}:\n")
f.write(f" 画像サイズ: {img_data.shape}\n")
f.write(f" DC成分: {magnitude[0, 0]:.6f}\n")
f.write(f" 総エネルギー: {np.sum(magnitude**2):.6f}\n")
f.write(f" RAPSD平均パワー: {np.mean(rapsd_vals[1:]):.6f}\n")
f.write(f" RAPSD最大パワー: {np.max(rapsd_vals[1:]):.6f}\n")

if imagenet_images:
f.write(f"\n=== ImageNet/Imagenette画像解析結果 (n={len(imagenet_images)}) ===\n")
f.write(f"データソース: {'Imagenette (TFDS)' if TFDS_AVAILABLE and len(imagenet_images) > 0 else 'Local files'}\n")

# ImageNet統計サマリー
all_dc_components = []
all_energies = []
all_rapsd_means = []

for img_data, label in zip(imagenet_images, imagenet_labels):
green_channel = img_data[:, :, 1]
spec = np.fft.fft2(green_channel)
magnitude = np.abs(spec)
rapsd_vals, freqs = rapsd(green_channel, fft_method=np.fft, return_freq=True)

all_dc_components.append(magnitude[0, 0])
all_energies.append(np.sum(magnitude**2))
all_rapsd_means.append(np.mean(rapsd_vals[1:]))

f.write(f"\nImageNet統計サマリー:\n")
f.write(f" DC成分平均: {np.mean(all_dc_components):.6f} ± {np.std(all_dc_components):.6f}\n")
f.write(f" 総エネルギー平均: {np.mean(all_energies):.6f} ± {np.std(all_energies):.6f}\n")
f.write(f" RAPSD平均パワー: {np.mean(all_rapsd_means):.6f} ± {np.std(all_rapsd_means):.6f}\n")

f.write(f"\n=== 比較分析 ===\n")
if imagenet_images and motor_images:
# 簡単な比較統計
motor_rapsd_means = []
for img_data in motor_images:
green_channel = img_data[:, :, 1]
rapsd_vals, _ = rapsd(green_channel, fft_method=np.fft, return_freq=True)
motor_rapsd_means.append(np.mean(rapsd_vals[1:]))

f.write(f"モーター画像RAPSD平均: {np.mean(motor_rapsd_means):.6f}\n")
f.write(f"ImageNet画像RAPSD平均: {np.mean(all_rapsd_means):.6f}\n")
f.write(f"比率 (Motor/ImageNet): {np.mean(motor_rapsd_means)/np.mean(all_rapsd_means):.3f}\n")

print(f" 保存完了: {analysis_file}")

# 7. コンソール出力サマリー
print("\n=== 解析結果サマリー ===")
print(f"モーター画像: {len(motor_images)}枚")
for i, label in enumerate(motor_labels):
print(f" {i+1}. {label}")

print(f"\nImageNet画像: {len(imagenet_images)}枚")
if imagenet_images:
for i, label in enumerate(imagenet_labels[:5]):
print(f" {i+1}. {label}")
if len(imagenet_images) > 5:
print(f" ... and {len(imagenet_images) - 5} more")

print(f"\n=== 解析完了 ===")
print(f"実行ディレクトリ: {current_dir}")
print(f"出力ディレクトリ: {output_dir}")
print("生成されたファイル:")
print(f" - motor_images_comparison.png (モーター画像比較)")
if imagenet_images:
print(f" - imagenet_samples_with_spectra.png (ImageNetサンプル+スペクトラム)")
print(f" - motor_images_and_rapsd.png (モーター画像RAPSD)")
print(f" - mean_rapsd_comparison.png (平均RAPSD比較)")
print(f" - individual_rapsd_comparison.png (個別RAPSD重ね合わせ)")
print(f" - spectral_analysis_comparison_report.txt (比較レポート)")
print("\n実行完了しました。")


 

 

 

 

 

AI
スポンサーリンク
シェアする
モータ研究者の技術解説
タイトルとURLをコピーしました