# File: snow_trail/blender/scripts/generate_flowmap.py
# Last modified: 2026-01-21 11:04:55 +01:00 (595 lines, 21 KiB, Python)
#
# Note from the hosting page: this file contains Unicode characters that might
# be confused with other characters. If that is intentional, the warning can
# safely be ignored.
import bpy
import bmesh
import numpy as np
from pathlib import Path
from mathutils import Vector
from multiprocessing import Pool, cpu_count
def simple_blur(data, iterations=1):
    """
    Simple 3x3 box blur for 2D vector fields.

    Every output pixel is the mean of the input pixels in its 3x3
    neighbourhood. Neighbours that fall outside the image are ignored, i.e.
    the mean is taken over the in-bounds pixels only.

    Args:
        data: Array of shape (height, width, channels); it is not modified
        iterations: Number of blur passes (<= 0 returns an unblurred copy)

    Returns:
        New array with the same shape and dtype as ``data``
    """
    result = data.copy()
    h, w, _ = data.shape
    if iterations <= 0 or h == 0 or w == 0:
        return result

    # In-bounds neighbours along each axis: 3, minus one for every image edge
    # the pixel touches (a 1-pixel-wide axis touches both edges at once).
    rows_in_bounds = np.full(h, 3.0)
    rows_in_bounds[0] -= 1.0
    rows_in_bounds[-1] -= 1.0
    cols_in_bounds = np.full(w, 3.0)
    cols_in_bounds[0] -= 1.0
    cols_in_bounds[-1] -= 1.0
    neighbour_count = (
        rows_in_bounds[:, np.newaxis] * cols_in_bounds[np.newaxis, :]
    )[:, :, np.newaxis]

    # A zero border lets out-of-bounds neighbours contribute nothing to the
    # sum; the border is never written to, so it stays zero across passes.
    padded = np.zeros((h + 2, w + 2) + data.shape[2:], dtype=np.float64)
    for _ in range(iterations):
        padded[1:-1, 1:-1] = result
        total = np.zeros(data.shape, dtype=np.float64)
        # Accumulate the nine shifted views instead of visiting each pixel
        # in Python; this is the same sum, evaluated for all pixels at once.
        for dy in range(3):
            for dx in range(3):
                total += padded[dy:dy + h, dx:dx + w]
        result = (total / neighbour_count).astype(data.dtype)
    return result
def sample_curve_points(curve_obj, samples_per_segment=50):
    """
    Sample points along a Blender curve object (Bezier, NURBS, etc).

    The curve is converted to a temporary mesh through the evaluated
    dependency graph and the mesh vertices are returned in world space.

    Note: this overwrites ``curve_obj.data.resolution_u`` with
    ``samples_per_segment`` and does not restore the previous value.

    Args:
        curve_obj: Blender curve object
        samples_per_segment: Value assigned to the curve's ``resolution_u``
            before it is converted to a mesh

    Returns:
        List of Vector world-space positions
    """
    curve_obj.data.resolution_u = samples_per_segment
    depsgraph = bpy.context.evaluated_depsgraph_get()
    curve_eval = curve_obj.evaluated_get(depsgraph)
    mesh_from_curve = curve_eval.to_mesh()
    try:
        world_matrix = curve_obj.matrix_world
        # The matrix product yields new vectors, so the returned points do
        # not reference the temporary mesh that is released below.
        return [world_matrix @ v.co for v in mesh_from_curve.vertices]
    finally:
        # Release the temporary mesh even if reading its vertices failed.
        curve_eval.to_mesh_clear()
def compute_arc_lengths(points):
    """
    Return the running distance travelled along a polyline.

    Args:
        points: List of Vector positions, in order along the curve

    Returns:
        List of floats, one per point: 0.0 for the first point, then the
        summed length of all straight segments up to each later point.
        An empty input gives an empty list.
    """
    if len(points) == 0:
        return []

    walker = iter(points)
    previous = next(walker)
    travelled = 0.0
    arc_lengths = [travelled]
    for current in walker:
        travelled += (current - previous).length
        arc_lengths.append(travelled)
        previous = current
    return arc_lengths
