
    Bj>                         d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZ  ej                  d      Z G d de      Z G d d	ee      Z G d
 d      Zy)    N)contextmanager)Enum)Pathqueue_brokerc                       e Zd ZdZy)Empty   Очередь пустаN)__name__
__module____qualname____doc__     *./modules/confluence/utils/queue_broker.pyr   r      s    #r   r   c                   "    e Zd ZdZdZdZdZd Zy)
ItemStatuspendingin_progress	completedfailedc                     | j                   S N)valueselfs    r   __str__zItemStatus.__str__   s    zzr   N)r
   r   r   PENDINGIN_PROGRESS	COMPLETEDFAILEDr   r   r   r   r   r      s    GKIFr   r   c                       e Zd ZdZddZd Zeddefd       Zd Z	d Z
dd	Zdd
ZddZd Zd Zd ZddZddZd Zd Zd Zy)SQLiteQueueBrokerz/dev/shmNc                    || _         || j                  }t        |      }|j                  dd       |j	                  d| j                    d      | _        t        j                         | _        | j                          t        j                  | j                         t        j                  d| d| j
                          y )NT)parentsexist_okzSQLite.z.dbu   Очередь 'uK   ' успешно инициализирована. Путь к файлу: )nameDEFAULT_PATHr   mkdirjoinpath_db_path	threadinglocal_local_init_dbatexitregisterclose_local_connloggerinfo)r   r&   db_dirbase_dirs       r   __init__zSQLiteQueueBroker.__init__$   s    	>&&F<td3 ))GDII;c*BCoo'--.&tf,wx|  yF  yF  xG  H  	Ir   c                 `   t        | j                  d      st        j                  t	        | j
                        d      }t        j                  |_        |j                  d       |j                  d       |j                  d       || j                  _	        | j                  j                  S )uz   Возвращает или создает постоянное соединение для текущего потока.conn
   timeoutzPRAGMA busy_timeout = 5000;zPRAGMA synchronous = NORMAL;zPRAGMA cache_size = -65536;)
hasattrr-   sqlite3connectstrr*   Rowrow_factoryexecuter8   r   r8   s     r   _get_thread_connz"SQLiteQueueBroker._get_thread_conn4   s|    t{{F+??3t}}#5rBD&{{D LL67 LL78LL67#DKK{{r   modec              #      K   | j                         }	 |j                  d|        | |j                          y# t        $ r, 	 |j	                           # t
        j                  $ r Y  w xY ww xY ww)u   
        Контекстный менеджер транзакций
        mode="IMMEDIATE" для записи
        mode="DEFERRED" для чтения
        zBEGIN N)rD   rB   commit	Exceptionrollbackr=   OperationalError)r   rE   r8   s      r   _transactionzSQLiteQueueBroker._transactionF   sy      $$&		LL6$)JKKM 	  ++ 	s=   A5(= A5	A2AA2A.+A2-A..A22A5c                 L   t        j                  t        | j                        d      }	 |j	                  d       |j	                  d       |j	                  d       |j	                  d       |j                          |j                          y# |j                          w xY w)u0   Инициализация базы данных   r:   zPRAGMA journal_mode = WAL;a  
                CREATE TABLE IF NOT EXISTS queue (
                    id TEXT PRIMARY KEY,
                    payload TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'pending',
                    result TEXT,
                    created_at REAL NOT NULL,
                    started_at REAL,
                    finished_at REAL,
                    pid INTEGER
                )
            zBCREATE INDEX IF NOT EXISTS idx_status ON queue(status, created_at)z8CREATE INDEX IF NOT EXISTS idx_pid ON queue(pid, status)N)r=   r>   r?   r*   rB   rG   closerC   s     r   r.   zSQLiteQueueBroker._init_dbY   sv    s4==12>	LL56LL   LL]^LLSTKKMJJLDJJLs   AB B#c           	      0   t        t        j                               }t        j                         }| j	                  d      5 }|j                  dt        j                   d|t        j                  |d      |f       ddd       |S # 1 sw Y   |S xY w)u7   Добавление элемента в очередь	IMMEDIATErE   zd
                INSERT INTO queue (id, payload, status, created_at)
                VALUES (?, ?, 'z', ?)
            Fensure_asciiN)
