import random

import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import interpolate
from scipy.ndimage import gaussian_filter

# Atari-HEAD constants
SIGMA = (210 / 44.6, 160 / 28.5)
SUBJECT_TO_SCREEN = 787
SCREEN_WIDTH_MM = 646
SCREEN_HEIGHT_MM = 400
SCREEN_WIDTH_PX = 1280
SCREEN_HEIGHT_PX = 840

TYPES = {'frame_id': str, 'episode_id': int, 'score': int, 'duration(ms)': int,
         'unclipped_reward': int, 'action': int, 'gaze_positions': list}

ALE_ENUMS = {0: 'PLAYER_A_NOOP', 1: 'PLAYER_A_FIRE', 2: 'PLAYER_A_UP',
             3: 'PLAYER_A_RIGHT', 4: 'PLAYER_A_LEFT', 5: 'PLAYER_A_DOWN',
             6: 'PLAYER_A_UPRIGHT', 7: 'PLAYER_A_UPLEFT', 8: 'PLAYER_A_DOWNRIGHT',
             9: 'PLAYER_A_DOWNLEFT', 10: 'PLAYER_A_UPFIRE', 11: 'PLAYER_A_RIGHTFIRE',
             12: 'PLAYER_A_LEFTFIRE', 13: 'PLAYER_A_DOWNFIRE', 14: 'PLAYER_A_UPRIGHTFIRE',
             15: 'PLAYER_A_UPLEFTFIRE', 16: 'PLAYER_A_DOWNRIGHTFIRE', 17: 'PLAYER_A_DOWNLEFTFIRE'}


def txt_to_dataframe(path: str) -> pd.DataFrame:
    """Read a trial's annotation txt file line by line into a dataframe.

    Parameters
    ----------
    path : str
        The path to the trial's txt file, e.g. 291_RZ_7364933_May-08-20-23-25.txt

    Returns
    -------
    pd.DataFrame
        Dataframe with one frame per row and the columns listed in TYPES, where available.
    """
    with open(path, 'r') as file:
        lines = file.readlines()
    columns = lines[0].strip().split(',')
    rows = []
    for line in lines[1:]:
        raw_vals = line.strip().split(',')
        vals = {}
        for i, c in enumerate(columns):
            if c != 'gaze_positions':
                try:
                    vals[c] = TYPES.get(c)(raw_vals[i])
                except (TypeError, ValueError, IndexError):
                    vals[c] = None
            else:
                # gaze_positions: x0,y0,x1,y1,...,xn,yn -- all gaze positions
                # recorded for the current frame. May be null if no gaze was
                # recorded. (0, 0) is the top-left corner; x is the horizontal
                # axis, y the vertical axis.
                try:
                    vals[c] = np.array([float(v) for v in raw_vals[i:]]).reshape(-1, 2)
                except (TypeError, ValueError):
                    vals[c] = None
        rows.append(vals)
    # Build the dataframe in one go instead of concatenating per line
    return pd.DataFrame(rows, columns=columns)
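
# A minimal usage sketch, not part of the original pipeline: the file name
# below is the example quoted in the docstring above and is illustrative
# only -- substitute the path to a real Atari-HEAD trial file.
def _demo_txt_to_dataframe():
    trial_df = txt_to_dataframe('291_RZ_7364933_May-08-20-23-25.txt')
    # One row per frame; gaze_positions holds an (n, 2) array of (x, y)
    # screen coordinates, or None when no gaze was recorded.
    print(trial_df[['frame_id', 'action', 'duration(ms)']].head())
    print(trial_df.iloc[0].gaze_positions)
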
def get_subgoal_proposals(df, threshold=0.35, visualize=False, room=1) -> dict:
    """Propose subgoal bounding boxes per episode from peaks in the gaze saliency map."""
    # Get the initial screen for visualizations
    init_screen = cv2.imread(df.iloc[0].img_path)
    init_screen = cv2.cvtColor(init_screen, cv2.COLOR_BGR2RGB)

    subgoal_proposals = {}
    for episode in df.ID.unique():
        gaze = df.loc[(df.ID == episode) & (df.room_id == room) & (df.level == 0)].gaze_positions
        if gaze.empty:
            continue

        # Accumulate gaze points into a saliency map
        saliency_map = np.zeros(init_screen.shape[:2])
        for gaze_points in gaze:
            if gaze_points is None:
                continue
            for item in gaze_points:
                try:
                    saliency_map[int(item[1])][int(item[0])] += 1
                except IndexError:
                    # Not all gaze points fall on the image
                    continue

        # Construct the binary fixation map
        fix_map = saliency_map >= 1.0

        # Construct the empirical saliency map
        saliency_map = gaussian_filter(saliency_map, sigma=SIGMA, mode='nearest')

        # Normalize the saliency map into the range [0, 1]
        if saliency_map.max() != 0:
            saliency_map /= saliency_map.max()

        proposals_y, proposals_x = np.where(saliency_map > threshold)
        bboxes = []
        scores = []
        for x, y in zip(proposals_x, proposals_y):
            # Draw a bounding box of Panama Joe's size around each saliency peak
            bboxes.append([x - 5, y - 10, x + 5, y + 10])
            scores.append(saliency_map[y][x])
        if len(bboxes) == 0:
            continue

        # Non-maximum suppression
        keep = apply_nms(np.array(bboxes), np.array(scores), thresh_iou=0.1)
        # Merge boxes with any IoU > 0.
        # Note: a merge pass might itself generate new IoUs > 0.
        merged = merge_boxes(keep)
        subgoal_proposals[episode] = [keep, merged]

        if visualize:
            print('Episode: ', episode)
            mask = saliency_map > threshold
            masked_saliency = saliency_map.copy()
            masked_saliency[~mask] = 0

            img = masked_saliency.copy()
            for box in random.choices(bboxes, k=25):
                img = cv2.rectangle(img, (int(box[0]), int(box[1])),
                                    (int(box[2]), int(box[3])), (1, 0, 0), 1)
            print('Number of bounding box proposals: ', len(bboxes))
            fig = plt.figure(figsize=(8, 8))
            plt.imshow(init_screen)
            plt.imshow(img, cmap='jet', alpha=0.5)
            plt.axis('off')
            plt.show()

            print('Bounding boxes after non-maximum suppression')
            img = init_screen.copy()
            for box in keep:
                img = cv2.rectangle(img, (int(box[0]), int(box[1])),
                                    (int(box[2]), int(box[3])), (255, 0, 0), 1)
            fig = plt.figure(figsize=(8, 8))
            plt.imshow(img)
            plt.axis('off')
            plt.show()

            print('Bounding boxes after merging')
            img = init_screen.copy()
            for box in merged:  # was `keep`, which re-drew the unmerged boxes
                img = cv2.rectangle(img, (int(box[0]), int(box[1])),
                                    (int(box[2]), int(box[3])), (255, 0, 0), 1)
            fig = plt.figure(figsize=(8, 8))
            plt.imshow(img)
            plt.axis('off')
            plt.show()

    return subgoal_proposals


def visualize_sample(image, target):
    fig = plt.figure(figsize=(12, 6))
    ax1 = fig.add_subplot(131)
    ax2 = fig.add_subplot(132)
    ax3 = fig.add_subplot(133)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    fov_image = np.multiply(target, gray)  # element-wise product
    ax1.imshow(image)
    ax1.set_title('Input image')
    ax1.axis('off')
    ax2.imshow(target, cmap='jet')
    ax2.set_title('Saliency map')
    ax2.axis('off')
    ax3.imshow(fov_image, cmap='gray')
    ax3.set_title('Foveated image')
    ax3.axis('off')
    plt.show()


def saliency_map_to_image(saliency_map):
    """Rescale a saliency map to an 8-bit grayscale image."""
    minimum_value = saliency_map.min()
    if minimum_value < 0:
        saliency_map = saliency_map - minimum_value
    saliency_map = saliency_map * 255 / saliency_map.max()
    image_data = np.round(saliency_map).astype(np.uint8)
    return image_data
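
# A minimal sanity-check sketch for the saliency helpers above, not part of
# the original pipeline: the frame is a blank Atari-sized (210 x 160) image
# and the gaze points are drawn at random, purely for illustration.
def _demo_saliency_helpers():
    frame = np.zeros((210, 160, 3), dtype=np.uint8)
    rng = np.random.default_rng(seed=0)
    gaze_points = rng.integers(low=0, high=(160, 210), size=(200, 2))
    saliency_map = np.zeros(frame.shape[:2])
    for x, y in gaze_points:
        saliency_map[y, x] += 1
    saliency_map = gaussian_filter(saliency_map, sigma=SIGMA, mode='nearest')
    saliency_map /= saliency_map.max()
    visualize_sample(frame, saliency_map)       # three-panel figure
    return saliency_map_to_image(saliency_map)  # uint8 grayscale image
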
def apply_nms(boxes: np.ndarray, scores: np.ndarray = None, thresh_iou: float = 0.2) -> np.ndarray:
    """Apply non-maximum suppression to avoid keeping too many overlapping
    bounding boxes, based on an IoU threshold.

    Adapted from
    https://learnopencv.com/non-maximum-suppression-theory-and-implementation-in-pytorch/
    """
    x1 = boxes[:, 0]  # x coordinate of the top-left corner
    y1 = boxes[:, 1]  # y coordinate of the top-left corner
    x2 = boxes[:, 2]  # x coordinate of the bottom-right corner
    y2 = boxes[:, 3]  # y coordinate of the bottom-right corner

    # calculate the area of every box
    areas = (x2 - x1) * (y2 - y1)

    if scores is not None:
        # sort the boxes in ascending order of their confidence scores
        order = scores.argsort()
    else:
        order = y2.argsort()

    # initialise a list for the boxes that survive suppression
    keep = []
    while len(order) > 0:
        # extract the index of the box with the highest score and keep it
        idx = order[-1]
        keep.append(boxes[idx])
        order = order[:-1]
        if len(order) == 0:
            break

        # select the coordinates of the remaining boxes
        xx1 = np.take(x1, indices=order, axis=0)
        xx2 = np.take(x2, indices=order, axis=0)
        yy1 = np.take(y1, indices=order, axis=0)
        yy2 = np.take(y2, indices=order, axis=0)

        # find the coordinates of the intersection boxes
        xx1 = np.maximum(xx1, x1[idx])
        yy1 = np.maximum(yy1, y1[idx])
        xx2 = np.minimum(xx2, x2[idx])
        yy2 = np.minimum(yy2, y2[idx])

        # width and height of the intersection boxes (zero if disjoint)
        w = np.maximum(0, xx2 - xx1)
        h = np.maximum(0, yy2 - yy1)
        inter = w * h

        # IoU of every remaining box with the currently selected box
        rem_areas = np.take(areas, indices=order, axis=0)
        union = (rem_areas - inter) + areas[idx]
        IoU = inter / union

        # keep only the boxes with IoU below the threshold
        mask = IoU < thresh_iou
        order = order[mask]

    return np.array(keep)


def merge_boxes(boxes: np.ndarray) -> np.ndarray:
    """Merge each group of overlapping boxes (any IoU > 0) into one surrounding box."""
    x1 = boxes[:, 0]  # x coordinate of the top-left corner
    y1 = boxes[:, 1]  # y coordinate of the top-left corner
    x2 = boxes[:, 2]  # x coordinate of the bottom-right corner
    y2 = boxes[:, 3]  # y coordinate of the bottom-right corner

    # calculate the area of every box
    areas = (x2 - x1) * (y2 - y1)

    merged = []
    indices = np.arange(len(boxes))
    while len(indices) > 0:
        idx = indices[0]

        # find the coordinates of the intersection boxes
        xx1 = np.maximum(x1, x1[idx])
        yy1 = np.maximum(y1, y1[idx])
        xx2 = np.minimum(x2, x2[idx])
        yy2 = np.minimum(y2, y2[idx])

        # width and height of the intersection boxes (zero if disjoint)
        w = np.maximum(0, xx2 - xx1)
        h = np.maximum(0, yy2 - yy1)

        # intersection over union of every box with the currently selected box
        inter = w * h
        union = (areas - inter) + areas[idx]
        iou = inter / union
        merge_idx = np.where(iou > 0.0)[0]

        # box surrounding all selected boxes --> [min(x1), min(y1)] x [max(x2), max(y2)]
        big_box = [boxes[merge_idx, 0].min(), boxes[merge_idx, 1].min(),
                   boxes[merge_idx, 2].max(), boxes[merge_idx, 3].max()]
        merged.append(big_box)

        # drop every box that went into the merge
        indices = indices[~np.isin(indices, merge_idx)]

    return np.array(merged)


def pixel_to_3D(gaze_positions):
    """Project pixel coordinates onto the physical screen plane in millimetres.

    NB: scales x and y in place; callers that need the original values
    should pass a copy.
    """
    if gaze_positions.shape[0] != 2:
        gaze_positions = np.moveaxis(gaze_positions, 0, 1)
    x, y = gaze_positions
    x *= SCREEN_WIDTH_MM / SCREEN_WIDTH_PX
    y *= SCREEN_HEIGHT_MM / SCREEN_HEIGHT_PX
    gaze_positions_3D = np.array([x, y, [SUBJECT_TO_SCREEN] * len(x)])
    return np.moveaxis(gaze_positions_3D, 0, 1)
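
# A toy sanity check for the box utilities above, not part of the original
# pipeline; all coordinates and scores are made up. The first two boxes
# overlap heavily, so NMS keeps only the higher-scoring one.
def _demo_box_utils():
    boxes = np.array([
        [10, 10, 20, 30],    # overlaps heavily with the next box
        [11, 11, 21, 31],
        [100, 50, 110, 70],  # isolated box
    ])
    scores = np.array([0.9, 0.8, 0.7])
    keep = apply_nms(boxes, scores, thresh_iou=0.1)  # drops the duplicate
    merged = merge_boxes(keep)                       # unions any remaining overlaps
    print(keep)
    print(merged)
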
def get_velocity_vectorized(gaze: np.ndarray, ratio: float):
    # Copy to avoid mutating the caller's column in place, see
    # https://stackoverflow.com/questions/52457989/pandas-df-apply-unexpectedly-changes-dataframe-inplace
    gaze = gaze.copy()
    # pixel coordinates to 3D world coordinates
    gaze_3D = pixel_to_3D(gaze)
    # Normalize every gaze vector to unit length *before* slicing: u and v
    # below are overlapping views, so normalizing them in place after
    # slicing would corrupt the shared rows.
    gaze_3D /= np.linalg.norm(gaze_3D, axis=1, keepdims=True)
    # pair up gaze[i], gaze[i + 1] by shifting the sequence by one
    u, v = gaze_3D[:-1], gaze_3D[1:]
    assert len(u) == len(v)

    u_minus_v = np.linalg.norm(u - v, axis=1)  # || u - v ||
    u_plus_v = np.linalg.norm(u + v, axis=1)   # || u + v ||

    # angular displacement between consecutive gaze vectors; the factor 5.73
    # combines with the 10000 below as 5.73 * 10000 = 57300 ~ (180 / pi) * 1000,
    # i.e. radians -> degrees together with per-millisecond -> per-second
    theta = 2 * np.arctan2(u_minus_v, u_plus_v) * 5.73
    # angular velocity in degrees per second, with ratio the average frame
    # duration in milliseconds
    velocity = (theta / ratio) * 10000
    return theta, velocity


def interpolate_outliers(gaze, ratio, threshold=800, visualize=False):
    """Replace the first sample above threshold by linear interpolation.

    Handles one outlier per call; callers can loop until none remain.
    """
    idx = np.where(gaze > threshold)[0][0]
    x = list(np.arange(len(gaze)))
    x.pop(idx)
    gaze = list(gaze)
    gaze.pop(idx)
    # outliers on the border can't be interpolated -> remove them entirely
    if idx == 0 or idx == len(gaze):
        return gaze
    f = interpolate.interp1d(x, gaze)
    if visualize:
        # sample within the interpolation range [0, max(x)]
        xnew = np.arange(0, max(x), 0.1)
        ynew = f(xnew)
        plt.plot(x, gaze, 'o', xnew, ynew, '-', idx, f(idx), '*')
        plt.show()
    return np.array(gaze[:idx] + [float(f(idx))] + gaze[idx:])


def get_angle(center, point):
    # 3D vectors from the eye to both points on the screen plane
    u = [center[0], center[1], SUBJECT_TO_SCREEN]
    v = [point[0], point[1], SUBJECT_TO_SCREEN]
    cos_angle = np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v))
    angle = np.arccos(np.clip(cos_angle, a_min=-1, a_max=1))
    # same scaled degree conversion as in get_velocity_vectorized (5.73 ~ 57.3 / 10)
    return angle * 5.73 * 1000


def get_idt_dispersion(cfg):
    """Dispersion of the current fixation group, used to detect smooth pursuits.

    See https://github.com/M3stark/Eye_tracking_proj/blob/main/ivdt.py
    """
    max_x, min_x = cfg[:, 0].max(), cfg[:, 0].min()
    max_y, min_y = cfg[:, 1].max(), cfg[:, 1].min()
    return (max_x - min_x) + (max_y - min_y)


def agent_in_subgoal(subgoals, agent_x, agent_y):
    """Check whether the agent position lies inside any subgoal box."""
    inside = ((subgoals[:, 0] < agent_x) & (subgoals[:, 2] > agent_x)
              & (subgoals[:, 1] < agent_y) & (subgoals[:, 3] > agent_y))
    return np.any(inside), np.where(inside)[0]
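
# An end-to-end sketch for the gaze-kinematics helpers, not part of the
# original pipeline: classify samples as saccades with a simple velocity
# threshold. The synthetic trace, the 20 ms frame duration, and the
# 400 deg/s cutoff are illustrative values, not taken from the study.
def _demo_velocity():
    rng = np.random.default_rng(seed=0)
    # fixation jitter around the screen centre, with one large jump (saccade)
    gaze = np.tile([SCREEN_WIDTH_PX / 2, SCREEN_HEIGHT_PX / 2], (50, 1))
    gaze += rng.normal(0, 2, (50, 2))
    gaze[25:] += [300.0, 0.0]
    theta, velocity = get_velocity_vectorized(gaze, ratio=20.0)  # ~20 ms/frame
    saccade_mask = velocity > 400.0
    print(f'{saccade_mask.sum()} of {len(velocity)} samples above threshold')


if __name__ == '__main__':
    # run the synthetic demos only; _demo_txt_to_dataframe needs a real trial file
    _demo_saliency_helpers()
    _demo_box_utils()
    _demo_velocity()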