135 lines
No EOL
5.9 KiB
Python
135 lines
No EOL
5.9 KiB
Python
import os, pdb
|
|
import re
|
|
import json
|
|
from enum import Enum
|
|
from tqdm import tqdm
|
|
from bs4 import BeautifulSoup
|
|
|
|
def read_json_file(filename):
|
|
with open(filename, 'r') as infile:
|
|
data = json.load(infile)
|
|
return data
|
|
|
|
def convert_string(string_or_list):
|
|
# Add escaping symbols to English quotes in string
|
|
if isinstance(string_or_list, str):
|
|
return string_or_list.replace('"', '\\"')
|
|
elif isinstance(string_or_list, list):
|
|
return [convert_string(s) for s in string_or_list]
|
|
|
|
def is_visible(element):
|
|
bounding_box = element.get('bounding_box_rect')
|
|
return bounding_box != "-1,-1,-1,-1"
|
|
|
|
def clean_text(text):
|
|
cleaned_text = text.strip()
|
|
cleaned_text = cleaned_text.replace('\n', ' ').replace('\t', ' ')
|
|
cleaned_text = re.sub(' +', ' ', cleaned_text)
|
|
return cleaned_text
|
|
|
|
def find_semantic_info(element):
|
|
element_text = clean_text(element.get_text(strip=True))
|
|
if element_text:
|
|
return element_text
|
|
|
|
label = element.find_previous(lambda x: x.name == 'label' and is_visible(x))
|
|
if label:
|
|
label_text = clean_text(label.get_text(strip=True))
|
|
if label_text:
|
|
return label_text
|
|
return None
|
|
|
|
def action_discription(ui_element_name, ui_element_text, operation_type, value):
|
|
ret_en = ""
|
|
if operation_type == "TYPE":
|
|
if ui_element_text != "":
|
|
ret_en += f'Type text "{value}" into {ui_element_name} with text "{ui_element_text}" on it'
|
|
else:
|
|
ret_en += f'Type text "{value}" into {ui_element_name}'
|
|
elif operation_type == "SELECT":
|
|
if ui_element_text != "":
|
|
ret_en += f'Select "{value}" from {ui_element_name} with text "{ui_element_text}" on it'
|
|
else:
|
|
ret_en += f'Select "{value}" from {ui_element_name}.'
|
|
elif operation_type == "CLICK":
|
|
if ui_element_text != "":
|
|
ret_en += f'Click the {ui_element_name} element with text "{ui_element_text}" on it'
|
|
else:
|
|
ret_en += f'Click the {ui_element_name} element'
|
|
return ret_en
|
|
|
|
def process_one_task(task):
|
|
base_info = {
|
|
"website_en": task["website"],
|
|
"domain_en": task["domain"],
|
|
"subdomain_en": task["subdomain"],
|
|
"annotation_id":task["annotation_id"],
|
|
"task_description": task["confirmed_task"],
|
|
"action_reprs" : task["action_reprs"]
|
|
}
|
|
action_descriptions_en = []
|
|
for action_index, action in enumerate(task["actions"]):
|
|
action_repr = task["action_reprs"][action_index]
|
|
ui_element, _ = action_repr.split(" -> ")
|
|
assert ui_element.count("] ")==1
|
|
ui_element_name, ui_element_text = ui_element.split("] ")
|
|
ui_element_name = ui_element_name[1:]
|
|
ui_element_text = ui_element_text.strip()
|
|
|
|
if ui_element_text == "":
|
|
raw_html = action["raw_html"]
|
|
soup2 = BeautifulSoup(raw_html, 'html.parser')
|
|
selected_element2 = soup2.find(attrs={"data_pw_testid_buckeye": action["action_uid"]})
|
|
|
|
ui_element_text = find_semantic_info(selected_element2)
|
|
if ui_element_text is not None:
|
|
ui_element_text = clean_text(ui_element_text)
|
|
task["action_reprs"][action_index] = f"[{ui_element_name}] {ui_element_text} -> {task['action_reprs'][action_index].split(' -> ')[1]}"
|
|
else:
|
|
print(f'Warning: {task["annotation_id"]}, can not find semantic info for {action["action_uid"]}')
|
|
|
|
action_description_en = action_discription(ui_element_name, ui_element_text, action["operation"]["op"], action["operation"]["value"])
|
|
action_descriptions_en.append(action_description_en)
|
|
|
|
base_info["task_subintention"] = action_descriptions_en
|
|
return base_info
|
|
|
|
if __name__ == "__main__":
|
|
for foldername in ['train','test_domain','test_website','test_task']:
|
|
SAVE_PATH = f"your-path-to-data/{foldername}"
|
|
|
|
for idx in range(100):
|
|
savejsonfilename = os.path.join(SAVE_PATH,f'{foldername}_{idx}_with_actions_description_insert.json')
|
|
if os.path.exists(savejsonfilename):
|
|
continue
|
|
else:
|
|
jsonfilename = f"{SAVE_PATH}/{foldername}_{idx}.json"
|
|
if not os.path.exists(jsonfilename):
|
|
break
|
|
dataset = read_json_file(jsonfilename)
|
|
|
|
Mind2Web_with_subintentions = []
|
|
for task in tqdm(dataset):
|
|
base_info = process_one_task(task)
|
|
Mind2Web_with_subintentions.append(base_info)
|
|
assert len(Mind2Web_with_subintentions) == len(dataset)
|
|
|
|
if 'test' in foldername:
|
|
with open(os.path.join(SAVE_PATH,f'{foldername}_{idx}_with_actions_description.json'), 'r') as json_file:
|
|
Mind2Web_with_subintentions_saved = json.load(json_file)
|
|
|
|
for i in range(len(Mind2Web_with_subintentions)):
|
|
if i>=len(Mind2Web_with_subintentions_saved):
|
|
break
|
|
if Mind2Web_with_subintentions[i] != Mind2Web_with_subintentions_saved[i]:
|
|
for key in Mind2Web_with_subintentions[i].keys():
|
|
if Mind2Web_with_subintentions[i][key] != Mind2Web_with_subintentions_saved[i][key]:
|
|
found = False
|
|
for j in range(len(Mind2Web_with_subintentions_saved)):
|
|
if Mind2Web_with_subintentions[i][key] == Mind2Web_with_subintentions_saved[j][key]:
|
|
found = True
|
|
break
|
|
if not found:
|
|
print(found, i, j, jsonfilename)
|
|
with open(savejsonfilename, 'w') as json_file:
|
|
json.dump(Mind2Web_with_subintentions, json_file) |