
                             d Z ddlmZ ddlmZ ddlmZ ddlZddlZddlmZ ddlm	Z	 ddlm
Z
  G d	 d
e
j                        Zy)zJImplements a file wrapper used for in-flight retries of streaming uploads.    )absolute_import)division)unicode_literalsN)errors)	hash_util)upload_streamc                   v     e Zd ZdZ	 	 d fd	Zd Zd Zd Zd Zd Z	d fd	Z
ej                  fd	Zd
 Z xZS )BufferedUploadStreamzHSupports limited seeking within a non-seekable stream by buffering data.c                     t         |   |d||       t        j                         | _        || _        d| _        d| _        d| _        t        j                  | j                        | _        y)a  Initializes a FilePart instance.

    Args:
      stream (io.IOBase): The underlying stream wrapped by this class.
      max_buffer_size: Maximum size of the internal buffer. This should be >= to
          the chunk size used by the API to execute streaming uploads to ensure
          that at least one full chunk write can be repeated in the event of a
          server error.
      digesters (dict[util.HashAlgorithm, hashlib hash object]|None): See super
        class.
      progress_callback (func[int]|None): See super class.
    N)streamlength	digestersprogress_callbackr   )super__init__collectionsdeque_buffer_max_buffer_size_buffer_start	_position_buffer_endr   copy_digesters
_digesters_checkpoint_digesters)selfr   max_buffer_sizer   r   	__class__s        @lib/googlecloudsdk/command_lib/storage/buffered_upload_stream.pyr   zBufferedUploadStream.__init__!   sm    " 
)T#+	 $ - $$&DL+DDDND!*!9!9$//!JD    c                     | j                   S Nr   r   s    r   _get_absolute_positionz+BufferedUploadStream._get_absolute_position@   s    >>r    c                     || _         y r"   r#   )r   offsets     r   _update_absolute_positionz.BufferedUploadStream._update_absolute_positionC   s	    DNr    c                    g }|}| j                   | j                  k  r| j                  }| j                  D ]  }|t	        |      z   | j                   k\  rZ| j                   |z
  }t	        |      |z
  }t        ||      }|j                  ||||z           ||z  }| xj                   |z  c_         |t	        |      z  } dj                  |      S )a  Get any buffered data required to complete a read.

    If a backward seek has not happened, the buffer will never contain any
    information needed to complete a read call. Return the empty string in
    these cases.

    If the current position is before the end of the buffer, some of the
    requested bytes will be in the buffer. For example, if our position is 1,
    five bytes are being read, and the buffer contains b'0123', we will return
    b'123'. Two additional bytes will be read from the stream at a later stage.

    Args:
      amount (int): The total number of bytes to be read

    Returns:
      A byte string, the length of which is equal to `amount` if there are
      enough buffered bytes to complete the read, or less than `amount` if there
      are not.
    r    )r   r   r   r   lenminappendjoin)	r   amountbuffered_databytes_remainingposition_in_bufferdataoffset_from_positionbytes_to_read_this_block	read_sizes	            r   _read_from_bufferz&BufferedUploadStream._read_from_bufferF   s    ( MO~~(((--,,$D	)T^^;!%2D!D
%(Y1E%E
"2OD)


t$89M$-:.  / 0
Y
&/
..I
%.c$i'  88M""r    c                      y)a#  Disables parent class digester checkpointing behavior.

    To guarantee that seeks within the buffer are possible, we need to ensure
    that the checkpoint is aligned with the buffer's start_byte. This is not
    possible if we save digester checkpoints when the parent class does so.
    N r$   s    r   _save_digesters_checkpointz/BufferedUploadStream._save_digesters_checkpointk   s     	r    c                    |re| j                   j                  |       | xj                  t        |      z  c_        d}| j                  | j                  z
  | j
                  kD  r| j                   j                         }| xj                  t        |      z  c_        |r| j
                  | j                  | j                  z
  z
  }|dk\  r4| j                   j                  || d        | xj                  |z  c_        t        j                  | j                  |dt        |      |z
          | j                  | _        | j                  | j                  z
  | j
                  kD  ryyy)ax  Adds data to the buffer, respecting max_buffer_size.

    The buffer can consist of many different blocks of data, e.g.

      [b'0', b'12', b'3']

    With a maximum size of 4, if we read two bytes, we must discard the oldest
    data and keep half of the second-oldest block:

      [b'2', b'3', b'45']

    Args:
      data (bytes): the data being added to the buffer.
    N   )r   r,   r   r*   r   r   popleft
appendleftr   update_digestersr   _checkpoint_absolute_index)r   r2   oldest_datarefill_amounts       r   _store_dataz BufferedUploadStream._store_datat   s9    
ll$
#d)#kt111D4I4IIll**,c+..//!3!335-aLL##K$@A-/ 
$
$((;3{+m;<> -1,>,>$
) t111D4I4II	 r    c                 .   |du xs |dk  }|r| j                   }n|}| j                  |      }|t        |      z  }|rt        |   d      }n|rt        |   |      }nd}| xj
                  t        |      z  c_        | j                  |       ||z   S )zReads from the wrapped stream.

    Args:
      size: The amount of bytes to read. If omitted or negative, the entire
          stream will be read and returned.

    Returns:
      Bytes from the wrapped stream.
    Nr   r    )r   r6   r*   r   	_get_datar   rB   )r   sizeread_all_bytesr0   r2   new_datar   s         r   rE   zBufferedUploadStream._get_data   s     T\-TAXN--oo!!/2Ds4y Oy$1"5h	y$1/BhhNNc(m#NX(?r    c                    |t         j                  k(  r[|| j                  k  s|| j                  kD  r:t	        j
                  dj                  || j                  | j                              |}n|t         j                  k(  rt        |      | j                  kD  r/t	        j
                  dj                  || j                              | j                  | j                        r	 | j                  | j                        r| j                  |z   }n%t	        j
                  dj                  ||            | j                  |       || _        y)z!Seeks within the buffered stream.zUnable to recover from an upload error because limited buffering is available for streaming uploads. Offset {} was requested, but only data from {} to {} is buffered.zNInvalid SEEK_END offset {} on streaming upload. Only {} bytes can be buffered.z;Invalid seek mode on streaming upload. Mode: {}, offset: {}N)osSEEK_SETr   r   r   ErrorformatSEEK_ENDabsr   readr   _catch_up_digesters)r   r'   whencenew_positions       r   seekzBufferedUploadStream.seek   s!   	$$$	$1A1A(All44:F**D,<,<5>? 	?
 l	2;;	 
Vt,,	,ll  &vt/D/D EG 	G IId++, IId++,^^f,lLL
G
N
Nf  	\*!DNr    c                      y)ay  Indicates that this stream is not seekable.

    Needed so that boto3 can correctly identify how to treat this stream.
    The library attempts to seek to the beginning after an upload completes,
    which is not always possible.

    Apitools does not check the return value of this method, so it will not
    raise issues for resumable uploads.

    Returns:
      False
    Fr8   r$   s    r   seekablezBufferedUploadStream.seekable   s     r    )NN)rD   )__name__
__module____qualname____doc__r   r%   r(   r6   r9   rB   rE   rJ   rK   rT   rV   __classcell__)r   s   @r   r
   r
      sH    P
 !%	K>##J	"?H@ !# "8r    r
   )rZ   
__future__r   r   r   r   rJ   "googlecloudsdk.command_lib.storager   r   r   UploadStreamr
   r8   r    r   <module>r_      s7     Q &  '  	 5 8 <C=55 Cr    