U
    <xe|_                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlZd dlZd dlZG dd dZdd Zdd Zd	d
 ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZdddZdd ZdS )    N)update_wrapperc                   @   sR   e Zd Zi Zedd Zdd ZdddZd	d
 ZdddZ	dd Z
dddZdS )CmfDeferredJobWrapperc                 C   s   | j  d| j S )N.)
__module____name__func r	   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec                 C   s   | j j d| j dS )Nz(name=))	__class__r   nameselfr	   r	   r
   __repr__   s    zCmfDeferredJobWrapper.__repr__NFc                 C   sP   t | | || _| || _|| _|| _|| _|| _|| _|| _	| | j
| j< d S N)r   r   r   r   priority
system_job	only_onceonly_once_argssoft_time_limit	countdownregistry)r   r   r   r   r   r   r   r   r	   r	   r
   __init__   s    
zCmfDeferredJobWrapper.__init__c                 O   s   | j ||S r   r   r   argskwargsr	   r	   r
   __call__-   s    zCmfDeferredJobWrapper.__call__c                 C   s$   |d krg }|d kri }| j ||S r   r   r   r	   r	   r
   apply0   s
    zCmfDeferredJobWrapper.applyc                 O   s   | j ||dS )N)r   r   )apply_asyncr   r	   r	   r
   delay7   s    zCmfDeferredJobWrapper.delayc                    s  ddl m}m} dd l}	|d kr$g } d kr0i  |d kr>| j}|d krL| j}|d krZ| j}|j| j| j	d}
|d k	r|| j|
_|s|j
j|
_| j	rbt| j}d}| jd kr|rtd| j d| f |t|7 } r|t fddt D 7 }n| jD ]}|t | 7 }q|r2|d	t|   7 }||
_|jj|d
drb|d| j d d S |d krr| j}|rt tj|d |
_| | j	| j|
jj| j||
jj||jdd|
_|	j j!"  |
#  W 5 Q R X t|
jS )Nr   )modelsg)r   r    zJCmfDeferredJobWrapper.apply_async: !!! Warning! Use args + only_once! job=z, args=c                    s   i | ]}| | qS r	   r	   ).0kr   r	   r
   
<dictcomp>]   s      z5CmfDeferredJobWrapper.apply_async.<locals>.<dictcomp>__open)only_once_keystatuszCmfDeferredJobWrapper(z): already queued)Zseconds)r   r   	person_idr   r   r   r   
admin_mode)r   r   options)$cmf.includer"   r#   
cmf.fieldsr   r   r   CmfDeferredJobr   r   Zcurrent_personidr-   strr   printsortedhashlibZmd5encodeZ	hexdigestr+   countdebugr   cmf_nowdatetime	timedeltaplan_start_datetimevalueZacl_admin_modeparams_jsonutilcmfutilZdisable_aclsave)r   r   r   r   r   r   r   r"   r#   cmfjobr+   args_strZarg_keyr	   r'   r
   r    :   sn    


 

z!CmfDeferredJobWrapper.apply_async)NFNNFN)NN)NNNNNN)r   r   __qualname__r   staticmethodr   r   r   r   r   r!   r    r	   r	   r	   r
   r      s   
          

r   c                     sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                    s   t | f S r   )r   )Zfunc_r'   r	   r
   wrapper   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r   r   rI   r   r	   r'   r
   cmf_deferred_job   s    
rM   c                  C   s   dd l } | jj jS )Nr   )r1   fieldsZCmfDateTimenowr?   )rD   r	   r	   r
   r;      s    r;   c                 C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   rJ   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr	   r	   r
   format_exception   s    r[   c                       s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc                   C   s   t j t jjS r   )r<   rO   timezoneutcr	   r	   r	   r
   rO      s    zJobWorkerGreenlet.nowNc                    s   || _ t   d S r   )	worker_idsuperr   )r   r_   r   r	   r
   r      s    zJobWorkerGreenlet.__init__c                 C   s   | j j d| j dS )N(r   )r   r   r_   r   r	   r	   r
   r      s    zJobWorkerGreenlet.__repr__)N)r   r   rG   rH   rO   r   r   __classcell__r	   r	   ra   r
   r\      s   
