Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3411 memory utilization component #3566

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 117 additions & 4 deletions monkey/agent_plugins/payloads/cryptojacker/src/memory_utilizer.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,135 @@
import logging
import time
from typing import Optional

import psutil

from common.agent_events import RAMConsumptionEvent
from common.event_queue import IAgentEventPublisher
from common.types import AgentID, PercentLimited
from common.utils.code_utils import PeriodicCaller

MEMORY_CONSUMPTION_CHECK_INTERVAL = 30
# If target memory consumption is within 2% of actual consumption, we'll consider it close enough.
MEMORY_CONSUMPTION_NOP_THRESHOLD = 0.02
# We don't want to ever use more then 90% of available memory, otherwise we risk impacting the
# victim machines performance
MEMORY_CONSUMPTION_SAFETY_LIMIT = 0.9

logger = logging.getLogger(__name__)


class MemoryUtilizer:
def __init__(
self,
memory_utilization: PercentLimited,
target_utilization: PercentLimited,
agent_id: AgentID,
agent_event_publisher: IAgentEventPublisher,
):
self._memory_utilization = memory_utilization
self._agent_id = agent_id
self._agent_event_publisher = agent_event_publisher
self._target_utilization = target_utilization
self._consumed_bytes = b""

self._periodic_caller = PeriodicCaller(
self.adjust_memory_utilization,
MEMORY_CONSUMPTION_CHECK_INTERVAL,
name="Cryptojacker.MemoryUtilizer",
)

@property
def consumed_bytes_size(self) -> int:
try:
return len(self._consumed_bytes)
except AttributeError:
# self._consumed_bytes was deleted and is currently being reinitialized return 0 while
# we wait.
return 0

def start(self):
pass
logger.debug("Starting MemoryUtilizer")
self._periodic_caller.start()

def adjust_memory_utilization(self):
try:
memory_to_consume = self._calculate_memory_to_consume()
self.consume_bytes(len(self._consumed_bytes) + memory_to_consume)
except RuntimeError as err:
logger.error("Failed to adjust memory utilization: %s", err)

def _calculate_memory_to_consume(self) -> int:
total_virtual_memory = psutil.virtual_memory().total
available_virtual_memory = psutil.virtual_memory().available
used_virtual_memory = psutil.Process().memory_info().vms

if used_virtual_memory > total_virtual_memory:
raise RuntimeError("Impossible system state: Used memory is greater than total memory")

ideal_memory_to_consume = int(
total_virtual_memory * self._target_utilization.as_decimal_fraction()
- used_virtual_memory
)
maximum_memory_to_consume = int(
(available_virtual_memory + used_virtual_memory) * MEMORY_CONSUMPTION_SAFETY_LIMIT
- used_virtual_memory
)

# We never want to consume 100% of available memory, otherwise the OS could kill this
# process or one of the user's mission-critical processes. This logic limits the amount of
# memory we consume to 90% of available memory.
return min(ideal_memory_to_consume, maximum_memory_to_consume)

def consume_bytes(self, bytes_: int):
logger.debug(
f"Currently consumed: {self.consumed_bytes_size} bytes - Target: {bytes_} bytes"
)

if not self._should_change_byte_consumption(bytes_):
logger.debug("Not adjusting memory consumption, as the difference is too small")
return

timestamp = time.time()
if bytes_ <= 0:
self._consumed_bytes = bytearray(0)
else:
# If len(self._consumed_bytes) > 50% of available RAM, we must delete it before
# reassigning it to a new bytearray. Otherwise, the new bytearray may be allocated to
# more than 50% of total RAM before the original byte array is garbage collected.
# This will cause this process to consume all available ram until the OS to kills this
# process or an out-of-memory error occurs.
del self._consumed_bytes
self._consumed_bytes = bytearray(bytes_)

self._publish_ram_consumption_event(timestamp)

def _should_change_byte_consumption(self, target_consumption_bytes_: int) -> bool:
if target_consumption_bytes_ <= 0:
if self.consumed_bytes_size == 0:
return False

return True

percent_difference = (
abs(self.consumed_bytes_size - target_consumption_bytes_) / target_consumption_bytes_
)
if percent_difference <= MEMORY_CONSUMPTION_NOP_THRESHOLD:
return False

return True

def _publish_ram_consumption_event(self, timestamp: float):
total_virtual_memory = psutil.virtual_memory().total
used_virtual_memory = psutil.Process().memory_info().vms