def get_path_curves(collection_name="Paths"):
    """
    Find the curve objects that describe paths.

    Lookup order:
      1. Curves inside the collection called ``collection_name`` (if that
         collection exists, its curves are returned even when there are none).
      2. Otherwise, curves anywhere in the file with 'path' in their name
         (case-insensitive).
      3. Otherwise, every curve object in the file.

    Args:
        collection_name: Name of collection containing path curves

    Returns:
        List of curve objects
    """
    def is_curve(obj):
        return obj.type == 'CURVE'

    collection = bpy.data.collections.get(collection_name)
    if not collection:
        print(f"Collection '{collection_name}' not found. Searching for curves with 'path' in name...")
        named_curves = [
            obj for obj in bpy.data.objects
            if is_curve(obj) and 'path' in obj.name.lower()
        ]
        if named_curves:
            return named_curves

        print("No path curves found. Looking for any curve objects...")
        return [obj for obj in bpy.data.objects if is_curve(obj)]

    collection_curves = [obj for obj in collection.objects if is_curve(obj)]
    print(f"Found {len(collection_curves)} curves in collection '{collection_name}'")
    return collection_curves
def closest_point_on_segment_2d(point, seg_start, seg_end):
    """
    Project a 2D point onto a line segment, clamped to the segment ends.

    Args:
        point: Vector2 (x, y) to project
        seg_start: Vector2 segment start
        seg_end: Vector2 segment end

    Returns:
        Vector2 closest point on segment. For a (near) zero-length segment
        ``seg_start`` itself is returned.
    """
    direction = seg_end - seg_start
    offset = point - seg_start
    direction_len_sq = direction.length_squared

    # Degenerate segment: there is no direction to project onto.
    if direction_len_sq < 1e-8:
        return seg_start

    # Parameter along the segment (0 = start, 1 = end), clamped to its extent.
    projection = offset.dot(direction) / direction_len_sq
    t = max(0.0, min(1.0, projection))
    return seg_start + direction * t
