Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oro/5.0.0 rc2 #21

Open
wants to merge 15 commits into
base: feature/update-5.0
Choose a base branch
from
Prev Previous commit
Next Next commit
AKM-34: Product filter breaks configurable products (oroinc#65)
AKM-31: Split products and product models filters
- do not allow duplicate jobs
dxops authored Apr 13, 2022
commit 54f69b758a82a47e0c1f9988d0e3395735267b2b
2 changes: 1 addition & 1 deletion Controller/ValidateConnectionController.php
Original file line number Diff line number Diff line change
@@ -113,7 +113,7 @@ public function validateConnectionAction(Request $request, Channel $channel = nu
'akeneoLocales' => $akeneoLocales,
'success' => $success,
'message' => $message,
'currencyList' => $this->currencyProvider->getCurrencies(),
'currencyList' => $this->currencyProvider->getCurrencyList(),
]
);
}
26 changes: 26 additions & 0 deletions Entity/AkeneoSettings.php
Original file line number Diff line number Diff line change
@@ -89,6 +89,12 @@ class AkeneoSettings extends Transport
* @ORM\Column(name="akeneo_product_filter", type="text", nullable=true)
*/
protected $productFilter;
/**
* @var string
*
* @ORM\Column(name="akeneo_conf_product_filter", type="text", nullable=true)
*/
protected $configurableProductFilter;
/**
* @var string
*
@@ -275,6 +281,26 @@ public function setProductFilter($productFilter)
return $this;
}

/**
* @return string
*/
public function getConfigurableProductFilter()
{
return $this->configurableProductFilter;
}

/**
* @param string $configurableProductFilter
*
* @return self
*/
public function setConfigurableProductFilter($configurableProductFilter)
{
$this->configurableProductFilter = $configurableProductFilter;

return $this;
}

/**
* @return ParameterBag
*/
2 changes: 2 additions & 0 deletions Form/Extension/ChannelTypeExtension.php
Original file line number Diff line number Diff line change
@@ -15,10 +15,12 @@ class ChannelTypeExtension extends AbstractTypeExtension
* @var array
*/
protected $connectorsOrder = [
'brand',
'category',
'attribute',
'attribute_family',
'product',
'configurable_product',
];

/**
9 changes: 9 additions & 0 deletions Form/Type/AkeneoSettingsType.php
Original file line number Diff line number Diff line change
@@ -249,6 +249,15 @@ public function buildForm(FormBuilderInterface $builder, array $options)
],
]
)
->add(
'configurableProductFilter',
TextareaType::class,
[
'required' => false,
'label' => 'oro.akeneo.integration.settings.akeneo_configurable_product_filter.label',
'constraints' => [new JsonConstraint()],
]
)
->add(
'priceList',
PriceListSelectType::class,
43 changes: 0 additions & 43 deletions ImportExport/Processor/AsyncProcessor.php
Original file line number Diff line number Diff line change
@@ -6,51 +6,8 @@

class AsyncProcessor implements ProcessorInterface
{
use CacheProviderAwareProcessor;

/** @var array */
private $variants = [];

public function process($item)
{
$this->updateVariants($item);

return $item;
}

private function updateVariants(array &$item)
{
$sku = $item['sku'];

if (!empty($item['family_variant'])) {
if (isset($item['parent'], $this->variants[$sku])) {
$parent = $item['parent'];
foreach (array_keys($this->variants[$sku]) as $sku) {
$this->variants[$parent][$sku] = ['parent' => $parent, 'variant' => $sku];
}
}

return;
}

if (empty($item['parent'])) {
return;
}

$parent = $item['parent'];

$this->variants[$parent][$sku] = ['parent' => $parent, 'variant' => $sku];
}

public function initialize()
{
$this->variants = [];
$this->cacheProvider->delete('product_variants');
}

