Skip to content

Commit 54f69b7

Browse files
authoredApr 13, 2022
AKM-34: Product filter breaks configurable products (#65)
AKM-31: Split products and product models filters - do not allow duplicate jobs
1 parent 5747619 commit 54f69b7

20 files changed

+591
-130
lines changed
 

‎Controller/ValidateConnectionController.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public function validateConnectionAction(Request $request, Channel $channel = nu
113113
'akeneoLocales' => $akeneoLocales,
114114
'success' => $success,
115115
'message' => $message,
116-
'currencyList' => $this->currencyProvider->getCurrencies(),
116+
'currencyList' => $this->currencyProvider->getCurrencyList(),
117117
]
118118
);
119119
}

‎Entity/AkeneoSettings.php

+26
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ class AkeneoSettings extends Transport
8989
* @ORM\Column(name="akeneo_product_filter", type="text", nullable=true)
9090
*/
9191
protected $productFilter;
92+
/**
93+
* @var string
94+
*
95+
* @ORM\Column(name="akeneo_conf_product_filter", type="text", nullable=true)
96+
*/
97+
protected $configurableProductFilter;
9298
/**
9399
* @var string
94100
*
@@ -275,6 +281,26 @@ public function setProductFilter($productFilter)
275281
return $this;
276282
}
277283

284+
/**
285+
* @return string
286+
*/
287+
public function getConfigurableProductFilter()
288+
{
289+
return $this->configurableProductFilter;
290+
}
291+
292+
/**
293+
* @param string $configurableProductFilter
294+
*
295+
* @return self
296+
*/
297+
public function setConfigurableProductFilter($configurableProductFilter)
298+
{
299+
$this->configurableProductFilter = $configurableProductFilter;
300+
301+
return $this;
302+
}
303+
278304
/**
279305
* @return ParameterBag
280306
*/

‎Form/Extension/ChannelTypeExtension.php

+2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ class ChannelTypeExtension extends AbstractTypeExtension
1515
* @var array
1616
*/
1717
protected $connectorsOrder = [
18+
'brand',
1819
'category',
1920
'attribute',
2021
'attribute_family',
2122
'product',
23+
'configurable_product',
2224
];
2325

2426
/**

‎Form/Type/AkeneoSettingsType.php

+9
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,15 @@ public function buildForm(FormBuilderInterface $builder, array $options)
249249
],
250250
]
251251
)
252+
->add(
253+
'configurableProductFilter',
254+
TextareaType::class,
255+
[
256+
'required' => false,
257+
'label' => 'oro.akeneo.integration.settings.akeneo_configurable_product_filter.label',
258+
'constraints' => [new JsonConstraint()],
259+
]
260+
)
252261
->add(
253262
'priceList',
254263
PriceListSelectType::class,

‎ImportExport/Processor/AsyncProcessor.php

-43
Original file line numberDiff line numberDiff line change
@@ -6,51 +6,8 @@
66

77
class AsyncProcessor implements ProcessorInterface
88
{
9-
use CacheProviderAwareProcessor;
10-
11-
/** @var array */
12-
private $variants = [];
13-
149
public function process($item)
1510
{
16-
$this->updateVariants($item);
17-
1811
return $item;
1912
}
20-
21-
private function updateVariants(array &$item)
22-
{
23-
$sku = $item['sku'];
24-
25-
if (!empty($item['family_variant'])) {
26-
if (isset($item['parent'], $this->variants[$sku])) {
27-
$parent = $item['parent'];
28-
foreach (array_keys($this->variants[$sku]) as $sku) {
29-
$this->variants[$parent][$sku] = ['parent' => $parent, 'variant' => $sku];
30-
}
31-
}
32-
33-
return;
34-
}
35-
36-
if (empty($item['parent'])) {
37-
return;
38-
}
39-
40-
$parent = $item['parent'];
41-
42-
$this->variants[$parent][$sku] = ['parent' => $parent, 'variant' => $sku];
43-
}
44-
45-
public function initialize()
46-
{
47-
$this->variants = [];
48-
$this->cacheProvider->delete('product_variants');
49-
}
50-
51-
public function flush()
52-
{
53-
$this->cacheProvider->save('product_variants', $this->variants);
54-
$this->variants = [];
55-
}
5613
}

‎ImportExport/Writer/AsyncWriter.php

