B
    fSg                 @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlZd dlZd dlZd dlZG dd dZdd Zdd	 Zd
d ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZdddZdS )    N)
namedtuple)update_wrapperc            	   @   sn   e Zd Zi Zi Zedd Zdd Zedd Zedd Z	dddZ
dd ZdddZdd ZdddZd	S )CmfDeferredJobWrapperc             C   s   | j  d| j S )N.)
__module____name__)func r	   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec             C   s   | j j d| j dS )Nz(name=))	__class__r   name)selfr	   r	   r
   __repr__    s    zCmfDeferredJobWrapper.__repr__c             C   s   | dkrd} | S )Nz	@minutelyz* * * * * Hr	   )scheduler	   r	   r
   adapt_schedule#   s    z$CmfDeferredJobWrapper.adapt_schedulec          
   C   sN   yt j j| dd W n4 t jk
rH } ztd|  |W d d }~X Y nX d S )N   1)hash_idzinvalid schedule: )croniterexpandZCroniterBadCronError
ValueError)r   er	   r	   r
   validate_schedule)   s    z'CmfDeferredJobWrapper.validate_scheduleNFc          
   C   s   t | | || _| || _|	p$| j| _|| _|| _|| _|| _|| _	|| _
|| _|
| _| jr|sttd| j d|std| j dy| | | j W n< tk
r } ztd| j d| j |W d d }~X Y nX | | j| j< | | j| j< d S )Nz$CmfDeferredJobWrapper: periodic job(z ) must be with option only_once.z!) must be with option system_job.zinvalid job configuration(z): )r   r   r   r   descriptionpriority
system_job	only_onceonly_once_argssoft_time_limit	countdownr   show_bg_progressbarr   r   r   periodicregistry)r   r   r   r   r   r   r   r    r   r   r!   r   r	   r	   r
   __init__0   s.    
,zCmfDeferredJobWrapper.__init__c             O   s   | j ||S )N)r   )r   argskwargsr	   r	   r
   __call__M   s    zCmfDeferredJobWrapper.__call__c             C   s$   |d krg }|d kri }| j ||S )N)r   )r   r%   r&   r	   r	   r
   applyP   s
    zCmfDeferredJobWrapper.applyc             O   s   | j ||dS )N)r%   r&   )apply_async)r   r%   r&   r	   r	   r
   delayW   s    zCmfDeferredJobWrapper.delayc	                s  ddl m}	m}
 dd l}|d kr$g } d kr0i  |d kr>| j}|d krL| j}|d krZ| j}|d krh| j}|d krv| j}|	j	| j
| j||d}|d k	r| j|_|s|
jj|_|d kr| j}|rt tj|d |_| jrt| j
}d}| jd krB|rtd| j
 d| f |t|7 } rd|t fdd	t D 7 }n"x | jD ]}|t | 7 }qJW |r|d
t|   7 }||_|	j	j|ddgd}|r|
d| j
 d |jr|jr|j|jk r|
d| j
 d|j  |j|_|jj !  |"  W d Q R X d S | | j| j|jj#| j||jj#||
j$|
j%|
j&|dd|_'|jj !  |"  W d Q R X |r|
j(t|j t|jS )Nr   )modelsg)r   r   r   r!   )seconds zJCmfDeferredJobWrapper.apply_async: !!! Warning! Use args + only_once! job=z, args=c                s   i | ]} | |qS r	   r	   ).0k)r&   r	   r
   
