U
    h                     @   s6  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlmZ d dlmZ d dlZd dlZd dlZd dlZG dd dZdd Zdd	 Zd
d ZG dd dejZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZdddZ dS )    N)
namedtuple)update_wrapperc                   @   sn   e Zd Zi Zi Zedd Zdd Zedd Zedd Z	dddZ
dd ZdddZdd ZdddZd	S )CmfDeferredJobWrapperc                 C   s   | j  d| j S )N.)
__module____name__func r
   ./cmf/cmf_deferred_job.pygen_task_name   s    z#CmfDeferredJobWrapper.gen_task_namec                 C   s   | j j d| j dS )Nz(name=))	__class__r   nameselfr
   r
   r   __repr__!   s    zCmfDeferredJobWrapper.__repr__c                 C   s   | dkrd} | S )Nz	@minutelyz* * * * * Hr
   )scheduler
   r
   r   adapt_schedule$   s    z$CmfDeferredJobWrapper.adapt_schedulec              
   C   sN   zt j j| dd W n4 t jk
rH } ztd|  |W 5 d }~X Y nX d S )N   1Zhash_idzinvalid schedule: )croniterexpandZCroniterBadCronError
ValueError)r   er
   r
   r   validate_schedule*   s    z'CmfDeferredJobWrapper.validate_scheduleNFc              
   C   s   t | | || _| || _|	p$| j| _|| _|| _|| _|| _|| _	|| _
|| _|
| _| jr|sttd| j d|std| j dz| | | j W n< tk
r } ztd| j d| j |W 5 d }~X Y nX | | j| j< | | j| j< d S )Nz$CmfDeferredJobWrapper: periodic job(z ) must be with option only_once.z!) must be with option system_job.zinvalid job configuration(z): )r   r	   r   r   descriptionpriority
system_job	only_onceonly_once_argssoft_time_limit	countdownr   show_bg_progressbarr   r   r   periodicregistry)r   r	   r   r   r    r!   r   r"   r   r   r#   r   r
   r
   r   __init__1   s.    
,zCmfDeferredJobWrapper.__init__c                 O   s   | j ||S Nr   r   argskwargsr
   r
   r   __call__N   s    zCmfDeferredJobWrapper.__call__c                 C   s$   |d krg }|d kri }| j ||S r'   r   r(   r
   r
   r   applyQ   s
    zCmfDeferredJobWrapper.applyc                 O   s   | j ||dS )N)r)   r*   )apply_asyncr(   r
   r
   r   delayX   s    zCmfDeferredJobWrapper.delayc                    s  ddl m}m} dd l}|d kr$g } d kr0i  |d kr>| j}|d krL| j}|d krZ| j}|d krh| j}|d krv| j}|j	| j
| j||d}|d k	r| j|_|	s|jj}	|s|	|_|d kr| j}|rt tj|d |_| jr t| j
}d}| jd krJ|r$td| j
 d| f |t|7 } rh|t fdd	t D 7 }n| jD ]}|t | 7 }qP|r|d
t|   7 }||_|j	j|ddgd}|r |d| j
 d |jr|jr|j|jk r|d| j
 d|j  |j|_|jj !  |"  W 5 Q R X d S |
d kr0|j#}
|d kr@|j$}|d krP|j%}| | j| j|jj&| j||jj&||
|||dd|_'|jj !  |"  W 5 Q R X |r|j(t|j ddl m)} |ddt|ji|	gd t|jS )Nr   modelsg)r   r   r   r#   Zseconds zJCmfDeferredJobWrapper.apply_async: !!! Warning! Use args + only_once! job=z, args=c                    s   i | ]}| | qS r
   r
   ).0kr*   r
   r   
