U
    >oeL_                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlZd dlZd dlZd dlZG dd dZdd Zdd	 Zd
d ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZdddZdS )    N)
namedtuple)update_wrapperc                	   @   sn   e Zd Zi Zi Zedd Zdd Zedd Zedd Z	dddZ
dd ZdddZdd ZdddZd	S )CmfDeferredJobWrapperc                 C   s   | j  d| j S )N.)
__module____name__func r
   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec                 C   s   | j j d| j dS )Nz(name=))	__class__r   nameselfr
   r
   r   __repr__    s    zCmfDeferredJobWrapper.__repr__c                 C   s   | dkrd} | S )Nz	@minutelyz* * * * * Hr
   )scheduler
   r
   r   adapt_schedule#   s    z$CmfDeferredJobWrapper.adapt_schedulec              
   C   sN   zt j j| dd W n4 t jk
rH } ztd|  |W 5 d }~X Y nX d S )N   1Zhash_idzinvalid schedule: )croniterexpandZCroniterBadCronError
ValueError)r   er
   r
   r   validate_schedule)   s    z'CmfDeferredJobWrapper.validate_scheduleNFc              
   C   s   t | | || _| || _|	p$| j| _|| _|| _|| _|| _|| _	|| _
|| _|
| _| jr|sttd| j d|std| j dz| | | j W n< tk
r } ztd| j d| j |W 5 d }~X Y nX | | j| j< | | j| j< d S )Nz$CmfDeferredJobWrapper: periodic job(z ) must be with option only_once.z!) must be with option system_job.zinvalid job configuration(z): )r   r	   r   r   descriptionpriority
system_job	only_onceonly_once_argssoft_time_limit	countdownr   show_bg_progressbarr   r   r   periodicregistry)r   r	   r   r   r    r!   r   r"   r   r   r#   r   r
   r
   r   __init__0   s.    
,zCmfDeferredJobWrapper.__init__c                 O   s   | j ||S Nr   r   argskwargsr
   r
   r   __call__M   s    zCmfDeferredJobWrapper.__call__c                 C   s$   |d krg }|d kri }| j ||S r'   r   r(   r
   r
   r   applyP   s
    zCmfDeferredJobWrapper.applyc                 O   s   | j ||dS )N)r)   r*   )apply_asyncr(   r
   r
   r   delayW   s    zCmfDeferredJobWrapper.delayc	                    s&  ddl m}	m}
 dd l}|d kr$g } d kr0i  |d kr>| j}|d krL| j}|d krZ| j}|d krh| j}|d krv| j}|	j	| j
| j||d}|d k	r| j|_|s|
jj|_| jrt| j
}d}| jd kr|rtd| j
 d| f |t|7 } r8|t fddt D 7 }n| jD ]}|t | 7 }q |rX|d	t|   7 }||_|	j	j|d
dr|
d| j
 d d S |d kr| j}|rt tj|d |_| | j| j|jj| j||jj||
j |d	d|_!|j"j#$  |%  W 5 Q R X |r|
j&t|j t|jS )Nr   )modelsg)r   r   r   r#    zJCmfDeferredJobWrapper.apply_async: !!! Warning! Use args + only_once! job=z, args=c                    s   i | ]}| | qS r
   r
   ).0kr*   r
   r   
<dictcomp>   s      z5CmfDeferredJobWrapper.apply_async.<locals>.<dictcomp>__open)only_once_keystatuszCmfDeferredJobWrapper(z): already queued)Zseconds)	r   r    	person_idr   r!   r   r"   
admin_moder#   )r)   r*   options)'cmf.includer/   r0   
cmf.fieldsr!   r   r   r   r#   CmfDeferredJobr   r   Zcurrent_personidr:   strr    printsortedhashlibZmd5encodeZ	hexdigestr8   countdebugr"   cmf_nowdatetime	timedeltaplan_start_datetimevalueZacl_admin_modeparams_jsonutilcmfutilZdisable_aclsaveappend)r   r)   r*   r   r!   r   r"   r   r#   r/   r0   cmfjobr8   args_strZarg_keyr
   r4   r   r-   Z   s       

 