<dictcomp>   s    z5CmfDeferredJobWrapper.apply_async.<locals>.<dictcomp>__open*)only_once_keystatusfieldszCmfDeferredJobWrapper(z): already queuedz): reschedule existent job )r   r   	person_idr   r   r   r    
admin_modecomponent_idsession_tab_idr!   )r%   r&   options))cmf.includer+   r,   
cmf.fieldsr   r   r   r   r!   CmfDeferredJobr   r   Zcurrent_personidr8   r    cmf_nowdatetime	timedeltaplan_start_datetimestrr   printsortedhashlibZmd5encodeZ	hexdigestr5   getdebugutilcmfutilZdisable_aclsavevalueZacl_admin_moder:   r;   params_jsonappend)r   r%   r&   r   r   r   r    r   r!   r+   r,   cmfjobr5   args_strZarg_keyZexistent_jobr	   )r&   r
   r)   Z   s    

 z!CmfDeferredJobWrapper.apply_async)	NFNNFNNNF)NN)NNNNNNNN)r   r   __qualname__r#   r"   staticmethodr   r   r   r   r$   r'   r(   r*   r)   r	   r	   r	   r
   r      s     

 r   c                 sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                s   t | f S )N)r   )Zfunc_)r&   r	   r
   wrapper   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r%   r&   rW   r   r	   )r&   r
   cmf_deferred_job   s    
r[   c              C   s   dd l } | jj jS )Nr   )r>   r7   ZCmfDateTimenowrO   )rR   r	   r	   r
   rA      s    rA   c             C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   rX   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr	   r	   r
   format_exception   s    rh   c                   s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc               C   s   t j t jjS )N)rB   r\   timezoneutcr	   r	   r	   r
   r\      s    zJobWorkerGreenlet.nowNc                s   || _ t   d S )N)	worker_idsuperr$   )r   rl   )r   r	   r
   r$      s    zJobWorkerGreenlet.__init__c             C   s   | j j d| j dS )N(r   )r   r   rl   )r   r	   r	   r
   r      s    zJobWorkerGreenlet.__repr__)N)r   r   rU   rV   r\   r$   r   __classcell__r	   r	   )r   r
   ri      s   ri   c                   sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                s   || _ t jf | d S )N)r   rm   r$   )r   r   r&   )r   r	   r
   r$      s    zQueueProcessor.__init__c       	   
   C   sD  ddl m}m}m} dd l}x |jjj| jddd |j	
  dddgdd	dd gd	d
t ggg}| jd k	r|dd| jgg}|jjdddgddd gdddggddgd}|r|dddd gdddd |D gg |jj|dgd|d}|rd|_|j  | j|_|d |  P W d Q R X W d Q R X td q W |S )Nr   )r+   socketio
cmf_commit
   )timeoutblocking_timeoutr6   =r3   ORrD   z<=r   r   Tr5   z!=in_progressz--)filterr7   zNOT INc             S   s   g | ]
}|j qS r	   )r5   )r/   Zjob_r	   r	   r
   
<listcomp>  s    z0QueueProcessor._job_get_next.<locals>.<listcomp>Zcmf_created_at)ry   Zorder_byZ
for_updater7   z	job startr]   )r=   r+   rq   rr   cmf.apprL   rM   CmfLock_GET_LOCK_KEYappcmf_contextrA   r   r?   ZslistrQ   rJ   r6   start_datetimeZset_nowrl   Z	error_logrN   timesleep)	r   r7   r+   rq   rr   rR   Z
job_filterZonly_once_in_progressZnext_jobr	   r	   r
   _job_get_next  s<    $


zQueueProcessor._job_get_nextc          	   C   s8   ddl m} dd l}|j  || W d Q R X d S )Nr   )rq   )r=   rq   r{   r~   r   Z
on_success)r   rS   resultrq   rR   r	   r	   r
   _job_set_success-  s    zQueueProcessor._job_set_successc          	   C   sT   ddl m} dd l}|j , d}t }|d r<t|}|| W d Q R X d S )Nr   )rq   zError?)	r=   rq   r{   r~   r   sysexc_inforh   on_error)r   rS   errorrq   rR   Z
error_textr   r	   r	   r
   _job_set_fail6  s    zQueueProcessor._job_set_failc                s6   ddl mm dd l  fdd}t||S )Nr   )r+   r,   c           	      s   t jj }  j  j jrjjj	kr j
jjj jjjd j  jd ddr~j  jd ddrjd d _jd ddrjd d _| jd jd S Q R X d S )	N)r7   r<   r9   Fr:   r;   r%   r&   )r   r#   r   r~   r   r?   Zset_current_deferred_jobr8   Zsystem_personr@   Zset_current_personZ	CmfPersonrJ   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextrP   Zactivate_admin_moder:   r;   )Zjob_func)rR   r,   rS   r+   r	   r
   runnerG  s    