self._agent_event_publisher.publish(
RAMConsumptionEvent(
source=self._agent_id,
timestamp=timestamp,
bytes=used_virtual_memory,
utilization=(used_virtual_memory / total_virtual_memory) * 100,
)
)

def stop(self, timeout: Optional[float] = None):
pass
logger.debug("Stopping MemoryUtilizer")
self._periodic_caller.stop(timeout=timeout)
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
from typing import Callable
from unittest.mock import MagicMock

import pytest
from agent_plugins.payloads.cryptojacker.src.memory_utilizer import (
MEMORY_CONSUMPTION_SAFETY_LIMIT,
MemoryUtilizer,
)

from common.event_queue import IAgentEventPublisher
from common.types import AgentID, PercentLimited

AGENT_ID = AgentID("9614480d-471b-4568-86b5-cb922a34ed8a")
TARGET_UTILIZATION = PercentLimited(50)
TARGET_UTILIZATION_BYTES = 4 * 1024 * 1024 # 4 MB


class MockMemoryInfo:
def __init__(self, vms: int):
self.vms = vms


class MockProcess:
def __init__(self, memory_info: MockMemoryInfo):
self._memory_info = memory_info

def memory_info(self):
return self._memory_info


class MockSystemVirtualMemory:
def __init__(self, total: int, available: int):
self.total = total
self.available = available


@pytest.fixture
def mock_agent_event_publisher() -> IAgentEventPublisher:
return MagicMock(spec=IAgentEventPublisher)


@pytest.fixture
def memory_utilizer(mock_agent_event_publisher) -> MemoryUtilizer:
return MemoryUtilizer(TARGET_UTILIZATION, AGENT_ID, mock_agent_event_publisher)


def set_system_virtual_memory(
monkeypatch, total_virtual_memory: int, available_virtual_memory: int
):
monkeypatch.setattr(
"agent_plugins.payloads.cryptojacker.src.memory_utilizer.psutil.virtual_memory",
lambda: MockSystemVirtualMemory(total_virtual_memory, available_virtual_memory),
)


def set_consumed_virtual_memory(monkeypatch, vms: int):
monkeypatch.setattr(
"agent_plugins.payloads.cryptojacker.src.memory_utilizer.psutil.Process",
lambda: MockProcess(MockMemoryInfo(vms)),
)


@pytest.mark.parametrize(
"parent_process_consumed_virtual_memory",
(
0,
int(TARGET_UTILIZATION_BYTES / 1),
int(TARGET_UTILIZATION_BYTES / 2),
int(TARGET_UTILIZATION_BYTES / 4),
),
)
def test_adjust_memory_utilization__raise_to_target(
monkeypatch: Callable,
memory_utilizer: MemoryUtilizer,
parent_process_consumed_virtual_memory: int,
):
set_consumed_virtual_memory(monkeypatch, parent_process_consumed_virtual_memory)

total_virtual_memory = int(TARGET_UTILIZATION_BYTES / TARGET_UTILIZATION.as_decimal_fraction())
set_system_virtual_memory(monkeypatch, total_virtual_memory, total_virtual_memory)

memory_utilizer.adjust_memory_utilization()
assert memory_utilizer.consumed_bytes_size == (
TARGET_UTILIZATION_BYTES - parent_process_consumed_virtual_memory
)


@pytest.mark.parametrize(
"parent_process_consumed_virtual_memory",
(
0,
int(TARGET_UTILIZATION_BYTES / 2),
int(TARGET_UTILIZATION_BYTES / 4),
),
)
def test_adjust_memory_utilization__drop_to_target(
monkeypatch: Callable,
memory_utilizer: MemoryUtilizer,
mock_agent_event_publisher: IAgentEventPublisher,
parent_process_consumed_virtual_memory: int,
):
consumed_bytes = int(TARGET_UTILIZATION_BYTES * 1.5)
set_consumed_virtual_memory(
monkeypatch, parent_process_consumed_virtual_memory + consumed_bytes
)

total_virtual_memory = int(TARGET_UTILIZATION_BYTES / TARGET_UTILIZATION.as_decimal_fraction())
set_system_virtual_memory(monkeypatch, total_virtual_memory, total_virtual_memory)

# Instruct the memory utilizer to use more than the target so we can test its ability to reduce
# its memory consumption
memory_utilizer.consume_bytes(consumed_bytes)

