def optimize_matrix(
modified_embedding_length: int = 2048, # in my brief experimentation, bigger was better (2048 is length of babbage encoding)
batch_size: int = 100,
max_epochs: int = 100,
learning_rate: float = 100.0, # seemed to work best when similar to batch size - feel free to try a range of values
dropout_fraction: float = 0.0, # in my testing, dropout helped by a couple percentage points (definitely not necessary)
df: pd.DataFrame = df,
print_progress: bool = True,
save_results: bool = True,
) -> torch.tensor:
"""Return matrix optimized to minimize loss on training data."""
run_id = random.randint(0, 2 ** 31 - 1) # (range is arbitrary)
# convert from dataframe to torch tensors
# e is for embedding, s for similarity label
def tensors_from_dataframe(
df: pd.DataFrame,
embedding_column_1: str,
embedding_column_2: str,
similarity_label_column: str,
) -> Tuple[torch.tensor]:
e1 = np.stack(np.array(df[embedding_column_1].values))
e2 = np.stack(np.array(df[embedding_column_2].values))
s = np.stack(np.array(df[similarity_label_column].astype("float").values))
e1 = torch.from_numpy(e1).float()
e2 = torch.from_numpy(e2).float()
s = torch.from_numpy(s).float()
return e1, e2, s
e1_train, e2_train, s_train = tensors_from_dataframe(
df[df["dataset"] == "train"], "text_1_embedding", "text_2_embedding", "label"
)
e1_test, e2_test, s_test = tensors_from_dataframe(
df[df["dataset"] == "test"], "text_1_embedding", "text_2_embedding", "label"
)
# create dataset and loader
dataset = torch.utils.data.TensorDataset(e1_train, e2_train, s_train)
train_loader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size, shuffle=True
)
# define model (similarity of projected embeddings)
def model(embedding_1, embedding_2, matrix, dropout_fraction=dropout_fraction):
e1 = torch.nn.functional.dropout(embedding_1, p=dropout_fraction)
e2 = torch.nn.functional.dropout(embedding_2, p=dropout_fraction)
modified_embedding_1 = e1 @ matrix # @ is matrix multiplication
modified_embedding_2 = e2 @ matrix
similarity = torch.nn.functional.cosine_similarity(
modified_embedding_1, modified_embedding_2
)
return similarity
# define loss function to minimize
def mse_loss(predictions, targets):
difference = predictions - targets
return torch.sum(difference * difference) / difference.numel()
# initialize projection matrix
embedding_length = len(df["text_1_embedding"].values[0])
matrix = torch.randn(
embedding_length, modified_embedding_length, requires_grad=True
)
epochs, types, losses, accuracies, matrices = [], [], [], [], []
for epoch in range(1, 1 + max_epochs):
# iterate through training dataloader
for a, b, actual_similarity in train_loader:
# generate prediction
predicted_similarity = model(a, b, matrix)
# get loss and perform backpropagation
loss = mse_loss(predicted_similarity, actual_similarity)
loss.backward()
# update the weights
with torch.no_grad():
matrix -= matrix.grad * learning_rate
# set gradients to zero
matrix.grad.zero_()
# calculate test loss
test_predictions = model(e1_test, e2_test, matrix)
test_loss = mse_loss(test_predictions, s_test)
# compute custom embeddings and new cosine similarities
apply_matrix_to_embeddings_dataframe(matrix, df)
# calculate test accuracy
for dataset in ["train", "test"]:
data = df[df["dataset"] == dataset]
a, se = accuracy_and_se(data["cosine_similarity_custom"], data["label"])
# record results of each epoch
epochs.append(epoch)
types.append(dataset)
losses.append(loss.item() if dataset == "train" else test_loss.item())
accuracies.append(a)
matrices.append(matrix.detach().numpy())
# optionally print accuracies
if print_progress is True:
print(
f"Epoch {epoch}/{max_epochs}: {dataset} accuracy: {a:0.1%} ± {1.96 * se:0.1%}"
)
data = pd.DataFrame(
{"epoch": epochs, "type": types, "loss": losses, "accuracy": accuracies}
)
data["run_id"] = run_id
data["modified_embedding_length"] = modified_embedding_length
data["batch_size"] = batch_size
data["max_epochs"] = max_epochs
data["learning_rate"] = learning_rate
data["dropout_fraction"] = dropout_fraction
data[
"matrix"
] = matrices # saving every single matrix can get big; feel free to delete/change
if save_results is True:
data.to_csv(f"{run_id}_optimization_results.csv", index=False)
return data