public function flush()
{
$this->cacheProvider->save('product_variants', $this->variants);
$this->variants = [];
}
}
99 changes: 56 additions & 43 deletions ImportExport/Writer/AsyncWriter.php
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
use Doctrine\DBAL\Platforms\MySqlPlatform;
use Doctrine\DBAL\Types\Types;
use Oro\Bundle\AkeneoBundle\Async\Topics;
use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\AkeneoBundle\EventListener\AdditionalOptionalListenerManager;
use Oro\Bundle\BatchBundle\Entity\StepExecution;
use Oro\Bundle\BatchBundle\Item\ItemWriterInterface;
use Oro\Bundle\BatchBundle\Item\Support\ClosableInterface;
@@ -14,6 +14,7 @@
use Oro\Bundle\IntegrationBundle\Entity\FieldsChanges;
use Oro\Bundle\MessageQueueBundle\Client\BufferedMessageProducer;
use Oro\Bundle\MessageQueueBundle\Entity\Job;
use Oro\Bundle\PlatformBundle\Manager\OptionalListenerManager;
use Oro\Component\MessageQueue\Client\Message;
use Oro\Component\MessageQueue\Client\MessagePriority;
use Oro\Component\MessageQueue\Client\MessageProducerInterface;
@@ -23,10 +24,6 @@ class AsyncWriter implements
ClosableInterface,
StepExecutionAwareInterface
{
use CacheProviderTrait;

private const VARIANTS_BATCH_SIZE = 25;

/** @var MessageProducerInterface * */
private $messageProducer;

@@ -39,17 +36,30 @@ class AsyncWriter implements
/** @var DoctrineHelper */
private $doctrineHelper;

/** @var OptionalListenerManager */
private $optionalListenerManager;

/** @var AdditionalOptionalListenerManager */
private $additionalOptionalListenerManager;

public function __construct(
MessageProducerInterface $messageProducer,
DoctrineHelper $doctrineHelper
DoctrineHelper $doctrineHelper,
OptionalListenerManager $optionalListenerManager,
AdditionalOptionalListenerManager $additionalOptionalListenerManager
) {
$this->messageProducer = $messageProducer;
$this->doctrineHelper = $doctrineHelper;
$this->optionalListenerManager = $optionalListenerManager;
$this->additionalOptionalListenerManager = $additionalOptionalListenerManager;
}

public function initialize()
{
$this->size = 0;

$this->additionalOptionalListenerManager->disableListeners();
$this->optionalListenerManager->disableListeners($this->optionalListenerManager->getListeners());
}

public function write(array $items)
@@ -67,52 +77,30 @@ public function write(array $items)
$this->stepExecution->setWriteCount($this->size);

$jobId = $this->insertJob($jobName);
$this->createFieldsChanges($jobId, $items, 'items');
$this->sendMessage($channelId, $jobId, true);
}

public function flush()
{
$this->size = 0;

$variants = $this->cacheProvider->fetch('product_variants') ?? [];
if (!$variants) {
return;
}

$channelId = $this->stepExecution->getJobExecution()->getExecutionContext()->get('channel');

$chunks = array_chunk($variants, self::VARIANTS_BATCH_SIZE, true);

foreach ($chunks as $key => $chunk) {
$jobName = sprintf(
'oro_integration:sync_integration:%s:variants:%s-%s',
$channelId,
self::VARIANTS_BATCH_SIZE * $key + 1,
self::VARIANTS_BATCH_SIZE * $key + count($chunk)
);

$jobId = $this->insertJob($jobName);
$this->createFieldsChanges($jobId, $chunk, 'variants');
$this->sendMessage($channelId, $jobId);
if ($jobId && $this->createFieldsChanges($jobId, $items, 'items')) {
$this->sendMessage($channelId, $jobId, true);
}
}

private function createFieldsChanges(int $jobId, array &$data, string $key): void
private function createFieldsChanges(int $jobId, array &$data, string $key): bool
{
$em = $this->doctrineHelper->getEntityManager(FieldsChanges::class);
$fieldsChanges = $em
->getRepository(FieldsChanges::class)
->findOneBy(['entityId' => $jobId, 'entityClass' => Job::class]);
if (!$fieldsChanges) {
$fieldsChanges = new FieldsChanges([]);
$fieldsChanges->setEntityClass(Job::class);
$fieldsChanges->setEntityId($jobId);
if ($fieldsChanges) {
return false;
}

$fieldsChanges = new FieldsChanges([]);
$fieldsChanges->setEntityClass(Job::class);
$fieldsChanges->setEntityId($jobId);
$fieldsChanges->setChangedFields([$key => $data]);
$em->persist($fieldsChanges);
$em->flush($fieldsChanges);
$em->clear(FieldsChanges::class);

return true;
}

private function sendMessage(int $channelId, int $jobId, bool $incrementedRead = false): void
@@ -143,12 +131,15 @@ private function getRootJob(): ?int
throw new \InvalidArgumentException('Root job id is empty');
}

return $rootJobId;
return (int)$rootJobId;
}

public function close()
{
$this->size = 0;

$this->optionalListenerManager->enableListeners($this->optionalListenerManager->getListeners());
$this->additionalOptionalListenerManager->enableListeners();
}