<dictcomp>   s      z5CmfDeferredJobWrapper.apply_async.<locals>.<dictcomp>__open*)only_once_keystatusfieldszCmfDeferredJobWrapper(z): already queuedz): reschedule existent job )r   r    	person_idr   r!   r   r"   
admin_modecomponent_idsession_tab_idr#   )r)   r*   options)cmf_emit_eventz deferred-job-show_bg_progressbarZjob_id)Zevent_persons)*cmf.includer0   r1   
cmf.fieldsr!   r   r   r   r#   CmfDeferredJobr   r   Zcurrent_personidr>   r"   cmf_nowdatetime	timedeltaplan_start_datetimestrr    printsortedhashlibZmd5encodeZ	hexdigestr;   getdebugutilcmfutilZdisable_aclsaveZacl_admin_moder@   rA   valueparams_jsonappendrC   )r   r)   r*   r   r!   r   r"   r   r#   Zg_current_user_idZg_acl_admin_modeZg_component_idZg_session_tab_idr0   r1   cmfjobr;   args_strZarg_keyZexistent_jobrC   r
   r6   r   r-   [   s       
 




z!CmfDeferredJobWrapper.apply_async)	NFNNFNNNF)NN)NNNNNNNNNNNN)r   r   __qualname__r%   r$   staticmethodr   r   r   r   r&   r+   r,   r.   r-   r
   r
   r
   r   r      s<   


               

           r   c                     sN    fdd}d}| r:t | dkr*| d }ntd|  d|rF||S |S dS )za
    job decorator:
        return object like celery task with __call__, delay, apply_async
    c                    s   t | f S r'   )r   )Zfunc_r6   r
   r   wrapper   s    z!cmf_deferred_job.<locals>.wrapperN   r   z+cmf_deferred_job takes exactly 1 argument (z given)len	TypeError)r)   r*   r^   r	   r
   r6   r   cmf_deferred_job   s    
rb   c                  C   s   dd l } | jj jS )Nr   )rE   r=   ZCmfDateTimenowrV   )rY   r
   r
   r   rH      s    rH   c                 C   sZ   t  }| d }t| d | d |d| | }|  |dd dkrV|dd }|S )zfrom logging Formatter class   r   r_   N
)ioStringIO	tracebackprint_exceptiongetvalueclose)ZeiZsiotbsr
   r
   r   format_exception   s    ro   c                       s2   e Zd Zedd Zd fdd	Zdd Z  ZS )	JobWorkerGreenletc                   C   s   t j t jjS r'   )rI   rc   timezoneutcr
   r
   r
   r   rc      s    zJobWorkerGreenlet.nowNc                    s   || _ t   d S r'   )	worker_idsuperr&   )r   rs   r   r
   r   r&     s    zJobWorkerGreenlet.__init__c                 C   s   | j j d| j dS )N(r   )r   r   rs   r   r
   r
   r   r     s    zJobWorkerGreenlet.__repr__)N)r   r   r\   r]   rc   r&   r   __classcell__r
   r
   ru   r   rp      s   
rp   c                       sb   e Zd ZdZded fddZdd Zdd	 Zd
d ZdddZ	dd Z
dd Zdd Z  ZS )QueueProcessorzJobQueueProcessor:get:lockN)r   c                    s   || _ t jf | d S r'   )r   rt   r&   )r   r   r*   ru   r
   r   r&     s    zQueueProcessor.__init__c                 C   s  ddl m}m}m} dd l}|j  |jjj	| j
dddZ d }| jd k	r^dd| jg}dddgd	d| jgg}|r|| |jj|d
D ]*}|d tdd| d| j d qdddgdddd gddt ggg}	|r|	| |jjdddgddd gdddggddgd}
|
rD|	dddd gdddd |
D gg |jj|	dgd|d}|rd|_|j  | j|_|d |  W 5 Q R  qW 5 Q R X |  | jstd  q*ttd | j q*W 5 Q R X |S )!Nr   )r0   socketio
cmf_commit
   timeoutZblocking_timeoutr   =r<   in_progressrs   filterz+Job in_progress but queue processor is not.z%sz!!! Not running job , priority=z, mark as dead!r9   ORrK   z<=r   Tr;   z!=z--)r   r=   NOT INc                 S   s   g | ]
}|j qS r
   )r;   )r4   Zjob_r
   r
   r   
<listcomp><  s     z0QueueProcessor._job_get_next.<locals>.<listcomp>Zcmf_created_at)r   Zorder_byZ
for_updater=   z	job startrd   )rD   r0   ry   rz   cmf.appappcmf_contextrS   rT   CmfLock_GET_LOCK_KEYr   rs   rX   rF   liston_errorloggingerrorrH   ZslistrQ   r<   start_datetimeZset_nowZ	error_logrU   timesleepmax)r   r=   r0   ry   rz   rY   Zpriority_filterZstuck_filterZ	stuck_jobZ
job_filterZonly_once_in_progressZnext_jobr
   r
   r   _job_get_next  sh    





  

zQueueProcessor._job_get_nextc              	   C   s8   ddl m} dd l}|j  || W 5 Q R X d S )Nr   ry   )rD   ry   r   r   r   Z
on_success)r   rZ   resultry   rY   r
   r
   r   _job_set_successT  s    zQueueProcessor._job_set_successc              	   C   sT   ddl m} dd l}|j , d}t }|d r<t|}|| W 5 Q R X d S )Nr   r   zError?)	rD   ry   r   r   r   sysexc_inforo   r   )r   rZ   r   ry   rY   Z
