U
    Eff                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlZd dlZd dlZd dlZG dd dZdd Zdd	 Zd
d ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZdddZdS )    N)
namedtuple)update_wrapperc                	   @   sn   e Zd Zi Zi Zedd Zdd Zedd Zedd Z	dddZ
dd ZdddZdd ZdddZd	S )CmfDeferredJobWrapperc                 C   s   | j  d| j S )N.)
__module____name__func r
   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec                 C   s   | j j d| j dS )Nz(name=))	__class__r   nameselfr
   r
   r   __repr__    s    zCmfDeferredJobWrapper.__repr__c                 C   s   | dkrd} | S )Nz	@minutelyz* * * * * Hr
   )scheduler
   r
   r   adapt_schedule#   s    z$CmfDeferredJobWrapper.adapt_schedulec              
   C   sN   zt j j| dd W n4 t jk
rH } ztd|  |W 5 d }~X Y nX d S )N   1Zhash_idzinvalid schedule: )croniterexpandZCroniterBadCronError
ValueError)r   er
   r
   r   validate_schedule)   s    z'CmfDeferredJobWrapper.validate_scheduleNFc              
   C   s   t | | || _| || _|	p$| j| _|| _|| _|| _|| _|| _	|| _
|| _|
| _| jr|sttd| j d|std| j dz| | | j W n< tk
r } ztd| j d| j |W 5 d }~X Y nX | | j| j< | | j| j< d S )Nz$CmfDeferredJobWrapper: periodic job(z ) must be with option only_once.z!) must be with option system_job.zinvalid job configuration(z): )r   r	   r   r   descriptionpriority
system_job	only_onceonly_once_argssoft_time_limit	countdownr   show_bg_progressbarr   r   r   periodicregistry)r   r	   r   r   r    r!   r   r"   r   r   r#   r   r
   r
   r   __init__0   s.    
,zCmfDeferredJobWrapper.__init__c                 O   s   | j ||S Nr   r   argskwargsr
   r
   r   __call__M   s    zCmfDeferredJobWrapper.__call__c                 C   s$   |d krg }|d kri }| j ||S r'   r   r(   r
   r
   r   applyP   s
    zCmfDeferredJobWrapper.applyc                 O   s   | j ||dS )N)r)   r*   )apply_asyncr(   r
   r
   r   delayW   s    zCmfDeferredJobWrapper.delayc	                    s|  ddl m}	m}
 dd l}|d kr$g } d kr0i  |d kr>| j}|d krL| j}|d krZ| j}|d krh| j}|d krv| j}|	j	| j
