"""Generate a path flowmap texture from curves in the current Blender file.

For every texel of a square texture stretched over the terrain's world-space
XY bounding box, the script finds the nearest sampled path segment and stores:

    R, G: unit direction toward that segment (encoded as value * 0.5 + 0.5)
    B:    distance to the segment, clamped/normalised by MAX_ENCODED_DISTANCE
    A:    arc length along the curve, wrapped every ARC_LENGTH_REPEAT units

Three PNG debug images (segment ids, distance, direction) are written next to
the main output as a side effect of generation.
"""

from multiprocessing import Pool, cpu_count
from pathlib import Path

import bmesh
import bpy
import numpy as np
from mathutils import Vector  # noqa: F401  (kept: referenced by docstrings)

# Distance (world units) that maps to 1.0 in the flowmap's B channel and at
# which the blurred flow fully replaces the un-blurred flow.
MAX_ENCODED_DISTANCE = 60.0
# Distance (world units) that maps to white in the distance debug image.
DEBUG_DISTANCE_RANGE = 50.0
# Arc length (world units) after which the A channel wraps back to 0.
ARC_LENGTH_REPEAT = 100.0
# Curve resolution used while sampling path curves.
CURVE_SAMPLES_PER_SEGMENT = 50

# Eight compass directions in flow space and the colour used for each in the
# direction debug image. Index i of every table refers to the same direction.
_CARDINAL_DIRS = np.array([
    [0.0, 1.0],
    [0.707, 0.707],
    [1.0, 0.0],
    [0.707, -0.707],
    [0.0, -1.0],
    [-0.707, -0.707],
    [-1.0, 0.0],
    [-0.707, 0.707],
])
_CARDINAL_COLORS = np.array([
    [76, 120, 168],
    [242, 142, 43],
    [225, 87, 89],
    [118, 183, 178],
    [89, 161, 79],
    [237, 201, 72],
    [176, 122, 161],
    [255, 157, 167],
]) / 255.0
_CARDINAL_NAMES = ["North", "NE", "East", "SE", "South", "SW", "West", "NW"]

# (dy, dx) offsets of a 3x3 neighbourhood, in the order they are accumulated.
_BOX_OFFSETS = [(dy, dx) for dy in (-1, 0, 1) for dx in (-1, 0, 1)]


def simple_blur(data, iterations=1):
    """
    Simple 3x3 box blur for 2D vector fields.

    Border texels average only the neighbours that lie inside the array, so
    no padding value bleeds into the result.

    Args:
        data: Array of shape (height, width, channels)
        iterations: Number of blur passes to apply

    Returns:
        New array with the same shape and dtype as ``data``
    """
    result = data.copy()
    h, w, c = data.shape
    if h == 0 or w == 0:
        return result

    # Number of in-bounds neighbours per texel (9 inside, 6 on edges, 4 in
    # corners). It does not depend on the data, so compute it once.
    padded_ones = np.pad(np.ones((h, w)), 1, mode="constant")
    counts = np.zeros((h, w))
    for dy, dx in _BOX_OFFSETS:
        counts += padded_ones[1 + dy:1 + dy + h, 1 + dx:1 + dx + w]

    for _ in range(iterations):
        # Accumulate in float64: zero padding contributes exactly 0.0 for
        # out-of-bounds neighbours, so only valid texels affect the sum.
        padded = np.pad(
            result.astype(np.float64), ((1, 1), (1, 1), (0, 0)), mode="constant"
        )
        total = np.zeros((h, w, c), dtype=np.float64)
        for dy, dx in _BOX_OFFSETS:
            total += padded[1 + dy:1 + dy + h, 1 + dx:1 + dx + w]
        result = (total / counts[:, :, np.newaxis]).astype(data.dtype)

    return result


