U
    |e`                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlZd dlZd dlZd dlZG dd dZdd Zdd	 Zd
d ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZdddZdS )    N)
namedtuple)update_wrapperc                	   @   sn   e Zd Zi Zi Zedd Zdd Zedd Zedd Z	dddZ
dd ZdddZdd ZdddZd	S )CmfDeferredJobWrapperc                 C   s   | j  d| j S )N.)
__module____name__func r
   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec                 C   s   | j j d| j dS )Nz(name=))	__class__r   nameselfr
   r
   r   __repr__    s    zCmfDeferredJobWrapper.__repr__c                 C   s   | dkrd} | S )Nz	@minutelyz* * * * * Hr
   )scheduler
   r
   r   adapt_schedule#   s    z$CmfDeferredJobWrapper.adapt_schedulec              
   C   sN   zt j j| dd W n4 t jk
rH } ztd|  |W 5 d }~X Y nX d S )N   1Zhash_idzinvalid schedule: )croniterexpandZCroniterBadCronError
ValueError)r   er
   r
   r   validate_schedule)   s    z'CmfDeferredJobWrapper.validate_scheduleNFc              
   C   s   t | | || _| || _|	p$| j| _|| _|| _|| _|| _|| _	|| _
|| _|
| _| jr|sttd| j d|std| j dz| | | j W n< tk
r } ztd| j d| j |W 5 d }~X Y nX | | j| j< | | j| j< d S )Nz$CmfDeferredJobWrapper: periodic job(z ) must be with option only_once.z!) must be with option system_job.zinvalid job configuration(z): )r   r	   r   r   descriptionpriority
system_job	only_onceonly_once_argssoft_time_limit	countdownr   show_bg_progressbarr   r   r   periodicregistry)r   r	   r   r   r    r!   r   r"   r   r   r#   r   r
   r
   r   __init__0   s.    
,zCmfDeferredJobWrapper.__init__c                 O   s   | j ||S Nr   r   argskwargsr
   r
   r   __call__M   s    zCmfDeferredJobWrapper.__call__c                 C   s$   |d krg }|d kri }| j ||S r'   r   r(   r
   r
   r   applyP   s
    zCmfDeferredJobWrapper.applyc                 O   s   | j ||dS )N)r)   r*   )apply_asyncr(   r
   r
   r   delayW   s    zCmfDeferredJobWrapper.delayc	                    s.  ddl m}	m}
 dd l}|d kr$g } d kr0i  |d kr>| j}|d krL| j}|d krZ| j}|d krh| j}|d krv| j}|	j	| j
| j||d}|d k	r| j|_|s|
jj|_| jrt| j
}d}| jd kr|rtd| j
 d| f |t|7 } r8|t fddt D 7 }n| jD ]}|t | 7 }q |rX|d	t|   7 }||_|	j	j|d
dr|
d| j
 d d S |d kr| j}|rt tj|d |_| | j| j|jj| j||jj||
j |
j!|
j"|dd|_#|j$j%&  |'  W 5 Q R X |r$|
j(t|j t|jS )Nr   modelsg)r   r   r   r#    zJCmfDeferredJobWrapper.apply_async: !!! Warning! Use args + only_once! job=z, args=c                    s   i | ]}| | qS r
   r
   ).0kr*   r
   r   
<dictcomp>   s      z5CmfDeferredJobWrapper.apply_async.<locals>.<dictcomp>__open)only_once_keystatuszCmfDeferredJobWrapper(z): already queued)Zseconds)r   r    	person_idr   r!   r   r"   
admin_modecomponent_idsession_tab_idr#   )r)   r*   options))cmf.includer0   r1   
cmf.fieldsr!   r   r   r   r#   CmfDeferredJobr   r   Zcurrent_personidr;   strr    printsortedhashlibZmd5encodeZ	hexdigestr9   countdebugr"   cmf_nowdatetime	timedeltaplan_start_datetimevalueZacl_admin_moder=   r>   params_jsonutilcmfutilZdisable_aclsaveappend)r   r)   r*   r   r!   r   r"   r   r#   r0   r1   cmfjobr9   args_strZarg_keyr
   r5   r   r-   Z   s       

 

z!CmfDeferredJobWrapper.apply_async)	NFNNFNNNF)NN)NNNNNNNN)r   r   __qualname__r%   r$   staticmethodr   r   r   r   r&   r+   r,   r.   r-   r
   r
   r
   r   r      s4   


               

    r   c                     sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                    s   t | f S r'   )r   )Zfunc_r5   r
   r   wrapper   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r)   r*   rZ   r	   r
   r5   r   cmf_deferred_job   s    
r^   c                  C   s   dd l } | jj jS )Nr   )rA   fieldsZCmfDateTimenowrO   )rU   r
   r
   r   rK      s    rK   c                 C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   r[   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr
   r
   r   format_exception   s    rl   c                       s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc                   C   s   t j t jjS r'   )rL   r`   timezoneutcr
   r
   r
   r   r`      s    zJobWorkerGreenlet.nowNc                    s   || _ t   d S r'   )	worker_idsuperr&   )r   rp   r   r
   r   r&      s    zJobWorkerGreenlet.__init__c                 C   s   | j j d| j dS )N(r   )r   r   rp   r   r
   r
   r   r      s    zJobWorkerGreenlet.__repr__)N)r   r   rX   rY   r`   r&   r   __classcell__r
   r
   rr   r   rm      s   
