Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNMP: Add plugin entry point #3304

Merged
merged 3 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions monkey/agent_plugins/exploiters/snmp/src/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import logging
from functools import partial
from pprint import pformat
from typing import Any, Dict, Sequence

# common imports
from common.event_queue import IAgentEventPublisher
from common.types import AgentID, Event
from common.utils.code_utils import del_key

# dependencies to get rid of or internalize
from infection_monkey.exploit import IAgentBinaryRepository, IAgentOTPProvider
from infection_monkey.exploit.tools import all_udp_ports_are_closed
from infection_monkey.exploit.tools.http_agent_binary_server import start_agent_binary_server
from infection_monkey.i_puppet import ExploiterResultData, TargetHost
from infection_monkey.network import TCPPortSelector
from infection_monkey.propagation_credentials_repository import IPropagationCredentialsRepository

logger = logging.getLogger(__name__)


class StubSNMPOptions:
def __init__(self, *args, **kwargs):
pass


class StubSNMPExploiter:
def __init__(self, *args, **kwargs):
pass

def exploit_host(self, *args, **kwargs):
pass


class StubSNMPExploitClient:
def __init__(self, *args, **kwargs):
pass


SNMP_PORTS = [161]


def should_attempt_exploit(host: TargetHost) -> bool:
return not all_udp_ports_are_closed(host, SNMP_PORTS)


class Plugin:
def __init__(
self,
*,
plugin_name: str,
agent_id: AgentID,
agent_event_publisher: IAgentEventPublisher,
agent_binary_repository: IAgentBinaryRepository,
propagation_credentials_repository: IPropagationCredentialsRepository,
tcp_port_selector: TCPPortSelector,
otp_provider: IAgentOTPProvider,
**kwargs,
):
exploit_client = StubSNMPExploitClient(agent_id, agent_event_publisher)
agent_binary_server_factory = partial(
start_agent_binary_server,
agent_binary_repository=agent_binary_repository,
tcp_port_selector=tcp_port_selector,
)

self._snmp_exploiter = StubSNMPExploiter(
agent_id,
exploit_client,
agent_binary_server_factory,
propagation_credentials_repository,
otp_provider,
)

def run(
self,
*,
host: TargetHost,
servers: Sequence[str],
current_depth: int,
options: Dict[str, Any],
interrupt: Event,
**kwargs,
) -> ExploiterResultData:
# HTTP ports options are hack because they are needed in fingerprinters
del_key(options, "http_ports")

try:
logger.debug(f"Parsing options: {pformat(options)}")
snmp_options = StubSNMPOptions(**options)
except Exception as err:
msg = f"Failed to parse SNMP options: {err}"
logger.exception(msg)
return ExploiterResultData(error_message=msg)

if not should_attempt_exploit(host):
msg = f"Host {host.ip} has no open SNMP ports"
logger.debug(msg)
return ExploiterResultData(
exploitation_success=False, propagation_success=False, error_message=msg
)

try:
logger.debug(f"Running SNMP exploiter on host {host.ip}")
return self._snmp_exploiter.exploit_host(
host, servers, current_depth, snmp_options, interrupt
)
except Exception as err:
msg = f"An unexpected exception occurred while attempting to exploit host: {err}"
logger.exception(msg)
return ExploiterResultData(error_message=msg)
2 changes: 1 addition & 1 deletion monkey/infection_monkey/exploit/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
from .i_remote_access_client_factory import IRemoteAccessClientFactory
from .brute_force_credentials_provider import BruteForceCredentialsProvider
from .brute_force_exploiter import BruteForceExploiter
from .utils import all_exploitation_ports_are_closed
from .utils import all_exploitation_ports_are_closed, all_udp_ports_are_closed
13 changes: 13 additions & 0 deletions monkey/infection_monkey/exploit/tools/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
from typing import Sequence

from common.types import NetworkPort
from infection_monkey.i_puppet import TargetHost


def all_exploitation_ports_are_closed(host: TargetHost, exploitation_ports: Sequence[int]) -> bool:
closed_tcp_ports = host.ports_status.tcp_ports.closed
return all([p in closed_tcp_ports for p in exploitation_ports])


def all_udp_ports_are_closed(host: TargetHost, udp_ports: Sequence[NetworkPort]) -> bool:
"""
Check if all UDP ports in a given sequence are closed on the host

:param host: The host to check
:param udp_ports: The sequence of UDP ports to check
:return: True if all ports are closed, False otherwise
"""
closed_udp_ports = host.ports_status.udp_ports.closed
return all([p in closed_udp_ports for p in udp_ports])
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
from ipaddress import IPv4Address
from typing import List