z!CmfDeferredJobWrapper.apply_async)	NFNNFNNNF)NN)NNNNNNNN)r   r   __qualname__r%   r$   staticmethodr   r   r   r   r&   r+   r,   r.   r-   r
   r
   r
   r   r      s4   


               

    r   c                     sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                    s   t | f S r'   )r   )Zfunc_r4   r
   r   wrapper   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r)   r*   rW   r	   r
   r4   r   cmf_deferred_job   s    
r[   c                  C   s   dd l } | jj jS )Nr   )r>   fieldsZCmfDateTimenowrL   )rR   r
   r
   r   rH      s    rH   c                 C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   rX   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr
   r
   r   format_exception   s    ri   c                       s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc                   C   s   t j t jjS r'   )rI   r]   timezoneutcr
   r
   r
   r   r]      s    zJobWorkerGreenlet.nowNc                    s   || _ t   d S r'   )	worker_idsuperr&   )r   rm   r   r
   r   r&      s    zJobWorkerGreenlet.__init__c                 C   s   | j j d| j dS )N(r   )r   r   rm   r   r
   r
   r   r      s    zJobWorkerGreenlet.__repr__)N)r   r   rU   rV   r]   r&   r   __classcell__r
   r
   ro   r   rj      s   
rj   c                       sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                    s   || _ t jf | d S r'   )r   rn   r&   )r   r   r*   ro   r
   r   r&      s    zQueueProcessor.__init__c           	   
   C   sL  ddl m}m}m} dd l}|jjj| jddd |j	
  dddgdd	dd gd	d
t ggg}| jd k	r~|dd| jgg}|jjdddgddd gdddggddgd}|r|dddd gdddd |D gg |jj|dgd|d}|r(d|_|j  | j|_|  W 5 Q R  W 5 Q R  qHW 5 Q R X W 5 Q R X td q|S )Nr   )r/   socketio
cmf_commit
   timeoutZblocking_timeoutr9   =r7   ORrK   z<=r   r   Tr8   z!=in_progressz--)filterr\   NOT INc                 S   s   g | ]
}|j qS r
   )r8   )r2   Zjob_r
   r
   r   
<listcomp>
  s     z0QueueProcessor._job_get_next.<locals>.<listcomp>Zcmf_created_at)r{   Zorder_byZ
for_updater\   r^   )r=   r/   rs   rt   cmf.apprN   rO   CmfLock_GET_LOCK_KEYappcmf_contextrH   r   r?   ZslistrQ   getr9   start_datetimeset_nowrm   rP   timesleep)	r   r\   r/   rs   rt   rR   Z
job_filterZonly_once_in_progressZnext_jobr
   r
   r   _job_get_next   sH    &

  
,zQueueProcessor._job_get_nextc              	   C   sh   ddl m} dd l}|j @ d|_d|_|j  t	|j|j
  |_||_|  W 5 Q R X d S )Nr   rs   successd   )r=   rs   r~   r   r   r9   Zprogress_pctend_datetimer   intr   total_secondsdurationresult_jsonrP   )r   rS   resultrs   rR   r
   r
   r   _job_set_success  s    
zQueueProcessor._job_set_successc              	   C   s|   ddl m} dd l}|j T d|_|j  t|j|j	 
 |_d |_t }|d rft||_|  W 5 Q R X d S )Nr   r   fail)r=   rs   r~   r   r   r9   r   r   r   r   r   r   r   sysexc_infori   Z
error_textrP   )r   rS   errorrs   rR   r   r
   r
   r   _job_set_fail)  s    

zQueueProcessor._job_set_failc                    s0   ddl m dd l  fdd}t||S )Nr   )r/   c               
      s   t jj }  j p jrd jjjj jj	j
