
    mN                         d Z ddlmZ ddlmZ ddlmZ ddlZddlZddlZddlZddl	m
Z ddl	mZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ dZdZdZdZdZdZdZ dZ!ejD                   G d dejF                               Z$y)z<Command to run an Airflow CLI sub-command in an environment.    )absolute_import)division)unicode_literalsN)environments_util)util)base)image_versions_util)resource_args)execution_utils)log)
console_iozairflow-workera  Because Cloud Composer manages the Airflow metadata database for your environment, support for the Airflow `{}` subcommand is being deprecated. To avoid issues related to Airflow metadata, we recommend that you do not use this subcommand unless you understand the outcome.   
      g      ?g      ?c                   z    e Zd ZdZej
                  Zed        Zd Zd Z	d Z
d Zd Zd Zd	 Zd
 Zd Zd Zd Zy)Runa  Run an Airflow sub-command remotely in a Cloud Composer environment.

  Executes an Airflow CLI sub-command remotely in an environment. If the
  sub-command takes flags, separate the environment name from the sub-command
  and its flags with ``--''. This command waits for the sub-command to
  complete; its exit code will match the sub-command's exit code.

  Note: Airflow CLI sub-command syntax differs between Airflow 1 and Airflow 2.
  Refer to the Airflow CLI reference documentation for more details.

  ## EXAMPLES

    The following command in environments with Airflow 2:

    {command} myenv dags trigger -- some_dag --run_id=foo

  is equivalent to running the following command from a shell inside the
  *my-environment* environment:

    airflow dags trigger --run_id=foo some_dag

  The same command, but for environments with Airflow 1.10.14+:

    {command} myenv trigger_dag -- some_dag --run_id=foo

  is equivalent to running the following command from a shell inside the
  *my-environment* environment:

    airflow trigger_dag some_dag --run_id=foo

  The following command (for environments with Airflow 1.10.14+):

    {command} myenv dags list

  is equivalent to running the following command from a shell inside the
  *my-environment* environment:

    airflow dags list
  c                    t        j                  |d       d}|j                  ddt        | j                  j                               dj                  dj                  t        | j                  j                         D cg c]4  \  }}dj                  ||j                  xs d|j                  xs d      6 c}}            |      	       g }| j                  j                         D ]c  \  }}|j                  s|j                  d
j                  |dj                  t        |j                  j                                                  e |j                  d       |j                  ddt        j                  dj                  dj                  |                   |j                  ddt        j                   dd       y c c}}w )Nz"in which to run an Airflow commandzThttps://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html
subcommand
SUBCOMMANDzThe Airflow CLI subcommand to run. Available subcommands include (listed with Airflow versions that support): {} (see {} for more info)., z{} [{}, {})z**)metavarchoiceshelpz- {}: {}z;- all other subcommands: all nested subcommands are allowedsubcommand_nestedSUBCOMMAND_NESTEDzeAdditional subcommand in case it is nested. The following is a list of allowed nested subcommands:
{}
)r   nargsr   cmd_argsCMD_ARGSz)Command line arguments to the subcommand.z4{command} myenv trigger_dag -- some_dag --run_id=foo)r   r   r   example)r
   AddEnvironmentResourceArgadd_argumentlistSUBCOMMAND_ALLOWLISTkeysformatjoinsorteditemsfrom_version
to_versionallowed_nested_subcommandsappendargparseOPTIONAL	REMAINDER)clsparserdoc_urlcmdrallowed_nested_subcommands_helpsub_cmds          (lib/surface/composer/environments/run.pyArgszRun.Args_   s   ++46 eG
S--2245()/)) ),(@(@(F(F(H )Ifc1 (..sANN4Jd/0||/CtE(H 
 #*$  %  ')#..446
 ))%,,


iiq;;@@BCD 7 $**E # &:;
<  	   8F  HCs    9Gc                    ddddddd| j                  |      }fd} ||      r\t        |j                  xs g       j                  ddh      r1|j                  xs g |_        |j                  j	                  d       yyy)a  Bypasses confirmations with "yes" responses.

    Prevents certain Airflow CLI subcommands from presenting a confirmation
    prompting (which can make the gcloud CLI stop responding). When necessary,
    bypass confirmations with a "yes" response.

    Args:
      args: argparse.Namespace, An object that contains the values for the
        arguments specified in the .Args() method.
      airflow_version: String, an Airflow semantic version.
    z1.10.6N)backfill
