5
5
use Doctrine \DBAL \Platforms \MySqlPlatform ;
6
6
use Doctrine \DBAL \Types \Types ;
7
7
use Oro \Bundle \AkeneoBundle \Async \Topics ;
8
- use Oro \Bundle \AkeneoBundle \Tools \ CacheProviderTrait ;
8
+ use Oro \Bundle \AkeneoBundle \EventListener \ AdditionalOptionalListenerManager ;
9
9
use Oro \Bundle \BatchBundle \Entity \StepExecution ;
10
10
use Oro \Bundle \BatchBundle \Item \ItemWriterInterface ;
11
11
use Oro \Bundle \BatchBundle \Item \Support \ClosableInterface ;
14
14
use Oro \Bundle \IntegrationBundle \Entity \FieldsChanges ;
15
15
use Oro \Bundle \MessageQueueBundle \Client \BufferedMessageProducer ;
16
16
use Oro \Bundle \MessageQueueBundle \Entity \Job ;
17
+ use Oro \Bundle \PlatformBundle \Manager \OptionalListenerManager ;
17
18
use Oro \Component \MessageQueue \Client \Message ;
18
19
use Oro \Component \MessageQueue \Client \MessagePriority ;
19
20
use Oro \Component \MessageQueue \Client \MessageProducerInterface ;
@@ -23,10 +24,6 @@ class AsyncWriter implements
23
24
ClosableInterface,
24
25
StepExecutionAwareInterface
25
26
{
26
- use CacheProviderTrait;
27
-
28
- private const VARIANTS_BATCH_SIZE = 25 ;
29
-
30
27
/** @var MessageProducerInterface * */
31
28
private $ messageProducer ;
32
29
@@ -39,17 +36,30 @@ class AsyncWriter implements
39
36
/** @var DoctrineHelper */
40
37
private $ doctrineHelper ;
41
38
39
+ /** @var OptionalListenerManager */
40
+ private $ optionalListenerManager ;
41
+
42
+ /** @var AdditionalOptionalListenerManager */
43
+ private $ additionalOptionalListenerManager ;
44
+
42
45
public function __construct (
43
46
MessageProducerInterface $ messageProducer ,
44
- DoctrineHelper $ doctrineHelper
47
+ DoctrineHelper $ doctrineHelper ,
48
+ OptionalListenerManager $ optionalListenerManager ,
49
+ AdditionalOptionalListenerManager $ additionalOptionalListenerManager
45
50
) {
46
51
$ this ->messageProducer = $ messageProducer ;
47
52
$ this ->doctrineHelper = $ doctrineHelper ;
53
+ $ this ->optionalListenerManager = $ optionalListenerManager ;
54
+ $ this ->additionalOptionalListenerManager = $ additionalOptionalListenerManager ;
48
55
}
49
56
50
57
public function initialize ()
51
58
{
52
59
$ this ->size = 0 ;
60
+
61
+ $ this ->additionalOptionalListenerManager ->disableListeners ();
62
+ $ this ->optionalListenerManager ->disableListeners ($ this ->optionalListenerManager ->getListeners ());
53
63
}
54
64
55
65
public function write (array $ items )
@@ -67,52 +77,30 @@ public function write(array $items)
67
77
$ this ->stepExecution ->setWriteCount ($ this ->size );
68
78
69
79
$ jobId = $ this ->insertJob ($ jobName );
70
- $ this ->createFieldsChanges ($ jobId , $ items , 'items ' );
71
- $ this ->sendMessage ($ channelId , $ jobId , true );
72
- }
73
-
74
- public function flush ()
75
- {
76
- $ this ->size = 0 ;
77
-
78
- $ variants = $ this ->cacheProvider ->fetch ('product_variants ' ) ?? [];
79
- if (!$ variants ) {
80
- return ;
81
- }
82
-
83
- $ channelId = $ this ->stepExecution ->getJobExecution ()->getExecutionContext ()->get ('channel ' );
84
-
85
- $ chunks = array_chunk ($ variants , self ::VARIANTS_BATCH_SIZE , true );
86
-
87
- foreach ($ chunks as $ key => $ chunk ) {
88
- $ jobName = sprintf (
89
- 'oro_integration:sync_integration:%s:variants:%s-%s ' ,
90
- $ channelId ,
91
- self ::VARIANTS_BATCH_SIZE * $ key + 1 ,
92
- self ::VARIANTS_BATCH_SIZE * $ key + count ($ chunk )
93
- );
94
-
95
- $ jobId = $ this ->insertJob ($ jobName );
96
- $ this ->createFieldsChanges ($ jobId , $ chunk , 'variants ' );
97
- $ this ->sendMessage ($ channelId , $ jobId );
80
+ if ($ jobId && $ this ->createFieldsChanges ($ jobId , $ items , 'items ' )) {
81
+ $ this ->sendMessage ($ channelId , $ jobId , true );
98
82
}
99
83
}
100
84
101
- private function createFieldsChanges (int $ jobId , array &$ data , string $ key ): void
85
+ private function createFieldsChanges (int $ jobId , array &$ data , string $ key ): bool
102
86
{
103
87
$ em = $ this ->doctrineHelper ->getEntityManager (FieldsChanges::class);
104
88
$ fieldsChanges = $ em
105
89
->getRepository (FieldsChanges::class)
106
90
->findOneBy (['entityId ' => $ jobId , 'entityClass ' => Job::class]);
107
- if (!$ fieldsChanges ) {
108
- $ fieldsChanges = new FieldsChanges ([]);
109
- $ fieldsChanges ->setEntityClass (Job::class);
110
- $ fieldsChanges ->setEntityId ($ jobId );
91
+ if ($ fieldsChanges ) {
92
+ return false ;
111
93
}
94
+
95
+ $ fieldsChanges = new FieldsChanges ([]);
96
+ $ fieldsChanges ->setEntityClass (Job::class);
97
+ $ fieldsChanges ->setEntityId ($ jobId );
112
98
$ fieldsChanges ->setChangedFields ([$ key => $ data ]);
113
99
$ em ->persist ($ fieldsChanges );
114
100
$ em ->flush ($ fieldsChanges );
115
101
$ em ->clear (FieldsChanges::class);
102
+
103
+ return true ;
116
104
}
117
105
118
106
private function sendMessage (int $ channelId , int $ jobId , bool $ incrementedRead = false ): void
@@ -143,12 +131,15 @@ private function getRootJob(): ?int
143
131
throw new \InvalidArgumentException ('Root job id is empty ' );
144
132
}
145
133
146
- return $ rootJobId ;
134
+ return ( int ) $ rootJobId ;
147
135
}
148
136
149
137
public function close ()
150
138
{
151
139
$ this ->size = 0 ;
140
+
141
+ $ this ->optionalListenerManager ->enableListeners ($ this ->optionalListenerManager ->getListeners ());
142
+ $ this ->additionalOptionalListenerManager ->enableListeners ();
152
143
}
153
144
154
145
public function setStepExecution (StepExecution $ stepExecution )
@@ -159,12 +150,34 @@ public function setStepExecution(StepExecution $stepExecution)
159
150
private function insertJob (string $ jobName ): ?int
160
151
{
161
152
$ em = $ this ->doctrineHelper ->getEntityManager (Job::class);
162
- $ tableName = $ em ->getClassMetadata (Job::class)->getTableName ();
163
153
$ connection = $ em ->getConnection ();
154
+ $ rootJobId = $ this ->getRootJob ();
155
+
156
+ $ hasRootJob = $ connection
157
+ ->executeStatement (
158
+ 'SELECT 1 FROM oro_message_queue_job WHERE id = :id LIMIT 1; ' ,
159
+ ['id ' => $ rootJobId ],
160
+ ['id ' => Types::INTEGER ]
161
+ );
162
+
163
+ if (!$ hasRootJob ) {
164
+ throw new \InvalidArgumentException (sprintf ('Root job "%d" missing ' , $ rootJobId ));
165
+ }
166
+
167
+ $ childJob = $ connection
168
+ ->executeStatement (
169
+ 'SELECT id FROM oro_message_queue_job WHERE root_job_id = :rootJob and name = :name LIMIT 1; ' ,
170
+ ['rootJob ' => $ rootJobId , 'name ' => $ jobName ],
171
+ ['rootJob ' => Types::INTEGER , 'name ' => Types::STRING ]
172
+ );
173
+
174
+ if ($ childJob ) {
175
+ return $ childJob ;
176
+ }
164
177
165
178
$ qb = $ connection ->createQueryBuilder ();
166
179
$ qb
167
- ->insert ($ tableName )
180
+ ->insert (' oro_message_queue_job ' )
168
181
->values ([
169
182
'name ' => ':name ' ,
170
183
'status ' => ':status ' ,
@@ -178,7 +191,7 @@ private function insertJob(string $jobName): ?int
178
191
'interrupted ' => false ,
179
192
'unique ' => false ,
180
193
'createdAt ' => new \DateTime (),
181
- 'rootJob ' => $ this -> getRootJob () ,
194
+ 'rootJob ' => $ rootJobId ,
182
195
], [
183
196
'name ' => Types::STRING ,
184
197
'status ' => Types::STRING ,
0 commit comments