12 changes: 11 additions & 1 deletion .env.example
@@ -20,4 +20,14 @@ llm.model='gpt-3.5-turbo'
llm.context_size=16385

# how many rounds should this thing go?
max_turns = 20
max_turns = 20

# The following four parameters are only relevant for the rag use case.
# rag_database_folder_name: name of the folder where the vector store will be saved.
# rag_embedding: name of the embedding model to use. Currently only the OpenAI API is supported.
# openai_api_key: API key used for the embedding model.
# rag_return_token_limit: upper bound (in tokens) for the RAG output.
rag_database_folder_name = "vectorDB"
rag_embedding = "text-embedding-3-small"
openai_api_key = 'your-openai-key'
rag_return_token_limit = 1000
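
For orientation, a minimal sketch of how these values might be consumed at runtime, assuming they end up in the process environment (as `common.py` does when it reads `rag_return_token_limit` via `os.environ`); the `python-dotenv` loader is an assumption and not a dependency declared in this PR:

```python
import os

from dotenv import load_dotenv  # assumption: python-dotenv is not declared in this PR

load_dotenv()  # copy .env entries into os.environ

rag_db_folder = os.environ.get("rag_database_folder_name", "vectorDB")
embedding_model = os.environ.get("rag_embedding", "text-embedding-3-small")
rag_token_limit = int(os.environ.get("rag_return_token_limit", "1000"))
```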
7 changes: 7 additions & 0 deletions pyproject.toml
@@ -68,6 +68,13 @@ testing = ['pytest', 'pytest-mock']
dev = [
'ruff',
]
rag-usecase = [
'langchain-community',
'langchain-openai',
'markdown',
'chromadb',
'langchain-chroma',
]

[project.scripts]
wintermute = "hackingBuddyGPT.cli.wintermute:main"
7 changes: 5 additions & 2 deletions src/hackingBuddyGPT/capabilities/ssh_run_command.py
@@ -17,15 +17,18 @@ class SSHRunCommand(Capability):
timeout: int = 10

def describe(self) -> str:
return "give a command to be executed and I will respond with the terminal output when running this command over SSH on the linux machine. The given command must not require user interaction."
return "give a command to be executed and I will respond with the terminal output when running this command over SSH on the linux machine. The given command must not require user interaction. Do not use quotation marks in front and after your command."

def get_name(self):
return "exec_command"

def __call__(self, command: str) -> Tuple[str, bool]:
        if command.startswith(self.get_name()):
            cmd_parts = command.split(" ", 1)
            # the model may emit just the capability name with no argument
            if len(cmd_parts) == 1:
                command = ""
            else:
                command = cmd_parts[1]

sudo_pass = Responder(
pattern=r"\[sudo\] password for " + self.conn.username + ":",
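
The guard above fixes an edge case: if the model emits the bare keyword `exec_command` with no argument, `command.split(" ", 1)` returns a single element and indexing `cmd_parts[1]` would raise an `IndexError`. A small standalone sketch of the fixed logic:

```python
def strip_capability_prefix(command: str, name: str = "exec_command") -> str:
    # mirrors the fixed logic in SSHRunCommand.__call__
    if command.startswith(name):
        cmd_parts = command.split(" ", 1)
        command = "" if len(cmd_parts) == 1 else cmd_parts[1]
    return command

assert strip_capability_prefix("exec_command id") == "id"
assert strip_capability_prefix("exec_command") == ""  # previously raised IndexError
```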
24 changes: 19 additions & 5 deletions src/hackingBuddyGPT/capabilities/ssh_test_credential.py
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from typing import Tuple

from paramiko.ssh_exception import SSHException
import paramiko

from hackingBuddyGPT.utils import SSHConnection
@@ -13,15 +13,29 @@ class SSHTestCredential(Capability):
conn: SSHConnection

def describe(self) -> str:
return "give credentials to be tested"
return "give credentials to be tested."

def get_name(self):
return "test_credential"

def __call__(self, username: str, password: str, keyfilename: str) -> Tuple[str, bool]:
test_conn = self.conn.new_with(username=username, password=password, keyfilename=keyfilename)
def __call__(self, username: str, password: str) -> Tuple[str, bool]:
test_conn = self.conn.new_with(username=username, password=password)
try:
test_conn.init()
            for attempt in range(10):
                try:
                    test_conn.init()
                    break
                except paramiko.ssh_exception.AuthenticationException:
                    return "Authentication error, credentials are wrong\n", False
                except SSHException as e:
                    # transient SSH failures: retry up to ten times, then re-raise
                    if attempt == 9:
                        raise
                    print("-------------------------------------------------------")
                    print(e)
                    print("Retrying")
                    print("-------------------------------------------------------")

user = test_conn.run("whoami")[0].strip("\n\r ")
if user == "root":
return "Login as root was successful\n", True
1 change: 1 addition & 0 deletions src/hackingBuddyGPT/usecases/__init__.py
@@ -3,3 +3,4 @@
from .web import *
from .web_api_testing import *
from .viewer import *
from .rag import *
32 changes: 32 additions & 0 deletions src/hackingBuddyGPT/usecases/rag/README.md
@@ -0,0 +1,32 @@
# ThesisPrivescPrototype
This use case is an extension of `usecases/privesc`.

## Setup
### Dependencies
The required dependencies can be installed with `pip install -e '.[rag-usecase]'`. If you encounter the error `unexpected keyword argument 'proxies'` when starting the use case, try downgrading `httpx` to 0.27.2.
### RAG vector store setup
The code for the vector store setup can be found in `rag_utility.py`. Currently the vector store uses two sources: `GTFObins` and `hacktricks`. To use RAG, download the markdown files and place them in `rag_storage/GTFObinMarkdownfiles` and `rag_storage/hacktricksMarkdownFiles`, respectively. You can download the markdown files either from the respective GitHub repositories ([GTFObin](https://github.com/GTFOBins/GTFOBins.github.io/tree/master), [hacktricks](https://github.com/HackTricks-wiki/hacktricks/tree/master/src/linux-hardening/privilege-escalation)) or scrape them from their websites ([GTFObin](https://gtfobins.github.io/), [hacktricks](https://book.hacktricks.wiki/en/linux-hardening/privilege-escalation/index.html)).

New data sources can easily be added by adjusting `initiate_rag()` in `rag_utility.py`.
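
For readers who want to adapt it, here is a minimal sketch of what a vector store setup along these lines could look like. The body is an illustration built from the libraries listed in the new `rag-usecase` extra (`langchain-community`, `langchain-openai`, `langchain-chroma`), not the actual contents of `rag_utility.py`; `RecursiveCharacterTextSplitter` lives in the separate `langchain-text-splitters` package, which is assumed to be available transitively, and the chunk sizes are illustrative:

```python
import os

from langchain_chroma import Chroma
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter


def initiate_rag():
    # load the markdown files from both source folders
    docs = []
    for folder in ("rag_storage/GTFObinMarkdownfiles", "rag_storage/hacktricksMarkdownFiles"):
        loader = DirectoryLoader(folder, glob="**/*.md", loader_cls=TextLoader)
        docs.extend(loader.load())

    # split into overlapping chunks so retrieval can return focused passages
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    chunks = splitter.split_documents(docs)

    # embed and persist the chunks in a local Chroma database,
    # named via the env variables from .env.example
    store = Chroma.from_documents(
        chunks,
        OpenAIEmbeddings(model=os.environ["rag_embedding"], api_key=os.environ["openai_api_key"]),
        persist_directory=os.environ["rag_database_folder_name"],
    )
    return store.as_retriever()
```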

## Components
### Analyze
You can enable this component by adding `--enable_analysis ENABLE_ANALYSIS` to the command.

If enabled, the LLM is prompted after each iteration to analyze the most recent output. The analysis is included in the `query_next_command` prompt of the next iteration.
### Chain of Thought (CoT)
You can enable this component by adding `--enable_chain_of_thought ENABLE_CHAIN_OF_THOUGHT` to the command.

If enabled, CoT is used to generate the next command. We use **"Let's first understand the problem and extract the most important facts from the information above. Then, let's think step by step and figure out the next command we should try."**
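
With CoT enabled, `common.py` then pulls the final command out of `<command>` tags in the model's answer; conceptually (the sample answer text is illustrative):

```python
import re

answer = "Let's think step by step: ... <command>cat /etc/passwd</command>"
commands = re.findall(r"<command>([\s\S]*?)</command>", answer)
if commands:
    cmd = "\n".join(commands)  # -> "cat /etc/passwd"
```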
### Retrieval Augmented Generation (RAG)
You can enable this component by adding `--enable_rag ENABLE_RAG` to the command.

If enabled, after each iteration the LLM is prompted to generate a search query for the vector store. The query is used to retrieve relevant documents, and the retrieved information is included in the prompt for the Analyze component (this requires Analyze to be enabled).
### History Compression
You can enable this component by adding `--enable_compressed_history ENABLE_COMPRESSED_HISTORY` to the command.

If enabled, the prompt still lists all previous commands but includes only the output of the most recent one, instead of every command together with its full output.
### Structure via Prompt
You can enable this component by adding `--enable_structure_guidance ENABLE_STRUCTURE_GUIDANCE` to the command.

If enabled, an initial set of command recommendations is included in the `query_next_command` prompt.
2 changes: 2 additions & 0 deletions src/hackingBuddyGPT/usecases/rag/__init__.py
@@ -0,0 +1,2 @@
from .linux import *
from .rag_utility import *
234 changes: 234 additions & 0 deletions src/hackingBuddyGPT/usecases/rag/common.py
@@ -0,0 +1,234 @@
import datetime
import pathlib
import re
import os

from dataclasses import dataclass, field
from mako.template import Template
from typing import Any, Dict, Optional
from langchain_core.vectorstores import VectorStoreRetriever

from hackingBuddyGPT.capabilities import Capability
from hackingBuddyGPT.capabilities.capability import capabilities_to_simple_text_handler
from hackingBuddyGPT.usecases.agents import Agent
from hackingBuddyGPT.usecases.rag import rag_utility as rag_util
from hackingBuddyGPT.utils.logging import log_section, log_conversation
from hackingBuddyGPT.utils import llm_util
from hackingBuddyGPT.utils.cli_history import SlidingCliHistory

template_dir = pathlib.Path(__file__).parent / "templates"
template_next_cmd = Template(filename=str(template_dir / "query_next_command.txt"))
template_analyze = Template(filename=str(template_dir / "analyze_cmd.txt"))
template_chain_of_thought = Template(filename=str(template_dir / "chain_of_thought.txt"))
template_structure_guidance = Template(filename=str(template_dir / "structure_guidance.txt"))
template_rag = Template(filename=str(template_dir / "rag_prompt.txt"))


@dataclass
class ThesisPrivescPrototype(Agent):
system: str = ""
enable_analysis: bool = False
enable_update_state: bool = False
enable_compressed_history: bool = False
disable_history: bool = False
enable_chain_of_thought: bool = False
enable_structure_guidance: bool = False
enable_rag: bool = False
_rag_document_retriever: VectorStoreRetriever = None
hint: str = ""

_sliding_history: SlidingCliHistory = None
_capabilities: Dict[str, Capability] = field(default_factory=dict)
_template_params: Dict[str, Any] = field(default_factory=dict)
_max_history_size: int = 0
_analyze: str = ""
_structure_guidance: str = ""
_chain_of_thought: str = ""
_rag_text: str = ""

def before_run(self):
if self.hint != "":
self.log.status_message(f"[bold green]Using the following hint: '{self.hint}'")

if self.disable_history is False:
self._sliding_history = SlidingCliHistory(self.llm)

if self.enable_rag:
self._rag_document_retriever = rag_util.initiate_rag()

self._template_params = {
"capabilities": self.get_capability_block(),
"system": self.system,
"hint": self.hint,
"conn": self.conn,
"target_user": "root",
'structure_guidance': self.enable_structure_guidance,
'chain_of_thought': self.enable_chain_of_thought
}

if self.enable_structure_guidance:
self._structure_guidance = template_structure_guidance.source

if self.enable_chain_of_thought:
self._chain_of_thought = template_chain_of_thought.source

template_size = self.llm.count_tokens(template_next_cmd.source)
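        # reserve room for the static template plus a safety margin; the CoT,
        # guidance, and analysis blocks are subtracted from this budget again
        # in get_next_command()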
self._max_history_size = self.llm.context_size - llm_util.SAFETY_MARGIN - template_size

def perform_round(self, turn: int) -> bool:
# get the next command and run it
cmd, message_id = self.get_next_command()


        if self.enable_chain_of_thought:
            # extract the command(s) from the <command> tags emitted by the CoT prompt
            command = re.findall(r"<command>([\s\S]*?)</command>", cmd)

            if len(command) > 0:
                cmd = "\n".join(command)

# split if there are multiple commands
commands = self.split_into_multiple_commands(cmd)

cmds, result, got_root = self.run_command(commands, message_id)


# log and output the command and its result
if self._sliding_history:
if self.enable_compressed_history:
self._sliding_history.add_command_only(cmds, result)
else:
self._sliding_history.add_command(cmds, result)

if self.enable_rag:
query = self.get_rag_query(cmds, result)
relevant_documents = self._rag_document_retriever.invoke(query.result)
relevant_information = "".join([d.page_content + "\n" for d in relevant_documents])
self._rag_text = llm_util.trim_result_front(self.llm, int(os.environ['rag_return_token_limit']),
relevant_information)

# analyze the result..
if self.enable_analysis:
self.analyze_result(cmds, result)


# if we got root, we can stop the loop
return got_root

def get_chain_of_thought_size(self) -> int:
if self.enable_chain_of_thought:
return self.llm.count_tokens(self._chain_of_thought)
else:
return 0

def get_structure_guidance_size(self) -> int:
if self.enable_structure_guidance:
return self.llm.count_tokens(self._structure_guidance)
else:
return 0

def get_analyze_size(self) -> int:
if self.enable_analysis:
return self.llm.count_tokens(self._analyze)
else:
return 0

def get_rag_size(self) -> int:
if self.enable_rag:
return self.llm.count_tokens(self._rag_text)
else:
return 0

@log_conversation("Asking LLM for a new command...", start_section=True)
def get_next_command(self) -> tuple[str, int]:
history = ""
if not self.disable_history:
if self.enable_compressed_history:
history = self._sliding_history.get_commands_and_last_output(self._max_history_size - self.get_chain_of_thought_size() - self.get_structure_guidance_size() - self.get_analyze_size())
else:
history = self._sliding_history.get_history(self._max_history_size - self.get_chain_of_thought_size() - self.get_structure_guidance_size() - self.get_analyze_size())

self._template_params.update({
"history": history,
'CoT': self._chain_of_thought,
'analyze': self._analyze,
'guidance': self._structure_guidance
})

cmd = self.llm.get_response(template_next_cmd, **self._template_params)
message_id = self.log.call_response(cmd)

return cmd.result, message_id


@log_conversation("Asking LLM for a search query...", start_section=True)
def get_rag_query(self, cmd, result):
ctx = self.llm.context_size
template_size = self.llm.count_tokens(template_rag.source)
target_size = ctx - llm_util.SAFETY_MARGIN - template_size
result = llm_util.trim_result_front(self.llm, target_size, result)

result = self.llm.get_response(template_rag, cmd=cmd, resp=result)
self.log.call_response(result)
return result

@log_section("Executing that command...")
def run_command(self, cmd, message_id) -> tuple[Optional[str], Optional[str], bool]:
_capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities, default_capability=self._default_capability)

cmds = ""
result = ""
got_root = False
for i, command in enumerate(cmd):
start_time = datetime.datetime.now()
success, *output = parser(command)
if not success:
self.log.add_tool_call(message_id, tool_call_id=0, function_name="", arguments=command, result_text=output[0], duration=0)
return cmds, output[0], False
assert len(output) == 1
capability, cmd_, (result_, got_root_) = output[0]
cmds += cmd_ + "\n"
result += result_ + "\n"
got_root = got_root or got_root_
duration = datetime.datetime.now() - start_time
self.log.add_tool_call(message_id, tool_call_id=i, function_name=capability, arguments=cmd_,
result_text=result_, duration=duration)

cmds = cmds.rstrip()
result = result.rstrip()
return cmds, result, got_root

@log_conversation("Analyze its result...", start_section=True)
def analyze_result(self, cmd, result):
ctx = self.llm.context_size

template_size = self.llm.count_tokens(template_analyze.source)
target_size = ctx - llm_util.SAFETY_MARGIN - template_size - self.get_rag_size()
result = llm_util.trim_result_front(self.llm, target_size, result)

result = self.llm.get_response(template_analyze, cmd=cmd, resp=result, rag_enabled=self.enable_rag, rag_text=self._rag_text, hint=self.hint)
self._analyze = result.result
self.log.call_response(result)

def split_into_multiple_commands(self, response: str):
ret = self.split_with_delimiters(response, ["test_credential", "exec_command"])

# strip trailing newlines
ret = [r.rstrip() for r in ret]

        # drop the first entry: re.split returns the text before the first delimiter,
        # which is empty because the response starts with a capability keyword
if len(ret) > 1:
ret = ret[1:]

# combine keywords with their corresponding input
if len(ret) > 1:
ret = [ret[i] + ret[i + 1] for i in range(0, len(ret) - 1, 2)]
return ret

    def split_with_delimiters(self, text: str, delimiters):
        # build a regex matching any delimiter; the capturing group keeps the delimiters in the result
        regex_pattern = f"({'|'.join(map(re.escape, delimiters))})"
        # re.split splits the text while keeping the delimiters
        return re.split(regex_pattern, text)
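
To make the splitting behaviour concrete, here is a standalone sketch of how a response containing two capability calls is broken apart (the sample strings are illustrative):

```python
import re

response = "exec_command whoami\ntest_credential lowpriv trustno1"
pattern = f"({'|'.join(map(re.escape, ['test_credential', 'exec_command']))})"

parts = [p.rstrip() for p in re.split(pattern, response)]
# re.split returns the text before the first delimiter first; it is empty
# here because the response starts with a capability keyword, so drop it
parts = parts[1:]

# stitch each keyword back together with its argument string
commands = [parts[i] + parts[i + 1] for i in range(0, len(parts) - 1, 2)]
print(commands)  # ['exec_command whoami', 'test_credential lowpriv trustno1']
```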