+56-43
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use Doctrine\DBAL\Platforms\MySqlPlatform;
66
use Doctrine\DBAL\Types\Types;
77
use Oro\Bundle\AkeneoBundle\Async\Topics;
8-
use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
8+
use Oro\Bundle\AkeneoBundle\EventListener\AdditionalOptionalListenerManager;
99
use Oro\Bundle\BatchBundle\Entity\StepExecution;
1010
use Oro\Bundle\BatchBundle\Item\ItemWriterInterface;
1111
use Oro\Bundle\BatchBundle\Item\Support\ClosableInterface;
@@ -14,6 +14,7 @@
1414
use Oro\Bundle\IntegrationBundle\Entity\FieldsChanges;
1515
use Oro\Bundle\MessageQueueBundle\Client\BufferedMessageProducer;
1616
use Oro\Bundle\MessageQueueBundle\Entity\Job;
17+
use Oro\Bundle\PlatformBundle\Manager\OptionalListenerManager;
1718
use Oro\Component\MessageQueue\Client\Message;
1819
use Oro\Component\MessageQueue\Client\MessagePriority;
1920
use Oro\Component\MessageQueue\Client\MessageProducerInterface;
@@ -23,10 +24,6 @@ class AsyncWriter implements
2324
ClosableInterface,
2425
StepExecutionAwareInterface
2526
{
26-
use CacheProviderTrait;
27-
28-
private const VARIANTS_BATCH_SIZE = 25;
29-
3027
/** @var MessageProducerInterface * */
3128
private $messageProducer;
3229

@@ -39,17 +36,30 @@ class AsyncWriter implements
3936
/** @var DoctrineHelper */
4037
private $doctrineHelper;
4138

39+
/** @var OptionalListenerManager */
40+
private $optionalListenerManager;
41+
42+
/** @var AdditionalOptionalListenerManager */
43+
private $additionalOptionalListenerManager;
44+
4245
public function __construct(
4346
MessageProducerInterface $messageProducer,
44-
DoctrineHelper $doctrineHelper
47+
DoctrineHelper $doctrineHelper,
48+
OptionalListenerManager $optionalListenerManager,
49+
AdditionalOptionalListenerManager $additionalOptionalListenerManager
4550
) {
4651
$this->messageProducer = $messageProducer;
4752
$this->doctrineHelper = $doctrineHelper;
53+
$this->optionalListenerManager = $optionalListenerManager;
54+
$this->additionalOptionalListenerManager = $additionalOptionalListenerManager;
4855
}
4956

5057
public function initialize()
5158
{
5259
$this->size = 0;
60+
61+
$this->additionalOptionalListenerManager->disableListeners();
62+
$this->optionalListenerManager->disableListeners($this->optionalListenerManager->getListeners());
5363
}
5464

5565
public function write(array $items)
@@ -67,52 +77,30 @@ public function write(array $items)
6777
$this->stepExecution->setWriteCount($this->size);
6878

6979
$jobId = $this->insertJob($jobName);
70-
$this->createFieldsChanges($jobId, $items, 'items');
71-
$this->sendMessage($channelId, $jobId, true);
72-
}
73-
74-
public function flush()
75-
{
76-
$this->size = 0;
77-
78-
$variants = $this->cacheProvider->fetch('product_variants') ?? [];
79-
if (!$variants) {
80-
return;
81-
}
82-
83-
$channelId = $this->stepExecution->getJobExecution()->getExecutionContext()->get('channel');
84-
85-
$chunks = array_chunk($variants, self::VARIANTS_BATCH_SIZE, true);
86-
87-
foreach ($chunks as $key => $chunk) {
88-
$jobName = sprintf(
89-
'oro_integration:sync_integration:%s:variants:%s-%s',
90-
$channelId,
91-
self::VARIANTS_BATCH_SIZE * $key + 1,
92-
self::VARIANTS_BATCH_SIZE * $key + count($chunk)
93-
);
94-
95-
$jobId = $this->insertJob($jobName);
96-
$this->createFieldsChanges($jobId, $chunk, 'variants');
97-
$this->sendMessage($channelId, $jobId);
80+
if ($jobId && $this->createFieldsChanges($jobId, $items, 'items')) {
81+
$this->sendMessage($channelId, $jobId, true);
9882
}
9983
}
10084

101-
private function createFieldsChanges(int $jobId, array &$data, string $key): void
85+
private function createFieldsChanges(int $jobId, array &$data, string $key): bool
10286
{
10387
$em = $this->doctrineHelper->getEntityManager(FieldsChanges::class);
10488
$fieldsChanges = $em
10589
->getRepository(FieldsChanges::class)
10690
->findOneBy(['entityId' => $jobId, 'entityClass' => Job::class]);
107-
if (!$fieldsChanges) {
108-
$fieldsChanges = new FieldsChanges([]);
109-
$fieldsChanges->setEntityClass(Job::class);
110-
$fieldsChanges->setEntityId($jobId);
91+
if ($fieldsChanges) {
92+
return false;
11193
}
94+
95+
$fieldsChanges = new FieldsChanges([]);
96+
$fieldsChanges->setEntityClass(Job::class);
97+
$fieldsChanges->setEntityId($jobId);
11298
$fieldsChanges->setChangedFields([$key => $data]);
11399
$em->persist($fieldsChanges);
114100
$em->flush($fieldsChanges);
115101
$em->clear(FieldsChanges::class);
102+
103+
return true;
116104
}
117105

118106
private function sendMessage(int $channelId, int $jobId, bool $incrementedRead = false): void
@@ -143,12 +131,15 @@ private function getRootJob(): ?int
143131
throw new \InvalidArgumentException('Root job id is empty');
144132
}
145133

146-
return $rootJobId;
134+
return (int)$rootJobId;
147135
}
148136

149137
public function close()
150138
{
151139
$this->size = 0;
140+
141+
$this->optionalListenerManager->enableListeners($this->optionalListenerManager->getListeners());
142+
$this->additionalOptionalListenerManager->enableListeners();
152143
}
153144