error_textr   r
   r
   r   _job_set_fail]  s    zQueueProcessor._job_set_failc                    s6   ddl mm dd l  fdd}t||S )Nr   r/   c               
      s   t jj }  j  j jrjjj	kr j
jjj jjjd j  jd ddr~j  jd ddrjd d _jd ddrjd d _| jd jd W  5 Q R  S Q R X d S )	Nr=   rB   r?   Fr@   rA   r)   r*   )r   r%   r   r   r   rF   Zset_current_deferred_jobr>   Zsystem_personrG   Zset_current_personZ	CmfPersonrQ   APPZcurrent_person_fieldsZCmfAccessListZsetup_contextrW   Zactivate_admin_moder@   rA   )Zjob_funcrY   r1   rZ   r0   r
   r   runnern  s    

z'QueueProcessor._job_run.<locals>.runner)rD   r0   r1   r   geventZwith_timeout)r   rZ   r!   r   r
   r   r   _job_runj  s    zQueueProcessor._job_runc                 C   s   | j dgd}|sd S |jd d}z| j||d}| || W n tttjfk
r } z6t	
d|j d|j d|  | |t|  W 5 d }~X Y nT ttjfk
r } z0t	
d|j d|j d	 | |t| W 5 d }~X Y nX d S )
Nr:   r   rB   r!   )r!   zjob_runner(z, z) interrupt by ) error)r   rW   rQ   r   r   
SystemExitKeyboardInterruptr   GreenletExitr   	exceptionrG   r   r   rL   	ExceptionTimeout)r   rZ   r!   r   r   r
   r
   r   
_run_cycle  s     zQueueProcessor._run_cyclec              
   C   sh   z|    td W q  tttjfk