rm   c                       sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                    s   || _ t jf | d S r'   )r   rq   r&   )r   r   r*   rr   r
   r   r&      s    zQueueProcessor.__init__c           	   
   C   sL  ddl m}m}m} dd l}|jjj| jddd |j	
  dddgdd	dd gd	d
t ggg}| jd k	r~|dd| jgg}|jjdddgddd gdddggddgd}|r|dddd gdddd |D gg |jj|dgd|d}|r(d|_|j  | j|_|  W 5 Q R  W 5 Q R  qHW 5 Q R X W 5 Q R X td q|S )Nr   )r0   socketio
cmf_commit
   timeoutZblocking_timeoutr:   =r8   ORrN   z<=r   r   Tr9   z!=in_progressz--)filterr_   NOT INc                 S   s   g | ]
}|j qS r
   )r9   )r3   Zjob_r
   r
   r   
<listcomp>  s     z0QueueProcessor._job_get_next.<locals>.<listcomp>Zcmf_created_at)r~   Zorder_byZ
for_updater_   ra   )r@   r0   rv   rw   cmf.apprQ   rR   CmfLock_GET_LOCK_KEYappcmf_contextrK   r   rB   ZslistrT   getr:   start_datetimeset_nowrp   rS   timesleep)	r   r_   r0   rv   rw   rU   Z
job_filterZonly_once_in_progressZnext_jobr
   r
   r   _job_get_next   sH    &

  
,zQueueProcessor._job_get_nextc              	   C   sh   ddl m} dd l}|j @ d|_d|_|j  t	|j|j
  |_||_|  W 5 Q R X d S )Nr   rv   successd   )r@   rv   r   r   r   r:   Zprogress_pctend_datetimer   intr   total_secondsdurationresult_jsonrS   )r   rV   resultrv   rU   r
   r
   r   _job_set_success  s    
zQueueProcessor._job_set_successc              	   C   s|   ddl m} dd l}|j T d|_|j  t|j|j	 
 |_d |_t }|d rft||_|  W 5 Q R X d S )Nr   r   fail)r@   rv   r   r   r   r:   r   r   r   r   r   r   r   sysexc_inforl   Z
error_textrS   )r   rV   errorrv   rU   r   r
   r
   r   _job_set_fail+  s    

zQueueProcessor._job_set_failc                    s6   ddl mm dd l  fdd}t||S )Nr   r/   c               
      s   t jj }  j  jr jjjj jj	j
