
    *                        d Z ddlmZ ddlmZ ddlmZ ddlZddlZddlZddlm	Z
 ddlmZ ddlm	Z ddlmZ dd	lmZ dd
lmZ ddlm	Z	 ddlmZ ddlmZ ddlmZ ddlZdZdZej:                  j<                  j>                  j@                  jC                  e       ej:                  j<                  j>                  jD                  jC                  e       ddZ#ddZ$d Z%ddZ&ddZ'ddZ(d Z) G d de*      Z+ G d de*      Z,y)zHelpers for accessing GCS.

Bulk object uploads and downloads use methods that shell out to gsutil.
Lightweight metadata / streaming operations use the StorageClient class.
    )absolute_import)division)unicode_literalsN)
exceptions)transfer)storage_api)storage_util)apis)log)
properties)	resourcesgs<   c                     t         j                  j                  j                  j	                         }|rt        | |       y t        | ||       y )N)storage_client)r   VALUESstorage
use_gsutilGetBool_UploadGsutil_UploadStorageClient)filesdestinationr   r   s       6lib/googlecloudsdk/api_lib/dataproc/storage_helpers.pyUploadr   8   s=       ((33;;=*%%NK    c           	         |xs t        j                         }| D ]s  }t        j                  j	                  |      }t        j                  j                  ||      }t        j                  j                  |      }	 |j                  ||       u y# t        j                  $ r:}t        j                  dj                  dj                  |       ||            d}~ww xY w)zUpload a list of local files to GCS.

  Args:
    files: The list of local files to upload.
    destination: A GCS "directory" to copy the files into.
    storage_client: Storage api client used to copy files to gcs.
  z)Failed to upload files ['{}'] to '{}': {}', 'N)r   StorageClientospathbasenamejoinr	   ObjectReferenceFromUrlCopyFileToGCSr   BadFileExceptiondp_exceptionsFileUploadErrorformat)	r   r   r   clientfile_to_upload	file_namedest_urldest_objecterrs	            r   r   r   B   s     8[668&n  0Iww||K3H..66x@K5>;7  && 5))
5
<
<kk% +s45 55s   =BC%5CCc                     | }||gz  }t        j                  d|      }|dk7  r4t        j                  dj	                  dj                  |       |            y)zUpload a list of local files to GCS.

  Args:
    files: The list of local files to upload.
    destination: A GCS "directory" to copy the files into.
  cpr   z5Failed to upload files ['{0}'] to '{1}' using gsutil.r   N)r	   RunGsutilCommandr(   r)   r*   r#   )r   r   args	exit_codes       r   r   r   W   sb     
$;-$++D$7)!^

'
'?FFKK	-. . r   c                     |xs t        j                         }	 |j                  |       S # t         j                  $ r Y yw xY w)zGets a bucket if it exists.

  Args:
    bucket: The bucket name.
    storage_client: Storage client instance.

  Returns:
    A bucket message, or None if it doesn't exist.
  N)r   r   	GetBucketBucketNotFoundError)bucketr   r+   s      r   r7   r7   g   sE     8[668&F##		(	( s   + A Ac                 \    |xs t        j                         }|j                  | ||       y)am  Creates a bucket.

  Creates a bucket in the specified region. If the region is None, the bucket
  will be created in global region.

  Args:
    bucket: Name of bucket to create.
    region: Region to create bucket in.
    storage_client: Storage client instance.
    project: The project to create the bucket in. If None, current Cloud SDK
    project is used.
  )locationprojectN)r   r   CreateBucketIfNotExists)r9   regionr   r<   r+   s        r   r=   r=   y   s,     8[668&  &' Jr   c                 X   |xs t        j                         }t        j                  j	                  |       }	 |j                  |      }t        j                  |d      }|j                         S # t        j                  $ r% t        j                  dj                  |             w xY w)aK  Reads an object's content from GCS.

  Args:
    object_url: The URL of the object to be read. Must have "gs://" prefix.
    storage_client: Storage api client used to read files from gcs.

  Raises:
    ObjectReadError:
      If the read of GCS object is not successful.

  Returns:
    A str for the content of the GCS object.
  zutf-8)encodingzFailed to read file '{0}'.)r   r   r	   r$   r%   
ReadObjectioTextIOWrapperreadr   r'   r(   ObjectReadErrorr*   )
object_urlr   r+   
object_refbytes_iowrappers         r   rA   rA      s     8[668&++33J?*9  ,Hx':G<<>		$	$ 9

'
'$++J79 99s   7A1 18B)c                     t         j                  j                  |       }|j                  |j                  |j
                        S )z.Build an Object proto message from a GCS path.)r9   name)r   REGISTRYParseStorageURLObjectr9   object)r!   messagesresources      r   GetObjectRefrR      s3    //5(	hoo	FFr   c                   *    e Zd ZdZd ZddZd Zd Zy)r   zMicro-client for accessing GCS.c                 p    t        j                  dd      | _        t        j                  dd      | _        y )Nr   v1)	core_apisGetClientInstancer+   GetMessagesModulerP   selfs    r   __init__zStorageClient.__init__   s*    --i>DK//	4@DMr   Nc                     | j                   j                  |j                  |j                        }	 | j                  j
                  j                  ||      S # t        j                  $ r Y y w xY w)N)r9   rO   )requestdownload)	rP   StorageObjectsGetRequestr9   rK   r+   objectsGetapitools_exceptionsHttpNotFoundError)rZ   rG   r^   r]   s       r   
