U
    Oe\                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlZd dlZd dlZd dlZG dd dZdd Zdd	 Zd
d ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZdddZdS )    N)
namedtuple)update_wrapperc                   @   sn   e Zd Zi Zi Zedd Zdd Zedd Zedd Z	dddZ
dd ZdddZdd ZdddZd	S )CmfDeferredJobWrapperc                 C   s   | j  d| j S )N.)
__module____name__func r
   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec                 C   s   | j j d| j dS )Nz(name=))	__class__r   nameselfr
   r
   r   __repr__    s    zCmfDeferredJobWrapper.__repr__c                 C   s   | dkrd} | S )Nz	@minutelyz* * * * * Hr
   )scheduler
   r
   r   adapt_schedule#   s    z$CmfDeferredJobWrapper.adapt_schedulec              
   C   sN   zt j j| dd W n4 t jk
rH } ztd|  |W 5 d }~X Y nX d S )N   1Zhash_idzinvalid schedule: )croniterexpandZCroniterBadCronError
ValueError)r   er
   r
   r   validate_schedule)   s    z'CmfDeferredJobWrapper.validate_scheduleNFc	           
   
   C   s   t | | || _| || _|| _|| _|| _|| _|| _|| _	|| _
| j
r|sbtd| j d|sxtd| j dz| | | j
 W n< tk
r }	 ztd| j d| j
 |	W 5 d }	~	X Y nX | | j| j< | | j| j< d S )Nz$CmfDeferredJobWrapper: periodic job(z ) must be with option only_once.z!) must be with option system_job.zinvalid job configuration(z): )r   r	   r   r   priority
system_job	only_onceonly_once_argssoft_time_limit	countdownr   r   r   r   periodicregistry)
r   r	   r   r   r   r    r   r!   r   r   r
   r
   r   __init__0   s*    
,zCmfDeferredJobWrapper.__init__c                 O   s   | j ||S Nr   r   argskwargsr
   r
   r   __call__J   s    zCmfDeferredJobWrapper.__call__c                 C   s$   |d krg }|d kri }| j ||S r%   r   r&   r
   r
   r   applyM   s
    zCmfDeferredJobWrapper.applyc                 O   s   | j ||dS )N)r'   r(   )apply_asyncr&   r
   r
   r   delayT   s    zCmfDeferredJobWrapper.delayc                    s  ddl m}m} dd l}	|d kr$g } d kr0i  |d kr>| j}|d krL| j}|d krZ| j}|j| j| j	d}
|d k	r|| j|
_|s|j
j|
_| j	rbt| j}d}| jd kr|rtd| j d| f |t|7 } r|t fddt D 7 }n| jD ]}|t | 7 }q|r2|d	t|   7 }||
_|jj|d
drb|d| j d d S |d krr| j}|rt tj|d |
_| | j	| j|
jj| j||
jj||jdd|
_|	j j!"  |
#  W 5 Q R X t|
jS )Nr   )modelsg)r   r    zJCmfDeferredJobWrapper.apply_async: !!! Warning! Use args + only_once! job=z, args=c                    s   i | ]}| | qS r
   r
   ).0kr(   r
   r   
<dictcomp>z   s      z5CmfDeferredJobWrapper.apply_async.<locals>.<dictcomp>__open)only_once_keystatuszCmfDeferredJobWrapper(z): already queued)Zseconds)r   r   	person_idr   r    r   r!   
admin_mode)r'   r(   options)$cmf.includer-   r.   
cmf.fieldsr    r   r   CmfDeferredJobr   r   Zcurrent_personidr8   strr   printsortedhashlibZmd5encodeZ	hexdigestr6   countdebugr!   cmf_nowdatetime	timedeltaplan_start_datetimevalueZacl_admin_modeparams_jsonutilcmfutilZdisable_aclsave)r   r'   r(   r   r    r   r!   r-   r.   cmfjobr6   args_strZarg_keyr
   r2   r   r+   W   sn    


 