d j  jd ddrdj  jd ddrjd d _jd ddrjd d _| jd jd W  5 Q R  S Q R X d S )	Nr_   r?   r<   Fr=   r>   r)   r*   )r   r%   r   r   r   r;   Zset_current_personZ	CmfPersonr   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextrP   Zactivate_admin_moder=   r>   )Zjob_funcrU   r1   rV   r0   r
   r   runner?  s    

z'QueueProcessor._job_run.<locals>.runner)r@   r0   r1   r   geventZwith_timeout)r   rV   r!   r   r
   r   r   _job_run;  s    zQueueProcessor._job_runc                 C   s   | j dddddgd}|sd S |jd d}z| j||d	}| || W n tttjfk
r } z6t	
d
|j d|j d|  | |t|  W 5 d }~X Y nT ttjfk
r } z0t	
d
|j d|j d | |t| W 5 d }~X Y nX d S )Nr:   rp   rP   Zparam_pickler;   r   r?   r!   )r!   zjob_runner(z, z) interrupt by ) error)r   rP   r   r   r   
SystemExitKeyboardInterruptr   GreenletExitlogging	exceptionrC   r   r   rD   	ExceptionTimeout)r   rV   r!   r   r   r
   r
   r   
_run_cycleR  s     zQueueProcessor._run_cyclec              
   C   sh   z|    td W q  tttjfk
r4    Y q  tk
r`   t	|  d td Y q X q dS )zmain() for Greenletr[   z cycle error.   N)
r   r   r   r   r   r   r   r   r   r   r   r
   r
   r   _runh  s    zQueueProcessor._runc                 C   s   | j j d| j d| j dS )Nrs   z, priority=r   )r   r   rp   r   r   r
   r
   r   r   u  s    zQueueProcessor.__repr__)N)N)r   r   rX   r   r   r&   r   r   r   r   r   r   r   rt   r
   r
   rr   r   ru      s   )
ru   c                   @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc              
   C   s   dd l }ddlm}m} z|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W 5 Q R X W 5 Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qd S )Nr   CmfGetLockErrorr0   ,  ry      )Zdays   Zhoursr|   r:   INr   canceldeadr   <r{   r   r~    error)r   r@   r   r0   rQ   rR   r   	_LOCK_KEYr   r   rK   rL   rM   rB   Zbulk_deleter   r   r   r   r   )r   rU   r   r0   r`   Zmax_end_datetimeZmax_success_end_datetimer
   r
   r   r   |  s&    $zQueueCleaner._runNr   r   rX   r   r   r
   r
   r
   r   r   y  s   r   c                   @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc           	   
   C   sZ  dd l }ddlm}m} |jjjj}z|jj	j
| jddd |j  |d}t|D ]L}|d|  d kr\|d| || td|  dtjd	 q\t }|tjd
d }|jjd|ddddd |D gdddgdd|ggd W 5 Q R X W 5 Q R X W n: |k
r$   Y n& tk
rH   t|  d Y nX td q$d S )Nr   r   r   ry   deferred_job_worker:workersdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!filer[   r   r   )r:   r   rp   r   c                 S   s   g | ]}|  qS r
   )decode)r3   Zw_idr
   r
   r   r     s     z%QueueWatcher._run.<locals>.<listcomp>r:   r{   r}   r   r   r   r   <   ) r   r@   r   r0   r   r   REDIS_DBredisrQ   rR   r   r   r   Zsmemberslistr   r   ZsremremoverE   r   stderrrK   rL   rM   rB   Zbulk_updater   r   r   r   r   )	r   rU   r   r0   redis_dbZ
