o
    !iD=                  
   @   s  d dl mZmZ d dlZd dlZd dlZd dlmZ d dlZd dlZd dl	Z	d dl
Z
d dlmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ dd	lmZmZ dd
lmZmZmZ ddlmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z% ddl&m'Z' d dl(m)Z) d dl*m+Z+ ddgZ,eG dd dZ-eG dd dZ.dZ/dejdejfddZ0de$defddZ1G dd deZ2G dd  d e2Z3G d!d" d"e2Z4de$de5fd#d$Z6d%ee$ deee$  fd&d'Z7d(d) Z8d*ej9d+ej9d,e"d-e5d.e:f
d/d0Z;G d1d deZ<G d2d deZ=dS )3    )ABCabstractmethodN)	dataclass)ListUnionDictcast)Tensor)Future)Path   )MetadataMetadataIndex)StorageReaderStorageWriterWriteResult)LoadItemTypeLoadPlannerLoadPlanSavePlanSavePlannerReadItem	WriteItemWriteItemType)_create_file_view)narrow_tensor_by_index)_get_device_moduleFileSystemWriterFileSystemReaderc                   @   s*   e Zd ZU dZeed< eed< eed< dS )_StorageInfoz,
    This is the per entry storage info
    relative_pathoffsetlengthN)__name__
__module____qualname____doc__str__annotations__int r*   r*   v/var/www/html/eduruby.in/lip-sync/lip-sync-env/lib/python3.10/site-packages/torch/distributed/checkpoint/filesystem.pyr   2   s
   
 r   c                   @   s   e Zd ZU eed< dS )_StoragePrefixprefixN)r#   r$   r%   r'   r(   r*   r*   r*   r+   r,   =   s   
 r,   z.distcptensorreturnc                 C   s,   |    } |   |  kr|  } | S N)detachcpuZ_typed_storage_sizenumelclone)r.   r*   r*   r+   _trimE   s   r6   itemc                 C   s   t | j||dS )N)indexsize_in_bytesstorage_data)r   r8   )r7   r9   r:   r*   r*   r+   _result_from_write_itemL   s   r;   c                   @   s0   e Zd Zedd Zedd Zedd ZdS )_TensorLoaderc                 C      d S r0   r*   selfsizeobjr*   r*   r+   addU      z_TensorLoader.addc                 C   r=   r0   r*   r?   r*   r*   r+   start_loadingY   rC   z_TensorLoader.start_loadingc                 C   r=   r0   r*   rD   r*   r*   r+   values]   rC   z_TensorLoader.valuesN)r#   r$   r%   r   rB   rE   rF   r*   r*   r*   r+   r<   T   s    

r<   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
_SerialCpuLoaderc                 C   s   || _ g | _d S r0   )resolve_funitems)r?   rH   r*   r*   r+   __init__c   s   
z_SerialCpuLoader.__init__c                 C   s   | j ||f d S r0   )rI   appendr>   r*   r*   r+   rB   g   s   z_SerialCpuLoader.addc                 C   r=   r0   r*   rD   r*   r*   r+   rE   j      z_SerialCpuLoader.start_loadingc                 c   sR    | j D ]"\}}| | }| }|  | kr!| }||fV  qd S r0   )rI   rH   r1   r2   storager@   r4   r5   r?   _rA   r.   r*   r*   r+   rF   m   s   z_SerialCpuLoader.valuesN)r#   r$   r%   rJ   rB   rE   rF   r*   r*   r*   r+   rG   b   s
    rG   c                   @   sR   e Zd ZdddZedd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd ZdS )_OverlappingCpuLoaderN@B c                 C   s   || _ g | _|| _d| _t | _d| _d| _|r|j	nt