delete_dag)dagsr;   )r=   delete)tasksclear)dbcleanc                 t    | v rn| d   v r| d   } ny|    d u xs t        j                  |          dk\  S )Nr   F)image_versions_command_utilCompareVersions)sairflow_versionprompting_subcommandss    r8   _IsPromptingSubcommandz<Run.BypassConfirmationPrompt.<locals>._IsPromptingSubcommand   sc    	
#	#Q4((aD#A&$. B)99!#8#;=@ABC    z-yz--yes)_GetSubcommandTwoLevelsetr   
isdisjointr-   )selfargsrG   subcommand_two_levelrI   rH   s     `  @r8   BypassConfirmationPromptzRun.BypassConfirmationPrompt   s     "    66t<
C 	34DMMR ++T7O<mm)rdm
mm7# 	= 	5rJ   c           	      >   dg dgi}d }| j                  |      }|j                  |g       D ]m  }t        |      j                  t        |j                  xs g             s5t        j                  dj                  dj                  |       ||                   y)zPrevents running Airflow CLI commands without required arguments.

    Args:
      args: argparse.Namespace, An object that contains the values for the
        arguments specified in the .Args() method.
    )userscreate)z-pz
--passwordz--use-random-passwordc                     | D cg c]  }dj                  |       }}dj                  dj                  |            S c c}w )Nz"{}"z[{}]r   )r&   r'   )r   aquoted_argss      r8   _StringifyRequiredCmdArgsz>Run.CheckForRequiredCmdArgs.<locals>._StringifyRequiredCmdArgs   s<    /78x!V]]1%xk8]]499[122 9s   ?zMThe subcommand "{}" requires one of the following command line arguments: {}. N)	rK   getrL   rM   r   command_utilErrorr&   r'   )rN   rO   required_cmd_argsrX   rP   subcommand_required_cmd_argss         r8   CheckForRequiredCmdArgszRun.CheckForRequiredCmdArgs   s     	KL3  66t<
 ):(=(=b)"$	)	*	5	5c$--:M26N	O  #V-.)*FGIJ 	J)"rJ   c                     d}|j                   t        j                  v r5t        j                  t
        j                  |j                         dd      }|S )NTF)messagedefaultcancel_on_no)r   r[   SUBCOMMAND_DEPRECATIONr   PromptContinueDEPRECATION_WARNINGr&   )rN   rO   responses      r8   DeprecationWarningPromptzRun.DeprecationWarningPrompt   sH    H,===**%,,T__=h OrJ   c                     |j                   df}|j                  r|j                   |j                  f}|S |j                  r|j                   |j                  d   f}|S )a  Extract and return two level nested Airflow subcommand in unified shape.

    There are two ways to execute nested Airflow subcommands via gcloud, e.g.:
    1. {command} myenv users create -- -u User
    2. {command} myenv users -- create -u User
    The method returns here (users, create) in both cases.

    It is possible that first element of args.cmd_args will not be a nested
    subcommand, but that is ok as it will not break entire logic.
    So, essentially there can be subcommand_two_level = ['info', '--anonymize'].

    Args:
      args: argparse.Namespace, An object that contains the values for the
        arguments specified in the .Args() method.

    Returns:
      subcommand_two_level: two level subcommand in unified format
    Nr   )r   r   r   )rN   rO   rP   s      r8   rK   zRun._GetSubcommandTwoLevel   s`    ( !OOT2"oot/E/EF   
"oot}}Q/?@rJ   c                 p   fd}d | j                  |      \  }} |||| j                  |j                     j                  | j                  |j                     j                         | j                  |j                     j
                  sy dj                  ||      }|| j                  |j                     j
                  v rh |||| j                  |j                     j
                  |   j                  | j                  |j                     j
                  |   j                         y  ||       y )Nc                 H    t        j                  |||      s
 | |       y y N)rD   IsVersionInRange)commandrG   r*   r+   _RaiseLackOfSupportErrors       r8   _CheckIsSupportedSubcommandzFRun.CheckSubcommandAirflowSupport.<locals>._CheckIsSupportedSubcommand  s'    (99
<5 /:5rJ   c                 L    t        j                  dj                  | |            )NzWThe subcommand "{}" is not supported for Composer environments with Airflow version {}.)r[   r\   r&   )rn   rG   s     r8   ro   zCRun.CheckSubcommandAirflowSupport.<locals>._RaiseLackOfSupportError  s(    &&,fWo&FI IrJ   z{} {})rK   r$   r   r*   r+   r,   r&   )rN   rO   rG   rp   r   r   two_level_subcommand_stringro   s          @r8   CheckSubcommandAirflowSupportz!Run.CheckSubcommandAirflowSupport
  s!   ;I
 %)$?$?$E!J!O!!$//2??!!$//2==?
 $$334")..=N"OD55334!