# Adjust memory utilization so that the memory utilizer is consuming the appropriate amount of
# memory
memory_utilizer.adjust_memory_utilization()

assert memory_utilizer.consumed_bytes_size == (
TARGET_UTILIZATION_BYTES - parent_process_consumed_virtual_memory
)
assert mock_agent_event_publisher.publish.called


def test_adjust_memory_utilization__parent_process_over_limit(
monkeypatch: Callable,
memory_utilizer: MemoryUtilizer,
):
parent_process_consumed_virtual_memory = int(TARGET_UTILIZATION_BYTES * 1.25)
set_consumed_virtual_memory(monkeypatch, parent_process_consumed_virtual_memory)

total_virtual_memory = int(TARGET_UTILIZATION_BYTES / TARGET_UTILIZATION.as_decimal_fraction())
set_system_virtual_memory(monkeypatch, total_virtual_memory, total_virtual_memory)

# Adjust memory utilization so that the memory utilizer is consuming some memory
memory_utilizer.adjust_memory_utilization()

assert memory_utilizer.consumed_bytes_size == 0


def test_adjust_memory_utilization__used_gt_total(
monkeypatch: Callable,
memory_utilizer: MemoryUtilizer,
mock_agent_event_publisher: IAgentEventPublisher,
):
total_virtual_memory = 1024
set_system_virtual_memory(monkeypatch, total_virtual_memory, total_virtual_memory)
set_consumed_virtual_memory(monkeypatch, total_virtual_memory * 2)

memory_utilizer.adjust_memory_utilization()

assert memory_utilizer.consumed_bytes_size == 0
assert not mock_agent_event_publisher.publish.called


@pytest.mark.parametrize(
"parent_process_consumed_virtual_memory,expected_consumed_bytes_size",
(
(0, 1509949),
(int(TARGET_UTILIZATION_BYTES / 2), 1300234),
(int(TARGET_UTILIZATION_BYTES / 4), 1405091),
),
)
def test_adjust_memory_utilization__limits(
monkeypatch: Callable,
memory_utilizer: MemoryUtilizer,
mock_agent_event_publisher: IAgentEventPublisher,
parent_process_consumed_virtual_memory: int,
expected_consumed_bytes_size: int,
):
set_consumed_virtual_memory(monkeypatch, parent_process_consumed_virtual_memory)

total_virtual_memory = int(TARGET_UTILIZATION_BYTES / TARGET_UTILIZATION.as_decimal_fraction())
available_virtual_memory = total_virtual_memory * 0.2
set_system_virtual_memory(monkeypatch, total_virtual_memory, available_virtual_memory)

memory_utilizer.adjust_memory_utilization()

assert memory_utilizer.consumed_bytes_size == expected_consumed_bytes_size
assert mock_agent_event_publisher.publish.called, MEMORY_CONSUMPTION_SAFETY_LIMIT


def test_consume_bytes__publishes_event(
memory_utilizer: MemoryUtilizer,
mock_agent_event_publisher: IAgentEventPublisher,
):
memory_utilizer.consume_bytes(1)
assert mock_agent_event_publisher.publish.call_count == 1


@pytest.mark.parametrize(
"bytes_to_consume_1,bytes_to_consume_2,expected_publish_count",
((0, 0, 0), (1024, 1025, 1), (1024, 1023, 1)),
)
def test_consume_bytes__no_change_publishes_no_event(
memory_utilizer: MemoryUtilizer,
mock_agent_event_publisher: IAgentEventPublisher,
bytes_to_consume_1: int,
bytes_to_consume_2: int,
expected_publish_count: int,
):
memory_utilizer.consume_bytes(bytes_to_consume_1)
memory_utilizer.consume_bytes(bytes_to_consume_2)

assert mock_agent_event_publisher.publish.call_count == expected_publish_count


@pytest.mark.parametrize(
"bytes_to_consume_1,bytes_to_consume_2,expected_publish_count",
((0, 1, 1), (1, 0, 2), (0, 1025, 1), (1024, 2048, 2)),
)
def test_consume_bytes__change_publishes_event(
memory_utilizer: MemoryUtilizer,
mock_agent_event_publisher: IAgentEventPublisher,
bytes_to_consume_1: int,
bytes_to_consume_2: int,
expected_publish_count: int,
):
memory_utilizer.consume_bytes(bytes_to_consume_1)
memory_utilizer.consume_bytes(bytes_to_consume_2)

assert mock_agent_event_publisher.publish.call_count == expected_publish_count