U
    eX]                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlZd dlZd dlZd dlZG dd dZdd Zdd	 Zd
d ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZdddZdS )    N)
namedtuple)update_wrapperc                   @   sn   e Zd Zi Zi Zedd Zdd Zedd Zedd Z	dddZ
dd ZdddZdd ZdddZd	S )CmfDeferredJobWrapperc                 C   s   | j  d| j S )N.)
__module____name__func r
   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec                 C   s   | j j d| j dS )Nz(name=))	__class__r   nameselfr
   r
   r   __repr__    s    zCmfDeferredJobWrapper.__repr__c                 C   s   | dkrd} | S )Nz	@minutelyz* * * * * Hr
   )scheduler
   r
   r   adapt_schedule#   s    z$CmfDeferredJobWrapper.adapt_schedulec              
   C   sN   zt j j| dd W n4 t jk
rH } ztd|  |W 5 d }~X Y nX d S )N   1Zhash_idzinvalid schedule: )croniterexpandZCroniterBadCronError
ValueError)r   er
   r
   r   validate_schedule)   s    z'CmfDeferredJobWrapper.validate_scheduleNFc
              
   C   s   t | | || _| || _|	p$| j| _|| _|| _|| _|| _|| _	|| _
|| _| jr|sntd| j d|std| j dz| | | j W n< tk
r }
 ztd| j d| j |
W 5 d }
~
X Y nX | | j| j< | | j| j< d S )Nz$CmfDeferredJobWrapper: periodic job(z ) must be with option only_once.z!) must be with option system_job.zinvalid job configuration(z): )r   r	   r   r   descriptionpriority
system_job	only_onceonly_once_argssoft_time_limit	countdownr   r   r   r   periodicregistry)r   r	   r   r   r    r!   r   r"   r   r   r   r
   r
   r   __init__0   s,    
,zCmfDeferredJobWrapper.__init__c                 O   s   | j ||S Nr   r   argskwargsr
   r
   r   __call__K   s    zCmfDeferredJobWrapper.__call__c                 C   s$   |d krg }|d kri }| j ||S r&   r   r'   r
   r
   r   applyN   s
    zCmfDeferredJobWrapper.applyc                 O   s   | j ||dS )N)r(   r)   )apply_asyncr'   r
   r
   r   delayU   s    zCmfDeferredJobWrapper.delayc                    s  ddl m}m}	 dd l}
|d kr$g } d kr0i  |d kr>| j}|d krL| j}|d krZ| j}|d krh| j}|j| j	| j
|d}|d k	r| j|_|s|	jj|_| j
rxt| j	}d}| jd kr
|rtd| j	 d| f |t|7 } r(|t fddt D 7 }n| jD ]}|t | 7 }q|rH|d	t|   7 }||_|jj|d
drx|	d| j	 d d S |d kr| j}|rt tj|d |_| | j
| j|jj| j||jj||	jdd|_ |
j!j"#  |$  W 5 Q R X t|jS )Nr   )modelsg)r   r   r    zJCmfDeferredJobWrapper.apply_async: !!! Warning! Use args + only_once! job=z, args=c                    s   i | ]}| | qS r
   r
   ).0kr)   r
   r   
<dictcomp>~   s      z5CmfDeferredJobWrapper.apply_async.<locals>.<dictcomp>__open)only_once_keystatuszCmfDeferredJobWrapper(z): already queued)Zseconds)r   r    	person_idr   r!   r   r"   
admin_mode)r(   r)   options)%cmf.includer.   r/   
cmf.fieldsr!   r   r   r   CmfDeferredJobr   r   Zcurrent_personidr9   strr    printsortedhashlibZmd5encodeZ	hexdigestr7   countdebugr"   cmf_nowdatetime	timedeltaplan_start_datetimevalueZacl_admin_modeparams_jsonutilcmfutilZdisable_aclsave)r   r(   r)   r   r!   r   r"   r   r.   r/   cmfjobr7   args_strZarg_keyr
   r3   r   r,   X   sr    

 