| j||d}|d k	r| j|_|s|
jj|_|d kr| j}|rt tj|d |_| jr t| j
}d}| jd krB|rtd| j
 d| f |t|7 } r`|t fdd	t D 7 }n| jD ]}|t | 7 }qH|r|d
t|   7 }||_|	j	j|ddgd}|r |
d| j
 d |jr|jr|j|jk r|
d| j
 d|j  |j|_|  d S | | j| j|jj | j||jj ||
j!|
j"|
j#|dd|_$|j%j&'  |  W 5 Q R X |rr|
j(t|j t|jS )Nr   modelsg)r   r   r   r#   Zseconds zJCmfDeferredJobWrapper.apply_async: !!! Warning! Use args + only_once! job=z, args=c                    s   i | ]}| | qS r
   r
   ).0kr*   r
   r   
<dictcomp>   s      z5CmfDeferredJobWrapper.apply_async.<locals>.<dictcomp>__open*)only_once_keystatusfieldszCmfDeferredJobWrapper(z): already queuedz): reschedule existent job )r   r    	person_idr   r!   r   r"   
admin_modecomponent_idsession_tab_idr#   )r)   r*   options))cmf.includer0   r1   
cmf.fieldsr!   r   r   r   r#   CmfDeferredJobr   r   Zcurrent_personidr>   r"   cmf_nowdatetime	timedeltaplan_start_datetimestrr    printsortedhashlibZmd5encodeZ	hexdigestr;   getdebugsavevalueZacl_admin_moder@   rA   params_jsonutilcmfutilZdisable_aclappend)r   r)   r*   r   r!   r   r"   r   r#   r0   r1   cmfjobr;   args_strZarg_keyZexistent_jobr
   r6   r   r-   Z   s       

 

z!CmfDeferredJobWrapper.apply_async)	NFNNFNNNF)NN)NNNNNNNN)r   r   __qualname__r%   r$   staticmethodr   r   r   r   r&   r+   r,   r.   r-   r
   r
   r
   r   r      s4   


               

    r   c                     sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                    s   t | f S r'   )r   )Zfunc_r6   r
   r   wrapper   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r)   r*   r]   r	   r
   r6   r   cmf_deferred_job   s    
ra   c                  C   s   dd l } | jj jS )Nr   )rD   r=   ZCmfDateTimenowrS   )rX   r
   r
   r   rG      s    rG   c                 C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   r^   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr
   r
   r   format_exception   s    rn   c                       s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc                   C   s   t j t jjS r'   )rH   rb   timezoneutcr
   r
   r
   r   rb      s    zJobWorkerGreenlet.nowNc                    s   || _ t   d S r'   )	worker_idsuperr&   )r   rr   r   r
   r   r&      s    zJobWorkerGreenlet.__init__c                 C   s   | j j d| j dS )N(r   )r   r   rr   r   r
   r
   r   r      s    zJobWorkerGreenlet.__repr__)N)r   r   r[   r\   rb   r&   r   __classcell__r
   r
   rt   r   ro      s   
ro   c                       sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                    s   || _ t jf | d S r'   )r   rs   r&   )r   r   r*   rt   r
   r   r&      s    zQueueProcessor.__init__c           	   
   C   sV  ddl m}m}m} dd l}|jjj| jddd |j	
  dddgdd	dd gd	d
t ggg}| jd k	r~|dd| jgg}|jjdddgddd gdddggddgd}|r|dddd gdddd |D gg |jj|dgd|d}|r2d|_|j  | j|_|d |  W 5 Q R  W 5 Q R  qRW 5 Q R X W 5 Q R X td q|S )Nr   )r0   socketio
cmf_commit
   timeoutZblocking_timeoutr<   =r9   ORrJ   z<=r   r   Tr;   z!=in_progressz--)filterr=   NOT INc                 S   s   g | ]
}|j qS r
   )r;   )r4   Zjob_r
   r
   r   
<listcomp>  s     z0QueueProcessor._job_get_next.<locals>.<listcomp>Zcmf_created_at)r   Zorder_byZ
for_updater=   z	job startrc   )rC   r0   rx   ry   cmf.apprU   rV   CmfLock_GET_LOCK_KEYappcmf_contextrG   r   rE   ZslistrW   rP   r<   start_datetimeZset_nowrr   Z	error_logrR   timesleep)	r   r=   r0   rx   ry   rX   Z
job_filterZonly_once_in_progressZnext_jobr
   r
   r   _job_get_next  sJ    &

  

,zQueueProcessor._job_get_nextc              	   C   s8   ddl m} dd l}|j  || W 5 Q R X d S )Nr   rx   )rC   rx   r   r   r   Z
on_success)r   rY   resultrx   rX   r
   r
   r   _job_set_success,  s    zQueueProcessor._job_set_successc              	   C   sT   ddl m} dd l}|j , d}t }|d r<t|}|| W 5 Q R X d S )Nr   r   zError?)	rC   rx   r   r   r   sysexc_inforn   on_error)r   rY   errorrx   rX   Z
error_textr   r
   r
   r   _job_set_fail5  s    zQueueProcessor._job_set_failc                    s6   ddl mm dd l  fdd}t||S )Nr   r/   c               
      s   t jj }  j  jr jjjj jj	j
d j  jd ddrdj  jd ddrjd d _jd ddrjd d _| jd jd W  5 Q R  S Q R X d S )	Nr=   rB   r?   Fr@   rA   r)   r*   )r   r%   r   r   r   r>   Zset_current_personZ	CmfPersonrP   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextrT   Zactivate_admin_moder@   rA   )Zjob_funcrX   r1   rY   r0   r
   r   runnerF  s    

z'QueueProcessor._job_run.<locals>.runner)rC   r0   r1   r   geventZwith_timeout)r   rY   r!   r   r
   r   r   _job_runB  s    zQueueProcessor._job_runc                 C   s   | j dgd}|sd S |jd d}z| j||d}| || W n tttjfk
r } z6t	
d|j d|j d|  | |t|  W 5 d }~X Y nT ttjfk
r } z0t	
d|j d|j d	 | |t| W 5 d }~X Y nX d S )
Nr:   r   rB   r!   )r!   zjob_runner(z, z) interrupt by ) error)r   rT   rP   r   r   
SystemExitKeyboardInterruptr   GreenletExitlogging	exceptionrF   r   r   rK   	ExceptionTimeout)r   rY   r!   r   r   r
   r
   r   
_run_cycleY  s     zQueueProcessor._run_cyclec              
   C   sh   z|    td W q  tttjfk
r4    Y q  tk
r`   t	|  d td Y q X q dS )zmain() for Greenletr^   z cycle error.   N)
r   r   r   r   r   r   r   r   r   r   r   r
   r
   r   _runo  s    zQueueProcessor._runc                 C   s   | j j d| j d| j dS )Nru   z, priority=r   )r   r   rr   r   r   r
   r
   r   r   |  s    zQueueProcessor.__repr__)N)N)r   r   r[   r   intr&   r   r   r   r   r   r   r   rv   r
   r
   rt   r   rw      s   *	
rw   c                   @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc              
   C   s   dd l }ddlm}m} z|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W 5 Q R X W 5 Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qd S )Nr   CmfGetLockErrorr0   ,  r{      )Zdays   )Zhoursr~   r<   INZfailcancelZdeadZend_datetime<r}   success)r    error)r   rC   r   r0   rU   rV   r   	_LOCK_KEYr   r   rG   rH   rI   rE   Zbulk_deleter   r   r   r   r   )r   rX   r   r0   rb   Zmax_end_datetimeZmax_success_end_datetimer
   r
   r   r     s&    $zQueueCleaner._runNr   r   r[   r   r   r
   r
   r
   r   r     s   r   c                   @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc              
   C   s  dd l }ddlm}m} |jjjj}z@|jj	j