def sample_curve_points(curve_obj, samples_per_segment=50):
    """
    Sample points along a Blender curve object (Bezier, NURBS, etc).

    The curve's ``resolution_u`` is temporarily set to ``samples_per_segment``
    while the curve is evaluated and is restored afterwards, so sampling does
    not permanently modify the scene.

    Args:
        curve_obj: Blender curve object
        samples_per_segment: Number of samples per curve segment

    Returns:
        List of Vector world-space positions
    """
    curve_data = curve_obj.data
    previous_resolution = curve_data.resolution_u
    curve_data.resolution_u = samples_per_segment
    try:
        depsgraph = bpy.context.evaluated_depsgraph_get()
        curve_eval = curve_obj.evaluated_get(depsgraph)
        mesh_from_curve = curve_eval.to_mesh()
        try:
            # NOTE(review): to_mesh() presumably yields None for a curve with
            # no geometry -- treated as "no points" here; confirm.
            if mesh_from_curve is None:
                return []
            world_matrix = curve_obj.matrix_world
            # The matrix product creates new Vectors, so the result stays
            # valid after the temporary mesh is cleared.
            return [world_matrix @ v.co for v in mesh_from_curve.vertices]
        finally:
            curve_eval.to_mesh_clear()
    finally:
        curve_data.resolution_u = previous_resolution


def compute_arc_lengths(points):
    """
    Compute cumulative arc length at each point along a curve.

    Args:
        points: List of Vector positions

    Returns:
        List of floats representing cumulative distance from curve start
    """
    if len(points) == 0:
        return []

    arc_lengths = [0.0]
    cumulative = 0.0
    for previous, current in zip(points, points[1:]):
        cumulative += (current - previous).length
        arc_lengths.append(cumulative)
    return arc_lengths


def get_path_curves(collection_name="Paths"):
    """
    Get all curve objects from a collection, or all curves with 'path' in the name.

    If the collection exists, its curves are returned (possibly an empty
    list). Otherwise the whole file is searched for curves whose name contains
    'path' (case-insensitive), and failing that, for any curve object.

    Args:
        collection_name: Name of collection containing path curves

    Returns:
        List of curve objects
    """
    collection = bpy.data.collections.get(collection_name)
    if collection:
        curves = [obj for obj in collection.objects if obj.type == 'CURVE']
        print(f"Found {len(curves)} curves in collection '{collection_name}'")
        return curves

    print(f"Collection '{collection_name}' not found. Searching for curves with 'path' in name...")
    curves = [
        obj for obj in bpy.data.objects
        if obj.type == 'CURVE' and 'path' in obj.name.lower()
    ]
    if not curves:
        print("No path curves found. Looking for any curve objects...")
        curves = [obj for obj in bpy.data.objects if obj.type == 'CURVE']
    return curves


def closest_point_on_segment_2d(point, seg_start, seg_end):
    """
    Find closest point on a 2D line segment to a given point.

    Args:
        point: Vector2 (x, y)
        seg_start: Vector2 segment start
        seg_end: Vector2 segment end

    Returns:
        Vector2 closest point on segment
    """
    segment = seg_end - seg_start
    point_vec = point - seg_start
    segment_len_sq = segment.length_squared

    # A (near) zero-length segment has no direction to project onto.
    if segment_len_sq < 1e-8:
        return seg_start

    t = max(0.0, min(1.0, point_vec.dot(segment) / segment_len_sq))
    return seg_start + segment * t