z!CmfDeferredJobWrapper.apply_async)NFNNFNNN)NN)NNNNNNN)r   r   __qualname__r$   r#   staticmethodr   r   r   r   r%   r*   r+   r-   r,   r
   r
   r
   r   r      s0   


              

  r   c                     sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                    s   t | f S r&   )r   )Zfunc_r3   r
   r   wrapper   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r(   r)   rU   r	   r
   r3   r   cmf_deferred_job   s    
rY   c                  C   s   dd l } | jj jS )Nr   )r=   fieldsZCmfDateTimenowrK   )rP   r
   r
   r   rG      s    rG   c                 C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   rV   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr
   r
   r   format_exception   s    rg   c                       s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc                   C   s   t j t jjS r&   )rH   r[   timezoneutcr
   r
   r
   r   r[      s    zJobWorkerGreenlet.nowNc                    s   || _ t   d S r&   )	worker_idsuperr%   )r   rk   r   r
   r   r%      s    zJobWorkerGreenlet.__init__c                 C   s   | j j d| j dS )N(r   )r   r   rk   r   r
   r
   r   r      s    zJobWorkerGreenlet.__repr__)N)r   r   rS   rT   r[   r%   r   __classcell__r
   r
   rm   r   rh      s   
rh   c                       sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                    s   || _ t jf | d S r&   )r   rl   r%   )r   r   r)   rm   r
   r   r%      s    zQueueProcessor.__init__c           	   
   C   sL  ddl m}m}m} dd l}|jjj| jddd |j	
  dddgdd	dd gd	d
t ggg}| jd k	r~|dd| jgg}|jjdddgddd gdddggddgd}|r|dddd gdddd |D gg |jj|dgd|d}|r(d|_|j  | j|_|  W 5 Q R  W 5 Q R  qHW 5 Q R X W 5 Q R X td q|S )Nr   )r.   socketio
cmf_commit
   timeoutZblocking_timeoutr8   =r6   ORrJ   z<=r   r   Tr7   z!=in_progressz--)filterrZ   NOT INc                 S   s   g | ]
}|j qS r
   )r7   )r1   Zjob_r
   r
   r   
<listcomp>  s     z0QueueProcessor._job_get_next.<locals>.<listcomp>Zcmf_created_at)ry   Zorder_byZ
for_updaterZ   r\   )r<   r.   rq   rr   cmf.apprM   rN   CmfLock_GET_LOCK_KEYappcmf_contextrG   r   r>   Zslistappendgetr8   start_datetimeset_nowrk   rO   timesleep)	r   rZ   r.   rq   rr   rP   Z
job_filterZonly_once_in_progressZnext_jobr
   r
   r   _job_get_next   sH    &

  
,zQueueProcessor._job_get_nextc              	   C   sb   ddl m} dd l}|j : d|_|j  t|j|j	 
 |_||_|  W 5 Q R X d S )Nr   rq   success)r<   rq   r|   r   r   r8   end_datetimer   intr   total_secondsdurationresult_jsonrO   )r   rQ   resultrq   rP   r
   r
   r   _job_set_success  s    
zQueueProcessor._job_set_successc              	   C   s|   ddl m} dd l}|j T d|_|j  t|j|j	 
 |_d |_t }|d rft||_|  W 5 Q R X d S )Nr   r   fail)r<   rq   r|   r   r   r8   r   r   r   r   r   r   r   sysexc_inforg   Z
error_textrO   )r   rQ   errorrq   rP   r   r
   r
   r   _job_set_fail   s    

zQueueProcessor._job_set_failc                    s0   ddl m dd l  fdd}t||S )Nr   )r.   c               
      s   t jj }  j p jrd jjjj jj	j
