
    h                         S r SSKrSSKrSSKr SSKrSSKJr  SSK	J
r
  SSKJr   " S S\R                  5      r " S	 S
\R                  5      r " S S\R                  5      rg! \ a    Sr NVf = f)z&
Tests for L{twisted.internet.fdesc}.
    N)fdescznot supported on this platform)untilConcludes)unittestc                   $    \ rS rSrSrS rS rSrg)NonBlockingTests   z=
Tests for L{fdesc.setNonBlocking} and L{fdesc.setBlocking}.
c                    [         R                  " 5       u  pU R                  [         R                  U5        U R                  [         R                  U5        U R	                  [
        R
                  " U[
        R                  5      [         R                  -  5        [        R                  " U5        U R                  [
        R
                  " U[
        R                  5      [         R                  -  5        g)zB
L{fdesc.setNonBlocking} sets a file description to non-blocking.
N)ospipe
addCleanupcloseassertFalsefcntlF_GETFL
O_NONBLOCKr   setNonBlocking
assertTrueselfrws      ^/root/1688_scrapy/alibaba-scraper/venv/lib/python3.13/site-packages/twisted/test/test_fdesc.pytest_setNonBlocking$NonBlockingTests.test_setNonBlocking   s     wwy!$!$Q6FGQAu}}5EF    c                    [         R                  " 5       u  pU R                  [         R                  U5        U R                  [         R                  U5        [        R
                  " U5        [        R                  " U5        U R                  [        R                  " U[        R                  5      [         R                  -  5        g)z;
L{fdesc.setBlocking} sets a file description to blocking.
N)r
   r   r   r   r   r   setBlockingr   r   r   r   r   s      r   test_setBlocking!NonBlockingTests.test_setBlocking'   sw     wwy!$!$Q!Q6FGr    N)__name__
__module____qualname____firstlineno____doc__r   r   __static_attributes__r    r   r   r   r      s    	G	Hr   r   c                   `    \ rS rSrSrS rS rS rS rS r	S r
S	 rS
 rS rS rS rS rSrg)ReadWriteTests3   z4
Tests for L{fdesc.readFromFD}, L{fdesc.writeToFD}.
c                     [         R                  " 5       u  U l        U l        [        R
                  " U R                  5        [        R
                  " U R                  5        g)z7
Create a non-blocking pipe that can be used in tests.
N)r
   r   r   r   r   r   r   s    r   setUpReadWriteTests.setUp8   s;     TVV$TVV$r   c                      [         R                  " U R                  5         [         R                  " U R                  5        g! [         a     N.f = f! [         a     gf = f)z
Close pipes.
N)r
   r   r   OSErrorr   r+   s    r   tearDownReadWriteTests.tearDown@   sS    	HHTVV	HHTVV  		  		s"    A  A 
AA
A! A!c                 D    [         R                  " U R                  U5      $ )z
Write data to the pipe.
)r   	writeToFDr   )r   ds     r   writeReadWriteTests.writeM   s     tvvq))r   c                     / n[         R                  " U R                  UR                  5      nUc  U(       a  US   $ gU$ )z
Read data from the pipe.
r   r   )r   
readFromFDr   append)r   lress      r   readReadWriteTests.readS   s<     tvvqxx0;tJr   c                     U R                  S5      nU R                  US:  5        U R                  5       nU R                  [	        U5      U5        U R                  SSU U5        g)z|
Test that the number of bytes L{fdesc.writeToFD} reports as written
with its return value are seen by L{fdesc.readFromFD}.
s   hellor   N)r5   r   r<   assertEquallen)r   nss      r   test_writeAndRead ReadWriteTests.test_writeAndReada   sW    
 JJx AIIKQ#"1q)r   c                    SS-  nU R                  U5      nU R                  US:  5        / nSnSnXB:  d  US:  aD  UR                  U R                  5       5        U[	        US   5      -  nUS-  nXB:  a  M<  US:  a  MD  SR                  U5      nU R                  [	        U5      U5        U R                  USU U5        g)	ze
Similar to L{test_writeAndRead}, but use a much larger string to verify
the behavior for that case.
s
   0123456879i'  r   2      r   N)r5   r   r9   r<   r@   joinr?   )r   origwrittenresultresultlengthis         r   test_writeAndReadLarge%ReadWriteTests.test_writeAndReadLargel   s    
 u$**T"!$$BMM$))+&Cr
O+LFA	 $B
 &!Vg.hw0r   c                     / n[         R                  " U R                  UR                  5      nU R	                  U/ 5        U R                  U5        g)z
Verify that reading from a file descriptor with no data does not raise
an exception and does not result in the callback function being called.
N)r   r8   r   r9   r?   assertIsNone)r   r:   rL   s      r   test_readFromEmpty!ReadWriteTests.test_readFromEmpty   sA    
 !!$&&!((3B&!r   c                     [         R                  " U R                  5        U R                  U R	                  5       [
        R                  5        g)zn
Test that using L{fdesc.readFromFD} on a cleanly closed file descriptor
returns a connection done indicator.
N)r
   r   r   r?   r<   r   CONNECTION_DONEr+   s    r   test_readFromCleanClose&ReadWriteTests.test_readFromCleanClose   0    
 	e&;&;<r   c                     [         R                  " U R                  5        U R                  U R	                  S5      [
        R                  5        g)zq
Verify that writing with L{fdesc.writeToFD} when the read end is closed
results in a connection lost indicator.
   sN)r
   r   r   r?   r5   r   CONNECTION_LOSTr+   s    r   test_writeToClosed!ReadWriteTests.test_writeToClosed   3    
 	D)5+@+@Ar   c                     [         R                  " U R                  5        U R                  U R	                  5       [
        R                  5        g)zr
Verify that reading with L{fdesc.readFromFD} when the read end is
closed results in a connection lost indicator.
N)r
   r   r   r?   r<   r   r\   r+   s    r   test_readFromInvalid#ReadWriteTests.test_readFromInvalid   rY   r   c                     [         R                  " U R                  5        U R                  U R	                  S5      [
        R                  5        g)zr
Verify that writing with L{fdesc.writeToFD} when the write end is
closed results in a connection lost indicator.
r[   N)r
   r   r   r?   r5   r   r\   r+   s    r   test_writeToInvalid"ReadWriteTests.test_writeToInvalid   r_   r   c                 P   [         R                  nS nU[         l         U R                  U R                  S5      S5        U[         l        S nU[         l         U R                  U R                  S5      S5        U[         l        g! U[         l        f = f! U[         l        f = f)z(
Test error path for L{fdesc.writeTod}.
c                 D    [        5       n[        R                  Ul        UeN)r/   errnoEAGAINfddataerrs      r   eagainWrite4ReadWriteTests.test_writeErrors.<locals>.eagainWrite   s    )CCIIr   r[   r   c                 D    [        5       n[        R                  Ul        Uerh   )r/   ri   EINTRrk   s      r   
eintrWrite3ReadWriteTests.test_writeErrors.<locals>.eintrWrite   s    )CCIIr   N)r
   r5   r?   )r   
oldOsWritero   rs   s       r   test_writeErrorsReadWriteTests.test_writeErrors   s     XX
	
 	"TZZ-q1!BH	
 	"TZZ-q1!BH "BH "BHs   !B !B BB%)r   r   N)r!   r"   r#   r$   r%   r,   r0   r5   r<   rC   rO   rS   rW   r]   ra   rd   rv   r&   r    r   r   r(   r(   3   sE    %*	*1("=B=B"r   r(   c                   .    \ rS rSrSrSrS rS rS rSr	g)	CloseOnExecTests   zD
Tests for L{fdesc._setCloseOnExec} and L{fdesc._unsetCloseOnExec}.
z
import os, errno
try:
    os.write(%d, b'lul')
except OSError as e:
    if e.errno == errno.EBADF:
        os._exit(0)
    os._exit(5)
except BaseException:
    os._exit(10)
else:
    os._exit(20)
c                    [         R                  " 5       nUS:X  aT   [         R                  " [        R                  [        R                  SU R
                  UR                  5       4-  /5        g [        [         R                  US5      S   $ ! [         a-    SS KnUR                  5         [         R                  " S5         g f = f)Nr   z-c   rH   )r
   forkexecvsys
executableprogramfilenoBaseException	traceback	print_exc_exitr   waitpid)r   fObjpidr   s       r   _execWithFileDescriptor(CloseOnExecTests._execWithFileDescriptor   s    ggi!8	NN^^T4<<4;;=:J+JK ""**c15a88 !  ##%	s   AB 4CCc                 l   [        U R                  5       S5       n[        R                  " UR	                  5       5        U R                  U5      nU R                  [        R                  " U5      5        U R                  [        R                  " U5      S5        SSS5        g! , (       d  f       g= f)z
A file descriptor passed to L{fdesc._setCloseOnExec} is not inherited
by a new process image created with one of the exec family of
functions.
wbr   N)openmktempr   _setCloseOnExecr   r   r   r
   	WIFEXITEDr?   WEXITSTATUSr   r   statuss      r   test_setCloseOnExec$CloseOnExecTests.test_setCloseOnExec   st     $++-&$!!$++-011$7FOOBLL01R^^F3Q7	 '&&s   BB%%
B3c                    [        U R                  5       S5       n[        R                  " UR	                  5       5        [        R
                  " UR	                  5       5        U R                  U5      nU R                  [        R                  " U5      5        U R                  [        R                  " U5      S5        SSS5        g! , (       d  f       g= f)z
A file descriptor passed to L{fdesc._unsetCloseOnExec} is inherited by
a new process image created with one of the exec family of functions.
r      N)r   r   r   r   r   _unsetCloseOnExecr   r   r
   r   r?   r   r   s      r   test_unsetCloseOnExec&CloseOnExecTests.test_unsetCloseOnExec   s    
 $++-&$!!$++-0##DKKM211$7FOOBLL01R^^F3R8 '&&s   B%C		
Cr    N)
r!   r"   r#   r$   r%   r   r   r   r   r&   r    r   r   ry   ry      s    G9$
8
9r   ry   )r%   ri   r
   r   r   twisted.internetr   ImportErrorskiptwisted.python.utilr   twisted.trialr   SynchronousTestCaser   r(   ry   r    r   r   <module>r      sz     	 
' ' . "Hx33 H8Q"X11 Q"h;9x33 ;9s  ,+D,s   A& &A10A1