d j  jd ddrdj  | jd jd W  5 Q R  S Q R X d S )Nr\   r<   r;   Fr)   r*   )r   r%   r   r   r   r:   Zset_current_personZ	CmfPersonr   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextrM   Zactivate_admin_mode)Zjob_funcrR   rS   r/   r
   r   runner=  s    

z'QueueProcessor._job_run.<locals>.runner)r=   r/   r~   geventZwith_timeout)r   rS   r!   r   r
   r   r   _job_run9  s    zQueueProcessor._job_runc                 C   s   | j dddddgd}|sd S |jd d}z| j||d	}| || W n tttjfk
r } z6t	
d
|j d|j d|  | |t|  W 5 d }~X Y nT ttjfk
r } z0t	
d
|j d|j d | |t| W 5 d }~X Y nX d S )Nr9   rm   rM   Zparam_pickler:   r   r<   r!   )r!   zjob_runner(z, z) interrupt by ) error)r   rM   r   r   r   
SystemExitKeyboardInterruptr   GreenletExitlogging	exceptionr@   r   r   rA   	ExceptionTimeout)r   rS   r!   r   r   r
   r
   r   
_run_cycleL  s     zQueueProcessor._run_cyclec              
   C   sh   z|    td W q  tttjfk
r4    Y q  tk
r`   t	|  d td Y q X q dS )zmain() for GreenletrX   z cycle error.   N)
r   r   r   r   r   r   r   r   r   r   r   r
   r
   r   _runb  s    zQueueProcessor._runc                 C   s   | j j d| j d| j dS )Nrp   z, priority=r   )r   r   rm   r   r   r
   r
   r   r   o  s    zQueueProcessor.__repr__)N)N)r   r   rU   r   r   r&   r   r   r   r   r   r   r   rq   r
   r
   ro   r   rr      s   )
rr   c                   @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc              
   C   s   dd l }ddlm}m} z|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W 5 Q R X W 5 Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qd S )Nr   CmfGetLockErrorr/   ,  rv      )Zdays   Zhoursry   r9   INr   canceldeadr   <rx   r   r{    error)r~   r=   r   r/   rN   rO   r   	_LOCK_KEYr   r   rH   rI   rJ   r?   Zbulk_deleter   r   r   r   r   )r   rR   r   r/   r]   Zmax_end_datetimeZmax_success_end_datetimer
   r
   r   r   v  s&    $zQueueCleaner._runNr   r   rU   r   r   r
   r
   r
   r   r   s  s   r   c                   @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc           	   
   C   sZ  dd l }ddlm}m} |jjjj}z|jj	j
| jddd |j  |d}t|D ]L}|d|  d kr\|d| || td|  dtjd	 q\t }|tjd
d }|jjd|ddddd |D gdddgdd|ggd W 5 Q R X W 5 Q R X W n: |k
r$   Y n& tk
rH   t|  d Y nX td q$d S )Nr   r   r   rv   deferred_job_worker:workersdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!filerX   r   r   )r9   r   rm   r|   c                 S   s   g | ]}|  qS r
   )decode)r2   Zw_idr
   r
   r   r}     s     z%QueueWatcher._run.<locals>.<listcomp>r9   rx   rz   r   r   r   r   <   ) r~   r=   r   r/   r   r   REDIS_DBredisrN   rO   r   r   r   Zsmemberslistr   r   ZsremremoverB   r   stderrrH   rI   rJ   r?   Zbulk_updater   r   r   r   r   )	r   rR   r   r/   redis_dbZ
db_workersrm   r]   Zmax_start_dater
   r
   r   r     s6    $


zQueueWatcher._runNr   r
   r
   r
   r   r     s   r   c                       s2   e Zd ZdZeddZ fddZdd Z  ZS )JobScheduleru   
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    PeriodicDatazjob croniterc                    s   t  j|| g | _d S r'   )rn   r&   r$   r(   ro   r
   r   r&     s    zJobScheduler.__init__c              
   C   s  dd l }ddlm} |jjjj}d}tjtj	j
 j}tj D ]}|j}|jr|j|jkr|j|j }t|  d|j d|j d|  nt|  d|j d|j d |st|  d|j d qDt|}t| tj||jd	}||_|  |rt|| n| }| j| || qD|rB|t  }	ntd
 d}	|	dkrntd|	dd d}	t|	 zt }
d}| jD ]\}}| }|
|kr$|  |jd|j d| | j dddr$z"|j!  |"  W 5 Q R X W n. t#k
r"   t$%|  d|j d Y nX |r8t|| n| }qW n0 t#k
rv   t$%|  d td Y nX q.d S )Nr   )configz: config: job=z, app="z ", config.JOB_SCHEDULE_OVERRIDE="z: config: job z
 disabled!r   z1JobScheduler: warning next_ts == 0, nothing to doru   z)JobScheduler: warning too low sleep time z.3frh      zJobScheduler:lock::T)exZnxz schedule job(r   r   )&r~   r=   r   r   r   r   r   rI   r]   rk   rl   Z
astimezoneZtzinfor   r$   valuesr   ZJOB_SCHEDULE_OVERRIDEr   rB   r   r   r   ZAPP_FQDNZget_nextminZget_currentrQ   r   r   r   setrm   r   r-   r   r   r   )r   rR   r   r   Znext_tsZlocal_tzinforS   r   Z	cron_iterZsleep_tsZnow_tsZcurr_tsr
   r
   r   r     s\    $




$$zJobScheduler._run)	r   r   rU   __doc__r   r   r&   r   rq   r
   r
   ro   r   r     s   
r   c                   @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc              	   C   s   dd l }ddlm}m} |jjjj}t	d zt
j
 }|jdksN|jdk rRW q$|jjj| jdddH |d}|rt | dk rW 5 Q R  W q$td	tjd
 |  W 5 Q R X W q$ |k
r   Y q$ tk
r   t|  d Y q$X q$d S )Nr   )r   	CMF_CACHEr   r^   2   rv   redis_idi`T  z%RedisCleaner: run CMF_CACHE.flushdb()r   r   )r~   r=   r   r   r   r   r   r   r   r   rI   r]   ZhourZminuterN   rO   r   r   r   rB   r   r   Zflushdbr   r   r   )r   rR   r   r   r   r]   r   r
   r
   r   r     s$    


