# InferringIntention/keyboard_and_mouse/process_data.py
#
# Source-viewer metadata: 208 lines, 6.9 KiB, Python; timestamp 2024-03-24 23:42:27 +01:00
import pickle
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import argparse
def view_clean_data(data_path='dataset/clean_data.pkl'):
    """Print a quick overview of the cleaned interaction dataset.

    The pickled object is expected to expose ``keys()``, ``len()``, the
    columns ``Event`` and ``Rule`` via item access, and the attributes
    ``Rule``, ``TaskID``, ``PID`` and ``Event`` with a ``unique()`` method
    (presumably a pandas DataFrame -- confirm against the dataset).

    Args:
        data_path: Location of the pickled dataset. Defaults to the path
            used by the rest of this script.

    Returns:
        None. Everything is written to stdout.
    """
    # NOTE: pickle.load executes arbitrary code embedded in the file; only
    # point this at dataset files from a trusted source.
    with open(data_path, 'rb') as f:
        data = pickle.load(f)

    print(type(data), len(data))
    print(data.keys())
    print('length of data:', len(data))
    print('event', data['Event'], 'length of event', len(data['Event']))
    # The label used to read 'length of event' here, which mislabelled the
    # size of the Rule column.
    print('rule', data['Rule'], 'length of rule', len(data['Rule']))
    print('rule unique', data.Rule.unique())
    print('task id unique', data.TaskID.unique())
    print('pid unique', data.PID.unique())
    print('event unique', data.Event.unique())
def _trial_samples(trial):
    """Expand one trial into (prefixes, labels, part_ids) sample lists.

    For every ``end`` in ``1 .. len(trial) - 2`` the sample consists of the
    first ``end`` events (positional slice) as input, the event at position
    ``end + 1`` as label, and the ``Part`` value at position ``end``.
    Trials with fewer than three rows produce no samples.
    """
    events = trial['Event']
    parts = trial['Part']
    prefixes, labels, part_ids = [], [], []
    # NOTE(review): the input prefix stops at position end-1 while the label
    # is read from position end+1, so the event at position `end` is neither
    # input nor target. Kept unchanged to reproduce the existing dataset --
    # confirm whether the label was meant to be the event at `end`.
    for end in range(1, len(events) - 1):
        prefixes.append(events.iloc[:end].tolist())
        labels.append(events.iat[end + 1])
        part_ids.append(parts.iat[end])
    return prefixes, labels, part_ids


def _pad_in_place(sequences, length, fill=0):
    """Right-pad every list in *sequences* with *fill* up to *length* items."""
    for seq in sequences:
        # A non-positive repeat count yields [], so longer lists are untouched.
        seq.extend([fill] * (length - len(seq)))


def _dump_pickle(obj, path):
    """Write *obj* to *path* with pickle."""
    with open(path, 'wb') as f:
        pickle.dump(obj, f)


