Reputation: 1
I am creating a graph neural network model for predicting the flow of water on a terrain. To train the model, I am passing a Digital Elevation Model file as input and extracting a rectangle out of the entire file for training it. Based on the area extracted, I am generating momentum and velocity of water in X and Y directions depending on the physical features. After training, while checking the visualization, I am observing that the arrows representing the flows are being concatenated at the bottom of the output.
# Load the DEM mesh file
mesh_file_path = '/content/drive/MyDrive/meshOp.obj'
mesh = trimesh.load(mesh_file_path)
# Define the bounding box
def calculate_bounding_box(center, size):
half_size = size / 2
x_min = center[0] - half_size
x_max = center[0] + half_size
y_min = center[1] - half_size
y_max = center[1] + half_size
return [x_min, y_min, x_max, y_max]
# Example: Set center and size
center = (50, 50)
size = 200 # Size of the area in square units
# Calculate bounding box based on the given center and size
bounding_box = calculate_bounding_box(center, size)
# Process bounding box function with momentum calculation
def process_bounding_box(mesh, bounding_box):
bbox_mask = (
(mesh.vertices[:, 0] >= bounding_box[0]) &
(mesh.vertices[:, 0] <= bounding_box[2]) &
(mesh.vertices[:, 1] >= bounding_box[1]) &
(mesh.vertices[:, 1] <= bounding_box[3])
)
bbox_vertices = mesh.vertices[bbox_mask]
bbox_faces_remapped = []
vertex_mapping = {old_index: new_index for new_index, old_index in enumerate(np.where(bbox_mask)[0])}
for face in mesh.faces:
if all(bbox_mask[face]):
bbox_faces_remapped.append([vertex_mapping[vertex_index] for vertex_index in face])
bbox_faces = bbox_faces_remapped
elevation = bbox_vertices[:, 2]
slopes = np.zeros(len(bbox_vertices))
momentum = np.zeros((len(bbox_vertices), 2)) # Momentum in x and y directions
# Constants for water momentum calculation
water_density = 1000 # kg/m^3, typical density of water
gravity = 9.81 # m/s^2, gravitational acceleration
water_volume_per_vertex = 1.0 # m^3, assumed volume of water at each vertex
for i, vertex in enumerate(bbox_vertices):
adjacent_faces = [face for face in bbox_faces if i in face]
face_slopes = []
velocity_x, velocity_y = 0, 0
for face in adjacent_faces:
v0, v1, v2 = bbox_vertices[face]
dist1 = distance.euclidean(v0[:2], v1[:2])
dist2 = distance.euclidean(v1[:2], v2[:2])
dist3 = distance.euclidean(v2[:2], v0[:2])
height_diff1 = abs(v0[2] - v1[2])
height_diff2 = abs(v1[2] - v2[2])
height_diff3 = abs(v2[2] - v0[2])
# Calculate slopes
slope1 = height_diff1 / dist1 if dist1 != 0 else 0
slope2 = height_diff2 / dist2 if dist2 != 0 else 0
slope3 = height_diff3 / dist3 if dist3 != 0 else 0
face_slopes.append(np.mean([slope1, slope2, slope3]))
# Calculate potential energy converted to kinetic energy to estimate velocity
velocity_x += np.sign(v1[0] - v0[0]) * gravity * slope1
velocity_y += np.sign(v1[1] - v0[1]) * gravity * slope2
velocity_x += np.sign(v2[0] - v1[0]) * gravity * slope2
velocity_y += np.sign(v2[1] - v1[1]) * gravity * slope3
velocity_x += np.sign(v0[0] - v2[0]) * gravity * slope3
velocity_y += np.sign(v0[1] - v2[1]) * gravity * slope1
if face_slopes:
slopes[i] = np.mean(face_slopes)
# Calculate velocity magnitude
velocity_magnitude = np.sqrt(velocity_x**2 + velocity_y**2)
# Calculate momentum
mass = water_density * water_volume_per_vertex
momentum[i] = mass * np.array([velocity_x, velocity_y]) / max(len(adjacent_faces), 1)
print(f"Number of vertices in bounding box: {len(bbox_vertices)}")
tri = Delaunay(bbox_vertices[:, :2])
edges = []
for simplex in tri.simplices:
edges.extend([(simplex[i], simplex[j]) for i in range(len(simplex)) for j in range(i + 1, len(simplex))])
# Normalize features
node_features = np.column_stack((elevation, slopes, np.linalg.norm(momentum, axis=1)))
# Scaling: Standardization (zero mean, unit variance)
node_features_mean = node_features.mean(axis=0)
node_features_std = node_features.std(axis=0)
node_features = (node_features - node_features_mean) / node_features_std
# Normalize target values
momentum_mean = momentum.mean(axis=0)
momentum_std = momentum.std(axis=0)
momentum = (momentum - momentum_mean) / momentum_std
node_features = torch.tensor(node_features, dtype=torch.float)
edge_indices = torch.tensor(edges, dtype=torch.long).t().contiguous()
momentum_targets = torch.tensor(momentum, dtype=torch.float)
# Create a PyTorch Geometric data object
return Data(x=node_features, edge_index=edge_indices, y=momentum_targets)
# Process the bounding box to get training data
data = process_bounding_box(mesh, bounding_box)
# Create a DataLoader
data_loader = DataLoader([data], batch_size=1, shuffle=True)
# Define the GCN model for flow prediction
class FlowGCN(torch.nn.Module):
def __init__(self):
super(FlowGCN, self).__init__()
self.conv1 = GCNConv(3, 64)
self.conv2 = GCNConv(64, 64)
self.conv3 = GCNConv(64, 2) # Output is a vector (momentum in x and y)
def forward(self, x, edge_index):
x = F.relu(self.conv1(x, edge_index))
x = F.relu(self.conv2(x, edge_index))
x = self.conv3(x, edge_index)
return x
# Initialize the model, optimizer, and loss function
model = FlowGCN()
optimizer = optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.MSELoss()
# Training loop
def train(model, data_loader, optimizer, loss_fn, epochs=200):
model.train()
for epoch in range(epochs):
for batch in data_loader:
optimizer.zero_grad()
out = model(batch.x, batch.edge_index)
loss = loss_fn(out, batch.y)
loss.backward()
optimizer.step()
if epoch % 10 == 0:
print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
# Train the model
train(model, data_loader, optimizer, loss_fn)
def aggregate_predictions_to_faces(vertices, faces, predictions):
valid_faces = []
face_predictions = []
for i, face in enumerate(faces):
if all(idx < len(predictions) for idx in face):
# Aggregate vertex predictions for each face by averaging
face_prediction = np.mean(predictions[face], axis=0)
face_predictions.append(face_prediction)
valid_faces.append(i)
print(f"Total faces: {len(faces)}")
print(f"Valid faces: {len(valid_faces)}")
print(f"Predictions shape: {predictions.shape}")
return np.array(face_predictions), valid_faces
# Visualization function updated
def visualize_flow(mesh, faces, face_predictions, valid_faces, title='Predicted Water Flow'):
plt.figure(figsize=(10, 10))
# Plot the mesh
plt.triplot(mesh.vertices[:, 0], mesh.vertices[:, 1], faces, color='gray', alpha=0.5)
# Scale factor for arrows to make them more visible
scale_factor = 150.0
# Define a subset of faces for visualization
subset_indices = range(0, len(valid_faces), 10) # Adjust step size as needed
for i in subset_indices:
face_index = valid_faces[i]
face = faces[face_index]
centroid = mesh.vertices[face].mean(axis=0)
flow_vector = face_predictions[i]
# Normalize the flow vector
magnitude = np.linalg.norm(flow_vector)
if magnitude > 0:
flow_vector /= magnitude # Normalize
flow_vector *= scale_factor # Scale
# Quiver plot for better visualization
plt.quiver(
centroid[0], centroid[1],
flow_vector[0], flow_vector[1],
angles='xy', scale_units='xy', scale=1, color='blue'
)
plt.title(title)
plt.xlabel('X')
plt.ylabel('Y')
plt.grid(True)
plt.show()
# Test the model and visualize
model.eval()
with torch.no_grad():
for batch in data_loader:
predictions = model(batch.x, batch.edge_index).numpy()
# Aggregate predictions to faces
face_predictions, valid_faces = aggregate_predictions_to_faces(
mesh.vertices, mesh.faces, predictions
)
visualize_flow(mesh, mesh.faces, face_predictions, valid_faces)
[this is the current output of the visualization where the arrows are concatenated at the bottom](https://i.sstatic.net/pBRI0vdf.png)
Please do let me know if I am taking the input wrong along with any other mistakes I might have made. Thanks for any useful insights in advance. I assumed that the flows will be localized around a set of co-ordinates. Could someone please check through the code and tell me what am I doing wrong as I am fairly new to these concepts.
Upvotes: 0
Views: 17