r?   uuiduuid4timerK   rB   r   r   jsondumps)r   payloaditem_idnowr8   s        r   putzSQLiteQueueBroker.puts   s    djjl#iikK0 	JDLL   * 2 23 4 4::gEBCHJ	J 	J s   =BBc                 p   t        j                          }t        j                         }	 t        j                          }d}	 | j                  d      5 }|j	                  dt
        j                   d      }|j                         }	|	rud}|	d   }
|j	                  dt
        j                   d	t
        j                   d
|||
f      }|j                  dkD  r'|	d   t        j                  |	d         dcddd       S ddd       |st#        d      |'t        j                          |z
  }||k\  rt#        d      |s)t        j$                  t'        j(                  dd             ?# 1 sw Y   lxY w# t        j                  $ r-}dt        |      vr t        j!                  d       Y d}~d}~ww xY w)u8   Получение следующего элементаTFrP   rQ   z_
                        SELECT id, payload FROM queue
                        WHERE status = 'zf'
                        ORDER BY created_at ASC
                        LIMIT 1
                    idzT
                            UPDATE queue
                            SET status = 'z',
                                started_at = ?,
                                pid = ?
                            WHERE id = ? AND status = 'z'
                        r   rY   )r^   rY   Nzdatabase is lockeduO   Очередь заблокирована, повторная попытка...r	   u7   Таймаут ожидания задачи истекg?g333333?)rV   osgetpidrK   rB   r   r   fetchoner   rowcountrW   loadsr=   rJ   r?   r2   debugr   sleeprandomuniform)r   blockr;   
start_timecurrent_pidr[   
item_foundr8   cursorrowrZ   updatedeelapseds                 r   getzSQLiteQueueBroker.get   s   YY[
iik))+CJp&&K&8 D!\\ /))3););(< =+ F !//+C%)
"%d)"&,, 4++5+A+A*B C8 9C8J8J7K L0 ";8#: #++a/&)$i+/::c)n+E$-  @ 788 "))+
2g% YZZ

6>>$56_ 
 4 ++ p's1v5noops7   E5 B!E)4	E5 >E5 )E2.E5 5F5#F00F5c                     t        j                          }|t        j                  |d      nd}| j                  d      5 }|j	                  dt
        j                   dt
        j                   d|||f       ddd       y# 1 sw Y   yxY w)	uP   Отметка элемента как успешно обработанногоNFrR   rP   rQ   <
                UPDATE queue
                SET status = 'r',
                    finished_at = ?,
                    result = ?
                WHERE id = ? AND status = ''
            )rV   rW   rX   rK   rB   r   r   r   )r   rZ   resultr[   result_jsonr8   s         r   completezSQLiteQueueBroker.complete   s    iik@F@Rdjje<X\K0 	.DLL )334 5, -7,B,B+C D {G,.	. 	. 	.s   9BBc                    t        j                          }t        |t              r"t        |      j                  t        |      d}n)t        |t              r|}n|dt        |      d}nddd}t        j                  |d      }| j                  d	      5 }|j                  d