_GetObjectzStorageClient._GetObject   sg    mm44   5 :G[[  $$Wx$HH00 s   &A A0/A0c                 $    | j                  |      S )a-  Get the object metadata of a GCS object.

    Args:
      object_ref: A proto message of the object to fetch. Only the bucket and
        name need be set.

    Raises:
      HttpError:
        If the responses status is not 2xx or 404.

    Returns:
      The object if it exists otherwise None.
    )rd   )rZ   rG   s     r   	GetObjectzStorageClient.GetObject   s     ??:&&r   c                     t         j                  j                  ||j                  d      }| j	                  ||       |S )a  Build an apitools Download from a stream and a GCS object reference.

    Note: This will always succeed, but HttpErrors with downloading will be
      raised when the download's methods are called.

    Args:
      stream: An Stream-like object that implements write(<string>) to write
        into.
      object_ref: A proto message of the object to fetch. Only the bucket and
        name need be set.

    Returns:
      The download.
    F)
total_sizeauto_transfer)r^   )r   Download
FromStreamsizerd   )rZ   streamrG   r^   s       r   BuildObjectStreamzStorageClient.BuildObjectStream   s@       ++:??% , AHOOJO2Or   N)__name__
__module____qualname____doc__r[   rd   rf   rn    r   r   r   r      s    'A' r   r   c                   X    e Zd ZdZd	dZed        Zd Zd Zd Z	e
j                  fdZy)
StorageObjectSeriesStreamzFI/O Stream-like class for communicating via a sequence of GCS objects.Nc                 b    || _         |xs
 t               | _        d| _        d| _        d| _        y)a+  Construct a StorageObjectSeriesStream for a specific gcs path.

    Args:
      path: A GCS object prefix which will be the base of the objects used to
          communicate across the channel.
      storage_client: a StorageClient for accessing GCS.

    Returns:
      The constructed stream.
    Tr   N)
_base_pathr   _gcs_open_current_object_index_current_object_pos)rZ   r!   r   s      r   r[   z"StorageObjectSeriesStream.__init__   s4     DO1-/DIDJ "#D  !Dr   c                     | j                   S )zWhether the stream is open.rz   rY   s    r   openzStorageObjectSeriesStream.open   s     ::r   c                     d| _         y)zClose the stream.FNr~   rY   s    r   ClosezStorageObjectSeriesStream.Close   s	    DJr   c                 2    | j                   st        d      y )NzI/O operation on closed stream.)r   
ValueErrorrY   s    r   _AssertOpenz%StorageObjectSeriesStream._AssertOpen  s    99899 r   c                     dj                  | j                  |      }| j                  j                  t	        || j                  j
                              S )z!Get the ith object in the series.z{0}.{1:09d})r*   rx   ry   rf   rR   rP   )rZ   ir!   s      r   rd   z$StorageObjectSeriesStream._GetObject  s>    3D99|D$))2D2DEFFr   c                 n   | j                          d}d}|}||k  ra| j                  | j                  dz         }|r|r!	 | j                  | j                        }|s	 |S |j                  | j                  z
  }|dk  r$t        dj                  |j                              |j                  dk(  r| j                          	 |S ||z
  }	t        |	|      }
|
dkD  rb| j                  j                  ||      }|j!                  | j                  | j                  |
z   dz
         | xj                  |
z  c_        ||
z  }|xr | j                  |j                  k(  }|r |}| xj                  dz  c_        d| _        d	 |S |S # t        j                  $ r!}t        j                  d|       Y d}~|S d}~ww xY w)a  Read from this stream into a writable.

    Reads at most n bytes, or until it sees there is not a next object in the
    series. This will block for the duration of each object's download,
    and possibly indefinitely if new objects are being added to the channel
    frequently enough.

    Args:
      writable: The stream-like object that implements write(<string>) to
          write into.
      n: A maximum number of bytes to read. Defaults to sys.maxsize
        (usually ~4 GB).

    Raises:
      ValueError: If the stream is closed or objects in the series are
        detected to shrink.

    Returns:
      The number of bytes read.
    r   N   zFailed to fetch GCS output:
%szObject [{0}] shrunk.)r   rd   r{   rb   	HttpErrorr   warningrl   r|   r   r*   rK   r   minry   rn   GetRange)rZ   writablen
bytes_readobject_infomax_bytes_to_readnext_object_infoerrornew_bytes_availablebytes_left_to_readnew_bytes_to_readr^   object_finisheds                r   ReadIntoWritablez*StorageObjectSeriesStream.ReadIntoWritable
  s   * 	JK
(
()C)Ca)GH ,	(B(BC+ 
L I (,,t/G/GG	q	 /66{7G7GHII			Q	

8 5 -z902EF	Q	99..xE$$$$'881<	> 	  $55 ''

 
Kt77;;K;KK  
&""a'"#$  	:W #,, 	
++7
?
R W	s    F   F4F//F4ro   )rp   rq   rr   rs   r[   propertyr   r   r   rd   sysmaxsizer   rt   r   r   rv   rv      s=    N!*  :G
 *- Lr   rv   ro   )NN)-rs   
__future__r   r   r   rB   r    r   apitools.base.pyr   rb   r   googlecloudsdk.api_lib.dataprocr(   googlecloudsdk.api_lib.storager   r	   googlecloudsdk.api_lib.utilr
   rV   googlecloudsdk.calliopegooglecloudsdk.corer   r   r   six.moves.urllib.parsesixSTORAGE_SCHEMEHTTP_TIMEOUTmovesurllibparseuses_relativeappenduses_netlocr   r   r   r7   r=   rA   rR   rO   r   rv   rt   r   r   <module>r      s     '  ' 	 	 
 > % G 6 7 9 . # * )    		     $ $ + +N ; 		     " " ) ). 9L5*. $K"92G4F 4nv vr   