#!/usr/bin/env python3
import argparse
import json
import logging
import time
import redis
import string
from random import choices

logger = logging.getLogger(__name__)

MB = 1024 * 1024
KEY_PREFIX = "eva_bandwidth_test_"


class TestRunner:
    def __init__(self, redis_params, huge_mb=200):
        self.redis_params = redis_params
        self.huge_mb = huge_mb
        self.metrics = []
        self.results = []

        self.redis = redis.Redis(**redis_params, decode_responses=False)

    def _check_eviction(self, warned):
        try:
            info = self.redis.info("stats")
            if info.get("evicted_keys", 0) > 0 and not warned:
                mem = self.redis.info("memory")
                logger.debug("Redis started evicting keys (used_memory=%d)",
                               mem.get("used_memory", 0))
                return True
        except Exception:
            pass
        return warned

    def _count_own_keys(self, prefix):
        cursor = 0
        count = 0
        while True:
            cursor, keys = self.redis.scan(cursor=cursor, match=f"{prefix}*", count=1000)
            count += len(keys)
            if cursor == 0:
                break

        return count

    def _delete_own_keys(self, prefix):
        cursor = 0
        pipe = self.redis.pipeline()
        count = 0
        while True:
            cursor, keys = self.redis.scan(cursor=cursor, match=f"{prefix}*", count=1000)
            if keys:
                pipe.delete(*keys)
                count += len(keys)
            if cursor == 0:
                break

        pipe.execute()

    def _test_small(self, reqs=1000):
        # 1 digit = 1 ASCII byte
        payload = ''.join(choices(string.digits, k=128))
        prefix = f"{KEY_PREFIX}small:"

        logger.debug('test small payload (128b): %s', payload)

        warned = False
        written = 0

        start = time.time()

        for i in range(reqs):
            key = f"{prefix}{i}"
            prev_key = f"{prefix}{i-1}"

            if i % 2 == 0:
                self.redis.set(key, payload)
                written += 1
            else:
                self.redis.get(prev_key)

            if i % 1000 == 0 and written > 0:
                existing = self._count_own_keys(prefix)
                if existing < written and not warned:
                    logger.debug("Redis удалил тестовые ключи (запросы \"small\", %d ключей)",
                                   written - existing)
                    warned = True

        duration = time.time() - start
        ops = reqs / duration

        self._delete_own_keys(prefix)

        return ops

    def _test_write(self, payload_mb, max_total_mb, label):
        # 1 digit = 1 ASCII byte
        payload = ''.join(choices(string.digits, k=(payload_mb * MB)))
        logger.debug('test %s payload generated', label)
        prefix = f"{KEY_PREFIX}{label}:"

        max_reqs = max_total_mb // payload_mb

        warned = False
        written = 0

        start = time.time()

        for i in range(max_reqs):
            self.redis.set(f"{prefix}{i}", payload)
            written += 1

            if i % 10 == 0:
                existing = self._count_own_keys(prefix)
                if existing < written and not warned:
                    logger.debug("Redis удалил ключи (тест \"%s\", %d ключей)",
                                 label, written - existing)
                    warned = True

        duration = time.time() - start
        throughput = (written * payload_mb) / duration

        self._delete_own_keys(prefix)

        return throughput, written

    def run_test(self):
        mem = self.redis.info("memory")
        maxmemory_mb = mem.get("maxmemory", 0) // MB or 1024

        logger.info("maxmemory: %d MB", maxmemory_mb)

        # 28.01.2026
        # Тест в облаке между нодами в одном ДЦ, пинг 200-400мс
        # Результат в районе 4000-5000 ops/sec.
        # В разных ДЦ, с пингом 8-9с, результат падает до 90-110 ops/sec.
        test_small_reqs = 1000
        logger.info("start test small")
        small = self._test_small(reqs=test_small_reqs)
        logger.info("end test small")

        ref = test_small_reqs
        fmt = "{x:.2f} ops/sec"
        self.metrics.append({
            "title": f"Redis небольшие запросы, {test_small_reqs} req, 128b (не менее {fmt.format(x=ref)})",
            "value": small,
            "reference": ref,
            "format": fmt,
            "condition": "greater",
        })

        logger.info("start test big")
        big = self._test_write(max_total_mb=min(self.huge_mb, maxmemory_mb, 1024),
                               payload_mb=10, label="big",)
        logger.info("end test big")

        throughput, reqs = big

        ref = 70
        fmt = "{x:.2f} MB/sec"
        self.metrics.append({
            "title": (f"Redis скорость записи больших данных, {reqs} req, 10mb payload"
                      f" (не менее {fmt.format(x=ref)})"),
            "value": throughput,
            "reference": ref,
            "format": fmt,
            "condition": "greater",
        })

        # logger.info("start test huge")
        # huge = self._test_write(payload_mb=self.huge_mb,
        #                         max_total_mb=self.huge_mb, label="huge")
        # logger.info("end test huge")

        # throughput, _ = huge

        # ref = 70
        # fmt = "{x:.2f} MB/sec"
        # self.metrics.append({
        #     "title": (f"Redis скорость записи огромных данных, 1 req,"
        #               f" {self.huge_mb}mb payload (не менее {fmt.format(x=ref)})"),
        #     "value": throughput,
        #     "reference": ref,
        #     "format": fmt,
        #     "condition": "greater",
        # })


