SubscriptionArbiter.smali 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  # Class header and field declarations for SubscriptionArbiter.
  # The class extends java.util.concurrent.atomic.AtomicInteger and implements
  # org.reactivestreams.Subscription. The inherited integer is used by the methods
  # in this file as an entry counter: request()/produced()/setSubscription() try a
  # compareAndSet(0, 1) before touching the plain fields directly, and drain() uses
  # getAndIncrement() to decide whether the caller runs drainLoop().
  1. .class public Lio/reactivex/internal/subscriptions/SubscriptionArbiter;
  2. .super Ljava/util/concurrent/atomic/AtomicInteger;
  3. .source "SubscriptionArbiter.java"
  4. # interfaces
  5. .implements Lorg/reactivestreams/Subscription;
  6. # static fields
  7. .field private static final serialVersionUID:J = -0x1e62bfbf4b5b12feL
  8. # instance fields
  # actual: the Subscription currently installed; written by setSubscription() and
  # drainLoop(), read by request() to forward demand.
  9. .field actual:Lorg/reactivestreams/Subscription;
  # cancelled: volatile flag set to true by cancel(); checked by setSubscription()
  # and drainLoop().
  10. .field volatile cancelled:Z
  # missedProduced: amounts passed to produced() when its compareAndSet(0, 1) path
  # was not taken; drained (getAndSet(0)) by drainLoop().
  11. .field final missedProduced:Ljava/util/concurrent/atomic/AtomicLong;
  # missedRequested: amounts passed to request() when its compareAndSet(0, 1) path
  # was not taken; drained (getAndSet(0)) by drainLoop().
  12. .field final missedRequested:Ljava/util/concurrent/atomic/AtomicLong;
  # missedSubscription: Subscription handed to setSubscription() when its
  # compareAndSet(0, 1) path was not taken; drained (getAndSet(null)) by drainLoop().
  # The Signature annotation records the generic type AtomicReference<Subscription>.
  13. .field final missedSubscription:Ljava/util/concurrent/atomic/AtomicReference;
  14. .annotation system Ldalvik/annotation/Signature;
  15. value = {
  16. "Ljava/util/concurrent/atomic/AtomicReference<",
  17. "Lorg/reactivestreams/Subscription;",
  18. ">;"
  19. }
  20. .end annotation
  21. .end field
  # requested: running demand total; the methods below treat 0x7fffffffffffffffL
  # (Long.MAX_VALUE) as a sentinel that is never adjusted further.
  22. .field requested:J
  # unbounded: set to true by request() once `requested` reaches Long.MAX_VALUE;
  # request() and produced() return immediately while it is set.
  23. .field protected unbounded:Z
  24. # direct methods
  # Constructor: calls AtomicInteger.<init>() and then allocates the three atomic
  # holders (missedSubscription, missedRequested, missedProduced). No other field
  # is assigned here, so actual/cancelled/requested/unbounded keep their JVM
  # default values.
  25. .method public constructor <init>()V
  26. .locals 1
  27. .line 62
  28. invoke-direct {p0}, Ljava/util/concurrent/atomic/AtomicInteger;-><init>()V
  29. .line 63
  # missedSubscription = new AtomicReference()
  30. new-instance v0, Ljava/util/concurrent/atomic/AtomicReference;
  31. invoke-direct {v0}, Ljava/util/concurrent/atomic/AtomicReference;-><init>()V
  32. iput-object v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedSubscription:Ljava/util/concurrent/atomic/AtomicReference;
  33. .line 64
  # missedRequested = new AtomicLong()
  34. new-instance v0, Ljava/util/concurrent/atomic/AtomicLong;
  35. invoke-direct {v0}, Ljava/util/concurrent/atomic/AtomicLong;-><init>()V
  36. iput-object v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedRequested:Ljava/util/concurrent/atomic/AtomicLong;
  37. .line 65
  # missedProduced = new AtomicLong()
  38. new-instance v0, Ljava/util/concurrent/atomic/AtomicLong;
  39. invoke-direct {v0}, Ljava/util/concurrent/atomic/AtomicLong;-><init>()V
  40. iput-object v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedProduced:Ljava/util/concurrent/atomic/AtomicLong;
  41. return-void
  42. .end method
  43. # virtual methods
  # cancel(): if the `cancelled` flag is still false, sets it to true and calls
  # drain(); if the flag is already true the method returns without doing anything.
  # The read and the write of the flag are two separate instructions (no
  # compare-and-set), so two concurrent callers may both reach drain().
  44. .method public cancel()V
  45. .locals 1
  46. .line 176
  47. iget-boolean v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->cancelled:Z
  # already cancelled -> skip straight to return
  48. if-nez v0, :cond_0
  49. const/4 v0, 0x1
  50. .line 177
  51. iput-boolean v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->cancelled:Z
  52. .line 179
  # drainLoop() is where the stored subscriptions actually get cancel() called
  53. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->drain()V
  54. :cond_0
  55. return-void
  56. .end method
  # drain(): increments the inherited counter and runs drainLoop() only when the
  # value before the increment was 0. When the previous value was non-zero the
  # method returns immediately; the increment itself stays recorded in the counter,
  # which drainLoop() inspects via addAndGet() to decide whether to loop again.
  57. .method final drain()V
  58. .locals 1
  59. .line 184
  60. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->getAndIncrement()I
  61. move-result v0
  # previous value == 0 -> this caller runs the loop
  62. if-eqz v0, :cond_0
  63. return-void
  64. .line 187
  65. :cond_0
  66. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->drainLoop()V
  67. return-void
  68. .end method
  # drainLoop(): folds the values parked in missedSubscription / missedRequested /
  # missedProduced into the plain fields (actual, requested), repeating until
  # addAndGet(-missed) on the inherited counter returns 0. Any request() call on a
  # Subscription is deferred until after the loop has exited.
  #
  # Every caller visible in this file enters with the counter non-zero: drain()
  # after getAndIncrement() returned 0, and produced()/request()/setSubscription()
  # after decrementAndGet() returned non-zero.
  #
  # Register map (as used below):
  #   v0        this
  #   v1        null constant (until reused as a cmp result just before return)
  #   v2/v3     0L constant
  #   v4/v5     accumulated amount to request after the loop
  #   v6        "missed" count subtracted from the counter each pass
  #   v7        Subscription that will receive the accumulated request
  #   v8        subscription taken from missedSubscription (or null)
  #   v9/v10    amount taken from missedRequested
  #   v11/v12   amount taken from missedProduced
  #   v13       snapshot of the `actual` field
  #   v14/v15   working copy of `requested` (v14 is first used for `cancelled`)
  69. .method final drainLoop()V
  70. .locals 19
  71. move-object/from16 v0, p0
  72. const/4 v1, 0x0
  73. const-wide/16 v2, 0x0
  # NOTE: this store to v4 is overwritten by the move-wide two instructions below
  # before it is ever read.
  74. const/4 v4, 0x1
  75. move-object v7, v1
  76. move-wide v4, v2
  77. const/4 v6, 0x1
  78. .line 198
  # ---- top of loop ----
  79. :cond_0
  # Peek at missedSubscription; only pay for getAndSet(null) when it is non-null.
  80. iget-object v8, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedSubscription:Ljava/util/concurrent/atomic/AtomicReference;
  81. invoke-virtual {v8}, Ljava/util/concurrent/atomic/AtomicReference;->get()Ljava/lang/Object;
  82. move-result-object v8
  83. check-cast v8, Lorg/reactivestreams/Subscription;
  84. if-eqz v8, :cond_1
  85. .line 201
  86. iget-object v8, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedSubscription:Ljava/util/concurrent/atomic/AtomicReference;
  87. invoke-virtual {v8, v1}, Ljava/util/concurrent/atomic/AtomicReference;->getAndSet(Ljava/lang/Object;)Ljava/lang/Object;
  88. move-result-object v8
  89. check-cast v8, Lorg/reactivestreams/Subscription;
  90. .line 204
  91. :cond_1
  # Same peek-then-getAndSet(0) pattern for missedRequested -> v9/v10.
  92. iget-object v9, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedRequested:Ljava/util/concurrent/atomic/AtomicLong;
  93. invoke-virtual {v9}, Ljava/util/concurrent/atomic/AtomicLong;->get()J
  94. move-result-wide v9
  95. cmp-long v11, v9, v2
  96. if-eqz v11, :cond_2
  97. .line 206
  98. iget-object v9, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedRequested:Ljava/util/concurrent/atomic/AtomicLong;
  99. invoke-virtual {v9, v2, v3}, Ljava/util/concurrent/atomic/AtomicLong;->getAndSet(J)J
  100. move-result-wide v9
  101. .line 209
  102. :cond_2
  # Same pattern for missedProduced -> v11/v12.
  103. iget-object v11, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedProduced:Ljava/util/concurrent/atomic/AtomicLong;
  104. invoke-virtual {v11}, Ljava/util/concurrent/atomic/AtomicLong;->get()J
  105. move-result-wide v11
  106. cmp-long v13, v11, v2
  107. if-eqz v13, :cond_3
  108. .line 211
  109. iget-object v11, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedProduced:Ljava/util/concurrent/atomic/AtomicLong;
  110. invoke-virtual {v11, v2, v3}, Ljava/util/concurrent/atomic/AtomicLong;->getAndSet(J)J
  111. move-result-wide v11
  112. .line 214
  113. :cond_3
  114. iget-object v13, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->actual:Lorg/reactivestreams/Subscription;
  115. .line 216
  116. iget-boolean v14, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->cancelled:Z
  # not cancelled -> normal bookkeeping at :cond_5
  117. if-eqz v14, :cond_5
  # ---- cancelled branch: cancel and clear `actual`, cancel the missed one ----
  118. if-eqz v13, :cond_4
  119. .line 218
  120. invoke-interface {v13}, Lorg/reactivestreams/Subscription;->cancel()V
  121. .line 219
  122. iput-object v1, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->actual:Lorg/reactivestreams/Subscription;
  123. :cond_4
  124. if-eqz v8, :cond_b
  125. .line 222
  126. invoke-interface {v8}, Lorg/reactivestreams/Subscription;->cancel()V
  127. goto :goto_0
  128. .line 225
  # ---- not cancelled: update `requested` unless it is already Long.MAX_VALUE ----
  129. :cond_5
  130. iget-wide v14, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->requested:J
  131. const-wide v16, 0x7fffffffffffffffL
  132. cmp-long v18, v14, v16
  133. if-eqz v18, :cond_8
  134. .line 227
  # v14 = addCap(requested, missedRequested)
  # (addCap is defined outside this file; presumably an add that saturates at
  # Long.MAX_VALUE -- confirm against BackpressureHelper)
  135. invoke-static {v14, v15, v9, v10}, Lio/reactivex/internal/util/BackpressureHelper;->addCap(JJ)J
  136. move-result-wide v14
  137. cmp-long v18, v14, v16
  # sum hit Long.MAX_VALUE -> store it as-is, do not subtract produced
  138. if-eqz v18, :cond_7
  # v11 = sum - missedProduced
  139. sub-long v11, v14, v11
  140. cmp-long v14, v11, v2
  141. if-gez v14, :cond_6
  142. .line 232
  # Negative remainder: hand it to reportMoreProduced and clamp to 0.
  143. invoke-static {v11, v12}, Lio/reactivex/internal/subscriptions/SubscriptionHelper;->reportMoreProduced(J)V
  144. move-wide v11, v2
  145. :cond_6
  146. move-wide v14, v11
  147. .line 239
  148. :cond_7
  149. iput-wide v14, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->requested:J
  150. :cond_8
  # ---- decide which Subscription to request from, and how much ----
  151. if-eqz v8, :cond_a
  # A new subscription arrived: cancel the old `actual` (if any) and install it.
  152. if-eqz v13, :cond_9
  153. .line 244
  154. invoke-interface {v13}, Lorg/reactivestreams/Subscription;->cancel()V
  155. .line 246
  156. :cond_9
  157. iput-object v8, v0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->actual:Lorg/reactivestreams/Subscription;
  # v9 is reused as a scratch cmp result here; the missed-requested value is no
  # longer needed on this path.
  158. cmp-long v9, v14, v2
  159. if-eqz v9, :cond_b
  160. .line 248
  # The new subscription will be asked for the whole current `requested` amount.
  161. invoke-static {v4, v5, v14, v15}, Lio/reactivex/internal/util/BackpressureHelper;->addCap(JJ)J
  162. move-result-wide v4
  163. move-object v7, v8
  164. goto :goto_0
  165. :cond_a
  # No new subscription: forward only the newly missed request amount to the
  # existing `actual`, if there is one and the amount is non-zero.
  166. if-eqz v13, :cond_b
  167. cmp-long v8, v9, v2
  168. if-eqz v8, :cond_b
  169. .line 252
  170. invoke-static {v4, v5, v9, v10}, Lio/reactivex/internal/util/BackpressureHelper;->addCap(JJ)J
  171. move-result-wide v4
  172. move-object v7, v13
  173. :cond_b
  174. :goto_0
  # missed = addAndGet(-missed); a non-zero result means the counter was bumped
  # while this pass ran, so go around again.
  175. neg-int v6, v6
  176. .line 257
  177. invoke-virtual {v0, v6}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->addAndGet(I)I
  178. move-result v6
  179. if-nez v6, :cond_0
  # ---- loop exited: issue the single deferred request(), if any ----
  180. cmp-long v1, v4, v2
  181. if-eqz v1, :cond_c
  182. .line 260
  183. invoke-interface {v7, v4, v5}, Lorg/reactivestreams/Subscription;->request(J)V
  184. :cond_c
  185. return-void
  186. .end method
  # isCancelled(): returns the current value of the volatile `cancelled` field.
  187. .method public final isCancelled()Z
  188. .locals 1
  189. .line 280
  190. iget-boolean v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->cancelled:Z
  191. return v0
  192. .end method
  # isUnbounded(): returns the current value of the `unbounded` field, which
  # request() sets once `requested` reaches Long.MAX_VALUE. The field is not
  # declared volatile.
  193. .method public final isUnbounded()Z
  194. .locals 1
  195. .line 272
  196. iget-boolean v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->unbounded:Z
  197. return v0
  198. .end method
  # produced(long n): subtracts n (p1/p2) from `requested`.
  #   - Returns immediately when `unbounded` is set.
  #   - Fast path (counter is 0 and compareAndSet(0, 1) succeeds): updates
  #     `requested` directly, unless it equals Long.MAX_VALUE; a negative result is
  #     passed to SubscriptionHelper.reportMoreProduced and 0 is stored instead.
  #     Afterwards the counter is decremented, and drainLoop() runs if it is still
  #     non-zero.
  #   - Slow path: adds n to missedProduced via BackpressureHelper.add and calls
  #     drain().
  # n is not validated in this method.
  199. .method public final produced(J)V
  200. .locals 5
  201. .line 145
  202. iget-boolean v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->unbounded:Z
  203. if-eqz v0, :cond_0
  204. return-void
  205. .line 148
  206. :cond_0
  # Cheap get() check first, then the compareAndSet(0, 1) attempt.
  207. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->get()I
  208. move-result v0
  209. if-nez v0, :cond_4
  210. const/4 v0, 0x0
  211. const/4 v1, 0x1
  212. invoke-virtual {p0, v0, v1}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->compareAndSet(II)Z
  213. move-result v0
  214. if-eqz v0, :cond_4
  215. .line 149
  216. iget-wide v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->requested:J
  217. const-wide v2, 0x7fffffffffffffffL
  218. cmp-long v4, v0, v2
  # requested == Long.MAX_VALUE -> leave it untouched
  219. if-eqz v4, :cond_2
  # v0/v1 = requested - n
  220. sub-long/2addr v0, p1
  # p1/p2 (n) is no longer needed; reuse it to hold 0L
  221. const-wide/16 p1, 0x0
  222. cmp-long v2, v0, p1
  223. if-gez v2, :cond_1
  224. .line 154
  # Negative remainder: report it; p1/p2 still holds 0L, which is what gets stored.
  225. invoke-static {v0, v1}, Lio/reactivex/internal/subscriptions/SubscriptionHelper;->reportMoreProduced(J)V
  226. goto :goto_0
  227. :cond_1
  228. move-wide p1, v0
  229. .line 157
  230. :goto_0
  231. iput-wide p1, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->requested:J
  232. .line 160
  233. :cond_2
  # Leave the fast path; a non-zero counter means other calls queued work meanwhile.
  234. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->decrementAndGet()I
  235. move-result p1
  236. if-nez p1, :cond_3
  237. return-void
  238. .line 164
  239. :cond_3
  240. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->drainLoop()V
  241. return-void
  242. .line 169
  # ---- slow path: park n in missedProduced and let drain() handle it ----
  243. :cond_4
  244. iget-object v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedProduced:Ljava/util/concurrent/atomic/AtomicLong;
  245. invoke-static {v0, p1, p2}, Lio/reactivex/internal/util/BackpressureHelper;->add(Ljava/util/concurrent/atomic/AtomicLong;J)J
  246. .line 171
  247. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->drain()V
  248. return-void
  249. .end method
  # request(long n): Subscription.request implementation; n is in p1/p2.
  #   - Does nothing when SubscriptionHelper.validate(n) returns false
  #     (validate is defined elsewhere; presumably it rejects non-positive n --
  #     confirm against SubscriptionHelper).
  #   - Returns immediately when `unbounded` is already set.
  #   - Fast path (counter is 0 and compareAndSet(0, 1) succeeds): adds n to
  #     `requested` with BackpressureHelper.addCap unless it is already
  #     Long.MAX_VALUE, sets `unbounded` when the sum reaches Long.MAX_VALUE,
  #     snapshots `actual`, decrements the counter (running drainLoop() if it is
  #     still non-zero) and finally forwards n to the snapshotted Subscription if
  #     it was non-null.
  #   - Slow path: adds n to missedRequested via BackpressureHelper.add and calls
  #     drain().
  250. .method public final request(J)V
  251. .locals 6
  252. .line 111
  253. invoke-static {p1, p2}, Lio/reactivex/internal/subscriptions/SubscriptionHelper;->validate(J)Z
  254. move-result v0
  255. if-eqz v0, :cond_5
  256. .line 112
  257. iget-boolean v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->unbounded:Z
  258. if-eqz v0, :cond_0
  259. return-void
  260. .line 115
  261. :cond_0
  # Cheap get() check first, then the compareAndSet(0, 1) attempt.
  262. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->get()I
  263. move-result v0
  264. if-nez v0, :cond_4
  265. const/4 v0, 0x0
  266. const/4 v1, 0x1
  267. invoke-virtual {p0, v0, v1}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->compareAndSet(II)Z
  268. move-result v0
  269. if-eqz v0, :cond_4
  270. .line 116
  271. iget-wide v2, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->requested:J
  272. const-wide v4, 0x7fffffffffffffffL
  273. cmp-long v0, v2, v4
  # requested == Long.MAX_VALUE -> nothing to add
  274. if-eqz v0, :cond_1
  275. .line 119
  276. invoke-static {v2, v3, p1, p2}, Lio/reactivex/internal/util/BackpressureHelper;->addCap(JJ)J
  277. move-result-wide v2
  278. .line 120
  279. iput-wide v2, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->requested:J
  280. cmp-long v0, v2, v4
  281. if-nez v0, :cond_1
  282. .line 122
  # v1 still holds the constant 1 loaded for compareAndSet -> unbounded = true
  283. iput-boolean v1, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->unbounded:Z
  284. .line 125
  285. :cond_1
  # Snapshot `actual` before the counter is released below.
  286. iget-object v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->actual:Lorg/reactivestreams/Subscription;
  287. .line 127
  288. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->decrementAndGet()I
  289. move-result v1
  290. if-eqz v1, :cond_2
  291. .line 128
  292. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->drainLoop()V
  293. :cond_2
  # Forward the original n (not the running total) to the snapshotted Subscription.
  294. if-eqz v0, :cond_3
  295. .line 132
  296. invoke-interface {v0, p1, p2}, Lorg/reactivestreams/Subscription;->request(J)V
  297. :cond_3
  298. return-void
  299. .line 138
  # ---- slow path: park n in missedRequested and let drain() handle it ----
  300. :cond_4
  301. iget-object v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedRequested:Ljava/util/concurrent/atomic/AtomicLong;
  302. invoke-static {v0, p1, p2}, Lio/reactivex/internal/util/BackpressureHelper;->add(Ljava/util/concurrent/atomic/AtomicLong;J)J
  303. .line 140
  304. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->drain()V
  305. :cond_5
  306. return-void
  307. .end method
  # setSubscription(Subscription s): installs s (p1) as the current Subscription.
  #   - If `cancelled` is set, s.cancel() is called and the method returns.
  #   - Otherwise s is passed to ObjectHelper.requireNonNull(s, "s is null").
  #   - Fast path (counter is 0 and compareAndSet(0, 1) succeeds): cancels the
  #     previous `actual` if non-null, stores s in `actual`, snapshots `requested`,
  #     decrements the counter (running drainLoop() if it is still non-zero) and
  #     then requests the snapshotted amount from s when it is non-zero.
  #   - Slow path: swaps s into missedSubscription, cancels whatever Subscription
  #     was parked there before, and calls drain().
  308. .method public final setSubscription(Lorg/reactivestreams/Subscription;)V
  309. .locals 5
  310. .line 73
  311. iget-boolean v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->cancelled:Z
  312. if-eqz v0, :cond_0
  313. .line 74
  # NOTE(review): this call runs before the requireNonNull check below, so a null
  # p1 on the cancelled path fails here rather than with the "s is null" message.
  314. invoke-interface {p1}, Lorg/reactivestreams/Subscription;->cancel()V
  315. return-void
  316. :cond_0
  317. const-string v0, "s is null"
  318. .line 78
  319. invoke-static {p1, v0}, Lio/reactivex/internal/functions/ObjectHelper;->requireNonNull(Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object;
  320. .line 80
  # Cheap get() check first, then the compareAndSet(0, 1) attempt.
  321. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->get()I
  322. move-result v0
  323. if-nez v0, :cond_4
  324. const/4 v0, 0x0
  325. const/4 v1, 0x1
  326. invoke-virtual {p0, v0, v1}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->compareAndSet(II)Z
  327. move-result v0
  328. if-eqz v0, :cond_4
  329. .line 81
  330. iget-object v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->actual:Lorg/reactivestreams/Subscription;
  331. if-eqz v0, :cond_1
  332. .line 84
  # Cancel the Subscription being replaced.
  333. invoke-interface {v0}, Lorg/reactivestreams/Subscription;->cancel()V
  334. .line 87
  335. :cond_1
  336. iput-object p1, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->actual:Lorg/reactivestreams/Subscription;
  337. .line 89
  # Snapshot `requested` before the counter is released below.
  338. iget-wide v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->requested:J
  339. .line 91
  340. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->decrementAndGet()I
  341. move-result v2
  342. if-eqz v2, :cond_2
  343. .line 92
  344. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->drainLoop()V
  345. :cond_2
  # Ask the new Subscription for the snapshotted amount, if any.
  346. const-wide/16 v2, 0x0
  347. cmp-long v4, v0, v2
  348. if-eqz v4, :cond_3
  349. .line 96
  350. invoke-interface {p1, v0, v1}, Lorg/reactivestreams/Subscription;->request(J)V
  351. :cond_3
  352. return-void
  353. .line 102
  # ---- slow path: park s in missedSubscription; p1 is reused for the old value ----
  354. :cond_4
  355. iget-object v0, p0, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->missedSubscription:Ljava/util/concurrent/atomic/AtomicReference;
  356. invoke-virtual {v0, p1}, Ljava/util/concurrent/atomic/AtomicReference;->getAndSet(Ljava/lang/Object;)Ljava/lang/Object;
  357. move-result-object p1
  358. check-cast p1, Lorg/reactivestreams/Subscription;
  359. if-eqz p1, :cond_5
  360. .line 104
  # A previously parked Subscription that was never installed is cancelled here.
  361. invoke-interface {p1}, Lorg/reactivestreams/Subscription;->cancel()V
  362. .line 106
  363. :cond_5
  364. invoke-virtual {p0}, Lio/reactivex/internal/subscriptions/SubscriptionArbiter;->drain()V
  365. return-void
  366. .end method