r\   c                       sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                    s   || _ t jf | d S r   )r   r`   r   )r   r   r   ra   r	   r
   r      s    zQueueProcessor.__init__c           	   
   C   s^  ddl m}m}m} dd l}|jjj| jddd |j	
  dddgdd	dd gd	d
t ggg}| jd k	r~|dd| jgg}|jjdddgddd gdddggddgd}|r|dddd gdddd |D gg |jj|dgd|d}|r(d|_|j  | j|_|  W 5 Q R  W 5 Q R  qHW 5 Q R X W 5 Q R X td q|d|j  |S )Nr   )r"   socketio
cmf_commit
   timeoutZblocking_timeoutr,   =r*   ORr>   z<=r   r   Tr+   z!=in_progressz--)filterrN   NOT INc                 S   s   g | ]
}|j qS r	   )r+   )r%   Zjob_r	   r	   r
   
<listcomp>   s     z0QueueProcessor._job_get_next.<locals>.<listcomp>Zcmf_created_at)rm   Zorder_byZ
for_updaterN   rP   zcmf-background-task-start-)r0   r"   re   rf   cmf.apprA   rB   CmfLock_GET_LOCK_KEYappcmf_contextr;   r   r2   Zslistappendgetr,   start_datetimeset_nowr_   rC   timesleepemitr3   )	r   rN   r"   re   rf   rD   Z
job_filterZonly_once_in_progressZnext_jobr	   r	   r
   _job_get_next   sJ    &

  
,zQueueProcessor._job_get_nextc              	   C   sz   ddl m} dd l}|j R d|_|j  t|j|j	 
 |_||_|  |d|j t| W 5 Q R X d S )Nr   re   successzcmf-background-task-result-)r0   re   rp   rs   rt   r,   end_datetimerx   intrw   total_secondsdurationresult_jsonrC   r{   r3   r4   )r   rE   resultre   rD   r	   r	   r
   _job_set_success   s    
zQueueProcessor._job_set_successc              	   C   s   ddl m} dd l}|j h d|_|j  t|j|j	 
 |_d |_t }|d rft||_|  |d|j | W 5 Q R X d S )Nr   r}   failzcmf-background-task-error-)r0   re   rp   rs   rt   r,   r   rx   r   rw   r   r   r   sysexc_infor[   Z
error_textrC   r{   r3   )r   rE   errorre   rD   r   r	   r	   r
   _job_set_fail   s    

zQueueProcessor._job_set_failc                    s0   ddl m dd l  fdd}t||S )Nr   )r"   c               
      s   t jj }  j p jrd jjjj jj	j
d j  jd ddrdj  | jd jd W  5 Q R  S Q R X d S )NrN   r/   r.   Fr   r   )r   r   r   rs   rt   r-   Zset_current_personZ	CmfPersonrv   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextr@   Zactivate_admin_mode)job_funcrD   rE   r"   r	   r
   runner  s    

z'QueueProcessor._job_run.<locals>.runner)r0   r"   rp   geventZwith_timeout)r   rE   r   r   r	   r   r
   _job_run  s    zQueueProcessor._job_runc              
   C   s   | j dddddgd}|sd S |jd d}z| j||d	}| || W n| tjk
rd    Y nf ttfk
r|    Y nN t	k
r } z0t
d
|j d|j d | |t| W 5 d }~X Y nX d S )Nr,   r_   r@   Zparam_pickler-   r   r/   r   )r   zjob_runner(z, ) error)r|   r@   rv   r   r   r   GreenletExit
SystemExitKeyboardInterrupt	Exceptionlogging	exceptionr3   r   r   r4   )r   rE   r   r   er	   r	   r
   
_run_cycle  s    zQueueProcessor._run_cyclec              
   C   sh   z|    td W q  tttjfk
r4    Y q  tk
r`   t	|  d td Y q X q dS )zmain() for GreenletrJ   z cycle error.   N)
r   ry   rz   r   r   r   r   r   r   r   r   r	   r	   r
   _run6  s    zQueueProcessor._runc                 C   s   | j j d| j d| j dS )Nrb   z, priority=r   )r   r   r_   r   r   r	   r	   r
   r   C  s    zQueueProcessor.__repr__)N)N)r   r   rG   rr   r   r   r|   r   r   r   r   r   r   rc   r	   r	   ra   r
   rd      s   (
rd   c                   @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc              
   C   s   dd l }ddlm}m} z|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W 5 Q R X W 5 Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qd S )Nr   CmfGetLockErrorr"   ,  rh      )Zdays   Zhoursrk   r,   INr   canceldeadr   <rj   r~   rm    error)rp   r0   r   r"   rA   rB   rq   	_LOCK_KEYrs   rt   r;   r<   r=   r2   Zbulk_deleter   r   r   ry   rz   )r   rD   r   r"   rO   Zmax_end_datetimeZmax_success_end_datetimer	   r	   r
   r   J  s&    $zQueueCleaner._runNr   r   rG   r   r   r	   r	   r	   r
   r   G  s   r   c                   @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc           	   
   C   sZ  dd l }ddlm}m} |jjjj}z|jj	j