def process_row_chunk(args):
    """
    Process a chunk of rows for parallel computation.

    For every texel in rows [start_row, end_row) the nearest path segment is
    found; ties are resolved in favour of the segment with the lowest index.

    Args:
        args: Tuple of (start_row, end_row, width, height_total, min_x, max_x,
            min_y, max_y, path_segments_simple), where path_segments_simple is
            a list of (seg_start, seg_end, arc_start, arc_end) with 2D numpy
            endpoints.

    Returns:
        Tuple of (start_row, flow_field_chunk, distance_field_chunk,
        arc_length_field_chunk, segment_id_chunk)
    """
    (start_row, end_row, width, height_total,
     min_x, max_x, min_y, max_y, path_segments_simple) = args

    height = end_row - start_row
    flow_field_chunk = np.zeros((height, width, 2), dtype=np.float32)
    distance_field_chunk = np.zeros((height, width), dtype=np.float32)
    arc_length_field_chunk = np.zeros((height, width), dtype=np.float32)
    segment_id_chunk = np.zeros((height, width), dtype=np.int32)

    # Without segments there is nothing to measure against; leave zeros.
    if len(path_segments_simple) == 0:
        return (start_row, flow_field_chunk, distance_field_chunk,
                arc_length_field_chunk, segment_id_chunk)

    # Per-segment quantities do not depend on the texel: compute them once.
    seg_starts = np.array(
        [seg[0] for seg in path_segments_simple], dtype=np.float64
    ).reshape(-1, 2)
    seg_ends = np.array(
        [seg[1] for seg in path_segments_simple], dtype=np.float64
    ).reshape(-1, 2)
    arc_starts = np.array([seg[2] for seg in path_segments_simple], dtype=np.float64)
    arc_ends = np.array([seg[3] for seg in path_segments_simple], dtype=np.float64)

    seg_vecs = seg_ends - seg_starts
    seg_len_sq = (seg_vecs * seg_vecs).sum(axis=1)
    degenerate = seg_len_sq < 1e-8
    # Avoid dividing by ~0; t is forced to 0 for these segments below.
    safe_len_sq = np.where(degenerate, 1.0, seg_len_sq)
    arc_spans = arc_ends - arc_starts

    for local_y in range(height):
        y = start_row + local_y
        world_y = min_y + (y / height_total) * (max_y - min_y)
        for x in range(width):
            world_x = min_x + (x / width) * (max_x - min_x)
            world_pos_2d = np.array([world_x, world_y])

            # Project the texel onto every segment at once, clamped to the
            # segment's extent.
            rel = world_pos_2d - seg_starts
            t = np.clip((rel * seg_vecs).sum(axis=1) / safe_len_sq, 0.0, 1.0)
            t[degenerate] = 0.0
            closest = seg_starts + seg_vecs * t[:, np.newaxis]

            offsets = closest - world_pos_2d
            dists = np.sqrt((offsets * offsets).sum(axis=1))
            best = int(np.argmin(dists))  # first minimum wins

            direction = offsets[best]
            flow_field_chunk[local_y, x, 0] = direction[0]
            # NOTE(review): Y is negated on purpose in the stored flow --
            # presumably to match the consumer's texture-space convention;
            # confirm against the shader.
            flow_field_chunk[local_y, x, 1] = -direction[1]
            distance_field_chunk[local_y, x] = dists[best]
            arc_length_field_chunk[local_y, x] = (
                arc_starts[best] + t[best] * arc_spans[best]
            )
            segment_id_chunk[local_y, x] = best

    return (start_row, flow_field_chunk, distance_field_chunk,
            arc_length_field_chunk, segment_id_chunk)


def _terrain_bounds_xy(terrain_obj):
    """Return (min_x, max_x, min_y, max_y) of the terrain's world-space vertices."""
    bm = bmesh.new()
    try:
        bm.from_mesh(terrain_obj.data)
        world_matrix = terrain_obj.matrix_world
        verts_world = [world_matrix @ v.co for v in bm.verts]
    finally:
        # Free the BMesh even if reading the mesh fails.
        bm.free()

    if len(verts_world) == 0:
        raise ValueError("Terrain mesh has no vertices")

    xs = [v.x for v in verts_world]
    ys = [v.y for v in verts_world]
    return min(xs), max(xs), min(ys), max(ys)


def _normalize_flow_in_place(flow):
    """Scale every 2D vector of ``flow`` to unit length (zero vectors stay zero)."""
    magnitudes = np.sqrt(flow[:, :, 0] ** 2 + flow[:, :, 1] ** 2)
    magnitudes = np.maximum(magnitudes, 1e-8)  # guard against division by zero
    flow[:, :, 0] /= magnitudes
    flow[:, :, 1] /= magnitudes