154145
public function setStepExecution(StepExecution $stepExecution)
@@ -159,12 +150,34 @@ public function setStepExecution(StepExecution $stepExecution)
159150
private function insertJob(string $jobName): ?int
160151
{
161152
$em = $this->doctrineHelper->getEntityManager(Job::class);
162-
$tableName = $em->getClassMetadata(Job::class)->getTableName();
163153
$connection = $em->getConnection();
154+
$rootJobId = $this->getRootJob();
155+
156+
$hasRootJob = $connection
157+
->executeStatement(
158+
'SELECT 1 FROM oro_message_queue_job WHERE id = :id LIMIT 1;',
159+
['id' => $rootJobId],
160+
['id' => Types::INTEGER]
161+
);
162+
163+
if (!$hasRootJob) {
164+
throw new \InvalidArgumentException(sprintf('Root job "%d" missing', $rootJobId));
165+
}
166+
167+
$childJob = $connection
168+
->executeStatement(
169+
'SELECT id FROM oro_message_queue_job WHERE root_job_id = :rootJob and name = :name LIMIT 1;',
170+
['rootJob' => $rootJobId, 'name' => $jobName],
171+
['rootJob' => Types::INTEGER, 'name' => Types::STRING]
172+
);
173+
174+
if ($childJob) {
175+
return $childJob;
176+
}
164177

165178
$qb = $connection->createQueryBuilder();
166179
$qb
167-
->insert($tableName)
180+
->insert('oro_message_queue_job')
168181
->values([
169182
'name' => ':name',
170183
'status' => ':status',
@@ -178,7 +191,7 @@ private function insertJob(string $jobName): ?int
178191
'interrupted' => false,
179192
'unique' => false,
180193
'createdAt' => new \DateTime(),
181-
'rootJob' => $this->getRootJob(),
194+
'rootJob' => $rootJobId,
182195
], [
183196
'name' => Types::STRING,
184197
'status' => Types::STRING,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
<?php
2+
3+
namespace Oro\Bundle\AkeneoBundle\ImportExport\Writer;
4+
5+
use Doctrine\DBAL\Platforms\MySqlPlatform;
6+
use Doctrine\DBAL\Types\Types;
7+
use Oro\Bundle\AkeneoBundle\Async\Topics;
8+
use Oro\Bundle\AkeneoBundle\EventListener\AdditionalOptionalListenerManager;
9+
use Oro\Bundle\BatchBundle\Entity\StepExecution;
10+
use Oro\Bundle\BatchBundle\Item\ItemWriterInterface;
11+
use Oro\Bundle\BatchBundle\Step\StepExecutionAwareInterface;
12+
use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
13+
use Oro\Bundle\IntegrationBundle\Entity\FieldsChanges;
14+
use Oro\Bundle\MessageQueueBundle\Client\BufferedMessageProducer;
15+
use Oro\Bundle\MessageQueueBundle\Entity\Job;
16+
use Oro\Bundle\PlatformBundle\Manager\OptionalListenerManager;
17+
use Oro\Component\MessageQueue\Client\Message;
18+
use Oro\Component\MessageQueue\Client\MessagePriority;
19+
use Oro\Component\MessageQueue\Client\MessageProducerInterface;
20+
21+
class ConfigurableAsyncWriter implements
22+
ItemWriterInterface,
23+
StepExecutionAwareInterface
24+
{
25+
private const VARIANTS_BATCH_SIZE = 25;
26+
27+
/** @var MessageProducerInterface * */
28+
private $messageProducer;
29+
30+
/** @var StepExecution */
31+
private $stepExecution;
32+
33+
/** @var DoctrineHelper */
34+
private $doctrineHelper;
35+
36+
/** @var OptionalListenerManager */
37+
private $optionalListenerManager;
38+
39+
/** @var AdditionalOptionalListenerManager */
40+
private $additionalOptionalListenerManager;
41+
42+
private $variants = [];
43+
44+
public function __construct(
45+
MessageProducerInterface $messageProducer,
46+
DoctrineHelper $doctrineHelper,
47+
OptionalListenerManager $optionalListenerManager,
48+
AdditionalOptionalListenerManager $additionalOptionalListenerManager
49+
) {
50+
$this->messageProducer = $messageProducer;
51+
$this->doctrineHelper = $doctrineHelper;
52+
$this->optionalListenerManager = $optionalListenerManager;
53+
$this->additionalOptionalListenerManager = $additionalOptionalListenerManager;
54+
}
55+
56+
public function initialize()
57+
{
58+
$this->variants = [];
59+
60+
$this->additionalOptionalListenerManager->disableListeners();
61+
$this->optionalListenerManager->disableListeners($this->optionalListenerManager->getListeners());
62+
}
63+
64+
public function write(array $items)
65+
{
66+
foreach ($items as $item) {
67+
$sku = $item['sku'];
68+
69+
if (!empty($item['family_variant'])) {
70+
if (isset($item['parent'], $this->variants[$sku])) {
71+
$parent = $item['parent'];
72+
foreach (array_keys($this->variants[$sku]) as $sku) {
73+
$this->variants[$parent][$sku] = ['parent' => $parent, 'variant' => $sku];
74+
}
75+
}
76+
77+
return;
78+
}
79+
80+
if (empty($item['parent'])) {
81+
return;
82+
}
83+
84+
$parent = $item['parent'];
85+
86+
$this->variants[$parent][$sku] = ['parent' => $parent, 'variant' => $sku];
87+
}
88+
}
89+
90+
public function close()
91+
{
92+
$this->variants = [];
93+
94+
$this->optionalListenerManager->enableListeners($this->optionalListenerManager->getListeners());
95+
$this->additionalOptionalListenerManager->enableListeners();
96+
}
97+
98+
public function flush()
99+
{
100+
$channelId = $this->stepExecution->getJobExecution()->getExecutionContext()->get('channel');
101+
102+
$chunks = array_chunk($this->variants, self::VARIANTS_BATCH_SIZE, true);
103+
104+
foreach ($chunks as $key => $chunk) {
105+
$jobName = sprintf(
106+
'oro_integration:sync_integration:%s:variants:%s-%s',
107+
$channelId,
108+
self::VARIANTS_BATCH_SIZE * $key + 1,
109+
self::VARIANTS_BATCH_SIZE * $key + count($chunk)
110+
);
111+
112+
$jobId = $this->insertJob($jobName);
113+
if ($jobId && $this->createFieldsChanges($jobId, $chunk, 'variants')) {
114+
$this->sendMessage($channelId, $jobId);
115+
}
116+
}
117+
}
118+
119+
private function createFieldsChanges(int $jobId, array &$data, string $key): bool
120+
{
121+
$em = $this->doctrineHelper->getEntityManager(FieldsChanges::class);
122+
$fieldsChanges = $em
123+
->getRepository(FieldsChanges::class)
124+
->findOneBy(['entityId' => $jobId, 'entityClass' => Job::class]);
125+
if ($fieldsChanges) {
126+
return false;
127+
}
128+
129+
$fieldsChanges = new FieldsChanges([]);
130+
$fieldsChanges->setEntityClass(Job::class);
131+
$fieldsChanges->setEntityId($jobId);
132+
$fieldsChanges->setChangedFields([$key => $data]);
133+
$em->persist($fieldsChanges);
134+
$em->flush($fieldsChanges);
135+
$em->clear(FieldsChanges::class);
136+
137+
return true;
138+
}
139+
140+
private function sendMessage(int $channelId, int $jobId, bool $incrementedRead = false): void
141+
{
142+
$this->messageProducer->send(
143+
Topics::IMPORT_PRODUCTS,
144+
new Message(
145+
[
146+
'integrationId' => $channelId,
147+
'jobId' => $jobId,
148+
'connector' => 'configurable_product',
149+
'connector_parameters' => ['incremented_read' => $incrementedRead],
150+
],
151+
MessagePriority::HIGH
152+
)
153+
);
154+
155+
if ($this->messageProducer instanceof BufferedMessageProducer
156+
&& $this->messageProducer->isBufferingEnabled()) {
157+
$this->messageProducer->flushBuffer();
158+
}
159+
}
160+
161+
private function getRootJob(): ?int
162+
{
163+
$rootJobId = $this->stepExecution->getJobExecution()->getExecutionContext()->get('rootJobId') ?? null;
164+
if (!$rootJobId) {
165+
throw new \InvalidArgumentException('Root job id is empty');
166+
}
167+
168+
return (int)$rootJobId;
169+
}
170+
171+
public function setStepExecution(StepExecution $stepExecution)
172+
{
173+
$this->stepExecution = $stepExecution;
174+
}
175+
176+
private function insertJob(string $jobName): ?int
177+
{
178+
$em = $this->doctrineHelper->getEntityManager(Job::class);
179+
$connection = $em->getConnection();
180+
$rootJobId = $this->getRootJob();
181+
182+
$hasRootJob = $connection
183+
->executeStatement(
184+
'SELECT 1 FROM oro_message_queue_job WHERE id = :id LIMIT 1;',
185+
['id' => $rootJobId],
186+
['id' => Types::INTEGER]
187+
);
188+
189+
if (!$hasRootJob) {
190+
throw new \InvalidArgumentException(sprintf('Root job "%d" missing', $rootJobId));
191+
}
192+
193+
$childJob = $connection
194+
->executeStatement(
195+
'SELECT id FROM oro_message_queue_job WHERE root_job_id = :rootJob and name = :name LIMIT 1;',
196+
['rootJob' => $rootJobId, 'name' => $jobName],
197+
['rootJob' => Types::INTEGER, 'name' => Types::STRING]
198+
);
199+
200+
if ($childJob) {
201+
return $childJob;
202+
}
203+
204+
$qb = $connection->createQueryBuilder();
205+
$qb
206+
->insert('oro_message_queue_job')
207+
->values([
208+
'name' => ':name',
209+
'status' => ':status',
210+
'interrupted' => ':interrupted',
211+
'created_at' => ':createdAt',
212+
'root_job_id' => ':rootJob',
213+
])
214+
->setParameters([
215+
'name' => $jobName,
216+
'status' => Job::STATUS_NEW,
217+
'interrupted' => false,
218+
'unique' => false,
219+
'createdAt' => new \DateTime(),
220+
'rootJob' => $rootJobId,
221+
], [
222+
'name' => Types::STRING,
223+
'status' => Types::STRING,
224+
'interrupted' => Types::BOOLEAN,
225+
'unique' => Types::BOOLEAN,
226+
'createdAt' => Types::DATETIME_MUTABLE,
227+
'rootJob' => Types::INTEGER,
228+
]);
229+
230+
if ($connection->getDatabasePlatform() instanceof MySqlPlatform) {
231+
$qb->setValue('`unique`', ':unique');
232+
} else {
233+
$qb->setValue('"unique"', ':unique');
234+
}
235+
236+
$qb->execute();
237+
238+
return $connection->lastInsertId();
239+
}
240+
}

‎Integration/AkeneoTransport.php

+57-17
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Oro\Bundle\AkeneoBundle\Integration\Iterator\AttributeFamilyIterator;
1111
use Oro\Bundle\AkeneoBundle\Integration\Iterator\AttributeIterator;
1212
use Oro\Bundle\AkeneoBundle\Integration\Iterator\BrandIterator;
13+
use Oro\Bundle\AkeneoBundle\Integration\Iterator\ConfigurableProductIterator;
1314
use Oro\Bundle\AkeneoBundle\Integration\Iterator\ProductIterator;
1415
use Oro\Bundle\AkeneoBundle\Settings\DataProvider\SyncProductsDataProvider;
1516
use Oro\Bundle\CurrencyBundle\Provider\CurrencyProviderInterface;
@@ -209,14 +210,14 @@ public function getProducts(int $pageSize)
209210
$this->initAttributesList();
210211
$this->initMeasureFamilies();
211212

212-
$searchFilters = $this->akeneoSearchBuilder->getFilters($this->transportEntity->getProductFilter());
213+
$queryParams = [
214+
'scope' => $this->transportEntity->getAkeneoActiveChannel(),
215+
'search' => $this->akeneoSearchBuilder->getFilters($this->transportEntity->getProductFilter()),
216+
];
213217

214218
if ($this->transportEntity->getSyncProducts() === SyncProductsDataProvider::PUBLISHED) {
215219
return new ProductIterator(
216-
$this->client->getPublishedProductApi()->all(
217-
$pageSize,
218-
['search' => $searchFilters, 'scope' => $this->transportEntity->getAkeneoActiveChannel()]
219-
),
220+
$this->client->getPublishedProductApi()->all($pageSize, $queryParams),
220221
$this->client,
221222
$this->logger,
222223
$this->attributes,
@@ -227,10 +228,7 @@ public function getProducts(int $pageSize)
227228
}
228229

229230
return new ProductIterator(
230-
$this->client->getProductApi()->all(
231-
$pageSize,
232-
['search' => $searchFilters, 'scope' => $this->transportEntity->getAkeneoActiveChannel()]
233-
),
231+
$this->client->getProductApi()->all($pageSize, $queryParams),
234232
$this->client,
235233
$this->logger,
236234
$this->attributes,
@@ -240,6 +238,33 @@ public function getProducts(int $pageSize)
240238
);
241239
}
242240

241+
public function getProductsList(int $pageSize): iterable
242+
{
243+
$this->initAttributesList();
244+
245+
$queryParams = [
246+
'scope' => $this->transportEntity->getAkeneoActiveChannel(),
247+
'search' => $this->akeneoSearchBuilder->getFilters($this->transportEntity->getProductFilter()),
248+
'attributes' => key($this->attributes),
249+
];
250+
251+
if ($this->transportEntity->getSyncProducts() === SyncProductsDataProvider::PUBLISHED) {
252+
return new ConfigurableProductIterator(
253+
$this->client->getPublishedProductApi()->all($pageSize, $queryParams),
254+
$this->client,
255+
$this->logger,
256+
$this->getAttributeMapping()
257+
);
258+
}
259+
260+
return new ConfigurableProductIterator(
261+
$this->client->getProductApi()->all($pageSize, $queryParams),
262+
$this->client,
263+
$this->logger,
264+
$this->getAttributeMapping()
265+
);
266+
}
267+
243268
/**
244269
* @return \Iterator
245270
*/
@@ -249,16 +274,13 @@ public function getProductModels(int $pageSize)
249274
$this->initFamilyVariants();
250275
$this->initMeasureFamilies();
251276

252-
$searchFilters = $this->akeneoSearchBuilder->getFilters($this->transportEntity->getProductFilter());
253-
if (isset($searchFilters['completeness'])) {
254-
unset($searchFilters['completeness']);
255-
}
277+
$queryParams = [
278+
'scope' => $this->transportEntity->getAkeneoActiveChannel(),
279+
'search' => $this->akeneoSearchBuilder->getFilters($this->transportEntity->getConfigurableProductFilter()),
280+
];
256281

257282
return new ProductIterator(
258-
$this->client->getProductModelApi()->all(
259-
$pageSize,
260-
['search' => $searchFilters, 'scope' => $this->transportEntity->getAkeneoActiveChannel()]
261-
),
283+
$this->client->getProductModelApi()->all($pageSize, $queryParams),
262284
$this->client,
263285
$this->logger,
264286
$this->attributes,
@@ -268,6 +290,24 @@ public function getProductModels(int $pageSize)
268290
);
269291
}
270292

293+
public function getProductModelsList(int $pageSize): iterable
294+
{
295+
$this->initAttributesList();
296+
297+
$queryParams = [
298+
'scope' => $this->transportEntity->getAkeneoActiveChannel(),
299+
'search' => $this->akeneoSearchBuilder->getFilters($this->transportEntity->getConfigurableProductFilter()),
300+
'attributes' => key($this->attributes),
301+
];
302+
303+
return new ConfigurableProductIterator(
304+
$this->client->getProductModelApi()->all($pageSize, $queryParams),
305+
$this->client,
306+
$this->logger,
307+
$this->getAttributeMapping()
308+
);
309+
}
310+
271311
/**
272312
* {@inheritdoc}
273313
*/

‎Integration/AkeneoTransportInterface.php

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public function getProducts(int $pageSize);
5151
*/
5252
public function getProductModels(int $pageSize);
5353

54+
public function getProductsList(int $pageSize): iterable;
55+
56+
public function getProductModelsList(int $pageSize): iterable;
57+
5458
/**
5559
* @return \Iterator
5660
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<?php
2+
3+
namespace Oro\Bundle\AkeneoBundle\Integration\Connector;
4+
5+
use Oro\Bundle\AkeneoBundle\Integration\AkeneoTransport;
6+
use Oro\Bundle\AkeneoBundle\Placeholder\SchemaUpdateFilter;
7+
use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
8+
use Oro\Bundle\IntegrationBundle\Entity\Channel;
9+
use Oro\Bundle\IntegrationBundle\Provider\AbstractConnector;
10+
use Oro\Bundle\IntegrationBundle\Provider\AllowedConnectorInterface;
11+
use Oro\Bundle\IntegrationBundle\Provider\ConnectorInterface;
12+
use Oro\Bundle\ProductBundle\Entity\Product;
13+
14+
/**
15+
* Integration configurable product connector.
16+
*/
17+
class ConfigurableProductConnector extends AbstractConnector implements ConnectorInterface, AllowedConnectorInterface
18+
{
19+
use CacheProviderTrait;
20+
21+
const PAGE_SIZE = 100;
22+
23+
/** @var AkeneoTransport */
24+
protected $transport;
25+
26+
/** @var SchemaUpdateFilter */
27+
protected $schemaUpdateFilter;
28+
29+
public function getLabel(): string
30+
{
31+
return 'oro.akeneo.connector.configurable_product.label';
32+
}
33+
34+
public function getImportEntityFQCN()
35+
{
36+
return Product::class;
37+
}
38+
39+
public function getImportJobName()
40+
{
41+
return 'akeneo_configurable_product_import';
42+
}
43+
44+
public function getType()
45+
{
46+
return 'configurable_product';
47+
}
48+
49+
public function setSchemaUpdateFilter(SchemaUpdateFilter $schemaUpdateFilter): void
50+
{
51+
$this->schemaUpdateFilter = $schemaUpdateFilter;
52+
}
53+
54+
public function isAllowed(Channel $integration, array $processedConnectorsStatuses): bool
55+
{
56+
return $this->schemaUpdateFilter->isApplicable($integration, Product::class) === false;
57+
}
58+
59+
protected function getConnectorSource()
60+
{
61+
$variants = $this->cacheProvider->fetch('akeneo')['variants'] ?? [];
62+
if ($variants) {
63+
return new \ArrayIterator();
64+
}
65+
66+
$iterator = new \AppendIterator();
67+
$iterator->append($this->transport->getProductsList(self::PAGE_SIZE));
68+
$iterator->append($this->transport->getProductModelsList(self::PAGE_SIZE));
69+
70+
return $iterator;
71+
}
72+
}

‎Integration/Connector/ProductConnector.php

-5
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,6 @@ protected function getConnectorSource()
8181
return new \ArrayIterator();
8282
}
8383

84-
$variants = $this->cacheProvider->fetch('akeneo')['variants'] ?? [];
85-
if ($variants) {
86-
return new \ArrayIterator();
87-
}
88-
8984
$iterator = new \AppendIterator();
9085
$iterator->append($this->transport->getProducts(self::PAGE_SIZE));
9186
$iterator->append($this->transport->getProductModels(self::PAGE_SIZE));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace Oro\Bundle\AkeneoBundle\Integration\Iterator;
4+
5+
use Akeneo\Pim\ApiClient\AkeneoPimClientInterface;
6+
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
7+
use Psr\Log\LoggerInterface;
8+
9+
class ConfigurableProductIterator extends AbstractIterator
10+
{
11+
private $attributeMapping = [];
12+
13+
public function __construct(
14+
ResourceCursorInterface $resourceCursor,
15+
AkeneoPimClientInterface $client,
16+
LoggerInterface $logger,
17+
array $attributeMapping = []
18+
) {
19+
parent::__construct($resourceCursor, $client, $logger);
20+
21+
$this->attributeMapping = $attributeMapping;
22+
}
23+
24+
public function doCurrent()
25+
{
26+
$item = $this->resourceCursor->current();
27+
28+
$sku = $item['identifier'] ?? $item['code'];
29+
30+
if (array_key_exists('sku', $this->attributeMapping)) {
31+
if (!empty($item['values'][$this->attributeMapping['sku']][0]['data'])) {
32+
$sku = $item['values'][$this->attributeMapping['sku']][0]['data'];
33+
}
34+
}
35+
36+
return ['sku' => (string)$sku, 'parent' => $item['parent'] ?? null, 'family_variant' => $item['family_variant'] ?? null];
37+
}
38+
}

‎Job/Context/SimpleContextAggregator.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ public function getAggregatedContext(JobExecution $jobExecution)
2525
$context,
2626
$this->contextRegistry->getByStepExecution($stepExecution)
2727
);
28-
//CUSTOMIZATION START
28+
// CUSTOMIZATION START
2929
$context->addErrors($stepExecution->getErrors());
30-
//CUSTOMIZATION END
30+
// CUSTOMIZATION END
3131
}
3232
}
3333