dj| _	t| j	| _|p/| j | _| j| j krD| j| j  d S d S )Nr   Fcuda)rH   rI   inflight_threshholdin_flight_datacollectionsdequecurrent_itemsidxstarteddevice_typetorchdevicetyper   device_moduleZcurrent_streamstreamZwait_stream)r?   rH   r_   rS   r*   r*   r+   rJ   z   s   
z_OverlappingCpuLoader.__init__c                 C   s   | j t| jkS r0   )rX   lenrI   rD   r*   r*   r+   _done   s   z_OverlappingCpuLoader._donec                 C   sl   g }| j | jkr| j  | j | jkr4| j }|  j |d  |d   8  _ || | j | jks|S Nr   )	rT   rS   r_   synchronizerW   popleftr4   element_sizerK   )r?   drainedvalr*   r*   r+   _drain   s   

"
z_OverlappingCpuLoader._drainc                 C   s(  | j | j | jsr| j| jk rz| j| j \}}|  jd7  _| | }|j	j
| jkr6|jddd}n|j	t	dkrL|  | krL| }| j||f |  j| |  7  _| js| j| jk sW d    d S W d    d S W d    d S W d    d S 1 sw   Y  d S )Nr   r2   T)r\   Znon_blocking)r^   r_   ra   rT   rS   rI   rX   rH   r1   r\   r]   rZ   tor[   rM   r@   r4   r5   rW   rK   re   rN   r*   r*   r+   _refill   s8   "z_OverlappingCpuLoader._refillc                 C   s(   | j sJ t| jdkr| j  | jS rb   )ra   r`   rW   r_   rc   rD   r*   r*   r+   _finish   s   

z_OverlappingCpuLoader._finishc                 C   s"   | j rtd| j||f d S )Nz&cannot add items after loading started)rY   RuntimeErrorrI   rK   r>   r*   r*   r+   rB      s   z_OverlappingCpuLoader.addc                 C   s.   | j rd S d| _ | jjdd d |   d S )NTc                 S      | d S rb   r*   xr*   r*   r+   <lambda>       z5_OverlappingCpuLoader.start_loading.<locals>.<lambda>key)rY   rI   sortrj   rD   r*   r*   r+   rE      s
   z#_OverlappingCpuLoader.start_loadingc                 c   sB    |    | js|  }|   |E d H  | jr|  E d H  d S r0   )rE   ra   rh   rj   rk   )r?   rf   r*   r*   r+   rF      s   
z_OverlappingCpuLoader.values)NrQ   )r#   r$   r%   rJ   propertyra   rh   rj   rk   rB   rE   rF   r*   r*   r*   r+   rP   y   s    


rP   c                 C   sB   d}| j d us	J | j jD ]}||9 }q| j jj}|tj| S Nr   )Ztensor_datar@   
propertiesdtyper[   _utilsZ_element_size)r7   r@   srx   r*   r*   r+   
_item_size   s   

r{   rI   c           	      C   s   | dkr|gS dd |D }dd |D }dd t | D }dd t | D }|jtdd t|D ]\}}|||   | q2|D ]}tt|d	d
 dd }|| | ||  t|7  < qB|S )Nr   c                 S      g | ]
}|j tjkr|qS r*   r]   r   BYTE_IO.0wir*   r*   r+   
<listcomp>       z+_split_by_size_and_type.<locals>.<listcomp>c                 S      g | ]
}|j tjkr|qS r*   r}   r   r*   r*   r+   r      r   c                 S   s   g | ]}g qS r*   r*   r   rO   r*   r*   r+   r          c                 S   s   g | ]}d qS )r   r*   r   r*   r*   r+   r      r   T)rs   reversec                 S   rm   rv   r*   rn   r*   r*   r+   rp      rq   z)_split_by_size_and_type.<locals>.<lambda>rr   r   )rangert   r{   	enumeraterK   min)	ZbinsrI   bytes_wtensor_wZbucketsZbucket_sizesir   rX   r*   r*   r+   _split_by_size_and_type   s   r   c                 C   s   |   }|jtjkrt|tjsJ | |  nt|t	j
s"J |jt	dks,J t	||  |   | }t||t|||S )Nr2   )tellr]   r   r~   
isinstanceioBytesIOwrite	getbufferr[   r	   r\   saver;   r   )r_   data
write_itemstorage_keyr!   r"   r*   r*   r+   _write_item   s   r   
file_queueresult_queueplannerrS   	use_fsyncc              	      sB  z	 |   \}}}tj r|dkrt fdd|d}nt fdd}dd |D }	|	D ]
}
|t|
|
 q.|  d	d |D }g }t	|d