r4    Y q  tk
r`   t	|  d td Y q X q dS )zmain() for Greenletr_   z cycle error.   N)
r   r   r   r   r   r   r   r   r   r   r   r
   r
   r   _run  s    zQueueProcessor._runc                 C   s   | j j d| j d| j dS )Nrv   r   r   )r   r   rs   r   r   r
   r
   r   r     s    zQueueProcessor.__repr__)N)N)r   r   r\   r   intr&   r   r   r   r   r   r   r   rw   r
   r
   ru   r   rx   
  s   C	
rx   c                   @   s   e Zd ZdZdd ZdS )QueueCleanerzJobQueueCleaner:lockc              
   C   s   dd l }ddlm}m} z|jjj| jddd| |j	 f t
 }|tjdd }|tjdd }|jjd	d
ddddggdd|ggd
ddgdd|gggd W 5 Q R X W 5 Q R X W n6 |k
r   Y n$ tk
r   t|  d Y nX td qd S )Nr   CmfGetLockErrorr0   ,  r|      )Zdays   )Zhoursr   r<   INZfailcancelZdeadZend_datetime<r~   successr    error)r   rD   r   r0   rS   rT   r   	_LOCK_KEYr   r   rH   rI   rJ   rF   Zbulk_deleter   r   r   r   r   )r   rY   r   r0   rc   Zmax_end_datetimeZmax_success_end_datetimer
   r
   r   r     s&    $zQueueCleaner._runNr   r   r\   r   r   r
   r
   r
   r   r     s   r   c                   @   s   e Zd ZdZdd ZdS )QueueWatcherzJobQueueWatcher:lockc              
   C   s  dd l }ddlm}m} |jjjj}z@|jj	j
| jddd |j  |d}|rrt t| dk rtd W 5 Q R  W 5 Q R  W q$|d}t|D ]L}|d	|  d kr|d| || td
|  dtjd qt }|tjdd }	|jjdgdddd |D gdddgdd|	ggdD ]}
|
d q>W 5 Q R X W 5 Q R X W n: |k
r|   Y n& tk
r   t |  d Y nX td q$d S )Nr   r   r   r|   redis_idr   r{   deferred_job_worker:workersdeferred_job_worker:heartbeat:zQueueWatcher: worker z seems dead!filer2   r:   rs   r   c                 S   s   g | ]}|  qS r
   )decode)r4   Zw_idr
   r
   r   r     s     z%QueueWatcher._run.<locals>.<listcomp>r<   r~   r   r   r   )r=   r   zJob Worker is deadr   <   )!r   rD   r   r0   r   r   REDIS_DBredisrS   rT   r   r   r   rQ   r   floatr   Zsmembersr   r   ZsremremoverM   r   stderrrH   rI   rJ   rF   r   r   r   r   )r   rY   r   r0   redis_dbr   Z
db_workersrs   rc   Zmax_start_daterZ   r
   r
   r   r     s<    (




&zQueueWatcher._runNr   r
   r
   r
   r   r     s   r   c                       s2   e Zd ZdZeddZ fddZdd Z  ZS )JobScheduleru   
    - защита от параллельного запуска
    - "размазывание нагрузки" во времени.
    PeriodicDatazjob croniterc                    s   t  j|| g | _d S r'   )rt   r&   r$   r(   ru   r
   r   r&     s    zJobScheduler.__init__c              
   C   s  dd l }ddlm} |jjjj}d}tjtj	j
 j}tj D ]}|j}|jr|j|jkr|j|j }t|  d|j d|j d|  nt|  d|j d|j d |st|  d|j d qDt|}t| tj||jd	}||_|  |rt|| n| }| j| || qD|rB|t  }	ntd
 d}	|	dkrntd|	dd d}	t|	 zt }
d}| jD ]\}}| }|
|kr$|  |jd|j d| | j dddr$z"|j!  |"  W 5 Q R X W n. t#k
r"   t$%|  d|j d Y nX |r8t|| n| }qW n0 t#k
rv   t$%|  d td Y nX q.d S )Nr   configz: config: job=z, app="z ", config.JOB_SCHEDULE_OVERRIDE="z: config: job z
 disabled!r   z1JobScheduler: warning next_ts == 0, nothing to dor{   z)JobScheduler: warning too low sleep time z.3frn      zJobScheduler:lock::T)exZnxz schedule job(r   r   )&r   rD   r   r   r   r   r   rI   rc   rq   rr   Z
astimezoneZtzinfor   r$   valuesr   ZJOB_SCHEDULE_OVERRIDEr   rM   r   r   r   ZAPP_FQDNZget_nextminZget_currentrX   r   r   r   setrs   r   r-   r   r   r   )r   rY   r   r   Znext_tsZlocal_tzinforZ   r   Z	cron_iterZsleep_tsZnow_tsZcurr_tsr
   r
   r   r     s\    $




$$zJobScheduler._run)	r   r   r\   __doc__r   r   r&   r   rw   r
   r
   ru   r   r     s   
r   c                   @   s   e Zd ZdZdZdd ZdS )RedisCleaneruD   Раз в сутки сбрасываем редис БД. В 02-50.zRedisCleaner:lockc              	   C   s  dd l }ddlm}m} |jjjj}t	d zt
j
 }|jdksN|jdk rRW q$|jjj| jdddL |d}|rt t| dk rW 5 Q R  W q$td	tjd
 |  W 5 Q R X W q$ tjjk
r   Y q$ |k
r   Y q$ tk
r
   t|  d Y q$X q$d S )Nr   )r   	CMF_CACHEr   rd   2   r|   r   i`T  z%RedisCleaner: run CMF_CACHE.flushdb()r   r   )r   rD   r   r   r   r   r   r   r   r   rI   rc   ZhourZminuterS   rT   r   r   rQ   r   rM   r   r   Zflushdb
exceptionsZLockNotOwnedErrorr   r   r   )r   rY   r   r   r   rc   r   r
   r
   r   r   <  s(    


zRedisCleaner._runN)r   r   r\   r   r   r   r
   r
   r
   r   r   8  s   r   c                       sl   e Zd Z fddZdd ZedddZdeed	d
dZ	edddZ
deedddZdd Z  ZS )RedisMonitorc                    s:   || _ |j| _|| _|   td| _t jf | d S )Nzredis-monitor)	r   redis_instance_nameredis_settings_manager_RedisMonitor__init_redisr   Z	getLoggerloggerrt   r&   )r   r   r   r*   ru   r
   r   r&   Y  s    zRedisMonitor.__init__c                 C   s`   ddl m} t|drT|j| j d dkrT|j| j  }|d tjf || _nt	dd S )Nr   r   cache_settingstyper   uJ   В конфиге не содержится конфигурации Redis)
rD   r   hasattrr   r   copypopr   ZRedisr   )r   r   Zcfgr
   r
   r   Z__init_redisa  s    
zRedisMonitor.__init_redis)returnc                 C   st   ddl m} t|j }t|dkr.|d S z$|| j}||d t|  }W n tk
rn   |d }Y nX |S )u!  
        Возвращает следующую redis конфигурацию из config.cache_settings.
        В случае, если текущий инстанс последний в списке, то начинает обходить
        список заново
        r   r   r_   )	rD   r   r   r   keysr`   indexr   r   )r   r   r   idxZnext_instancer
   r
   r   _get_new_instancem  s    zRedisMonitor._get_new_instance   )intervalr   c                 C   sB   |   | _| jd| j |   |  }|r2dS t| q d S )Nu=   Получена новая конфигурация Redis %sT)r   r   r   warningr   check_redis_connectionr   r   )r   r   