%

#
#DOO
4%%&799E

#
#DOO
4%%&799CE :OLrJ   c                 v    |j                   r-t        j                  |dd       st        j                  d      y y )Nz1.10.14zgNested subcommands are supported only for Composer environments with Airflow version 1.10.14 or higher.)r   rD   rm   r[   r\   )rN   rO   rG   s      r8   #CheckSubcommandNestedAirflowSupportz'Run.CheckSubcommandNestedAirflowSupport.  sB    '88Y.45 5. 	rJ   c                     |j                   j                  xr  |j                   j                  j                  }|r!t        j                  t        |      dz         S |S )Nz Make sure you have followed https://cloud.google.com/composer/docs/how-to/accessing/airflow-cli#private-ip to enable access to your private Cloud Composer environment from your machine.)configprivateEnvironmentConfigenablePrivateEnvironmentr[   r\   str)rN   errorenv_obj
is_privates       r8   ConvertKubectlErrorzRun.ConvertKubectlError6  s]    // 	I//HH  
e*  LrJ   c                 4    t        j                  d|      d   S )Nz-airflow-([\d\.]+)r   )refindall)rN   image_versions     r8   _ExtractAirflowVersionzRun._ExtractAirflowVersionD  s    ::+];A>>rJ   c                 <   |j                   j                  }t        j                  |      }d|v}t        j                  ||d      5  	 |j                   j
                  j                  }| j                  |      }| j                  ||       | j                  ||       t        j                  |      }t        j                  t        |      }	t        j                  j                  dj!                  |             | j#                  ||       d|	dg}
|r|
j%                  d       |
j'                  dt(        d	d
|j*                  g       |j,                  r|
j%                  |j,                         |j.                  r|
j'                  |j.                         t        j0                  t        j2                  ||
      t        j4                  j                         	 ddd       y# t        j6                  $ r}| j9                  ||      d}~ww xY w# 1 sw Y   yxY w)a  Runs Airflow command using kubectl on the GKE Cluster.

    This mode the command is executed by connecting to the cluster and
    running `kubectl pod exec` command.
    It requires access to GKE Control plane.

    Args:
      args: argparse.Namespace, An object that contains the values for the
        arguments specified in the .Args() method.
      env_obj: Cloud Composer Environment object.
    zno-ttyN)
pod_substrkubectl_namespacez?Executing within the following Kubernetes cluster namespace: {}execz--stdinz--ttyz--containerz--airflow)out_func)rw   
gkeClusterr[   ExtractGkeClusterLocationIdTemporaryKubeconfigsoftwareConfigimageVersionr   rs   ru   FetchKubectlNamespace	GetGkePodWORKER_POD_SUBSTRr   statusPrintr&   rQ   r-   extendWORKER_CONTAINERr   r   r   RunKubectlCommandAddKubectlNamespaceoutKubectlErrorr~   )rN   rO   r|   
cluster_idcluster_location_idttyr   rG   
kubectl_nspodkubectl_argses               r8   _RunKubectlzRun._RunKubectlG  s    **J&BB7K
$
C		)	)Z
355BB55mD**4A00G!77F
$$(JH 	


#	% 	%%dO<Y/


g
&,dItO	Q!!


d44
5==


dmm
,&&,,ZFWW]]	$9
 
> && 3&&q'223?
 
s+   H	FG%%H8H

