#!/usr/bin/python3 -u
# -u unbuffered stdout
import asyncio.queues
import datetime
import sys
import time

import aioconsole  # for std stream reader/writer

try:
    import ujson as json
except ImportError:
    import json


class Piper:
    # @dataclasses.dataclass
    # class Record:
    #     timestamp: str  # 2026-03-03 HH:MM:SS.ms
    #     level: str  # DEBUG, INFO, WARNING, ERROR
    #     message: str
    REQUIRED_KEYS = {'timestamp', 'level', 'message'}

    def __init__(self, buffer_size=10_000, read_chunk_max_size = 500_000):
        self.queue = asyncio.queues.Queue(maxsize=buffer_size)
        self.read_chunk_max_size = read_chunk_max_size
        self.eof = False
        self.overflow_count = 0
        self.overflow_timestamp = 0

    # connect_{read,write}_pipe - не со всеми файлами работает.
    # Используем aioconsole.get_standard_streams(), в нем есть fallback
    #
    # @staticmethod
    # async def _connect_stdin_stdout():
    #     loop = asyncio.get_event_loop()
    #     reader = asyncio.StreamReader()
    #     r_protocol = asyncio.StreamReaderProtocol(reader)
    #
    #     await loop.connect_read_pipe(lambda: r_protocol, sys.stdin)
    #     w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    #
    #     writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    #     return reader, writer

    @classmethod
    def split_chunk(cls, chunk: str):
        # Режем входные данные на структурированные сообщения. Пытаемся определить параметры.
        # TODO: datetime custom format?
        #   2026-03-03T17:18:19Z     - s7
        #   2026-03-03 17:18:19.123  - rshb
        # TODO: log format: text, json, ...
        #
        # 2026-03-05 17:08:05,487 890761 socketio.server INFO
        # [pid: 890574|app: -1|req: -1/233207] 10.32.0.20 () {66 vars in 2027 bytes}
        default_timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]  # strip to ms
        for line in chunk.split('\n'):
            if not line:
                continue
            # TODO: parse common messages: python, webserver, g.debug, ...
            # TODO: agg multiline messages
            record = None

            # JSON passthrough
            if line.startswith('{'):
                try:
                    record = json.loads(line)
                except ValueError:
                    pass

            if record and isinstance(record, dict) and set(record).issuperset(cls.REQUIRED_KEYS):
                yield record
                continue

            record = dict(timestamp=default_timestamp, level='INFO', message=line)
            # print(record, file=sys.stderr)
            yield record

    async def reader_task(self, reader):
        # Считаем, что входные данные поступают дискретно, т.е. не может прийти половина записи,
        #  но может прийти(скопиться) несколько записей, размер чанка делаем большой,
        #  чтобы избежать его переполнения, и возможного разрезания записи.
        while chunk := await reader.read(self.read_chunk_max_size):
            try:
                chunk_str = chunk.decode()
            except ValueError as e:
                print(f'Piper() encoding error: {e}, data={chunk}', file=sys.stderr)
                continue
            # print(f'Piper() read: {json.encode(chunk_str)}', file=sys.stderr)

            for record in self.split_chunk(chunk_str):
                if self.queue.full():
                    self.overflow_count += 1
                    continue
                self.queue.put_nowait(record)

            # Control buffer overflow
            if self.overflow_count:
                now = time.time()
                if now > self.overflow_timestamp + 60:
                    now_tuple = time.gmtime(now)
                    print(
                        f'{time.strftime("%Y-%m-%d %TZ", now_tuple)} Piper() buffer overflow:'
                        f' {self.overflow_count} - messages lost', file=sys.stderr)
                    self.overflow_count = 0
                    self.overflow_timestamp = now
        self.eof = True

    async def writer_task(self, writer):
        def write_records():
            record_str = json.dumps(record)
            writer.write(record_str.encode())
            writer.write(b'\n')

        while not (self.eof and self.queue.empty()):
            record = await self.queue.get()
            write_records()
            while not self.queue.empty():
                record = self.queue.get_nowait()
                write_records()
            await writer.drain()

    async def run(self):
        # await self._connect_stdin_stdout() - ValueError: Pipe transport is for pipes/sockets only.
        reader, writer = await aioconsole.get_standard_streams()

        await asyncio.gather(self.reader_task(reader), self.writer_task(writer))


async def main():
    """Entry point: pipe the standard streams through a default Piper."""
    piper = Piper()
    await piper.run()


if __name__ == '__main__':
    # Exit quietly on Ctrl-C / sys.exit() instead of printing a traceback.
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, SystemExit):
        pass