z!CmfDeferredJobWrapper.apply_async)NFNNFNN)NN)NNNNNN)r   r   __qualname__r#   r"   staticmethodr   r   r   r   r$   r)   r*   r,   r+   r
   r
   r
   r   r      s*   


            

r   c                     sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                    s   t | f S r%   )r   )Zfunc_r2   r
   r   wrapper   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r'   r(   rT   r	   r
   r2   r   cmf_deferred_job   s    
rX   c                  C   s   dd l } | jj jS )Nr   )r<   fieldsZCmfDateTimenowrJ   )rO   r
   r
   r   rF      s    rF   c                 C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   rU   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr
   r
   r   format_exception   s    rf   c                       s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc                   C   s   t j t jjS r%   )rG   rZ   timezoneutcr
   r
   r
   r   rZ      s    zJobWorkerGreenlet.nowNc                    s   || _ t   d S r%   )	worker_idsuperr$   )r   rj   r   r
   r   r$      s    zJobWorkerGreenlet.__init__c                 C   s   | j j d| j dS )N(r   )r   r   rj   r   r
   r
   r   r      s    zJobWorkerGreenlet.__repr__)N)r   r   rR   rS   rZ   r$   r   __classcell__r
   r
   rl   r   rg      s   
rg   c                       sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                    s   || _ t jf | d S r%   )r   rk   r$   )r   r   r(   rl   r
   r   r$      s    zQueueProcessor.__init__c           	   
   C   sL  ddl m}m}m} dd l}|jjj| jddd |j	
  dddgdd	dd gd	d
t ggg}| jd k	r~|dd| jgg}|jjdddgddd gdddggddgd}|r|dddd gdddd |D gg |jj|dgd|d}|r(d|_|j  | j|_|  W 5 Q R  W 5 Q R  qHW 5 Q R X W 5 Q R X td q|S )Nr   )r-   socketio
cmf_commit
   timeoutZblocking_timeoutr7   =r5   ORrI   z<=r   r   Tr6   z!=in_progressz--)filterrY   NOT INc                 S   s   g | ]
}|j qS r
   )r6   )r0   Zjob_r
   r
   r   
<listcomp>   s     z0QueueProcessor._job_get_next.<locals>.<listcomp>Zcmf_created_at)rx   Zorder_byZ
for_updaterY   r[   )r;   r-   rp   rq   cmf.apprL   rM   CmfLock_GET_LOCK_KEYappcmf_contextrF   r   r=   Zslistappendgetr7   start_datetimeset_nowrj   rN   timesleep)	r   rY   r-   rp   rq   rO   Z
job_filterZonly_once_in_progressZnext_jobr
   r
   r   _job_get_next   sH    &

  
,zQueueProcessor._job_get_nextc              	   C   sb   ddl m} dd l}|j : d|_|j  t|j|j	 
 |_||_|  W 5 Q R X d S )Nr   rp   success)r;   rp   r{   r~   r   r7   end_datetimer   intr   total_secondsdurationresult_jsonrN   )r   rP   resultrp   rO   r
   r
   r   _job_set_success  s    
zQueueProcessor._job_set_successc              	   C   s|   ddl m} dd l}|j T d|_|j  t|j|j	 
 |_d |_t }|d rft||_|  W 5 Q R X d S )Nr   r   fail)r;   rp   r{   r~   r   r7   r   r   r   r   r   r   r   sysexc_inforf   Z
error_textrN   )r   rP   errorrp   rO   r   r
   r
   r   _job_set_fail  s    

zQueueProcessor._job_set_failc                    s0   ddl m dd l  fdd}t||S )Nr   )r-   c               
      s   t jj }  j p jrd jjjj jj	j
