Create an additional import process

Sometimes we need imports in addition to the ones provided out of the box. In this example we create our own import pipeline, which watches the import directory for files and executes the import without writing any PHP code, reusing existing executors instead.

You’ll also find all of this code in this example module.

Create a custom module

First of all we need to introduce a custom module. Please refer to Create a New Module in Magento’s developer documentation.

In this example we name the module MyModule_ProductStatusUpdate. The main purpose of our module is to observe the import directory for a file with a specific name pattern. If such a file is present, we start an import with the M2IF library.

mkdir -p app/code/MyModule/ProductStatusUpdate/etc
touch app/code/MyModule/ProductStatusUpdate/etc/module.xml
touch app/code/MyModule/ProductStatusUpdate/registration.php
Content of app/code/MyModule/ProductStatusUpdate/registration.php
<?php
\Magento\Framework\Component\ComponentRegistrar::register(
    \Magento\Framework\Component\ComponentRegistrar::MODULE,
    'MyModule_ProductStatusUpdate',
    __DIR__
);
Content of app/code/MyModule/ProductStatusUpdate/etc/module.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
    <module name="MyModule_ProductStatusUpdate" setup_version="1.0.0">
        <sequence>
            <module name="TechDivision_PacemakerImportBase"/>
        </sequence>
    </module>
</config>

Since we will use components from the TechDivision_PacemakerImportBase module, we need to add it to the loading sequence of our new module.

Define the import pipeline

We create a pipeline.xml file within the etc folder of our module and add the following content:

app/code/MyModule/ProductStatusUpdate/etc/pipeline.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:TechDivision_ProcessPipelines:etc/pipeline.xsd">
    <pipeline name="product_status_update_import" description="Example Pipeline for a custom product update import" use-working-directory="true">
        <conditions>
            <pipeline_condition type="TechDivision\ProcessPipelines\Helper\Condition\Pipeline\NoAutoSpawn" description="No automatic start for this pipeline"/>
        </conditions>
        <step name="move_files" executorType="TechDivision\PacemakerImportBase\Model\Executor\MoveFilesToWorkingDirectory" sortOrder="10" description="Move files to working directory.">
            <conditions>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1" description="Try once."/>
            </conditions>
        </step>
        <step name="product_status_update" executorType="TechDivision\PacemakerImportBase\Model\Executor\ImportExecutor" sortOrder="20" description="Import product data">
            <conditions>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1" description="Try once."/>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted" description="Previous step needs to be finished."/>
                <step_condition type="MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess" description="Avoid conflicts between import steps."/>
            </conditions>
            <arguments>
                <argument key="command" value="import:products" />
                <argument key="operation" value="add-update" />
                <argument key="configuration" value="MyModule_ProductStatusUpdate::etc/m2if-config.json" />
            </arguments>
        </step>
    </pipeline>
</config>

In this sample we create a pipeline with two steps. The first step moves the relevant files to the working directory. The second step executes the import. Let’s go through the code step by step. Also check out the XML config documentation for deeper insights.
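
To double-check the step order before deploying, the definition can be read with any XML parser. The following Python sketch is not part of Pacemaker. It embeds a shortened copy of the pipeline above (executor types, conditions and arguments omitted) and lists the steps by sortOrder; the function name steps_in_order is made up for this illustration.

```python
import xml.etree.ElementTree as ET

# Shortened copy of etc/pipeline.xml (executor types, conditions, arguments omitted).
PIPELINE_XML = """<?xml version="1.0"?>
<config>
    <pipeline name="product_status_update_import" use-working-directory="true">
        <step name="move_files" sortOrder="10"/>
        <step name="product_status_update" sortOrder="20"/>
    </pipeline>
</config>
"""


def steps_in_order(xml_text):
    """Return (sortOrder, name) tuples of all steps, lowest sortOrder first."""
    root = ET.fromstring(xml_text)
    steps = [(int(step.get("sortOrder")), step.get("name")) for step in root.iter("step")]
    return sorted(steps)


for sort_order, name in steps_in_order(PIPELINE_XML):
    print(sort_order, name)
```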

Pipeline definition

The first node of our XML file defines a new pipeline.

<pipeline
    name="product_status_update_import"
    description="Example Pipeline for a custom product update import"
    use-working-directory="true" />

With the attribute name we give our pipeline a unique name. If a pipeline with the same name already exists within our Magento instance, both definitions will be merged. Please refer to Transform foreign import source and Add steps to an existing pipeline to learn how to use this behaviour to extend existing pipelines.

The attribute description contains a human-readable description of the purpose of our pipeline. This content will be used in the admin UI.

Since our new pipeline works with files, we need a working directory for every instance of our pipeline. Therefore we add the attribute use-working-directory and set it to true.

Pipeline conditions

With pipeline_condition nodes we define when our pipeline should be initialized.

Please refer to How to create a pipeline condition for details.

<pipeline>
    <conditions>
        <pipeline_condition
            type="TechDivision\ProcessPipelines\Helper\Condition\Pipeline\NoAutoSpawn"
            description="No automatic start for this pipeline"/>
    </conditions>
</pipeline>

For our product updates we do not want periodic instantiation, since we want to run the import as soon as the required import file is present in the file system. Therefore we will use the pipeline initializer feature. To avoid an instantiation via the heartbeat we need to add the condition TechDivision\ProcessPipelines\Helper\Condition\Pipeline\NoAutoSpawn.

Move files step

Our first step will simply move the relevant import files to the working directory of our pipeline.

<step
    name="move_files"
    executorType="TechDivision\PacemakerImportBase\Model\Executor\MoveFilesToWorkingDirectory"
    sortOrder="10"
    description="Move files to working directory.">

    <conditions>
        <step_condition
            type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1"
            description="Try once."/>
    </conditions>
</step>

We give our step a name (here move_files). Step names are relevant if we need to specify parallel and sequential execution. As executor we use the already existing class TechDivision\PacemakerImportBase\Model\Executor\MoveFilesToWorkingDirectory. Please refer to How to create an executor.

The sortOrder of our step is important, since we want to execute the steps sequentially (not in parallel).

Since this step is the first in our pipeline, it does not require any sequence-related conditions. By adding the condition TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1 we ensure that the step is attempted only once, so our pipeline will be canceled if errors occur while moving the files to the working directory.

Import data step

The second step will execute the import. We need to ensure this step runs after the first one. We also need to check whether other imports are already running and, if so, wait until they are finished to avoid deadlocks in the database.

<step
    name="product_status_update"
    executorType="TechDivision\PacemakerImportBase\Model\Executor\ImportExecutor"
    sortOrder="20"
    description="Import product data">

    <conditions>
        <step_condition
            type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1"
            description="Try once."/>
        <step_condition
            type="TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted"
            description="Previous step needs to be finished."/>
        <step_condition
            type="MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess"
            description="Avoid conflicts between import steps."/>
    </conditions>

    <arguments>
        <argument key="command" value="import:products" />
        <argument key="operation" value="add-update" />
        <argument key="configuration" value="MyModule_ProductStatusUpdate::etc/m2if-config.json" />
    </arguments>
</step>

We name the step and define an executor. To run an M2IF-based import we can reuse the class TechDivision\PacemakerImportBase\Model\Executor\ImportExecutor.

As sortOrder we need to define a higher number than for the move_files step, since we want to use the condition TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted. This condition verifies whether all steps with a lower sortOrder have been executed successfully.

To avoid endless retries of this step in case of errors we add the condition TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1, as we did for the first step.

As the third condition for this step we add MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess. This condition does not exist yet; we will create it in the next part of this documentation.
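
The interplay of the three step conditions can be pictured with a small model. The Python sketch below is only an illustration of the rules described here, not Pacemaker’s implementation. The class Step, the status values and all function names are assumptions made up for this example.

```python
from dataclasses import dataclass


@dataclass
class Step:
    name: str
    sort_order: int
    status: str = "pending"  # assumed values: pending, running, success, error
    attempts: int = 0


def attempts_limit(step, limit=1):
    """Model of the Limit1 condition: allow at most `limit` attempts."""
    return step.attempts < limit


def previous_steps_completed(step, pipeline):
    """All steps with a lower sortOrder must have finished successfully."""
    return all(s.status == "success" for s in pipeline if s.sort_order < step.sort_order)


def no_conflicting_step_in_process(running_step_names, conflicting):
    """No step from the conflicting list may currently be running."""
    return not (set(running_step_names) & set(conflicting))


def is_ready(step, pipeline, running_step_names, conflicting):
    """A step may start only if every condition is fulfilled."""
    return (
        step.status == "pending"
        and attempts_limit(step)
        and previous_steps_completed(step, pipeline)
        and no_conflicting_step_in_process(running_step_names, conflicting)
    )


move_files = Step("move_files", 10)
status_update = Step("product_status_update", 20)
pipeline = [move_files, status_update]
conflicting = ["product_status_update", "product_import"]

# move_files has not finished yet, so the import step has to wait.
print(is_ready(status_update, pipeline, [], conflicting))  # False
move_files.status = "success"
# A catalog import is running somewhere else, so we still wait.
print(is_ready(status_update, pipeline, ["product_import"], conflicting))  # False
# Nothing conflicting is running any more.
print(is_ready(status_update, pipeline, [], conflicting))  # True
```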

The import executor expects a set of arguments, which define the command and operation to execute as well as the path to the configuration we want to pass to the import library.

Take a look at the M2IF documentation for a better understanding of command, operation and configuration.

Avoid execution of conflicting steps

In the previous part of this documentation we already referenced the step condition MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess. Now we need to create it. The name contains Virtual, which indicates that this is a virtual type: we do not write a PHP class for it, Magento’s dependency injection derives it from configuration. Therefore we create a di.xml in our module and add the following content to it.

app/code/MyModule/ProductStatusUpdate/etc/di.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <virtualType name="MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess" type="TechDivision\PacemakerImportBase\Model\Condition\Step\NoConflictingStepsInProcess">
        <arguments>
            <argument name="stepNames" xsi:type="array">
                <item name="product_status_update" xsi:type="string">product_status_update</item>
                <item name="product_import" xsi:type="string">product_import</item>
            </argument>
        </arguments>
    </virtualType>
</config>

Refer to Magento DevDocs for details about virtual types.

This approach allows us to extend the list of conflicting processes. Our condition now checks for active import steps from our pipeline as well as for the import step from the pacemaker_import_catalog pipeline.

We should also extend the list of conflicting steps for the pacemaker_import_catalog pipeline by adding the following code to the di.xml of our module.

app/code/MyModule/ProductStatusUpdate/etc/di.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    ...
    <virtualType name="TechDivision\PacemakerImportCatalog\Virtual\Condition\NoConflictingStepInProcess">
        <arguments>
            <argument name="stepNames" xsi:type="array">
                <item name="product_status_update" xsi:type="string">product_status_update</item>
            </argument>
        </arguments>
    </virtualType>
</config>
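
The second declaration has no type attribute and lists only one item, because Magento merges di.xml declarations across modules and combines the items of an array argument by their name. The following Python sketch models that merge in a simplified way. The content of catalog_step_names is an assumption about what the pacemaker_import_catalog configuration ships, and the function name is made up for this example.

```python
def merge_array_argument(existing, addition):
    """Merge two array arguments keyed by item name; later declarations win on equal names."""
    merged = dict(existing)
    merged.update(addition)
    return merged


# Assumed items already configured for the catalog import condition (hypothetical content).
catalog_step_names = {"product_import": "product_import"}
# Item contributed by the di.xml of our module.
our_addition = {"product_status_update": "product_status_update"}

merged = merge_array_argument(catalog_step_names, our_addition)
print(sorted(merged.values()))
```

After the merge, the catalog import step waits for our product_status_update step as well, so both pipelines block each other symmetrically.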

M2IF configuration file

Now we need to add the M2IF configuration file at the path we pass to our import executor (MyModule_ProductStatusUpdate::etc/m2if-config.json). The content of this configuration file looks as follows:

app/code/MyModule/ProductStatusUpdate/etc/m2if-config.json
{
    "magento-edition": "CE",
    "magento-version": "2.3.4",
    "operation-name" : "add-update",
    "archive-artefacts" : false,
    "debug-mode" : false,
    "entity-type-code" : "catalog_product",
    "listeners" : [
        {
            "app.set.up" : [
                "import.listener.render.ansi.art",
                "import.listener.initialize.registry"
            ]
        },
        {
            "app.tear.down" : [
                "import.listener.clear.registry"
            ]
        }
    ],
    "databases" : [],
    "loggers": [
        {
            "name": "system",
            "channel-name": "logger/system",
            "type": "Monolog\\Logger",
            "handlers": [
                {
                    "type": "Monolog\\Handler\\ErrorLogHandler",
                    "formatter": {
                        "type": "Monolog\\Formatter\\LineFormatter",
                        "params" : [
                            {
                                "format": "[%datetime%] %channel%.%level_name%: %message% %context% %extra%",
                                "date-format": "Y-m-d H:i:s",
                                "allow-inline-line-breaks": true,
                                "ignore-empty-context-and-extra": true
                            }
                        ]
                    }
                }
            ],
            "processors": [
                {
                    "type": "Monolog\\Processor\\MemoryPeakUsageProcessor"
                }
            ]
        }
    ],
    "operations" : [
        {
            "name" : "add-update",
            "plugins" : [
                {
                    "id": "import.plugin.cache.warmer"
                },
                {
                    "id": "import.plugin.global.data"
                },
                {
                    "id": "import.plugin.subject",
                    "subjects": [
                        {
                            "id": "import.subject.move.files",
                            "identifier": "move-files",
                            "file-resolver": {
                                "prefix": "product-status-update"
                            },
                            "ok-file-needed": true
                        },
                        {
                            "id": "import_product.subject.bunch",
                            "identifier": "files",
                            "file-resolver": {
                                "prefix": "product-status-update"
                            },
                            "params" : [
                                {
                                    "copy-images" : false,
                                    "clean-up-empty-columns" : [
                                        "base_image",
                                        "small_image",
                                        "swatch_image",
                                        "thumbnail_image",
                                        "special_price",
                                        "special_price_from_date",
                                        "special_price_to_date"
                                    ]
                                }
                            ],
                            "observers": [
                                {
                                    "import": [
                                        "import.observer.attribute.set",
                                        "import.observer.additional.attribute",
                                        "import_product.observer.product",
                                        "import_product.observer.product.attribute.update"
                                    ]
                                }
                            ]
                        }
                    ]
                },
                {
                    "id": "import.plugin.archive"
                }
            ]
        }
    ]
}

With this configuration we define an import which simply updates attributes of already existing products. Please refer to the M2IF documentation for details.
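
Both subjects must look for the same file prefix, and the operation named in operation-name must exist in the operations list. A quick consistency check could look like the following Python sketch. It works on a trimmed copy of the configuration above that keeps only the inspected keys; the function name subject_prefixes is made up for this example.

```python
import json

# Trimmed copy of etc/m2if-config.json: only the keys inspected below are kept.
CONFIG = json.loads("""
{
    "operation-name": "add-update",
    "operations": [
        {
            "name": "add-update",
            "plugins": [
                {"id": "import.plugin.global.data"},
                {
                    "id": "import.plugin.subject",
                    "subjects": [
                        {
                            "id": "import.subject.move.files",
                            "file-resolver": {"prefix": "product-status-update"},
                            "ok-file-needed": true
                        },
                        {
                            "id": "import_product.subject.bunch",
                            "file-resolver": {"prefix": "product-status-update"}
                        }
                    ]
                }
            ]
        }
    ]
}
""")


def subject_prefixes(config):
    """Collect the file-resolver prefix of every subject of the configured operation."""
    prefixes = {}
    for operation in config["operations"]:
        if operation["name"] != config["operation-name"]:
            continue
        for plugin in operation["plugins"]:
            # Plugins without subjects (e.g. global data) are skipped.
            for subject in plugin.get("subjects", []):
                prefixes[subject["id"]] = subject["file-resolver"]["prefix"]
    return prefixes


prefixes = subject_prefixes(CONFIG)
print(prefixes)
print("consistent:", set(prefixes.values()) == {"product-status-update"})
```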

Pipeline instantiation

In the last part of this documentation we teach the pipeline initializer to observe the import directory for our import files and to instantiate our pipeline once the files are present. All of this can be done in a completely declarative way.

System configuration

Let’s add some configuration options to Magento’s admin UI. We create a system.xml in the etc/adminhtml directory of our module and add the following content to this file.

app/code/MyModule/ProductStatusUpdate/etc/adminhtml/system.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Config:etc/system_file.xsd">
    <system>
        <section id="techdivision_pacemaker_import">
            <group id="product_status_update" showInDefault="1" showInStore="0" showInWebsite="0" sortOrder="1000" translate="label">
                <label>Product Status Update</label>
                <field id="enabled" translate="label" type="select" sortOrder="25" showInDefault="1" showInWebsite="0" showInStore="0" canRestore="0">
                    <label>Enable Pipeline</label>
                    <source_model>Magento\Config\Model\Config\Source\Yesno</source_model>
                </field>
                <field id="file_name_pattern" translate="label comment" type="text" sortOrder="30" showInDefault="1" showInWebsite="0" showInStore="0" canRestore="0">
                    <label>File Name Pattern</label>
                    <comment>Pattern for import file bunches</comment>
                </field>
            </group>
        </section>
    </system>
</config>

This will add a new configuration group with two fields to Stores > Configuration > Pacemaker > Import: one to enable or disable our pipeline and a second one that defines the file name pattern for our import files.

Now create a default configuration by adding the following file:

app/code/MyModule/ProductStatusUpdate/etc/config.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Store:etc/config.xsd">
    <default>
        <techdivision_pacemaker_import>
            <product_status_update>
                <enabled>1</enabled>
                <file_name_pattern><![CDATA[/product-status-update_(?P<identifier>[0-9a-z\-]*)([_0-9]*?).(csv|ok)/i]]></file_name_pattern>
            </product_status_update>
        </techdivision_pacemaker_import>
    </default>
</config>

This enables our pipeline by default and predefines a file name pattern.
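
To see which file names the default pattern accepts, we can try it outside of Magento. The following Python sketch uses the pattern from config.xml without the PHP delimiters and expresses the i modifier as re.IGNORECASE. The file names are invented for this test, and using an unanchored search is an assumption about how the pattern is applied.

```python
import re

# Default value from config.xml, without the "/" delimiters; "i" becomes re.IGNORECASE.
PATTERN = re.compile(
    r"product-status-update_(?P<identifier>[0-9a-z\-]*)([_0-9]*?).(csv|ok)",
    re.IGNORECASE,
)


def identifier_of(file_name):
    """Return the captured identifier, or None if the file name does not match."""
    match = PATTERN.search(file_name)
    return match.group("identifier") if match else None


# Hypothetical file names, chosen only for this illustration.
for file_name in [
    "product-status-update_20200301-1200_01.csv",
    "product-status-update_20200301-1200.ok",
    "product-import_20200301-1200_01.csv",
]:
    print(file_name, "->", identifier_of(file_name))
```

The CSV file and the .ok file share the same identifier, while the file with a different prefix is ignored. Note that the dot before the extension is not escaped in the default pattern, so it matches any single character there.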

Extend Pipeline Initializer

Add the following content to the di.xml of our module:

app/code/MyModule/ProductStatusUpdate/etc/di.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    ...
    <virtualType name="MyModule\ProductStatusUpdateImport\Virtual\ConfigProvider\GetFileNamePattern" type="TechDivision\PacemakerImportBase\Model\ConfigProvider\GetFileNamePattern">
        <arguments>
            <argument name="scopeConfigPath" xsi:type="string">techdivision_pacemaker_import/product_status_update/file_name_pattern</argument>
        </arguments>
    </virtualType>
    <virtualType name="MyModule\ProductStatusUpdateImport\Virtual\ImportBunchResolver" type="TechDivision\PacemakerImportBase\Model\ImportBunchResolver">
        <arguments>
            <argument name="getFileNamePattern" xsi:type="object">MyModule\ProductStatusUpdateImport\Virtual\ConfigProvider\GetFileNamePattern</argument>
        </arguments>
    </virtualType>
    <type name="TechDivision\PacemakerImportBase\Model\ImportFilesDataFetcher">
        <arguments>
            <argument name="resolverConfig" xsi:type="array">
                <item name="my_module.product_status_update_import" xsi:type="array">
                    <item name="resolver" xsi:type="object">MyModule\ProductStatusUpdateImport\Virtual\ImportBunchResolver</item>
                    <item name="validator" xsi:type="object">TechDivision\PacemakerImportBase\Api\ImportBunchValidatorInterface</item>
                    <item name="pipeline_name" xsi:type="string">product_status_update_import</item>
                    <item name="enable_config_path" xsi:type="string">techdivision_pacemaker_import/product_status_update/enabled</item>
                </item>
            </argument>
        </arguments>
    </type>
</config>

With this configuration we register an additional virtual type MyModule\ProductStatusUpdateImport\Virtual\ConfigProvider\GetFileNamePattern. It provides the file name pattern to our "ImportBunchResolver". If you do not want a configurable file pattern, you could also write your own implementation of TechDivision\PacemakerImportBase\Api\GetFileNamePatternInterface and use it instead of the virtual type.

The "ImportBunchResolver" is also a virtual type and requires our previously defined file name pattern provider.

The third node of the XML code above extends the resolverConfig array of the class TechDivision\PacemakerImportBase\Model\ImportFilesDataFetcher. The new item we add to this array requires at least three items:

  • resolver: This is an instance of TechDivision\PacemakerImportBase\Api\ImportBunchResolverInterface. We add here our virtual type, which we defined before.

  • validator: This is an instance of TechDivision\PacemakerImportBase\Api\ImportBunchValidatorInterface. Since we do not add any custom validation logic we can add the interface directly and Magento’s DI will instantiate the default preference for it.

  • pipeline_name: The name of our pipeline, which should be instantiated once the resolver found import files and the validator approved them.

The fourth item in this array is optional. With enable_config_path we can add the configuration path to our feature toggle. This allows us to disable the instantiation of this pipeline.
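
How the four items work together can be sketched as follows. This Python model is a simplified assumption about the flow, not the code of ImportFilesDataFetcher. Grouping files by the captured identifier and requiring an .ok file are stand-ins for the real resolver and validator, and all function names and file names are made up.

```python
import re
from collections import defaultdict

PATTERN = re.compile(
    r"product-status-update_(?P<identifier>[0-9a-z\-]*)([_0-9]*?).(csv|ok)",
    re.IGNORECASE,
)


def resolve_bunches(file_names):
    """Stand-in resolver: group matching files by the captured identifier."""
    bunches = defaultdict(list)
    for file_name in file_names:
        match = PATTERN.search(file_name)
        if match:
            bunches[match.group("identifier")].append(file_name)
    return dict(bunches)


def is_valid(bunch):
    """Stand-in validator: a bunch needs at least one .csv file and one .ok file."""
    names = [name.lower() for name in bunch]
    return any(n.endswith(".csv") for n in names) and any(n.endswith(".ok") for n in names)


def pipelines_to_spawn(resolver_config, file_names, scope_config):
    """Return (pipeline_name, identifier) pairs for every approved bunch."""
    spawned = []
    for entry in resolver_config.values():
        path = entry.get("enable_config_path")  # optional feature toggle
        if path is not None and not scope_config.get(path, False):
            continue
        for identifier, bunch in entry["resolver"](file_names).items():
            if entry["validator"](bunch):
                spawned.append((entry["pipeline_name"], identifier))
    return spawned


resolver_config = {
    "my_module.product_status_update_import": {
        "resolver": resolve_bunches,
        "validator": is_valid,
        "pipeline_name": "product_status_update_import",
        "enable_config_path": "techdivision_pacemaker_import/product_status_update/enabled",
    }
}
scope_config = {"techdivision_pacemaker_import/product_status_update/enabled": True}
files = [
    "product-status-update_20200301-1200_01.csv",
    "product-status-update_20200301-1200_02.csv",
    "product-status-update_20200301-1200.ok",
    "product-status-update_20200302-0800_01.csv",  # no .ok file yet
    "product-import_20200301-1200_01.csv",  # belongs to another pipeline
]
print(pipelines_to_spawn(resolver_config, files, scope_config))
```

In this model only the first bunch leads to a new pipeline instance: the second one is still incomplete, and the last file does not match the pattern at all.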

Examples

You’ll find an example module, which also provides sample CSV files to test this import.