U
    >j>                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZ edZG dd deZG dd	 d	eeZG d
d dZdS )    N)contextmanager)Enum)PathZqueue_brokerc                   @   s   e Zd ZdZdS )Empty   Очередь пустаN)__name__
__module____qualname____doc__ r   r   *./modules/confluence/utils/queue_broker.pyr      s   r   c                   @   s$   e Zd ZdZdZdZdZdd ZdS )
ItemStatuspendingZin_progressZ	completedZfailedc                 C   s   | j S )N)valueselfr   r   r   __str__   s    zItemStatus.__str__N)r   r   r	   PENDINGIN_PROGRESS	COMPLETEDFAILEDr   r   r   r   r   r      s
   r   c                   @   s   e Zd ZdZd(ddZdd Zed)ed	d
dZdd Z	dd Z
d*ddZd+ddZd,ddZdd Zdd Zdd Zd-ddZd.d d!Zd"d# Zd$d% Zd&d' ZdS )/SQLiteQueueBrokerz/dev/shmqueueNc                 C   sz   || _ |d kr| j}t|}|jddd |d| j  d| _t | _| 	  t
| j td| d| j  d S )NT)parentsexist_okzSQLite.z.dbu   Очередь 'uK   ' успешно инициализирована. Путь к файлу: )nameDEFAULT_PATHr   mkdirjoinpath_db_path	threadinglocal_local_init_dbatexitregisterclose_local_connloggerinfo)r   r   Zdb_dirbase_dirr   r   r   __init__$   s    
zSQLiteQueueBroker.__init__c                 C   sV   t | jdsNtjt| jdd}tj|_|d |d |d || j_	| jj	S )uz   Возвращает или создает постоянное соединение для текущего потока.conn
   timeoutzPRAGMA busy_timeout = 5000;zPRAGMA synchronous = NORMAL;zPRAGMA cache_size = -65536;)
hasattrr"   sqlite3connectstrr   ZRowZrow_factoryexecuter+   r   r+   r   r   r   _get_thread_conn4   s    


z"SQLiteQueueBroker._get_thread_connDEFERREDmodec                 c   sj   |   }z"|d|  |V  |  W n: tk
rd   z|  W n tjk
r\   Y nX  Y nX dS )u   
        Контекстный менеджер транзакций
        mode="IMMEDIATE" для записи
        mode="DEFERRED" для чтения
        zBEGIN N)r5   r3   commit	ExceptionZrollbackr0   OperationalError)r   r8   r+   r   r   r   _transactionF   s    zSQLiteQueueBroker._transactionc                 C   sX   t jt| jdd}z4|d |d |d |d |  W 5 |  X dS )u0   Инициализация базы данных   r-   zPRAGMA journal_mode = WAL;a  
                CREATE TABLE IF NOT EXISTS queue (
                    id TEXT PRIMARY KEY,
                    payload TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'pending',
                    result TEXT,
                    created_at REAL NOT NULL,
                    started_at REAL,
                    finished_at REAL,
                    pid INTEGER
                )
            zBCREATE INDEX IF NOT EXISTS idx_status ON queue(status, created_at)z8CREATE INDEX IF NOT EXISTS idx_pid ON queue(pid, status)N)r0   r1   r2   r   closer3   r9   r4   r   r   r   r#   Y   s    



zSQLiteQueueBroker._init_dbc              	   C   sV   t t }t }| jdd,}|dtj d|tj	|dd|f W 5 Q R X |S )u7   Добавление элемента в очередь	IMMEDIATEr7   zd
                INSERT INTO queue (id, payload, status, created_at)
                VALUES (?, ?, 'z', ?)
            FZensure_ascii)
r2   uuidZuuid4timer<   r3   r   r   jsondumps)r   payloaditem_idnowr+   r   r   r   puts   s    zSQLiteQueueBroker.putTc              
   C   sH  t   }t }t   }d}z| jdd}|dtj d}| }	|	rd}|	d }