| jddd |j  |d}t|D ]L}|d|  d kr\|d| || td|  dtjd	 q\t }|tjd
d }|jjd|ddddd |D gdddgdd|ggd W 5 Q R X W 5 Q R X W n: |k
r$   Y n& tk
rH   t|  d Y nX td q$d S )Nr   r   r   rh   deferred_job_worker:workersdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!filerJ   r   r   )r,   r   r_   rn   c                 S   s   g | ]}|  qS r	   )decode)r%   Zw_idr	   r	   r
   ro     s     z%QueueWatcher._run.<locals>.<listcomp>r,   rj   rl   rw   r   r   r   <   ) rp   r0   r   r"   rs   r   REDIS_DBredisrA   rB   rq   r   rt   Zsmemberslistrv   r   Zsremremover5   r   stderrr;   r<   r=   r2   Zbulk_updater   r   r   ry   rz   )	r   rD   r   r"   redis_dbZ
db_workersr_   rO   Zmax_start_dater	   r	   r
   r   g  s6    $


zQueueWatcher._runNr   r	   r	   r	   r
   r   d  s   r   c                   @   s    e Zd ZdZdd Zdd ZdS )JobScheduleru  
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    - HOOK_CRON_MINUTELY и HOOK_CRON_HOURLY в первый цикл не запускаем,
        т.е. HOOK_CRON_MINUTELY запускаем на второй минуте работы
    APP.HOOK_CRON_MINUTELY = []
    APP.HOOK_CRON_HOURLY = []
    TODO:
        APP.HOOK_CRON_DAILY = []
        APP.HOOK_CRON_WEEKLY = []
        APP.HOOK_CRON_MONTHLY = []
    c                 C   s   ddl }|j  |D ]}zt|r0|}i }n|\}}t|ts\t|  d| d W q|js|t|  d| d W q|j	st|  d| d W q|j
f | W q tk
r   t|  d| d Y qX qW 5 Q R X dS )	u  
        hook - вероятно должен быть только CmfDeferredJob.
            Произвольные лёгкие функции можно выполнять без джобов в отдельном контексте и с лимитом времени,
            но вероятно такое не нужно, и будут злоупотреблять.
        Вторая форма, если нужно передать параметры:
            hook = (hook, {**apply_async_params(args=, kwargs=, priority=, ...)})
        Есть мнение, что hook.only_once должен быть всегда True у периодических задач.
            Можно его выставлять принудительно, но наверное пока лучше падать,
            чтобы не забывали явно указывать в объявлении джоба.
        Так же пока будем требовать system_job=True
        r   Nz: periodic hook(z+) must be Job(@cmf_deferred_job). Skip run.z*) must be with option only_once. Skip run.z+) must be with option system_job. Skip run.z schedule hook(r   )rp   rs   rt   callable
