conan/processing.py


import numpy as np
import pandas as pd
import json
import colorsys
import pickle as pkl
import os
import cv2
import math
from PyQt5 import QtCore, QtGui, QtWidgets
from threading import Lock, Thread
from utils.util import sperical2equirec
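# Skeleton connectivity between pose keypoints (the indices appear to follow the OpenPose/COCO
# 18-keypoint layout: 0 nose, 1 neck, 2-4 right arm, 5-7 left arm, 8-10/11-13 legs, 14-17 eyes/ears).
# POSE_PAIRS_NEW retraces each limb so the whole skeleton can be drawn as one continuous polyline.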
POSE_PAIRS = [[1, 2], [1, 5], [2, 3], [3, 4], [5, 6], [6, 7], [1, 8], [8, 9], [9, 10], [
1, 11], [11, 12], [12, 13], [1, 0], [0, 14], [14, 16], [0, 15], [15, 17]]
POSE_PAIRS_NEW = [[10, 9], [9, 8], [8, 1], [1, 11], [11, 12], [12, 13], [13, 12], [12, 11], [11, 1], [1, 2], [2, 3],
[3, 4], [4, 3], [3, 2], [2, 1], [1, 5], [5, 6], [6, 7], [7, 6], [6, 5], [5, 1], [1, 0], [0, 15],
[15, 17], [17, 15], [15, 0], [0, 14], [14, 16]]
def getColors(N, bright=True):
"""
To get visually distinct colors, generate them in HSV space then
convert to RGB.
"""
brightness = 1.0 if bright else 0.7
hsv = [(i / N, 1, brightness) for i in range(N)]
colors = list(map(lambda c: colorsys.hsv_to_rgb(*c), hsv))
return colors
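# Example (illustrative): getColors(3) yields three evenly spaced hues,
# i.e. [(1.0, 0.0, 0.0), (0.0, 1.0, 0.0), (0.0, 0.0, 1.0)] (red, green, blue).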
class Processor(QtWidgets.QWidget):
frame: int = None
frameData: np.ndarray = None
fps: float = None
frameCount: int = None
frameSize: [float, float] = None
movieFileName: str = None
originalVideoResolution: (int, int) = None
scaledVideoResolution: (int, int) = None
dataFileName: str = None
numberIDs: int = None
visualize: list = None
segments: list = None
tags: list = None
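# Qt signals used to push initialisation results and per-frame data to the GUI tabs and the video overlay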
signalPoseSetInit = QtCore.pyqtSignal(dict, dict, list, int)
signalSpeakerSetInit = QtCore.pyqtSignal(dict, list, int)
signalGazeSetInit = QtCore.pyqtSignal(dict, list, int)
signalInit = QtCore.pyqtSignal(list, int)
signalInitTags = QtCore.pyqtSignal(list, tuple, dict, list)
signalUpdateMovementGraph = QtCore.pyqtSignal(dict, list, int)
signalUpdateSpeakGraph = QtCore.pyqtSignal(dict, int, int)
signalUpdateHandVelocity = QtCore.pyqtSignal(dict, int)
signalUpdateFaceAus = QtCore.pyqtSignal(dict)
signalUpdateFaceImgs = QtCore.pyqtSignal(dict, int)
signalVideoLabel = QtCore.pyqtSignal(int, int, int)
signalPosePoints = QtCore.pyqtSignal(int, list, list)
signalPoseChangedLabels = QtCore.pyqtSignal(dict, dict, int)
signalSpeakChangedLabels = QtCore.pyqtSignal(dict, int)
signalUpdateGazeGraph = QtCore.pyqtSignal(dict, int)
signalUpdateGazeMap = QtCore.pyqtSignal(int, list, list)
signalUpdateTagGraph = QtCore.pyqtSignal(dict)
signalUpdateTags = QtCore.pyqtSignal(int, list, list)
signalClearLabels = QtCore.pyqtSignal()
signalClearPose = QtCore.pyqtSignal()
signalClearGaze = QtCore.pyqtSignal()
signalClearTags = QtCore.pyqtSignal()
signalDeactivatePoseTab = QtCore.pyqtSignal(bool)
signalDeactivateGazeTab = QtCore.pyqtSignal(bool)
signalDeactivateFaceTab = QtCore.pyqtSignal(bool)
signalDeactivateSpeakingTab = QtCore.pyqtSignal(bool)
signalDeactivateObjectTab = QtCore.pyqtSignal(bool)
def __init__(self, parent=None):
super(Processor, self).__init__(parent)
self.cap = None
self.dataGaze = None
self.dataGazeMeasures = None
self.dataMovement = None
self.dataFace = None
self.dataRTGene = None
self.dataSpeaker = None
self.dataObjects = None
self.colors = None
self.tagColors = None
self.videoScale = 1
self.updateAUs = dict()
self.movementActivity = dict()
self.tagMovement = dict()
self.handActivity = dict()
self.selectedIDs = None
self._ready = False
self.activeTab = 0
@QtCore.pyqtSlot(QtGui.QImage)
def saveCurrentFrameData(self, newFrameData):
if newFrameData is None:
return
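# Format 4 is QImage.Format_RGB32, so the buffer can be viewed as a height x width x 4 array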
newFrameData = newFrameData.convertToFormat(4)
width = newFrameData.width()
height = newFrameData.height()
ptr = newFrameData.bits()
ptr.setsize(newFrameData.byteCount())
self.frameData = np.array(ptr).reshape(height, width, 4)
def updateFrame(self, position):
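"""Collect the per-ID data for the frame at the given player position (in milliseconds)
and emit it to the currently active tab and the video overlays."""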
threshold = 100
self.position = position
if self._ready:
frame = int((position / 1000.0) * self.fps)
self.frame = frame
movement = {}
velocity = {}
gaze = {}
face_aus = {}
speaking = {}
tagData = {}
# neck_points = list()
f = self.dataRTGene.loc[self.dataRTGene['Frame'] == self.frame]
for id_no in range(self.numberIDs):
### Facial Activity Data ###
if self.dataFace is not None and self.activeTab == 3:
face_aus[id_no] = self.dataFace.loc[
self.dataFace['Frame'] == self.frame, ['ID%i_AUs' % id_no]].values
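# Update the face crop only when the set of active AUs (score > 0.5) changes and grows
# compared to the last emitted state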
if len(face_aus[id_no]) > 0 \
and np.sum(np.logical_xor(self.updateAUs[id_no], [face_aus[id_no].flatten()[0] > 0.5])) > 0 \
and np.sum([face_aus[id_no].flatten()[0] > 0.5]) > 0 \
and np.sum([face_aus[id_no].flatten()[0] > 0.5]) > np.sum(self.updateAUs[id_no]):
self.updateAUs[id_no] = [face_aus[id_no].flatten()[0] > 0.5]
# print('Update AU Image: ', frame)
self.get_current_frame(self.frame, id_no)
elif len(face_aus[id_no]) > 0:
self.updateAUs[id_no] = [face_aus[id_no].flatten()[0] > 0.5]
### Body Movement Data ###
if self.dataMovement is not None:
if self.visualize and self.visualize['Pose'].isChecked():
if self.selectedIDs[id_no]:
keypoints = self.dataMovement['ID%i_Keypoints' % id_no].iloc[frame]
lstX = []
lstY = []
# Plot Skeleton --> connections via pose pairs
for i in range(len(POSE_PAIRS_NEW)):
index = POSE_PAIRS_NEW[i]
if keypoints is None:
continue
A, B = keypoints[index]
if A is None or B is None:
continue
lstX.append(A[0])
lstX.append(B[0])
lstY.append(A[1])
lstY.append(B[1])
if len(lstX) > 0 and len(lstY) > 0:
self.signalPosePoints.emit(id_no, lstX, lstY)
else:
self.signalPosePoints.emit(id_no, [], [])
else:
self.signalClearPose.emit()
movement[id_no] = self.movementActivity[id_no][frame: frame + 200], np.arange(frame - 199,
frame + 1)
velocity[id_no] = self.dataMovement['ID%i_Velocity' % id_no].iloc[frame]
### Gaze RTGene Data ###
if self.dataRTGene is not None:
# Update Labels
head = self.dataRTGene['ID%i_Head' % id_no].iloc[frame]
if head is not None:
if self.visualize and self.visualize['Label'].isChecked():
self.signalVideoLabel.emit(id_no, head[0], head[1])
else:
self.signalClearLabels.emit()
# Build heatmap
if self.visualize and self.visualize['Gaze'].isChecked():
if self.selectedIDs[id_no]:
if frame <= threshold:
target_x = self.dataRTGene['ID%i_target_x' % id_no].iloc[: frame + 1].values.tolist()
target_y = self.dataRTGene['ID%i_target_y' % id_no].iloc[: frame + 1].values.tolist()
else:
target_x = self.dataRTGene['ID%i_target_x' % id_no].iloc[
frame - threshold: frame + 1].values.tolist()
target_y = self.dataRTGene['ID%i_target_y' % id_no].iloc[
frame - threshold: frame + 1].values.tolist()
self.signalUpdateGazeMap.emit(id_no, target_x, target_y)
else:
self.signalClearGaze.emit()
if not f.empty and self.activeTab == 0:
position = f['ID%i_Head' % id_no].values.flatten()[0]
gaze_phi = f['ID%i_Phi' % id_no].values.flatten()[0]
if not np.any(pd.isnull(position)) and not np.any(pd.isnull(gaze_phi)):
gaze[id_no] = self.calculateGazeData(position, gaze_phi)
elif self.dataMovement is not None:
neck = self.dataMovement['ID%s_Keypoints' % id_no].map(
lambda x: x[1] if x is not None else None).map(
lambda x: x[:2] if x is not None else None)
# neck_points.append(neck.iloc[frame])
if self.visualize and self.visualize['Label'].isChecked():
if neck.iloc[frame] is not None:
self.signalVideoLabel.emit(id_no, neck.iloc[frame][0], neck.iloc[frame][1])
else:
self.signalClearLabels.emit()
### Speaking Data ###
if self.dataSpeaker is not None and self.activeTab == 1:
e = self.dataSpeaker.loc[self.dataSpeaker.Frame < frame]
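# Relative speaking time of this ID up to the current frame; the +1 avoids division by zero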
rst = e['ID%i_is_speaker' % id_no].sum() / (len(e) + 1)
speaking[id_no] = rst
### Object Data ###
if self.dataObjects is not None:
for tag in self.tags:
tagData[tag] = self.tagMovement[tag][frame: frame + 200], np.arange(frame - 199, frame + 1)
if self.visualize and self.visualize['Tags'].isChecked():
if frame <= 30:
position = self.dataObjects[tag].iloc[: frame + 1].values.tolist()
else:
position = self.dataObjects[tag].iloc[frame - 30: frame + 1].values.tolist()
x_values = [x[0] for x in position if x is not None]
y_values = [x[1] for x in position if x is not None]
self.signalUpdateTags.emit(tag, x_values, y_values)
else:
self.signalClearTags.emit()
### Send collected data to respective Tabs ###
if self.dataFace is not None and self.activeTab == 3:
self.signalUpdateFaceAus.emit(face_aus)
if self.dataMovement is not None and self.activeTab == 2:
self.signalUpdateMovementGraph.emit(movement, self.colors, self.numberIDs)
self.signalUpdateHandVelocity.emit(velocity, self.numberIDs)
if self.dataRTGene is not None and self.activeTab == 0:
self.signalUpdateGazeGraph.emit(gaze, self.numberIDs)
if self.dataSpeaker is not None and self.activeTab == 1:
active = self.dataSpeaker.loc[self.dataSpeaker.Frame == frame, sorted(
[col for col in self.dataSpeaker.columns if 'speak_score' in col])].values.flatten()
active = active[~pd.isnull(active)]
if active.size > 0:
active_speaker = np.argmax(active)
else:
active_speaker = None
self.signalUpdateSpeakGraph.emit(speaking, active_speaker, self.numberIDs)
if self.dataObjects is not None and self.activeTab == 4:
self.signalUpdateTagGraph.emit(tagData)
@QtCore.pyqtSlot(int)
def tabChanged(self, current):
self.activeTab = current
@QtCore.pyqtSlot(list)
def onSelectedID(self, lst):
for i, button in enumerate(lst):
if button.isChecked():
self.selectedIDs[i] = True
else:
self.selectedIDs[i] = False
@QtCore.pyqtSlot(int)
def get_current_frame(self, frame, id_no):
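"""Fetch the video frame, crop the face of `id_no` around the RT-Gene landmarks
and emit the crop to the face tab."""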
face_imgs = {}
if os.name == 'nt':
# On Windows we have to read the frame directly from the video
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame)
ret, image = self.cap.read()
if ret:
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
else:
return
else:
# Otherwise we can reuse the frame already decoded by Qt
image = cv2.cvtColor(self.frameData, cv2.COLOR_BGR2RGB)
# Get the facial landmarks estimated by RT-Gene
img_land = self.dataRTGene.loc[self.dataRTGene.Frame == frame, ['ID%i_Landmarks' % id_no]].values[0]
if len(img_land) > 0:
img_land = img_land[0] * self.videoScale
# Convert 68 landmarks to 49
img_land = np.delete(img_land, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 62, 66], axis=0).flatten()
face_crop, _ = self.crop_face(image, img_land)
face_imgs[id_no] = face_crop
else:
face_imgs[id_no] = None
self.signalUpdateFaceImgs.emit(face_imgs, id_no)
def crop_face(self, img, img_land, box_enlarge=4, img_size=200):
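"""Rotate and scale the image so the eye centres are horizontal, then crop an
img_size x img_size patch around the face; returns the aligned crop and the transformed landmarks."""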
leftEye0 = (img_land[2 * 19] + img_land[2 * 20] + img_land[2 * 21] + img_land[2 * 22] + img_land[2 * 23] +
img_land[2 * 24]) / 6.0
leftEye1 = (img_land[2 * 19 + 1] + img_land[2 * 20 + 1] + img_land[2 * 21 + 1] + img_land[2 * 22 + 1] +
img_land[2 * 23 + 1] + img_land[2 * 24 + 1]) / 6.0
rightEye0 = (img_land[2 * 25] + img_land[2 * 26] + img_land[2 * 27] + img_land[2 * 28] + img_land[2 * 29] +
img_land[2 * 30]) / 6.0
rightEye1 = (img_land[2 * 25 + 1] + img_land[2 * 26 + 1] + img_land[2 * 27 + 1] + img_land[2 * 28 + 1] +
img_land[2 * 29 + 1] + img_land[2 * 30 + 1]) / 6.0
deltaX = (rightEye0 - leftEye0)
deltaY = (rightEye1 - leftEye1)
l = math.sqrt(deltaX * deltaX + deltaY * deltaY)
sinVal = deltaY / l
cosVal = deltaX / l
mat1 = np.mat([[cosVal, sinVal, 0], [-sinVal, cosVal, 0], [0, 0, 1]])
mat2 = np.mat([[leftEye0, leftEye1, 1], [rightEye0, rightEye1, 1], [img_land[2 * 13], img_land[2 * 13 + 1], 1],
[img_land[2 * 31], img_land[2 * 31 + 1], 1], [img_land[2 * 37], img_land[2 * 37 + 1], 1]])
mat2 = (mat1 * mat2.T).T
cx = float((max(mat2[:, 0]) + min(mat2[:, 0]))) * 0.5
cy = float((max(mat2[:, 1]) + min(mat2[:, 1]))) * 0.5
if (float(max(mat2[:, 0]) - min(mat2[:, 0])) > float(max(mat2[:, 1]) - min(mat2[:, 1]))):
halfSize = 0.5 * box_enlarge * float((max(mat2[:, 0]) - min(mat2[:, 0])))
else:
halfSize = 0.5 * box_enlarge * float((max(mat2[:, 1]) - min(mat2[:, 1])))
scale = (img_size - 1) / 2.0 / halfSize
mat3 = np.mat([[scale, 0, scale * (halfSize - cx)], [0, scale, scale * (halfSize - cy)], [0, 0, 1]])
mat = mat3 * mat1
aligned_img = cv2.warpAffine(img, mat[0:2, :], (img_size, img_size), flags=cv2.INTER_LINEAR,
borderValue=(128, 128, 128))
land_3d = np.ones((int(len(img_land) / 2), 3))
land_3d[:, 0:2] = np.reshape(np.array(img_land), (int(len(img_land) / 2), 2))
mat_land_3d = np.mat(land_3d)
new_land = np.array((mat * mat_land_3d.T).T)
new_land = np.reshape(new_land[:, 0:2], len(img_land))
return aligned_img, new_land
def calculateAllMeasures(self):
"""Recalculate all measures for selected segments for export"""
movement = dict()
gaze = dict()
speaking_dict = dict()
face = dict()
if self.segments is None:
segments = np.ones(len(self.dataRTGene))
else:
segments = self.segments
if self.dataMovement is not None:
dataMov = self.dataMovement.loc[segments == 1]
total = len(dataMov)
for id_no in range(self.numberIDs):
x_mov = [np.linalg.norm(x) if x is not None else np.nan for x in dataMov['ID%i_Movement' % id_no]]
# Add frames until start of segment to frame number
mostActivity = np.argmax(np.array(x_mov)) + np.argmax(segments)
# Frames with both hands tracked
tracked = dataMov.loc[dataMov['ID%s_HandsTracked' % id_no] == 2, ['ID%s_HandsTracked' % id_no]].count()
high_vel = dataMov.loc[dataMov['ID%i_Velocity' % id_no] > 1]['ID%i_Velocity' % id_no].count()
movement[id_no] = {'Most activity': int(mostActivity),
'Hands above table (relative)': float(tracked[0] / total),
'Gestures (relative)': float(high_vel / total)}
if self.dataSpeaker is not None:
dataSpeak = self.dataSpeaker.loc[segments == 1]
for id_no in range(self.numberIDs):
tracked_frames = dataSpeak[dataSpeak.notnull()].count()['ID%i_is_speaker' % id_no]
rst = dataSpeak['ID%i_is_speaker' % id_no].sum() / len(dataSpeak)
turns = []
counters = []
counter = 0
turn = 0
lastFrame = 0
switch = False
for frame in sorted(dataSpeak.Frame):
if dataSpeak.loc[dataSpeak.Frame == frame, ['ID%i_is_speaker' % id_no]].values and frame == (
lastFrame + 1):
switch = True
turn = turn + 1
elif switch:
if turn >= 30:
turns.append(turn)
counter = counter + 1
turn = 0
switch = False
if frame % int(self.fps * 60) == 0:
counters.append(counter)
counter = 0
lastFrame = frame
avg_turn = np.mean(np.array(turns)) / self.fps
avg_count = np.mean(np.array(counters))
num_turns = len(turns)
speaking_dict[id_no] = {'Tracked frames': int(tracked_frames), 'Speaking time (relative)': float(rst),
'Number of speaking turns': int(num_turns),
'Average length of speaking turn (seconds)': float(avg_turn),
'Average number of speaking turns per minute': float(avg_count)}
if self.dataGazeMeasures is not None:
dataGaze = self.dataGazeMeasures.loc[segments == 1]
for id_no in range(self.numberIDs):
# ID looked at other people for frames
look = dataGaze['ID%i_looks_at' % id_no].dropna().count()
# ID was watched by other people for frames
watched = dataGaze['ID%i_watched_by' % id_no].map(
lambda x: 1 if not np.any(pd.isna(x)) and len(x) > 0 else 0).sum()
tracked = dataGaze['ID%i_tracked' % id_no].sum()
gaze[id_no] = {'Tracked frames': int(tracked),
'lookSomeone': float(look / tracked),
'totalNoLook': float((tracked - look) / tracked),
'totalWatched': float(watched / tracked),
'ratioWatcherLookSOne': float(watched / look)}
if self.dataFace is not None:
dataFaceAUs = self.dataFace.loc[segments == 1]
dict_aus = np.array(['AU1: Inner Brow Raiser', 'AU2: Outer Brow Raiser', 'AU4: Brow Lowerer', 'AU5: Upper Lid Raiser',
'AU6: Cheek Raiser', 'AU9: Nose Wrinkler', 'AU12: Lip Corner Puller', 'AU15: Lip Corner Depressor',
'AU17: Chin Raiser', 'AU20: Lip Stretcher', 'AU25: Lips Part', 'AU26: Jaw Drop'])
for id_no in range(self.numberIDs):
face[id_no] = []
for i, au in enumerate(dict_aus):
au_data = [a[i] for a in dataFaceAUs['ID%i_AUs' % id_no] if not np.all(pd.isna(a))]
au_data = np.array(au_data) > 0.5
face[id_no].append(au + ' : ' + str(au_data.sum()))
return gaze, speaking_dict, movement, face
def calculateGazeData(self, position, yaw):
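"""Project the head position and gaze yaw of one ID onto a unit circle (top-down view of the
360 degree scene); returns x/y arrays for a small marker circle plus a line towards the gaze target."""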
# Get position in spherical coordinates (in radians)
id_u = position[0] / self.frameSize[0]
id_theta = id_u * 2 * np.pi
# Flip and shift the angle so the plot layout matches the video orientation
id_theta = (id_theta * -1) - np.pi
# ID position on coordinate system
id_pos_x = np.cos(id_theta)
id_pos_y = np.sin(id_theta)
x, y = self.get_circle(0.05)
circle_x = x + id_pos_x
circle_y = y + id_pos_y
# Add angle - RTGene yaw is in radian
id_target = id_theta + np.pi - yaw
id_x1_target = np.cos(id_target)
id_x2_target = np.sin(id_target)
# Line
line_x = np.array([id_pos_x, id_x1_target])
line_y = np.array([id_pos_y, id_x2_target])
xdata = np.append(circle_x, line_x)
ydata = np.append(circle_y, line_y)
return [xdata, ydata]
def get_circle(self, radius):
theta = np.linspace(0, 2 * np.pi, 100)
x = radius * np.cos(theta)
y = radius * np.sin(theta)
return np.array(x), np.array(y)
@QtCore.pyqtSlot(dict)
def onVisualize(self, lst):
self.visualize = lst
@QtCore.pyqtSlot(np.ndarray)
def _updateSegments(self, segments):
""" Recalculate movement and speaking measures when segment was changed"""
# save segments for exporting only wanted timeranges
self.segments = segments
if self.dataMovement is not None:
dataMov = self.dataMovement.loc[segments == 1]
total = len(dataMov)
mostActivity = dict()
hand = dict()
for id_no in range(self.numberIDs):
x_mov = [x[0] if x is not None else np.nan for x in dataMov['ID%i_Movement' % id_no]]
# Add frames until start of segment to frame number
mostActivity[id_no] = np.argmax(np.array(x_mov)) + np.argmax(segments)
# Frames with both hands tracked
tracked = dataMov.loc[dataMov['ID%s_HandsTracked' % id_no] == 2, ['ID%s_HandsTracked' % id_no]].count()
high_vel = dataMov.loc[dataMov['ID%i_Velocity' % id_no] > 1]['ID%i_Velocity' % id_no].count()
hand[id_no] = [total, tracked[0], high_vel]
self.signalPoseChangedLabels.emit(mostActivity, hand, self.numberIDs)
if self.dataSpeaker is not None:
diff = len(segments) - len(self.dataSpeaker)
dataSpeak = self.dataSpeaker
# dataSpeak['Frame'] = dataSpeak.index
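# The speaker data may be shorter or longer than the segment mask; trim or pad the mask so both align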
if diff > 0:
speakSegments = segments[:-diff]
elif diff < 0:
speakSegments = np.append(segments, np.zeros(-diff))
else:
speakSegments = segments
dataSpeak = self.dataSpeaker.loc[speakSegments == 1]
speaking_dict = dict()
for id_no in range(self.numberIDs):
tracked_frames = dataSpeak[dataSpeak.notnull()].count()['ID%i_is_speaker' % id_no]
rst = dataSpeak['ID%i_is_speaker' % id_no].sum() / len(dataSpeak)
speaking_dict[id_no] = [tracked_frames, rst] # , num_turns, avg_turn, avg_count
self.signalSpeakChangedLabels.emit(speaking_dict, self.numberIDs)
def calculateSpeakingMeasures(self):
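"""Initial speaking statistics per ID over the full recording: tracked frames, relative speaking
time, and number/length/rate of speaking turns (a turn is at least 30 consecutive speaker frames)."""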
if self.dataSpeaker is None:
return
speaking_dict = dict()
total = len(self.dataSpeaker)
for id_no in range(self.numberIDs):
tracked_frames = self.dataSpeaker[self.dataSpeaker.notnull()].count()['ID%i_is_speaker' % id_no]
rst = self.dataSpeaker['ID%i_is_speaker' % id_no].sum() / total
turns = []
counters = []
counter = 0
turn = 0
switch = False
for frame in sorted(self.dataSpeaker.Frame):
if self.dataSpeaker.loc[self.dataSpeaker.Frame == frame, ['ID%i_is_speaker' % id_no]].values:
switch = True
turn = turn + 1
elif switch:
if turn >= 30:
turns.append(turn)
counter = counter + 1
turn = 0
switch = False
if frame % int(self.fps * 60) == 0:
counters.append(counter)
counter = 0
avg_turn = np.mean(np.array(turns)) / self.fps
avg_count = np.mean(np.array(counters))
num_turns = len(turns)
speaking_dict[id_no] = [tracked_frames, rst, num_turns, avg_turn, avg_count]
self.signalSpeakerSetInit.emit(speaking_dict, self.colors, self.numberIDs)
def calculateMovementMeasures(self):
""" initial calculation of hand velocity on full data """
if self.dataMovement is None:
return
total = len(self.dataMovement)
mostActivity = {}
for id_no in range(self.numberIDs):
x_mov = [np.linalg.norm(x) if x is not None else np.nan for x in self.dataMovement['ID%i_Movement' % id_no]]
mostActivity[id_no] = np.argmax(np.array(x_mov))
self.movementActivity[id_no] = np.array([*np.zeros(200), *x_mov])
# Left Wrist and Right Wrist: idx 4, idx 7
self.dataMovement['ID%i_HandsTracked' % id_no] = self.dataMovement['ID%i_Keypoints' % id_no].map(
lambda x: ((np.sum(x[4] is not None) + np.sum(x[7] is not None)) // 3) if x is not None else None)
# Pixel position of left and right wrist
self.dataMovement['ID%i_Hand1_Vel' % id_no] = self.dataMovement['ID%s_Keypoints' % id_no].map(
lambda x: x[4] if not np.all(pd.isna(x)) else np.nan).map(
lambda x: x[:2].astype(float) if not np.all(pd.isna(x)) else np.nan)
self.dataMovement['ID%i_Hand1_Vel' % id_no] = self.dataMovement['ID%i_Hand1_Vel' % id_no].pct_change(1).map(
lambda x: np.abs(x.mean()) * 100 if not np.all(pd.isna(x)) else None)
self.dataMovement['ID%i_Hand2_Vel' % id_no] = self.dataMovement['ID%s_Keypoints' % id_no].map(
lambda x: x[7] if not np.all(pd.isna(x)) else np.nan).map(
lambda x: x[:2].astype(float) if not np.all(pd.isna(x)) else np.nan)
self.dataMovement['ID%i_Hand2_Vel' % id_no] = self.dataMovement['ID%i_Hand2_Vel' % id_no].pct_change(1).map(
lambda x: np.abs(x.mean()) * 100 if not np.all(pd.isna(x)) else None)
self.dataMovement['ID%i_Velocity' % id_no] = self.dataMovement[
['ID%i_Hand1_Vel' % id_no, 'ID%i_Hand2_Vel' % id_no]].mean(axis=1)
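# Overall hand velocity is the mean over both wrists; frames with velocity > 1 are later counted as gestures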
# Frames with both hands tracked
tracked = self.dataMovement.loc[self.dataMovement['ID%s_HandsTracked' %
id_no] == 2, ['ID%s_HandsTracked' % id_no]].count()
high_vel = self.dataMovement.loc[self.dataMovement[
'ID%i_Velocity' % id_no] > 1]['ID%i_Velocity' % id_no].count()
self.handActivity[id_no] = [total, tracked[0], high_vel]
self.signalPoseSetInit.emit(mostActivity, self.handActivity, self.colors, self.numberIDs)
def calculateGazeMeasures(self):
"""Initial calculation of gaze measures: dataGazeMeasures """
thresh = 15
eq_width = self.frameSize[0]
totWatcher = {}
lookSomeOne = {}
tracked = {}
for i in range(self.numberIDs):
totWatcher[i] = []
lookSomeOne[i] = []
tracked[i] = []
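# For every frame: place each tracked ID on the 360 degree panorama (in degrees), compute the
# direction it gazes towards, and record who falls within the +/- thresh degree window of that target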
for frame in self.dataRTGene.Frame:
f = self.dataRTGene.loc[self.dataRTGene.Frame == frame]
angles = []
positions = []
targets = []
for id_no in range(self.numberIDs):
pos = f['ID%i_Head' % id_no].values.flatten()[0]
phi = f['ID%i_Phi' % id_no].values.flatten()[0]
pos = np.array(pos, dtype=float)
phi = np.array(phi, dtype=float)
if np.any(np.isnan(pos)) or np.any(np.isnan(phi)):
positions.append(np.nan)
angles.append(np.nan)
targets.append(np.nan)
tracked[id_no].append(False)
continue
tracked[id_no].append(True)
# Get position in spherical coordinates
id_u = pos[0] / eq_width
id_theta = id_u * 2 * np.pi
id_theta = np.rad2deg(id_theta)
positions.append(id_theta)
# Add the gaze angle - phi is the yaw in radians
angle = np.rad2deg(phi)
id_target = id_theta + 180 + angle
targets.append(id_target % 360)
angles.append(angle)
# plot_frame_calculated(positions, angles)
watcher = dict()
for i in range(self.numberIDs):
watcher[i] = []
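# ID i is counted as looking at ID j when j's angular position lies within +/- thresh degrees of i's gaze target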
for i, t in enumerate(targets):
inside_min = np.array([(e - thresh) < targets[i] if not np.isnan(e) else False for e in positions])
inside_max = np.array([(e + thresh) > targets[i] if not np.isnan(e) else False for e in positions])
# print(inside_min, inside_max)
if np.any(inside_min) and np.any(inside_max):
test = np.logical_and(inside_min, inside_max)
idx = np.where(test)[0]
for j in range(len(idx)):
# ID i watches idx[j]
lookSomeOne[i].append([frame, idx[j]])
# ID idx[j] is being looked at by i
watcher[idx[j]].append(i)
for k, v in watcher.items():
totWatcher[k].append([frame, v])
df_totWatcher = pd.DataFrame(columns={'Frame'})
for i in range(self.numberIDs):
df_id = pd.DataFrame.from_dict(totWatcher.get(i))
df_id = df_id.rename(columns={0: "Frame", 1: "ID{}_watched_by".format(i)})
df_totWatcher = pd.merge(df_totWatcher, df_id, how='outer', on=['Frame'], sort=True)
df_lookSomeOne = pd.DataFrame(columns={'Frame'})
for i in range(self.numberIDs):
df_id = pd.DataFrame.from_dict(lookSomeOne.get(i))
df_id = df_id.rename(columns={0: "Frame", 1: "ID{}_looks_at".format(i)})
df_lookSomeOne = pd.merge(df_lookSomeOne, df_id, how='outer', on=['Frame'], sort=True)
df_tracked = pd.DataFrame(columns={'Frame'})
for i in range(self.numberIDs):
df_id = pd.DataFrame.from_dict(tracked.get(i))
df_id.index.name = 'Frame'
df_id = df_id.rename(columns={0: "ID{}_tracked".format(i)})
df_tracked = pd.merge(df_tracked, df_id, how='outer', on=['Frame'], sort=True)
self.dataGazeMeasures = pd.merge(df_lookSomeOne, df_totWatcher, how='outer', on=['Frame'], sort=True)
self.dataGazeMeasures = pd.merge(self.dataGazeMeasures, df_tracked, how='outer', on=['Frame'], sort=True)
gaze = dict()
for id_no in range(self.numberIDs):
# print(self.dataGazeMeasures['ID%i_watched_by' % id_no])
# ID looked at other people for frames
look = self.dataGazeMeasures['ID%i_looks_at' % id_no].dropna().count()
# ID was watched by other people for frames
watched = self.dataGazeMeasures['ID%i_watched_by' % id_no].map(
lambda x: 1 if not np.any(pd.isna(x)) and len(x) > 0 else 0).sum()
tracked = self.dataGazeMeasures['ID%i_tracked' % id_no].sum()
gaze[id_no] = [look, watched, tracked]
self.signalGazeSetInit.emit(gaze, self.colors, self.numberIDs)
def calculateGazeTargets(self):
# Compute gaze targets
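# alpha/beta convert the RT-Gene yaw (Phi) and pitch (Theta) into degrees relative to the panorama;
# fun() then projects the gaze direction back onto the equirectangular frame to get a pixel target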
for id_no in range(self.numberIDs):
# self.dataRTGene['ID%i_Phi' % id_no] = self.dataRTGene['ID%i_Phi' % id_no].rolling(15).mean()
self.id_no = id_no
self.dataRTGene['ID%i_alpha' % id_no] = self.dataRTGene['ID%i_Phi' % id_no].map(
lambda x: np.rad2deg(x) - 180 if x is not None else None)
self.dataRTGene['ID%i_beta' % id_no] = self.dataRTGene['ID%i_Theta' % id_no].map(
lambda x: 180 - 2 * np.rad2deg(x) if x is not None else None)
self.dataRTGene['ID%i_target_spher' % id_no] = self.dataRTGene.apply(self.fun, axis=1)
self.dataRTGene[['ID%i_target_x' % id_no, 'ID%i_target_y' % id_no]] = self.dataRTGene.apply(self.fun,
axis=1,
result_type="expand")
def fun(self, x):
alpha = x['ID%i_alpha' % self.id_no]
beta = x['ID%i_beta' % self.id_no]
pos = x['ID%i_Head' % self.id_no]
# print(pos, pd.isna(pos), type(pos))
# Discard frames where not all detected
if np.any(pd.isna(pos)) or np.any(pd.isna(alpha)) or np.any(pd.isna(beta)):
return None, None
# Get position in spherical coordinates
theta = np.rad2deg((pos[0] / self.frameSize[0]) * 2 * np.pi)
phi = np.rad2deg((pos[1] / self.frameSize[1]) * np.pi)
# Get position in image frame (equirectangular projection)
x, y = sperical2equirec((theta + alpha) % 360, (phi + beta) % 180, self.frameSize[0], self.frameSize[1])
return x, y
def calculateTagMeasures(self):
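"""For every AprilTag, compute its displacement from the first position at which it was tracked."""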
if self.dataObjects is None:
return
for tag in self.tags:
neutral = self.dataObjects[tag].dropna().iloc[0]
# print('Tag #%i Starting point set to: %s' % (tag, str(neutral)))
self.dataObjects['%i_Movement' % tag] = self.dataObjects[tag].map(
lambda x: np.subtract(x, neutral) if x is not None else None)
# Euclidean distance
x_mov = [np.linalg.norm(x) if x is not None else None for x in self.dataObjects['%i_Movement' % tag]]
self.tagMovement[tag] = np.array([*np.zeros(200), *x_mov])
def readData(self, movieFileName, dataFileName, verbose=False):
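"""Open the video and the pickled analysis file, load the per-modality dataframes and
deactivate the tabs for which no data is available."""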
self.movieFileName = movieFileName
self.dataFileName = dataFileName
if (verbose):
print("## Start Reading Data")
# Read Video Data
f = self.movieFileName
print('Reading video from %s' % f)
if os.path.isfile(f):
self.cap = cv2.VideoCapture(f)
self.fps = self.cap.get(cv2.CAP_PROP_FPS)
self.frameCount = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.scaledVideoResolution = [self.cap.get(cv2.CAP_PROP_FRAME_WIDTH),
self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)]
if (verbose):
print('Video resolution: ', self.scaledVideoResolution)
print("Video frameCount %i" % self.frameCount)
duration = self.frameCount / self.fps
minutes = int(duration / 60)
seconds = duration % 60
print('Video duration (M:S) = ' + str(minutes) + ':' + str(seconds))
else:
print("WARNING: no video available.")
# read data file
with open(self.dataFileName, 'rb') as f:
data = pkl.load(f)
if "originalVideoResolution" in data:
self.originalVideoResolution = data["originalVideoResolution"]
self.videoScale = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) / self.originalVideoResolution[0]
self.frameSize = data["originalVideoResolution"]
if verbose:
print('Video resolution scale factor: ', self.videoScale)
# Read RTGene Data
if "RTGene" in data:
self.dataRTGene = data["RTGene"]
self.dataRTGene = self.dataRTGene.where(pd.notnull(self.dataRTGene), None)
self.numberIDs = len([col for col in self.dataRTGene.columns if 'Landmarks' in col])
else:
self.signalDeactivateGazeTab.emit(True)
print("WARNING: no RTGene data avaibale. Deactivating gaze tab.")
# Read Movement Data
if "BodyMovement" in data:
self.dataMovement = data["BodyMovement"]
if not self.numberIDs:
self.numberIDs = len([col for col in self.dataMovement if 'Movement' in col])
if verbose:
print('Body movement sample count %i' % len(self.dataMovement))
else:
self.signalDeactivatePoseTab.emit(True)
print('WARNING: no body movement data available. Deactivating pose tab.')
# Read Facial Activity Data
if "ActivityUnits" in data:
self.dataFace = data["ActivityUnits"]
if not self.numberIDs:
self.numberIDs = len([col for col in self.dataFace.columns if 'AUs' in col])
if (verbose):
print("Activity Units sample count %i" % len(self.dataFace))
else:
self.signalDeactivateFaceTab.emit(True)
print("WARNING: no face activity data available. Deactivating face tab.")
# Read Speaker Diarization Data
if 'Speaker' in data:
self.dataSpeaker = data['Speaker']
else:
self.signalDeactivateSpeakingTab.emit(True)
print('WARNING: no speaking data available. Deactivating speaking tab.')
# Read AprilTag Data
if 'April' in data:
self.dataObjects = data['April']
self.tags = [col for col in self.dataObjects.columns if type(col) == int]
self.tagColors = [tuple(np.random.random(size=3) * 256) for i in range(len(self.tags))]
tracked = dict()
for tag in self.tags:
tracked[tag] = self.dataObjects[tag].dropna().count() / len(self.dataObjects)
self.signalInitTags.emit(self.tags, self.originalVideoResolution, tracked, self.tagColors)
else:
self.signalDeactivateObjectTab.emit(True)
print('WARNING: no object detection data available. Deactivating object tab.')
# Set colors: visually distinct colors generated in HSV space (see getColors above)
self.colors = getColors(self.numberIDs)
self.selectedIDs = []
for id_no in range(self.numberIDs):
self.updateAUs[id_no] = np.zeros(12)
self.selectedIDs.append(True)
self.calculateTagMeasures()
self.calculateGazeTargets()
self.calculateGazeMeasures()
self.signalInit.emit(self.colors, self.numberIDs)
def export(self):
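"""Export the aggregated measures as JSON and every modality dataframe as CSV,
restricted to the selected segments when a segment mask is set."""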
# get export location
fileName = QtWidgets.QFileDialog.getSaveFileName(self, "Export calculations", self.dataFileName.replace(
"dat", "json"), "Json File (*.json);;All Files (*)")
if fileName[0] == '':
return
# collect all new calculated values
data = dict()
gaze, speaking, movement, face = self.calculateAllMeasures()
for id_no in range(self.numberIDs):
data['ID%i' % id_no] = {'Eye Gaze': gaze.get(id_no),
'Speaking Activity': speaking.get(id_no),
'Body and Hand Movement': movement.get(id_no),
'Face Activity': face.get(id_no)}
with open(fileName[0], 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
segment_id = self._get_segment_ids()
# export all dataframes as csv
if self.dataRTGene is not None:
if self.segments is None:
self.dataRTGene['segment'] = 0
self.dataRTGene.to_csv(fileName[0].replace(
".json", "-gaze.csv"), index=True, encoding='utf-8')
else:
self.dataRTGene['segment'] = segment_id
self.dataRTGene[self.segments[1:] == 1].to_csv(fileName[0].replace(
".json", "-gaze.csv"), index=True, encoding='utf-8')
if self.dataMovement is not None:
if self.segments is None:
self.dataMovement['segment'] = 0
self.dataMovement.to_csv(fileName[0].replace(
".json", "-body-movement.csv"), index=True, encoding='utf-8')
else:
self.dataMovement['segment'] = segment_id
self.dataMovement[self.segments == 1].to_csv(fileName[0].replace(
".json", "-body-movement.csv"), index=True, encoding='utf-8')
if self.dataFace is not None:
if self.segments is None:
self.dataFace['segment'] = 0
self.dataFace.to_csv(fileName[0].replace(
".json", "-facial-activity.csv"), index=True, encoding='utf-8')
else:
self.dataFace['segment'] = segment_id
self.dataFace[self.segments == 1].to_csv(fileName[0].replace(
".json", "-facial-activity.csv"), index=True, encoding='utf-8')
if self.dataSpeaker is not None:
if self.segments is None:
self.dataSpeaker['segment'] = 0
self.dataSpeaker.to_csv(fileName[0].replace(
".json", "-speaker.csv"), index=True, encoding='utf-8')
else:
self.dataSpeaker['segment'] = segment_id
self.dataSpeaker[self.segments == 1].to_csv(fileName[0].replace(
".json", "-speaker.csv"), index=True, encoding='utf-8')
if self.dataObjects is not None:
if self.segments is None:
self.dataObjects['segment'] = 0
self.dataObjects.to_csv(fileName[0].replace(
".json", "-objects.csv"), index=True, encoding='utf-8')
else:
self.dataObjects['segment'] = segment_id
self.dataObjects[self.segments == 1].to_csv(fileName[0].replace(
".json", "-objects.csv"), index=True, encoding='utf-8')
print('Exported data to', fileName[0])
def _get_segment_ids(self):
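"""Assign each frame the index of the selected segment it belongs to; frames outside the selection keep -1."""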
if self.segments is None:
return None
segment_id = [-1 for s in self.segments]
segment_counter = -1
old = self.segments[0]
segment_id[0] = 0
for i, current in enumerate(self.segments[1:]):
if current == 1:
if old != current:
segment_counter += 1
segment_id[i + 1] = segment_counter
old = current
return segment_id
def getColors(self):
return self.colors
def getTags(self):
return self.tags
def getTagColors(self):
return self.tagColors
def getFrameCount(self):
return self.frameCount
def getFrameSize(self):
return self.frameSize
def getFPS(self):
return self.fps
def getVideo(self):
return self.movieFileName
def getGazeData(self):
return self.dataGaze
def getFrame(self, frameIdx):
return frameIdx
def getFrameCurrent(self):
return 1
def getNumberIDs(self):
return self.numberIDs
def getMovementData(self):
return self.dataMovement
def setReady(self, ready):
self._ready = ready
def getOriginalVideoResolution(self):
return self.originalVideoResolution
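# Rough usage sketch (hypothetical wiring, file names are placeholders): the GUI constructs a
# Processor, connects its signals to the tab widgets, then calls
#   proc = Processor()
#   proc.readData('session.mp4', 'session.dat', verbose=True)
#   proc.setReady(True)
# and forwards the player position (in milliseconds) to proc.updateFrame(position) during playback.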