import pytest

from common import OperatingSystem
from common.types import PortStatus
from infection_monkey.exploit.tools import all_exploitation_ports_are_closed
from common.types import NetworkPort, PortStatus
from infection_monkey.exploit.tools import (
all_exploitation_ports_are_closed,
all_udp_ports_are_closed,
)
from infection_monkey.i_puppet import PortScanData, PortScanDataDict, TargetHost, TargetHostPorts

TARGET_IP = IPv4Address("127.0.0.1")
PORTS_STATUS = TargetHostPorts(
tcp_ports=PortScanDataDict(
{
5000: PortScanData(port=5000, status=PortStatus.OPEN),
5001: PortScanData(port=5001, status=PortStatus.CLOSED),
6000: PortScanData(port=6000, status=PortStatus.OPEN),
6001: PortScanData(port=6001, status=PortStatus.CLOSED),
NetworkPort(5000): PortScanData(port=5000, status=PortStatus.OPEN),
NetworkPort(5001): PortScanData(port=5001, status=PortStatus.CLOSED),
NetworkPort(6000): PortScanData(port=6000, status=PortStatus.OPEN),
NetworkPort(6001): PortScanData(port=6001, status=PortStatus.CLOSED),
}
)
),
udp_ports=PortScanDataDict(
{
NetworkPort(7000): PortScanData(port=7000, status=PortStatus.OPEN),
NetworkPort(7001): PortScanData(port=7001, status=PortStatus.CLOSED),
NetworkPort(8000): PortScanData(port=8000, status=PortStatus.OPEN),
NetworkPort(8001): PortScanData(port=8001, status=PortStatus.CLOSED),
}
),
)


Expand All @@ -29,36 +41,51 @@ def target_host() -> TargetHost:
)


def test_all_exploitation_ports_open(target_host):
exploitation_ports = [5000, 6000]

assert not all_exploitation_ports_are_closed(target_host, exploitation_ports)


def test_some_exploitation_ports_open(target_host):
exploitation_ports = [5000, 5001, 6000, 6001]
@pytest.mark.parametrize(
"all_ports_are_closed, ports_to_check",
[(all_exploitation_ports_are_closed, [5000, 6000]), (all_udp_ports_are_closed, [7000, 8000])],
)
def test_all_exploitation_ports_open(target_host, all_ports_are_closed, ports_to_check):
assert not all_ports_are_closed(target_host, ports_to_check)

assert not all_exploitation_ports_are_closed(target_host, exploitation_ports)

@pytest.mark.parametrize(
"all_ports_are_closed, ports_to_check",
[
(all_exploitation_ports_are_closed, [5000, 5001, 6000, 6001]),
(all_udp_ports_are_closed, [7000, 7001, 8000, 8001]),
],
)
def test_some_exploitation_ports_open(target_host, all_ports_are_closed, ports_to_check):
assert not all_ports_are_closed(target_host, ports_to_check)

def test_all_exploitation_ports_closed(target_host):
exploitation_ports = [5001, 6001]

assert all_exploitation_ports_are_closed(target_host, exploitation_ports)
@pytest.mark.parametrize(
"all_ports_are_closed, ports_to_check",
[(all_exploitation_ports_are_closed, [5001, 6001]), (all_udp_ports_are_closed, [7001, 8001])],
)
def test_all_exploitation_ports_closed(target_host, all_ports_are_closed, ports_to_check):
assert all_ports_are_closed(target_host, ports_to_check)


def test_no_exploitation_ports_exist(target_host):
exploitation_ports = []
@pytest.mark.parametrize(
"all_ports_are_closed", [(all_exploitation_ports_are_closed), (all_udp_ports_are_closed)]
)
def test_no_exploitation_ports_exist(target_host, all_ports_are_closed):
exploitation_ports: List[NetworkPort] = []

assert all_exploitation_ports_are_closed(target_host, exploitation_ports)
assert all_ports_are_closed(target_host, exploitation_ports)


def test_host_has_no_ports_status():
@pytest.mark.parametrize(
"all_ports_are_closed, ports_to_check",
[(all_exploitation_ports_are_closed, [5000, 5001]), (all_udp_ports_are_closed, [7000, 7001])],
)
def test_host_has_no_ports_status(all_ports_are_closed, ports_to_check):
target_host = TargetHost(
ip=TARGET_IP,
operating_system=OperatingSystem.WINDOWS,
ports_status=PortScanDataDict({}),
)
exploitation_ports = [5000, 5001]

assert not all_exploitation_ports_are_closed(target_host, exploitation_ports)
assert not all_ports_are_closed(target_host, ports_to_check)