# conan/processing/process_RTGene.py

import tensorflow as tf
import sys
import os
import argparse
import cv2
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from tqdm.notebook import tqdm
from rt_gene.gaze_tools import get_phi_theta_from_euler, limit_yaw
from rt_gene.extract_landmarks_method_base import LandmarkMethodBase
from rt_gene.estimate_gaze_base import GazeEstimatorBase
from rt_gene.estimate_gaze_tensorflow import GazeEstimator
from rt_gene.gaze_tools_standalone import euler_from_matrix
import itertools
import pandas as pd
#os.environ["CUDA_VISIBLE_DEVICES"]="1"
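
# Return the (x, y) centre of an [x_min, y_min, x_max, y_max] bounding box.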
def getCenter(box):
    return np.array([box[2] + box[0], box[3] + box[1]]) / 2
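
# Load the distortion coefficients and camera matrix from a .pkl or .yaml calibration file.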
def load_camera_calibration(calibration_file):
    fileType = calibration_file.split(".")[-1]
    if fileType == "pkl":
        import pickle
        with open(calibration_file, 'rb') as infile:
            data = pickle.load(infile)
        return data["distortion_coef"], data["camera_matrix"]
    elif fileType == "yaml":
        import yaml
        with open(calibration_file, 'r') as f:
            cal = yaml.safe_load(f)
        dist_coefficients = np.array(cal['distortion_coefficients']['data'], dtype='float32').reshape(1, 5)
        camera_matrix = np.array(cal['camera_matrix']['data'], dtype='float32').reshape(3, 3)
        return dist_coefficients, camera_matrix
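
# Crop the left/right eye patches for every detected subject and store them on the subject.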
def extract_eye_image_patches(subjects, landmark_estimator):
    for subject in subjects:
        le_c, re_c, _, _ = subject.get_eye_image_from_landmarks(subject, landmark_estimator.eye_image_size)
        subject.left_eye_color = le_c
        subject.right_eye_color = re_c
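
# Detect faces in one frame, estimate the head pose with solvePnP, run the RT-Gene two-eye
# gaze network, and return one result dict per successfully processed subject.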
def estimate_gaze(base_name, color_img, landmark_estimator, gaze_estimator, dist_coefficients, camera_matrix, args):
    faceboxes = landmark_estimator.get_face_bb(color_img)
    if len(faceboxes) == 0:
        tqdm.write('Could not find faces in the image')
        return

    subjects = landmark_estimator.get_subjects_from_faceboxes(color_img, faceboxes)
    extract_eye_image_patches(subjects, landmark_estimator)

    input_r_list = []
    input_l_list = []
    input_head_list = []
    valid_subject_list = []
    roll_pitch_yaw_list = []

    for idx, subject in enumerate(subjects):
        if subject.left_eye_color is None or subject.right_eye_color is None:
            # tqdm.write('Failed to extract eye image patches')
            continue

        success, rotation_vector, _ = cv2.solvePnP(landmark_estimator.model_points,
                                                   subject.landmarks.reshape(len(subject.landmarks), 1, 2),
                                                   cameraMatrix=camera_matrix,
                                                   distCoeffs=dist_coefficients, flags=cv2.SOLVEPNP_DLS)
        if not success:
            tqdm.write('Not able to extract head pose for subject {}'.format(idx))
            continue

        _rotation_matrix, _ = cv2.Rodrigues(rotation_vector)
        _rotation_matrix = np.matmul(_rotation_matrix, np.array([[0, 1, 0], [0, 0, -1], [-1, 0, 0]]))
        _m = np.zeros((4, 4))
        _m[:3, :3] = _rotation_matrix
        _m[3, 3] = 1

        # Go from camera space to ROS space
        _camera_to_ros = [[0.0, 0.0, 1.0, 0.0],
                          [-1.0, 0.0, 0.0, 0.0],
                          [0.0, -1.0, 0.0, 0.0],
                          [0.0, 0.0, 0.0, 1.0]]
        roll_pitch_yaw = list(euler_from_matrix(np.dot(_camera_to_ros, _m)))
        roll_pitch_yaw = limit_yaw(roll_pitch_yaw)
        roll_pitch_yaw_list.append(roll_pitch_yaw)

        phi_head, theta_head = get_phi_theta_from_euler(roll_pitch_yaw)

        face_image_resized = cv2.resize(subject.face_color, dsize=(224, 224), interpolation=cv2.INTER_CUBIC)
        head_pose_image = landmark_estimator.visualize_headpose_result(face_image_resized, (phi_head, theta_head))

        if args['vis_headpose']:
            plt.axis("off")
            plt.imshow(cv2.cvtColor(head_pose_image, cv2.COLOR_BGR2RGB))
            plt.show()

        if args['save_headpose']:
            cv2.imwrite(os.path.join(args['output_path'], os.path.splitext(base_name)[0] + '_headpose.jpg'), head_pose_image)

        input_r_list.append(gaze_estimator.input_from_image(subject.right_eye_color))
        input_l_list.append(gaze_estimator.input_from_image(subject.left_eye_color))
        input_head_list.append([theta_head, phi_head])
        valid_subject_list.append(idx)

    if len(valid_subject_list) == 0:
        return

    gaze_est = gaze_estimator.estimate_gaze_twoeyes(inference_input_left_list=input_l_list,
                                                    inference_input_right_list=input_r_list,
                                                    inference_headpose_list=input_head_list)

    file_base = os.path.splitext(base_name)[0]
    file = "_".join(file_base.split("_")[:-1])
    frame = int(file_base.split("_")[-1])

    ret = []
    for subject_id, gaze, headpose, roll_pitch_yaw in zip(valid_subject_list, gaze_est.tolist(), input_head_list, roll_pitch_yaw_list):
        subject = subjects[subject_id]
        # print(roll_pitch_yaw)

        # Build visualizations
        r_gaze_img = gaze_estimator.visualize_eye_result(subject.right_eye_color, gaze)
        l_gaze_img = gaze_estimator.visualize_eye_result(subject.left_eye_color, gaze)
        s_gaze_img = np.concatenate((r_gaze_img, l_gaze_img), axis=1)

        if args['vis_gaze']:
            plt.axis("off")
            plt.imshow(cv2.cvtColor(s_gaze_img, cv2.COLOR_BGR2RGB))
            plt.show()

        if args['save_gaze']:
            cv2.imwrite(os.path.join(args['output_path'], os.path.splitext(base_name)[0] + '_gaze.jpg'), s_gaze_img)
            # cv2.imwrite(os.path.join(args.output_path, os.path.splitext(base_name)[0] + '_left.jpg'), subject.left_eye_color)
            # cv2.imwrite(os.path.join(args.output_path, os.path.splitext(base_name)[0] + '_right.jpg'), subject.right_eye_color)

        if args['save_estimate']:
            with open(os.path.join(args['output_path'], os.path.splitext(base_name)[0] + '_output.txt'), 'w+') as f:
                f.write(os.path.splitext(base_name)[0] + ', [' + str(headpose[1]) + ', ' + str(headpose[0]) + ']' +
                        ', [' + str(gaze[1]) + ', ' + str(gaze[0]) + ']' + '\n')

        # Phi: pos - look down, neg - look up
        # Theta: pos - rotate left, neg - rotate right
        d = {"File": file, "Frame": frame, "SubjectId": subject_id, "HeadBox": subject.box, "Landmarks": subject.landmarks,
             "GazeTheta": gaze[0], "GazePhi": gaze[1], "HeadPoseTheta": headpose[0], "HeadPosePhi": headpose[1],
             "HeadPoseRoll": roll_pitch_yaw[0], "HeadPosePitch": roll_pitch_yaw[1], "HeadPoseYaw": roll_pitch_yaw[2]}
        ret.append(d)
    return ret
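
# Overlay the frame-0 head bounding boxes on the first extracted frame as a sanity check.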
def visualize(df, FRAMES):
    path_list = [f for f in os.listdir(FRAMES) if '.jpg' in f]
    path_list.sort()
    image = cv2.imread(os.path.join(FRAMES, path_list[0]))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    fig, ax = plt.subplots(1, figsize=(18, 10))
    for i in range(len(df.SubjectId.unique())):
        bbox = df.loc[(df.Frame == 0) & (df.SubjectId == i)]['HeadBox'].values
        print(bbox)
        if not np.any(pd.isna(bbox)) and len(bbox) > 0:
            bbox = np.array(bbox[0])
            rect = patches.Rectangle((bbox[0], bbox[1]), bbox[2] - bbox[0], bbox[3] - bbox[1],
                                     linewidth=1, edgecolor='c', facecolor='none')
            plt.text(bbox[0], bbox[1], 'ID%i' % i, color='c', fontsize=20)
            ax.add_patch(rect)
    ax.imshow(image)
    plt.show()
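
# Scatter each person's horizontal head position over time to visually check the ID assignment.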
def visualize_sorting(df_sorted):
    subs = sorted(df_sorted[~df_sorted.PId.isna()].PId.unique())
    for sid in subs:
        x = df_sorted[df_sorted.PId == sid].HeadCenter.apply(lambda x: x[0])
        y = df_sorted[df_sorted.PId == sid].HeadCenter.apply(lambda x: x[1])
        frames = df_sorted[df_sorted.PId == sid].Frame.to_list()
        plt.scatter(frames, x, alpha=.2, label="Sub %i" % sid)
    plt.legend()
    plt.show()
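
# Main entry point: run RT-Gene over every frame of the video, assign stable person IDs by
# nearest head centre (with horizontal wrap-around), and save the per-ID results as
# <VIDEOOUT>_RTGene.pkl in the temp directory.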
def process(file, maxPeople, cameraRes=[5760, 2880]):
    VIDEO = file
    VIDEOOUT = VIDEO.split("/")[-1].split(".")[0]
    ROOT = "/".join(VIDEO.split("/")[:-1]) + "/"
    TMP_DIR = "/".join(VIDEO.split("/")[:-2]) + "/temp/"
    FRAMES = "%s%s_frames" % (TMP_DIR, VIDEOOUT)

    if not os.path.exists(VIDEO):
        print('WARNING: Could not find video file')
        return

    script_path = "./"
    args = {}
    args["calib_file"] = "./calib_insta.pkl"
    args["vis_headpose"] = False  # store_false
    args["save_headpose"] = False  # store_false
    args["vis_gaze"] = False  # store_false
    args["save_gaze"] = False  # store_false
    args["save_estimate"] = False  # store_false
    args["device_id_facedetection"] = "cuda:0"  # store_false
    args["im_path"] = os.path.join(script_path, './samples_gaze/')
    args["output_path"] = os.path.join(script_path, './samples_gaze/')
    args["models"] = [os.path.join(script_path, '../model_nets/Model_allsubjects1.h5')]
    args['gaze_backend'] = 'tensorflow'

    tqdm.write('Loading networks')
    landmark_estimator = LandmarkMethodBase(device_id_facedetection=args["device_id_facedetection"],
                                            checkpoint_path_face=os.path.join(script_path, "../model_nets/SFD/s3fd_facedetector.pth"),
                                            checkpoint_path_landmark=os.path.join(script_path, "../model_nets/phase1_wpdc_vdc.pth.tar"),
                                            model_points_file=os.path.join(script_path, "../model_nets/face_model_68.txt"))

    # gaze_estimator = GazeEstimator("/gpu:0", args['models'])
    if args['gaze_backend'] == "tensorflow":
        from rt_gene.estimate_gaze_tensorflow import GazeEstimator
        gaze_estimator = GazeEstimator("/gpu:0", args['models'])
    elif args['gaze_backend'] == "pytorch":
        from rt_gene.estimate_gaze_pytorch import GazeEstimator
        gaze_estimator = GazeEstimator("cuda:0", args['models'])
    else:
        raise ValueError("Incorrect gaze_backend, choices are: tensorflow or pytorch")

    if not os.path.isdir(args["output_path"]):
        os.makedirs(args["output_path"])

    video = cv2.VideoCapture(VIDEO)
    print('Video frame count: ', video.get(cv2.CAP_PROP_FRAME_COUNT))

    if args["calib_file"] is not None and os.path.exists(args["calib_file"]):
        _dist_coefficients, _camera_matrix = load_camera_calibration(args["calib_file"])
    else:
        im_width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
        im_height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
        print('WARNING!!! You should provide the camera calibration file, otherwise you might get bad results. '
              'Using a crude approximation!')
        _dist_coefficients, _camera_matrix = np.zeros((1, 5)), np.array(
            [[im_height, 0.0, im_width / 2.0], [0.0, im_height, im_height / 2.0], [0.0, 0.0, 1.0]])

    lstRet = []
    for i in tqdm(list(range(int(video.get(cv2.CAP_PROP_FRAME_COUNT))))):
        image_file_name = "%s_%i.XXX" % (os.path.splitext(VIDEO)[0], i)
        ret, image = video.read()
        if not ret:  # stop if the reported frame count overshoots the actual stream length
            break
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        lstRet.append(estimate_gaze(image_file_name, image, landmark_estimator, gaze_estimator, _dist_coefficients, _camera_matrix, args))

    # estimate_gaze() returns None for frames without valid detections, so skip those entries
    lst = list(itertools.chain.from_iterable(r for r in lstRet if r is not None))
    df = pd.DataFrame(lst)
    df["HeadCenter"] = df.HeadBox.apply(lambda x: getCenter(x))
    df["Phi"] = df.GazePhi + df.HeadPosePhi  # gaze yaw
    df["Theta"] = df.GazeTheta + df.HeadPoseTheta  # gaze pitch
    df['Yaw'] = df.GazePhi + df.HeadPoseYaw
    df['Pitch'] = df.GazeTheta + df.HeadPosePitch

    # path = '%s%s_raw.pkl' % (TMP_DIR, VIDEOOUT)
    # df.to_pickle(path)
    # print('Saved raw detections to: ', path)

    visualize(df, FRAMES)

    # Sort ID detections
    ###############################################################################################################
    # Find first frame where all are detected
    for frame in sorted(df.Frame.unique()):
        frame_df = df.loc[df.Frame == frame]
        if len(frame_df['SubjectId'].unique()) == maxPeople:
            first_frame = frame
            print('First frame where all are detected: ', first_frame)
            break

    # Copy that frame's detections to frame 0 so the ID assignment has a fully populated anchor
    empty_rows = pd.DataFrame()
    empty_rows['Frame'] = np.zeros(maxPeople).astype(int)
    for col in df.columns:
        if not col == 'Frame':
            empty_rows[col] = df.loc[df.Frame == first_frame, [col]].values
    df = df.loc[df.Frame != 0]
    df = df.append(empty_rows).sort_values(by=['Frame'])
    df.head()

    df_sorted = df.copy()
    df_sorted["PId"] = None
    df_sorted.loc[df_sorted.Frame == 0, "PId"] = list(range(maxPeople))
    df_sorted = df_sorted.sort_values("Frame")
    df_sorted.index = list(range(len(df_sorted)))
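
    # Assign person IDs frame by frame: each detection inherits the PId of the nearest head
    # centre in the most recent frame where all maxPeople IDs are present. The distance is the
    # minimum of the direct distance and the distance across the horizontal wrap-around of the
    # equirectangular image (cameraRes[0] pixels wide).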
    for frameId in tqdm(sorted(df_sorted.Frame.unique())[1:]):
        pidAssignement = []
        for frameIdBefore in range(frameId - 1, -1, -1):
            allFramesBefore = df_sorted[(df_sorted.Frame == frameIdBefore) & (~df_sorted.PId.isna())]
            if np.array_equal(sorted(allFramesBefore.PId.to_list()), np.arange(maxPeople)):
                dfFramesCurrent = df_sorted[df_sorted.Frame == frameId]
                for indexCurrentFrame, frameCurrent in dfFramesCurrent.iterrows():
                    lst = []
                    for indexBeforeFrame, frameBefore in allFramesBefore.iterrows():
                        if frameBefore.HeadCenter[0] > frameCurrent.HeadCenter[0]:
                            p1 = np.array(frameCurrent.HeadCenter)
                            p2 = np.array(frameBefore.HeadCenter)
                        else:
                            p1 = np.array(frameBefore.HeadCenter)
                            p2 = np.array(frameCurrent.HeadCenter)
                        v1 = p1 - p2
                        dist1 = np.linalg.norm(v1)
                        p1[0] = p1[0] + cameraRes[0]
                        v2 = p1 - p2
                        dist2 = np.linalg.norm(v2)
                        dist = min([dist1, dist2])
                        lst.append([dist, frameCurrent.name, indexBeforeFrame, frameBefore])
                    lst.sort(key=lambda x: x[0])
                    pidAssignement.append([indexCurrentFrame, lst[0][-1].PId])
                break
        for index, pid in pidAssignement:
            df_sorted.loc[df_sorted.index == index, "PId"] = pid
    visualize_sorting(df_sorted)
    del df_sorted["SubjectId"]

    # Rearrange DataFrame: each ID has specific columns
    ###############################################################################################################
    df_sorted = df_sorted[~df_sorted.PId.isna()].drop_duplicates(subset=['Frame', 'PId'])
    FACE_COUNT = len(df_sorted[~df_sorted.PId.isna()].PId.unique())
    df2 = df_sorted.pivot(index='Frame', columns="PId",
                          values=["Landmarks", "GazeTheta", "GazePhi", "HeadCenter", "HeadPoseTheta", "HeadPosePhi",
                                  "HeadPoseYaw", "HeadPosePitch", "HeadPoseRoll", "Phi", "Theta"])
    lst = []
    for label in ["Landmarks", "GazeTheta", "GazePhi", "Head", "HeadPoseTheta", "HeadPosePhi", "HeadPoseYaw",
                  "HeadPosePitch", "HeadPoseRoll", "Phi", "Theta"]:
        for head_id in range(FACE_COUNT):
            lst.append("ID%i_%s" % (head_id, label))
    df2.columns = lst
    df2 = df2.reset_index()

    path = "%s%s_RTGene.pkl" % (TMP_DIR, VIDEOOUT)
    df2.to_pickle(path)
    print("Saved RT-Gene detections to %s" % path)