Mapping MITRE ATT&CK to CVE using NLP, BERT and PyTorch

Ilay tertman
System Weakness
Published in
8 min readAug 4, 2023

--

Photo by Brecht Corbeel on Unsplash

TL;DR

In my work on a new CSPM (Cloud Security Posture Management) platform, I was given a task to correlate a CVE impact with a MITRE ATT&CK tactic at a large scale. After further research, I implemented an NLP model with BERT and PyTorch and achieved an F1 score of 0.729757785467128, matching the description of a CVE to an ATT&CK tactic.

Why should you do it ?

ATT&CK tactics provide a more comprehensive and detailed form for defenders to investigate vulnerabilities and attack vectors.

Large CSPM platform companies like Orca Security have integrated this ability into their platforms.

CVE

CVE stands for Common Vulnerabilities and Exposures, it is a system for sharing publicly known cybersecurity vulnerabilities and exposures information. The CVE program is maintained by the MITRE Corporation, which was founded in 1999.

Each CVE record in the CVE project is associated with the following details:

  • CVE-ID — a unique identifier in this format: CVE-YYYY-NNNN. Note that the YYYY stands for the year the CVE was published, and the NNNN is an arbitrarily chosen digit combination.
  • Description — Brief description of the security vulnerability
  • References — Any pertinent references (i.e., vulnerability reports and advisories)
CVE-2022–22965 information in the MITRE CVE project
CVE-2022–22965 information in the MITRE CVE project

For more information, please visit the following link:

https://www.cve.org/About/Process#CVERecordLifecycle

MITRE ATT&CK tactics

Besides the CVE project, the MITRE corporation also maintains the MITRE ATT&CK® project, which is used as a database of adversary techniques and tactics based on real-world threat actors exploitation methods.

There are 14 main tactics in the project:

Each is later divided into smaller sub-techniques. The tactics represent the “why” of an ATT&CK technique or sub-technique. The techniques represent “how” an adversary achieves a tactical goal. In the model I built, I only focused on the main 14 tactics and didn’t take the sub-techniques into account.

The MITRE ATT&CK project home page

For more information, please visit the following link: https://attack.mitre.org/

BERT

Bert stands for Bidirectional Encoder Representations from Transformers. Introduced in 2018 by researchers at Google, it is a family of language models based on the transformer architecture. In contrast to the GPT (Generative Pre-trained Transformer) family of models developed by OpenAI, BERT is composed of Transformer encoder layers. BERT is pre-trained on the BooksCorpus dataset (800M words) and the English Wikipedia (2,500M words). We can Fine-tune the Bert model for many tasks like Named-entity recognition or Multi-label classification, which makes him useful for our task.

For more information, please visit the following link: https://arxiv.org/abs/1810.04805

Before we begin

In the CVE description tab, we can find useful information that can help us correlate the CVE with the ATT&CK tactic. For example, let’s look at CVE-2023–36538’s description:

“Improper access control in Zoom Rooms for Windows before version 5.15.0 may allow an authenticated user to enable an escalation of privilege via local access.”

As humans, we can infer that the tactic that matches the CVE is Privilege Escalation (TA0004).

Note that not all of the CVE descriptions are that easy to understand; for example, let’s look at the CVE-2017–20165 description:

“A vulnerability classified as problematic has been found in debug-js debug up to 3.0.x. This affects the function useColors of the file src/node.js. The manipulation of the argument str leads to inefficient regular expression complexity. Upgrading to version 3.1.0 is able to address this issue. The name of the patch is c38a0166c266a679c8de012d4eaccec3f944e685. It is recommended to upgrade the affected component. The identifier VDB-217665 was assigned to this vulnerability.”

This type of vulnerability description needs further investigation to determine its matching tactic.

The DataSet

The full Dataset can be found here: https://github.com/enisaeu/vuln-report/tree/master/data. The file “all.csv” holds all the information necessary for training the model.

The original file is pretty big and holds a lot of irrelevant data for our model. I pre-processed it using GPT-4 into the following format:

snapshot from Excel

You can do so using other AI tools, a Python script, or any other way you see fit.

The Model

import torch
import numpy as np
import pandas as pd
import shutil, sys
import transformers
from sklearn import metrics
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from transformers import BertTokenizer, BertModel, BertConfig

class CustomDataset(Dataset):

def __init__(self, dataframe, tokenizer, max_len,):
self.tokenizer = tokenizer
self.data = dataframe
self.title = dataframe['description']
self.targets = self.data.target_list
self.max_len = max_len

def __len__(self):
return len(self.title)

def __getitem__(self, index):
title = str(self.title[index])
title = " ".join(title.split(" "))

