
    h@5                         S r SSKrSSKrSSKrSSKrSSKrSSKJr  SSKJ	r	J
r
JrJr  SSKJr  SSKJr  SSKJr  SSKJrJr   " S	 S
\5      r " S S\5      r " S S\5      rg)zD
Test running processes with the APIs in L{twisted.internet.utils}.
    N)skipIf)error
interfacesreactorutils)Deferred)platform)SuppressedWarningsTests)SynchronousTestCaseTestCasec                       \ rS rSrSr\R                  " \S5      c  SrSr	Sr
\R                  rS rS rS rS rS	 rS
 r\" \R,                  " 5       S5      S 5       rS rS rS rS rS rS rS rS rS r Sr!g)ProcessUtilsTests   zh
Test running a process using L{getProcessOutput}, L{getProcessValue}, and
L{getProcessOutputAndValue}.
Nz)reactor doesn't implement IReactorProcessc                 *   U R                  5       n[        US5       nUR                  [        R                  R                  U5      [        R                  -   5        SSS5        [        R                  R                  U5      $ ! , (       d  f       N-= f)zR
Write the given list of lines to a text file and return the absolute
path to it.
wtN)mktempopenwriteoslinesepjoinpathabspath)selfsourceLinesscript
scriptFiles       _/root/1688_scrapy/alibaba-scraper/venv/lib/python3.13/site-packages/twisted/test/test_iutils.pymakeSourceFile ProcessUtilsTests.makeSourceFile$   s`    
 &$:RZZ__[9BJJFG  wwv&&  s   A B
Bc                     U R                  / SQ5      n[        R                  " U R                  SU/5      nUR	                  U R
                  S5      $ )z
L{getProcessOutput} returns a L{Deferred} which fires with the complete
output of the process it runs after that process exits.
)
import syszfor s in b'hello world\n':z    s = bytes([s])z    sys.stdout.buffer.write(s)z    sys.stdout.flush()-us   hello world
r   r   getProcessOutputexeaddCallbackassertEqualr   r   ds      r   test_outputProcessUtilsTests.test_output.   sN    
 ((

 ""488dJ-?@}}T--/?@@    c                    ^  T R                  SS/5      n[        R                  " T R                  SU/5      nT R	                  U[
        5      nU 4S jnUR                  U5        U$ )z
The L{Deferred} returned by L{getProcessOutput} is fired with an
L{IOError} L{Failure} if the child process writes to stderr.
r"   z!sys.stderr.write("hello world\n")r#   c                 X   > TR                  U R                  [        R                  5      $ N)assertFailureprocessEndedr   ProcessDone)errr   s    r   cbFailed?ProcessUtilsTests.test_outputWithErrorIgnored.<locals>.cbFailedL   s"    %%c&6&68I8IJJr-   )r   r   r%   r&   r1   IOErrorr'   )r   r   r*   r5   s   `   r   test_outputWithErrorIgnored-ProcessUtilsTests.test_outputWithErrorIgnored?   sf     ((?@

 ""488dJ-?@q'*	K 	
hr-   c                     U R                  / SQ5      n[        R                  " U R                  SU/SS9nUR	                  U R
                  S5      $ )z
If a C{True} value is supplied for the C{errortoo} parameter to
L{getProcessOutput}, the returned L{Deferred} fires with the child's
stderr output as well as its stdout output.
)r"   zsys.stdout.write("foo")sys.stdout.flush()zsys.stderr.write("foo")sys.stderr.flush()r#   T)errortoos   foofoor$   r)   s      r   test_outputWithErrorCollected/ProcessUtilsTests.test_outputWithErrorCollectedR   sO     ((


 ""488dJ-?$O}}T--y99r-   c                     U R                  S/5      n[        R                  " U R                  SU/5      nUR	                  U R
                  S5      $ )zd