zRedisCleaner._runN)r   r   rU   r   r   r   r
   r
   r
   r   r     s   r   Fc           	      C   sl  ddl }t  dt  }|jjjj}g }| rF|	t
d|d n tdD ]}|	t
||d qN|	t|d |	t|d |	t|d |	t|d |D ]}|  qzz&|jd| dd	d
 |d| W n4 tk
r } ztd|  W 5 d}~X Y nX tj|dd,}|D ] }td| dtjd tq*W 5 Q R X qW nB tttjfk
r } ztd| dtjd W 5 d}~X Y nX ztj|dd W n& tjk
r   tdtjd Y nX |D ]}|r td| dtjd q|  r$td| dtjd qz|!  W n2 tttjtfk
rb   td| d Y nX qdS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   Nr   )r   rm      )rm   r   -   )r   r   z$deferred_job_worker: redis error!!! ru   )rw   z%deferred_job_worker(): system thread(z) exited!!! Stop work.r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)"r~   platformZnodeuuidZuuid1r   r   r   r   rQ   rr   ranger   r   r   r   startr   Zsaddr   r   r   r   ZiwaitrB   r   r   r   r   r   Zkillallr   Z
successfulr   )	Zsingle_queuerR   rm   r   Z	greenletsr   glr   Zgl_iterr
   r
   r   deferred_job_worker  sR    
"(
r   )F)rI   rD   ra   Zrandomr   rc   r   r   r   r   collectionsr   	functoolsr   r   r   r   Zcmf.util.cmfutilrR   r   r[   rH   ri   ZGreenletrj   rr   r   r   r   r   r   r
   r
   r
   r   <module>   s8      	'F