‎Migrations/Schema/OroAkeneoBundleInstaller.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class OroAkeneoBundleInstaller implements Installation, ExtendExtensionAwareInte
4747
*/
4848
public function getMigrationVersion()
4949
{
50-
return 'v1_15';
50+
return 'v1_16';
5151
}
5252

5353
/**
@@ -113,6 +113,7 @@ protected function updateIntegrationTransportTable(Schema $schema)
113113
$table->addColumn('akeneo_active_channel', 'string', ['notnull' => false, 'length' => 255]);
114114
$table->addColumn('akeneo_acl_voter_enabled', 'boolean', ['notnull' => false]);
115115
$table->addColumn('akeneo_product_filter', 'text', ['notnull' => false]);
116+
$table->addColumn('akeneo_conf_product_filter', 'text', ['notnull' => false]);
116117
$table->addColumn('akeneo_attributes_list', 'text', ['notnull' => false]);
117118
$table->addColumn('rootcategory_id', 'integer', ['notnull' => false]);
118119
$table->addColumn('pricelist_id', 'integer', ['notnull' => false]);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
namespace Oro\Bundle\AkeneoBundle\Migrations\Schema\v1_16;
4+
5+
use Doctrine\DBAL\Schema\Schema;
6+
use Oro\Bundle\MigrationBundle\Migration\Migration;
7+
use Oro\Bundle\MigrationBundle\Migration\QueryBag;
8+
9+
class OroAkeneoMigration implements Migration
10+
{
11+
public function up(Schema $schema, QueryBag $queries)
12+
{
13+
$table = $schema->getTable('oro_integration_transport');
14+
$table->addColumn('akeneo_conf_product_filter', 'text', ['notnull' => false]);
15+
16+
$queries->addPostQuery('UPDATE oro_integration_transport SET akeneo_conf_product_filter = akeneo_product_filter ' .
17+
'WHERE akeneo_product_filter IS NOT NULL;');
18+
}
19+
}

‎Resources/config/batch_jobs.yml

+15-1
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,23 @@ connector:
9292
reader: oro_akeneo.importexport.reader.price
9393
processor: oro_akeneo.importexport.processor.import.product_price
9494
writer: oro_pricing.importexport.writer.product_price
95-
import_variants:
95+
96+
akeneo_configurable_product_import:
97+
title: "Configurable Product import from Akeneo"
98+
type: import
99+
steps:
100+
api:
96101
title: import
97102
class: Oro\Bundle\BatchBundle\Step\ItemStep
103+
services:
104+
reader: oro_akeneo.integration.connector.configurable_product
105+
processor: oro_akeneo.importexport.processor.async
106+
writer: oro_akeneo.importexport.writer.configurable_async_product
107+
parameters:
108+
batch_size: 25
109+
import_variants:
110+
title: import
111+
class: Oro\Bundle\AkeneoBundle\ImportExport\Step\ItemStep
98112
services:
99113
reader: oro_akeneo.importexport.reader.product_variant
100114
processor: oro_akeneo.importexport.processor.import.product_variant

‎Resources/config/importexport.yml

+25-7
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,6 @@ services:
106106
oro_akeneo.importexport.processor.async:
107107
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Processor\AsyncProcessor'
108108
public: true
109-
calls:
110-
- [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
111109

112110
oro_akeneo.importexport.processor.category_parent:
113111
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Processor\CategoryParentProcessor'
@@ -202,6 +200,15 @@ services:
202200
calls:
203201
- [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
204202

203+
oro_akeneo.integration.connector.brand:
204+
class: 'Oro\Bundle\AkeneoBundle\Integration\Connector\BrandConnector'
205+
arguments:
206+
- '@oro_importexport.context_registry'
207+
- '@oro_integration.logger.strategy'
208+
- '@oro_integration.provider.connector_context_mediator'
209+
tags:
210+
- { name: oro_integration.connector, type: brand, channel_type: oro_akeneo }
211+
205212
oro_akeneo.integration.connector.product:
206213
class: 'Oro\Bundle\AkeneoBundle\Integration\Connector\ProductConnector'
207214
arguments:
@@ -214,14 +221,17 @@ services:
214221
tags:
215222
- { name: oro_integration.connector, type: product, channel_type: oro_akeneo }
216223

217-
oro_akeneo.integration.connector.brand:
218-
class: 'Oro\Bundle\AkeneoBundle\Integration\Connector\BrandConnector'
224+
oro_akeneo.integration.connector.configurable_product:
225+
class: 'Oro\Bundle\AkeneoBundle\Integration\Connector\ConfigurableProductConnector'
219226
arguments:
220227
- '@oro_importexport.context_registry'
221228
- '@oro_integration.logger.strategy'
222229
- '@oro_integration.provider.connector_context_mediator'
230+
calls:
231+
- [ setSchemaUpdateFilter, [ '@oro_akeneo.placeholder.schema_update_filter' ] ]
232+
- [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
223233
tags:
224-
- { name: oro_integration.connector, type: brand, channel_type: oro_akeneo }
234+
- { name: oro_integration.connector, type: configurable_product, channel_type: oro_akeneo }
225235

226236
oro_akeneo.importexport.data_converter.product:
227237
class: 'Oro\Bundle\AkeneoBundle\ImportExport\DataConverter\ProductDataConverter'
@@ -315,8 +325,16 @@ services:
315325
arguments:
316326
- '@oro_message_queue.message_producer'
317327
- '@oro_entity.doctrine_helper'
318-
calls:
319-
- [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
328+
- '@oro_platform.optional_listeners.manager'
329+
- '@oro_akeneo.event_listener.additional_optional_listeners_manager'
330+
331+
oro_akeneo.importexport.writer.configurable_async_product:
332+
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Writer\ConfigurableAsyncWriter'
333+
arguments:
334+
- '@oro_message_queue.message_producer'
335+
- '@oro_entity.doctrine_helper'
336+
- '@oro_platform.optional_listeners.manager'
337+
- '@oro_akeneo.event_listener.additional_optional_listeners_manager'
320338

321339
oro_akeneo.importexport.cache:
322340
parent: oro_cache.array_cache

‎Resources/translations/messages.en.yml

+5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ oro:
3838
akeneo_product_filter:
3939
label: 'Product Filter'
4040
tooltip: 'This field enables you to apply filters to sync only the products you want. As this filter is passed via API request, it must be filled in JSON format. Details on the format and filter options available for the products can be found in the <a target="_blank" href="https://api.akeneo.com/documentation/filter.html#filters">Filters section of the Akeneo PIM documentation</a>'
41+
akeneo_configurable_product_filter:
42+
label: 'Configurable Product Filter'
43+
tooltip: 'This field enables you to apply filters to sync only the configurable products you want. As this filter is passed via API request, it must be filled in JSON format. Details on the format and filter options available for the products can be found in the <a target="_blank" href="https://api.akeneo.com/documentation/filter.html#filters">Filters section of the Akeneo PIM documentation</a>'
4144
akeneo_attribute_list:
4245
label: 'Attribute Filter'
4346
tooltip: 'This field enables you to apply filters to sync only the attributes you want. Values must be attribute code, separated with a semi-colon. IMPORTANT: if not defined before to save the integration, all attributes will be imported.'
@@ -84,6 +87,8 @@ oro:
8487
label: Category connector
8588
product:
8689
label: Product connector
90+
configurable_product:
91+
label: Configurable product connector
8792
attribute_family:
8893
label: Attribute family connector
8994
attribute:

‎Resources/views/Form/fields.html.twig

+11
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,17 @@
184184
</div>
185185
</div>
186186

187+
<div class="control-group control-group-choice">
188+
<div class="control-label wrap">
189+
{{ UI.tooltip('oro.akeneo.integration.settings.akeneo_configurable_product_filter.tooltip'|trans, {}, 'right') }}
190+
{{ form_label(form.configurableProductFilter) }}
191+
</div>
192+
<div class="controls {% if (form_errors(form.configurableProductFilter)|length > 0) %} validation-error {% endif %}">
193+
{{ form_widget(form.configurableProductFilter) }}
194+
{{ form_errors(form.configurableProductFilter) }}
195+
</div>
196+
</div>
197+
187198
<div class="control-group control-group-choice">
188199
<div class="control-label wrap">
189200
{{ UI.tooltip('oro.akeneo.integration.settings.akeneo_attribute_list.tooltip'|trans, {}, 'right') }}

‎Validator/UniqueProductVariantLinksValidator.php

+7-10
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
namespace Oro\Bundle\AkeneoBundle\Validator;
44

5+
use Doctrine\ORM\PersistentCollection;
56
use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
6-
use Oro\Bundle\ProductBundle\Entity\Product;
77
use Oro\Bundle\ProductBundle\Validator\Constraints\ConfigurableProductAccessorTrait;
88
use Symfony\Component\Validator\Constraint;
99
use Symfony\Component\Validator\ConstraintValidator;
@@ -40,17 +40,14 @@ public function validate($value, Constraint $constraint)
4040
return;
4141
}
4242
if (count($product->getVariantFields()) === 0) {
43-
return null;
43+
return;
4444
}
4545

46-
$uow = $this->doctrineHelper->getEntityManagerForClass(Product::class)->getUnitOfWork();
47-
$collections = array_merge($uow->getScheduledCollectionUpdates(), $uow->getScheduledCollectionDeletions());
48-
if (
49-
!in_array($value->getVariantLinks(), $collections)
50-
&& !in_array($value->getParentVariantLinks(), $collections)
51-
&& empty($uow->getEntityChangeSet($value)['variantFields'])
52-
) {
53-
return;
46+
$variantLinks = $value->getVariantLinks();
47+
if ($variantLinks instanceof PersistentCollection) {
48+
if ($variantLinks->isInitialized() && !$variantLinks->isDirty()) {
49+
return;
50+
}
5451
}
5552

5653
$this->validator->validate($value, $constraint);

0 commit comments

Comments
 (0)
Please sign in to comment.