U
    [SeT                     @   s
  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlZd dlZd dlZG dd dZdd Zdd Zd	d
 ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZdddZdd ZdS )    N)update_wrapperc                   @   sH   e Zd Zi Zedd Zdd ZdddZd	d
 Zdd Z	dddZ
dS )CmfDeferredJobWrapperc                 C   s   | j  d| j S )N.)
__module____name__func r	   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec                 C   s   | j j d| j dS )Nz(name=))	__class__r   nameselfr	   r	   r
   __repr__   s    zCmfDeferredJobWrapper.__repr__NFc                 C   sJ   t | | || _| || _|| _|| _|| _|| _|| _| | j	| j< d S N)
r   r   r   r   priority
system_job	only_oncesoft_time_limit	countdownregistry)r   r   r   r   r   r   r   r	   r	   r
   __init__   s    
zCmfDeferredJobWrapper.__init__c                 O   s   | j ||S r   r   r   argskwargsr	   r	   r
   __call__)   s    zCmfDeferredJobWrapper.__call__c                 O   s   | j ||dS )N)r   r   )apply_asyncr   r	   r	   r
   delay,   s    zCmfDeferredJobWrapper.delayc              	   C   s2  ddl m}m} dd l}	| jrH|jj| jddrH|d| j d d S |d krTg }|d kr`i }|d krn| j	}|d kr|| j
}|d kr| j}|j| j| jd}
|d k	r| j|
_|s|jj|
_|d kr| j}|rt tj|d |
_||| j|
jj| j
||
jj|d	d
|
_|	jj  |
  W 5 Q R X t|
jS )Nr   )modelsgopen)r   statuszCmfDeferredJobWrapper(z): already queued)r   r   )Zseconds)r   	person_idr   r   r   r   )r   r   options)cmf.includer    r!   
cmf.fieldsr   CmfDeferredJobcountr   debugr   r   r   Zcurrent_personidr$   r   cmf_nowdatetime	timedeltaplan_start_datetimevalueparams_jsonutilcmfutilZdisable_aclsavestr)r   r   r   r   r   r   r   r    r!   cmfjobr	   r	   r
   r   /   sL    
z!CmfDeferredJobWrapper.apply_async)NFNFN)NNNNNN)r   r   __qualname__r   staticmethodr   r   r   r   r   r   r	   r	   r	   r
   r      s   

r   c                     sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                    s   t | f S r   )r   )Zfunc_r   r	   r
   wrapperh   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r   r   r;   r   r	   r:   r
   cmf_deferred_jobc   s    
r?   c                  C   s   dd l } | jj jS )Nr   )r'   fieldsZCmfDateTimenowr0   )r6   r	   r	   r
   r,   x   s    r,   c                 C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   r<   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr	   r	   r
   format_exception~   s    rM   c                       s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc                   C   s   t j t jjS r   )r-   rA   timezoneutcr	   r	   r	   r
   rA      s    zJobWorkerGreenlet.nowNc                    s   || _ t   d S r   )	worker_idsuperr   )r   rQ   r   r	   r
   r      s    zJobWorkerGreenlet.__init__c                 C   s   | j j d| j dS )N(r   )r   r   rQ   r   r	   r	   r
   r      s    zJobWorkerGreenlet.__repr__)N)r   r   r8   r9   rA   r   r   __classcell__r	   r	   rS   r
   rN      s   
rN   c                       sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                    s   || _ t jf | d S r   )r   rR   r   )r   r   r   rS   r	   r
   r      s    zQueueProcessor.__init__c              
   C   s  ddl m}m}m} dd l}|jjj| jddd |j	
  dddgdd	dd gd	d
t ggg}| jd k	r||dd| jgg}|jj|dgd|d}|rd|_|j  | j|_|  W 5 Q R  W 5 Q R  qW 5 Q R X W 5 Q R X td q|d|j  |S )Nr   )r    socketio
cmf_commit
   timeoutZblocking_timeoutr#   =r"   ORr/   z<=r   Zcmf_created_atT)filterZorder_byZ
for_updater@   in_progressrB   zcmf-background-task-start-)r&   r    rW   rX   cmf.appr2   r3   CmfLock_GET_LOCK_KEYappcmf_contextr,   r   r(   getr#   start_datetimeset_nowrQ   r4   timesleepemitr+   )r   r@   r    rW   rX   r6   Z
job_filterZnext_jobr	   r	   r
   _job_get_next   s4    $

  
*zQueueProcessor._job_get_nextc              	   C   sz   ddl m} dd l}|j R d|_|j  t|j|j	 
 |_||_|  |d|j t| W 5 Q R X d S )Nr   rW   successzcmf-background-task-result-)r&   rW   r`   rc   rd   r#   end_datetimerg   intrf   total_secondsdurationresult_jsonr4   rj   r+   r5   )r   r7   resultrW   r6   r	   r	   r
   _job_set_success   s    