def split_org_data(data_path='dataset/clean_data.pkl',
                   out_dir='dataset/strategy_dataset',
                   pad_len=35):
    """Split the clean dataset by user and build next-action samples.

    Users with ``PID`` 1-11 form the training set and users 12-16 the test
    set. For each of the 7 rules (each treated as one intention) and each of
    the 5 parts (trials) of a user, the action sequence is expanded into
    growing prefixes with one label per prefix; prefixes are right-padded
    with 0 to ``pad_len`` entries.

    Files written to ``out_dir`` for every rule ``i`` in 0..6:
        train_data_i.pkl       flat list of padded prefixes
        train_label_i.pkl      flat list of labels
        test_data_i.pkl        one list of padded prefixes per test user
        test_label_i.pkl       one list of labels per test user
        test_action_id_i.pkl   one list of ``Part`` values per test user

    Args:
        data_path: Pickled dataset with ``PID``, ``Rule``, ``Part`` and
            ``Event`` columns.
        out_dir: Directory for the generated pickle files; created if missing.
        pad_len: Length every prefix is padded to. Longer prefixes are left
            as they are (no truncation).

    Returns:
        None.
    """
    num_rules = 7              # 7 different rules, each as an intention
    train_users = range(1, 12)
    test_users = range(12, 17)
    trial_ids = range(1, 6)    # 5 parts == 5 trials

    # NOTE: pickle.load executes arbitrary code embedded in the file; only
    # point this at dataset files from a trusted source.
    with open(data_path, 'rb') as f:
        data = pickle.load(f)
    print('original data keys', data.keys())
    print('len of original data', len(data))
    print('rule unique', data.Rule.unique())
    print('event unique', data.Event.unique())

    # Split original data into train and test based on user.
    data_train = data[data['PID'] <= 11]
    data_test = data[data['PID'] > 11]
    print('train set len', len(data_train))
    print('test set len', len(data_test))

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # ---- train set: one flat sample list per rule ----
    max_len = 0  # longest trial seen, reported for reference only
    for rule in range(num_rules):
        rule_rows = data_train[data_train['Rule'] == rule]
        train_data = []
        train_label = []
        for pid in train_users:
            user_rows = rule_rows[rule_rows['PID'] == pid]
            for trial_id in trial_ids:
                trial = user_rows[user_rows['Part'] == trial_id]
                prefixes, labels, _ = _trial_samples(trial)
                train_data.extend(prefixes)
                train_label.extend(labels)
                max_len = max(max_len, len(trial))
        _pad_in_place(train_data, pad_len)

        # Guard the summary: a rule without samples must not raise IndexError.
        if train_data:
            print('x_len', len(train_data), type(train_data[0]), len(train_data[0]))
            print('y_len', len(train_label), type(train_label[0]))
        else:
            print('x_len', 0)
            print('y_len', 0)

        _dump_pickle(train_label, out_dir / ('train_label_' + str(rule) + '.pkl'))
        _dump_pickle(train_data, out_dir / ('train_data_' + str(rule) + '.pkl'))
    print('max_len', max_len)

    # ---- test set: samples grouped per user within each rule ----
    max_len = 0
    for rule in range(num_rules):
        rule_rows = data_test[data_test['Rule'] == rule]
        test_data = []       # [user][sample]
        test_label = []
        test_action_id = []
        for pid in test_users:
            user_rows = rule_rows[rule_rows['PID'] == pid]
            user_data = []
            user_label = []
            user_action_id = []
            for trial_id in trial_ids:
                trial = user_rows[user_rows['Part'] == trial_id]
                prefixes, labels, part_ids = _trial_samples(trial)
                user_data.extend(prefixes)
                user_label.extend(labels)
                user_action_id.extend(part_ids)
                max_len = max(max_len, len(trial))
            _pad_in_place(user_data, pad_len)
            test_data.append(user_data)
            test_label.append(user_label)
            test_action_id.append(user_action_id)

        # test_data always holds one entry per test user, so [0] is safe.
        print('x_len', len(test_data), type(test_data[0]), len(test_data[0]))
        print('y_len', len(test_label), type(test_label[0]))

        _dump_pickle(test_label, out_dir / ('test_label_' + str(rule) + '.pkl'))
        _dump_pickle(test_data, out_dir / ('test_data_' + str(rule) + '.pkl'))
        _dump_pickle(test_action_id, out_dir / ('test_action_id_' + str(rule) + '.pkl'))
    print('max_len', max_len)
def calc_gt_prob():
    """Print the distinct labels in the training set of each of the 7 tasks.

    Reads ``dataset/strategy_dataset/train_label_<task>.pkl`` for tasks 0-6
    and writes the task number and its unique label values to stdout.
    """
    label_dir = 'dataset/strategy_dataset/'
    for task in range(7):
        label_file = label_dir + 'train_label_' + str(task) + '.pkl'
        with open(label_file, 'rb') as handle:
            labels = np.array(pickle.load(handle))
        print('task ', task)
        print('unique train label', np.unique(labels))