public function setStepExecution(StepExecution $stepExecution)
@@ -159,12 +150,34 @@ public function setStepExecution(StepExecution $stepExecution)
private function insertJob(string $jobName): ?int
{
$em = $this->doctrineHelper->getEntityManager(Job::class);
$tableName = $em->getClassMetadata(Job::class)->getTableName();
$connection = $em->getConnection();
$rootJobId = $this->getRootJob();

$hasRootJob = $connection
->executeStatement(
'SELECT 1 FROM oro_message_queue_job WHERE id = :id LIMIT 1;',
['id' => $rootJobId],
['id' => Types::INTEGER]
);

if (!$hasRootJob) {
throw new \InvalidArgumentException(sprintf('Root job "%d" missing', $rootJobId));
}

$childJob = $connection
->executeStatement(
'SELECT id FROM oro_message_queue_job WHERE root_job_id = :rootJob and name = :name LIMIT 1;',
['rootJob' => $rootJobId, 'name' => $jobName],
['rootJob' => Types::INTEGER, 'name' => Types::STRING]
);

if ($childJob) {
return $childJob;
}

$qb = $connection->createQueryBuilder();
$qb
->insert($tableName)
->insert('oro_message_queue_job')
->values([
'name' => ':name',
'status' => ':status',
@@ -178,7 +191,7 @@ private function insertJob(string $jobName): ?int
'interrupted' => false,
'unique' => false,
'createdAt' => new \DateTime(),
'rootJob' => $this->getRootJob(),
'rootJob' => $rootJobId,
], [
'name' => Types::STRING,
'status' => Types::STRING,
240 changes: 240 additions & 0 deletions ImportExport/Writer/ConfigurableAsyncWriter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
<?php

namespace Oro\Bundle\AkeneoBundle\ImportExport\Writer;

use Doctrine\DBAL\Platforms\MySqlPlatform;
use Doctrine\DBAL\Types\Types;
use Oro\Bundle\AkeneoBundle\Async\Topics;
use Oro\Bundle\AkeneoBundle\EventListener\AdditionalOptionalListenerManager;
use Oro\Bundle\BatchBundle\Entity\StepExecution;
use Oro\Bundle\BatchBundle\Item\ItemWriterInterface;
use Oro\Bundle\BatchBundle\Step\StepExecutionAwareInterface;
use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
use Oro\Bundle\IntegrationBundle\Entity\FieldsChanges;
use Oro\Bundle\MessageQueueBundle\Client\BufferedMessageProducer;
use Oro\Bundle\MessageQueueBundle\Entity\Job;
use Oro\Bundle\PlatformBundle\Manager\OptionalListenerManager;
use Oro\Component\MessageQueue\Client\Message;
use Oro\Component\MessageQueue\Client\MessagePriority;
use Oro\Component\MessageQueue\Client\MessageProducerInterface;

class ConfigurableAsyncWriter implements
ItemWriterInterface,
StepExecutionAwareInterface
{
private const VARIANTS_BATCH_SIZE = 25;

/** @var MessageProducerInterface * */
private $messageProducer;

/** @var StepExecution */
private $stepExecution;

/** @var DoctrineHelper */
private $doctrineHelper;

/** @var OptionalListenerManager */
private $optionalListenerManager;

/** @var AdditionalOptionalListenerManager */
private $additionalOptionalListenerManager;

private $variants = [];

public function __construct(
MessageProducerInterface $messageProducer,
DoctrineHelper $doctrineHelper,
OptionalListenerManager $optionalListenerManager,
AdditionalOptionalListenerManager $additionalOptionalListenerManager
) {
$this->messageProducer = $messageProducer;
$this->doctrineHelper = $doctrineHelper;
$this->optionalListenerManager = $optionalListenerManager;
$this->additionalOptionalListenerManager = $additionalOptionalListenerManager;
}

public function initialize()
{
$this->variants = [];

$this->additionalOptionalListenerManager->disableListeners();
$this->optionalListenerManager->disableListeners($this->optionalListenerManager->getListeners());
}

public function write(array $items)
{
foreach ($items as $item) {
$sku = $item['sku'];

if (!empty($item['family_variant'])) {
if (isset($item['parent'], $this->variants[$sku])) {
$parent = $item['parent'];
foreach (array_keys($this->variants[$sku]) as $sku) {
$this->variants[$parent][$sku] = ['parent' => $parent, 'variant' => $sku];
}
}

return;
}

if (empty($item['parent'])) {
return;
}

$parent = $item['parent'];

$this->variants[$parent][$sku] = ['parent' => $parent, 'variant' => $sku];
}
}

public function close()
{
$this->variants = [];

$this->optionalListenerManager->enableListeners($this->optionalListenerManager->getListeners());
$this->additionalOptionalListenerManager->enableListeners();
}

public function flush()
{
$channelId = $this->stepExecution->getJobExecution()->getExecutionContext()->get('channel');

$chunks = array_chunk($this->variants, self::VARIANTS_BATCH_SIZE, true);

foreach ($chunks as $key => $chunk) {
$jobName = sprintf(
'oro_integration:sync_integration:%s:variants:%s-%s',
$channelId,
self::VARIANTS_BATCH_SIZE * $key + 1,
self::VARIANTS_BATCH_SIZE * $key + count($chunk)
);

$jobId = $this->insertJob($jobName);
if ($jobId && $this->createFieldsChanges($jobId, $chunk, 'variants')) {
$this->sendMessage($channelId, $jobId);
}
}
}

private function createFieldsChanges(int $jobId, array &$data, string $key): bool
{
$em = $this->doctrineHelper->getEntityManager(FieldsChanges::class);
$fieldsChanges = $em
->getRepository(FieldsChanges::class)
->findOneBy(['entityId' => $jobId, 'entityClass' => Job::class]);
if ($fieldsChanges) {
return false;
}

$fieldsChanges = new FieldsChanges([]);
$fieldsChanges->setEntityClass(Job::class);
$fieldsChanges->setEntityId($jobId);
$fieldsChanges->setChangedFields([$key => $data]);
$em->persist($fieldsChanges);
$em->flush($fieldsChanges);
$em->clear(FieldsChanges::class);

return true;
}

private function sendMessage(int $channelId, int $jobId, bool $incrementedRead = false): void
{
$this->messageProducer->send(
Topics::IMPORT_PRODUCTS,
new Message(
[
'integrationId' => $channelId,
'jobId' => $jobId,
'connector' => 'configurable_product',
'connector_parameters' => ['incremented_read' => $incrementedRead],
],
MessagePriority::HIGH
)
);

if ($this->messageProducer instanceof BufferedMessageProducer
&& $this->messageProducer->isBufferingEnabled()) {
$this->messageProducer->flushBuffer();
}
}

private function getRootJob(): ?int
{
$rootJobId = $this->stepExecution->getJobExecution()->getExecutionContext()->get('rootJobId') ?? null;
if (!$rootJobId) {
throw new \InvalidArgumentException('Root job id is empty');
}

return (int)$rootJobId;
}

public function setStepExecution(StepExecution $stepExecution)
{
$this->stepExecution = $stepExecution;
}

private function insertJob(string $jobName): ?int
{
$em = $this->doctrineHelper->getEntityManager(Job::class);
$connection = $em->getConnection();
$rootJobId = $this->getRootJob();

$hasRootJob = $connection
->executeStatement(
'SELECT 1 FROM oro_message_queue_job WHERE id = :id LIMIT 1;',
['id' => $rootJobId],
['id' => Types::INTEGER]
);

if (!$hasRootJob) {
throw new \InvalidArgumentException(sprintf('Root job "%d" missing', $rootJobId));
}

$childJob = $connection
->executeStatement(
'SELECT id FROM oro_message_queue_job WHERE root_job_id = :rootJob and name = :name LIMIT 1;',
['rootJob' => $rootJobId, 'name' => $jobName],
['rootJob' => Types::INTEGER, 'name' => Types::STRING]
);

if ($childJob) {
return $childJob;
}

$qb = $connection->createQueryBuilder();
$qb
->insert('oro_message_queue_job')
->values([
'name' => ':name',
'status' => ':status',
'interrupted' => ':interrupted',
'created_at' => ':createdAt',
'root_job_id' => ':rootJob',
])
->setParameters([
'name' => $jobName,
'status' => Job::STATUS_NEW,
'interrupted' => false,
'unique' => false,
'createdAt' => new \DateTime(),
'rootJob' => $rootJobId,
], [
'name' => Types::STRING,
'status' => Types::STRING,
'interrupted' => Types::BOOLEAN,
'unique' => Types::BOOLEAN,
'createdAt' => Types::DATETIME_MUTABLE,
'rootJob' => Types::INTEGER,
]);

if ($connection->getDatabasePlatform() instanceof MySqlPlatform) {
$qb->setValue('`unique`', ':unique');
} else {
$qb->setValue('"unique"', ':unique');
}

$qb->execute();

return $connection->lastInsertId();
}
}
74 changes: 57 additions & 17 deletions Integration/AkeneoTransport.php
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
use Oro\Bundle\AkeneoBundle\Integration\Iterator\AttributeFamilyIterator;
use Oro\Bundle\AkeneoBundle\Integration\Iterator\AttributeIterator;
use Oro\Bundle\AkeneoBundle\Integration\Iterator\BrandIterator;
use Oro\Bundle\AkeneoBundle\Integration\Iterator\ConfigurableProductIterator;
use Oro\Bundle\AkeneoBundle\Integration\Iterator\ProductIterator;
use Oro\Bundle\AkeneoBundle\Settings\DataProvider\SyncProductsDataProvider;
use Oro\Bundle\CurrencyBundle\Provider\CurrencyProviderInterface;
@@ -209,14 +210,14 @@ public function getProducts(int $pageSize)
$this->initAttributesList();
$this->initMeasureFamilies();

$searchFilters = $this->akeneoSearchBuilder->getFilters($this->transportEntity->getProductFilter());
$queryParams = [
'scope' => $this->transportEntity->getAkeneoActiveChannel(),
'search' => $this->akeneoSearchBuilder->getFilters($this->transportEntity->getProductFilter()),
];

if ($this->transportEntity->getSyncProducts() === SyncProductsDataProvider::PUBLISHED) {
return new ProductIterator(
$this->client->getPublishedProductApi()->all(
$pageSize,
['search' => $searchFilters, 'scope' => $this->transportEntity->getAkeneoActiveChannel()]
),
$this->client->getPublishedProductApi()->all($pageSize, $queryParams),
$this->client,
$this->logger,
$this->attributes,
@@ -227,10 +228,7 @@ public function getProducts(int $pageSize)
}

return new ProductIterator(
$this->client->getProductApi()->all(
$pageSize,
['search' => $searchFilters, 'scope' => $this->transportEntity->getAkeneoActiveChannel()]
),
$this->client->getProductApi()->all($pageSize, $queryParams),
$this->client,
$this->logger,
$this->attributes,
@@ -240,6 +238,33 @@ public function getProducts(int $pageSize)
);
}

public function getProductsList(int $pageSize): iterable
{
$this->initAttributesList();

$queryParams = [
'scope' => $this->transportEntity->getAkeneoActiveChannel(),
'search' => $this->akeneoSearchBuilder->getFilters($this->transportEntity->getProductFilter()),
'attributes' => key($this->attributes),
];

if ($this->transportEntity->getSyncProducts() === SyncProductsDataProvider::PUBLISHED) {
return new ConfigurableProductIterator(
$this->client->getPublishedProductApi()->all($pageSize, $queryParams),
$this->client,
$this->logger,
$this->getAttributeMapping()
);
}

return new ConfigurableProductIterator(
$this->client->getProductApi()->all($pageSize, $queryParams),
$this->client,
$this->logger,
$this->getAttributeMapping()
);
}

/**
* @return \Iterator
*/
@@ -249,16 +274,13 @@ public function getProductModels(int $pageSize)
$this->initFamilyVariants();
$this->initMeasureFamilies();

$searchFilters = $this->akeneoSearchBuilder->getFilters($this->transportEntity->getProductFilter());
if (isset($searchFilters['completeness'])) {
unset($searchFilters['completeness']);
}
$queryParams = [
'scope' => $this->transportEntity->getAkeneoActiveChannel(),
'search' => $this->akeneoSearchBuilder->getFilters($this->transportEntity->getConfigurableProductFilter()),
];

return new ProductIterator(
$this->client->getProductModelApi()->all(
$pageSize,
['search' => $searchFilters, 'scope' => $this->transportEntity->getAkeneoActiveChannel()]
),
$this->client->getProductModelApi()->all($pageSize, $queryParams),
$this->client,
$this->logger,
$this->attributes,
@@ -268,6 +290,24 @@ public function getProductModels(int $pageSize)
);
}

public function getProductModelsList(int $pageSize): iterable
{
$this->initAttributesList();

$queryParams = [
'scope' => $this->transportEntity->getAkeneoActiveChannel(),
'search' => $this->akeneoSearchBuilder->getFilters($this->transportEntity->getConfigurableProductFilter()),
'attributes' => key($this->attributes),
];

return new ConfigurableProductIterator(
$this->client->getProductModelApi()->all($pageSize, $queryParams),
$this->client,
$this->logger,
$this->getAttributeMapping()
);
}

/**
* {@inheritdoc}
*/
4 changes: 4 additions & 0 deletions Integration/AkeneoTransportInterface.php
Original file line number Diff line number Diff line change
@@ -51,6 +51,10 @@ public function getProducts(int $pageSize);
*/
public function getProductModels(int $pageSize);

public function getProductsList(int $pageSize): iterable;

public function getProductModelsList(int $pageSize): iterable;

/**
* @return \Iterator
*/
72 changes: 72 additions & 0 deletions Integration/Connector/ConfigurableProductConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

namespace Oro\Bundle\AkeneoBundle\Integration\Connector;

use Oro\Bundle\AkeneoBundle\Integration\AkeneoTransport;
use Oro\Bundle\AkeneoBundle\Placeholder\SchemaUpdateFilter;
use Oro\Bundle\AkeneoBundle\Tools\CacheProviderTrait;
use Oro\Bundle\IntegrationBundle\Entity\Channel;
use Oro\Bundle\IntegrationBundle\Provider\AbstractConnector;
use Oro\Bundle\IntegrationBundle\Provider\AllowedConnectorInterface;
use Oro\Bundle\IntegrationBundle\Provider\ConnectorInterface;
use Oro\Bundle\ProductBundle\Entity\Product;

/**
* Integration configurable product connector.
*/
class ConfigurableProductConnector extends AbstractConnector implements ConnectorInterface, AllowedConnectorInterface
{
use CacheProviderTrait;

const PAGE_SIZE = 100;

/** @var AkeneoTransport */
protected $transport;

/** @var SchemaUpdateFilter */
protected $schemaUpdateFilter;

public function getLabel(): string
{
return 'oro.akeneo.connector.configurable_product.label';
}

public function getImportEntityFQCN()
{
return Product::class;
}

public function getImportJobName()
{
return 'akeneo_configurable_product_import';
}

public function getType()
{
return 'configurable_product';
}

public function setSchemaUpdateFilter(SchemaUpdateFilter $schemaUpdateFilter): void
{
$this->schemaUpdateFilter = $schemaUpdateFilter;
}

public function isAllowed(Channel $integration, array $processedConnectorsStatuses): bool
{
return $this->schemaUpdateFilter->isApplicable($integration, Product::class) === false;
}

protected function getConnectorSource()
{
$variants = $this->cacheProvider->fetch('akeneo')['variants'] ?? [];
if ($variants) {
return new \ArrayIterator();
}

$iterator = new \AppendIterator();
$iterator->append($this->transport->getProductsList(self::PAGE_SIZE));
$iterator->append($this->transport->getProductModelsList(self::PAGE_SIZE));

return $iterator;
}
}
5 changes: 0 additions & 5 deletions Integration/Connector/ProductConnector.php
Original file line number Diff line number Diff line change
@@ -81,11 +81,6 @@ protected function getConnectorSource()
return new \ArrayIterator();
}

$variants = $this->cacheProvider->fetch('akeneo')['variants'] ?? [];
if ($variants) {
return new \ArrayIterator();
}

$iterator = new \AppendIterator();
$iterator->append($this->transport->getProducts(self::PAGE_SIZE));
$iterator->append($this->transport->getProductModels(self::PAGE_SIZE));
38 changes: 38 additions & 0 deletions Integration/Iterator/ConfigurableProductIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Oro\Bundle\AkeneoBundle\Integration\Iterator;

use Akeneo\Pim\ApiClient\AkeneoPimClientInterface;
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
use Psr\Log\LoggerInterface;

class ConfigurableProductIterator extends AbstractIterator
{
private $attributeMapping = [];

public function __construct(
ResourceCursorInterface $resourceCursor,
AkeneoPimClientInterface $client,
LoggerInterface $logger,
array $attributeMapping = []
) {
parent::__construct($resourceCursor, $client, $logger);

$this->attributeMapping = $attributeMapping;
}

public function doCurrent()
{
$item = $this->resourceCursor->current();

$sku = $item['identifier'] ?? $item['code'];

if (array_key_exists('sku', $this->attributeMapping)) {
if (!empty($item['values'][$this->attributeMapping['sku']][0]['data'])) {
$sku = $item['values'][$this->attributeMapping['sku']][0]['data'];
}
}

return ['sku' => (string)$sku, 'parent' => $item['parent'] ?? null, 'family_variant' => $item['family_variant'] ?? null];
}
}
4 changes: 2 additions & 2 deletions Job/Context/SimpleContextAggregator.php
Original file line number Diff line number Diff line change
@@ -25,9 +25,9 @@ public function getAggregatedContext(JobExecution $jobExecution)
$context,
$this->contextRegistry->getByStepExecution($stepExecution)
);
//CUSTOMIZATION START
// CUSTOMIZATION START
$context->addErrors($stepExecution->getErrors());
//CUSTOMIZATION END
// CUSTOMIZATION END
}
}

