33namespace Enqueue \Consumption ;
44
55use Enqueue \Consumption \Context \End ;
6+ use Enqueue \Consumption \Context \InitLogger ;
67use Enqueue \Consumption \Context \MessageReceived ;
78use Enqueue \Consumption \Context \MessageResult ;
89use Enqueue \Consumption \Context \PostConsume ;
@@ -32,7 +33,7 @@ final class QueueConsumer implements QueueConsumerInterface
3233 private $ interopContext ;
3334
3435 /**
35- * @var ExtensionInterface|ChainExtension
36+ * @var ExtensionInterface
3637 */
3738 private $ staticExtension ;
3839
@@ -46,11 +47,6 @@ final class QueueConsumer implements QueueConsumerInterface
4647 */
4748 private $ receiveTimeout ;
4849
49- /**
50- * @var ExtensionInterface|ChainExtension
51- */
52- private $ extension ;
53-
5450 /**
5551 * @var LoggerInterface
5652 */
@@ -128,11 +124,16 @@ public function bindCallback($queue, callable $processor): QueueConsumerInterfac
128124
129125 public function consume (ExtensionInterface $ runtimeExtension = null ): void
130126 {
131- $ this -> extension = $ runtimeExtension ?
127+ $ extension = $ runtimeExtension ?
132128 new ChainExtension ([$ this ->staticExtension , $ runtimeExtension ]) :
133129 $ this ->staticExtension
134130 ;
135131
132+ $ initLogger = new InitLogger ($ this ->logger );
133+ $ extension ->onInitLogger ($ initLogger );
134+
135+ $ this ->logger = $ initLogger ->getLogger ();
136+
136137 $ startTime = (int ) (microtime (true ) * 1000 );
137138
138139 $ start = new Start (
@@ -143,10 +144,10 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
143144 $ startTime
144145 );
145146
146- $ this -> extension ->onStart ($ start );
147+ $ extension ->onStart ($ start );
147148
148149 if ($ start ->isExecutionInterrupted ()) {
149- $ this ->onEnd ($ startTime );
150+ $ this ->onEnd ($ extension , $ startTime );
150151
151152 return ;
152153 }
@@ -176,7 +177,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
176177 $ receivedMessagesCount = 0 ;
177178 $ interruptExecution = false ;
178179
179- $ callback = function (InteropMessage $ message , Consumer $ consumer ) use (&$ receivedMessagesCount , &$ interruptExecution ) {
180+ $ callback = function (InteropMessage $ message , Consumer $ consumer ) use (&$ receivedMessagesCount , &$ interruptExecution, $ extension ) {
180181 ++$ receivedMessagesCount ;
181182
182183 $ receivedAt = (int ) (microtime (true ) * 1000 );
@@ -188,19 +189,19 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
188189 $ processor = $ this ->boundProcessors [$ queue ->getQueueName ()]->getProcessor ();
189190
190191 $ messageReceived = new MessageReceived ($ this ->interopContext , $ consumer , $ message , $ processor , $ receivedAt , $ this ->logger );
191- $ this -> extension ->onMessageReceived ($ messageReceived );
192+ $ extension ->onMessageReceived ($ messageReceived );
192193 $ result = $ messageReceived ->getResult ();
193194 $ processor = $ messageReceived ->getProcessor ();
194195 if (null === $ result ) {
195196 try {
196197 $ result = $ processor ->process ($ message , $ this ->interopContext );
197198 } catch (\Exception $ e ) {
198- $ result = $ this ->onProcessorException ($ message , $ e , $ receivedAt );
199+ $ result = $ this ->onProcessorException ($ extension , $ message , $ e , $ receivedAt );
199200 }
200201 }
201202
202203 $ messageResult = new MessageResult ($ this ->interopContext , $ message , $ result , $ receivedAt , $ this ->logger );
203- $ this -> extension ->onResult ($ messageResult );
204+ $ extension ->onResult ($ messageResult );
204205 $ result = $ messageResult ->getResult ();
205206
206207 switch ($ result ) {
@@ -220,7 +221,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
220221 }
221222
222223 $ postMessageReceived = new PostMessageReceived ($ this ->interopContext , $ message , $ result , $ receivedAt , $ this ->logger );
223- $ this -> extension ->onPostMessageReceived ($ postMessageReceived );
224+ $ extension ->onPostMessageReceived ($ postMessageReceived );
224225
225226 if ($ postMessageReceived ->isExecutionInterrupted ()) {
226227 $ interruptExecution = true ;
@@ -241,7 +242,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
241242 $ this ->logger
242243 );
243244
244- $ this -> extension ->onPreSubscribe ($ preSubscribe );
245+ $ extension ->onPreSubscribe ($ preSubscribe );
245246
246247 $ subscriptionConsumer ->subscribe ($ consumer , $ callback );
247248 }
@@ -252,21 +253,21 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
252253 $ interruptExecution = false ;
253254
254255 $ preConsume = new PreConsume ($ this ->interopContext , $ subscriptionConsumer , $ this ->logger , $ cycle , $ this ->receiveTimeout , $ startTime );
255- $ this -> extension ->onPreConsume ($ preConsume );
256+ $ extension ->onPreConsume ($ preConsume );
256257
257258 if ($ preConsume ->isExecutionInterrupted ()) {
258- $ this ->onEnd ($ startTime , $ subscriptionConsumer );
259+ $ this ->onEnd ($ extension , $ startTime , $ subscriptionConsumer );
259260
260261 return ;
261262 }
262263
263264 $ subscriptionConsumer ->consume ($ this ->receiveTimeout );
264265
265266 $ postConsume = new PostConsume ($ this ->interopContext , $ subscriptionConsumer , $ receivedMessagesCount , $ cycle , $ startTime , $ this ->logger );
266- $ this -> extension ->onPostConsume ($ postConsume );
267+ $ extension ->onPostConsume ($ postConsume );
267268
268269 if ($ interruptExecution || $ postConsume ->isExecutionInterrupted ()) {
269- $ this ->onEnd ($ startTime , $ subscriptionConsumer );
270+ $ this ->onEnd ($ extension , $ startTime , $ subscriptionConsumer );
270271
271272 return ;
272273 }
@@ -285,11 +286,11 @@ public function setFallbackSubscriptionConsumer(SubscriptionConsumer $fallbackSu
285286 $ this ->fallbackSubscriptionConsumer = $ fallbackSubscriptionConsumer ;
286287 }
287288
288- private function onEnd (int $ startTime , SubscriptionConsumer $ subscriptionConsumer = null ): void
289+ private function onEnd (ExtensionInterface $ extension , int $ startTime , SubscriptionConsumer $ subscriptionConsumer = null ): void
289290 {
290291 $ endTime = (int ) (microtime (true ) * 1000 );
291292
292- $ this -> extension ->onEnd (new End ($ this ->interopContext , $ startTime , $ endTime , $ this ->logger ));
293+ $ extension ->onEnd (new End ($ this ->interopContext , $ startTime , $ endTime , $ this ->logger ));
293294
294295 if ($ subscriptionConsumer ) {
295296 $ subscriptionConsumer ->unsubscribeAll ();
@@ -301,12 +302,12 @@ private function onEnd(int $startTime, SubscriptionConsumer $subscriptionConsume
301302 *
302303 * https://github.com/symfony/symfony/blob/cbe289517470eeea27162fd2d523eb29c95f775f/src/Symfony/Component/HttpKernel/EventListener/ExceptionListener.php#L77
303304 */
304- private function onProcessorException (Message $ message , \Exception $ exception , int $ receivedAt )
305+ private function onProcessorException (ExtensionInterface $ extension , Message $ message , \Exception $ exception , int $ receivedAt )
305306 {
306307 $ processorException = new ProcessorException ($ this ->interopContext , $ message , $ exception , $ receivedAt , $ this ->logger );
307308
308309 try {
309- $ this -> extension ->onProcessorException ($ processorException );
310+ $ extension ->onProcessorException ($ processorException );
310311
311312 $ result = $ processorException ->getResult ();
312313 if (null === $ result ) {
0 commit comments