def _save_debug_image(image_name, pixels, filename):
    """Write an RGBA float array as a PNG into the project's textures folder."""
    height, width = pixels.shape[:2]
    target = Path(bpy.data.filepath).parent.parent / "textures" / filename
    target.parent.mkdir(parents=True, exist_ok=True)

    image = bpy.data.images.new(
        image_name,
        width=width,
        height=height,
        alpha=True,
        float_buffer=True
    )
    try:
        image.pixels[:] = pixels.flatten()
        image.filepath_raw = str(target)
        image.file_format = 'PNG'
        image.save()
    finally:
        # The datablock is only a vehicle for saving; never leave it behind.
        bpy.data.images.remove(image)


def _build_segment_debug_image(segment_id_field, num_segments):
    """Colour every texel by the id of its nearest segment (fixed random palette)."""
    segment_colors = np.random.RandomState(42).rand(num_segments, 3).astype(np.float32)
    h, w = segment_id_field.shape
    segment_img = np.zeros((h, w, 4), dtype=np.float32)
    valid = (segment_id_field >= 0) & (segment_id_field < num_segments)
    segment_img[valid, 0:3] = segment_colors[segment_id_field[valid]]
    segment_img[:, :, 3] = 1.0
    return segment_img


def _build_direction_debug_image(flow_field, resolution):
    """Colour every texel by the cardinal direction closest to its flow vector."""
    direction_img = np.zeros((resolution, resolution, 4), dtype=np.float32)
    dots = flow_field.reshape(-1, 2) @ _CARDINAL_DIRS.T
    closest_dir_indices = np.argmax(dots, axis=1)
    direction_img[:, :, 0:3] = (
        _CARDINAL_COLORS[closest_dir_indices].reshape(resolution, resolution, 3)
    )
    direction_img[:, :, 3] = 1.0
    return direction_img


def _draw_compass(direction_img):
    """Paint an 8-sector colour legend disc into ``direction_img`` in place."""
    resolution = direction_img.shape[0]
    compass_radius = resolution // 8
    compass_center_x = resolution - compass_radius - 20
    compass_center_y = compass_radius + 20

    ys, xs = np.ogrid[0:resolution, 0:resolution]
    dx = xs - compass_center_x
    dy = ys - compass_center_y
    inside = np.sqrt(dx * dx + dy * dy) <= compass_radius

    angle = np.arctan2(dy, dx)  # broadcasts to (resolution, resolution)
    sector = (
        ((angle + np.pi / 2) % (2 * np.pi)) / (np.pi / 4)
    ).astype(np.int64) % 8
    direction_img[inside, 0:3] = _CARDINAL_COLORS[sector[inside]]