t        j                   dt        j                   d|||f       ddd       y# 1 sw Y   yxY w)u;   Отметка элемента как неудачного)errormessageNRuntimeErrorUnknownErrorzNo error message providedFrR   rP   rQ   rs   rt   ru   )rV   
isinstancerH   typer
   r?   dictrW   rX   rK   rB   r   r    r   )r   rZ   rz   r[   
error_datarw   r8   s          r   failzSQLiteQueueBroker.fail   s    iikeY'#';#7#7CJOJt$J#1c%jIJ#1>YZJjj%@K0 	.DLL )001 2, -7,B,B+C D {G,.	. 	. 	.s   9CC$c                     | j                  d      5 }|j                  dt        j                   dt        j                   d|f      }ddd       |j
                  S # 1 sw Y   j
                  S xY w)u[   Восстановление зависших элементов конкретного PIDrP   rQ   rs   zu',
                    started_at = NULL,
                    pid = NULL
                WHERE pid = ? AND status = 'ru   N)rK   rB   r   r   r   rb   )r   pidr8   rl   s       r   recoverzSQLiteQueueBroker.recover   s    K0 	D\\ ')112 3- .8-C-C,D E# F	 	 s   7AA2c                     | j                  d      5 }|j                  dt        j                   d      }|j	                         d   cddd       S # 1 sw Y   yxY w)u[   Текущее количество ожидающих элементов в очередиDEFERREDrQ   zL
                SELECT COUNT(*) FROM queue
                WHERE status = 'ru   r   N)rK   rB   r   r   ra   r   r8   rl   s      r   qsizezSQLiteQueueBroker.qsize   sd    J/ 	(4\\ '!!+!3!3 4 5# F ??$Q'	( 	( 	(s   6AAc                     | j                  d      5 }|j                  dt        j                   d      }t	        |j                         d          cddd       S # 1 sw Y   yxY w)u2   Проверить, пуста ли очередьr   rQ   zm
                SELECT EXISTS (
                    SELECT 1 FROM queue
                    WHERE status = 'z '
                )
            r   N)rK   rB   r   r   boolra   r   s      r   emptyzSQLiteQueueBroker.empty   sn    J/ 	24\\ '% &0%7%7$8 9# F FOO-a011	2 	2 	2s   A AA&c                 8   	 | j                  d      5 }|j                  dt        j                   dt        j                   d      }t        |j                         d         }|s
	 ddd       y	 ddd       t        j                  |       # 1 sw Y   xY w)	u   
        Блокирует выполнение, пока в очереди не останется активных элементов.
        Активные элементы = статус 'pending' ИЛИ 'in_progress'.
        Tr   rQ   z{
                    SELECT EXISTS (
                        SELECT 1 FROM queue
                        WHERE status IN ('', 'z)')
                    )
                r   N)	rK   rB   r   r   r   r   ra   rV   re   )r   check_intervalr8   rl   has_active_itemss        r   waitzSQLiteQueueBroker.wait   s    
 ""
"3 	 t ++ ,6+=+=*>d:CYCYBZ ['  $((9!(<#= '	  	  (	  JJ~& 	  	 s   ABBc                    |dnd}||fnd}| j                  d      5 }d| d}|j                  ||      }|j                         D ci c]  }|d	   |d
    }}|d}	|f}
nd}	d}
|j                  |	|
      }|j                         }|d   |d   nd}|d   }|d   }|d   }d}d}|dkD  r||||z
  }|dkD  r||z  }nt	        |      }ddd       t
        D ci c])  }|j                  j                  |j                  d      + }}t        |j                               |d<   |d<   |d<   |d<   |S c c}w # 1 sw Y   sxY wc c}w )u   Полная статистика по статусам, времени и скорости обработки элементовNzWHERE pid = ? r   r   rQ   zu
                SELECT
                    status,
                    COUNT(*) as count
                FROM queue z-
                GROUP BY status
            statuscountao  
                    SELECT
                        COUNT(*) as completed_count,
                        AVG(finished_at - started_at) as avg_time,
                        MIN(started_at) as min_started,
                        MAX(finished_at) as max_finished
                    FROM queue
                    WHERE status = 'completed' AND pid = ?
                ac  
                    SELECT
                        COUNT(*) as completed_count,
                        AVG(finished_at - started_at) as avg_time,
                        MIN(started_at) as min_started,
                        MAX(finished_at) as max_finished
                    FROM queue
                    WHERE status = 'completed'
                avg_time        completed_countmin_startedmax_finishedr   totalavg_execution_time_sectotal_execution_time_secitems_per_second)