HHHc           
      
   |j                   j                  j                  }| j                  |      }|j                  j
                  j                         }| j                  ||       | j                  ||       | j                  ||       |j                  g}|j                  r|j                  |j                         |j                  r|j                  |j                         t        j                   j#                  dj%                  dj'                  |                   t)        j*                  |j                  |j                  xs d|j                  xs g || j-                               }|rD|j.                  r8t        j                   j#                  dj%                  |j.                               |j.                  st1        j2                  d      t        j                   j#                  d       d}d	}	d
}
t4        }d }d}d}|sV|sSd }	 t7        j8                         5  t;        j<                  |t?        j@                  tB         tB              z          t)        jD                  |j.                  |jF                  |jH                  |	|| j-                               }d
}
|jJ                  }|jL                  }|jO                  d        d d d        |
tX        k(  rt1        j2                  d      |st[        |t\        z  t^              }nLt4        }|D ]/  }t        j"                  |j`                  r|j`                  nd       1 |d   jb                  d	z   }	|s|sS|r|jd                  r|jd                  jf                  r|jd                  jh                  r8t        jh                  dj%                  |jd                  jh                               t        jh                  dj%                  |jd                  jf                               tk        |jd                  jf                         y y y y # 1 sw Y   axY w# tP        $ r t        j                   j#                  d       	 t        jR                  d       t)        jT                  |j.                  |jF                  ||jH                  || j-                               }t        jR                  dtW        |      z          |r2|jL                  r&|jL                  D ]  }t        j"                  |        |rd}d}n!#  t        jR                  d       |
d	z  }
Y nxY wY i |
d	z  }
Y rxY w)Nz(Executing the command: [ airflow {} ]...rY    )rn   r   
parametersenvironment_refrelease_trackz)Command has been started. execution_id={}zBCannot execute subcommand for environment. Got empty execution Id.z#Use ctrl-c to interrupt the commandF   r   )execution_idpod_namepod_namespacenext_line_numberr   r   c                     | j                   S rl   )
lineNumber)lines    r8   <lambda>zRun._RunApi.<locals>.<lambda>  s    doorJ   )keyzInterrupting the command...zStopping the airflow command...)r   r   forcer   r   r   zStop airflow command result...Tz7Error during stopping airflow command. Retrying pollingz$Cannot fetch airflow command status.zError message: {}zCommand exit code: {})6rw   r   r   r   CONCEPTSenvironmentParsers   ru   rQ   r   r   r-   r   r   r   r   r   r&   r'   environments_api_utilExecuteAirflowCommandReleaseTrackexecutionIdr[   r\   DEFAULT_POLL_TIME_SECONDSr   RaisesKeyboardInterrupttimesleeprandomuniformPOLL_JITTER_SECONDSPollAirflowCommandr   podNamespace	outputEndoutputsortKeyboardInterruptdebugStopAirflowCommandrz   MAX_CONSECUTIVE_POLL_ERRORSminEXP_BACKOFF_MULTIPLIERMAX_POLL_TIME_SECONDScontentr   exitInfoexitCoder{   exit)rN   rO   r|   r   rG   env_refr4   execute_result
output_end	next_linecur_consequetive_poll_errorswait_time_secondspoll_resultinterrupted
force_stoplinesstop_resultr   s                     r8   _RunApizRun._RunApiz  s^   NN11>>M11-@Omm''--/G&&t_=,,T?C!!$8??
C	jj''(}}	jjJJ299#((3-H +@@))/R==&B'')N .44	jj
5
<
<(( %%
N  JJ:;JI#$ 1KKJe)*446
** 335HIJ .@@)55%))*77(% --/+ *+
&",,*$$%
**5*
6! 7T 
&)D	D  !GHH 668M
 6D
))DLLDLLb
9 "I((1,	q t {++0D0D0M0M				#	#		%,,[-A-A-G-GHI	ii'..{/C/C/L/LMN
;(()	 1N+{o 76"  ,

67	,
))5
6-@@)55%))*77% --/+ ))4S5EE
F[//#**iio +J+	,
))M
N
&!
+
&*$)$sD   P= 'B*P0P= 0P:5P= =(U&B7TUT<:UUc                    | j                  |       | j                  |       t        j                  | j	                               j
                  j                  j                  }|j                  j                  j                         }t        j                  || j	                               }|j                  |k7  r.t        j                  dj!                  |j                              t#        j$                  |j&                  j(                  j*                        r| j-                  ||       y | j/                  ||       y )N)r   zGCannot execute subcommand for environment in state {}. Must be RUNNING.)r   )rh   r_   api_utilGetMessagesModuler   EnvironmentStateValueValuesEnumRUNNINGr   r   r   r   Getstater[   r\   r&   rD   %IsVersionAirflowCommandsApiCompatiblerw   r   r   r   r   )rN   rO   running_stater   r|   s        r8   r   zRun.Run  s    !!$'  &..'')k&&ww  mm''--/G#''t002G }}%#VGMM2  #HHnn33@@ ll4!
tW%rJ   N)__name__
__module____qualname____doc__r[   r$   classmethodr9   rQ   r_   rh   rK   rs   ru   r~   r   r   r   r    rJ   r8   r   r   3   sj    &P &::3H 3Hj,$\JB :"MH5?13fi*V&rJ   r   )%r   
__future__r   r   r   r.   r   r   r   googlecloudsdk.api_lib.composerr   r   r   r   googlecloudsdk.callioper   #googlecloudsdk.command_lib.composerr	   rD   r
   r[   googlecloudsdk.corer   r   googlecloudsdk.core.consoler   r   r   rf   r   r   r   r   r   DefaultUniverseOnlyCommandr   r   rJ   r8   <module>r      s    C &  '   	  V < ( b = D / # 2$ # 5         H&$,, H& H&rJ   