d j  jd ddrdj  | jd jd W  5 Q R  S Q R X d S )NrY   r:   r9   Fr'   r(   )r   r#   r   r~   r   r8   Zset_current_personZ	CmfPersonr   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextrK   Zactivate_admin_mode)Zjob_funcrO   rP   r-   r
   r   runner0  s    

z'QueueProcessor._job_run.<locals>.runner)r;   r-   r{   geventZwith_timeout)r   rP   r    r   r
   r   r   _job_run,  s    zQueueProcessor._job_runc              
   C   s   | j dddddgd}|sd S |jd d}z| j||d	}| || W n| tjk
rd    Y nf ttfk
r|    Y nN t	k
r } z0t
d
|j d|j d | |t| W 5 d }~X Y nX d S )Nr7   rj   rK   Zparam_pickler8   r   r:   r    )r    zjob_runner(z, ) error)r   rK   r   r   r   r   GreenletExit
SystemExitKeyboardInterrupt	Exceptionlogging	exceptionr>   r   r   r?   )r   rP   r    r   r   r
   r
   r   
_run_cycle?  s    zQueueProcessor._run_cyclec              
   C   sh   z|    td W q  tttjfk
r4    Y q  tk
r`   t	|  d td Y q X q dS )zmain() for GreenletrU   z cycle error.   N)
r   r   r   r   r   r   r   r   r   r   r   r
   r
   r   _runV  s    zQueueProcessor._runc                 C   s   | j j d| j d| j dS )Nrm   z, priority=r   )r   r   rj   r   r   r
   r
   r   r   c  s    zQueueProcessor.__repr__)N)N)r   r   rR   r}   r   r$   r   r   r   r   r   r   r   rn   r
   r
   rl   r   ro      s   )
ro   c                   @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc              
   C   s   dd l }ddlm}m} z|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W 5 Q R X W 5 Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qd S )Nr   CmfGetLockErrorr-   ,  rs      )Zdays   Zhoursrv   r7   INr   canceldeadr   <ru   r   rx    error)r{   r;   r   r-   rL   rM   r|   	_LOCK_KEYr~   r   rF   rG   rH   r=   Zbulk_deleter   r   r   r   r   )r   rO   r   r-   rZ   Zmax_end_datetimeZmax_success_end_datetimer
   r
   r   r   j  s&    $zQueueCleaner._runNr   r   rR   r   r   r
   r
   r
   r   r   g  s   r   c                   @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc           	   
   C   sZ  dd l }ddlm}m} |jjjj}z|jj	j
| jddd |j  |d}t|D ]L}|d|  d kr\|d| || td|  dtjd	 q\t }|tjd
d }|jjd|ddddd |D gdddgdd|ggd W 5 Q R X W 5 Q R X W n: |k
r$   Y n& tk
rH   t|  d Y nX td q$d S )Nr   r   r   rs   deferred_job_worker:workersdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!filerU   r   r   )r7   r   rj   ry   c                 S   s   g | ]}|  qS r
   )decode)r0   Zw_idr
   r
   r   rz     s     z%QueueWatcher._run.<locals>.<listcomp>r7   ru   rw   r   r   r   r   <   ) r{   r;   r   r-   r~   r   REDIS_DBredisrL   rM   r|   r   r   Zsmemberslistr   r   Zsremremover@   r   stderrrF   rG   rH   r=   Zbulk_updater   r   r   r   r   )	r   rO   r   r-   redis_dbZ
db_workersrj   rZ   Zmax_start_dater
   r
   r   r     s6    $


zQueueWatcher._runNr   r
   r
   r
   r   r     s   r   c                       s2   e Zd ZdZeddZ fddZdd Z  ZS )JobScheduleru   
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    PeriodicDatazjob croniterc                    s   t  j|| g | _d S r%   )rk   r$   r"   r&   rl   r
   r   r$     s    zJobScheduler.__init__c              
   C   s  dd l }ddlm} |jjjj}d}tjtj	j
 j}tj D ]}|j}|jr|j|jkr|j|j }t|  d|j d|j d|  nt|  d|j d|j d |st|  d|j d qDt|}t| tj||jd	}||_|  |rt|| n| }| j| || qD|rB|t  }	ntd
 d}	|	dkrntd|	dd d}	t|	 zt }
d}| jD ]\}}| }|
|kr$|  |jd|j d| | j dddr$z"|j!  |"  W 5 Q R X W n. t#k
r"   t$%|  d|j d Y nX |r8t|| n| }qW n0 t#k
rv   t$%|  d td Y nX q.d S )Nr   )configz: config: job=z, app="z ", config.JOB_SCHEDULE_OVERRIDE="z: config: job z
 disabled!r   z1JobScheduler: warning next_ts == 0, nothing to dorr   z)JobScheduler: warning too low sleep time z.3fre      zJobScheduler:lock::T)exZnxz schedule job(r   r   )&r{   r;   r   r~   r   r   r   rG   rZ   rh   ri   Z
astimezoneZtzinfor   r"   valuesr   ZJOB_SCHEDULE_OVERRIDEr   r@   r   r   r   ZAPP_FQDNZget_nextminZget_currentr   r   r   r   setrj   r   r+   r   r   r   )r   rO   r   r   Znext_tsZlocal_tzinforP   r   Z	cron_iterZsleep_tsZnow_tsZcurr_tsr
   r
   r   r     s\    $




$$zJobScheduler._run)	r   r   rR   __doc__r   r   r$   r   rn   r
   r
   rl   r   r     s   
r   c                   @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc              	   C   s   dd l }ddlm}m} |jjjj}t	d zt
j
 }|jdksN|jdk rRW q$|jjj| jdddH |d}|rt | dk rW 5 Q R  W q$td	tjd
 |  W 5 Q R X W q$ |k
r   Y q$ tk
r   t|  d Y q$X q$d S )Nr   )r   	CMF_CACHEr   r[   2   rs   redis_idi`T  z%RedisCleaner: run CMF_CACHE.flushdb()r   r   )r{   r;   r   r   r~   r   r   r   r   r   rG   rZ   ZhourZminuterL   rM   r|   r   r   r@   r   r   Zflushdbr   r   r   )r   rO   r   r   r   rZ   r   r
   r
   r   r     s$    