zQueueProcessor._job_set_successc              	   C   s   ddl m} dd l}|j h d|_|j  t|j|j	 
 |_d |_t }|d rft||_|  |d|j | W 5 Q R X d S )Nr   rl   failzcmf-background-task-error-)r&   rW   r`   rc   rd   r#   rn   rg   ro   rf   rp   rq   rr   sysexc_inforM   Z
error_textr4   rj   r+   )r   r7   errorrW   r6   rw   r	   r	   r
   _job_set_fail   s    

zQueueProcessor._job_set_failc              
   C   s   ddl m} dd l}tj|j }|j ^ |jr\|j	|j
j|j|jjjd |j  tj||f|jd |jd W  5 Q R  S Q R X d S )Nr   )r    r@   r   r   )r&   r    r`   r   r   r   rc   rd   r$   Zset_current_personZ	CmfPersonre   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextgeventZwith_timeoutr1   )r   r7   r   r    r6   job_funcr	   r	   r
   _job_run   s"    
 zQueueProcessor._job_runc              
   C   s   | j dddddgd}|sd S |jd d}z| j||d	}| || W n| tjk
rd    Y nf ttfk
r|    Y nN t	k
r } z0t
d
|j d|j d | |t| W 5 d }~X Y nX d S )Nr#   rQ   r1   Zparam_pickler$   rz   r%   r   )r   zjob_runner(z, ) error)rk   r1   re   r~   rt   r|   GreenletExit
SystemExitKeyboardInterrupt	Exceptionlogging	exceptionr+   r   ry   r5   )r   r7   r   rs   er	   r	   r
   
_run_cycle   s    zQueueProcessor._run_cyclec              
   C   sh   z|    td W q  tttjfk
r4    Y q  tk
r`   t	|  d td Y q X q dS )zmain() for Greenletr<   z cycle error.   N)
r   rh   ri   r   r   r|   r   r   r   r   r   r	   r	   r
   _run   s    zQueueProcessor._runc                 C   s   | j j d| j d| j dS )NrT   z, priority=r   )r   r   rQ   r   r   r	   r	   r
   r     s    zQueueProcessor.__repr__)N)N)r   r   r8   rb   ro   r   rk   rt   ry   r~   r   r   r   rU   r	   r	   rS   r
   rV      s   
rV   c                   @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc              
   C   s   dd l }ddlm}m} z|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W 5 Q R X W 5 Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qd S )Nr   CmfGetLockErrorr    ,  rZ      )Zdays   Zhoursr]   r#   INru   canceldeadrn   <r\   rm   r^    error)r`   r&   r   r    r2   r3   ra   	_LOCK_KEYrc   rd   r,   r-   r.   r(   Zbulk_deleter   r   r   rh   ri   )r   r6   r   r    rA   Zmax_end_datetimeZmax_success_end_datetimer	   r	   r
   r     s&    $zQueueCleaner._runNr   r   r8   r   r   r	   r	   r	   r
   r     s   r   c                   @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc           	   
   C   sZ  dd l }ddlm}m} |jjjj}z|jj	j
| jddd |j  |d}t|D ]L}|d|  d kr\|d| || td|  dtjd	 q\t }|tjd
d }|jjd|ddddd |D gdddgdd|ggd W 5 Q R X W 5 Q R X W n: |k
r$   Y n& tk
rH   t|  d Y nX td q$d S )Nr   r   r   rZ   deferred_job_worker:workersdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!filer<   r   r   )r#   rn   rQ   zNOT INc                 S   s   g | ]}|  qS r	   )decode).0Zw_idr	   r	   r
   
<listcomp>H  s     z%QueueWatcher._run.<locals>.<listcomp>r#   r\   r_   rf   r   r   r   <   ) r`   r&   r   r    rc   r{   REDIS_DBredisr2   r3   ra   r   rd   Zsmemberslistre   r   Zsremremoveprintrv   stderrr,   r-   r.   r(   Zbulk_updater   r   r   rh   ri   )	r   r6   r   r    redis_dbZ
db_workersrQ   rA   Zmax_start_dater	   r	   r
   r   /  s6    $


zQueueWatcher._runNr   r	   r	   r	   r
   r   ,  s   r   c                   @   s    e Zd ZdZdd Zdd ZdS )JobScheduleru  
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    - HOOK_CRON_MINUTELY и HOOK_CRON_HOURLY в первый цикл не запускаем,
        т.е. HOOK_CRON_MINUTELY запускаем на второй минуте работы
    APP.HOOK_CRON_MINUTELY = []
    APP.HOOK_CRON_HOURLY = []
    TODO:
        APP.HOOK_CRON_DAILY = []
        APP.HOOK_CRON_WEEKLY = []
        APP.HOOK_CRON_MONTHLY = []
    c                 C   s   ddl }|j  |D ]}zt|r0|}i }n|\}}t|ts\t|  d| d W q|js|t|  d| d W q|j	st|  d| d W q|j