d j  jd ddrdj  | jd jd W  5 Q R  S Q R X d S )NrZ   r;   r:   Fr(   r)   )r   r$   r   r   r   r9   Zset_current_personZ	CmfPersonr   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextrL   Zactivate_admin_mode)Zjob_funcrP   rQ   r.   r
   r   runner4  s    

z'QueueProcessor._job_run.<locals>.runner)r<   r.   r|   geventZwith_timeout)r   rQ   r!   r   r
   r   r   _job_run0  s    zQueueProcessor._job_runc              
   C   s   | j dddddgd}|sd S |jd d}z| j||d	}| || W n| tjk
rd    Y nf ttfk
r|    Y nN t	k
r } z0t
d
|j d|j d | |t| W 5 d }~X Y nX d S )Nr8   rk   rL   Zparam_pickler9   r   r;   r!   )r!   zjob_runner(z, ) error)r   rL   r   r   r   r   GreenletExit
SystemExitKeyboardInterrupt	Exceptionlogging	exceptionr?   r   r   r@   )r   rQ   r!   r   r   r
   r
   r   
_run_cycleC  s    zQueueProcessor._run_cyclec              
   C   sh   z|    td W q  tttjfk
r4    Y q  tk
r`   t	|  d td Y q X q dS )zmain() for GreenletrV   z cycle error.   N)
r   r   r   r   r   r   r   r   r   r   r   r
   r
   r   _runZ  s    zQueueProcessor._runc                 C   s   | j j d| j d| j dS )Nrn   z, priority=r   )r   r   rk   r   r   r
   r
   r   r   g  s    zQueueProcessor.__repr__)N)N)r   r   rS   r~   r   r%   r   r   r   r   r   r   r   ro   r
   r
   rm   r   rp      s   )
rp   c                   @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc              
   C   s   dd l }ddlm}m} z|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W 5 Q R X W 5 Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qd S )Nr   CmfGetLockErrorr.   ,  rt      )Zdays   Zhoursrw   r8   INr   canceldeadr   <rv   r   ry    error)r|   r<   r   r.   rM   rN   r}   	_LOCK_KEYr   r   rG   rH   rI   r>   Zbulk_deleter   r   r   r   r   )r   rP   r   r.   r[   Zmax_end_datetimeZmax_success_end_datetimer
   r
   r   r   n  s&    $zQueueCleaner._runNr   r   rS   r   r   r
   r
   r
   r   r   k  s   r   c                   @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc           	   
   C   sZ  dd l }ddlm}m} |jjjj}z|jj	j
| jddd |j  |d}t|D ]L}|d|  d kr\|d| || td|  dtjd	 q\t }|tjd
d }|jjd|ddddd |D gdddgdd|ggd W 5 Q R X W 5 Q R X W n: |k
r$   Y n& tk
rH   t|  d Y nX td q$d S )Nr   r   r   rt   deferred_job_worker:workersdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!filerV   r   r   )r8   r   rk   rz   c                 S   s   g | ]}|  qS r
   )decode)r1   Zw_idr
   r
   r   r{     s     z%QueueWatcher._run.<locals>.<listcomp>r8   rv   rx   r   r   r   r   <   ) r|   r<   r   r.   r   r   REDIS_DBredisrM   rN   r}   r   r   Zsmemberslistr   r   ZsremremoverA   r   stderrrG   rH   rI   r>   Zbulk_updater   r   r   r   r   )	r   rP   r   r.   redis_dbZ
db_workersrk   r[   Zmax_start_dater
   r
   r   r     s6    $


zQueueWatcher._runNr   r
   r
   r
   r   r     s   r   c                       s2   e Zd ZdZeddZ fddZdd Z  ZS )JobScheduleru   
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    PeriodicDatazjob croniterc                    s   t  j|| g | _d S r&   )rl   r%   r#   r'   rm   r
   r   r%     s    zJobScheduler.__init__c              
   C   s  dd l }ddlm} |jjjj}d}tjtj	j
 j}tj D ]}|j}|jr|j|jkr|j|j }t|  d|j d|j d|  nt|  d|j d|j d |st|  d|j d qDt|}t| tj||jd	}||_|  |rt|| n| }| j| || qD|rB|t  }	ntd
 d}	|	dkrntd|	dd d}	t|	 zt }
d}| jD ]\}}| }|
|kr$|  |jd|j d| | j dddr$z"|j!  |"  W 5 Q R X W n. t#k
r"   t$%|  d|j d Y nX |r8t|| n| }qW n0 t#k
rv   t$%|  d td Y nX q.d S )Nr   )configz: config: job=z, app="z ", config.JOB_SCHEDULE_OVERRIDE="z: config: job z
 disabled!r   z1JobScheduler: warning next_ts == 0, nothing to dors   z)JobScheduler: warning too low sleep time z.3frf      zJobScheduler:lock::T)exZnxz schedule job(r   r   )&r|   r<   r   r   r   r   r   rH   r[   ri   rj   Z
astimezoneZtzinfor   r#   valuesr   ZJOB_SCHEDULE_OVERRIDEr   rA   r   r   r   ZAPP_FQDNZget_nextminZget_currentr   r   r   r   setrk   r   r,   r   r   r   )r   rP   r   r   Znext_tsZlocal_tzinforQ   r   Z	cron_iterZsleep_tsZnow_tsZcurr_tsr
   r
   r   r     s\    $




$$zJobScheduler._run)	r   r   rS   __doc__r   r   r%   r   ro   r
   r
   rm   r   r     s   
r   c                   @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc              	   C   s   dd l }ddlm}m} |jjjj}t	d zt
j
 }|jdksN|jdk rRW q$|jjj| jdddH |d}|rt | dk rW 5 Q R  W q$td	tjd
 |  W 5 Q R X W q$ |k
r   Y q$ tk
r   t|  d Y q$X q$d S )Nr   )r   	CMF_CACHEr   r\   2   rt   redis_idi`T  z%RedisCleaner: run CMF_CACHE.flushdb()r   r   )r|   r<   r   r   r   r   r   r   r   r   rH   r[   ZhourZminuterM   rN   r}   r   r   rA   r   r   Zflushdbr   r   r   )r   rP   r   r   r   r[   r   r
   r
   r   r     s$    


zRedisCleaner._runN)r   r   rS   r   r   r   r
   r
   r
   r   r     s   r   Fc           	      C   sl  ddl }t  dt  }|jjjj}g }| rF|	t
