EachPromiseTest.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. <?php
  2. namespace GuzzleHttp\Promise\Tests;
  3. use GuzzleHttp\Promise\RejectedPromise;
  4. use GuzzleHttp\Promise\FulfilledPromise;
  5. use GuzzleHttp\Promise\Promise;
  6. use GuzzleHttp\Promise\PromiseInterface;
  7. use GuzzleHttp\Promise\EachPromise;
  8. use GuzzleHttp\Promise as P;
  9. use PHPUnit\Framework\TestCase;
  10. /**
  11. * @covers GuzzleHttp\Promise\EachPromise
  12. */
  13. class EachPromiseTest extends TestCase
  14. {
  15. public function testReturnsSameInstance()
  16. {
  17. $each = new EachPromise([], ['concurrency' => 100]);
  18. $this->assertSame($each->promise(), $each->promise());
  19. }
  20. public function testResolvesInCaseOfAnEmptyList()
  21. {
  22. $promises = [];
  23. $each = new EachPromise($promises);
  24. $p = $each->promise();
  25. $this->assertNull($p->wait());
  26. $this->assertEquals(PromiseInterface::FULFILLED, $p->getState());
  27. }
  28. public function testInvokesAllPromises()
  29. {
  30. $promises = [new Promise(), new Promise(), new Promise()];
  31. $called = [];
  32. $each = new EachPromise($promises, [
  33. 'fulfilled' => function ($value) use (&$called) {
  34. $called[] = $value;
  35. }
  36. ]);
  37. $p = $each->promise();
  38. $promises[0]->resolve('a');
  39. $promises[1]->resolve('c');
  40. $promises[2]->resolve('b');
  41. P\queue()->run();
  42. $this->assertEquals(['a', 'c', 'b'], $called);
  43. $this->assertEquals(PromiseInterface::FULFILLED, $p->getState());
  44. }
  45. public function testIsWaitable()
  46. {
  47. $a = $this->createSelfResolvingPromise('a');
  48. $b = $this->createSelfResolvingPromise('b');
  49. $called = [];
  50. $each = new EachPromise([$a, $b], [
  51. 'fulfilled' => function ($value) use (&$called) { $called[] = $value; }
  52. ]);
  53. $p = $each->promise();
  54. $this->assertNull($p->wait());
  55. $this->assertEquals(PromiseInterface::FULFILLED, $p->getState());
  56. $this->assertEquals(['a', 'b'], $called);
  57. }
  58. public function testCanResolveBeforeConsumingAll()
  59. {
  60. $called = 0;
  61. $a = $this->createSelfResolvingPromise('a');
  62. $b = new Promise(function () { $this->fail(); });
  63. $each = new EachPromise([$a, $b], [
  64. 'fulfilled' => function ($value, $idx, Promise $aggregate) use (&$called) {
  65. $this->assertSame($idx, 0);
  66. $this->assertEquals('a', $value);
  67. $aggregate->resolve(null);
  68. $called++;
  69. },
  70. 'rejected' => function (\Exception $reason) {
  71. $this->fail($reason->getMessage());
  72. }
  73. ]);
  74. $p = $each->promise();
  75. $p->wait();
  76. $this->assertNull($p->wait());
  77. $this->assertEquals(1, $called);
  78. $this->assertEquals(PromiseInterface::FULFILLED, $a->getState());
  79. $this->assertEquals(PromiseInterface::PENDING, $b->getState());
  80. // Resolving $b has no effect on the aggregate promise.
  81. $b->resolve('foo');
  82. $this->assertEquals(1, $called);
  83. }
  84. public function testLimitsPendingPromises()
  85. {
  86. $pending = [new Promise(), new Promise(), new Promise(), new Promise()];
  87. $promises = new \ArrayIterator($pending);
  88. $each = new EachPromise($promises, ['concurrency' => 2]);
  89. $p = $each->promise();
  90. $this->assertCount(2, $this->readAttribute($each, 'pending'));
  91. $pending[0]->resolve('a');
  92. $this->assertCount(2, $this->readAttribute($each, 'pending'));
  93. $this->assertTrue($promises->valid());
  94. $pending[1]->resolve('b');
  95. P\queue()->run();
  96. $this->assertCount(2, $this->readAttribute($each, 'pending'));
  97. $this->assertTrue($promises->valid());
  98. $promises[2]->resolve('c');
  99. P\queue()->run();
  100. $this->assertCount(1, $this->readAttribute($each, 'pending'));
  101. $this->assertEquals(PromiseInterface::PENDING, $p->getState());
  102. $promises[3]->resolve('d');
  103. P\queue()->run();
  104. $this->assertNull($this->readAttribute($each, 'pending'));
  105. $this->assertEquals(PromiseInterface::FULFILLED, $p->getState());
  106. $this->assertFalse($promises->valid());
  107. }
  108. public function testDynamicallyLimitsPendingPromises()
  109. {
  110. $calls = [];
  111. $pendingFn = function ($count) use (&$calls) {
  112. $calls[] = $count;
  113. return 2;
  114. };
  115. $pending = [new Promise(), new Promise(), new Promise(), new Promise()];
  116. $promises = new \ArrayIterator($pending);
  117. $each = new EachPromise($promises, ['concurrency' => $pendingFn]);
  118. $p = $each->promise();
  119. $this->assertCount(2, $this->readAttribute($each, 'pending'));
  120. $pending[0]->resolve('a');
  121. $this->assertCount(2, $this->readAttribute($each, 'pending'));
  122. $this->assertTrue($promises->valid());
  123. $pending[1]->resolve('b');
  124. $this->assertCount(2, $this->readAttribute($each, 'pending'));
  125. P\queue()->run();
  126. $this->assertTrue($promises->valid());
  127. $promises[2]->resolve('c');
  128. P\queue()->run();
  129. $this->assertCount(1, $this->readAttribute($each, 'pending'));
  130. $this->assertEquals(PromiseInterface::PENDING, $p->getState());
  131. $promises[3]->resolve('d');
  132. P\queue()->run();
  133. $this->assertNull($this->readAttribute($each, 'pending'));
  134. $this->assertEquals(PromiseInterface::FULFILLED, $p->getState());
  135. $this->assertEquals([0, 1, 1, 1], $calls);
  136. $this->assertFalse($promises->valid());
  137. }
  138. public function testClearsReferencesWhenResolved()
  139. {
  140. $called = false;
  141. $a = new Promise(function () use (&$a, &$called) {
  142. $a->resolve('a');
  143. $called = true;
  144. });
  145. $each = new EachPromise([$a], [
  146. 'concurrency' => function () { return 1; },
  147. 'fulfilled' => function () {},
  148. 'rejected' => function () {}
  149. ]);
  150. $each->promise()->wait();
  151. $this->assertNull($this->readAttribute($each, 'onFulfilled'));
  152. $this->assertNull($this->readAttribute($each, 'onRejected'));
  153. $this->assertNull($this->readAttribute($each, 'iterable'));
  154. $this->assertNull($this->readAttribute($each, 'pending'));
  155. $this->assertNull($this->readAttribute($each, 'concurrency'));
  156. $this->assertTrue($called);
  157. }
  158. public function testCanBeCancelled()
  159. {
  160. $called = false;
  161. $a = new FulfilledPromise('a');
  162. $b = new Promise(function () use (&$called) { $called = true; });
  163. $each = new EachPromise([$a, $b], [
  164. 'fulfilled' => function ($value, $idx, Promise $aggregate) {
  165. $aggregate->cancel();
  166. },
  167. 'rejected' => function ($reason) use (&$called) {
  168. $called = true;
  169. },
  170. ]);
  171. $p = $each->promise();
  172. $p->wait(false);
  173. $this->assertEquals(PromiseInterface::FULFILLED, $a->getState());
  174. $this->assertEquals(PromiseInterface::PENDING, $b->getState());
  175. $this->assertEquals(PromiseInterface::REJECTED, $p->getState());
  176. $this->assertFalse($called);
  177. }
  178. public function testDoesNotBlowStackWithFulfilledPromises()
  179. {
  180. $pending = [];
  181. for ($i = 0; $i < 100; $i++) {
  182. $pending[] = new FulfilledPromise($i);
  183. }
  184. $values = [];
  185. $each = new EachPromise($pending, [
  186. 'fulfilled' => function ($value) use (&$values) {
  187. $values[] = $value;
  188. }
  189. ]);
  190. $called = false;
  191. $each->promise()->then(function () use (&$called) {
  192. $called = true;
  193. });
  194. $this->assertFalse($called);
  195. P\queue()->run();
  196. $this->assertTrue($called);
  197. $this->assertEquals(range(0, 99), $values);
  198. }
  199. public function testDoesNotBlowStackWithRejectedPromises()
  200. {
  201. $pending = [];
  202. for ($i = 0; $i < 100; $i++) {
  203. $pending[] = new RejectedPromise($i);
  204. }
  205. $values = [];
  206. $each = new EachPromise($pending, [
  207. 'rejected' => function ($value) use (&$values) {
  208. $values[] = $value;
  209. }
  210. ]);
  211. $called = false;
  212. $each->promise()->then(
  213. function () use (&$called) { $called = true; },
  214. function () { $this->fail('Should not have rejected.'); }
  215. );
  216. $this->assertFalse($called);
  217. P\queue()->run();
  218. $this->assertTrue($called);
  219. $this->assertEquals(range(0, 99), $values);
  220. }
  221. public function testReturnsPromiseForWhatever()
  222. {
  223. $called = [];
  224. $arr = ['a', 'b'];
  225. $each = new EachPromise($arr, [
  226. 'fulfilled' => function ($v) use (&$called) { $called[] = $v; }
  227. ]);
  228. $p = $each->promise();
  229. $this->assertNull($p->wait());
  230. $this->assertEquals(['a', 'b'], $called);
  231. }
  232. public function testRejectsAggregateWhenNextThrows()
  233. {
  234. $iter = function () {
  235. yield 'a';
  236. throw new \Exception('Failure');
  237. };
  238. $each = new EachPromise($iter());
  239. $p = $each->promise();
  240. $e = null;
  241. $received = null;
  242. $p->then(null, function ($reason) use (&$e) { $e = $reason; });
  243. P\queue()->run();
  244. $this->assertInstanceOf(\Exception::class, $e);
  245. $this->assertEquals('Failure', $e->getMessage());
  246. }
  247. public function testDoesNotCallNextOnIteratorUntilNeededWhenWaiting()
  248. {
  249. $results = [];
  250. $values = [10];
  251. $remaining = 9;
  252. $iter = function () use (&$values) {
  253. while ($value = array_pop($values)) {
  254. yield $value;
  255. }
  256. };
  257. $each = new EachPromise($iter(), [
  258. 'concurrency' => 1,
  259. 'fulfilled' => function ($r) use (&$results, &$values, &$remaining) {
  260. $results[] = $r;
  261. if ($remaining > 0) {
  262. $values[] = $remaining--;
  263. }
  264. }
  265. ]);
  266. $each->promise()->wait();
  267. $this->assertEquals(range(10, 1), $results);
  268. }
  269. public function testDoesNotCallNextOnIteratorUntilNeededWhenAsync()
  270. {
  271. $firstPromise = new Promise();
  272. $pending = [$firstPromise];
  273. $values = [$firstPromise];
  274. $results = [];
  275. $remaining = 9;
  276. $iter = function () use (&$values) {
  277. while ($value = array_pop($values)) {
  278. yield $value;
  279. }
  280. };
  281. $each = new EachPromise($iter(), [
  282. 'concurrency' => 1,
  283. 'fulfilled' => function ($r) use (&$results, &$values, &$remaining, &$pending) {
  284. $results[] = $r;
  285. if ($remaining-- > 0) {
  286. $pending[] = $values[] = new Promise();
  287. }
  288. }
  289. ]);
  290. $i = 0;
  291. $each->promise();
  292. while ($promise = array_pop($pending)) {
  293. $promise->resolve($i++);
  294. P\queue()->run();
  295. }
  296. $this->assertEquals(range(0, 9), $results);
  297. }
  298. private function createSelfResolvingPromise($value)
  299. {
  300. $p = new Promise(function () use (&$p, $value) {
  301. $p->resolve($value);
  302. });
  303. return $p;
  304. }
  305. public function testMutexPreventsGeneratorRecursion()
  306. {
  307. $results = $promises = [];
  308. for ($i = 0; $i < 20; $i++) {
  309. $p = $this->createSelfResolvingPromise($i);
  310. $pending[] = $p;
  311. $promises[] = $p;
  312. }
  313. $iter = function () use (&$promises, &$pending) {
  314. foreach ($promises as $promise) {
  315. // Resolve a promises, which will trigger the then() function,
  316. // which would cause the EachPromise to try to add more
  317. // promises to the queue. Without a lock, this would trigger
  318. // a "Cannot resume an already running generator" fatal error.
  319. if ($p = array_pop($pending)) {
  320. $p->wait();
  321. }
  322. yield $promise;
  323. }
  324. };
  325. $each = new EachPromise($iter(), [
  326. 'concurrency' => 5,
  327. 'fulfilled' => function ($r) use (&$results, &$pending) {
  328. $results[] = $r;
  329. }
  330. ]);
  331. $each->promise()->wait();
  332. $this->assertCount(20, $results);
  333. }
  334. public function testIteratorWithSameKey()
  335. {
  336. $iter = function () {
  337. yield 'foo' => $this->createSelfResolvingPromise(1);
  338. yield 'foo' => $this->createSelfResolvingPromise(2);
  339. yield 1 => $this->createSelfResolvingPromise(3);
  340. yield 1 => $this->createSelfResolvingPromise(4);
  341. };
  342. $called = 0;
  343. $each = new EachPromise($iter(), [
  344. 'fulfilled' => function ($value, $idx, Promise $aggregate) use (&$called) {
  345. $called++;
  346. if ($value < 3) {
  347. $this->assertSame('foo', $idx);
  348. } else {
  349. $this->assertSame(1, $idx);
  350. }
  351. },
  352. ]);
  353. $each->promise()->wait();
  354. $this->assertSame(4, $called);
  355. }
  356. }