inputs = self.tokenizer.encode_plus(
title,
None,
add_special_tokens=True,
max_length=self.max_len,
padding='max_length',
return_token_type_ids=True,
truncation=True
)
ids = inputs['input_ids']
mask = inputs['attention_mask']
token_type_ids = inputs["token_type_ids"]


return {
'ids': torch.tensor(ids, dtype=torch.long),
'mask': torch.tensor(mask, dtype=torch.long),
'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
'targets': torch.tensor(self.targets[index], dtype=torch.float)
}

class BERTClass(torch.nn.Module):
def __init__(self):
super(BERTClass, self).__init__()
self.l1 = transformers.BertModel.from_pretrained('bert-base-uncased', return_dict=False)
self.l2 = torch.nn.Dropout(0.3)
self.l3 = torch.nn.Linear(768, 14)

def forward(self, ids, mask, token_type_ids):
_, output_1= self.l1(ids, attention_mask = mask, token_type_ids = token_type_ids)
output_2 = self.l2(output_1)
output = self.l3(output_2)
return output

def loss_fn(outputs, targets):
return torch.nn.BCEWithLogitsLoss()(outputs, targets)


def save_ckp(state, is_best, checkpoint_path, best_model_path):
"""
state: checkpoint we want to save
is_best: is this the best checkpoint; min validation loss
checkpoint_path: path to save checkpoint
best_model_path: path to save best model
"""
f_path = checkpoint_path
# save checkpoint data to the path given, checkpoint_path
torch.save(state['state_dict'], f_path)
# if it is a best model, min validation loss
if is_best:
best_fpath = best_model_path
# copy that checkpoint file to best path given, best_model_path
shutil.copyfile(f_path, best_fpath)

def load_ckp(checkpoint_fpath, model, optimizer):
"""
checkpoint_path: path to save checkpoint
model: model that we want to load checkpoint parameters into
optimizer: optimizer we defined in previous training
"""
# load checkpoint
checkpoint = torch.load(checkpoint_fpath)

# initialize state_dict from checkpoint to model
model.load_state_dict(checkpoint['state_dict'])

# initialize optimizer from checkpoint to optimizer
optimizer.load_state_dict(checkpoint['optimizer'])

# handle valid_loss_min based on its type
valid_loss_min = checkpoint['valid_loss_min']
if isinstance(valid_loss_min, torch.Tensor):
valid_loss_min = valid_loss_min.item()

# return model, optimizer, epoch value, min validation loss
return model, optimizer, checkpoint['epoch'], valid_loss_min


def train_model(start_epochs, n_epochs, valid_loss_min_input,
training_loader, validation_loader, model,
optimizer, checkpoint_path, best_model_path):

# initialize tracker for minimum validation loss
valid_loss_min = valid_loss_min_input


for epoch in range(start_epochs, n_epochs+1):
train_loss = 0
valid_loss = 0

model.train()
print('############# Epoch {}: Training Start #############'.format(epoch))
for batch_idx, data in enumerate(training_loader):
#print('yyy epoch', batch_idx)
ids = data['ids'].to(device, dtype = torch.long)
mask = data['mask'].to(device, dtype = torch.long)
token_type_ids = data['token_type_ids'].to(device, dtype = torch.long)
targets = data['targets'].to(device, dtype = torch.float)

optimizer.zero_grad()
outputs = model(ids, mask, token_type_ids)

loss = loss_fn(outputs, targets)
if batch_idx%100==0:
print(f'Epoch: {epoch}, Training Loss: {loss.item()}')

loss.backward()
optimizer.step()
#print('before loss data in training', loss.item(), train_loss)
train_loss = train_loss + ((1 / (batch_idx + 1)) * (loss.item() - train_loss))
#print('after loss data in training', loss.item(), train_loss)

print('############# Epoch {}: Training End #############'.format(epoch))

print('############# Epoch {}: Validation Start #############'.format(epoch))
######################
# validate the model #
######################

model.eval()

print('############# Epoch {}: Validation End #############'.format(epoch))
# calculate average losses
#print('before cal avg train loss', train_loss)
train_loss = train_loss/len(training_loader)
valid_loss = valid_loss/len(validation_loader)
# print training/validation statistics
print('Epoch: {} \tAvgerage Training Loss: {:.6f} \tAverage Validation Loss: {:.6f}'.format(
epoch,
train_loss,
valid_loss
))

# create checkpoint variable and add important data
checkpoint = {
'epoch': epoch + 1,
'valid_loss_min': valid_loss,
'state_dict': model.state_dict(),
'optimizer': optimizer.state_dict()
}

# save checkpoint
save_ckp(checkpoint, False, checkpoint_path, best_model_path)