|dtj d	tj d
|||
f}|j	dkr|	d t
|	d dW  5 Q R  W S W 5 Q R X W n< tjk
r } zdt|krڂ td W 5 d}~X Y nX |std|dk	r,t   | }||kr,td|st tdd qdS )u8   Получение следующего элементаFr?   r7   z_
                        SELECT id, payload FROM queue
                        WHERE status = 'zf'
                        ORDER BY created_at ASC
                        LIMIT 1
                    TidzT
                            UPDATE queue
                            SET status = 'z',
                                started_at = ?,
                                pid = ?
                            WHERE id = ? AND status = 'z'
                        r   rE   )rI   rE   zdatabase is lockeduO   Очередь заблокирована, повторная попытка...Nr   u7   Таймаут ожидания задачи истекg?g333333?)rB   osgetpidr<   r3   r   r   fetchoner   rowcountrC   loadsr0   r;   r2   r'   debugr   sleeprandomZuniform)r   blockr.   Z
start_timeZcurrent_pidrG   Z
item_foundr+   cursorrowrF   updatedeelapsedr   r   r   get   sH    

"

zSQLiteQueueBroker.getc              	   C   sb   t   }|dk	rtj|ddnd}| jdd*}|dtj dtj d|||f W 5 Q R X dS )	uP   Отметка элемента как успешно обработанногоNFr@   r?   r7   <
                UPDATE queue
                SET status = 'r',
                    finished_at = ?,
                    result = ?
                WHERE id = ? AND status = ''
            )rB   rC   rD   r<   r3   r   r   r   )r   rF   resultrG   result_jsonr+   r   r   r   complete   s    zSQLiteQueueBroker.completec              	   C   s   t   }t|tr(t|jt|d}n2t|tr8|}n"|dk	rPdt|d}n
ddd}tj|dd}| j	dd	*}|
d
tj dtj d|||f W 5 Q R X dS )u;   Отметка элемента как неудачного)errormessageNRuntimeErrorZUnknownErrorzNo error message providedFr@   r?   r7   rY   rZ   r[   )rB   
isinstancer:   typer   r2   dictrC   rD   r<   r3   r   r   r   )r   rF   r_   rG   Z
error_datar]   r+   r   r   r   fail   s"    


zSQLiteQueueBroker.failc              	   C   s>   | j dd&}|dtj dtj d|f}W 5 Q R X |jS )u[   Восстановление зависших элементов конкретного PIDr?   r7   rY   zu',
                    started_at = NULL,
                    pid = NULL
                WHERE pid = ? AND status = 'r[   )r<   r3   r   r   r   rM   )r   pidr+   rS   r   r   r   recover   s    zSQLiteQueueBroker.recoverc              
   C   sD   | j dd.}|dtj d}| d W  5 Q R  S Q R X dS )u[   Текущее количество ожидающих элементов в очередиr6   r7   zL
                SELECT COUNT(*) FROM queue
                WHERE status = 'r[   r   N)r<   r3   r   r   rL   r   r+   rS   r   r   r   qsize   s
    
zSQLiteQueueBroker.qsizec              
   C   sJ   | j dd4}|dtj d}t| d  W  5 Q R  S Q R X dS )u2   Проверить, пуста ли очередьr6   r7   zm
                SELECT EXISTS (
                    SELECT 1 FROM queue
                    WHERE status = 'z '
                )
            r   N)r<   r3   r   r   boolrL   rh   r   r   r   empty   s
    
zSQLiteQueueBroker.empty      ?c              	   C   sf   | j ddD}|dtj dtj d}t| d }|sLW 5 Q R  dS W 5 Q R X t| q dS )	u   
        Блокирует выполнение, пока в очереди не останется активных элементов.
        Активные элементы = статус 'pending' ИЛИ 'in_progress'.
        r6   r7   z{
                    SELECT EXISTS (
                        SELECT 1 FROM queue
                        WHERE status IN ('', 'z)')
                    )
                r   TN)	r<   r3   r   r   r   rj   rL   rB   rP   )r   Zcheck_intervalr+   rS   Zhas_active_itemsr   r   r   wait   s    
zSQLiteQueueBroker.waitc              	      sD  |dk	rdnd}|dk	r|fnd}| j dd}d| d}|||}d	d
 | D  |dk	rnd}|f}nd}d}|||}	|	 }