is_successr
   r
   r   _check_new_instance  s    
z RedisMonitor._check_new_instancec              
   C   s   | j jj}|dp|d}z| j   W dS  t jk
r` } z| jd| W 5 d }~X Y n0 tk
r } z| jd| W 5 d }~X Y nX dS )NhostpathTuA   Ошибка подключения к Redis по адресу %su=   Системная ошибка с Redis по адресу %sF)	r   Zconnection_poolconnection_kwargsrQ   ZpingConnectionErrorr   r   r   )r   r   Zcurrent_hostr   r
   r
   r   r     s    

 z#RedisMonitor.check_redis_connectionr   )r   max_failure_countc                 C   s   d}|   }|rd}n^|d7 }| jd| ||krt| jd| |  }|rt| jd| j | jj| jdd tt	| qd S )Nr   r_   u.   Redis недоступен (%d попытка)u3   Redis недоступен %d раза подрядuA   Записываем новую конфигурацию Redis %sT)Z
with_flush)
r   r   r   r   r   r   Z
save_to_dbr   r   r   )r   r   r   Zfailure_countr   Zis_new_successr
   r
   r   monitor_redis  s    zRedisMonitor.monitor_redisc                 C   s   |    d S r'   )r   r   r
   r
   r   r     s    zRedisMonitor._run)r   )r   r   )r   r   r\   r&   r   rL   r   r   boolr   r   r   r   rw   r
   r
   ru   r   r   X  s   r   Fc                 C   s  ddl }ddlm} |jj}t  dt  }|j	rTt
|jjj||d}|  |  |jjjj}| svd|jj_g }| r|td|d n tdD ]}|t||d q|t|d	 |t|d	 |t|d	 |t|d	 |D ]}	|	  qzz&|jd
| ddd |d| W n4 tk
r` }
 ztd|
  W 5 d}
~
X Y nX tj|dd,}|D ] }	t d|	 dt!j"d t#qvW 5 Q R X qW nB t#t$tj%fk
r }
 zt d|
 dt!j"d W 5 d}
~
X Y nX ztj&|dd W n& tj'k
r$   t dt!j"d Y nX |D ]}	|	rNt d|	 dt!j"d q*|	( rrt d|	 dt!j"d q*z|	)  W n2 t#t$tj%tfk
r   td|	 d Y nX q*dS )u!  
    Функция - обработчик очередей задач.
    На каждый приоритет(очередь) запускаем отдельный поток - обработчик.
    Сервисный поток для очистки очереди от устаревших задач.
    Сервисный поток для контроля погибших задач(чей воркер погиб).
    Сервисный поток Beat, для планирования периодических задач.
    r   Nr   r   )r   r   rs   T)r   rs      )rs   r   -   )r   r   z$deferred_job_worker: redis error!!! r{   )r}   z%deferred_job_worker(): system thread(z) exited!!! Stop work.r   zdeferred_job_worker(): stop by z. Try kill all threads...z(deferred_job_worker(): killall timeout!.zdeferred_job_worker(): Thread z is not stopped!!!z stops normally!z error.)*r   rD   r   r   ZREDIS_SETTINGS_MANAGERplatformZnodeuuidZuuid1ZCACHE_REDIS_FAILOWERr   r   r   startZ
init_redisr   r   Zjob_daemon_moderX   rx   ranger   r   r   r   r   Zsaddr   r   r   r   ZiwaitrM   r   r   r   r   r   Zkillallr   Z
successfulrQ   )Zsingle_queuerY   r   r   rs   Zredis_monitorr   Z	greenletsr   glr   Zgl_iterr
   r
   r   deferred_job_worker  sj    

"(
r   )F)!rI   rO   rg   ossignalr   ri   r   r   r   r   collectionsr   	functoolsr   r   r   r   Zcmf.util.cmfutilrY   r   rb   rH   ro   ZGreenletrp   rx   r   r   r   r   r   r   r
   r
   r
   r   <module>   s<    >  ,F Z