## TODO: save the model if validation loss has decreased
if valid_loss <= valid_loss_min:
print('Validation loss decreased ({:.6f} --> {:.6f}). Saving model ...'.format(valid_loss_min,valid_loss))
# save checkpoint as best model
# save_ckp(checkpoint, True, checkpoint_path, best_model_path)
valid_loss_min = valid_loss

print('############# Epoch {} Done #############\n'.format(epoch))


return model


def do_validation(dataloader):
model.eval()
fin_targets=[]
fin_outputs=[]
with torch.no_grad():
for _, data in enumerate(dataloader, 0):
ids = data['ids'].to(device, dtype = torch.long)
mask = data['mask'].to(device, dtype = torch.long)
token_type_ids = data['token_type_ids'].to(device, dtype = torch.long)
targets = data['targets'].to(device, dtype = torch.float)
outputs = model(ids, mask, token_type_ids)
fin_targets.extend(targets.cpu().detach().numpy().tolist())
fin_outputs.extend(torch.sigmoid(outputs).cpu().detach().numpy().tolist())
return fin_outputs, fin_targets


if __name__ == '__main__':

# If there's a GPU available...
if torch.cuda.is_available():

# Tell PyTorch to use the GPU.
device = torch.device("cuda")

print('There are %d GPU(s) available.' % torch.cuda.device_count())

print('We will use the GPU:', torch.cuda.get_device_name(0))

# If not...
else:
print('No GPU available, using the CPU instead.')
device = torch.device("cpu")

val_targets=[]
val_outputs=[]
MAX_LEN = 256
TRAIN_BATCH_SIZE = 32
VALID_BATCH_SIZE = 32
EPOCHS = 14
LEARNING_RATE = 1e-05
bucket='your bucket name'
train_data_location = 's3://{}/{}'.format(bucket, 'train.csv')
test_data_location = 's3://{}/{}'.format(bucket, 'test.csv')
train_df = pd.read_csv(train_data_location,on_bad_lines='skip')
test_df = pd.read_csv(test_data_location,on_bad_lines='skip')
select_labels = train_df.columns.values.tolist()[2:]
train_df['target_list'] = train_df[select_labels].values.tolist()
test_df['target_list'] = test_df[select_labels].values.tolist()
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
training_set = CustomDataset(train_df, tokenizer, MAX_LEN)
validation_set = CustomDataset(test_df, tokenizer, MAX_LEN)
train_params = {'batch_size': TRAIN_BATCH_SIZE,
'shuffle': True,
'num_workers': 0
}

test_params = {'batch_size': VALID_BATCH_SIZE,
'shuffle': False,
'num_workers': 0
}

training_loader = DataLoader(training_set, **train_params)
validation_loader = DataLoader(validation_set, **test_params)
model = BERTClass()
model.to(device)
optimizer = torch.optim.Adam(params = model.parameters(), lr=LEARNING_RATE)
checkpoint_path = '/content/checkpoints/current_checkpoint.pt'
best_model = '/content/checkpoints/best_model.pt'
trained_model = train_model(1, EPOCHS, np.Inf, training_loader, validation_loader, model,
optimizer,checkpoint_path,best_model)

outputs, targets = do_validation(validation_loader)
val_preds = (np.array(outputs) > 0.5).astype(int)
val_targets = (np.array(targets) > 0.5).astype(int)
accuracy = metrics.accuracy_score(val_targets, val_preds)
f1_score_micro = metrics.f1_score(val_targets, val_preds, average='micro')
f1_score_macro = metrics.f1_score(val_targets, val_preds, average='macro')
print(f"Accuracy Score = {accuracy}")
print(f"F1 Score (Micro) = {f1_score_micro}")
print(f"F1 Score (Macro) = {f1_score_macro}")

Without going into much detail in the code, the model has three layers:

  • The pre-trained Bert model from the Transformers library in its basic form
  • A Dropout layer
  • A fully connected linear layer with 14 output neurons for each ATT&CK tactic

Each batch of data is pre-processed in the CustomDataset class by the BertTokenizer and later fed to the BertModel for further processing.

The Loss function used for the model is BCE logistic loss, and the optimizer used is the Adam optimizer.

I used Amazon SageMaker for training the model. The model was trained for 8 hours on an ml.p3.2xlarge instance and achieved an F1 score of 0.729757785467128.

The code is inspired by this amazing tutorial by Kyaw Khaung. I highly recommend you go and read it in more detail.

Other work

In my studies, I stumbled upon this project: https://github.com/center-for-threat-informed-defense/attack_to_cve

The author describes a detailed guide for solving the exact problem I was facing. Unfortunately, I needed an automatic way of doing so. The CVE project, as of 8/2/2023, stores 221285 CVEs. There is no way a single human can do this by hand, so I chose to use an NLP model for the task.

--

--