import numpy as np
import pandas as pd
import json
import colorsys
import pickle as pkl
import os
import cv2
import math

from PyQt5 import QtCore, QtGui, QtWidgets
from threading import Lock, Thread

from utils.util import sperical2equirec

POSE_PAIRS = [[1, 2], [1, 5], [2, 3], [3, 4], [5, 6], [6, 7], [1, 8], [8, 9], [9, 10],
              [1, 11], [11, 12], [12, 13], [1, 0], [0, 14], [14, 16], [0, 15], [15, 17]]

POSE_PAIRS_NEW = [[10, 9], [9, 8], [8, 1], [1, 11], [11, 12], [12, 13], [13, 12], [12, 11], [11, 1],
                  [1, 2], [2, 3], [3, 4], [4, 3], [3, 2], [2, 1], [1, 5], [5, 6], [6, 7], [7, 6], [6, 5], [5, 1],
                  [1, 0], [0, 15], [15, 17], [17, 15], [15, 0], [0, 14], [14, 16]]


def getColors(N, bright=True):
    """To get visually distinct colors, generate them in HSV space then convert to RGB."""
    brightness = 1.0 if bright else 0.7
    hsv = [(i / N, 1, brightness) for i in range(N)]
    colors = list(map(lambda c: colorsys.hsv_to_rgb(*c), hsv))
    return colors


class Processor(QtWidgets.QWidget):
    frame: int = None
    frameData: np.ndarray = None
    fps: float = None
    frameCount: int = None
    frameSize: (float, float) = None
    movieFileName: str = None
    originalVideoResolution: (int, int) = None
    scaledVideoResolution: (int, int) = None
    dataFileName: str = None
    numberIDs: int = None
    visualize: list = None
    segments: list = None
    tags: list = None

    signalPoseSetInit = QtCore.pyqtSignal(dict, dict, list, int)
    signalSpeakerSetInit = QtCore.pyqtSignal(dict, list, int)
    signalGazeSetInit = QtCore.pyqtSignal(dict, list, int)
    signalInit = QtCore.pyqtSignal(list, int)
    signalInitTags = QtCore.pyqtSignal(list, tuple, dict, list)
    signalUpdateMovementGraph = QtCore.pyqtSignal(dict, list, int)
    signalUpdateSpeakGraph = QtCore.pyqtSignal(dict, int, int)
    signalUpdateHandVelocity = QtCore.pyqtSignal(dict, int)
    signalUpdateFaceAus = QtCore.pyqtSignal(dict)
    signalUpdateFaceImgs = QtCore.pyqtSignal(dict, int)
    signalVideoLabel = QtCore.pyqtSignal(int, int, int)
    signalPosePoints = QtCore.pyqtSignal(int, list, list)
    signalPoseChangedLabels = QtCore.pyqtSignal(dict, dict, int)
    signalSpeakChangedLabels = QtCore.pyqtSignal(dict, int)
    signalUpdateGazeGraph = QtCore.pyqtSignal(dict, int)
    signalUpdateGazeMap = QtCore.pyqtSignal(int, list, list)
    signalUpdateTagGraph = QtCore.pyqtSignal(dict)
    signalUpdateTags = QtCore.pyqtSignal(int, list, list)
    signalClearLabels = QtCore.pyqtSignal()
    signalClearPose = QtCore.pyqtSignal()
    signalClearGaze = QtCore.pyqtSignal()
    signalClearTags = QtCore.pyqtSignal()
    signalDeactivatePoseTab = QtCore.pyqtSignal(bool)
    signalDeactivateGazeTab = QtCore.pyqtSignal(bool)
    signalDeactivateFaceTab = QtCore.pyqtSignal(bool)
    signalDeactivateSpeakingTab = QtCore.pyqtSignal(bool)
    signalDeactivateObjectTab = QtCore.pyqtSignal(bool)

    def __init__(self, parent=None):
        super(Processor, self).__init__(parent)
        self.cap = None
        self.dataGaze = None
        self.dataGazeMeasures = None
        self.dataMovement = None
        self.dataFace = None
        self.dataRTGene = None
        self.dataSpeaker = None
        self.dataObjects = None
        self.colors = None
        self.tagColors = None
        self.videoScale = 1
        self.updateAUs = dict()
        self.movementActivity = dict()
        self.tagMovement = dict()
        self.handActivity = dict()
        self.selectedIDs = None
        self._ready = False
        self.activeTab = 0

    @QtCore.pyqtSlot(QtGui.QImage)
    def saveCurrentFrameData(self, newFrameData):
        if newFrameData is None:
            return
        newFrameData = newFrameData.convertToFormat(4)  # 4 == QtGui.QImage.Format_RGB32
        width = newFrameData.width()
        height = newFrameData.height()
        ptr = newFrameData.bits()
        ptr.setsize(newFrameData.byteCount())
        self.frameData = np.array(ptr).reshape(height, width, 4)

    def updateFrame(self, position):
        threshold = 100
        self.position = position
        if self._ready:
            frame = int((position / 1000.0) * self.fps)
            self.frame = frame
            movement = {}
            velocity = {}
            gaze = {}
            face_aus = {}
            speaking = {}
            tagData = {}
            # neck_points = list()
            # guard against missing RT-Gene data
            f = self.dataRTGene.loc[self.dataRTGene['Frame'] == self.frame] if self.dataRTGene is not None else None

            for id_no in range(self.numberIDs):
                ### Facial Activity Data ###
                if self.dataFace is not None and self.activeTab == 3:
                    face_aus[id_no] = self.dataFace.loc[
                        self.dataFace['Frame'] == self.frame, ['ID%i_AUs' % id_no]].values
                    if len(face_aus[id_no]) > 0 \
                            and np.sum(np.logical_xor(self.updateAUs[id_no],
                                                      [face_aus[id_no].flatten()[0] > 0.5])) > 0 \
                            and np.sum([face_aus[id_no].flatten()[0] > 0.5]) > 0 \
                            and np.sum([face_aus[id_no].flatten()[0] > 0.5]) > np.sum(self.updateAUs[id_no]):
                        self.updateAUs[id_no] = [face_aus[id_no].flatten()[0] > 0.5]
                        # print('Update AU Image: ', frame)
                        self.get_current_frame(self.frame, id_no)
                    elif len(face_aus[id_no]) > 0:
                        self.updateAUs[id_no] = [face_aus[id_no].flatten()[0] > 0.5]

                ### Body Movement Data ###
                if self.dataMovement is not None:
                    if self.visualize and self.visualize['Pose'].isChecked():
                        if self.selectedIDs[id_no]:
                            keypoints = self.dataMovement['ID%i_Keypoints' % id_no].iloc[frame]
                            lstX = []
                            lstY = []
                            # Plot skeleton --> connections via pose pairs
                            for i in range(len(POSE_PAIRS_NEW)):
                                index = POSE_PAIRS_NEW[i]
                                if keypoints is None:
                                    continue
                                A, B = keypoints[index]
                                if A is None or B is None:
                                    continue
                                lstX.append(A[0])
                                lstX.append(B[0])
                                lstY.append(A[1])
                                lstY.append(B[1])
                            if len(lstX) > 0 and len(lstY) > 0:
                                self.signalPosePoints.emit(id_no, lstX, lstY)
                        else:
                            self.signalPosePoints.emit(id_no, [], [])
                    else:
                        self.signalClearPose.emit()

                    movement[id_no] = self.movementActivity[id_no][frame: frame + 200], np.arange(frame - 199, frame + 1)
                    velocity[id_no] = self.dataMovement['ID%i_Velocity' % id_no].iloc[frame]

                ### Gaze RTGene Data ###
                if self.dataRTGene is not None:
                    # Update labels
                    head = self.dataRTGene['ID%i_Head' % id_no].iloc[frame]
                    if head is not None:
                        if self.visualize and self.visualize['Label'].isChecked():
                            self.signalVideoLabel.emit(id_no, head[0], head[1])
                        else:
                            self.signalClearLabels.emit()
                    # Build heatmap
                    if self.visualize and self.visualize['Gaze'].isChecked():
                        if self.selectedIDs[id_no]:
                            if frame <= threshold:
                                target_x = self.dataRTGene['ID%i_target_x' % id_no].iloc[: frame + 1].values.tolist()
                                target_y = self.dataRTGene['ID%i_target_y' % id_no].iloc[: frame + 1].values.tolist()
                            else:
                                target_x = self.dataRTGene['ID%i_target_x' % id_no].iloc[
                                    frame - threshold: frame + 1].values.tolist()
                                target_y = self.dataRTGene['ID%i_target_y' % id_no].iloc[
                                    frame - threshold: frame + 1].values.tolist()
                            self.signalUpdateGazeMap.emit(id_no, target_x, target_y)
                    else:
                        self.signalClearGaze.emit()

                    if not f.empty and self.activeTab == 0:
                        position = f['ID%i_Head' % id_no].values.flatten()[0]
                        gaze_phi = f['ID%i_Phi' % id_no].values.flatten()[0]
                        if not np.any(pd.isnull(position)) and not np.any(pd.isnull(gaze_phi)):
                            gaze[id_no] = self.calculateGazeData(position, gaze_phi)
                elif self.dataMovement is not None:
                    neck = self.dataMovement['ID%s_Keypoints' % id_no].map(
                        lambda x: x[1] if x is not None else None).map(
                        lambda x: x[:2] if x is not None else None)
                    # neck_points.append(neck.iloc[frame])
                    if self.visualize and self.visualize['Label'].isChecked():
                        if neck.iloc[frame] is not None:
                            self.signalVideoLabel.emit(id_no, neck.iloc[frame][0], neck.iloc[frame][1])
                    else:
                        self.signalClearLabels.emit()

                ### Speaking Data ###
                if self.dataSpeaker is not None and self.activeTab == 1:
                    e = self.dataSpeaker.loc[self.dataSpeaker.Frame < frame]
                    rst = e['ID%i_is_speaker' % id_no].sum() / (len(e) + 1)
                    speaking[id_no] = rst

            ### Object Data ###
            if self.dataObjects is not None:
                for tag in self.tags:
                    tagData[tag] = self.tagMovement[tag][frame: frame + 200], np.arange(frame - 199, frame + 1)
                    if self.visualize and self.visualize['Tags'].isChecked():
                        if frame <= 30:
                            position = self.dataObjects[tag].iloc[: frame + 1].values.tolist()
                        else:
                            position = self.dataObjects[tag].iloc[frame - 30: frame + 1].values.tolist()
                        x_values = [x[0] for x in position if x is not None]
                        y_values = [x[1] for x in position if x is not None]
                        self.signalUpdateTags.emit(tag, x_values, y_values)
                    else:
                        self.signalClearTags.emit()

            ### Send collected data to respective Tabs ###
            if self.dataFace is not None and self.activeTab == 3:
                self.signalUpdateFaceAus.emit(face_aus)
            if self.dataMovement is not None and self.activeTab == 2:
                self.signalUpdateMovementGraph.emit(movement, self.colors, self.numberIDs)
                self.signalUpdateHandVelocity.emit(velocity, self.numberIDs)
            if self.dataRTGene is not None and self.activeTab == 0:
                self.signalUpdateGazeGraph.emit(gaze, self.numberIDs)
            if self.dataSpeaker is not None and self.activeTab == 1:
                active = self.dataSpeaker.loc[self.dataSpeaker.Frame == frame, sorted(
                    [col for col in self.dataSpeaker.columns if 'speak_score' in col])].values.flatten()
                active = active[~pd.isnull(active)]
                if active.size > 0:
                    active_speaker = np.argmax(active)
                else:
                    active_speaker = None
                self.signalUpdateSpeakGraph.emit(speaking, active_speaker, self.numberIDs)
            if self.dataObjects is not None and self.activeTab == 4:
                self.signalUpdateTagGraph.emit(tagData)

    @QtCore.pyqtSlot(int)
    def tabChanged(self, current):
        self.activeTab = current

    @QtCore.pyqtSlot(list)
    def onSelectedID(self, lst):
        for i, button in enumerate(lst):
            if button.isChecked():
                self.selectedIDs[i] = True
            else:
                self.selectedIDs[i] = False

    @QtCore.pyqtSlot(int)
    def get_current_frame(self, frame, id_no):
        face_imgs = {}
        if os.name == 'nt':
            # if on Windows we have to read the image
            self.cap.set(1, frame)  # 1 == cv2.CAP_PROP_POS_FRAMES
            ret, image = self.cap.read()
            if ret:
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            else:
                return
        else:
            # we can use the image from QT-decoding
            image = cv2.cvtColor(self.frameData, cv2.COLOR_BGR2RGB)

        # Get 68 landmarks from RT-Gene
        img_land = self.dataRTGene.loc[self.dataRTGene.Frame == frame, ['ID%i_Landmarks' % id_no]].values[0]
        if len(img_land) > 0:
            img_land = img_land[0] * self.videoScale
            # Convert 68 landmarks to 49
            img_land = np.delete(img_land, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 62, 66],
                                 axis=0).flatten()
            face_crop, _ = self.crop_face(image, img_land)
            face_imgs[id_no] = face_crop
        else:
            face_imgs[id_no] = None
        self.signalUpdateFaceImgs.emit(face_imgs, id_no)

    def crop_face(self, img, img_land, box_enlarge=4, img_size=200):
        leftEye0 = (img_land[2 * 19] + img_land[2 * 20] + img_land[2 * 21] + img_land[2 * 22] +
                    img_land[2 * 23] + img_land[2 * 24]) / 6.0
        leftEye1 = (img_land[2 * 19 + 1] + img_land[2 * 20 + 1] + img_land[2 * 21 + 1] + img_land[2 * 22 + 1] +
                    img_land[2 * 23 + 1] + img_land[2 * 24 + 1]) / 6.0
        rightEye0 = (img_land[2 * 25] + img_land[2 * 26] + img_land[2 * 27] + img_land[2 * 28] +
                     img_land[2 * 29] + img_land[2 * 30]) / 6.0
        rightEye1 = (img_land[2 * 25 + 1] + img_land[2 * 26 + 1] + img_land[2 * 27 + 1] + img_land[2 * 28 + 1] +
                     img_land[2 * 29 + 1] + img_land[2 * 30 + 1]) / 6.0
        deltaX = (rightEye0 - leftEye0)
        deltaY = (rightEye1 - leftEye1)
        l = math.sqrt(deltaX * deltaX + deltaY * deltaY)
        sinVal = deltaY / l
        cosVal = deltaX / l
        mat1 = np.mat([[cosVal, sinVal, 0], [-sinVal, cosVal, 0], [0, 0, 1]])
        mat2 = np.mat([[leftEye0, leftEye1, 1], [rightEye0, rightEye1, 1],
                       [img_land[2 * 13], img_land[2 * 13 + 1], 1],
                       [img_land[2 * 31], img_land[2 * 31 + 1], 1],
                       [img_land[2 * 37], img_land[2 * 37 + 1], 1]])
        mat2 = (mat1 * mat2.T).T
        cx = float((max(mat2[:, 0]) + min(mat2[:, 0]))) * 0.5
        cy = float((max(mat2[:, 1]) + min(mat2[:, 1]))) * 0.5

        if (float(max(mat2[:, 0]) - min(mat2[:, 0])) > float(max(mat2[:, 1]) - min(mat2[:, 1]))):
            halfSize = 0.5 * box_enlarge * float((max(mat2[:, 0]) - min(mat2[:, 0])))
        else:
            halfSize = 0.5 * box_enlarge * float((max(mat2[:, 1]) - min(mat2[:, 1])))

        scale = (img_size - 1) / 2.0 / halfSize
        mat3 = np.mat([[scale, 0, scale * (halfSize - cx)], [0, scale, scale * (halfSize - cy)], [0, 0, 1]])
        mat = mat3 * mat1

        aligned_img = cv2.warpAffine(img, mat[0:2, :], (img_size, img_size), cv2.INTER_LINEAR,
                                     borderValue=(128, 128, 128))

        land_3d = np.ones((int(len(img_land) / 2), 3))
        land_3d[:, 0:2] = np.reshape(np.array(img_land), (int(len(img_land) / 2), 2))
        mat_land_3d = np.mat(land_3d)
        new_land = np.array((mat * mat_land_3d.T).T)
        new_land = np.reshape(new_land[:, 0:2], len(img_land))

        return aligned_img, new_land

    def calculateAllMeasures(self):
        """Recalculate all measures for the selected segments for export"""
        movement = dict()
        gaze = dict()
        speaking_dict = dict()
        face = dict()

        if self.segments is None:
            segments = np.ones(len(self.dataRTGene))
        else:
            segments = self.segments

        if self.dataMovement is not None:
            dataMov = self.dataMovement.loc[segments == 1]
            total = len(dataMov)
            for id_no in range(self.numberIDs):
                x_mov = [np.linalg.norm(x) if x is not None else np.nan for x in dataMov['ID%i_Movement' % id_no]]
                # Add frames until start of segment to frame number
                mostActivity = np.argmax(np.array(x_mov)) + np.argmax(segments)
                # Frames with both hands tracked
                tracked = dataMov.loc[dataMov['ID%s_HandsTracked' % id_no] == 2, ['ID%s_HandsTracked' % id_no]].count()
                high_vel = dataMov.loc[dataMov['ID%i_Velocity' % id_no] > 1]['ID%i_Velocity' % id_no].count()
                movement[id_no] = {'Most activity': int(mostActivity),
                                   'Hands above table (relative)': float(tracked[0] / total),
                                   'Gestures (relative)': float(high_vel / total)}

        if self.dataSpeaker is not None:
            dataSpeak = self.dataSpeaker.loc[segments == 1]
            for id_no in range(self.numberIDs):
                tracked_frames = dataSpeak[dataSpeak.notnull()].count()['ID%i_is_speaker' % id_no]
                rst = dataSpeak['ID%i_is_speaker' % id_no].sum() / len(dataSpeak)
                turns = []
                counters = []
                counter = 0
                turn = 0
                lastFrame = 0
                switch = False
                for frame in sorted(dataSpeak.Frame):
                    if dataSpeak.loc[dataSpeak.Frame == frame, ['ID%i_is_speaker' % id_no]].values and frame == (
                            lastFrame + 1):
                        switch = True
                        turn = turn + 1
                    elif switch:
                        if turn >= 30:
                            turns.append(turn)
                            counter = counter + 1
                        turn = 0
                        switch = False
                    if frame % int(self.fps * 60) == 0:
                        counters.append(counter)
                        counter = 0
                    lastFrame = frame
                avg_turn = np.mean(np.array(turns)) / self.fps
                avg_count = np.mean(np.array(counters))
                num_turns = len(turns)
                speaking_dict[id_no] = {'Tracked frames': int(tracked_frames),
                                        'Speaking time (relative)': float(rst),
                                        'Number of speaking turns': int(num_turns),
                                        'Average length of speaking turn (seconds)': float(avg_turn),
                                        'Average number of speaking turns per minute': float(avg_count)}

        if self.dataGazeMeasures is not None:
            dataGaze = self.dataGazeMeasures.loc[segments == 1]
            for id_no in range(self.numberIDs):
                # Frames in which the ID looked at other people
                look = dataGaze['ID%i_looks_at' % id_no].dropna().count()
                # Frames in which the ID was watched by other people
                watched = dataGaze['ID%i_watched_by' % id_no].map(
                    lambda x: 1 if not np.any(pd.isna(x)) and len(x) > 0 else 0).sum()
                tracked = dataGaze['ID%i_tracked' % id_no].sum()
                gaze[id_no] = {'Tracked frames': int(tracked),
                               'lookSomeone': float(look / tracked),
                               'totalNoLook': float((tracked - look) / tracked),
                               'totalWatched': float(watched / tracked),
                               'ratioWatcherLookSOne': float(watched / look)}

        if self.dataFace is not None:
            dataFaceAUs = self.dataFace.loc[segments == 1]
            dict_aus = np.array(['AU1: Inner Brow Raiser', 'AU2: Outer Brow Raiser', 'AU4: Brow Lowerer',
                                 'AU5: Upper Lid Raiser', 'AU6: Cheek Raiser', 'AU9: Nose Wrinkler',
                                 'AU12: Lip Corner Puller', 'AU15: Lip Corner Depressor', 'AU17: Chin Raiser',
                                 'AU20: Lip Stretcher', 'AU25: Lips Part', 'AU26: Jaw Drop'])
            for id_no in range(self.numberIDs):
                face[id_no] = []
                for i, au in enumerate(dict_aus):
                    au_data = [a[i] for a in dataFaceAUs['ID%i_AUs' % id_no] if not np.all(pd.isna(a))]
                    au_data = np.array(au_data) > 0.5
                    face[id_no].append(au + ' : ' + str(au_data.sum()))

        return gaze, speaking_dict, movement, face

    def calculateGazeData(self, position, yaw):
        # Get position in spherical coordinates (in radian)
        id_u = position[0] / self.frameSize[0]
        id_theta = id_u * 2 * np.pi
        # Adjust position so it matches the more intuitive orientation of the video
        id_theta = (id_theta * -1) - np.pi
        # ID position on coordinate system
        id_pos_x = np.cos(id_theta)
        id_pos_y = np.sin(id_theta)
        x, y = self.get_circle(0.05)
        circle_x = x + id_pos_x
        circle_y = y + id_pos_y
        # Add angle - RTGene yaw is in radian
        id_target = id_theta + np.pi - yaw
        id_x1_target = np.cos(id_target)
        id_x2_target = np.sin(id_target)
        # Line
        line_x = np.array([id_pos_x, id_x1_target])
        line_y = np.array([id_pos_y, id_x2_target])
        xdata = np.append(circle_x, line_x)
        ydata = np.append(circle_y, line_y)
        return [xdata, ydata]

    def get_circle(self, radius):
        theta = np.linspace(0, 2 * np.pi, 100)
        x = radius * np.cos(theta)
        y = radius * np.sin(theta)
        return np.array(x), np.array(y)

    @QtCore.pyqtSlot(dict)
    def onVisualize(self, lst):
        self.visualize = lst

    @QtCore.pyqtSlot(np.ndarray)
    def _updateSegments(self, segments):
        """Recalculate movement and speaking measures when a segment was changed"""
        # save segments for exporting only wanted time ranges
        self.segments = segments

        if self.dataMovement is not None:
            dataMov = self.dataMovement.loc[segments == 1]
            total = len(dataMov)
            mostActivity = dict()
            hand = dict()
            for id_no in range(self.numberIDs):
                x_mov = [x[0] if x is not None else np.nan for x in dataMov['ID%i_Movement' % id_no]]
                # Add frames until start of segment to frame number
                mostActivity[id_no] = np.argmax(np.array(x_mov)) + np.argmax(segments)
                # Frames with both hands tracked
                tracked = dataMov.loc[dataMov['ID%s_HandsTracked' % id_no] == 2, ['ID%s_HandsTracked' % id_no]].count()
                high_vel = dataMov.loc[dataMov['ID%i_Velocity' % id_no] > 1]['ID%i_Velocity' % id_no].count()
                hand[id_no] = [total, tracked[0], high_vel]
            self.signalPoseChangedLabels.emit(mostActivity, hand, self.numberIDs)

        if self.dataSpeaker is not None:
            diff = len(segments) - len(self.dataSpeaker)
            dataSpeak = self.dataSpeaker
            # dataSpeak['Frame'] = dataSpeak.index
            if diff > 0:
                speakSegments = segments[:-diff]
            elif diff < 0:
                # pad segments with zeros up to the speaker-data length
                speakSegments = np.append(segments, [*np.zeros(-diff)])
            else:
                speakSegments = segments
            dataSpeak = self.dataSpeaker.loc[speakSegments == 1]
            speaking_dict = dict()
            for id_no in range(self.numberIDs):
                tracked_frames = dataSpeak[dataSpeak.notnull()].count()['ID%i_is_speaker' % id_no]
                rst = dataSpeak['ID%i_is_speaker' % id_no].sum() / len(dataSpeak)
                speaking_dict[id_no] = [tracked_frames, rst]  # , num_turns, avg_turn, avg_count
            self.signalSpeakChangedLabels.emit(speaking_dict, self.numberIDs)

    def calculateSpeakingMeasures(self):
        if self.dataSpeaker is None:
            return
        speaking_dict = dict()
        total = len(self.dataSpeaker)
        for id_no in range(self.numberIDs):
            tracked_frames = self.dataSpeaker[self.dataSpeaker.notnull()].count()['ID%i_is_speaker' % id_no]
            rst = self.dataSpeaker['ID%i_is_speaker' % id_no].sum() / total
            turns = []
            counters = []
            counter = 0
            turn = 0
            switch = False
            for frame in sorted(self.dataSpeaker.Frame):
                if self.dataSpeaker.loc[self.dataSpeaker.Frame == frame, ['ID%i_is_speaker' % id_no]].values:
                    switch = True
                    turn = turn + 1
                elif switch:
                    if turn >= 30:
                        turns.append(turn)
                        counter = counter + 1
                    turn = 0
                    switch = False
                if frame % int(self.fps * 60) == 0:
                    counters.append(counter)
                    counter = 0
            avg_turn = np.mean(np.array(turns)) / self.fps
            avg_count = np.mean(np.array(counters))
            num_turns = len(turns)
            speaking_dict[id_no] = [tracked_frames, rst, num_turns, avg_turn, avg_count]
        self.signalSpeakerSetInit.emit(speaking_dict, self.colors, self.numberIDs)

    def calculateMovementMeasures(self):
        """Initial calculation of hand velocity on the full data"""
        if self.dataMovement is None:
            return
        total = len(self.dataMovement)
        mostActivity = {}
        for id_no in range(self.numberIDs):
            x_mov = [np.linalg.norm(x) if x is not None else np.nan
                     for x in self.dataMovement['ID%i_Movement' % id_no]]
            mostActivity[id_no] = np.argmax(np.array(x_mov))
            self.movementActivity[id_no] = np.array([*np.zeros(200), *x_mov])

            # Left Wrist and Right Wrist: idx 4, idx 7
            self.dataMovement['ID%i_HandsTracked' % id_no] = self.dataMovement['ID%i_Keypoints' % id_no].map(
                lambda x: ((np.sum(x[4] is not None) + np.sum(x[7] is not None)) // 3) if x is not None else None)

            # Pixel position of left and right wrist
            self.dataMovement['ID%i_Hand1_Vel' % id_no] = self.dataMovement['ID%s_Keypoints' % id_no].map(
                lambda x: x[4] if not np.all(pd.isna(x)) else np.nan).map(
                lambda x: x[:2].astype(float) if not np.all(pd.isna(x)) else np.nan)
            self.dataMovement['ID%i_Hand1_Vel' % id_no] = self.dataMovement['ID%i_Hand1_Vel' % id_no].pct_change(1).map(
                lambda x: np.abs(x.mean()) * 100 if not np.all(pd.isna(x)) else None)
            self.dataMovement['ID%i_Hand2_Vel' % id_no] = self.dataMovement['ID%s_Keypoints' % id_no].map(
                lambda x: x[7] if not np.all(pd.isna(x)) else np.nan).map(
                lambda x: x[:2].astype(float) if not np.all(pd.isna(x)) else np.nan)
            self.dataMovement['ID%i_Hand2_Vel' % id_no] = self.dataMovement['ID%i_Hand2_Vel' % id_no].pct_change(1).map(
                lambda x: np.abs(x.mean()) * 100 if not np.all(pd.isna(x)) else None)
            self.dataMovement['ID%i_Velocity' % id_no] = self.dataMovement[
                ['ID%i_Hand1_Vel' % id_no, 'ID%i_Hand2_Vel' % id_no]].mean(axis=1)

            # Frames with both hands tracked
            tracked = self.dataMovement.loc[self.dataMovement['ID%s_HandsTracked' % id_no] == 2,
                                            ['ID%s_HandsTracked' % id_no]].count()
            high_vel = self.dataMovement.loc[self.dataMovement[
                'ID%i_Velocity' % id_no] > 1]['ID%i_Velocity' % id_no].count()
            self.handActivity[id_no] = [total, tracked[0], high_vel]
        self.signalPoseSetInit.emit(mostActivity, self.handActivity, self.colors, self.numberIDs)

    def calculateGazeMeasures(self):
        """Initial calculation of gaze measures: dataGazeMeasures"""
        thresh = 15
        eq_width = self.frameSize[0]
        totWatcher = {}
        lookSomeOne = {}
        tracked = {}
        for i in range(self.numberIDs):
            totWatcher[i] = []
            lookSomeOne[i] = []
            tracked[i] = []

        for frame in self.dataRTGene.Frame:
            f = self.dataRTGene.loc[self.dataRTGene.Frame == frame]
            angles = []
            positions = []
            targets = []
            for id_no in range(self.numberIDs):
                pos = f['ID%i_Head' % id_no].values.flatten()[0]
                phi = f['ID%i_Phi' % id_no].values.flatten()[0]
                pos = np.array(pos, dtype=float)
                phi = np.array(phi, dtype=float)
                if np.any(np.isnan(pos)) or np.any(np.isnan(phi)):
                    positions.append(np.nan)
                    angles.append(np.nan)
                    targets.append(np.nan)
                    tracked[id_no].append(False)
                    continue
                tracked[id_no].append(True)
                # Get position in spherical coordinates
                id_u = pos[0] / eq_width
                id_theta = id_u * 2 * np.pi
                id_theta = np.rad2deg(id_theta)
                positions.append(id_theta)
                # Add angle - gaze[1] is yaw
                angle = np.rad2deg(phi)
                id_target = id_theta + 180 + angle
                targets.append(id_target % 360)
                angles.append(angle)
            # plot_frame_calculated(positions, angles)

            watcher = dict()
            for i in range(self.numberIDs):
                watcher[i] = []
            for i, t in enumerate(targets):
                inside_min = np.array([(e - thresh) < targets[i] if not np.isnan(e) else False for e in positions])
                inside_max = np.array([(e + thresh) > targets[i] if not np.isnan(e) else False for e in positions])
                # print(inside_min, inside_max)
                if np.any(inside_min) and np.any(inside_max):
                    test = np.logical_and(inside_min, inside_max)
                    idx = np.where(test)[0]
                    for j in range(len(idx)):
                        # ID i watches idx[j]
                        lookSomeOne[i].append([frame, idx[j]])
                        # ID idx[j] is being looked at by i
                        watcher[idx[j]].append(i)
            for k, v in watcher.items():
                totWatcher[k].append([frame, v])

        df_totWatcher = pd.DataFrame(columns={'Frame'})
        for i in range(self.numberIDs):
            df_id = pd.DataFrame.from_dict(totWatcher.get(i))
            df_id = df_id.rename(columns={0: "Frame", 1: "ID{}_watched_by".format(i)})
            df_totWatcher = pd.merge(df_totWatcher, df_id, how='outer', on=['Frame'], sort=True)

        df_lookSomeOne = pd.DataFrame(columns={'Frame'})
        for i in range(self.numberIDs):
            df_id = pd.DataFrame.from_dict(lookSomeOne.get(i))
            df_id = df_id.rename(columns={0: "Frame", 1: "ID{}_looks_at".format(i)})
            df_lookSomeOne = pd.merge(df_lookSomeOne, df_id, how='outer', on=['Frame'], sort=True)

        df_tracked = pd.DataFrame(columns={'Frame'})
        for i in range(self.numberIDs):
            df_id = pd.DataFrame.from_dict(tracked.get(i))
            df_id.index.name = 'Frame'
            df_id = df_id.rename(columns={0: "ID{}_tracked".format(i)})
            df_tracked = pd.merge(df_tracked, df_id, how='outer', on=['Frame'], sort=True)

        self.dataGazeMeasures = pd.merge(df_lookSomeOne, df_totWatcher, how='outer', on=['Frame'], sort=True)
        self.dataGazeMeasures = pd.merge(self.dataGazeMeasures, df_tracked, how='outer', on=['Frame'], sort=True)

        gaze = dict()
        for id_no in range(self.numberIDs):
            # print(self.dataGazeMeasures['ID%i_watched_by' % id_no])
            # Frames in which the ID looked at other people
            look = self.dataGazeMeasures['ID%i_looks_at' % id_no].dropna().count()
            # Frames in which the ID was watched by other people
            watched = self.dataGazeMeasures['ID%i_watched_by' % id_no].map(
                lambda x: 1 if not np.any(pd.isna(x)) and len(x) > 0 else 0).sum()
            tracked = self.dataGazeMeasures['ID%i_tracked' % id_no].sum()
            gaze[id_no] = [look, watched, tracked]
        self.signalGazeSetInit.emit(gaze, self.colors, self.numberIDs)

    def calculateGazeTargets(self):
        # Compute gaze targets
        for id_no in range(self.numberIDs):
            # self.dataRTGene['ID%i_Phi' % id_no] = self.dataRTGene['ID%i_Phi' % id_no].rolling(15).mean()
            self.id_no = id_no
            self.dataRTGene['ID%i_alpha' % id_no] = self.dataRTGene['ID%i_Phi' % id_no].map(
                lambda x: np.rad2deg(x) - 180 if x is not None else None)
            self.dataRTGene['ID%i_beta' % id_no] = self.dataRTGene['ID%i_Theta' % id_no].map(
                lambda x: 180 - 2 * np.rad2deg(x) if x is not None else None)
            self.dataRTGene['ID%i_target_spher' % id_no] = self.dataRTGene.apply(self.fun, axis=1)
            self.dataRTGene[['ID%i_target_x' % id_no, 'ID%i_target_y' % id_no]] = self.dataRTGene.apply(
                self.fun, axis=1, result_type="expand")

    def fun(self, x):
        alpha = x['ID%i_alpha' % self.id_no]
        beta = x['ID%i_beta' % self.id_no]
        pos = x['ID%i_Head' % self.id_no]
        # print(pos, pd.isna(pos), type(pos))
        # Discard frames where not all values were detected
        if np.any(pd.isna(pos)) or np.any(pd.isna(alpha)) or np.any(pd.isna(beta)):
            return None, None
        # Get position in spherical coordinates
        theta = np.rad2deg((pos[0] / self.frameSize[0]) * 2 * np.pi)
        phi = np.rad2deg((pos[1] / self.frameSize[1]) * np.pi)
        # Get position in image frame (equirectangular projection)
        x, y = sperical2equirec((theta + alpha) % 360, (phi + beta) % 180, self.frameSize[0], self.frameSize[1])
        return x, y

    def calculateTagMeasures(self):
        if self.dataObjects is None:
            return
        for tag in self.tags:
            neutral = self.dataObjects[tag].dropna().iloc[0]
            # print('Tag #%i Starting point set to: %s' % (tag, str(neutral)))
            self.dataObjects['%i_Movement' % tag] = self.dataObjects[tag].map(
                lambda x: np.subtract(x, neutral) if x is not None else None)
            # Euclidean distance
            x_mov = [np.linalg.norm(x) if x is not None else None for x in self.dataObjects['%i_Movement' % tag]]
            self.tagMovement[tag] = np.array([*np.zeros(200), *x_mov])

    def readData(self, movieFileName, dataFileName, verbose=False):
        self.movieFileName = movieFileName
        self.dataFileName = dataFileName
        if verbose:
            print("## Start Reading Data")

        # Read video data
        f = self.movieFileName
        print('Reading video from %s' % f)
        if os.path.isfile(f):
            self.cap = cv2.VideoCapture(f)
            self.fps = self.cap.get(cv2.CAP_PROP_FPS)
            self.frameCount = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
            self.scaledVideoResolution = [self.cap.get(cv2.CAP_PROP_FRAME_WIDTH),
                                          self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)]
            if verbose:
                print('Video resolution: ', self.scaledVideoResolution)
                print("Video frameCount %i" % self.frameCount)
                duration = self.frameCount / self.fps
                minutes = int(duration / 60)
                seconds = duration % 60
                print('Video duration (M:S) = ' + str(minutes) + ':' + str(seconds))
        else:
            print("WARNING: no video available.")

        # Read data file
        with open(self.dataFileName, 'rb') as f:
            data = pkl.load(f)

        if "originalVideoResolution" in data:
            self.originalVideoResolution = data["originalVideoResolution"]
            self.videoScale = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) / self.originalVideoResolution[0]
            self.frameSize = data["originalVideoResolution"]
            if verbose:
                print('Video resolution scale factor: ', self.videoScale)

        # Read RTGene data
        if "RTGene" in data:
            self.dataRTGene = data["RTGene"]
            self.dataRTGene = self.dataRTGene.where(pd.notnull(self.dataRTGene), None)
            self.numberIDs = len([col for col in self.dataRTGene.columns if 'Landmarks' in col])
        else:
            self.signalDeactivateGazeTab.emit(True)
            print("WARNING: no RTGene data available. Deactivating gaze tab.")

        # Read movement data
        if "BodyMovement" in data:
            self.dataMovement = data["BodyMovement"]
            if not self.numberIDs:
                self.numberIDs = len([col for col in self.dataMovement if 'Movement' in col])
            if verbose:
                print('Body movement sample count %i' % len(self.dataMovement))
        else:
            self.signalDeactivatePoseTab.emit(True)
            print('WARNING: no body movement data available. Deactivating pose tab.')
        # Read facial activity data
        if "ActivityUnits" in data:
            self.dataFace = data["ActivityUnits"]
            if not self.numberIDs:
                self.numberIDs = len([col for col in self.dataFace.columns if 'AUs' in col])
            if verbose:
                print("Activity Units sample count %i" % len(self.dataFace))
        else:
            self.signalDeactivateFaceTab.emit(True)
            print("WARNING: no face activity data available. Deactivating face tab.")

        # Read speaker diarization data
        if 'Speaker' in data:
            self.dataSpeaker = data['Speaker']
        else:
            self.signalDeactivateSpeakingTab.emit(True)
            print('WARNING: no speaking data available. Deactivating speaking tab.')

        # Read AprilTag data
        if 'April' in data:
            self.dataObjects = data['April']
            self.tags = [col for col in self.dataObjects.columns if type(col) == int]
            self.tagColors = [tuple(np.random.random(size=3) * 256) for i in range(len(self.tags))]
            tracked = dict()
            for tag in self.tags:
                tracked[tag] = self.dataObjects[tag].dropna().count() / len(self.dataObjects)
            self.signalInitTags.emit(self.tags, self.originalVideoResolution, tracked, self.tagColors)
        else:
            self.signalDeactivateObjectTab.emit(True)
            print('WARNING: no object detection data available. Deactivating object tab.')

        # Set colors: to get visually distinct colors, generate them in HSV space then convert to RGB.
        hsv = [(i / self.numberIDs, 1, 1.0) for i in range(self.numberIDs)]  # 1.0 brightness
        self.colors = list(map(lambda c: colorsys.hsv_to_rgb(*c), hsv))

        self.selectedIDs = []
        for id_no in range(self.numberIDs):
            self.updateAUs[id_no] = np.zeros(12)
            self.selectedIDs.append(True)

        self.calculateTagMeasures()
        self.calculateGazeTargets()
        self.calculateGazeMeasures()
        self.signalInit.emit(self.colors, self.numberIDs)

    def export(self):
        # Get export location
        fileName = QtWidgets.QFileDialog.getSaveFileName(self, "Export calculations",
                                                         self.dataFileName.replace("dat", "json"),
                                                         "Json File (*.json);;All Files (*)")
        if fileName[0] == '':
            return

        # Collect all newly calculated values
        data = dict()
        gaze, speaking, movement, face = self.calculateAllMeasures()
        for id_no in range(self.numberIDs):
            data['ID%i' % id_no] = {'Eye Gaze': gaze.get(id_no),
                                    'Speaking Activity': speaking.get(id_no),
                                    'Body and Hand Movement': movement.get(id_no),
                                    'Face Activity': face.get(id_no)}
        with open(fileName[0], 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

        segment_id = self._get_segment_ids()

        # Export all dataframes as csv
        if self.dataRTGene is not None:
            if self.segments is None:
                self.dataRTGene['segment'] = 0
                self.dataRTGene.to_csv(fileName[0].replace(
                    ".json", "-gaze.csv"), index=True, encoding='utf-8')
            else:
                self.dataRTGene['segment'] = segment_id
                self.dataRTGene[self.segments[1:] == 1].to_csv(fileName[0].replace(
                    ".json", "-gaze.csv"), index=True, encoding='utf-8')
        if self.dataMovement is not None:
            if self.segments is None:
                self.dataMovement['segment'] = 0
                self.dataMovement.to_csv(fileName[0].replace(
                    ".json", "-body-movement.csv"), index=True, encoding='utf-8')
            else:
                self.dataMovement['segment'] = segment_id
                self.dataMovement[self.segments == 1].to_csv(fileName[0].replace(
                    ".json", "-body-movement.csv"), index=True, encoding='utf-8')
        if self.dataFace is not None:
            if self.segments is None:
                self.dataFace['segment'] = 0
                self.dataFace.to_csv(fileName[0].replace(
                    ".json", "-facial-activity.csv"), index=True, encoding='utf-8')
            else:
                self.dataFace['segment'] = segment_id
                self.dataFace[self.segments == 1].to_csv(fileName[0].replace(
                    ".json", "-facial-activity.csv"), index=True, encoding='utf-8')
        if self.dataSpeaker is not None:
            if self.segments is None:
                self.dataSpeaker['segment'] = 0
                self.dataSpeaker.to_csv(fileName[0].replace(
                    ".json", "-speaker.csv"), index=True, encoding='utf-8')
            else:
                self.dataSpeaker['segment'] = segment_id
                self.dataSpeaker[self.segments == 1].to_csv(fileName[0].replace(
                    ".json", "-speaker.csv"), index=True, encoding='utf-8')
        if self.dataObjects is not None:
            if self.segments is None:
                self.dataObjects['segment'] = 0
                self.dataObjects.to_csv(fileName[0].replace(
                    ".json", "-objects.csv"), index=True, encoding='utf-8')
            else:
                self.dataObjects['segment'] = segment_id
                self.dataObjects[self.segments == 1].to_csv(fileName[0].replace(
                    ".json", "-objects.csv"), index=True, encoding='utf-8')
        print('Exported data to', fileName[0])

    def _get_segment_ids(self):
        if self.segments is None:
            return None
        segment_id = [-1 for s in self.segments]
        segment_counter = -1
        old = self.segments[0]
        segment_id[0] = 0
        for i, current in enumerate(self.segments[1:]):
            if current == 1:
                if old != current:
                    segment_counter += 1
                segment_id[i + 1] = segment_counter
            old = current
        return segment_id

    def getColors(self):
        return self.colors

    def getTags(self):
        return self.tags

    def getTagColors(self):
        return self.tagColors

    def getFrameCount(self):
        return self.frameCount

    def getFrameSize(self):
        return self.frameSize

    def getFPS(self):
        return self.fps

    def getVideo(self):
        return self.movieFileName

    def getGazeData(self):
        return self.dataGaze

    def getFrame(self, frameIdx):
        return frameIdx

    def getFrameCurrent(self):
        return 1

    def getNumberIDs(self):
        return self.numberIDs

    def getMovementData(self):
        return self.dataMovement

    def setReady(self, ready):
        self._ready = ready

    def getOriginalVideoResolution(self):
        return self.originalVideoResolution
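
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module).
# It assumes a Qt application plus a recorded video and a pickled analysis
# file; the file names below are placeholders. readData() loads the video and
# analysis data, setReady(True) enables processing, and updateFrame() is then
# driven with the player position in milliseconds.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import sys

    app = QtWidgets.QApplication(sys.argv)
    processor = Processor()
    processor.readData('example-session.mp4', 'example-session.dat', verbose=True)  # placeholder paths
    processor.setReady(True)
    processor.updateFrame(0)  # position in milliseconds
    sys.exit(app.exec_())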