U
    ad-                     @   s   d ddgZ ddlZddlZddlZddlZddlZddlZddlZddlm	Z	m
Z
 ddlZddlmZ ddlmZ ejjZdd	lmZmZmZmZmZ G d
d  d eZe ZG dd deZG dd deZdS )QueueSimpleQueueJoinableQueue    N)EmptyFull   )
connection)context)debuginfoFinalizeregister_after_fork
is_exitingc                   @   s   e Zd Zd*ddZdd Zdd Zdd	 Zd+ddZd,ddZdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zd d! Zed"d# Zed$d% Zed&d' Zed(d) ZdS )-r   r   c                C   s   |dkrddl m} || _tjdd\| _| _| | _t	
 | _tjdkrTd | _n
| | _||| _d| _|   tjdkrt| tj d S )Nr   r   )SEM_VALUE_MAXFZduplexwin32)Zsynchronizer   _maxsizer   Pipe_reader_writerLock_rlockosgetpid_opidsysplatform_wlockZBoundedSemaphore_sem_ignore_epipe_after_forkr   r   selfmaxsizectx r%   ,/usr/lib/python3.8/multiprocessing/queues.py__init__$   s    




zQueue.__init__c                 C   s.   t |  | j| j| j| j| j| j| j| j	fS N)
r	   assert_spawningr   r   r   r   r   r   r   r   r"   r%   r%   r&   __getstate__9   s    
   zQueue.__getstate__c              	   C   s0   |\| _ | _| _| _| _| _| _| _|   d S r(   )	r   r   r   r   r   r   r   r   r    r"   stater%   r%   r&   __setstate__>   s       zQueue.__setstate__c                 C   sb   t d tt | _t | _d | _d | _	d| _
d| _d | _| jj| _| jj| _| jj| _d S )NzQueue._after_fork()F)r
   	threading	Conditionr   	_notemptycollectionsdeque_buffer_thread_jointhread_joincancelled_closed_closer   
send_bytes_send_bytesr   
recv_bytes_recv_bytespoll_pollr*   r%   r%   r&   r    C   s    


zQueue._after_forkTNc              	   C   sf   | j rtd| d| j||s(t| j. | jd krB|   | j	| | j
  W 5 Q R X d S NzQueue z
 is closed)r8   
ValueErrorr   acquirer   r1   r5   _start_threadr4   appendnotifyr"   objblocktimeoutr%   r%   r&   putP   s    
z	Queue.putc              	   C   s   | j rtd| d|rH|d krH| j |  }W 5 Q R X | j  nr|rXt | }| j||sjt	zB|r|t  }| 
|st	n| 
 st	|  }| j  W 5 | j  X t|S r@   )r8   rA   r   r=   r   releasetime	monotonicrB   r   r?   _ForkingPicklerloads)r"   rH   rI   resZdeadliner%   r%   r&   get\   s*    
z	Queue.getc                 C   s   | j | jj  S r(   )r   r   _semlockZ
_get_valuer*   r%   r%   r&   qsizev   s    zQueue.qsizec                 C   s
   |    S r(   r?   r*   r%   r%   r&   emptyz   s    zQueue.emptyc                 C   s   | j j S r(   )r   rR   _is_zeror*   r%   r%   r&   full}   s    z
Queue.fullc                 C   s
   |  dS NF)rQ   r*   r%   r%   r&   
get_nowait   s    zQueue.get_nowaitc                 C   s   |  |dS rX   )rJ   r"   rG   r%   r%   r&   
put_nowait   s    zQueue.put_nowaitc                 C   s2   d| _ z| j  W 5 | j}|r,d | _|  X d S )NT)r8   r9   r   close)r"   r\   r%   r%   r&   r\      s    zQueue.closec                 C   s.   t d | jstd| | jr*|   d S )NzQueue.join_thread()zQueue {0!r} not closed)r
   r8   AssertionErrorformatr6   r*   r%   r%   r&   join_thread   s    zQueue.join_threadc                 C   s6   t d d| _z| j  W n tk
r0   Y nX d S )NzQueue.cancel_join_thread()T)r
   r7   r6   ZcancelAttributeErrorr*   r%   r%   r&   cancel_join_thread   s    zQueue.cancel_join_threadc              
   C   s   t d | j  tjtj| j| j| j| j	| j
j| j| j| jfdd| _d| j_t d | j  t d | jst| jtjt| jgdd| _t| tj| j| jgd	d| _d S )
NzQueue._start_thread()ZQueueFeederThread)targetargsnameTzdoing self._thread.start()z... done self._thread.start())Zexitpriority
   )r
   r4   clearr/   ZThreadr   _feedr1   r;   r   r   r\   r   _on_queue_feeder_errorr   r5   Zdaemonstartr7   r   _finalize_joinweakrefrefr6   _finalize_closer9   r*   r%   r%   r&   rC      s<    
   
  
zQueue._start_threadc                 C   s4   t d |  }|d k	r(|  t d nt d d S )Nzjoining queue threadz... queue thread joinedz... queue thread already dead)r
   join)Ztwrthreadr%   r%   r&   rk      s    