z'QueueProcessor._job_run.<locals>.runner)r=   r+   r,   r{   geventZwith_timeout)r   rS   r   r   r	   )rR   r,   rS   r+   r
   _job_runC  s    zQueueProcessor._job_runc             C   s   | j dgd}|sd S |jd d}y| j||d}| || W n tttjfk
r } z6t	
d|j d|j d|  | |t|  W d d }~X Y nT ttjfk
r } z0t	
d|j d|j d	 | |t| W d d }~X Y nX d S )
Nr4   )r7   r<   r   )r   zjob_runner(z, z) interrupt by z) error)r   rP   rJ   r   r   
SystemExitKeyboardInterruptr   GreenletExitlogging	exceptionr@   r   r   rE   	ExceptionTimeout)r   rS   r   r   r   r	   r	   r
   
_run_cycle[  s     zQueueProcessor._run_cyclec          
   C   sl   xfy|    td W q tttjfk
r6    Y q tk
rb   t	|  d td Y qX qW dS )zmain() for GreenletrX   z cycle error.   N)
r   r   r   r   r   r   r   r   r   r   )r   r	   r	   r
   _runq  s    zQueueProcessor._runc             C   s   | j j d| j d| j dS )Nrn   z, priority=r   )r   r   rl   r   )r   r	   r	   r
   r   ~  s    zQueueProcessor.__repr__)N)N)r   r   rU   r}   intr$   r   r   r   r   r   r   r   ro   r	   r	   )r   r
   rp      s   *	
rp   c               @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc          
   C   s   dd l }ddlm}m} xy|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W d Q R X W d Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qW d S )Nr   )CmfGetLockErrorr+   i,  )rt   ru      )Zdays   )Zhoursrw   r6   INZfailZcancelZdeadZend_datetime<rv   success)ry   z error)r{   r=   r   r+   rL   rM   r|   	_LOCK_KEYr~   r   rA   rB   rC   r?   Zbulk_deleter   r   r   r   r   )r   rR   r   r+   r\   Zmax_end_datetimeZmax_success_end_datetimer	   r	   r
   r     s$    $2zQueueCleaner._runN)r   r   rU   r   r   r	   r	   r	   r
   r     s   r   c               @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc          
   C   s  dd l }ddlm}m} |jjjj}x|y0|jj	j
| jddd |j  |d}|rtt t| dk rtd w(|d}xXt|D ]L}|d	|  d kr|d| || td
|  dtjd qW t }|tjdd }	xH|jjdgdddd |D gdddgdd|	ggdD ]}
|
d q0W W d Q R X W d Q R X W n: |k
rp   Y n& tk
r   t |  d Y nX td q(W d S )Nr   )r   r+   i,  )rt   ru   redis_idr   rs   zdeferred_job_worker:workerszdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!)file)r-   r4   rl   zNOT INc             S   s   g | ]}|  qS r	   )decode)r/   Zw_idr	   r	   r
   rz     s    z%QueueWatcher._run.<locals>.<listcomp>r6   rv   rx   r   r   )r7   ry   zJob Worker is deadz error<   )!r{   r=   r   r+   r~   r   REDIS_DBredisrL   rM   r|   r   r   rJ   r   floatr   Zsmemberslistr   ZsremremoverF   r   stderrrA   rB   rC   r?   r   r   r   r   )r   rR   r   r+   redis_dbr   Z
db_workersrl   r\   Zmax_start_daterS   r	   r	   r
   r     s:    &



(zQueueWatcher._runN)r   r   rU   r   r   r	   r	   r	   r
   r     s   r   c                   s2   e Zd ZdZeddZ fddZdd Z  ZS )JobScheduleru   
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    PeriodicDatazjob croniterc                s   t  j|| g | _d S )N)rm   r$   r"   )r   r%   r&   )r   r	   r
   r$     s    zJobScheduler.__init__c          
   C   s  dd l }ddlm} |jjjj}d}tjtj	j
 j}xtj D ]}|j}|jr|j|jkr|j|j }t|  d|j d|j d|  nt|  d|j d|j d |st|  d|j d qFt|}t| tj||jd	}||_|  |rt|| n| }| j| || qFW xT|rJ|t  }	ntd
 d}	|	dkrvtd|	dd d}	t|	 yt }