The L{Deferred} returned by L{getProcessValue} is fired with the exit
status of the child process.
zraise SystemExit(1)r#      )r   r   getProcessValuer&   r'   r(   r)   s      r   
test_valueProcessUtilsTests.test_valueg   sJ    
 ((*?)@A
!!$((T:,>?}}T--q11r-   c                    ^  T R                  / SQ5      nU 4S jn[        R                  " T R                  SU/5      nUR	                  U5      $ )z
The L{Deferred} returned by L{getProcessOutputAndValue} fires with a
three-tuple, the elements of which give the data written to the child's
stdout, the data written to the child's stderr, and the exit status of
the child.
)r"   z*sys.stdout.buffer.write(b'hello world!\n')z,sys.stderr.buffer.write(b'goodbye world!\n')zsys.exit(1)c                 |   > U u  pnTR                  US5        TR                  US5        TR                  US5        g )Ns   hello world!
s   goodbye world!
rA   r(   )out_err_codeoutr4   coder   s       r   gotOutputAndValue@ProcessUtilsTests.test_outputAndValue.<locals>.gotOutputAndValue   s>    )NCdS"34S"56T1%r-   r#   r   r   getProcessOutputAndValuer&   r'   r   r   rK   r*   s   `   r   test_outputAndValue%ProcessUtilsTests.test_outputAndValueq   sL     ((

	& **488dJ5GH}}.//r-   z"Windows doesn't have real signals.c                    ^  T R                  / SQ5      nU 4S jn[        R                  " T R                  SU/5      nT R	                  U[
        5      nUR                  U5      $ )z
