U
    7feU                     @   s
  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlZd dlZd dlZG dd dZdd Zdd Zd	d
 ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZdddZdd ZdS )    N)update_wrapperc                   @   sR   e Zd Zi Zedd Zdd ZdddZd	d
 ZdddZ	dd Z
dddZdS )CmfDeferredJobWrapperc                 C   s   | j  d| j S )N.)
__module____name__func r	   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec                 C   s   | j j d| j dS )Nz(name=))	__class__r   nameselfr	   r	   r
   __repr__   s    zCmfDeferredJobWrapper.__repr__NFc                 C   sJ   t | | || _| || _|| _|| _|| _|| _|| _| | j	| j< d S N)
r   r   r   r   priority
system_job	only_oncesoft_time_limit	countdownregistry)r   r   r   r   r   r   r   r	   r	   r
   __init__   s    
zCmfDeferredJobWrapper.__init__c                 O   s   | j ||S r   r   r   argskwargsr	   r	   r
   __call__)   s    zCmfDeferredJobWrapper.__call__c                 C   s$   |d krg }|d kri }| j ||S r   r   r   r	   r	   r
   apply,   s
    zCmfDeferredJobWrapper.applyc                 O   s   | j ||dS )N)r   r   )apply_asyncr   r	   r	   r
   delay3   s    zCmfDeferredJobWrapper.delayc              	   C   s2  ddl m}m} dd l}	| jrH|jj| jddrH|d| j d d S |d krTg }|d kr`i }|d krn| j	}|d kr|| j
}|d kr| j}|j| j| jd}
|d k	r| j|
_|s|jj|
_|d kr| j}|rt tj|d |
_||| j|
jj| j
||
jj|d	d
|
_|	jj  |
  W 5 Q R X t|
jS )Nr   )modelsgopen)r   statuszCmfDeferredJobWrapper(z): already queued)r   r   )Zseconds)r   	person_idr   r   r   r   )r   r   options)cmf.includer!   r"   
cmf.fieldsr   CmfDeferredJobcountr   debugr   r   r   Zcurrent_personidr%   r   cmf_nowdatetime	timedeltaplan_start_datetimevalueparams_jsonutilcmfutilZdisable_aclsavestr)r   r   r   r   r   r   r   r!   r"   cmfjobr	   r	   r
   r   6   sL    
z!CmfDeferredJobWrapper.apply_async)NFNFN)NN)NNNNNN)r   r   __qualname__r   staticmethodr   r   r   r   r   r    r   r	   r	   r	   r
   r      s   


r   c                     sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                    s   t | f S r   )r   )Zfunc_r   r	   r
   wrappero   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r   r   r<   r   r	   r;   r
   cmf_deferred_jobj   s    
r@   c                  C   s   dd l } | jj jS )Nr   )r(   fieldsZCmfDateTimenowr1   )r7   r	   r	   r
   r-      s    r-   c                 C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   r=   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr	   r	   r
   format_exception   s    rN   c                       s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc                   C   s   t j t jjS r   )r.   rB   timezoneutcr	   r	   r	   r
   rB      s    zJobWorkerGreenlet.nowNc                    s   || _ t   d S r   )	worker_idsuperr   )r   rR   r   r	   r
   r      s    zJobWorkerGreenlet.__init__c                 C   s   | j j d| j dS )N(r   )r   r   rR   r   r	   r	   r
   r      s    zJobWorkerGreenlet.__repr__)N)r   r   r9   r:   rB   r   r   __classcell__r	   r	   rT   r
   rO      s   
rO   c                       sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                    s   || _ t jf | d S r   )r   rS   r   )r   r   r   rT   r	   r
   r      s    zQueueProcessor.__init__c              
   C   s  ddl m}m}m} dd l}|jjj| jddd |j	
  dddgdd	dd gd	d
t ggg}| jd k	r||dd| jgg}|jj|dgd|d}|rd|_|j  | j|_|  W 5 Q R  W 5 Q R  qW 5 Q R X W 5 Q R X td q|d|j  |S )Nr   )r!   socketio
cmf_commit
   timeoutZblocking_timeoutr$   =r#   ORr0   z<=r   Zcmf_created_atT)filterZorder_byZ
for_updaterA   in_progressrC   zcmf-background-task-start-)r'   r!   rX   rY   cmf.appr3   r4   CmfLock_GET_LOCK_KEYappcmf_contextr-   r   r)   getr$   start_datetimeset_nowrR   r5   timesleepemitr,   )r   rA   r!   rX   rY   r7   Z
job_filterZnext_jobr	   r	   r
   _job_get_next   s4    $

  
*zQueueProcessor._job_get_nextc              	   C   sz   ddl m} dd l}|j R d|_|j  t|j|j	 
 |_||_|  |d|j t| W 5 Q R X d S )Nr   rX   successzcmf-background-task-result-)r'   rX   ra   rd   re   r$   end_datetimerh   intrg   total_secondsdurationresult_jsonr5   rk   r,   r6   )r   r8   resultrX   r7   r	   r	   r
   _job_set_success   s    
zQueueProcessor._job_set_successc              	   C   s   ddl m} dd l}|j h d|_|j  t|j|j	 
 |_d |_t }|d rft||_|  |d|j | W 5 Q R X d S )Nr   rm   failzcmf-background-task-error-)r'   rX   ra   rd   re   r$   ro   rh   rp   rg   rq   rr   rs   sysexc_inforN   Z
error_textr5   rk   r,   )r   r8   errorrX   r7   rx   r	   r	   r
   _job_set_fail   s    

zQueueProcessor._job_set_failc                    s0   ddl m dd l  fdd}t||S )Nr   )r!   c               
      st   t jj }  j T jrH jjjj jj	j
d j  | jd jd W  5 Q R  S Q R X d S )NrA   r   r   )r   r   r   rd   re   r%   Zset_current_personZ	CmfPersonrf   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextr2   )job_funcr7   r8   r!   r	   r
   runner   s    
z'QueueProcessor._job_run.<locals>.runner)r'   r!   ra   geventZwith_timeout)r   r8   r   r   r	   r~   r
   _job_run   s    	zQueueProcessor._job_runc              
   C   s   | j dddddgd}|sd S |jd d}z| j||d	}| || W n| tjk
rd    Y nf ttfk
r|    Y nN t	k
r } z0t
d
|j d|j d | |t| W 5 d }~X Y nX d S )Nr$   rR   r2   Zparam_pickler%   r{   r&   r   )r   zjob_runner(z, ) error)rl   r2   rf   r   ru   r   GreenletExit
SystemExitKeyboardInterrupt	Exceptionlogging	exceptionr,   r   rz   r6   )r   r8   r   rt   er	   r	   r
   
_run_cycle   s    zQueueProcessor._run_cyclec              
   C   sh   z|    td W q  tttjfk
r4    Y q  tk
r`   t	|  d td Y q X q dS )zmain() for Greenletr=   z cycle error.   N)
r   ri   rj   r   r   r   r   r   r   r   r   r	   r	   r
   _run  s    zQueueProcessor._runc                 C   s   | j j d| j d| j dS )NrU   z, priority=r   )r   r   rR   r   r   r	   r	   r
   r     s    zQueueProcessor.__repr__)N)N)r   r   r9   rc   rp   r   rl   ru   rz   r   r   r   r   rV   r	   r	   rT   r
   rW      s   