def generate_flowmap_from_paths(terrain_obj, path_curves, resolution=1024, blur_iterations=2):
    """
    Generate a flowmap texture showing direction toward nearest path.

    Besides building the flowmap image, this writes three debug PNGs
    (path_segment_debug.png, path_distance_debug.png,
    path_direction_debug.png) into the project's textures folder.

    Args:
        terrain_obj: Blender mesh object (the terrain plane)
        path_curves: List of Blender curve objects representing paths
        resolution: Output texture resolution (square)
        blur_iterations: Number of smoothing passes for flow field
            (0 disables smoothing)

    Returns:
        Blender image containing encoded flowmap

    Raises:
        ValueError: If the terrain has no vertices, no path points could be
            sampled, or the sampled points do not form any line segment.
    """
    print(f"Generating flowmap from {len(path_curves)} path curves")
    print(f"Output resolution: {resolution}×{resolution}")

    min_x, max_x, min_y, max_y = _terrain_bounds_xy(terrain_obj)
    print(f"Terrain bounds: X=[{min_x:.2f}, {max_x:.2f}], Y=[{min_y:.2f}, {max_y:.2f}]")

    # Sample every curve exactly once and reuse the points for the segments.
    print("Sampling path curves...")
    sampled_curves = []
    total_points = 0
    for curve_obj in path_curves:
        points = sample_curve_points(
            curve_obj, samples_per_segment=CURVE_SAMPLES_PER_SEGMENT
        )
        sampled_curves.append(points)
        total_points += len(points)
        print(f"  {curve_obj.name}: {len(points)} points")

    print(f"Total path points: {total_points}")
    if total_points == 0:
        raise ValueError("No path points sampled. Check that path curves exist and have geometry.")

    print("Building line segments from path points with arc lengths...")
    # Each entry: (start_xy, end_xy, arc_length_at_start, arc_length_at_end).
    # Arc length restarts at 0 for every curve object.
    # NOTE(review): consecutive sampled vertices are always joined, so a curve
    # object holding several splines may get a segment bridging two splines --
    # confirm whether such curves are used.
    path_segments_simple = []
    for points in sampled_curves:
        arc_lengths = compute_arc_lengths(points)
        for seg_start, seg_end, arc_start, arc_end in zip(
                points, points[1:], arc_lengths, arc_lengths[1:]):
            path_segments_simple.append((
                np.array([seg_start.x, seg_start.y]),
                np.array([seg_end.x, seg_end.y]),
                arc_start,
                arc_end,
            ))
    num_segments = len(path_segments_simple)
    print(f"Created {num_segments} line segments with arc length data")
    if num_segments == 0:
        raise ValueError(
            "No line segments could be built from the sampled path points. "
            "Each path curve needs at least two sampled points."
        )

    print("Generating flow field toward nearest path...")
    h, w = resolution, resolution
    flow_field = np.zeros((h, w, 2), dtype=np.float32)
    distance_field = np.zeros((h, w), dtype=np.float32)
    arc_length_field = np.zeros((h, w), dtype=np.float32)
    segment_id_field = np.zeros((h, w), dtype=np.int32)

    num_cores = cpu_count()
    chunk_size = max(1, h // num_cores)
    print(f"Using {num_cores} CPU cores with chunk size {chunk_size} rows")

    chunks = [
        (start_row, min(start_row + chunk_size, h), w, h,
         min_x, max_x, min_y, max_y, path_segments_simple)
        for start_row in range(0, h, chunk_size)
    ]

    with Pool(processes=num_cores) as pool:
        total_chunks = len(chunks)
        print(f"Processing {total_chunks} chunks...")
        results = []
        for i, result in enumerate(pool.imap(process_row_chunk, chunks)):
            results.append(result)
            progress = (i + 1) / total_chunks * 100
            print(f"  Progress: {i+1}/{total_chunks} chunks ({progress:.1f}%)")

    print("Assembling results from parallel workers...")
    for start_row, flow_chunk, distance_chunk, arc_length_chunk, segment_id_chunk in results:
        end_row = start_row + flow_chunk.shape[0]
        flow_field[start_row:end_row, :, :] = flow_chunk
        distance_field[start_row:end_row, :] = distance_chunk
        arc_length_field[start_row:end_row, :] = arc_length_chunk
        segment_id_field[start_row:end_row, :] = segment_id_chunk

    print(f"Distance range: {distance_field.min():.2f} to {distance_field.max():.2f}")
    print(f"Arc length range: {arc_length_field.min():.2f} to {arc_length_field.max():.2f}")

    print("Normalizing flow vectors...")
    _normalize_flow_in_place(flow_field)

    if blur_iterations > 0:
        print(f"Applying distance-based blur (max {blur_iterations} iterations)...")
        print("  Creating maximally blurred version...")
        blurred_flow = simple_blur(flow_field, iterations=blur_iterations)
        _normalize_flow_in_place(blurred_flow)

        # Keep the sharp flow near the path and fade to the blurred flow as
        # the distance approaches MAX_ENCODED_DISTANCE.
        print("  Blending based on distance to path...")
        blend_factor = np.clip(
            distance_field / MAX_ENCODED_DISTANCE, 0.0, 1.0
        )[:, :, np.newaxis]
        flow_field = flow_field * (1.0 - blend_factor) + blurred_flow * blend_factor
        _normalize_flow_in_place(flow_field)
        print("  Distance-based blur complete!")

    print("Saving segment ID debug image...")
    segment_img = _build_segment_debug_image(segment_id_field, num_segments)
    _save_debug_image("Segment_Debug", segment_img, "path_segment_debug.png")
    print(f"  Saved to textures/path_segment_debug.png ({num_segments} segments)")

    print("Saving distance field debug image...")
    distance_img = np.zeros((resolution, resolution, 4), dtype=np.float32)
    distance_img[:, :, 0:3] = np.clip(
        distance_field / DEBUG_DISTANCE_RANGE, 0.0, 1.0
    )[:, :, np.newaxis]
    distance_img[:, :, 3] = 1.0
    _save_debug_image("Distance_Debug", distance_img, "path_distance_debug.png")
    print("  Saved to textures/path_distance_debug.png")

    print("Saving direction field debug image...")
    direction_img = _build_direction_debug_image(flow_field, resolution)
    print("Drawing compass in lower right corner...")
    _draw_compass(direction_img)
    _save_debug_image("Direction_Debug", direction_img, "path_direction_debug.png")
    print("  Saved to textures/path_direction_debug.png (8-color cardinal directions)")

    print("\n=== DEBUG: Sample flow field values ===")
    print(f"Terrain bounds: X=[{min_x}, {max_x}], Y=[{min_y}, {max_y}]")
    sample_pixels = [
        (resolution // 4, resolution // 4),
        (resolution // 2, resolution // 2),
        (3 * resolution // 4, 3 * resolution // 4),
    ]
    for py, px in sample_pixels:
        u = px / resolution
        v = py / resolution
        world_x = min_x + u * (max_x - min_x)
        world_y = min_y + v * (max_y - min_y)
        flow_x = flow_field[py, px, 0]
        flow_y = flow_field[py, px, 1]
        dots = [np.dot([flow_x, flow_y], cardinal) for cardinal in _CARDINAL_DIRS]
        best_dir = np.argmax(dots)
        print(f"  Pixel [{px}, {py}] -> World [{world_x:.1f}, {world_y:.1f}]")
        print(f"    Flow: [{flow_x:.3f}, {flow_y:.3f}] -> {_CARDINAL_NAMES[best_dir]}")
        print(f"    Encoded: R={flow_x * 0.5 + 0.5:.3f}, G={flow_y * 0.5 + 0.5:.3f}")

    print("Encoding to RGBA texture...")
    print(f"Distance encoding: 0.0 = on path, 1.0 = {MAX_ENCODED_DISTANCE}+ units away")
    print(f"Arc length encoding: repeating pattern every {ARC_LENGTH_REPEAT} world units")

    flowmap = np.zeros((resolution, resolution, 4), dtype=np.float32)
    flowmap[:, :, 0] = flow_field[:, :, 0] * 0.5 + 0.5
    flowmap[:, :, 1] = flow_field[:, :, 1] * 0.5 + 0.5
    flowmap[:, :, 2] = np.clip(distance_field / MAX_ENCODED_DISTANCE, 0.0, 1.0)
    flowmap[:, :, 3] = np.fmod(arc_length_field, ARC_LENGTH_REPEAT) / ARC_LENGTH_REPEAT

    print("Creating Blender image...")
    output_img = bpy.data.images.new(
        name="Path_Flowmap",
        width=resolution,
        height=resolution,
        alpha=True,
        float_buffer=True
    )
    output_img.pixels[:] = flowmap.flatten()
    return output_img


def save_flowmap(output_path, resolution=1024, blur_iterations=3,
                 terrain_name="TerrainPlane", path_collection="Paths"):
    """
    Main function to generate and save flowmap from paths in current Blender file.

    The flowmap is written as an OpenEXR file; the intermediate Blender image
    is removed again afterwards.

    Args:
        output_path: Path to save the flowmap (written in OpenEXR format)
        resolution: Output texture resolution
        blur_iterations: Smoothing iterations
        terrain_name: Name of terrain object
        path_collection: Name of collection or search pattern for path curves

    Raises:
        ValueError: If no usable terrain mesh or no path curve is found.
    """
    terrain_obj = bpy.data.objects.get(terrain_name)
    if not terrain_obj:
        # Fall back to the first mesh whose name looks like a terrain.
        print(f"Object '{terrain_name}' not found. Searching for terrain-like objects...")
        for obj in bpy.data.objects:
            if obj.type == 'MESH':
                print(f"  Found mesh: {obj.name}")
                if 'terrain' in obj.name.lower() or 'plane' in obj.name.lower():
                    terrain_obj = obj
                    print(f"  -> Using: {obj.name}")
                    break

    if not terrain_obj:
        raise ValueError(
            f"Object '{terrain_name}' not found and no terrain mesh detected. "
            f"Available objects: {[obj.name for obj in bpy.data.objects if obj.type == 'MESH']}"
        )

    print(f"Using terrain object: {terrain_obj.name}")
    if terrain_obj.type != 'MESH':
        raise ValueError(f"Object '{terrain_obj.name}' is not a mesh")

    path_curves = get_path_curves(path_collection)
    if not path_curves:
        raise ValueError(
            f"No path curves found. Create curves and add them to a '{path_collection}' collection "
            f"or name them with 'path' in the name. Available curves: "
            f"{[obj.name for obj in bpy.data.objects if obj.type == 'CURVE']}"
        )

    print(f"Found {len(path_curves)} path curves:")
    for curve in path_curves:
        print(f"  - {curve.name}")

    flowmap_img = generate_flowmap_from_paths(
        terrain_obj,
        path_curves,
        resolution=resolution,
        blur_iterations=blur_iterations
    )
    try:
        flowmap_img.filepath_raw = str(output_path)
        flowmap_img.file_format = 'OPEN_EXR'
        flowmap_img.save()
    finally:
        # Do not leave the temporary image in the .blend, even on failure.
        bpy.data.images.remove(flowmap_img)

    print(f"\n{'='*60}")
    print("Flowmap generation complete!")
    print(f"Output: {output_path}")
    print(f"Resolution: {resolution}×{resolution}")
    print(f"Paths used: {len(path_curves)}")
    print("Format: OpenEXR (32-bit float)")
    print(f"{'='*60}")
    print("\nChannel encoding:")
    print("  R channel: X direction toward nearest path (0.5=none, <0.5=left, >0.5=right)")
    print("  G channel: Y direction toward nearest path (0.5=none, <0.5=down, >0.5=up)")
    print("  B channel: Distance to nearest path (0.0=on path, 1.0=far away)")
    print("  A channel: Arc length along path (0.0-1.0, repeating every 100 units)")
    print("\nTo use in shader:")
    print("  vec4 flowmap_sample = texture(flowmap, uv);")
    print("  vec2 direction = flowmap_sample.rg * 2.0 - 1.0;  // Direction to path")
    print("  float distance = flowmap_sample.b;  // Distance to path")
    print("  float arc_length = flowmap_sample.a * 100.0;  // Arc length in world units")


def main():
    """Generate the terrain flowmap into <project>/textures/terrain_flowmap.exr."""
    # The .blend is expected to live one directory below the project root.
    project_root = Path(bpy.data.filepath).parent.parent
    output_path = project_root / "textures" / "terrain_flowmap.exr"
    output_path.parent.mkdir(parents=True, exist_ok=True)

    save_flowmap(
        output_path=output_path,
        resolution=1024,
        blur_iterations=5,
        terrain_name="TerrainPlane",
        path_collection="Paths"
    )


if __name__ == "__main__":
    main()