limits-of-tom/logistic_regression_tom_feats.py

384 lines
19 KiB
Python
Raw Normal View History

2024-06-11 15:36:55 +02:00
import argparse
import numpy as np
import pickle
import os
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, f1_score
from src.data.game_parser_graphs_new import GameParser, make_splits, onehot, set_seed
from tqdm import tqdm
from scipy.stats import wilcoxon
from sklearn.exceptions import ConvergenceWarning
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import umap
import warnings
warnings.simplefilter("ignore", category=ConvergenceWarning)
def parse_q(q, game):
if not q is None:
q ,l = q
q = np.concatenate([
onehot(q[2],2),
onehot(q[3],2),
onehot(q[4][0][0]+1,2),
onehot(game.materials_dict[q[4][0][1]],len(game.materials_dict)),
onehot(q[4][1][0]+1,2),
onehot(game.materials_dict[q[4][1][1]],len(game.materials_dict)),
onehot(q[4][2]+1,2),
onehot(q[5][0][0]+1,2),
onehot(game.materials_dict[q[5][0][1]],len(game.materials_dict)),
onehot(q[5][1][0]+1,2),
onehot(game.materials_dict[q[5][1][1]],len(game.materials_dict)),
onehot(q[5][2]+1,2)
])
else:
q = np.zeros(100)
l = None
return q, l
def cosine_similarity(array1, array2):
"""
Compute the cosine similarity between two arrays.
Parameters:
- array1: First input array
- array2: Second input array
Returns:
- similarity: Cosine similarity between the two arrays
"""
dot_product = np.dot(array1, array2)
norm_array1 = np.linalg.norm(array1)
norm_array2 = np.linalg.norm(array2)
similarity = dot_product / (norm_array1 * norm_array2)
return similarity
def compute_and_plot_pca(data1, data2, labels=None, fname='pca'):
"""
Compute and plot Principal Component Analysis (PCA) for a given dataset with 2 components.
Parameters:
- data: Input dataset
- labels: Labels for data points (optional)
Returns:
- pca_result: Result of PCA transformation
"""
scaler1 = StandardScaler()
data_standardized1 = scaler1.fit_transform(data1)
scaler2 = StandardScaler()
data_standardized2 = scaler2.fit_transform(data2)
pca1 = PCA(n_components=2)
pca_result1 = pca1.fit_transform(data_standardized1)
pca2 = PCA(n_components=2)
pca_result2 = pca2.fit_transform(data_standardized2)
pca_result = np.concatenate([pca_result1, pca_result2])
unique_labels = np.unique(labels) if labels is not None else [None]
plt.figure(figsize=(8, 6))
for unique_label in unique_labels:
mask = (labels == unique_label) if labels is not None else slice(None)
plt.scatter(pca_result[mask, 0], pca_result[mask, 1], label=unique_label)
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
if labels is not None:
plt.legend()
os.makedirs("figures/", exist_ok=True)
plt.savefig(f"figures/{fname}.pdf", bbox_inches='tight')
return pca_result
def compute_and_plot_tsne(data1, data2, labels=None, fname='tsne'):
"""
Compute and plot t-SNE for a given standardized dataset with 2 components.
Parameters:
- data: Input dataset
- labels: Labels for data points (optional)
Returns:
- tsne_result: Result of t-SNE transformation
"""
scaler1 = StandardScaler()
data_standardized1 = scaler1.fit_transform(data1)
tsne1 = TSNE(n_components=2)
tsne_result1 = tsne1.fit_transform(data_standardized1)
scaler2 = StandardScaler()
data_standardized2 = scaler2.fit_transform(data2)
tsne2 = TSNE(n_components=2)
tsne_result2 = tsne2.fit_transform(data_standardized2)
tsne_result = np.concatenate([tsne_result1, tsne_result2])
unique_labels = np.unique(labels) if labels is not None else [None]
plt.figure(figsize=(8, 6))
for unique_label in unique_labels:
mask = (labels == unique_label) if labels is not None else slice(None)
plt.scatter(tsne_result[mask, 0], tsne_result[mask, 1], label=unique_label)
plt.xlabel('t-SNE Component 1')
plt.ylabel('t-SNE Component 2')
if labels is not None:
plt.legend()
plt.savefig(f"figures/{fname}.pdf", bbox_inches='tight')
return tsne_result
def compute_and_plot_umap(data1, data2, labels=None, fname='umap'):
"""
Compute and plot UMAP for a given standardized dataset with 2 components.
Parameters:
- data: Input dataset
- labels: Labels for data points (optional)
Returns:
- umap_result: Result of UMAP transformation
"""
scaler1 = StandardScaler()
data_standardized1 = scaler1.fit_transform(data1)
umap_model1 = umap.UMAP(n_components=2)
umap_result1 = umap_model1.fit_transform(data_standardized1)
scaler2 = StandardScaler()
data_standardized2 = scaler2.fit_transform(data2)
umap_model2 = umap.UMAP(n_components=2)
umap_result2 = umap_model2.fit_transform(data_standardized2)
umap_result = np.concatenate([umap_result1, umap_result2])
unique_labels = np.unique(labels) if labels is not None else [None]
plt.figure(figsize=(8, 6))
for unique_label in unique_labels:
mask = (labels == unique_label) if labels is not None else slice(None)
plt.scatter(umap_result[mask, 0], umap_result[mask, 1], label=unique_label)
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
if labels is not None:
plt.legend()
plt.savefig(f"figures/{fname}.pdf", bbox_inches='tight')
return umap_result
def prepare_data_tom(mode):
dataset_splits = make_splits('config/dataset_splits_new.json')
d_flag = False
d_move_flag = False
if mode == 'train':
data = [GameParser(f,d_flag,1,7,d_move_flag) for f in tqdm(dataset_splits['training'])]
data += [GameParser(f,d_flag,2,7,d_move_flag) for f in tqdm(dataset_splits['training'])]
elif mode == 'test':
data = [GameParser(f,d_flag,1,7,d_move_flag) for f in tqdm(dataset_splits['test'])]
data += [GameParser(f,d_flag,2,7,d_move_flag) for f in tqdm(dataset_splits['test'])]
else:
raise ValueError('train or test are supported')
tom6repr = []
tom7repr = []
tom8repr = []
tom6labels = []
tom7labels = []
tom8labels = []
for game in data:
_, _, _, q, _, _, interm, _ = zip(*list(game))
interm = np.array(interm)
# intermediate = np.concatenate([ToM6,ToM7,ToM8,DAct,DMove])
tom6, tom7, tom8, _, _ = np.split(interm, np.cumsum([1024] * 5)[:-1], axis=1)
q = [parse_q(x, game) for x in q]
q, l = zip(*q)
indexes = [idx for idx, element in enumerate(l) if element is not None]
tom6repr.append(tom6[indexes])
tom7repr.append(tom7[indexes])
tom8repr.append(tom8[indexes])
l = [item[1] for item in l if item is not None]
tom6labels.append([['NO', 'MAYBE', 'YES'].index(item[0]) for item in l])
tom7labels.append([['NO', 'MAYBE', 'YES'].index(item[1]) for item in l])
tom8labels.append([game.materials_dict[item[2]] if item[2] in game.materials_dict else 0 for item in l])
tom6labels = sum(tom6labels, [])
tom7labels = sum(tom7labels, [])
tom8labels = sum(tom8labels, [])
return np.concatenate(tom6repr), tom6labels, np.concatenate(tom7repr), tom7labels, np.concatenate(tom8repr), tom8labels
def prepare_data_cpa(mode, experiment):
dataset_splits = make_splits('config/dataset_splits_new.json')
d_flag = False
d_move_flag = False
if mode == "train":
data = [GameParser(f,d_flag,1,7,d_move_flag) for f in tqdm(dataset_splits['training'])]
data += [GameParser(f,d_flag,2,7,d_move_flag) for f in tqdm(dataset_splits['training'])]
if experiment == 2:
with open('XXX', 'rb') as f:
feats = pickle.load(f)
elif experiment == 3:
with open('XXX', 'rb') as f:
feats = pickle.load(f)
else: raise ValueError
elif mode == "test":
data = [GameParser(f,d_flag,1,7,d_move_flag) for f in tqdm(dataset_splits['test'])]
data += [GameParser(f,d_flag,2,7,d_move_flag) for f in tqdm(dataset_splits['test'])]
if experiment == 2:
with open('XXX', 'rb') as f:
feats = pickle.load(f)
elif experiment == 3:
with open('XXX', 'rb') as f:
feats = pickle.load(f)
else: raise ValueError
else:
raise ValueError('train or test are supported')
tom6labels = []
tom7labels = []
tom8labels = []
features = [item[0] for item in feats]
game_names = [item[1] for item in feats]
selected_feats = []
for i, game in enumerate(data):
_, _, _, q, _, _, _, _ = zip(*list(game))
q = [parse_q(x, game) for x in q]
q, l = zip(*q)
indexes = [idx for idx, element in enumerate(l) if element is not None]
assert game.game_path.split("/")[-1] == game_names[i].split("/")[-1]
selected_feats.append(features[i][indexes])
l = [item[1] for item in l if item is not None]
tom6labels.append([['NO', 'MAYBE', 'YES'].index(item[0]) for item in l])
tom7labels.append([['NO', 'MAYBE', 'YES'].index(item[1]) for item in l])
tom8labels.append([game.materials_dict[item[2]] if item[2] in game.materials_dict else 0 for item in l])
tom6labels = sum(tom6labels, [])
tom7labels = sum(tom7labels, [])
tom8labels = sum(tom8labels, [])
selected_feats = np.concatenate(selected_feats)
return selected_feats, tom6labels, tom7labels, tom8labels
def fit_and_test_LR(X_train, y_train, X_test, y_test, max_iter=100):
logreg_model = LogisticRegression(max_iter=max_iter)
logreg_model.fit(X_train, y_train)
y_pred = logreg_model.predict(X_test)
f1 = f1_score(y_test, y_pred, average="weighted", zero_division=1)
# classification_report_output = classification_report(y_test, y_pred)
print("F1 score:", f1)
# print("Classification Report:\n", classification_report_output)
return logreg_model
def fit_and_test_RF(X_train, y_train, X_test, y_test, n_estimators):
model = RandomForestClassifier(n_estimators=n_estimators)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
f1 = f1_score(y_test, y_pred, average="weighted", zero_division=1)
print("F1 score:", f1)
return model
def wilcoxon_test(model1, model2, X_test_1, X_test_2):
probabilities_model1 = model1.predict_proba(X_test_1)[:, 1]
probabilities_model2 = model2.predict_proba(X_test_2)[:, 1]
differences = probabilities_model1 - probabilities_model2
_, p_value_wilcoxon = wilcoxon(differences)
print("Wilcoxon signed-rank test p-value:", p_value_wilcoxon)
return p_value_wilcoxon
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--task", type=str)
parser.add_argument("--seed", type=int)
parser.add_argument("--experiment", type=int)
args = parser.parse_args()
set_seed(args.seed)
task = args.task
experiment = args.experiment
if task == "tom":
tom6_train_x, tom6_train_labels, tom7_train_x, tom7_train_labels, tom8_train_x, tom8_train_labels = prepare_data_tom("train")
tom6_test_x, tom6_test_labels, tom7_test_x, tom7_test_labels, tom8_test_x, tom8_test_labels = prepare_data_tom("test")
print("=========== EXP 6 ========================================")
# 0.6056079527261083
fit_and_test_LR(tom6_train_x, tom6_train_labels, tom6_test_x, tom6_test_labels, 100)
print("=========== EXP 7 ========================================")
# 0.5090737845776365
fit_and_test_LR(tom7_train_x, tom7_train_labels, tom7_test_x, tom7_test_labels, 100)
print("=========== EXP 8 ========================================")
# 0.10206891928130866
fit_and_test_LR(tom8_train_x, tom8_train_labels, tom8_test_x, tom8_test_labels, 6)
breakpoint()
elif task == "cpa":
train_x, tom6_train_labels, tom7_train_labels, tom8_train_labels = prepare_data_cpa("train", experiment)
test_x, tom6_test_labels, tom7_test_labels, tom8_test_labels = prepare_data_cpa("test", experiment)
print("=========== EXP 6 ========================================")
# 0.5157497361676466 139
fit_and_test_LR(train_x, tom6_train_labels, test_x, tom6_test_labels, 139 if experiment == 2 else 11)
print("=========== EXP 7 ========================================")
# 0.49755418256915795 307
fit_and_test_LR(train_x, tom7_train_labels, test_x, tom7_test_labels, 307 if experiment == 2 else 25)
print("=========== EXP 8 ========================================")
# 0.14099639490943838 23
fit_and_test_LR(train_x, tom8_train_labels, test_x, tom8_test_labels, 23 if experiment == 2 else 9)
breakpoint()
elif task == "random":
tom6_train_x, tom6_train_labels, tom7_train_x, tom7_train_labels, tom8_train_x, tom8_train_labels = prepare_data_tom("train")
tom6_test_x, tom6_test_labels, tom7_test_x, tom7_test_labels, tom8_test_x, tom8_test_labels = prepare_data_tom("test")
tom6_train_x = np.random.randn(*tom6_train_x.shape) * 0.1
tom7_train_x = np.random.randn(*tom7_train_x.shape) * 0.1
tom8_train_x = np.random.randn(*tom8_train_x.shape) * 0.1
tom6_test_x = np.random.randn(*tom6_test_x.shape) * 0.1
tom7_test_x = np.random.randn(*tom7_test_x.shape) * 0.1
tom8_test_x = np.random.randn(*tom8_test_x.shape) * 0.1
print("=========== EXP 6 ========================================")
# 0.4573518645097593
fit_and_test_LR(tom6_train_x, tom6_train_labels, tom6_test_x, tom6_test_labels, 100)
print("=========== EXP 7 ========================================")
# 0.45066310491597705
fit_and_test_LR(tom7_train_x, tom7_train_labels, tom7_test_x, tom7_test_labels, 100)
print("=========== EXP 8 ========================================")
# 0.09281225255303022
fit_and_test_LR(tom8_train_x, tom8_train_labels, tom8_test_x, tom8_test_labels, 100)
breakpoint()
elif task == "all":
############## TOM
print("############## TOM")
tom6_train_x_tom, tom6_train_labels_tom, tom7_train_x_tom, tom7_train_labels_tom, tom8_train_x_tom, tom8_train_labels_tom = prepare_data_tom("train")
tom6_test_x_tom, tom6_test_labels_tom, tom7_test_x_tom, tom7_test_labels_tom, tom8_test_x_tom, tom8_test_labels_tom = prepare_data_tom("test")
print("=========== EXP 6 ========================================")
model_tom_6 = fit_and_test_LR(tom6_train_x_tom, tom6_train_labels_tom, tom6_test_x_tom, tom6_test_labels_tom, 100)
print("=========== EXP 7 ========================================")
model_tom_7 = fit_and_test_LR(tom7_train_x_tom, tom7_train_labels_tom, tom7_test_x_tom, tom7_test_labels_tom, 100)
print("=========== EXP 8 ========================================")
model_tom_8 = fit_and_test_LR(tom8_train_x_tom, tom8_train_labels_tom, tom8_test_x_tom, tom8_test_labels_tom, 6)
############## CPA
print("############## CPA")
train_x_cpa, tom6_train_labels_cpa, tom7_train_labels_cpa, tom8_train_labels_cpa = prepare_data_cpa("train", experiment)
test_x_cpa, tom6_test_labels_cpa, tom7_test_labels_cpa, tom8_test_labels_cpa = prepare_data_cpa("test", experiment)
print("=========== EXP 6 ========================================")
model_cpa_6 = fit_and_test_LR(train_x_cpa, tom6_train_labels_cpa, test_x_cpa, tom6_test_labels_cpa, 139)
print("=========== EXP 7 ========================================")
model_cpa_7 = fit_and_test_LR(train_x_cpa, tom7_train_labels_cpa, test_x_cpa, tom7_test_labels_cpa, 307)
print("=========== EXP 8 ========================================")
model_cpa_8 = fit_and_test_LR(train_x_cpa, tom8_train_labels_cpa, test_x_cpa, tom8_test_labels_cpa, 23)
############## RANDOM
print("############## RANDOM")
tom6_train_x_rand = np.random.randn(*tom6_train_x_tom.shape) * 0.1
tom7_train_x_rand = np.random.randn(*tom7_train_x_tom.shape) * 0.1
tom8_train_x_rand = np.random.randn(*tom8_train_x_tom.shape) * 0.1
tom6_test_x_rand = np.random.randn(*tom6_test_x_tom.shape) * 0.1
tom7_test_x_rand = np.random.randn(*tom7_test_x_tom.shape) * 0.1
tom8_test_x_rand = np.random.randn(*tom8_test_x_tom.shape) * 0.1
print("=========== EXP 6 ========================================")
model_rand_6 = fit_and_test_LR(tom6_train_x_rand, tom6_train_labels_tom, tom6_test_x_rand, tom6_test_labels_tom, 100)
print("=========== EXP 7 ========================================")
model_rand_7 = fit_and_test_LR(tom7_train_x_rand, tom7_train_labels_tom, tom7_test_x_rand, tom7_test_labels_tom, 100)
print("=========== EXP 8 ========================================")
model_rand_8 = fit_and_test_LR(tom8_train_x_rand, tom8_train_labels_tom, tom8_test_x_rand, tom8_test_labels_tom, 100)
wilcoxon_test(model_tom_6, model_cpa_6, tom6_test_x_tom, test_x_cpa)
wilcoxon_test(model_rand_6, model_cpa_6, tom6_test_x_rand, test_x_cpa)
wilcoxon_test(model_rand_6, model_tom_6, tom6_test_x_rand, tom6_test_x_tom)
wilcoxon_test(model_tom_7, model_cpa_7, tom7_test_x_tom, test_x_cpa)
wilcoxon_test(model_rand_7, model_cpa_7, tom7_test_x_rand, test_x_cpa)
wilcoxon_test(model_rand_7, model_tom_7, tom7_test_x_rand, tom7_test_x_tom)
wilcoxon_test(model_tom_8, model_cpa_8, tom8_test_x_tom, test_x_cpa)
wilcoxon_test(model_rand_8, model_cpa_8, tom8_test_x_rand, test_x_cpa)
wilcoxon_test(model_rand_8, model_tom_8, tom8_test_x_rand, tom8_test_x_tom)
scaler = StandardScaler()
scaled_tom6_test_x_tom = scaler.fit_transform(tom6_test_x_tom)
scaler = StandardScaler()
scaled_tom7_test_x_tom = scaler.fit_transform(tom7_test_x_tom)
scaler = StandardScaler()
scaled_tom8_test_x_tom = scaler.fit_transform(tom8_test_x_tom)
scaler = StandardScaler()
scaled_test_x_cpa = scaler.fit_transform(test_x_cpa)
sim6 = [cosine_similarity(t, c) for t, c in zip(scaled_tom6_test_x_tom, scaled_test_x_cpa)]
sim7 = [cosine_similarity(t, c) for t, c in zip(scaled_tom7_test_x_tom, scaled_test_x_cpa)]
sim8 = [cosine_similarity(t, c) for t, c in zip(scaled_tom8_test_x_tom, scaled_test_x_cpa)]
print(f"[tom6] max sim: {np.max(sim6)}, mean sim: {np.mean(sim6)}")
print(f"[tom7] max sim: {np.max(sim7)}, mean sim: {np.mean(sim7)}")
print(f"[tom8] max sim: {np.max(sim8)}, mean sim: {np.mean(sim8)}")
breakpoint()
else:
raise ValueError