def process_row_chunk(args):
    """
    Process a chunk of rows for parallel computation.

    For every pixel in rows ``start_row`` .. ``end_row - 1`` the pixel is
    mapped to a world-space XY position inside the given bounds and the
    nearest point on any path segment is located. Pixels are evaluated for
    the whole chunk at once; only the loop over segments runs in Python.

    Args:
        args: Tuple of (start_row, end_row, width, height_total, min_x, max_x,
            min_y, max_y, path_segments_simple). ``path_segments_simple`` is
            a sequence of (seg_start, seg_end, arc_start, arc_end) where the
            two endpoints are 2-element arrays.

    Returns:
        Tuple of (start_row, flow_field_chunk, distance_field_chunk,
        arc_length_field_chunk, segment_id_chunk):
          - flow_field_chunk: float32 (rows, width, 2); X offset from the
            pixel to the nearest path point and the *negated* Y offset
          - distance_field_chunk: float32 (rows, width); distance to the path
          - arc_length_field_chunk: float32 (rows, width); arc length
            interpolated along the nearest segment
          - segment_id_chunk: int32 (rows, width); index of nearest segment
        Pixels for which no segment yields a finite distance (e.g. when there
        are no segments) keep 0 in every output, including the segment id.
    """
    (start_row, end_row, width, height_total,
     min_x, max_x, min_y, max_y, path_segments_simple) = args
    height = end_row - start_row

    flow_field_chunk = np.zeros((height, width, 2), dtype=np.float32)
    distance_field_chunk = np.zeros((height, width), dtype=np.float32)
    arc_length_field_chunk = np.zeros((height, width), dtype=np.float32)
    segment_id_chunk = np.zeros((height, width), dtype=np.int32)

    # World-space position of every pixel in the chunk (pixel index divided
    # by the image size, scaled into the bounds).
    u = np.arange(width, dtype=np.float64) / width
    v = np.arange(start_row, end_row, dtype=np.float64) / height_total
    world_x = np.broadcast_to(min_x + u * (max_x - min_x), (height, width))
    world_y = np.broadcast_to(
        (min_y + v * (max_y - min_y))[:, np.newaxis], (height, width)
    )

    # Running "best so far" state per pixel.
    best_dist = np.full((height, width), np.inf)
    best_x = np.zeros((height, width))
    best_y = np.zeros((height, width))
    best_arc = np.zeros((height, width))
    best_id = np.full((height, width), -1, dtype=np.int32)

    for seg_id, (seg_start, seg_end, arc_start, arc_end) in enumerate(path_segments_simple):
        start_x = float(seg_start[0])
        start_y = float(seg_start[1])
        seg_dx = float(seg_end[0]) - start_x
        seg_dy = float(seg_end[1]) - start_y
        seg_len_sq = seg_dx * seg_dx + seg_dy * seg_dy

        if seg_len_sq < 1e-8:
            # Degenerate segment: treat it as the single point seg_start.
            t = np.zeros((height, width))
        else:
            t = ((world_x - start_x) * seg_dx + (world_y - start_y) * seg_dy) / seg_len_sq
            t = np.clip(t, 0.0, 1.0)

        near_x = start_x + seg_dx * t
        near_y = start_y + seg_dy * t
        dist = np.sqrt((near_x - world_x) ** 2 + (near_y - world_y) ** 2)

        # Strict '<' keeps the earliest segment on ties and skips NaN.
        closer = dist < best_dist
        best_dist[closer] = dist[closer]
        best_x[closer] = near_x[closer]
        best_y[closer] = near_y[closer]
        best_arc[closer] = (arc_start + t * (arc_end - arc_start))[closer]
        best_id[closer] = seg_id

    found = best_id >= 0
    flow_field_chunk[found, 0] = (best_x - world_x)[found]
    # Y is stored negated, matching the orientation the rest of the pipeline
    # expects for this channel.
    flow_field_chunk[found, 1] = -(best_y - world_y)[found]
    distance_field_chunk[found] = best_dist[found]
    arc_length_field_chunk[found] = best_arc[found]
    segment_id_chunk[found] = best_id[found]

    return (start_row, flow_field_chunk, distance_field_chunk, arc_length_field_chunk, segment_id_chunk)
def _normalize_flow_vectors(flow):
    """Scale every 2D vector of an (H, W, 2) field to unit length, in place."""
    magnitudes = np.sqrt(flow[:, :, 0] ** 2 + flow[:, :, 1] ** 2)
    # Clamp so zero vectors stay zero instead of dividing by zero.
    magnitudes = np.maximum(magnitudes, 1e-8)
    flow[:, :, 0] /= magnitudes
    flow[:, :, 1] /= magnitudes
    return flow


def _save_debug_png(image_name, rgba, filename):
    """Save an (H, W, 4) float array as textures/<filename>, two levels above the .blend file."""
    height, width = rgba.shape[:2]
    image = bpy.data.images.new(
        image_name, width=width, height=height, alpha=True, float_buffer=True
    )
    try:
        image.pixels[:] = rgba.flatten()
        image.filepath_raw = str(
            Path(bpy.data.filepath).parent.parent / "textures" / filename
        )
        image.file_format = 'PNG'
        image.save()
    finally:
        # The datablock is only a vehicle for saving; never leave it behind.
        bpy.data.images.remove(image)