rW   c                   @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc              
   C   s   dd l }ddlm}m} z|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W 5 Q R X W 5 Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qd S )Nr   CmfGetLockErrorr!   ,  r[      )Zdays   Zhoursr^   r$   INrv   canceldeadro   <r]   rn   r_    error)ra   r'   r   r!   r3   r4   rb   	_LOCK_KEYrd   re   r-   r.   r/   r)   Zbulk_deleter   r   r   ri   rj   )r   r7   r   r!   rB   Zmax_end_datetimeZmax_success_end_datetimer	   r	   r
   r     s&    $zQueueCleaner._runNr   r   r9   r   r   r	   r	   r	   r
   r     s   r   c                   @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc           	   
   C   sZ  dd l }ddlm}m} |jjjj}z|jj	j
| jddd |j  |d}t|D ]L}|d|  d kr\|d| || td|  dtjd	 q\t }|tjd
d }|jjd|ddddd |D gdddgdd|ggd W 5 Q R X W 5 Q R X W n: |k
r$   Y n& tk
rH   t|  d Y nX td q$d S )Nr   r   r   r[   deferred_job_worker:workersdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!filer=   r   r   )r$   ro   rR   zNOT INc                 S   s   g | ]}|  qS r	   )decode).0Zw_idr	   r	   r
   