def plot_gt_dist():
    """Plot, per intention, how often each action occurs in the test labels.

    Loads ``dataset/strategy_dataset/test_label_<i>.pkl`` for the 7 tasks,
    draws one grouped bar chart per task (one bar group per action, one bar
    per user), saves the figure to ``dataset/test_gt_dist.png`` and shows it.
    Label values are mapped to bar positions as ``value - 1``, i.e. labels
    are assumed to be 1-based action ids -- confirm against the dataset.
    """
    act_name = ["Italic", "Bold", "Underline", "Indent", "Align", "FontSize", "FontFamily"]
    num_actions = len(act_name)
    width = 0.1

    full_data = []
    for i in range(7):
        with open('dataset/strategy_dataset/' + 'test' + '_label_' + str(i) + '.pkl', 'rb') as f:
            full_data.append(pickle.load(f))

    fig, axs = plt.subplots(7)
    fig.set_figheight(10)
    fig.set_figwidth(16)
    x = np.arange(num_actions)

    for i, task_labels in enumerate(full_data):
        ax = axs[i]
        # Iterate the users of *this* task; the loop used to take its length
        # from whichever label file happened to be loaded last.
        for u, user_labels in enumerate(task_labels):
            values, counts = np.unique(user_labels, return_counts=True)
            counts_vis = [0] * num_actions
            for value, count in zip(values, counts):
                counts_vis[value - 1] = count
            print('task', i, 'actions', values, 'num', counts)
            # Shift each user's bars by one bar width so they sit side by side.
            ax.bar(x + u * width, counts_vis, width=width, label='user ' + str(u))
        ax.set_title('Intention ' + str(i))
        ax.set_xlabel('action id')
        ax.set_ylabel('num of actions')
        ax.set_xticks(np.arange(len(x)))
        ax.set_xticklabels(act_name)
        ax.set_ylim([0, 80])

    axs[0].legend(loc='upper right', ncol=1)
    plt.tight_layout()
    plt.savefig('dataset/' + 'test' + '_gt_dist.png')
    plt.show()
def plot_act():
    """Plot the test-set label sequence of every user, one figure per task.

    Loads ``dataset/strategy_dataset/test_label_<i>.pkl`` for the 7 tasks.
    For each task a figure with one subplot per user is shown; each subplot
    draws that user's labels in stored order (x: position in the list,
    y: label value).
    """
    full_data = []
    for i in range(7):
        with open('dataset/strategy_dataset/' + 'test' + '_label_' + str(i) + '.pkl', 'rb') as f:
            full_data.append(pickle.load(f))

    for task_labels in full_data:
        if not task_labels:
            # Nothing to draw; subplots() cannot create zero rows.
            continue
        # squeeze=False always returns a 2-D array, so indexing works for
        # any number of users (including one).
        fig, axs = plt.subplots(len(task_labels), squeeze=False)
        axs = axs[:, 0]
        fig.set_figheight(10)
        fig.set_figwidth(16)
        for u, user_labels in enumerate(task_labels):
            x = np.arange(len(user_labels))
            # The plotted y values are the labels themselves, so the axes
            # are named accordingly (they used to carry the histogram's
            # 'action id' / 'num of actions' captions).
            axs[u].set_xlabel('sample index')
            axs[u].set_ylabel('action id')
            axs[u].plot(x, user_labels, label='user ' + str(u))
            axs[u].legend(loc='upper right', ncol=1)
        plt.tight_layout()
        plt.show()
if __name__ == "__main__":
    # Command-line entry point: the single positional argument names the
    # function to run. argparse rejects unknown names with a usage error
    # instead of the script exiting silently without doing anything.
    commands = {
        # view original keyboard and mouse interaction dataset
        'view_clean_data': view_clean_data,
        # split the original keyboard and mouse interaction dataset.
        # User 1-11 for training, rest for testing
        'split_org_data': split_org_data,
        # see unique label in train set
        'calc_gt_prob': calc_gt_prob,
        # plot the label distribution of test set
        'plot_gt_dist': plot_gt_dist,
        # plot the label of test set
        'plot_act': plot_act,
    }
    parser = argparse.ArgumentParser()
    parser.add_argument("func", help="select what function to run. view_clean_data, split_org_data, calc_gt_prob, plot_gt_dist, plot_act", type=str, choices=list(commands))
    args = parser.parse_args()
    commands[args.func]()