zQueue._finalize_joinc              	   C   s.   t d | | t |  W 5 Q R X d S )Nztelling queue thread to quit)r
   rD   	_sentinelrE   )buffernotemptyr%   r%   r&   rn      s    
zQueue._finalize_closec              
   C   sX  t d |j}|j}	|j}
| j}t}tjdkr<|j}|j}nd }z|  z| sT|
  W 5 |	  X zb| }||krt d |  W W d S t	|}|d kr|| qb|  z|| W 5 |  X qbW n t
k
r   Y nX W q@ tk
rP } zV|rt|ddtjkrW Y 6d S t r.td| W Y d S |  ||| W 5 d }~X Y q@X q@d S )Nz$starting thread to feed data to piper   z%feeder thread got sentinel -- exitingerrnor   zerror in queue thread: %s)r
   rB   rK   waitpopleftrq   r   r   rN   dumps
IndexError	Exceptiongetattrrt   ZEPIPEr   r   )rr   rs   r:   Z	writelockr\   Zignore_epipeonerrorZ	queue_semZnacquireZnreleaseZnwaitZbpopleftsentinelZwacquireZwreleaserG   er%   r%   r&   rh      sN    







zQueue._feedc                 C   s   ddl }|  dS )z
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        r   N)	traceback	print_exc)r}   rG   r~   r%   r%   r&   ri     s    zQueue._on_queue_feeder_error)r   )TN)TN)__name__
__module____qualname__r'   r+   r.   r    rJ   rQ   rS   rU   rW   rY   r[   r\   r_   ra   rC   staticmethodrk   rn   rh   ri   r%   r%   r%   r&   r   "   s.   



 
	

=c                   @   s@   e Zd ZdddZdd Zdd Zdd
dZdd Zdd Zd	S )r   r   c                C   s*   t j| ||d |d| _| | _d S )N)r$   r   )r   r'   Z	Semaphore_unfinished_tasksr0   _condr!   r%   r%   r&   r'   #  s    zJoinableQueue.__init__c                 C   s   t | | j| jf S r(   )r   r+   r   r   r*   r%   r%   r&   r+   (  s    zJoinableQueue.__getstate__c                 C   s,   t | |d d  |dd  \| _| _d S )N)r   r.   r   r   r,   r%   r%   r&   r.   +  s    zJoinableQueue.__setstate__TNc              
   C   s   | j rtd| d| j||s(t| jJ | j8 | jd krJ|   | j	
| | j  | j  W 5 Q R X W 5 Q R X d S r@   )r8   rA   r   rB   r   r1   r   r5   rC   r4   rD   r   rK   rE   rF   r%   r%   r&   rJ   /  s    

zJoinableQueue.putc              	   C   s@   | j 0 | jdstd| jj r2| j   W 5 Q R X d S )NFz!task_done() called too many times)r   r   rB   rA   rR   rV   Z
notify_allr*   r%   r%   r&   	task_done<  s
    zJoinableQueue.task_donec              	   C   s,   | j  | jj s| j   W 5 Q R X d S r(   )r   r   rR   rV   ru   r*   r%   r%   r&   ro   C  s    zJoinableQueue.join)r   )TN)	r   r   r   r'   r+   r.   rJ   r   ro   r%   r%   r%   r&   r   !  s   

c                   @   s<   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )r   c                C   sH   t jdd\| _| _| | _| jj| _tj	dkr:d | _
n
| | _
d S )NFr   r   )r   r   r   r   r   r   r>   r?   r   r   r   )r"   r$   r%   r%   r&   r'   N  s    


zSimpleQueue.__init__c                 C   s
   |    S r(   rT   r*   r%   r%   r&   rU   W  s    zSimpleQueue.emptyc                 C   s   t |  | j| j| j| jfS r(   )r	   r)   r   r   r   r   r*   r%   r%   r&   r+   Z  s    
zSimpleQueue.__getstate__c                 C   s"   |\| _ | _| _| _| j j| _d S r(   )r   r   r   r   r>   r?   r,   r%   r%   r&   r.   ^  s    zSimpleQueue.__setstate__c              	   C   s&   | j  | j }W 5 Q R X t|S r(   )r   r   r<   rN   rO   )r"   rP   r%   r%   r&   rQ   b  s    zSimpleQueue.getc              	   C   sD   t |}| jd kr"| j| n| j | j| W 5 Q R X d S r(   )rN   rw   r   r   r:   rZ   r%   r%   r&   rJ   h  s
    

zSimpleQueue.putN)	r   r   r   r'   rU   r+   r.   rQ   rJ   r%   r%   r%   r&   r   L  s   	)__all__r   r   r/   r2   rL   rl   rt   Zqueuer   r   Z_multiprocessing r   r	   Z	reductionZForkingPicklerrN   utilr
   r   r   r   r   objectr   rq   r   r   r%   r%   r%   r&   <module>
   s$   
 v
+