rK   rB   fetchallra   floatr   r   rq   sumvalues)r   r   pid_condition	pid_paramr8   query_statusrl   rm   statsquery_metricsmetrics_paramcursor_metricsmetricsavg_execution_timer   r   r   total_execution_timer   r   reports                        r   	get_statszSQLiteQueueBroker.get_stats  s   +.?!oSF2	J/ 1	>4  *? +	L \\,	:F<BOO<MNSS]CL0NEN! "%! !#!\\-GN$--/G8?
8K8W!4]`%&78O!-0K">2L#& ""{'><C['3k'A$'!+'69M'M$',_'=$c1	>f JTTv&,,		&,, ::TTfmmo.w+='(-A)*%5!"c O1	> 1	>f Us$   +E D;A3E .E;E  E	c                    | j                  d      5 }|j                  dt        j                   dt        j                   d      }|j                         D ci c]M  }|d   |d   |d   |d   nd
t        j                  |d         |d   t        j                  |d         nd	dO c}cd	d	d	       S c c}w # 1 sw Y   y	xY w)ui   Выгрузить результаты всех успешно выполненных элементовr   rQ   z
                SELECT id, status, payload, result, finished_at - started_at as duration
                FROM queue
                WHERE status IN ('r   z')
            r^   r   durationNr   rY   rv   )r   duration_secrY   rv   )rK   rB   r   r   r    r   rW   rc   )r   r8   rl   rm   s       r   get_all_resultsz!SQLiteQueueBroker.get_all_resultsO  s    J/ 	4\\ '# $.#7#7"8Z=N=N<O P# F "??,  D	!(m7::7RC
OX[#zz#i.9;>x=;TdjjX7Z^	 	 		 	s   AB=AB8,B=8B==Cc                     t        | j                  d      rS| j                  j                  <	 | j                  j                  j                          t        | j                  d       yyy# t        $ r Y $w xY w)um   Закрывает соединение текущего потока и удаляет его из кэша.r8   N)r<   r-   r8   rN   rH   delattrr   s    r   r1   z"SQLiteQueueBroker.close_local_conna  sg    4;;'DKK,<,<,H  &&( DKK( -I'  s   $A+ +	A76A7c                    t        | j                  d      rv| j                  j                  `	 | j                  j                  j                  d       | j                  j                  j	                          t        | j                  d       | j                  | j                  j                  | j                  j                   d      | j                  j                  | j                  j                   d      g}|D ]N  }t        d      D ]>  }|j                         s #	 |j                          t        j                  d|         N P y# t
        $ r9 	 | j                  j                  j	                          n# t
        $ r Y nw xY wY w xY w# t        t         f$ r6 |dk(  rt        j#                  d	|        nt%        j&                  d
       Y w xY w)ua   Полное уничтожение очереди и удаление файлов с дискаr8   Nz PRAGMA wal_checkpoint(TRUNCATE);z-walz-shm   u5   Файл очереди успешно удален:    uY   Не удалось удалить файл очереди после 15 попыток: g?)r<   r-   r8   rB   rN   rH   r   r*   	with_namer&   rangeexistsunlinkr2   rd   PermissionErrorOSErrorrz   rV   re   )r   files_to_remove	file_pathattempts       r   cleanupzSQLiteQueueBroker.cleanupj  s    4;;'DKK,<,<,H  (()KL  &&( DKK( MMMM##t}}'9'9&:$$?@MM##t}}'9'9&:$$?@
 ) 	(I 9 ( '')($$&LL#XYbXc!de(	(  KK$$**,  , (1 ("}  (A  BK  AL  &M  N

3	(sI   A	E "(F	F$E?>F?	FF
FFFAGG)queueN)r   )TNr   )g      ?)r
   r   r   r'   r6   rD   r   r?   rK   r.   r\   rq   rx   r   r   r   r   r   r   r   r1   r   r   r   r   r"   r"   !   sp    LI  $   $447l...
(	2'&?B$)!(r   r"   )r/   rW   loggingr_   rf   r=   r+   rV   rT   
contextlibr   enumr   pathlibr   	getLoggerr2   rH   r   r?   r   r"   r   r   r   <module>r      sa       	      %  			>	*	I 	
d j( j(r   