
    Qz>j>                         d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZ  ej        d          Z G d de          Z G d d	ee          Z G d
 d          ZdS )    N)contextmanager)Enum)Pathqueue_brokerc                       e Zd ZdZdS )Empty   Очередь пустаN)__name__
__module____qualname____doc__     *./modules/confluence/utils/queue_broker.pyr   r      s        ##Dr   r   c                   $    e Zd ZdZdZdZdZd ZdS )
ItemStatuspendingin_progress	completedfailedc                     | j         S N)valueselfs    r   __str__zItemStatus.__str__   s
    zr   N)r
   r   r   PENDINGIN_PROGRESS	COMPLETEDFAILEDr   r   r   r   r   r      s7        GKIF    r   r   c                       e Zd ZdZddZd Zeddefd            Zd	 Z	d
 Z
ddZddZddZd Zd Zd ZddZddZd Zd Zd ZdS )SQLiteQueueBrokerz/dev/shmqueueNc                    || _         || j        }t          |          }|                    dd           |                    d| j          d          | _        t          j                    | _        | 	                                 t          j        | j                   t                              d| d| j                    d S )NT)parentsexist_okzSQLite.z.dbu   Очередь 'uK   ' успешно инициализирована. Путь к файлу: )nameDEFAULT_PATHr   mkdirjoinpath_db_path	threadinglocal_local_init_dbatexitregisterclose_local_connloggerinfo)r   r'   db_dirbase_dirs       r   __init__zSQLiteQueueBroker.__init__$   s    	>&F<<td333 ))*BDI*B*B*BCCo''-...  Ht  H  Hx|  yF  H  H  	I  	I  	I  	I  	Ir   c                 L   t          | j        d          st          j        t	          | j                  d          }t          j        |_        |                    d           |                    d           |                    d           || j        _	        | j        j	        S )uz   Возвращает или создает постоянное соединение для текущего потока.conn
   timeoutzPRAGMA busy_timeout = 5000;zPRAGMA synchronous = NORMAL;zPRAGMA cache_size = -65536;)
hasattrr.   sqlite3connectstrr+   Rowrow_factoryexecuter9   r   r9   s     r   _get_thread_connz"SQLiteQueueBroker._get_thread_conn4   s    t{F++ 	$?3t}#5#5rBBBD&{D LL6777 LL7888LL6777#DK{r   DEFERREDmodec              #     K   |                                  }	 |                    d|            |V  |                                 dS # t          $ r- 	 |                                 n# t
          j        $ r Y nw xY w w xY w)u   
        Контекстный менеджер транзакций
        mode="IMMEDIATE" для записи
        mode="DEFERRED" для чтения
        zBEGIN N)rE   rC   commit	Exceptionrollbackr>   OperationalError)r   rG   r9   s      r   _transactionzSQLiteQueueBroker._transactionF   s       $$&&		LL$)))JJJKKMMMMM 	 	 	+   	s/   0A
 

BA*)B*A<9B;A<<Bc                    t          j        t          | j                  d          }	 |                    d           |                    d           |                    d           |                    d           |                                 |                                 dS # |                                 w xY w)u0   Инициализация базы данных   r;   zPRAGMA journal_mode = WAL;a  
                CREATE TABLE IF NOT EXISTS queue (
                    id TEXT PRIMARY KEY,
                    payload TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'pending',
                    result TEXT,
                    created_at REAL NOT NULL,
                    started_at REAL,
                    finished_at REAL,
                    pid INTEGER
                )
            zBCREATE INDEX IF NOT EXISTS idx_status ON queue(status, created_at)z8CREATE INDEX IF NOT EXISTS idx_pid ON queue(pid, status)N)r>   r?   r@   r+   rC   rI   closerD   s     r   r/   zSQLiteQueueBroker._init_dbY   s    s4=112>>>	LL5666LL     LL]^^^LLSTTTKKMMMJJLLLLLDJJLLLLs   A(B( (B>c           	      >   t          t          j                              }t          j                    }|                     d          5 }|                    dt          j         d|t          j	        |d          |f           ddd           n# 1 swxY w Y   |S )u7   Добавление элемента в очередь	IMMEDIATErG   zd
                INSERT INTO queue (id, payload, status, created_at)
                VALUES (?, ?, 'z', ?)
            Fensure_asciiN)