zRedisCleaner._runN)r   r   rR   r   r   r   r
   r
   r
   r   r     s   r   Fc           	      C   sl  ddl }t  dt  }|jjjj}g }| rF|	t
d|d n tdD ]}|	t
||d qN|	t|d |	t|d |	t|d |	t|d |D ]}|  qzz&|jd| dd	d
 |d| W n4 tk
r } ztd|  W 5 d}~X Y nX tj|dd,}|D ] }td| dtjd tq*W 5 Q R X qW nB tttjfk
r } ztd| dtjd W 5 d}~X Y nX ztj|dd W n& tjk
r   tdtjd Y nX |D ]}|r td| dtjd q|  r$td| dtjd qz|!  W n2 tttjtfk
rb   td| d Y nX qdS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   Nr   )r   rj      )rj   r   -   )r   r   z$deferred_job_worker: redis error!!! rr   )rt   z%deferred_job_worker(): system thread(z) exited!!! Stop work.r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)"r{   platformZnodeuuidZuuid1r~   r   r   r   r   ro   ranger   r   r   r   startr   Zsaddr   r   r   r   Ziwaitr@   r   r   r   r   r   ZkillallZTimeoutZ
successfulr   )	Zsingle_queuerO   rj   r   Z	greenletsr   glr   Zgl_iterr
   r
   r   deferred_job_worker  sR    
"(
r   )F)rG   rB   r^   Zrandomr   r`   r   r   r   r   collectionsr   	functoolsr   r   r   r   Zcmf.util.cmfutilrO   r   rX   rF   rf   ZGreenletrg   ro   r   r   r   r   r   r
   r
   r
   r   <module>   s8     	'F