|
d dk	r|
d nd}|
d }|
d }|
d }d}d}|dkr|dk	r|dk	r|| }|dkr|| }nt|}W 5 Q R X  fdd
tD }t| |d< ||d< ||d< ||d< |S )u   Полная статистика по статусам, времени и скорости обработки элементовNzWHERE pid = ? r   r6   r7   zu
                SELECT
                    status,
                    COUNT(*) as count
                FROM queue z-
                GROUP BY status
            c                 S   s   i | ]}|d  |d qS )statuscountr   .0rT   r   r   r   
<dictcomp>  s      z/SQLiteQueueBroker.get_stats.<locals>.<dictcomp>ao  
                    SELECT
                        COUNT(*) as completed_count,
                        AVG(finished_at - started_at) as avg_time,
                        MIN(started_at) as min_started,
                        MAX(finished_at) as max_finished
                    FROM queue
                    WHERE status = 'completed' AND pid = ?
                ac  
                    SELECT
                        COUNT(*) as completed_count,
                        AVG(finished_at - started_at) as avg_time,
                        MIN(started_at) as min_started,
                        MAX(finished_at) as max_finished
                    FROM queue
                    WHERE status = 'completed'
                Zavg_time        completed_countmin_startedmax_finishedr   c                    s   i | ]}|j  |j d qS )r   )r   rX   )rs   rp   Zstatsr   r   rt   F  s      totalZavg_execution_time_secZtotal_execution_time_secitems_per_second)r<   r3   fetchallrL   floatr   sumvalues)r   rf   Zpid_conditionZ	pid_paramr+   Zquery_statusrS   Zquery_metricsZmetrics_paramZcursor_metricsZmetricsZavg_execution_timerv   rw   rx   Ztotal_execution_timer{   Zreportr   ry   r   	get_stats  s@    		
zSQLiteQueueBroker.get_statsc              
   C   sR   | j dd<}|dtj dtj d}dd | D W  5 Q R  S Q R X dS )	ui   Выгрузить результаты всех успешно выполненных элементовr6   r7   z
                SELECT id, status, payload, result, finished_at - started_at as duration
                FROM queue
                WHERE status IN ('rm   z')
            c              
   S   s\   i | ]T}|d  |d |d dk	r(|d ndt |d |d dk	rPt |d nddqS )rI   rp   ZdurationNru   rE   r\   )rp   Zduration_secrE   r\   )rC   rN   rr   r   r   r   rt   W  s   z5SQLiteQueueBroker.get_all_results.<locals>.<dictcomp>N)r<   r3   r   r   r   r|   rh   r   r   r   get_all_resultsO  s    
z!SQLiteQueueBroker.get_all_resultsc                 C   sN   t | jdrJ| jjdk	rJz| jj  W n tk
r<   Y nX t| jd dS )um   Закрывает соединение текущего потока и удаляет его из кэша.r+   N)r/   r"   r+   r>   r:   delattrr   r   r   r   r&   a  s    z"SQLiteQueueBroker.close_local_connc                 C   s:  t | jdr~| jjdk	r~z| jjd | jj  W n: tk
rp   z| jj  W n tk
rj   Y nX Y nX t| jd | j| j| jj	 d| j| jj	 dg}|D ]}t
dD ]t}| s qz"|  td|  W  qW q ttfk
r0   |dkr"td	|  n
td
 Y qX qqdS )ua   Полное уничтожение очереди и удаление файлов с дискаr+   Nz PRAGMA wal_checkpoint(TRUNCATE);z-walz-shm   u5   Файл очереди успешно удален:    uY   Не удалось удалить файл очереди после 15 попыток: g?)r/   r"   r+   r3   r>   r:   r   r   	with_namer   rangeexistsunlinkr'   rO   PermissionErrorOSErrorr_   rB   rP   )r   Zfiles_to_removeZ	file_pathZattemptr   r   r   cleanupj  s4    

zSQLiteQueueBroker.cleanup)r   N)r6   )TN)N)N)rl   )N)r   r   r	   r   r*   r5   r   r2   r<   r#   rH   rX   r^   re   rg   ri   rk   rn   r   r   r&   r   r   r   r   r   r   !   s$   

6

	

A	r   )r$   rC   ZloggingrJ   rQ   r0   r    rB   rA   
contextlibr   enumr   pathlibr   Z	getLoggerr'   r:   r   r2   r   r   r   r   r   r   <module>   s   