d}x| jD ]\}}| }|
|kr.|  |jd|j d| | j dddr.y"|j!  |"  W d Q R X W n. t#k
r,   t$%|  d|j d Y nX |rBt|| n| }qW W n0 t#k
r   t$%|  d td Y nX q6W d S )Nr   )configz: config: job=z, app="z ", config.JOB_SCHEDULE_OVERRIDE="z: config: job z
 disabled!)r   z1JobScheduler: warning next_ts == 0, nothing to dors   z)JobScheduler: warning too low sleep time z.3frg      zJobScheduler:lock::T)exZnxz schedule job(z) errorz error)&r{   r=   r   r~   r   r   r   rB   r\   rj   rk   Z
astimezoneZtzinfor   r"   valuesr   ZJOB_SCHEDULE_OVERRIDEr   rF   r   r   r   ZAPP_FQDNZget_nextminZget_currentrQ   r   r   r   setrl   r   r)   r   r   r   )r   rR   r   r   Znext_tsZlocal_tzinforS   r   Z	cron_iterZsleep_tsZnow_tsZcurr_tsr	   r	   r
   r     s^    $




$&zJobScheduler._run)	r   r   rU   __doc__r   r   r$   r   ro   r	   r	   )r   r
   r     s   
r   c               @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc          	   C   s  dd l }ddlm}m} |jjjj}xt	d yt
j
 }|jdksP|jdk rRw&|jjj| jddd@ |d}|rt t| dk rw&td	tjd
 |  W d Q R X W q& tjjk
r   Y q& |k
r   Y q& tk
r   t|  d Y q&X q&W d S )Nr   )r   	CMF_CACHEi,  r]   2   )rt   ru   r   i`T  z%RedisCleaner: run CMF_CACHE.flushdb())r   z error)r{   r=   r   r   r~   r   r   r   r   r   rB   r\   ZhourZminuterL   rM   r|   r   rJ   r   rF   r   r   Zflushdb
exceptionsZLockNotOwnedErrorr   r   r   )r   rR   r   r   r   r\   r   r	   r	   r
   r     s*    


zRedisCleaner._runN)r   r   rU   r   r   r   r	   r	   r	   r
   r     s   r   Fc       	      C   s  ddl }t  dt  }|jjjj}g }| rF|	t
d|d n$x"tdD ]}|	t
||d qPW |	t|d |	t|d |	t|d |	t|d x|D ]}|  qW yxy&|jd| dd	d
 |d| W n4 tk
r } ztd|  W dd}~X Y nX tj|dd0}x(|D ] }td| dtjd tq6W W dQ R X qW W nB tttjfk
r } ztd| dtjd W dd}~X Y nX ytj|dd W n& tjk
r   tdtjd Y nX x|D ]}|rtd| dtjd q|  r6td| dtjd qy|!  W n2 tttjtfk
rt   td| d Y nX qW dS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   Nr   )r   rl      )rl   zdeferred_job_worker:heartbeat:-   )r   zdeferred_job_worker:workersz$deferred_job_worker: redis error!!! rs   )rt   z%deferred_job_worker(): system thread(z) exited!!! Stop work.)r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)"r{   platformZnodeuuidZuuid1r~   r   r   r   rQ   rp   ranger   r   r   r   startr   Zsaddr   r   r   r   ZiwaitrF   r   r   r   r   r   Zkillallr   Z
successfulrJ   )	Zsingle_queuerR   rl   r   Z	greenletsr   glr   Zgl_iterr	   r	   r
   deferred_job_worker1  sT    
"
(

r   )F)rB   rH   r`   Zrandomr   rb   r   r   r   r   collectionsr   	functoolsr   r   r   r   Zcmf.util.cmfutilrR   r   r[   rA   rh   ZGreenletri   rp   r   r   r   r   r   r	   r	   r	   r
   <module>   s8    1 ,F 