If the child process exits because of a signal, the L{Deferred}
returned by L{getProcessOutputAndValue} fires a L{Failure} of a tuple
containing the child's stdout, stderr, and the signal which caused
it to exit.
)zimport sys, os, signalz"sys.stdout.write('stdout bytes\n')z"sys.stderr.write('stderr bytes\n')r;   r<   z$os.kill(os.getpid(), signal.SIGKILL)c                    > U u  pnTR                  US5        TR                  US5        TR                  U[        R                  5        g )Ns   stdout bytes
s   stderr bytes
)r(   signalSIGKILL)out_err_sigrI   r4   sigr   s       r   rK   >ProcessUtilsTests.test_outputSignal.<locals>.gotOutputAndValue   sB    'MCcS"34S"34S&..1r-   r#   )r   r   rN   r&   r1   tupler'   rO   s   `   r   test_outputSignal#ProcessUtilsTests.test_outputSignal   s^     ((	

	2 **488dJ5GHq%(}}.//r-   c                 @   [         R                  R                  U R                  5       5      n[         R                  " U5        U R                  SS/5      nU" U R                  SU/US9nUR                  X#R                  [        R                  " 5       5      5        U$ )Nimport os, syszsys.stdout.write(os.getcwd())r#   )r   )r   r   r   r   makedirsr   r&   r'   encodesysgetfilesystemencoding)r   utilFunccheckdirr   r*   s         r   	_pathTestProcessUtilsTests._pathTest   s{    ggoodkkm,
C((>?

 TXXj1<	eZZ(A(A(CDEr-   c                 V    U R                  [        R                  U R                  5      $ )zg
L{getProcessOutput} runs the given command with the working directory
given by the C{path} parameter.
)re   r   r%   r(   r   s    r   test_getProcessOutputPath+ProcessUtilsTests.test_getProcessOutputPath   s!    
 ~~e44d6F6FGGr-   c                 P   ^  U 4S jnT R                  [        R                  U5      $ )zf
L{getProcessValue} runs the given command with the working directory
given by the C{path} parameter.
c                 *   > TR                  U S5        g Nr   rG   resultignoredr   s     r   rc   9ProcessUtilsTests.test_getProcessValuePath.<locals>.check       VQ'r-   )re   r   rB   r   rc   s   ` r   test_getProcessValuePath*ProcessUtilsTests.test_getProcessValuePath   s!    	( ~~e33U;;r-   c                 P   ^  U 4S jnT R                  [        R                  U5      $ )zo
L{getProcessOutputAndValue} runs the given command with the working
directory given by the C{path} parameter.
c                 V   > U u  p#nTR                  X!5        TR                  US5        g rm   rG   out_err_statusrd   rI   r4   statusr   s        r   rc   BProcessUtilsTests.test_getProcessOutputAndValuePath.<locals>.check   +    -CfS&VQ'r-   )re   r   rN   rs   s   ` r   !test_getProcessOutputAndValuePath3ProcessUtilsTests.test_getProcessOutputAndValuePath   s!    	(
 ~~e<<eDDr-   c                    [         R                  R                  U R                  5       5      n[         R                  " U5        U R                  / SQ5      nU R                  [         R                  [         R                  " 5       5        [         R                  " U5        [        R                  " [         R                  " S5      R                  5      n[         R                  " U[        R                  [        R                  -  5        U R                  [         R                  X55        U" U R                  SU/5      nUR!                  X#R#                  [$        R&                  " 5       5      5        U$ )N)r]   zcdir = os.getcwd()zsys.stdout.write(cdir).r#   )r   r   r   r   r^   r   
addCleanupchdirgetcwdstatS_IMODEst_modechmodS_IXUSRS_IRUSRr&   r'   r_   r`   ra   )r   rb   rc   rd   r   originalModer*   s          r   _defaultPathTest"ProcessUtilsTests._defaultPathTest   s    ggoodkkm,
C((N


 	"))+.
 ||BGGCL$8$89
 	dllT\\12
 	#4TXXj12	eZZ(A(A(CDEr-   c                 V    U R                  [        R                  U R                  5      $ )z
If no value is supplied for the C{path} parameter, L{getProcessOutput}
runs the given command in the same working directory as the parent
process and succeeds even if the current working directory is not
accessible.
)r   r   r%   r(   rh   s    r    test_getProcessOutputDefaultPath2ProcessUtilsTests.test_getProcessOutputDefaultPath   s#     $$U%;%;T=M=MNNr-   c                 P   ^  U 4S jnT R                  [        R                  U5      $ )z
If no value is supplied for the C{path} parameter, L{getProcessValue}
runs the given command in the same working directory as the parent
process and succeeds even if the current working directory is not
accessible.
c                 *   > TR                  U S5        g rm   rG   rn   s     r   rc   @ProcessUtilsTests.test_getProcessValueDefaultPath.<locals>.check  rr   r-   )r   r   rB   rs   s   ` r   test_getProcessValueDefaultPath1ProcessUtilsTests.test_getProcessValueDefaultPath   s#    	( $$U%:%:EBBr-   c                 P   ^  U 4S jnT R                  [        R                  U5      $ )z
If no value is supplied for the C{path} parameter,
L{getProcessOutputAndValue} runs the given command in the same working
directory as the parent process and succeeds even if the current
working directory is not accessible.
c                 V   > U u  p#nTR                  X!5        TR                  US5        g rm   rG   rx   s        r   rc   IProcessUtilsTests.test_getProcessOutputAndValueDefaultPath.<locals>.check  r|   r-   )r   r   rN   rs   s   ` r   (test_getProcessOutputAndValueDefaultPath:ProcessUtilsTests.test_getProcessOutputAndValueDefaultPath  s#    	(
 $$U%C%CUKKr-   c                    ^ ^ T R                  SS/5      nSm[        R                  " T R                  SU/TS9nU U4S jnUR	                  U5        U$ )zl
Standard input can be made available to the child process by passing
bytes for the `stdinBytes` parameter.
r"   z"sys.stdout.write(sys.stdin.read())s   These are the bytes to see.r#   )
stdinBytesc                 X   > U u  pnTR                  TU5        TR                  SU5        g rm   )assertInr(   )rH   rI   r4   rJ   r   r   s       r   rK   PProcessUtilsTests.test_get_processOutputAndValueStdin.<locals>.gotOutputAndValue(  s,    )NCd MM*c*Q%r-   rM   )r   r   r*   rK   r   s   `   @r   #test_get_processOutputAndValueStdin5ProcessUtilsTests.test_get_processOutputAndValueStdin  sb    
 ((4

 4
**HH:!
	& 	
'(r-    )"__name__
__module____qualname____firstlineno____doc__r   IReactorProcessr   skipoutputvaluer`   
executabler&   r   r+   r8   r>   rC   rP   r   r	   	isWindowsrZ   re   ri   rt   r}   r   r   r   r   r   __static_attributes__r   r-   r   r   r      s    
 !!'408:FE
..C'A"&:*202 H "FG0 H0>H	<E<OCLr-   r   c                       \ rS rSrSrS rSrg)SuppressWarningsTestsi4  z&
Tests for L{utils.suppressWarnings}.
c                 \  ^ / mU4S jnU R                  [        SU5        S n[        R                  " US[	        SS945      nU" S5        U R                  [        T5      S5        U" S5        U R                  [        T5      S5        U" S	5        U R                  [        T5      S
5        g)z[
L{utils.suppressWarnings} decorates a function so that the given
warnings are suppressed.
c                 *   > TR                  X45        g r0   )append)r   akwro   s      r   showwarning@SuppressWarningsTests.test_suppressWarnings.<locals>.showwarning@  s    MM1'"r-   r   c                 0    [         R                  " U 5        g r0   )warningswarn)msgs    r   f6SuppressWarningsTests.test_suppressWarnings.<locals>.fE  s    MM#r-   )ignorezThis is message)messagezSanity check messagerA   zUnignored message   N)patchr   r   suppressWarningsdictr(   len)r   r   r   gro   s       @r   test_suppressWarnings+SuppressWarningsTests.test_suppressWarnings9  s    
 	# 	

8]K8	 ""1{DAR4S&TU 	

 !Va( 	

Va( 	

Va(r-   r   N)r   r   r   r   r   r   r   r   r-   r   r   r   4  s    )r-   r   c                   H    \ rS rSrSr\" \R                  5      rS rS r	Sr
g)DeferredSuppressedWarningsTestsiZ  zT
Tests for L{utils.runWithWarningsSuppressed}, the version that supports
Deferreds.
c                 B  ^ S0 4S0 4/n[        5       mU R                  UU4S j5        [        R                  " S5        TR	                  S5        [        R                  " S5        U R                  S/U R                  5        Vs/ s H  o"S   PM	     sn5        gs  snf )	z
If the function called by L{utils.runWithWarningsSuppressed} returns a
C{Deferred}, the warning filters aren't removed until the Deferred
fires.
r   z.*foo.*r   z.*bar.*c                     > T $ r0   r   ro   s   r   <lambda>GDeferredSuppressedWarningsTests.test_deferredCallback.<locals>.<lambda>l  s    r-   
ignore foo   ignore foo 2r   N)r   runWithWarningsSuppressedr   r   callbackr(   flushWarnings)r   filterswro   s      @r   test_deferredCallback5DeferredSuppressedWarningsTests.test_deferredCallbackd  s     *2.1F0KL&&w?l#n%.)$BTBTBV+WBVQiLBV+WX+Ws   B
c                 v  ^ S0 4S0 4/n[        5       mU R                  UU4S j5      n[        R                  " S5        TR	                  [        5       5        UR                  S 5        [        R                  " S5        U R                  S/U R                  5        Vs/ s H  o3S   PM	     sn5        gs  snf )	z
If the function called by L{utils.runWithWarningsSuppressed} returns a
C{Deferred}, the warning filters aren't removed until the Deferred
fires with an errback.
r   r   c                     > T $ r0   r   r   s   r   r   FDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<lambda>z  s    Fr-   r   c                 ,    U R                  [        5      $ r0   )trapZeroDivisionError)r   s    r   r   r   }  s    qvv&78r-   r   r   N)	r   r   r   r   errbackr   
addErrbackr(   r   )r   r   r*   r   ro   s       @r   test_deferredErrback4DeferredSuppressedWarningsTests.test_deferredErrbackr  s     *2.1F0KL**7NCl#(*+	89n%.)$BTBTBV+WBVQiLBV+WX+Ws    B6
r   N)r   r   r   r   r   staticmethodr   r   r   r   r   r   r-   r   r   r   Z  s&     !-U-L-L MYYr-   r   )r   r   rT   r   r`   r   unittestr   twisted.internetr   r   r   r   twisted.internet.deferr   twisted.python.runtimer	   twisted.python.test.test_utilr
   twisted.trial.unittestr   r   r   r   r   r   r-   r   <module>r      s\   
 
   
   > > + + A @Z Zz#)/ #)L%Y&= %Yr-   