| jddd |j  |d}|rrt t| dk rtd W 5 Q R  W 5 Q R  W q$|d}t|D ]L}|d	|  d kr|d| || td
|  dtjd qt }|tjdd }	|jjdgdddd |D gdddgdd|	ggdD ]}
|
d q>W 5 Q R X W 5 Q R X W n: |k
r|   Y n& tk
r   t |  d Y nX td q$d S )Nr   r   r   r{   redis_idr   rz   deferred_job_worker:workersdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!filer2   r:   rr   r   c                 S   s   g | ]}|  qS r
   )decode)r4   Zw_idr
   r
   r   r     s     z%QueueWatcher._run.<locals>.<listcomp>r<   r}   r   r   r   )r=   r   zJob Worker is deadr   <   )!r   rC   r   r0   r   r   REDIS_DBredisrU   rV   r   r   r   rP   r   floatr   Zsmemberslistr   ZsremremoverL   r   stderrrG   rH   rI   rE   r   r   r   r   )r   rX   r   r0   redis_dbr   Z
db_workersrr   rb   Zmax_start_daterY   r
   r
   r   r     s<    (




&zQueueWatcher._runNr   r
   r
   r
   r   r     s   r   c                       s2   e Zd ZdZeddZ fddZdd Z  ZS )JobScheduleru   
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    PeriodicDatazjob croniterc                    s   t  j|| g | _d S r'   )rs   r&   r$   r(   rt   r
   r   r&     s    zJobScheduler.__init__c              
   C   s  dd l }ddlm} |jjjj}d}tjtj	j
 j}tj D ]}|j}|jr|j|jkr|j|j }t|  d|j d|j d|  nt|  d|j d|j d |st|  d|j d qDt|}t| tj||jd	}||_|  |rt|| n| }| j| || qD|rB|t  }	ntd
 d}	|	dkrntd|	dd d}	t|	 zt }