db_workersrp   r`   Zmax_start_dater
   r
   r   r     s6    $


zQueueWatcher._runNr   r
   r
   r
   r   r     s   r   c                       s2   e Zd ZdZeddZ fddZdd Z  ZS )JobScheduleru   
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    PeriodicDatazjob croniterc                    s   t  j|| g | _d S r'   )rq   r&   r$   r(   rr   r
   r   r&     s    zJobScheduler.__init__c              
   C   s  dd l }ddlm} |jjjj}d}tjtj	j
 j}tj D ]}|j}|jr|j|jkr|j|j }t|  d|j d|j d|  nt|  d|j d|j d |st|  d|j d qDt|}t| tj||jd	}||_|  |rt|| n| }| j| || qD|rB|t  }	ntd
 d}	|	dkrntd|	dd d}	t|	 zt }
d}| jD ]\}}| }|
|kr$|  |jd|j d| | j dddr$z"|j!  |"  W 5 Q R X W n. t#k
r"   t$%|  d|j d Y nX |r8t|| n| }qW n0 t#k
rv   t$%|  d td Y nX q.d S )Nr   )configz: config: job=z, app="z ", config.JOB_SCHEDULE_OVERRIDE="z: config: job z
 disabled!r   z1JobScheduler: warning next_ts == 0, nothing to dorx   z)JobScheduler: warning too low sleep time z.3frk      zJobScheduler:lock::T)exZnxz schedule job(r   r   )&r   r@   r   r   r   r   r   rL   r`   rn   ro   Z
astimezoneZtzinfor   r$   valuesr   ZJOB_SCHEDULE_OVERRIDEr   rE   r   r   r   ZAPP_FQDNZget_nextminZget_currentrT   r   r   r   setrp   r   r-   r   r   r   )r   rU   r   r   Znext_tsZlocal_tzinforV   r   Z	cron_iterZsleep_tsZnow_tsZcurr_tsr
   r
   r   r     s\    $




$$zJobScheduler._run)	r   r   rX   __doc__r   r   r&   r   rt   r
   r
   rr   r   r     s   
r   c                   @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc              	   C   s   dd l }ddlm}m} |jjjj}t	d zt
j
 }|jdksN|jdk rRW q$|jjj| jdddH |d}|rt | dk rW 5 Q R  W q$td	tjd
 |  W 5 Q R X W q$ |k
r   Y q$ tk
r   t|  d Y q$X q$d S )Nr   )r   	CMF_CACHEr   ra   2   ry   redis_idi`T  z%RedisCleaner: run CMF_CACHE.flushdb()r   r   )r   r@   r   r   r   r   r   r   r   r   rL   r`   ZhourZminuterQ   rR   r   r   r   rE   r   r   Zflushdbr   r   r   )r   rU   r   r   r   r`   r   r
   r
   r   r     s$    


zRedisCleaner._runN)r   r   rX   r   r   r   r
   r
   r
   r   r     s   r   Fc           	      C   sl  ddl }t  dt  }|jjjj}g }| rF|	t
d|d n tdD ]}|	t
||d qN|	t|d |	t|d |	t|d |	t|d |D ]}|  qzz&|jd| dd	d
 |d| W n4 tk
r } ztd|  W 5 d}~X Y nX tj|dd,}|D ] }td| dtjd tq*W 5 Q R X qW nB tttjfk
r } ztd| dtjd W 5 d}~X Y nX ztj|dd W n& tjk
r   tdtjd Y nX |D ]}|r td| dtjd q|  r$td| dtjd qz|!  W n2 tttjtfk
rb   td| d Y nX qdS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   Nr   )r   rp      )rp   r   -   )r   r   z$deferred_job_worker: redis error!!! rx   )rz   z%deferred_job_worker(): system thread(z) exited!!! Stop work.r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)"r   platformZnodeuuidZuuid1r   r   r   r   rT   ru   ranger   r   r   r   startr   Zsaddr   r   r   r   ZiwaitrE   r   r   r   r   r   Zkillallr   Z
successfulr   )	Zsingle_queuerU   rp   r   Z	greenletsr   glr   Zgl_iterr
   r
   r   deferred_job_worker   sR    
"(
r   )F)rL   rG   rd   Zrandomr   rf   r   r   r   r   collectionsr   	functoolsr   r   r   r   Zcmf.util.cmfutilrU   r   r^   rK   rl   ZGreenletrm   ru   r   r   r   r   r   r
   r
   r
   r   <module>   s8    " 'F