d|d n tdD ]}|	t
||d qN|	t|d |	t|d |	t|d |	t|d |D ]}|  qzz&|jd| dd	d
 |d| W n4 tk
r } ztd|  W 5 d}~X Y nX tj|dd,}|D ] }td| dtjd tq*W 5 Q R X qW nB tttjfk
r } ztd| dtjd W 5 d}~X Y nX ztj|dd W n& tjk
r   tdtjd Y nX |D ]}|r td| dtjd q|  r$td| dtjd qz|!  W n2 tttjtfk
rb   td| d Y nX qdS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   Nr   )r   rk      )rk   r   -   )r   r   z$deferred_job_worker: redis error!!! rs   )ru   z%deferred_job_worker(): system thread(z) exited!!! Stop work.r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)"r|   platformZnodeuuidZuuid1r   r   r   r   r   rp   ranger   r   r   r   startr   Zsaddr   r   r   r   ZiwaitrA   r   r   r   r   r   ZkillallZTimeoutZ
successfulr   )	Zsingle_queuerP   rk   r   Z	greenletsr   glr   Zgl_iterr
   r
   r   deferred_job_worker  sR    
"(
r   )F)rH   rC   r_   Zrandomr   ra   r   r   r   r   collectionsr   	functoolsr   r   r   r   Zcmf.util.cmfutilrP   r   rY   rG   rg   ZGreenletrh   rp   r   r   r   r   r   r
   r
   r
   r   <module>   s8     	'F