Reputation: 1
I want to take pictures and put them in a folder after passing some filters to it. The problem I'm having is that whenever I take a second image or third etc, and try to save it to another directory, A very similar image to the one taken at the start is being saved to the new folder, and not an image taken just when I said so in the input. I believe my program is taking lots of pictures at the start somewhere I do not know and then using those. Maybe it is a memory problem, as I am using a Jetson Nano and taking too many pictures in RAM is not easy for it My code (Do not mind for the AI PARTS)
import cv2
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, concatenate, Input
from sklearn.model_selection import train_test_split
import numpy as np
from tensorflow.keras.utils import to_categorical
import os
from time import sleep
path = os.getcwd()
forward = "fd"
right = "rt"
left = "lt"
right_rotate = "rtr"
left_rotate = "ltr"
forward_folder = f"{os.path.join(path, forward)}"
right_folder = f"{os.path.join(path, right)}"
left_folder = f"{os.path.join(path, left)}"
rotate_right_folder = f"{os.path.join(path, right_rotate)}"
rotate_left_folder = f"{os.path.join(path, left_rotate)}"
fol2abbrev = {forward_folder : forward, right_folder : right, left_folder : left, rotate_right_folder : right_rotate, rotate_left_folder : left_rotate}
folders = [forward_folder, right_folder, left_folder, rotate_right_folder, rotate_left_folder]
abbrev = [forward, right, left, right_rotate, left_rotate]
recording_number = 1
for element in abbrev:
try:
os.mkdir(element)
except Exception as e:
#print(e)
pass
cam = cv2.VideoCapture(0)
#Instantiate cv2 cam (TAKES LESS NOW)
width = int(cam.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cam.get(cv2.CAP_PROP_FRAME_HEIGHT))
Images = np.empty((0, height, width))#set the image array to an empty one, with the shape of height width (To make the image shape)
labels = np.empty(0)
Lasers = np.empty((0, 5)) #Put 8 if 8 lasers in the end
def preprocess_image(image):
# Grayscale
image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Blur the image
blurred_image = cv2.GaussianBlur(image, (5, 5), 4)
# Sharpen the image
kernel = np.array([[-1, -1, -1],
[-1, 9, -1],
[-1, -1, -1]])
sharpened_image = cv2.filter2D(blurred_image, -1, kernel)
# Enhance contrast (optional)
clahe = cv2.createCLAHE(clipLimit=16.0, tileGridSize=(16, 16))
contrast_enhanced = clahe.apply(sharpened_image)
return contrast_enhanced
def image_sort(path):
images = []
for element in os.listdir(path):
if ".jpg" in element:
images.append(element)
return images
def text_sort(path):
texts = []
for element in os.listdir(path):
if ".txt" in element:
texts.append(element)
return texts
def folder_value(folder):
if folder == forward_folder:
return 0
elif folder == right_folder:
return 1
elif folder == left_folder:
return 2
elif folder == rotate_right_folder:
return 3
elif folder == rotate_left_folder:
return 4
else:
raise "Problem with folder, the value given is not equal to a valid folder adress stated above!"
def rec_image(folder, times=1):
ret, image = cam.read() #THE CAMERA SHOULD BE CONNECTED DIRECTLY TO JETSON NANO
image = preprocess_image(image)
print(f"IMAGE is being saved to {folder}")
cv2.imwrite(f"{folder}/{len(image_sort(folder))}.jpg",image)
def rec_lasers(folder, times=1):
data = [1,2,3,4,5] #GET DATA FROM LASERS WITH JETSON NANO
file = open(f"{folder}/{len(text_sort(folder))}.txt", "w")
file.write(str(data))
file.close()
def read_data(folder):
global Images, labels, Lasers
for i in range(len(image_sort(folder))):
if i < len(image_sort(folder))-1:
image = cv2.imread(f"{folder}{i+1}.jpg", cv2.IMREAD_GRAYSCALE)
Images = np.append(Images, [image], axis=0)
labels = np.append(labels, folder_value(folder))
for i in range(len(text_sort(folder))):
if i < len(text_sort(folder))-1:
data = open(folder+text_sort(folder)[i],"r")
data = data.read().strip('][').split(', ')
Lasers = np.append(Lasers, [data], axis=0) # Load the LASER DATA
def process_predictions(tensor): #Has to be a numpy tensor, because tensorflow const cannot append
final_tensor = np.array([])
for prediction in tensor:
i = 1
for choice in prediction:
#print(choice)
if round(float(choice)) == 1:
final_tensor = np.concatenate((final_tensor, [i]))
#print("IT SHOULD CONCAT 1!")
else:
i+=1
return tf.constant(np.round(final_tensor))
def process_predictions_combined(predictions_image, predictions_laser):
max_indices_image = np.argmax(predictions_image, axis=1)
max_indices_laser = np.argmax(predictions_laser, axis=1)
# Combine max indices for image and laser predictions
combined_indices = np.column_stack((max_indices_image, max_indices_laser))
return tf.constant(np.round(combined_indices))
def train():
global Images, labels, Lasers
for folder in folders:
read_data(folder)
Lasers = Lasers.reshape((Lasers.shape[0], Lasers.shape[1], 1))
print(Images.shape)
print(labels.shape)
print(Lasers.shape)
# Split the data
train_images, test_images, train_labels, test_labels = train_test_split(Images, labels, test_size=0.1, random_state=21, stratify=labels)
train_lasers, test_lasers, _, _ = train_test_split(Lasers, labels, test_size=0.1, random_state=21, stratify=labels)
#print(f"LASERS : {train_lasers},{train_lasers.shape}")
#Preprocess the data
train_images = tf.expand_dims(train_images, axis=-1)
test_images = tf.expand_dims(test_images, axis=-1)
train_images = tf.cast(train_images, dtype=tf.int32)
test_images = tf.cast(test_images, dtype=tf.int32)
"""
train_lasers = tf.expand_dims(train_lasers, axis=-1)
train_lasers = tf.expand_dims(train_lasers, axis=-1)"""
train_lasers = tf.strings.to_number(train_lasers, out_type=tf.float64)
test_lasers = tf.strings.to_number(test_lasers, out_type=tf.float64)
train_labels = train_labels.astype(int)
test_labels = test_labels.astype(int)
laser_count = len(Lasers[0])
print(f"LASERS : {train_lasers},{train_lasers.shape}")
# CONV2D
model1 = tf.keras.models.Sequential([
tf.keras.layers.Input(shape=(height, width, 1), name="Image_Input"),
tf.keras.layers.Conv2D(16, (3, 3), activation=tf.keras.activations.relu, name="Image_1_Conv2D"),
tf.keras.layers.MaxPooling2D((2, 2), name="Image_2_Pooling2D"),
tf.keras.layers.Conv2D(32, (3, 3), activation=tf.keras.activations.relu, name="Image_3_Conv2D"),
tf.keras.layers.MaxPooling2D((2, 2), name="Image_4_Pooling2D"),
tf.keras.layers.Conv2D(64, (3, 3), activation=tf.keras.activations.relu, name="Image_5_Conv2D"),
tf.keras.layers.MaxPooling2D((2, 2),name="Image_6_Pooling2D"),
tf.keras.layers.Flatten(name="Image_7_Flatten"),
tf.keras.layers.Dropout(0.5,name="Image_8_Dropout"),
tf.keras.layers.Dense(5, activation=tf.keras.activations.softmax)
])
#CONV1D
model2 = tf.keras.models.Sequential([
tf.keras.layers.Input(shape=(laser_count, 1), name="Laser_Input"),
tf.keras.layers.Conv1D(32, kernel_size=3, activation=tf.keras.activations.relu, name="Laser_Conv1D"),
tf.keras.layers.Flatten(name="Laser_flatten"),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(5, activation=tf.keras.activations.softmax)
])
#FC MODEL
model3 = tf.keras.models.Sequential([
tf.keras.layers.Input(shape=2, name="Combined_Input"),
tf.keras.layers.Dense(32, activation=tf.keras.activations.relu, name="Combined_1_Dense"),
tf.keras.layers.Dense(64, activation=tf.keras.activations.relu, name="Combined_2_Dense"),
tf.keras.layers.Dropout(0.3, name="Combined_3_Dropout"),
tf.keras.layers.Dense(5, activation=tf.keras.activations.softmax, name="Combined_OUTPUT")
])
#Compile and train the models
model1.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
loss=tf.keras.losses.sparse_categorical_crossentropy, #Because it is category, mse is for numbers
metrics=["accuracy"])
model2.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.003),
loss=tf.keras.losses.sparse_categorical_crossentropy, #Because it is category, mse is for numbers
metrics=["accuracy"])
model3.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.005),
loss=tf.keras.losses.sparse_categorical_crossentropy, #Because it is category, mse is for numbers
metrics=["accuracy"])
#print(train_labels)
model1.fit(train_images, train_labels, epochs=10, validation_data=(test_images, test_labels))
"""print("Train lasers shape:", train_lasers.shape)
print("Test lasers shape:", test_lasers.shape)
"""
model2.fit(train_lasers, train_labels, epochs=10, validation_data=(test_lasers, test_labels))
predictions_model1 = model1.predict(train_images)
predictions_model2 = model2.predict(train_lasers)
# Concatenate predictions along the axis corresponding to models (axis=1)
"""print(len(train_images)+len(train_lasers))
print("LENGTH of predictions",len(predictions))
print("LENGTH of predictions[0]", len(predictions[0]))
print(np.array(predictions).shape)
print(predictions)"""
predictions = process_predictions_combined(predictions_model1, predictions_model2)
print(predictions)
print(len(predictions))
#predictions = process_predictions(predictions)
#predictions = predictions #- 1
print(f"Predictions : {predictions.shape}")
print(f"Labels : {train_labels.shape}")
model3.fit(predictions-1, train_labels, epochs=25)
model1.save("models/2DCNN")
model2.save("models/1DCNN")
model3.save("models/FCNN")
return model1, model2, model3
def predict(images, lasers):
sleep(1)
predictions_model1 = model1.predict(images)
predictions_model2 = model2.predict(lasers)
combined = process_predictions_combined(predictions_model1, predictions_model2)
combined = combined - 1
print(combined, combined.shape)
pred3 = model3.predict(combined)
print(pred3)
print(pred3.argmax())
return pred3.argmax()
if __name__ == "__main__":
if cam.isOpened():
while True:
main = input("1(fd), 2(rt), 3(lt), 4(rtr), 5(ltr), 6(Train), 7(predict)")
#Instead of input, get values from recv
#Channel 1 to fwd
#Channel 2 to rt and lt (depends on the number)
#Channel 3 constant (set drone to hover / constant throttle)
#Channel 4 to rtr and ltr (depends on number)
if main == "1":
rec_image(forward_folder, recording_number)
rec_lasers(forward_folder, recording_number)
elif main == "2":
rec_image(right_folder, recording_number)
rec_lasers(right_folder, recording_number)
elif main == "3":
rec_image(left_folder, recording_number)
rec_lasers(left_folder, recording_number)
elif main == "4":
rec_image(rotate_right_folder, recording_number)
rec_lasers(rotate_right_folder, recording_number)
elif main == "5":
rec_image(rotate_left_folder, recording_number)
rec_lasers(rotate_left_folder, recording_number)
elif main == "6":
model1, model2, model3 = train()
elif main == "7":
predict()
In the name == "main" you can see I only call the function rec_image(folder) once per option, but it somehow records more than once, maybe it is in the rec_image() function, in cam.read(), or maybe in preprocess_image(). I do not know why it does not take an image each time
I have tried using another python file and importing this one. Trying to use only the rec_image() function it does work however importing the whole file into another one and using the whole if name == "main" it does not
Upvotes: 0
Views: 49