Add per-domain metrics
Now that keys have a domain, we can implement per-domain
metrics.

Instantiate the `DefaultExecutor` with a metrics collector to collect
the metrics.

Keys without a domain will use the `default` domain.
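
A minimal wiring sketch (not part of this commit): `my_serializer` and `my_collector` stand in for concrete `BaseSerializer` and `BaseMetricsCollector` implementations, and the import path is inferred from the file changed below.

# Sketch: enable per-domain metrics by passing a collector to the executor.
# my_serializer / my_collector are hypothetical placeholders.
from meta_memcache.executors.default import DefaultExecutor

executor = DefaultExecutor(
    serializer=my_serializer,
    metrics_collector=my_collector,
)
# Reads and writes are now counted under the "cache" metric namespace,
# labeled with each key's domain ("default" when the key has no domain).
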
bisho committed Mar 12, 2024
1 parent 2c39568 commit 3f1f240
Showing 1 changed file with 62 additions and 3 deletions.
65 changes: 62 additions & 3 deletions src/meta_memcache/executors/default.py
@@ -1,5 +1,6 @@
import logging
from typing import Callable, Dict, List, Optional, Tuple
from meta_memcache.metrics.base import BaseMetricsCollector, MetricDefinition

from meta_memcache_socket import (
RequestFlags,
@@ -34,18 +35,34 @@


class DefaultExecutor:

def __init__(
self,
serializer: BaseSerializer,
key_encoder_fn: Callable[[Key], bytes] = default_key_encoder,
raise_on_server_error: bool = True,
touch_ttl_to_consider_write_failure: Optional[int] = 50,
metrics_collector: Optional[BaseMetricsCollector] = None,
) -> None:
self._serializer = serializer
self._key_encoder_fn = key_encoder_fn
self._raise_on_server_error = raise_on_server_error
self._touch_ttl_to_consider_write_failure = touch_ttl_to_consider_write_failure
self.on_write_failure = WriteFailureEvent()
if metrics_collector:
labels = ("domain",)
metrics_collector.init_metrics(
namespace="cache",
metrics=[
MetricDefinition("read_hits", "Number of hits", labels),
MetricDefinition("read_misses", "Number of misses", labels),
MetricDefinition("read_bytes", "Size of writes", labels),
MetricDefinition("write_count", "Number of writes", labels),
MetricDefinition("write_bytes", "Size of writes", labels),
],
gauges=[],
)
self._metrics = metrics_collector

def _build_cmd(
self,
@@ -94,6 +111,36 @@ def _is_a_write_failure(
return True
return False

def _collect_metrics(
self,
command: MetaCommand,
key: Key,
value_size: Optional[int],
result: MemcacheResponse,
) -> None:
if not self._metrics:
return None

try:
labels = {"domain": key.domain or "default"}
if command == MetaCommand.META_GET:
if isinstance(result, Value):
self._metrics.metric_inc("hits", labels=labels)
if result.value:
self._metrics.metric_inc(
"read_bytes", value=len(result.value.value), labels=labels
)
elif isinstance(result, Miss):
self._metrics.metric_inc("misses", labels=labels)
if command == MetaCommand.META_SET:
self._metrics.metric_inc("write_count", labels=labels)
if value_size:
self._metrics.metric_inc(
"write_bytes", value=value_size, labels=labels
)
except Exception:
    _log.exception("Error collecting metrics")

def exec_on_pool(
self,
pool: ConnectionPool,
@@ -120,7 +167,11 @@ def exec_on_pool(
value=cmd_value,
flags=flags,
)
return self._conn_recv_response(conn, flags=flags)
result = self._conn_recv_response(conn, flags=flags)
self._collect_metrics(
command, key, len(cmd_value) if cmd_value else None, result
)
return result
except Exception as e:
error = True
raise MemcacheServerError(pool.server, "Memcache error") from e
@@ -155,13 +206,15 @@ def exec_multi_on_pool( # noqa: C901
conn = pool.pop_connection()
error = False
try:
# with pool.get_connection() as conn:
value_sizes = {}
for key, value in key_values:
cmd_value, flags = (
(None, flags)
if value is None
else self._prepare_serialized_value_and_flags(key, value, flags)
)
if cmd_value:
value_sizes[key] = len(cmd_value)

self._conn_send_cmd(
conn,
@@ -170,8 +223,14 @@
value=cmd_value,
flags=flags,
)
for key, _ in key_values:
for key, value in key_values:
results[key] = self._conn_recv_response(conn, flags=flags)
self._collect_metrics(
command,
key,
value_sizes.get(key) if value else None,
results[key],
)
except Exception as e:
error = True
raise MemcacheServerError(pool.server, "Memcache error") from e

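If you need to exercise the new metrics path without a real metrics backend, a throwaway in-memory collector is enough. The sketch below is not part of the commit; it assumes `BaseMetricsCollector` can be satisfied by implementing the two calls the executor makes above (`init_metrics` and `metric_inc`) — the actual base-class contract is not shown in this diff.

# Hypothetical in-memory collector matching the calls made by DefaultExecutor:
#   init_metrics(namespace=..., metrics=[...], gauges=[...])
#   metric_inc(name, value=..., labels=...)
from collections import defaultdict
from typing import Dict, List, Optional

from meta_memcache.metrics.base import BaseMetricsCollector, MetricDefinition


class InMemoryMetricsCollector(BaseMetricsCollector):
    def __init__(self) -> None:
        self._namespace = ""
        self.counters: Dict[str, float] = defaultdict(float)

    def init_metrics(
        self,
        namespace: str,
        metrics: List[MetricDefinition],
        gauges: List[MetricDefinition],
    ) -> None:
        # A real collector would register the counters/gauges here.
        self._namespace = namespace

    def metric_inc(
        self,
        metric: str,
        value: float = 1,
        labels: Optional[Dict[str, str]] = None,
    ) -> None:
        domain = (labels or {}).get("domain", "default")
        self.counters[f"{self._namespace}_{metric}{{domain={domain}}}"] += value


# Usage (paired with the executor wiring shown after the commit message):
#   collector = InMemoryMetricsCollector()
#   executor = DefaultExecutor(serializer=my_serializer, metrics_collector=collector)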