def generate_flowmap_from_paths(terrain_obj, path_curves, resolution=1024, blur_iterations=2):
    """
    Generate a flowmap texture showing direction toward nearest path.

    Channels of the returned image:
      R, G: unit direction toward the nearest path, remapped from [-1, 1]
            to [0, 1] (the Y component is negated by the row worker)
      B:    distance to the nearest path, clamped to [0, 1] at 60 world units
      A:    arc length along the nearest path, repeating every 100 world units

    Side effects: three debug PNGs (segment ids, distance, direction) are
    written to the ``textures`` directory two levels above the .blend file,
    and a multiprocessing pool is used to compute the field.

    Args:
        terrain_obj: Blender mesh object (the terrain plane); its world-space
            XY bounding box defines the area covered by the texture
        path_curves: List of Blender curve objects representing paths
        resolution: Output texture resolution (square)
        blur_iterations: Number of smoothing passes for flow field; 0 (or
            less) disables the blur

    Returns:
        Blender image containing encoded flowmap

    Raises:
        ValueError: If the terrain has no vertices or no path points could
            be sampled.
    """
    # Defined up front because both the blur blend and the final channel
    # encoding need it, whether or not blurring is enabled.
    max_distance = 60.0
    arc_length_repeat = 100.0

    print(f"Generating flowmap from {len(path_curves)} path curves")
    print(f"Output resolution: {resolution}×{resolution}")

    # The BMesh is only needed to read vertex positions; free it right away
    # so it cannot leak on any of the error paths below.
    bm = bmesh.new()
    try:
        bm.from_mesh(terrain_obj.data)
        bm.verts.ensure_lookup_table()
        world_matrix = terrain_obj.matrix_world
        verts_world = [world_matrix @ v.co for v in bm.verts]
    finally:
        bm.free()

    if len(verts_world) == 0:
        raise ValueError("Terrain mesh has no vertices")

    xs = [v.x for v in verts_world]
    ys = [v.y for v in verts_world]
    min_x, max_x = min(xs), max(xs)
    min_y, max_y = min(ys), max(ys)
    print(f"Terrain bounds: X=[{min_x:.2f}, {max_x:.2f}], Y=[{min_y:.2f}, {max_y:.2f}]")

    # Sample every curve once and reuse the points for the segment list.
    print("Sampling path curves...")
    sampled_curves = []
    total_points = 0
    for curve_obj in path_curves:
        points = sample_curve_points(curve_obj, samples_per_segment=50)
        sampled_curves.append(points)
        total_points += len(points)
        print(f" {curve_obj.name}: {len(points)} points")
    print(f"Total path points: {total_points}")
    if total_points == 0:
        raise ValueError("No path points sampled. Check that path curves exist and have geometry.")

    print("Building line segments from path points with arc lengths...")
    path_segments = []
    for curve_points in sampled_curves:
        # Arc length restarts at 0 for every curve.
        arc_lengths = compute_arc_lengths(curve_points)
        for i in range(len(curve_points) - 1):
            path_segments.append(
                (curve_points[i], curve_points[i + 1], arc_lengths[i], arc_lengths[i + 1])
            )
    print(f"Created {len(path_segments)} line segments with arc length data")

    print("Generating flow field toward nearest path...")
    h, w = resolution, resolution
    flow_field = np.zeros((h, w, 2), dtype=np.float32)
    distance_field = np.zeros((h, w), dtype=np.float32)
    arc_length_field = np.zeros((h, w), dtype=np.float32)
    segment_id_field = np.zeros((h, w), dtype=np.int32)

    # Plain numpy XY pairs are what gets handed to the worker processes.
    path_segments_simple = [
        (np.array([seg_start.x, seg_start.y]), np.array([seg_end.x, seg_end.y]), arc_start, arc_end)
        for seg_start, seg_end, arc_start, arc_end in path_segments
    ]

    num_cores = cpu_count()
    chunk_size = max(1, h // num_cores)
    print(f"Using {num_cores} CPU cores with chunk size {chunk_size} rows")
    chunks = [
        (start_row, min(start_row + chunk_size, h), w, h,
         min_x, max_x, min_y, max_y, path_segments_simple)
        for start_row in range(0, h, chunk_size)
    ]

    with Pool(processes=num_cores) as pool:
        total_chunks = len(chunks)
        print(f"Processing {total_chunks} chunks...")
        results = []
        for i, result in enumerate(pool.imap(process_row_chunk, chunks)):
            results.append(result)
            progress = (i + 1) / total_chunks * 100
            print(f" Progress: {i+1}/{total_chunks} chunks ({progress:.1f}%)")

    print("Assembling results from parallel workers...")
    for start_row, flow_chunk, distance_chunk, arc_length_chunk, segment_id_chunk in results:
        end_row = start_row + flow_chunk.shape[0]
        flow_field[start_row:end_row, :, :] = flow_chunk
        distance_field[start_row:end_row, :] = distance_chunk
        arc_length_field[start_row:end_row, :] = arc_length_chunk
        segment_id_field[start_row:end_row, :] = segment_id_chunk

    print(f"Distance range: {distance_field.min():.2f} to {distance_field.max():.2f}")
    print(f"Arc length range: {arc_length_field.min():.2f} to {arc_length_field.max():.2f}")

    print("Normalizing flow vectors...")
    _normalize_flow_vectors(flow_field)

    if blur_iterations > 0:
        print(f"Applying distance-based blur (max {blur_iterations} iterations)...")
        print(" Creating maximally blurred version...")
        blurred_flow = _normalize_flow_vectors(
            simple_blur(flow_field, iterations=blur_iterations)
        )
        print(" Blending based on distance to path...")
        # Near the path keep the sharp field; far away use the blurred one.
        blend_factor = np.clip(distance_field / max_distance, 0.0, 1.0)[:, :, np.newaxis]
        flow_field = flow_field * (1.0 - blend_factor) + blurred_flow * blend_factor
        _normalize_flow_vectors(flow_field)
        print(" Distance-based blur complete!")

    print("Saving segment ID debug image...")
    num_segments = len(path_segments)
    # Fixed seed so segment colours are stable between runs.
    segment_colors = np.random.RandomState(42).rand(num_segments, 3).astype(np.float32)
    segment_img = np.zeros((resolution, resolution, 4), dtype=np.float32)
    valid_ids = (segment_id_field >= 0) & (segment_id_field < num_segments)
    segment_img[valid_ids, 0:3] = segment_colors[segment_id_field[valid_ids]]
    segment_img[valid_ids, 3] = 1.0
    _save_debug_png("Segment_Debug", segment_img, "path_segment_debug.png")
    print(f" Saved to textures/path_segment_debug.png ({num_segments} segments)")

    print("Saving distance field debug image...")
    distance_gray = np.clip(distance_field / 50.0, 0.0, 1.0)
    distance_img = np.zeros((resolution, resolution, 4), dtype=np.float32)
    distance_img[:, :, 0:3] = distance_gray[:, :, np.newaxis]
    distance_img[:, :, 3] = 1.0
    _save_debug_png("Distance_Debug", distance_img, "path_distance_debug.png")
    print(" Saved to textures/path_distance_debug.png")

    print("Saving direction field debug image...")
    direction_img = np.zeros((resolution, resolution, 4), dtype=np.float32)
    cardinal_dirs = np.array([
        [0.0, 1.0],
        [0.707, 0.707],
        [1.0, 0.0],
        [0.707, -0.707],
        [0.0, -1.0],
        [-0.707, -0.707],
        [-1.0, 0.0],
        [-0.707, 0.707],
    ])
    cardinal_colors = np.array([
        [76, 120, 168],
        [242, 142, 43],
        [225, 87, 89],
        [118, 183, 178],
        [89, 161, 79],
        [237, 201, 72],
        [176, 122, 161],
        [255, 157, 167],
    ]) / 255.0
    # Colour each pixel by the cardinal direction best aligned with its flow.
    flow_flat = flow_field.reshape(-1, 2)
    closest_dir_indices = np.argmax(flow_flat @ cardinal_dirs.T, axis=1)
    direction_img[:, :, 0:3] = cardinal_colors[closest_dir_indices].reshape(resolution, resolution, 3)
    direction_img[:, :, 3] = 1.0

    print("Drawing compass in lower right corner...")
    compass_radius = resolution // 8
    compass_center_x = resolution - compass_radius - 20
    compass_center_y = compass_radius + 20
    pixel_rows, pixel_cols = np.mgrid[0:resolution, 0:resolution]
    dx = pixel_cols - compass_center_x
    dy = pixel_rows - compass_center_y
    inside_compass = np.sqrt(dx * dx + dy * dy) <= compass_radius
    angle = np.arctan2(dy, dx)
    # The extra pi/8 centres each 45-degree wedge on its cardinal direction,
    # matching the nearest-direction (argmax) classification used above.
    sector = (((angle + np.pi / 2 + np.pi / 8) % (2 * np.pi)) / (np.pi / 4)).astype(int) % 8
    direction_img[inside_compass, 0:3] = cardinal_colors[sector[inside_compass]]
    _save_debug_png("Direction_Debug", direction_img, "path_direction_debug.png")
    print(" Saved to textures/path_direction_debug.png (8-color cardinal directions)")

    print("\n=== DEBUG: Sample flow field values ===")
    print(f"Terrain bounds: X=[{min_x}, {max_x}], Y=[{min_y}, {max_y}]")
    dir_names = ["North", "NE", "East", "SE", "South", "SW", "West", "NW"]
    sample_pixels = [
        (resolution // 4, resolution // 4),
        (resolution // 2, resolution // 2),
        (3 * resolution // 4, 3 * resolution // 4),
    ]
    for py, px in sample_pixels:
        u = px / resolution
        v = py / resolution
        world_x = min_x + u * (max_x - min_x)
        world_y = min_y + v * (max_y - min_y)
        flow_x = flow_field[py, px, 0]
        flow_y = flow_field[py, px, 1]
        dots = [np.dot([flow_x, flow_y], cardinal_dirs[i]) for i in range(8)]
        best_dir = np.argmax(dots)
        print(f" Pixel [{px}, {py}] -> World [{world_x:.1f}, {world_y:.1f}]")
        print(f" Flow: [{flow_x:.3f}, {flow_y:.3f}] -> {dir_names[best_dir]}")
        print(f" Encoded: R={flow_x * 0.5 + 0.5:.3f}, G={flow_y * 0.5 + 0.5:.3f}")

    print("Encoding to RGBA texture...")
    print(f"Distance encoding: 0.0 = on path, 1.0 = {max_distance}+ units away")
    print(f"Arc length encoding: repeating pattern every {arc_length_repeat} world units")
    flowmap = np.zeros((resolution, resolution, 4), dtype=np.float32)
    # Directions are remapped from [-1, 1] to [0, 1] for storage.
    flowmap[:, :, 0] = flow_field[:, :, 0] * 0.5 + 0.5
    flowmap[:, :, 1] = flow_field[:, :, 1] * 0.5 + 0.5
    flowmap[:, :, 2] = np.clip(distance_field / max_distance, 0.0, 1.0)
    flowmap[:, :, 3] = np.fmod(arc_length_field, arc_length_repeat) / arc_length_repeat

    print("Creating Blender image...")
    output_img = bpy.data.images.new(
        name="Path_Flowmap",
        width=resolution,
        height=resolution,
        alpha=True,
        float_buffer=True
    )
    output_img.pixels[:] = flowmap.flatten()
    return output_img
def save_flowmap(output_path, resolution=1024, blur_iterations=3, terrain_name="TerrainPlane", path_collection="Paths"):
    """
    Main function to generate and save flowmap from paths in current Blender file.

    Looks up the terrain object and the path curves, generates the flowmap,
    saves it as OpenEXR to ``output_path`` and prints a summary of the
    channel encoding.

    Args:
        output_path: Path to save the flowmap (written in OpenEXR format)
        resolution: Output texture resolution
        blur_iterations: Smoothing iterations
        terrain_name: Name of terrain object. If no object has this name, the
            first mesh whose name contains 'terrain' or 'plane' is used.
        path_collection: Name of collection or search pattern for path curves

    Raises:
        ValueError: If no terrain mesh or no path curves can be found, or
            if the object named ``terrain_name`` is not a mesh.
    """
    terrain_obj = bpy.data.objects.get(terrain_name)
    if not terrain_obj:
        print(f"Object '{terrain_name}' not found. Searching for terrain-like objects...")
        for obj in bpy.data.objects:
            if obj.type == 'MESH':
                print(f" Found mesh: {obj.name}")
                if 'terrain' in obj.name.lower() or 'plane' in obj.name.lower():
                    terrain_obj = obj
                    print(f" -> Using: {obj.name}")
                    break
    if not terrain_obj:
        raise ValueError(
            f"Object '{terrain_name}' not found and no terrain mesh detected. "
            f"Available objects: {[obj.name for obj in bpy.data.objects if obj.type == 'MESH']}"
        )
    print(f"Using terrain object: {terrain_obj.name}")
    if terrain_obj.type != 'MESH':
        raise ValueError(f"Object '{terrain_obj.name}' is not a mesh")

    path_curves = get_path_curves(path_collection)
    if not path_curves:
        raise ValueError(
            f"No path curves found. Create curves and add them to a '{path_collection}' collection "
            f"or name them with 'path' in the name. Available curves: "
            f"{[obj.name for obj in bpy.data.objects if obj.type == 'CURVE']}"
        )
    print(f"Found {len(path_curves)} path curves:")
    for curve in path_curves:
        print(f" - {curve.name}")

    flowmap_img = generate_flowmap_from_paths(
        terrain_obj,
        path_curves,
        resolution=resolution,
        blur_iterations=blur_iterations
    )
    try:
        flowmap_img.filepath_raw = str(output_path)
        flowmap_img.file_format = 'OPEN_EXR'
        flowmap_img.save()
    finally:
        # Drop the temporary image datablock even when saving fails, so a
        # failed run does not leave "Path_Flowmap" behind in the .blend data.
        bpy.data.images.remove(flowmap_img)

    print(f"\n{'='*60}")
    print("Flowmap generation complete!")
    print(f"Output: {output_path}")
    print(f"Resolution: {resolution}×{resolution}")
    print(f"Paths used: {len(path_curves)}")
    print("Format: OpenEXR (32-bit float)")
    print(f"{'='*60}")
    print("\nChannel encoding:")
    print(" R channel: X direction toward nearest path (0.5=none, <0.5=left, >0.5=right)")
    print(" G channel: Y direction toward nearest path (0.5=none, <0.5=down, >0.5=up)")
    print(" B channel: Distance to nearest path (0.0=on path, 1.0=far away)")
    print(" A channel: Arc length along path (0.0-1.0, repeating every 100 units)")
    print("\nTo use in shader:")
    print(" vec4 flowmap_sample = texture(flowmap, uv);")
    print(" vec2 direction = flowmap_sample.rg * 2.0 - 1.0; // Direction to path")
    print(" float distance = flowmap_sample.b; // Distance to path")
    print(" float arc_length = flowmap_sample.a * 100.0; // Arc length in world units")
if __name__ == "__main__":
    # Script entry point: write the flowmap to <project>/textures, where the
    # project root is two directory levels above the open .blend file.
    blend_file = Path(bpy.data.filepath)
    project_root = blend_file.parent.parent
    textures_dir = project_root / "textures"
    output_path = textures_dir / "terrain_flowmap.exr"

    # Make sure the target directory exists before anything is saved into it.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    flowmap_settings = {
        "output_path": output_path,
        "resolution": 1024,
        "blur_iterations": 5,
        "terrain_name": "TerrainPlane",
        "path_collection": "Paths",
    }
    save_flowmap(**flowmap_settings)