r@   uuiduuid4timerM   rC   r   r   jsondumps)r   payloaditem_idnowr9   s        r   putzSQLiteQueueBroker.puts   s    djll##ikkK00 	JDLL  * 2   4:gEBBBCHJ J J	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J 	J s   
<BBBTc                    t          j                     }t          j                    }	 t          j                     }d}	 |                     d          5 }|                    dt
          j         d          }|                                }	|	rxd}|	d         }
|                    dt
          j         d	t
          j         d
|||
f          }|j	        dk    r/|	d         t          j        |	d                   dcddd           S ddd           n# 1 swxY w Y   nH# t          j        $ r6}dt          |          vr t                              d           Y d}~nd}~ww xY w|st#          d          |+t          j                     |z
  }||k    rt#          d          |s't          j        t'          j        dd                     )u8   Получение следующего элементаTFrR   rS   z_
                        SELECT id, payload FROM queue
                        WHERE status = 'zf'
                        ORDER BY created_at ASC
                        LIMIT 1
                    idzT
                            UPDATE queue
                            SET status = 'z',
                                started_at = ?,
                                pid = ?
                            WHERE id = ? AND status = 'z'
                        r   r[   )r`   r[   Nzdatabase is lockeduO   Очередь заблокирована, повторная попытка...r	   u7   Таймаут ожидания задачи истекg?g333333?)rX   osgetpidrM   rC   r   r   fetchoner   rowcountrY   loadsr>   rL   r@   r3   debugr   sleeprandomuniform)r   blockr<   
start_timecurrent_pidr]   
item_foundr9   cursorrowr\   updatedeelapseds                 r   getzSQLiteQueueBroker.get   sj   Y[[
ikk/	7)++CJp&&K&88 D!\\ +)3);+ + +  F !//++C %)
"%d)"&,, 0+5+A0 0
 9C8J0 0 0 ";8#: #: #+a//&)$i+/:c)n+E+E$ $-                      4 + p p p's1vv55noooooooop  97888 ")++
2g%% YZZZ 7
6>$55666_/	7sB   D B%D9D D DD DD E#-,EE#c                 &   t          j                     }|t          j        |d          nd}|                     d          5 }|                    dt
          j         dt
          j         d|||f           ddd           dS # 1 swxY w Y   dS )	uP   Отметка элемента как успешно обработанногоNFrT   rR   rS   <
                UPDATE queue
                SET status = 'r',
                    finished_at = ?,
                    result = ?
                WHERE id = ? AND status = ''
            )rX   rY   rZ   rM   rC   r   r   r   )r   r\   resultr]   result_jsonr9   s         r   completezSQLiteQueueBroker.complete   s    ikk@F@Rdje<<<<X\K00 	.DLL )3 
 -7,B   {G,. . .	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	.s   5BB
B
c                    t          j                     }t          |t                    r%t          |          j        t          |          d}n2t          |t                    r|}n|dt          |          d}nddd}t          j        |d          }| 	                    d	          5 }|
                    d
t          j         dt          j         d|||f           ddd           dS # 1 swxY w Y   dS )u;   Отметка элемента как неудачного)errormessageNRuntimeErrorUnknownErrorzNo error message providedFrT   rR   rS   ru   rv   rw   )rX   
isinstancerJ   typer
   r@   dictrY   rZ   rM   rC   r   r    r   )r   r\   r|   r]   
error_datary   r9   s          r   failzSQLiteQueueBroker.fail   s[   ikkeY'' 	[#';;#7CJJOOJJt$$ 	[JJ#1c%jjIIJJ#1>YZZJj%@@@K00 	.DLL )0 
 -7,B   {G,. . .	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	.s   ,5C..C25C2c                     |                      d          5 }|                    dt          j         dt          j         d|f          }ddd           n# 1 swxY w Y   |j        S )u[   Восстановление зависших элементов конкретного PIDrR   rS   ru   zu',
                    started_at = NULL,
                    pid = NULL
                WHERE pid = ? AND status = 'rw   N)rM   rC   r   r   r   rd   )r   pidr9   rn   s       r   recoverzSQLiteQueueBroker.recover   s    K00 	D\\ #)1# #
 .8-C# # #  F	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 s   3AAAc                     |                      d          5 }|                    dt          j         d          }|                                d         cddd           S # 1 swxY w Y   dS )u[   Текущее количество ожидающих элементов в очередиrF   rS   zL
                SELECT COUNT(*) FROM queue
                WHERE status = 'rw   r   N)rM   rC   r   r   rc   r   r9   rn   s      r   qsizezSQLiteQueueBroker.qsize   s    J// 	(4\\ #!+!3# # #  F ??$$Q'	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	(s   =A!!A%(A%c                     |                      d          5 }|                    dt          j         d          }t	          |                                d                    cddd           S # 1 swxY w Y   dS )u2   Проверить, пуста ли очередьrF   rS   zm
                SELECT EXISTS (
                    SELECT 1 FROM queue
                    WHERE status = 'z '
                )
            r   N)rM   rC   r   r   boolrc   r   s      r   emptyzSQLiteQueueBroker.empty   s    J// 	24\\ # &0%7# # #  F FOO--a0111	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2s   AA//A36A3      ?c                 Z   	 |                      d          5 }|                    dt          j         dt          j         d          }t          |                                d                   }|s	 ddd           dS 	 ddd           n# 1 swxY w Y   t          j        |           )	u   
        Блокирует выполнение, пока в очереди не останется активных элементов.
        Активные элементы = статус 'pending' ИЛИ 'in_progress'.
        TrF   rS   z{
                    SELECT EXISTS (
                        SELECT 1 FROM queue
                        WHERE status IN ('', 'z)')
                    )
                r   N)	rM   rC   r   r   r   r   rc   rX   rg   )r   check_intervalr9   rn   has_active_itemss        r   waitzSQLiteQueueBroker.wait   s!   
	'""
"33 	 t ' ,6+=' ' DNCY' ' '   $((9(9!(<#=#= '  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  	  J~&&&	's   ABBBc                    |dnd}||fnd}|                      d          5 }d| d}|                    ||          }d	 |                                D             |d
}|f}nd}d}|                    ||          }	|	                                }
|
d         |
d         nd}|
d         }|
d         }|
d         }d}d}|dk    r$|"| ||z
  }|dk    r||z  }nt	          |          }ddd           n# 1 swxY w Y   fdt
          D             }t          |                                          |d<   ||d<   ||d<   ||d<   |S )u   Полная статистика по статусам, времени и скорости обработки элементовNzWHERE pid = ? r   rF   rS   zu
                SELECT
                    status,
                    COUNT(*) as count
                FROM queue z-
                GROUP BY status
            c                 ,    i | ]}|d          |d         S )statuscountr   .0ro   s     r   
<dictcomp>z/SQLiteQueueBroker.get_stats.<locals>.<dictcomp>  s"    NNNSS]CLNNNr   ao  
                    SELECT
                        COUNT(*) as completed_count,
                        AVG(finished_at - started_at) as avg_time,
                        MIN(started_at) as min_started,
                        MAX(finished_at) as max_finished
                    FROM queue
                    WHERE status = 'completed' AND pid = ?
                ac  
                    SELECT
                        COUNT(*) as completed_count,
                        AVG(finished_at - started_at) as avg_time,
                        MIN(started_at) as min_started,
                        MAX(finished_at) as max_finished
                    FROM queue
                    WHERE status = 'completed'
                avg_time        completed_countmin_startedmax_finishedr   c                 R    i | ]#}|j                             |j         d           $S )r   )r   rs   )r   r   statss     r   r   z/SQLiteQueueBroker.get_stats.<locals>.<dictcomp>F  s-    TTTv&,		&, : :TTTr   totalavg_execution_time_sectotal_execution_time_secitems_per_second)rM   rC   fetchallrc   floatr   sumvalues)r   r   pid_condition	pid_paramr9   query_statusrn   query_metricsmetrics_paramcursor_metricsmetricsavg_execution_timer   r   r   total_execution_timer   reportr   s                     @r   	get_statszSQLiteQueueBroker.get_stats  s	   +.?!oSFF2	J// 1	>4 *	  L \\,	::FNNFOO<M<MNNNE! "%! !#!\\-GGN$--//G8?
8K8W!4!4]`%&78O!-0K">2L#& """{'><C['3k'A$'!++'69M'M$$',_'='=$c1	> 1	> 1	> 1	> 1	> 1	> 1	> 1	> 1	> 1	> 1	> 1	> 1	> 1	> 1	>f UTTTTTTfmmoo..w+='(-A)*%5!"s   C	C::C>C>c                     |                      d          5 }|                    dt          j         dt          j         d          }d |                                D             cddd           S # 1 swxY w Y   dS )ui   Выгрузить результаты всех успешно выполненных элементовrF   rS   z
                SELECT id, status, payload, result, finished_at - started_at as duration
                FROM queue
                WHERE status IN ('r   z')
            c           
          i | ]`}|d          |d         |d         |d         ndt          j        |d                   |d         t          j        |d                   nddaS )r`   r   durationNr   r[   rx   )r   duration_secr[   rx   )rY   re   r   s     r   r   z5SQLiteQueueBroker.get_all_results.<locals>.<dictcomp>W  s         D	!(m7::7RC
OOX[#z#i.99;>x=;TdjX777Z^	   r   N)rM   rC   r   r   r    r   r   s      r   get_all_resultsz!SQLiteQueueBroker.get_all_resultsO  s    J// 	4\\ # $.#7# # >H=N# # #  F
  "??,,  	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   AA22A69A6c                     t          | j        d          rS| j        j        I	 | j        j                                         n# t          $ r Y nw xY wt          | j        d           dS dS dS )um   Закрывает соединение текущего потока и удаляет его из кэша.r9   N)r=   r.   r9   rP   rJ   delattrr   s    r   r2   z"SQLiteQueueBroker.close_local_conna  s    4;'' 	)DK,<,H &&((((   DK(((((	) 	),H,Hs   A 
AAc                 v   t          | j        d          r| j        j        	 | j        j                            d           | j        j                                         n@# t
          $ r3 	 | j        j                                         n# t
          $ r Y nw xY wY nw xY wt          | j        d           | j        | j                            | j        j	         d          | j                            | j        j	         d          g}|D ]}t          d          D ]}|                                s n	 |                                 t                              d|             nO# t          t           f$ r; |dk    rt                              d	|            nt%          j        d
           Y w xY wdS )ua   Полное уничтожение очереди и удаление файлов с дискаr9   Nz PRAGMA wal_checkpoint(TRUNCATE);z-walz-shm   u5   Файл очереди успешно удален:    uY   Не удалось удалить файл очереди после 15 попыток: g?)r=   r.   r9   rC   rP   rJ   r   r+   	with_namer'   rangeexistsunlinkr3   rf   PermissionErrorOSErrorr|   rX   rg   )r   files_to_remove	file_pathattempts       r   cleanupzSQLiteQueueBroker.cleanupj  s    4;'' 		)DK,<,H (()KLLL &&((((   K$**,,,,    D
 DK((( MM##t}'9$?$?$?@@M##t}'9$?$?$?@@
 ) 	( 	(I 99 ( ( '')) E($$&&&LL!dYb!d!deeeE'1 ( ( ("}}  &M  BK  &M  &M  N  N  N  N
3	(	( 	(sH   =A! !
B,B
B
BBBBB61E))A	F54F5)r#   N)rF   )TNr   )r   )r
   r   r   r(   r7   rE   r   r@   rM   r/   r^   rs   rz   r   r   r   r   r   r   r   r2   r   r   r   r   r"   r"   !   sJ       LI I I I      $      ^$  4  47 47 47 47l. . . .. . . ..
 
 
( ( (	2 	2 	2' ' ' '&? ? ? ?B  $) ) )!( !( !( !( !(r   r"   )r0   rY   loggingra   rh   r>   r,   rX   rV   
contextlibr   enumr   pathlibr   	getLoggerr3   rJ   r   r@   r   r"   r   r   r   <module>r      s3      				         % % % % % %            		>	*	*	 	 	 	 	I 	 	 	
    d   j( j( j( j( j( j( j( j( j( j(r   