=}|D ]}
 
|
}|t|||
| qN| D ]\}}
|jsmJ |t|||
| qd|rt|  W d    n1 sw   Y  || q tjy   Y d S w )NTr   c                    
     | S r0   resolve_datarn   r   r*   r+   rp        
 z)_write_files_from_queue.<locals>.<lambda>)rS   c                    r   r0   r   rn   r   r*   r+   rp     r   c                 S   r   r*   r}   r   r*   r*   r+   r         z+_write_files_from_queue.<locals>.<listcomp>c                 S   r|   r*   r}   r   r*   r*   r+   r     r   wb)
get_nowaitr[   rR   Zis_availablerP   rG   rB   r{   rE   openr   rK   r   rF   Zis_cpuosfsyncfilenoputqueueEmpty)r   r   r   rS   r   	file_namer   Zwrite_itemsloaderr   r   r   Zwrite_resultsr_   r   r.   r*   r   r+   _write_files_from_queue   sR   




*r   c                       s   e Zd ZdZ				ddeeejf dedede	d	e	d
df fddZ
ded
dfddZded
efddZdee d
ee fddZdeded
eee  fddZdedeee  d
dfddZ  ZS )r   aa  
    Basic implementation of StorageWriter using file IO.

    This implementation makes the following assumptions and simplifications:

    * The checkpoint path is an empty or non-existing directory.
    * File creation is atomic

    The checkpoint consist of one file per write request plus
    a `.metadata` file with the serialized metadata.

    Tr   逖 pathsingle_file_per_rank
sync_filesthread_countper_thread_copy_aheadr/   Nc                    s0   t    t|| _|| _|| _|| _|| _dS )a  
        Initialize the writer pointing to `path`

        Args:
            path: directory where the checkpoint will be written to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Default to True.
            sync_files : force files to be synced to permanent storage. Default to True.
            thread_count: Number of IO threads to use to write. Default to 1.
            per_thread_copy_ahead: How many bytes to copy from the GPU ahead of saving then. Default 10Mb.

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        N)superrJ   r   r   r   r   r   r   )r?   r   r   r   r   r   	__class__r*   r+   rJ   D  s   


zFileSystemWriter.__init__is_coordinatorc                 C   r=   r0   r*   )r?   r   r*   r*   r+   set_up_storage_writer_  rL   z&FileSystemWriter.set_up_storage_writerplanc                 C   s   | j jddd |S )NT)parentsexist_ok)r   mkdirr?   r   r*   r*   r+   prepare_local_planb  s   z#FileSystemWriter.prepare_local_planglobal_planc                 C   s   dd t |D }|S )Nc                 S   s*   g | ]\}}t j|td | ddqS )__rO   r:   )dataclassesreplacer,   )r   r   r   r*   r*   r+   r   i  s    z8FileSystemWriter.prepare_global_plan.<locals>.<listcomp>)r   )r?   r   Z	new_plansr*   r*   r+   prepare_global_planf  s   z$FileSystemWriter.prepare_global_planr   c                    s6  |j d  fdd}t }| jr,t| j|jD ]}| }|| j| ||f qn|jD ]}| }|| j| ||gf q/t }g }	t	d| jD ]}
t
jt|||| j| jfd}|  |	| qMt|||| j| jd |	D ]}|  qtg }z	 || 7 }q tjy   	 t }|| | Y S w )Nr   c                     s   j    t }  d7  | S rv   )r-   DEFAULT_SUFFIX)r   Z
file_countZstorage_planr*   r+   gen_filew  s   z-FileSystemWriter.write_data.<locals>.gen_filer   )targetargs)r   r   r   rS   r   )r:   r   Queuer   r   r   rI   r   r   r   	threadingThreadr   r   r   startrK   joinr   r   r
   