isinstancer   r   r   r   r   r    r   r   )r   ZhooksrD   hookr   paramsr	   r	   r
   _schedule_hooks  s(    
zJobScheduler._schedule_hooksc                 C   s   dd l }|jjjj}|  j}tdd}t	
d ztjtjj}|jd|d| jdddrr| |jjj |j|kr|j|kr|j}|jd	|d
| jdddr| |jjj W q* tk
r   t|  d Y q*X q*d S )Nr      r   zJobScheduler:minutely:z%Y%m%d_%H%Mx   T)exZnxzJobScheduler:hourly:z	%Y%m%d_%Hi   r   )rp   rs   r   r   r   rO   hourrandomZrandintry   rz   r<   r]   r^   setr_   r   HOOK_CRON_MINUTELYminuteHOOK_CRON_HOURLYr   r   r   )r   rD   r   Zhourly_lastZhourly_run_atrO   r	   r	   r
   r     s    

zJobScheduler._runN)r   r   rG   __doc__r   r   r	   r	   r	   r
   r     s   &r   c                   @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc              	   C   s   dd l }ddlm}m} |jjjj}t	d zt
j
 }|jdksN|jdk rRW q$|jjj| jdddH |d}|rt | dk rW 5 Q R  W q$td	tjd
 |  W 5 Q R X W q$ |k
r   Y q$ tk
r   t|  d Y q$X q$d S )Nr   )r   	CMF_CACHEr   rP   2   rh   redis_idi`T  z%RedisCleaner: run CMF_CACHE.flushdb()r   r   )rp   r0   r   r   rs   r   r   r   ry   rz   r<   rO   r   r   rA   rB   rq   r   rv   r5   r   r   Zflushdbr   r   r   )r   rD   r   r   r   rO   r   r	   r	   r
   r     s$    


zRedisCleaner._runN)r   r   rG   r   r   r   r	   r	   r	   r
   r     s   r   Fc           	      C   sl  ddl }t  dt  }|jjjj}g }| rF|	t
d|d n tdD ]}|	t
||d qN|	t|d |	t|d |	t|d |	t|d |D ]}|  qzz&|jd| dd	d
 |d| W n4 tk
r } ztd|  W 5 d}~X Y nX tj|dd,}|D ] }td| dtjd tq*W 5 Q R X qW nB tttjfk
r } ztd| dtjd W 5 d}~X Y nX ztj|dd W n& tjk
r   tdtjd Y nX |D ]}|r td| dtjd q|  r$td| dtjd qz|!  W n2 tttjtfk
rb   td| d Y nX qdS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   N:)r   r_      )r_   r   -   )r   r   z$deferred_job_worker: redis error!!! rg   )ri   z%deferred_job_worker(): system thread(z) exited!!! Stop work.r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)"rp   platformZnodeuuidZuuid1rs   r   r   r   ru   rd   ranger   r   r   r   startr   Zsaddr   r   r   r   Ziwaitr5   r   r   r   r   r   ZkillallZTimeoutZ
successfulrv   )	Zsingle_queuerD   r_   r   Z	greenletsr   glr   Zgl_iterr	   r	   r
   deferred_job_worker  sR    
"(
r   c                  C   s0   dd l } dd }|| jjj || jjj d S )Nr   c                 S   sj   | D ]`}t |r|}n|\}}t|ts8td| d|jsNtd| d|jstd| dqd S )Nz'check_job_configuration: periodic hook(z%) must be Job(use @cmf_deferred_job).z ) must be with option only_once.z!) must be with option system_job.)r   r   r   
ValueErrorr   r   )Z	hook_listr   r   Z_paramsr	   r	   r
   check_hook_list<  s    
z0check_job_configuration.<locals>.check_hook_list)rp   rs   r   r   r   )rD   r   r	   r	   r
   check_job_configuration9  s    r   )F)r<   r7   rS   r   ry   rU   r   r   r   r   	functoolsr   r   r   Zcmf.util.cmfutilrD   r   rM   r;   r[   ZGreenletr\   rd   r   r   r   r   r   r   r	   r	   r	   r
   <module>   s4   x 'N
C