f | W q tk
r   t|  d| d Y qX qW 5 Q R X dS )	u  
        hook - вероятно должен быть только CmfDeferredJob.
            Произвольные лёгкие функции можно выполнять без джобов в отдельном контексте и с лимитом времени,
            но вероятно такое не нужно, и будут злоупотреблять.
        Вторая форма, если нужно передать параметры:
            hook = (hook, {**apply_async_params(args=, kwargs=, priority=, ...)})
        Есть мнение, что hook.only_once должен быть всегда True у периодических задач.
            Можно его выставлять принудительно, но наверное пока лучше падать,
            чтобы не забывали явно указывать в объявлении джоба.
        Так же пока будем требовать system_job=True
        r   Nz: periodic hook(z+) must be Job(@cmf_deferred_job). Skip run.z*) must be with option only_once. Skip run.z+) must be with option system_job. Skip run.z schedule hook(r   )r`   rc   rd   callable
isinstancer   r   rx   r   r   r   r   r   )r   Zhooksr6   hookr}   paramsr	   r	   r
   _schedule_hooks`  s(    
zJobScheduler._schedule_hooksc                 C   s   dd l }|jjjj}|  j}tdd}t	
d ztjtjj}|jd|d| jdddrr| |jjj |j|kr|j|kr|j}|jd	|d
| jdddr| |jjj W q* tk
r   t|  d Y q*X q*d S )Nr      r   zJobScheduler:minutely:z%Y%m%d_%H%Mx   T)exZnxzJobScheduler:hourly:z	%Y%m%d_%Hi   r   )r`   rc   r{   r   r   rA   hourrandomZrandintrh   ri   r-   rO   rP   setrQ   r   HOOK_CRON_MINUTELYminuteHOOK_CRON_HOURLYr   r   r   )r   r6   r   Zhourly_lastZhourly_run_atrA   r	   r	   r
   r     s    

zJobScheduler._runN)r   r   r8   __doc__r   r   r	   r	   r	   r
   r   S  s   &r   c                   @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc              	   C   s   dd l }ddlm}m} |jjjj}t	d zt
j
 }|jdksN|jdk rRW q$|jjj| jdddH |d}|rt | dk rW 5 Q R  W q$td	tjd
 |  W 5 Q R X W q$ |k
r   Y q$ tk
r   t|  d Y q$X q$d S )Nr   )r   	CMF_CACHEr   rB   2   rZ   redis_idi`T  z%RedisCleaner: run CMF_CACHE.flushdb()r   r   )r`   r&   r   r   rc   r{   r   r   rh   ri   r-   rA   r   r   r2   r3   ra   r   re   r   rv   r   Zflushdbr   r   r   )r   r6   r   r   r   rA   r   r	   r	   r
   r     s$    


zRedisCleaner._runN)r   r   r8   r   r   r   r	   r	   r	   r
   r     s   r   Fc           	      C   sl  ddl }t  dt  }|jjjj}g }| rF|	t
d|d n tdD ]}|	t
||d qN|	t|d |	t|d |	t|d |	t|d |D ]}|  qzz&|jd| dd	d
 |d| W n4 tk
r } ztd|  W 5 d}~X Y nX tj|dd,}|D ] }td| dtjd tq*W 5 Q R X qW nB tttjfk
r } ztd| dtjd W 5 d}~X Y nX ztj|dd W n& tjk
r   tdtjd Y nX |D ]}|r td| dtjd q|  r$td| dtjd qz|!  W n2 tttjtfk
rb   td| d Y nX qdS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   N:)r   rQ      )rQ   r   -   )r   r   z$deferred_job_worker: redis error!!! rY   )r[   z%deferred_job_worker(): system thread(z) exited!!! Stop work.r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)"r`   platformZnodeuuidZuuid1rc   r{   r   r   appendrV   ranger   r   r   r   startr   Zsaddr   r   r   r|   Ziwaitr   rv   r   r   r   r   ZkillallZTimeoutZ
successfulre   )	Zsingle_queuer6   rQ   r   Z	greenletsr   glr   Zgl_iterr	   r	   r
   deferred_job_worker  sR    
"(
r   c                  C   s0   dd l } dd }|| jjj || jjj d S )Nr   c                 S   sr   t |  | D ]`}t|r|}n|\}}t|ts@td| d|jsVtd| d|jstd| dqd S )Nz'check_job_configuration: periodic hook(z%) must be Job(use @cmf_deferred_job).z ) must be with option only_once.z!) must be with option system_job.)r   r   r   r   
ValueErrorr   r   )Z	hook_listr   r}   Z_paramsr	   r	   r
   check_hook_list  s    
z0check_job_configuration.<locals>.check_hook_list)r`   rc   r{   r   r   )r6   r   r	   r	   r
   check_job_configuration  s    r   )F)r-   rE   r   rh   rG   rv   r   r   r   	functoolsr   r|   r   Zcmf.util.cmfutilr6   r   r?   r,   rM   ZGreenletrN   rV   r   r   r   r   r   r   r	   r	   r	   r
   <module>   s0   Qu'N
C