import tensorflow as tf import sys import os import argparse import cv2 import numpy as np import matplotlib.pyplot as plt import matplotlib.patches as patches from tqdm.notebook import tqdm from rt_gene.gaze_tools import get_phi_theta_from_euler, limit_yaw from rt_gene.extract_landmarks_method_base import LandmarkMethodBase from rt_gene.estimate_gaze_base import GazeEstimatorBase from rt_gene.estimate_gaze_tensorflow import GazeEstimator from rt_gene.gaze_tools_standalone import euler_from_matrix import itertools import pandas as pd #os.environ["CUDA_VISIBLE_DEVICES"]="1" def getCenter(box): return np.array([box[2]+box[0], box[3]+box[1]])/2 def load_camera_calibration(calibration_file): fileType = calibration_file.split(".")[-1] if fileType == "pkl": import pickle infile = open(calibration_file,'rb') data = pickle.load(infile) return data["distortion_coef"], data["camera_matrix"] elif fileType == "yaml": import yaml with open(calibration_file, 'r') as f: cal = yaml.safe_load(f) dist_coefficients = np.array(cal['distortion_coefficients']['data'], dtype='float32').reshape(1, 5) camera_matrix = np.array(cal['camera_matrix']['data'], dtype='float32').reshape(3, 3) return dist_coefficients, camera_matrix def extract_eye_image_patches(subjects, landmark_estimator): for subject in subjects: le_c, re_c, _, _ = subject.get_eye_image_from_landmarks(subject, landmark_estimator.eye_image_size) subject.left_eye_color = le_c subject.right_eye_color = re_c def estimate_gaze(base_name, color_img, landmark_estimator, gaze_estimator, dist_coefficients, camera_matrix, args): faceboxes = landmark_estimator.get_face_bb(color_img) if len(faceboxes) == 0: tqdm.write('Could not find faces in the image') return subjects = landmark_estimator.get_subjects_from_faceboxes(color_img, faceboxes) extract_eye_image_patches(subjects, landmark_estimator) input_r_list = [] input_l_list = [] input_head_list = [] valid_subject_list = [] roll_pitch_yaw_list = [] for idx, subject in enumerate(subjects): if subject.left_eye_color is None or subject.right_eye_color is None: #tqdm.write('Failed to extract eye image patches') continue success, rotation_vector, _ = cv2.solvePnP(landmark_estimator.model_points, subject.landmarks.reshape(len(subject.landmarks), 1, 2), cameraMatrix=camera_matrix, distCoeffs=dist_coefficients, flags=cv2.SOLVEPNP_DLS) if not success: tqdm.write('Not able to extract head pose for subject {}'.format(idx)) continue _rotation_matrix, _ = cv2.Rodrigues(rotation_vector) _rotation_matrix = np.matmul(_rotation_matrix, np.array([[0, 1, 0], [0, 0, -1], [-1, 0, 0]])) _m = np.zeros((4, 4)) _m[:3, :3] = _rotation_matrix _m[3, 3] = 1 # Go from camera space to ROS space _camera_to_ros = [[0.0, 0.0, 1.0, 0.0], [-1.0, 0.0, 0.0, 0.0], [0.0, -1.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]] roll_pitch_yaw = list(euler_from_matrix(np.dot(_camera_to_ros, _m))) roll_pitch_yaw = limit_yaw(roll_pitch_yaw) roll_pitch_yaw_list.append(roll_pitch_yaw) phi_head, theta_head = get_phi_theta_from_euler(roll_pitch_yaw) face_image_resized = cv2.resize(subject.face_color, dsize=(224, 224), interpolation=cv2.INTER_CUBIC) head_pose_image = landmark_estimator.visualize_headpose_result(face_image_resized, (phi_head, theta_head)) if args['vis_headpose']: plt.axis("off") plt.imshow(cv2.cvtColor(head_pose_image, cv2.COLOR_BGR2RGB)) plt.show() if args['save_headpose']: cv2.imwrite(os.path.join(args['output_path'], os.path.splitext(base_name)[0] + '_headpose.jpg'), head_pose_image) input_r_list.append(gaze_estimator.input_from_image(subject.right_eye_color)) input_l_list.append(gaze_estimator.input_from_image(subject.left_eye_color)) input_head_list.append([theta_head, phi_head]) valid_subject_list.append(idx) if len(valid_subject_list) == 0: return gaze_est = gaze_estimator.estimate_gaze_twoeyes(inference_input_left_list=input_l_list, inference_input_right_list=input_r_list, inference_headpose_list=input_head_list) file_base = os.path.splitext(base_name)[0] file = "_".join(file_base.split("_")[:-1]) frame = int(file_base.split("_")[-1]) ret = [] for subject_id, gaze, headpose, roll_pitch_yaw in zip(valid_subject_list, gaze_est.tolist(), input_head_list, roll_pitch_yaw_list): subject = subjects[subject_id] #print(roll_pitch_yaw) # Build visualizations r_gaze_img = gaze_estimator.visualize_eye_result(subject.right_eye_color, gaze) l_gaze_img = gaze_estimator.visualize_eye_result(subject.left_eye_color, gaze) s_gaze_img = np.concatenate((r_gaze_img, l_gaze_img), axis=1) if args['vis_gaze']: plt.axis("off") plt.imshow(cv2.cvtColor(s_gaze_img, cv2.COLOR_BGR2RGB)) plt.show() if args['save_gaze']: cv2.imwrite(os.path.join(args['output_path'], os.path.splitext(base_name)[0] + '_gaze.jpg'), s_gaze_img) # cv2.imwrite(os.path.join(args.output_path, os.path.splitext(base_name)[0] + '_left.jpg'), subject.left_eye_color) # cv2.imwrite(os.path.join(args.output_path, os.path.splitext(base_name)[0] + '_right.jpg'), subject.right_eye_color) if args['save_estimate']: with open(os.path.join(args['output_path'], os.path.splitext(base_name)[0] + '_output.txt'), 'w+') as f: f.write(os.path.splitext(base_name)[0] + ', [' + str(headpose[1]) + ', ' + str(headpose[0]) + ']' + ', [' + str(gaze[1]) + ', ' + str(gaze[0]) + ']' + '\n') # Phi: pos - look down, neg - look up # Theta: pos - rotate left, neg - rotate right d = {"File":file, "Frame": frame, "SubjectId":subject_id, "HeadBox":subject.box, "Landmarks": subject.landmarks, "GazeTheta":gaze[0], "GazePhi":gaze[1], "HeadPoseTheta":headpose[0], "HeadPosePhi":headpose[1], "HeadPoseRoll":roll_pitch_yaw[0], "HeadPosePitch":roll_pitch_yaw[1], "HeadPoseYaw":roll_pitch_yaw[2]} ret.append(d) return ret def visualize(df, FRAMES): path_list = [f for f in os.listdir(FRAMES) if '.jpg' in f] path_list.sort() image = cv2.imread(os.path.join(FRAMES, path_list[0])) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) fig, ax = plt.subplots(1, figsize=(18,10)) for i in range(len(df.SubjectId.unique())): bbox = df.loc[(df.Frame == 0) & (df.SubjectId == i)]['HeadBox'].values print(bbox) if not np.any(pd.isna(bbox)) and len(bbox) > 0: bbox = np.array(bbox[0]) rect = patches.Rectangle((bbox[0],bbox[1]),bbox[2]-bbox[0],bbox[3]-bbox[1],linewidth=1,edgecolor='c',facecolor='none') plt.text(bbox[0], bbox[1], 'ID%i' % i, color='c' ,fontsize=20) ax.add_patch(rect) ax.imshow(image) plt.show() def visualize_sorting(df_sorted): subs = sorted(df_sorted[~df_sorted.PId.isna()].PId.unique()) for sid in subs: x = df_sorted[df_sorted.PId==sid].HeadCenter.apply(lambda x: x[0]) y = df_sorted[df_sorted.PId==sid].HeadCenter.apply(lambda x: x[1]) frames = df_sorted[df_sorted.PId==sid].Frame.to_list() plt.scatter(frames, x, alpha=.2, label = "Sub %i" % sid) plt.legend() plt.show() def process(file, maxPeople, cameraRes = [5760, 2880]): VIDEO = file VIDEOOUT = VIDEO.split("/")[-1].split(".")[0] ROOT = "/".join(VIDEO.split("/")[:-1]) + "/" TMP_DIR = "/".join(VIDEO.split("/")[:-2]) + "/temp/" FRAMES = "%s%s_frames" % (TMP_DIR, VIDEOOUT) if not os.path.exists(VIDEO): print('WARNING: Could not find video file') return script_path = "./" args = {} args["calib_file"] = "./calib_insta.pkl" args["vis_headpose"] = False # store_false args["save_headpose"] = False # store_false args["vis_gaze"] = False # store_false args["save_gaze"] = False # store_false args["save_estimate"] = False # store_false args["device_id_facedetection"] = "cuda:0" # store_false args["im_path"] = os.path.join(script_path, './samples_gaze/') args["output_path"] = os.path.join(script_path, './samples_gaze/') args["models"] = [os.path.join(script_path, '../model_nets/Model_allsubjects1.h5')] args['gaze_backend'] = 'tensorflow' tqdm.write('Loading networks') landmark_estimator = LandmarkMethodBase(device_id_facedetection=args["device_id_facedetection"], checkpoint_path_face=os.path.join(script_path, "../model_nets/SFD/s3fd_facedetector.pth"), checkpoint_path_landmark=os.path.join(script_path, "../model_nets/phase1_wpdc_vdc.pth.tar"), model_points_file=os.path.join(script_path, "../model_nets/face_model_68.txt")) #gaze_estimator = GazeEstimator("/gpu:0", args['models']) if args['gaze_backend'] == "tensorflow": from rt_gene.estimate_gaze_tensorflow import GazeEstimator gaze_estimator = GazeEstimator("/gpu:0", args['models']) elif args['gaze_backend'] == "pytorch": from rt_gene.estimate_gaze_pytorch import GazeEstimator gaze_estimator = GazeEstimator("cuda:0", args['models']) else: raise ValueError("Incorrect gaze_base backend, choices are: tensorflow or pytorch") if not os.path.isdir(args["output_path"]): os.makedirs(args["output_path"]) video = cv2.VideoCapture(VIDEO) print('Video frame count: ', video.get(cv2.CAP_PROP_FRAME_COUNT)) if args["calib_file"] is not None and os.path.exists(args["calib_file"]): _dist_coefficients, _camera_matrix = load_camera_calibration(args["calib_file"]) else: im_width = video.get(cv2.CAP_PROP_FRAME_WIDTH) im_height = video.get(cv2.CAP_PROP_FRAME_HEIGHT) print('WARNING!!! You should provide the camera calibration file, otherwise you might get bad results. \n\ Using a crude approximation!') _dist_coefficients, _camera_matrix = np.zeros((1, 5)), np.array( [[im_height, 0.0, im_width / 2.0], [0.0, im_height, im_height / 2.0], [0.0, 0.0, 1.0]]) lstRet = [] for i in tqdm(list(range(int(video.get(cv2.CAP_PROP_FRAME_COUNT))))): image_file_name = "%s_%i.XXX" % (os.path.splitext(VIDEO)[0], i) ret, image = video.read() image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) lstRet.append(estimate_gaze(image_file_name, image, landmark_estimator, gaze_estimator, _dist_coefficients, _camera_matrix, args)) lst = list(itertools.chain.from_iterable(lstRet)) df = pd.DataFrame(lst) df["HeadCenter"] = df.HeadBox.apply(lambda x: getCenter(x)) df["Phi"] = df.GazePhi + df.HeadPosePhi # gaze yaw df["Theta"] = df.GazeTheta + df.HeadPoseTheta # gaze pitch df['Yaw'] = df.GazePhi + df.HeadPoseYaw df['Pitch'] = df.GazeTheta + df.HeadPosePitch # path = '%s%s_raw.pkl' % (TMP_DIR, VIDEOOUT) # df.to_pickle(path) # print('Saved raw detections to: ', path) visualize(df, FRAMES) # Sort ID detections ############################################################################################################### # Find first frame where all are detected for frame in sorted(df.Frame.unique()): frame_df = df.loc[df.Frame == frame] if len(frame_df['SubjectId'].unique()) == maxPeople: first_frame = frame print('First frame where all are detected: ', first_frame) break empty_rows = pd.DataFrame() empty_rows['Frame'] = np.zeros(maxPeople).astype(int) for col in df.columns: if not col == 'Frame': empty_rows[col] = df.loc[df.Frame == first_frame, [col]].values df = df.loc[df.Frame != 0] df = df.append(empty_rows).sort_values(by=['Frame']) df.head() df_sorted = df.copy() df_sorted["PId"] = None df_sorted.loc[df_sorted.Frame == 0, "PId"] = list(range(maxPeople)) df_sorted = df_sorted.sort_values("Frame") df_sorted.index = list(range(len(df_sorted))) for frameId in tqdm(sorted(df_sorted.Frame.unique())[1:]): pidAssignement = [] for frameIdBefore in range(frameId - 1, -1, -1): allFramesBefore = df_sorted[(df_sorted.Frame == frameIdBefore) & (~df_sorted.PId.isna())] if (np.array_equal(sorted(allFramesBefore.PId.to_list()), np.arange(maxPeople))): dfFramesCurrent = df_sorted[df_sorted.Frame == frameId] for indexCurrentFrame, frameCurrent in dfFramesCurrent.iterrows(): lst = [] for indexBeforeFrame, frameBefore in allFramesBefore.iterrows(): if (frameBefore.HeadCenter[0] > frameCurrent.HeadCenter[0]): p1 = np.array(frameCurrent.HeadCenter) p2 = np.array(frameBefore.HeadCenter) else: p1 = np.array(frameBefore.HeadCenter) p2 = np.array(frameCurrent.HeadCenter) v1 = p1 - p2 dist1 = np.linalg.norm(v1) p1[0] = p1[0] + cameraRes[0] v2 = p1 - p2 dist2 = np.linalg.norm(v2) dist = min([dist1, dist2]) lst.append([dist, frameCurrent.name, indexBeforeFrame, frameBefore]) lst.sort(key=lambda x: x[0]) pidAssignement.append([indexCurrentFrame, lst[0][-1].PId]) break for index, pid in pidAssignement: df_sorted.loc[df_sorted.index == index, "PId"] = pid visualize_sorting(df_sorted) del df_sorted["SubjectId"] # Rearrange DataFrame: each ID has specific columns ############################################################################################################### df_sorted = df_sorted[~df_sorted.PId.isna()].drop_duplicates(subset=['Frame', 'PId']) FACE_COUNT = len(df_sorted[~df_sorted.PId.isna()].PId.unique()) df2 = df_sorted.pivot(index='Frame', columns="PId", values=["Landmarks", "GazeTheta", "GazePhi", "HeadCenter", "HeadPoseTheta", "HeadPosePhi", "HeadPoseYaw", "HeadPosePitch", "HeadPoseRoll", "Phi", "Theta"]) lst = [] for label in ["Landmarks", "GazeTheta", "GazePhi", "Head", "HeadPoseTheta", "HeadPosePhi", "HeadPoseYaw", "HeadPosePitch", "HeadPoseRoll", "Phi", "Theta"]: for head_id in range(FACE_COUNT): lst.append("ID%i_%s" % (head_id, label)) df2.columns = lst df2 = df2.reset_index() path = "%s%s_RTGene.pkl" % (TMP_DIR, VIDEOOUT) df2.to_pickle(path) print("Saved RT-Gene detections to %s" % path)