<listcomp>O  s     z%QueueWatcher._run.<locals>.<listcomp>r$   r]   r`   rg   r   r   r   <   ) ra   r'   r   r!   rd   r|   REDIS_DBredisr3   r4   rb   r   re   Zsmemberslistrf   r   Zsremremoveprintrw   stderrr-   r.   r/   r)   Zbulk_updater   r   r   ri   rj   )	r   r7   r   r!   redis_dbZ
db_workersrR   rB   Zmax_start_dater	   r	   r
   r   6  s6    $


zQueueWatcher._runNr   r	   r	   r	   r
   r   3  s   r   c                   @   s    e Zd ZdZdd Zdd ZdS )JobScheduleru  
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    - HOOK_CRON_MINUTELY и HOOK_CRON_HOURLY в первый цикл не запускаем,
        т.е. HOOK_CRON_MINUTELY запускаем на второй минуте работы
    APP.HOOK_CRON_MINUTELY = []
    APP.HOOK_CRON_HOURLY = []
    TODO:
        APP.HOOK_CRON_DAILY = []
        APP.HOOK_CRON_WEEKLY = []
        APP.HOOK_CRON_MONTHLY = []
    c                 C   s   ddl }|j  |D ]}zt|r0|}i }n|\}}t|ts\t|  d| d W q|js|t|  d| d W q|j	st|  d| d W q|j
f | W q tk
r   t|  d| d Y qX qW 5 Q R X dS )	u  
        hook - вероятно должен быть только CmfDeferredJob.
            Произвольные лёгкие функции можно выполнять без джобов в отдельном контексте и с лимитом времени,
            но вероятно такое не нужно, и будут злоупотреблять.
        Вторая форма, если нужно передать параметры:
            hook = (hook, {**apply_async_params(args=, kwargs=, priority=, ...)})
        Есть мнение, что hook.only_once должен быть всегда True у периодических задач.
            Можно его выставлять принудительно, но наверное пока лучше падать,
            чтобы не забывали явно указывать в объявлении джоба.
        Так же пока будем требовать system_job=True
        r   Nz: periodic hook(z+) must be Job(@cmf_deferred_job). Skip run.z*) must be with option only_once. Skip run.z+) must be with option system_job. Skip run.z schedule hook(r   )ra   rd   re   callable
isinstancer   r   ry   r   r   r   r   r   )r   Zhooksr7   hookr}   paramsr	   r	   r
   _schedule_hooksg  s(    
zJobScheduler._schedule_hooksc                 C   s   dd l }|jjjj}|  j}tdd}t	
d ztjtjj}|jd|d| jdddrr| |jjj |j|kr|j|kr|j}|jd	|d
| jdddr| |jjj W q* tk
r   t|  d Y q*X q*d S )Nr      r   zJobScheduler:minutely:z%Y%m%d_%H%Mx   T)exZnxzJobScheduler:hourly:z	%Y%m%d_%Hi   r   )ra   rd   r|   r   r   rB   hourrandomZrandintri   rj   r.   rP   rQ   setrR   r   HOOK_CRON_MINUTELYminuteHOOK_CRON_HOURLYr   r   r   )r   r7   r   Zhourly_lastZhourly_run_atrB   r	   r	   r
   r     s    

zJobScheduler._runN)r   r   r9   __doc__r   r   r	   r	   r	   r
   r   Z  s   &r   c                   @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc              	   C   s   dd l }ddlm}m} |jjjj}t	d zt
j
 }|jdksN|jdk rRW q$|jjj| jdddH |d}|rt | dk rW 5 Q R  W q$td	tjd
 |  W 5 Q R X W q$ |k
r   Y q$ tk
r   t|  d Y q$X q$d S )Nr   )r   	CMF_CACHEr   rC   2   r[   redis_idi`T  z%RedisCleaner: run CMF_CACHE.flushdb()r   r   )ra   r'   r   r   rd   r|   r   r   ri   rj   r.   rB   r   r   r3   r4   rb   r   rf   r   rw   r   Zflushdbr   r   r   )r   r7   r   r   r   rB   r   r	   r	   r
   r     s$    


zRedisCleaner._runN)r   r   r9   r   r   r   r	   r	   r	   r
   r     s   r   Fc           	      C   sl  ddl }t  dt  }|jjjj}g }| rF|	t
d|d n tdD ]}|	t
||d qN|	t|d |	t|d |	t|d |	t|d |D ]}|  qzz&|jd| dd	d
 |d| W n4 tk
r } ztd|  W 5 d}~X Y nX tj|dd,}|D ] }td| dtjd tq*W 5 Q R X qW nB tttjfk
r } ztd| dtjd W 5 d}~X Y nX ztj|dd W n& tjk
r   tdtjd Y nX |D ]}|r td| dtjd q|  r$td| dtjd qz|!  W n2 tttjtfk
rb   td| d Y nX qdS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   N:)r   rR      )rR   r   -   )r   r   z$deferred_job_worker: redis error!!! rZ   )r\   z%deferred_job_worker(): system thread(z) exited!!! Stop work.r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)"ra   platformZnodeuuidZuuid1rd   r|   r   r   appendrW   ranger   r   r   r   startr   Zsaddr   r   r   r   Ziwaitr   rw   r   r   r   r   ZkillallZTimeoutZ
successfulrf   )	Zsingle_queuer7   rR   r   Z	greenletsr   glr   Zgl_iterr	   r	   r
   deferred_job_worker  sR    
"(
r   c                  C   s0   dd l } dd }|| jjj || jjj d S )Nr   c                 S   sr   t |  | D ]`}t|r|}n|\}}t|ts@td| d|jsVtd| d|jstd| dqd S )Nz'check_job_configuration: periodic hook(z%) must be Job(use @cmf_deferred_job).z ) must be with option only_once.z!) must be with option system_job.)r   r   r   r   
ValueErrorr   r   )Z	hook_listr   r}   Z_paramsr	   r	   r
   check_hook_list  s    
z0check_job_configuration.<locals>.check_hook_list)ra   rd   r|   r   r   )r7   r   r	   r	   r
   check_job_configuration  s    r   )F)r.   rF   r   ri   rH   rw   r   r   r   	functoolsr   r   r   Zcmf.util.cmfutilr7   r   r@   r-   rN   ZGreenletrO   rW   r   r   r   r   r   r   r	   r	   r	   r
   <module>   s0   Xu'N
C