def configure_logging(debug=False):
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s - [%(levelname)s] - %(module)s.%(funcName)s - %(message)s",
    )


def parse_args():
    p = argparse.ArgumentParser("Redis benchmark")
    p.add_argument("--json", action="store_true", default=False,
                   help='Вывод результатов в json')
    p.add_argument("--debug", action="store_true", default=False,
                   help='Вывод дебага (при --json всегда отключено)')
    p.add_argument("--size", type=int, default=80,
                   help="Общий размер данных в мегабайтах для теста, по-умолчанию 80Мб")
    p.add_argument("--db", type=int, default=7, help="Номер БД, по-умолчанию 7")
    p.add_argument("--host", type=str, help="Хост (ip, домен), имеет приоритет над сокетом")
    p.add_argument("--port", type=int, default=6379, help='Порт, по-умолчанию 6379')
    p.add_argument("--unix-socket-path", type=str, default="/var/run/redis/redis-server.sock",
                   help="Путь к сокету при использовании такого типа подключения")
    p.add_argument("--user", type=str, default=None,
                   help='Пользователь для подключения к Redis, необязательный параметр')
    p.add_argument("--password", type=str, default=None,
                   help='Пароль для подключения к Redis, необязательный параметр')
    p.add_argument("--ssl", action="store_true", default=False,
                   help='Использовать SSL/TLS для подключения')
    p.add_argument("--ssl-ca-certs", type=str, default="/opt/eva-app/custom/redis-ca.crt",
                   help="Путь к сертификату CA")
    p.add_argument("--ssl-certfile", type=str, default="/opt/eva-app/custom/redis.crt",
                   help="Путь к сертификату клиента")
    p.add_argument("--ssl-keyfile", type=str, default="/opt/eva-app/custom/redis.key",
                   help="Путь к ключи сертификата клиента")
    p.add_argument("--type", type=str, default="redis",
                   help=argparse.SUPPRESS)
                   # help='Тип кеша Евы, пока поддерживается только Redis'
    return p.parse_args()


def print_item(item):
    caption = item["title"]
    val = item["value"]
    if val is None:
        print(f"{caption}: Skipped")
        return

    fmt = item["format"]
    rendered = fmt(val) if callable(fmt) else fmt.format(x=val)
    logger.info("%s: %s", caption, rendered)


if __name__ == "__main__":
    args = parse_args()
    size_min = 50
    if args.size < size_min:
        raise ValueError(f'Укажите --size не менее {size_min}')

    if not args.json:
        configure_logging(debug=args.debug)

    if args.host:
        redis_params = {"host": args.host, "port": args.port, "username": args.user,
                        "password": args.password, "db": args.db}
        if args.ssl:
            redis_params['ssl_ca_certs'] = args.ssl_ca_certs
            redis_params['ssl_certfile'] = args.ssl_certfile
            redis_params['ssl_keyfile'] = args.ssl_keyfile
    else:
        redis_params = {"unix_socket_path": args.unix_socket_path, "username": args.user,
                        "password": args.password, "db": args.db}

    runner = TestRunner(redis_params=redis_params, huge_mb=args.size)
    runner.run_test()

    if args.json:
        print(json.dumps(runner.metrics, ensure_ascii=False))
    else:
        for metric in runner.metrics:
            print_item(metric)