3 changes: 2 additions & 1 deletion Migrations/Schema/OroAkeneoBundleInstaller.php
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ class OroAkeneoBundleInstaller implements Installation, ExtendExtensionAwareInte
*/
public function getMigrationVersion()
{
return 'v1_15';
return 'v1_16';
}

/**
@@ -113,6 +113,7 @@ protected function updateIntegrationTransportTable(Schema $schema)
$table->addColumn('akeneo_active_channel', 'string', ['notnull' => false, 'length' => 255]);
$table->addColumn('akeneo_acl_voter_enabled', 'boolean', ['notnull' => false]);
$table->addColumn('akeneo_product_filter', 'text', ['notnull' => false]);
$table->addColumn('akeneo_conf_product_filter', 'text', ['notnull' => false]);
$table->addColumn('akeneo_attributes_list', 'text', ['notnull' => false]);
$table->addColumn('rootcategory_id', 'integer', ['notnull' => false]);
$table->addColumn('pricelist_id', 'integer', ['notnull' => false]);
19 changes: 19 additions & 0 deletions Migrations/Schema/v1_16/OroAkeneoMigration.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

namespace Oro\Bundle\AkeneoBundle\Migrations\Schema\v1_16;

use Doctrine\DBAL\Schema\Schema;
use Oro\Bundle\MigrationBundle\Migration\Migration;
use Oro\Bundle\MigrationBundle\Migration\QueryBag;

class OroAkeneoMigration implements Migration
{
public function up(Schema $schema, QueryBag $queries)
{
$table = $schema->getTable('oro_integration_transport');
$table->addColumn('akeneo_conf_product_filter', 'text', ['notnull' => false]);

$queries->addPostQuery('UPDATE oro_integration_transport SET akeneo_conf_product_filter = akeneo_product_filter ' .
'WHERE akeneo_product_filter IS NOT NULL;');
}
}
16 changes: 15 additions & 1 deletion Resources/config/batch_jobs.yml
Original file line number Diff line number Diff line change
@@ -92,9 +92,23 @@ connector:
reader: oro_akeneo.importexport.reader.price
processor: oro_akeneo.importexport.processor.import.product_price
writer: oro_pricing.importexport.writer.product_price
import_variants:

akeneo_configurable_product_import:
title: "Configurable Product import from Akeneo"
type: import
steps:
api:
title: import
class: Oro\Bundle\BatchBundle\Step\ItemStep
services:
reader: oro_akeneo.integration.connector.configurable_product
processor: oro_akeneo.importexport.processor.async
writer: oro_akeneo.importexport.writer.configurable_async_product
parameters:
batch_size: 25
import_variants:
title: import
class: Oro\Bundle\AkeneoBundle\ImportExport\Step\ItemStep
services:
reader: oro_akeneo.importexport.reader.product_variant
processor: oro_akeneo.importexport.processor.import.product_variant
32 changes: 25 additions & 7 deletions Resources/config/importexport.yml
Original file line number Diff line number Diff line change
@@ -106,8 +106,6 @@ services:
oro_akeneo.importexport.processor.async:
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Processor\AsyncProcessor'
public: true
calls:
- [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]

oro_akeneo.importexport.processor.category_parent:
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Processor\CategoryParentProcessor'
@@ -202,6 +200,15 @@ services:
calls:
- [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]

oro_akeneo.integration.connector.brand:
class: 'Oro\Bundle\AkeneoBundle\Integration\Connector\BrandConnector'
arguments:
- '@oro_importexport.context_registry'
- '@oro_integration.logger.strategy'
- '@oro_integration.provider.connector_context_mediator'
tags:
- { name: oro_integration.connector, type: brand, channel_type: oro_akeneo }

oro_akeneo.integration.connector.product:
class: 'Oro\Bundle\AkeneoBundle\Integration\Connector\ProductConnector'
arguments:
@@ -214,14 +221,17 @@ services:
tags:
- { name: oro_integration.connector, type: product, channel_type: oro_akeneo }

oro_akeneo.integration.connector.brand:
class: 'Oro\Bundle\AkeneoBundle\Integration\Connector\BrandConnector'
oro_akeneo.integration.connector.configurable_product:
class: 'Oro\Bundle\AkeneoBundle\Integration\Connector\ConfigurableProductConnector'
arguments:
- '@oro_importexport.context_registry'
- '@oro_integration.logger.strategy'
- '@oro_integration.provider.connector_context_mediator'
calls:
- [ setSchemaUpdateFilter, [ '@oro_akeneo.placeholder.schema_update_filter' ] ]
- [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
tags:
- { name: oro_integration.connector, type: brand, channel_type: oro_akeneo }
- { name: oro_integration.connector, type: configurable_product, channel_type: oro_akeneo }

oro_akeneo.importexport.data_converter.product:
class: 'Oro\Bundle\AkeneoBundle\ImportExport\DataConverter\ProductDataConverter'
@@ -315,8 +325,16 @@ services:
arguments:
- '@oro_message_queue.message_producer'
- '@oro_entity.doctrine_helper'
calls:
- [ setCacheProvider, [ '@oro_akeneo.importexport.cache' ] ]
- '@oro_platform.optional_listeners.manager'
- '@oro_akeneo.event_listener.additional_optional_listeners_manager'

oro_akeneo.importexport.writer.configurable_async_product:
class: 'Oro\Bundle\AkeneoBundle\ImportExport\Writer\ConfigurableAsyncWriter'
arguments:
- '@oro_message_queue.message_producer'
- '@oro_entity.doctrine_helper'
- '@oro_platform.optional_listeners.manager'
- '@oro_akeneo.event_listener.additional_optional_listeners_manager'

oro_akeneo.importexport.cache:
parent: oro_cache.array_cache
5 changes: 5 additions & 0 deletions Resources/translations/messages.en.yml
Original file line number Diff line number Diff line change
@@ -38,6 +38,9 @@ oro:
akeneo_product_filter:
label: 'Product Filter'
tooltip: 'This field enables you to apply filters to sync only the products you want. As this filter is passed via API request, it must be filled in JSON format. Details on the format and filter options available for the products can be found in the <a target="_blank" href="https://api.akeneo.com/documentation/filter.html#filters">Filters section of the Akeneo PIM documentation</a>'
akeneo_configurable_product_filter:
label: 'Configurable Product Filter'
tooltip: 'This field enables you to apply filters to sync only the configurable products you want. As this filter is passed via API request, it must be filled in JSON format. Details on the format and filter options available for the products can be found in the <a target="_blank" href="https://api.akeneo.com/documentation/filter.html#filters">Filters section of the Akeneo PIM documentation</a>'
akeneo_attribute_list:
label: 'Attribute Filter'
tooltip: 'This field enables you to apply filters to sync only the attributes you want. Values must be attribute code, separated with a semi-colon. IMPORTANT: if not defined before to save the integration, all attributes will be imported.'
@@ -84,6 +87,8 @@ oro:
label: Category connector
product:
label: Product connector
configurable_product:
label: Configurable product connector
attribute_family:
label: Attribute family connector
attribute:
11 changes: 11 additions & 0 deletions Resources/views/Form/fields.html.twig
Original file line number Diff line number Diff line change
@@ -184,6 +184,17 @@
</div>
</div>

<div class="control-group control-group-choice">
<div class="control-label wrap">
{{ UI.tooltip('oro.akeneo.integration.settings.akeneo_configurable_product_filter.tooltip'|trans, {}, 'right') }}
{{ form_label(form.configurableProductFilter) }}
</div>
<div class="controls {% if (form_errors(form.configurableProductFilter)|length > 0) %} validation-error {% endif %}">
{{ form_widget(form.configurableProductFilter) }}
{{ form_errors(form.configurableProductFilter) }}
</div>
</div>

<div class="control-group control-group-choice">
<div class="control-label wrap">
{{ UI.tooltip('oro.akeneo.integration.settings.akeneo_attribute_list.tooltip'|trans, {}, 'right') }}
17 changes: 7 additions & 10 deletions Validator/UniqueProductVariantLinksValidator.php
Original file line number Diff line number Diff line change
@@ -2,8 +2,8 @@

namespace Oro\Bundle\AkeneoBundle\Validator;

use Doctrine\ORM\PersistentCollection;
use Oro\Bundle\EntityBundle\ORM\DoctrineHelper;
use Oro\Bundle\ProductBundle\Entity\Product;
use Oro\Bundle\ProductBundle\Validator\Constraints\ConfigurableProductAccessorTrait;
use Symfony\Component\Validator\Constraint;
use Symfony\Component\Validator\ConstraintValidator;
@@ -40,17 +40,14 @@ public function validate($value, Constraint $constraint)
return;
}
if (count($product->getVariantFields()) === 0) {
return null;
return;
}

$uow = $this->doctrineHelper->getEntityManagerForClass(Product::class)->getUnitOfWork();
$collections = array_merge($uow->getScheduledCollectionUpdates(), $uow->getScheduledCollectionDeletions());
if (
!in_array($value->getVariantLinks(), $collections)
&& !in_array($value->getParentVariantLinks(), $collections)
&& empty($uow->getEntityChangeSet($value)['variantFields'])
) {
return;
$variantLinks = $value->getVariantLinks();
if ($variantLinks instanceof PersistentCollection) {
if ($variantLinks->isInitialized() && !$variantLinks->isDirty()) {
return;
}
}

$this->validator->validate($value, $constraint);