d}| jD ]\}}| }|
|kr$|  |jd|j d| | j dddr$z"|j!  |"  W 5 Q R X W n. t#k
r"   t$%|  d|j d Y nX |r8t|| n| }qW n0 t#k
rv   t$%|  d td Y nX q.d S )Nr   )configz: config: job=z, app="z ", config.JOB_SCHEDULE_OVERRIDE="z: config: job z
 disabled!r   z1JobScheduler: warning next_ts == 0, nothing to dorz   z)JobScheduler: warning too low sleep time z.3frm      zJobScheduler:lock::T)exZnxz schedule job(r   r   )&r   rC   r   r   r   r   r   rH   rb   rp   rq   Z
astimezoneZtzinfor   r$   valuesr   ZJOB_SCHEDULE_OVERRIDEr   rL   r   r   r   ZAPP_FQDNZget_nextminZget_currentrW   r   r   r   setrr   r   r-   r   r   r   )r   rX   r   r   Znext_tsZlocal_tzinforY   r   Z	cron_iterZsleep_tsZnow_tsZcurr_tsr
   r
   r   r     s\    $




$$zJobScheduler._run)	r   r   r[   __doc__r   r   r&   r   rv   r
   r
   rt   r   r     s   
r   c                   @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc              	   C   s  dd l }ddlm}m} |jjjj}t	d zt
j
 }|jdksN|jdk rRW q$|jjj| jdddL |d}|rt t| dk rW 5 Q R  W q$td	tjd
 |  W 5 Q R X W q$ tjjk
r   Y q$ |k
r   Y q$ tk
r
   t|  d Y q$X q$d S )Nr   )r   	CMF_CACHEr   rc   2   r{   r   i`T  z%RedisCleaner: run CMF_CACHE.flushdb()r   r   )r   rC   r   r   r   r   r   r   r   r   rH   rb   ZhourZminuterU   rV   r   r   rP   r   rL   r   r   Zflushdb
exceptionsZLockNotOwnedErrorr   r   r   )r   rX   r   r   r   rb   r   r
   r
   r   r     s(    


zRedisCleaner._runN)r   r   r[   r   r   r   r
   r
   r
   r   r     s   r   Fc           	      C   sl  ddl }t  dt  }|jjjj}g }| rF|	t
d|d n tdD ]}|	t
||d qN|	t|d |	t|d |	t|d |	t|d |D ]}|  qzz&|jd| dd	d
 |d| W n4 tk
r } ztd|  W 5 d}~X Y nX tj|dd,}|D ] }td| dtjd tq*W 5 Q R X qW nB tttjfk
r } ztd| dtjd W 5 d}~X Y nX ztj|dd W n& tjk
r   tdtjd Y nX |D ]}|r td| dtjd q|  r$td| dtjd qz|!  W n2 tttjtfk
rb   td| d Y nX qdS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   Nr   )r   rr      )rr   r   -   )r   r   z$deferred_job_worker: redis error!!! rz   )r|   z%deferred_job_worker(): system thread(z) exited!!! Stop work.r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)"r   platformZnodeuuidZuuid1r   r   r   r   rW   rw   ranger   r   r   r   startr   Zsaddr   r   r   r   ZiwaitrL   r   r   r   r   r   Zkillallr   Z
successfulrP   )	Zsingle_queuerX   rr   r   Z	greenletsr   glr   Zgl_iterr
   r
   r   deferred_job_worker/  sR    
"(
r   )F)rH   rN   rf   Zrandomr   rh   r   r   r   r   collectionsr   	functoolsr   r   r   r   Zcmf.util.cmfutilrX   r   ra   rG   rn   ZGreenletro   rw   r   r   r   r   r   r
   r
   r
   r   <module>   s8    0 ,F 