
    h
                         S r SSKrSSKrSSKJr   SSKrSrSSKJ	r	  SSK
JrJr   " S S	5      r\	R                  " \5         " S
 S\5      r " S S\5      rg! \ a    Sr NKf = f)z)
Tests for L{twisted.python.threadable}.
    N)skipIfFT)
threadable)FailTestSynchronousTestCasec                   (    \ rS rSrS/rSrSrS rSrg)
TestObject   aMethod   c                     [        S5       Hi  nU R                  U R                  sU l        U l        U R                  U R                  -   U l        U R                  S:X  a  MV   SU R                  4-  5       e   g )N
   r   zz == %d, not 0 as expected)rangeyxz)selfis     c/root/1688_scrapy/alibaba-scraper/venv/lib/python3.13/site-packages/twisted/test/test_threadable.pyr
   TestObject.aMethod   s[    rA!VVTVVNDFDFVVdff_DF66Q;H <y HH;     )r   r   r   N)	__name__
__module____qualname____firstlineno__synchronizedr   r   r
   __static_attributes__ r   r   r   r      s    ;L
A	AIr   r   c                   b    \ rS rSrS rS r\" \S5      S 5       r\" \S5      S 5       r	S r
Srg	)
SynchronizationTests(   c                     U R                  [        R                  [        R                  " 5       5        [        R                  " S5        g)z
Reduce the CPython check interval so that thread switches happen much
more often, hopefully exercising more possible race conditions.  Also,
delay actual test startup until the reactor has been started.
gHz>N)
addCleanupsyssetswitchintervalgetswitchintervalr   s    r   setUpSynchronizationTests.setUp)   s/     	--s/D/D/FGi(r   c                 X    U R                  S[        R                  R                  5        g)zS
The name of a synchronized method is inaffected by the synchronization
decorator.
r
   N)assertEqualr   r
   r   r'   s    r   test_synchronizedName*SynchronizationTests.test_synchronizedName2   s     
 	J$6$6$?$?@r   !Platform does not support threadsc                   ^ [         R                  " 5         / m[        R                  " U4S jS9nUR	                  5         UR                  5         U R                  TS   S5        U R                  [         R                  " 5       S5        g)z
L{threadable.isInIOThread} returns C{True} if and only if it is called
in the same thread as L{threadable.registerAsIOThread}.
c                  L   > T R                  [        R                  " 5       5      $ N)appendr   isInIOThread)foreignResults   r   <lambda>8SynchronizationTests.test_isInIOThread.<locals>.<lambda>B   s    =//
0G0G0IJr   targetr   z#Non-IO thread reported as IO threadz#IO thread reported as not IO threadN)	r   registerAsIOThread	threadingThreadstartjoinassertFalse
assertTruer3   )r   tr4   s     @r   test_isInIOThread&SynchronizationTests.test_isInIOThread9   sn     	%%'J
 	
		q)+PQ##%'L	
r   c                   ^^ [        5       m/ mUU4S jn/ n[        S5       H8  n[        R                  " US9nUR	                  U5        UR                  5         M:     U H  nUR                  5         M     T(       a  [        T5      eg )Nc                     >  [        S5       H  n TR                  5         M     g ! [         a$  nTR                  [	        U5      5         S nAg S nAff = fNi  )r   r
   AssertionErrorr2   str)r   eerrorsos     r   callMethodLotsHSynchronizationTests.testThreadedSynchronization.<locals>.callMethodLotsQ   sB    &tAIIK %! &c!f%%&s   "& 
AAA   r7   )r   r   r:   r;   r2   r<   r=   r   )r   rK   threadsr   r@   rI   rJ   s        @@r   testThreadedSynchronization0SynchronizationTests.testThreadedSynchronizationK   sx    L	& qA  7ANN1GGI 
 AFFH  6"" r   c                 \    [        5       n[        S5       H  nUR                  5         M     g rE   )r   r   r
   )r   rJ   r   s      r   testUnthreadedSynchronization2SynchronizationTests.testUnthreadedSynchronizationd   s     LtAIIK r   r   N)r   r   r   r   r(   r,   r   threadingSkiprA   rO   rR   r   r   r   r   r    r    (   sL    )A M>?
 @
" M>?# @#0r   r    c                   8    \ rS rSr\" \S5      S 5       rS rSrg)SerializationTestsj   r.   c                     [         R                  " 5       n[        U5      n[        R                  " U5      n[        R
                  " U5      nU R                  XB5        g r1   )r   XLocktypepickledumpsloadsassertIsInstance)r   locklockType
lockPicklenewLocks        r   testPicklingSerializationTests.testPicklingk   sC    !:\\$'
,,z*g0r   c                     Sn[         R                  " U5      n[         R                  " US5      n[         R                  " U5        g )Ns6   ctwisted.python.threadable
unpickle_lock
p0
(tp1
Rp2
.   )r[   r]   r\   )r   ra   r_   	newPickles       r   testUnpickling!SerializationTests.testUnpicklings   s1    S
||J'LLq)	Yr   r   N)	r   r   r   r   r   rT   rc   rh   r   r   r   r   rV   rV   j   s"    M>?1 @1 r   rV   )__doc__r[   r$   unittestr   r:   rT   ImportErrortwisted.pythonr   twisted.trial.unittestr   r   r   synchronizer    rV   r   r   r   <module>rp      sx   
  
  M % @
I 
I   z "?. ?D ,  w  Ms   A A$#A$