set_result)r?   r   r   r   r   Zbucketr   r7   r   threadsrO   tresfutr*   r   r+   
write_datao  s`   



zFileSystemWriter.write_datametadataresultsc                 C   s   t  }|D ]}|dd |D  q||_| jd d}t|| t|	  W d    n1 s5w   Y  | jd 
| jd  d S )Nc                 S   s   i | ]}|j |jqS r*   )r8   r:   )r   wrr*   r*   r+   
<dictcomp>  s    z+FileSystemWriter.finish.<locals>.<dictcomp>z.metadata.tmpr   	.metadata)dictupdater:   r   r   pickledumpr   r   r   rename)r?   r   r   Z
storage_mdZwr_listmetadata_filer*   r*   r+   finish  s   zFileSystemWriter.finish)TTr   r   )r#   r$   r%   r&   r   r'   r   PathLikeboolr)   rJ   r   r   r   r   r   r   r
   r   r   r   r   __classcell__r*   r*   r   r+   r   6  sP    
	

A
c                       s   e Zd Zdeeejf ddf fddZdefddZ	d	e
d
eded fddZdefddZdededdfddZd	e
de
fddZdee
 dee
 fddZ  ZS )r   r   r/   Nc                    s    t    t|| _t | _d S r0   )r   rJ   r   r   r   r:   )r?   r   r   r*   r+   rJ     s   

zFileSystemReader.__init__sinfoc                 C   s   t ||j|jS r0   )r   r!   r"   )r?   filer   r*   r*   r+   _slice_file  s   zFileSystemReader._slice_filer   r   c                 C   s`  t  }|jD ]}| j|j }|j}||g | q| D ]\}}| j| dr}	|D ]g}
| j|
j }| 	|	|}|
j
tjkrWt||j}|d ||
| q.tttj|dd}t||
j|
j}||
 }| | ksJ d|
j d|  d|  || ||
| q.W d    n1 sw   Y  qt }| d  |S )Nrbr   r2   )Zmap_locationzreq z mismatch sizes z vs )!r   rI   r:   Zstorage_indexr    
setdefaultrK   r   r   r   r]   r   r~   r   r   readr"   seekZ
load_bytesr   r	   r[   loadr   Zstorage_offsetslengthsZresolve_tensorr1   r@   Zcopy_Zcommit_tensorr
   r   )r?   r   r   Zper_fileZ	read_itemZitem_mdr   r    reqsr   reqZ
file_slicebytesr.   Ztarget_tensorr   r*   r*   r+   	read_data  s@   




zFileSystemReader.read_datac                 C   s>   | j d d}t|W  d    S 1 sw   Y  d S )Nr   r   )r   r   r   r   )r?   r   r*   r*   r+   read_metadata  s   $zFileSystemReader.read_metadatar   r   c                 C   s   |j | _ | j d usJ d S r0   r   )r?   r   r   r*   r*   r+   set_up_storage_reader  s   z&FileSystemReader.set_up_storage_readerc                 C      |S r0   r*   r   r*   r*   r+   r     rL   z#FileSystemReader.prepare_local_planr   c                 C   r   r0   r*   )r?   r   r*   r*   r+   r     s   z$FileSystemReader.prepare_global_plan)r#   r$   r%   r   r'   r   r   rJ   r   r   r   r   r
   r   r   r   r   r   r   r   r   r   r*   r*   r   r+   r     s     &)>abcr   r   r   r   rU   r   r   r   r   r   typingr   r   r   r   r[   r	   Ztorch.futuresr
   pathlibr   r   r   r   rM   r   r   r   r   r   r   r   r   r   r   r   r   utilsr   Ztorch.distributed._shard._utilsr   Ztorch._utilsr   __all__r   r,   r   r6   r;   r<   rG   rP   r)   r{   r   r   r   r   r   r   r   r*   r*   r*   r+   <module>   st    (

Q


6 	