Create an additional import process

Sometimes we need additional imports besides the ones provided out of the box.

In this example, we create our own import pipeline, which observes the import directory for files and executes the import without any custom PHP logic, by re-using existing executors.

All of this code is also available in the example module.

Create a custom module

As the first step, we need to introduce a custom module. Please refer to Create a New Module in Magento’s developer documentation.

In this example, we name the module MyModule_ProductStatusUpdate.

Our module’s primary purpose will be to observe the import directory for a file with a specific name pattern. If such a file is present, we will start an import with the Import Library.

mkdir -p app/code/MyModule/ProductStatusUpdate/etc
touch app/code/MyModule/ProductStatusUpdate/etc/module.xml
touch app/code/MyModule/ProductStatusUpdate/registration.php
Content of app/code/MyModule/ProductStatusUpdate/registration.php
<?php
\Magento\Framework\Component\ComponentRegistrar::register(
    \Magento\Framework\Component\ComponentRegistrar::MODULE,
    'MyModule_ProductStatusUpdate',
    __DIR__
);
Content of app/code/MyModule/ProductStatusUpdate/etc/module.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
    <module name="MyModule_ProductStatusUpdate" setup_version="1.0.0">
        <sequence>
            <module name="TechDivision_PacemakerImportBase"/>
        </sequence>
    </module>
</config>

Since we will use components from TechDivision_PacemakerImportBase module, we need to add this module to the loading sequence of our new module.
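Once the module files are in place, Magento still has to register the module. The following is a minimal sketch, assuming it is run from the Magento root directory; `module:enable` and `setup:upgrade` are standard Magento CLI commands, while the function name `enable_module` and its optional path argument are only illustrative.

```shell
# Enable the example module and apply pending setup changes.
# Assumption: the Magento CLI lives at bin/magento unless a path is passed.
enable_module() {
    magento_cli="${1:-bin/magento}"
    if [ ! -x "$magento_cli" ]; then
        echo "Magento CLI not found at $magento_cli" >&2
        return 1
    fi
    # Register the module, then let Magento update its module list and schema.
    "$magento_cli" module:enable MyModule_ProductStatusUpdate &&
        "$magento_cli" setup:upgrade
}
```

Calling `enable_module` without arguments from the Magento root uses `bin/magento`; the function returns a non-zero status if the CLI cannot be found.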

Define the import pipeline

We create a pipeline.xml file within the etc folder of our module and add the following content:

app/code/MyModule/ProductStatusUpdate/etc/pipeline.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:TechDivision_ProcessPipelines:etc/pipeline.xsd">
    <pipeline name="product_status_update_import" description="Example Pipeline for a custom product update import" use-working-directory="true">
        <conditions>
            <pipeline_condition type="TechDivision\ProcessPipelines\Helper\Condition\Pipeline\NoAutoSpawn" description="No automatic start for this pipeline"/>
        </conditions>
        <step name="move_files" executorType="TechDivision\PacemakerImportBase\Model\Executor\MoveFilesToWorkingDirectory" sortOrder="10" description="Move files to working directory.">
            <conditions>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1" description="Try once."/>
            </conditions>
        </step>
        <step name="product_status_update" executorType="TechDivision\PacemakerImportBase\Model\Executor\ImportExecutor" sortOrder="20" description="Import product data">
            <conditions>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1" description="Try once."/>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted" description="Previous step needs to be finished."/>
                <step_condition type="MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess" description="Avoid conflicts between import steps."/>
            </conditions>
            <arguments>
                <argument key="command" value="import:products" />
                <argument key="operation" value="add-update" />
                <argument key="configuration" value="MyModule_ProductStatusUpdate::etc/m2if-config.json" />
            </arguments>
        </step>
    </pipeline>
</config>

In this sample, we create a pipeline with two steps.

  • The first step moves the relevant files to the working directory.

  • The second step executes the import.

Let’s go through the code step by step.

Also check out the XML config documentation for deeper insights.

Pipeline definition

The first node of our XML file defines a new pipeline.

<pipeline
    name="product_status_update_import"
    description="Example Pipeline for a custom product update import"
    use-working-directory="true" />

With the attribute name, we give our pipeline a unique name. If a pipeline with the same name already exists in our Magento instance, the two definitions will be merged.

Please refer to Transform foreign import source and Add steps to an existing pipeline to learn how to use this behaviour to extend existing pipelines.

The attribute description contains a human-readable description of the purpose of our pipeline. This content will be used in the admin UI.

Since our new pipeline works with files, we need a working directory for every instance of our pipeline.

Therefore we add the attribute use-working-directory and set it to true.

Pipeline conditions

With the `pipeline_condition`s, we define when our pipeline should be initialized.

Please refer to How to create a pipeline condition for details.

<pipeline>
    <conditions>
        <pipeline_condition
            type="TechDivision\ProcessPipelines\Helper\Condition\Pipeline\NoAutoSpawn"
            description="No automatic start for this pipeline"/>
    </conditions>
</pipeline>

We do not want periodic instantiation for our product updates, since the import should run as soon as the required import file is present in the file system.

Therefore we will use the pipeline initializer feature.

To avoid an instantiation via the heartbeat we need to add the condition TechDivision\ProcessPipelines\Helper\Condition\Pipeline\NoAutoSpawn.

Move files step

Our first step will move the relevant import files to the working directory of our pipeline.

<step
    name="move_files"
    executorType="TechDivision\PacemakerImportBase\Model\Executor\MoveFilesToWorkingDirectory"
    sortOrder="10"
    description="Move files to working directory.">

    <conditions>
        <step_condition
            type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1"
            description="Try once."/>
    </conditions>
</step>

We name our step (e.g. move_files).

Step names are relevant if we need to specify parallel and sequential execution. As executor we use the already existing class `TechDivision\PacemakerImportBase\Model\Executor\MoveFilesToWorkingDirectory`.

Please refer to How to create an executor.

The sortOrder of our step is essential since we want to execute the steps in a sequence (not parallel).

Since this step is the first in our pipeline, it does not require any specific sequence relevant conditions.

By adding the condition TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1 we ensure that our pipeline will be canceled if errors appear while moving the files to the working directory.

Import data step

The second step will execute the import.

We need to ensure that this step runs after the first one. We also need to check whether other imports are already running and, if so, wait until they have finished to avoid deadlocks in the database.

<step
    name="product_status_update"
    executorType="TechDivision\PacemakerImportBase\Model\Executor\ImportExecutor"
    sortOrder="20"
    description="Import product data">

    <conditions>
        <step_condition
            type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1"
            description="Try once."/>
        <step_condition
            type="TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted"
            description="Previous step needs to be finished."/>
        <step_condition
            type="MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess"
            description="Avoid conflicts between import steps."/>
    </conditions>

    <arguments>
        <argument key="command" value="import:products" />
        <argument key="operation" value="add-update" />
        <argument key="configuration" value="MyModule_ProductStatusUpdate::etc/m2if-config.json" />
    </arguments>
</step>

We name the step and define an executor. To run an import based on the Import Framework, we can re-use the class TechDivision\PacemakerImportBase\Model\Executor\ImportExecutor.

As sortOrder we need to define a higher number than for the move_files step since we want to use the condition TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted. This condition verifies whether steps with lower sortOrder are successfully executed.

To avoid endless retries of this step in case of errors, we need to add the condition TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1, as we did for the first step.

As the third condition for this step, we add MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess. This condition does not exist yet, but we will create it in the next part of this documentation.

The import executor expects a set of arguments that define the command and operation to execute, as well as the path to the configuration we want to pass to the Import Library.

Take a look into the Import Library documentation for a better understanding of command, operation and configuration.

Avoid execution of conflicting steps

In the previous part of this documentation, we already referenced the step condition MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess.

Now we need to create this class.

The namespace of this class contains Virtual, which by convention marks it as a virtual type. We do not write this class by hand; Magento generates it from our DI configuration.

Therefore we create a di.xml in our module and add the following content into it.

app/code/MyModule/ProductStatusUpdate/etc/di.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <virtualType name="MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess" type="TechDivision\PacemakerImportBase\Model\Condition\Step\NoConflictingStepsInProcess">
        <arguments>
            <argument name="stepNames" xsi:type="array">
                <item name="product_status_update" xsi:type="string">product_status_update</item>
                <item name="product_import" xsi:type="string">product_import</item>
            </argument>
        </arguments>
    </virtualType>
</config>

Refer to Magento DevDocs for details about virtual types.

This approach allows us to extend the list of conflicting processes.

Our condition now checks for active import steps from our pipeline as well as for the import step from the pacemaker_import_catalog pipeline.

We should also extend the list of conflicting steps for the pacemaker_import_catalog pipeline by adding the following code to the di.xml of our module.

app/code/MyModule/ProductStatusUpdate/etc/di.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    ...
    <virtualType name="TechDivision\PacemakerImportCatalog\Virtual\Condition\NoConflictingStepInProcess">
        <arguments>
            <argument name="stepNames" xsi:type="array">
                <item name="product_status_update" xsi:type="string">product_status_update</item>
            </argument>
        </arguments>
    </virtualType>
</config>
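Because pipeline.xml refers to the condition purely by name, the virtualType name declared in di.xml has to be identical to the type used in the step_condition node. The following is a small sketch for checking this with `grep`; the function name `condition_is_declared` and the default module path are illustrative assumptions.

```shell
# Succeeds only if pipeline.xml references the condition AND di.xml declares
# a virtualType with exactly the same name.
condition_is_declared() {
    module_dir="$1"
    name='MyModule\ProductStatusUpdate\Virtual\Condition\Step\NoConflictingStepInProcess'
    # -F treats the name as a fixed string, so the backslashes stay literal.
    grep -F -q "type=\"$name\"" "$module_dir/etc/pipeline.xml" 2>/dev/null &&
        grep -F -q "virtualType name=\"$name\"" "$module_dir/etc/di.xml" 2>/dev/null
}

if condition_is_declared app/code/MyModule/ProductStatusUpdate; then
    echo "pipeline.xml and di.xml agree on the condition name"
else
    echo "condition name missing or different in pipeline.xml / di.xml"
fi
```

A mismatch between the two names is easy to overlook, since both files are well-formed on their own.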

Import Framework configuration file

Now we need to add the Import Framework configuration file to the path we pass to our import executor (MyModule_ProductStatusUpdate::etc/m2if-config.json).

The content of this configuration file looks as follows:

app/code/MyModule/ProductStatusUpdate/etc/m2if-config.json
{
    "magento-edition": "CE",
    "magento-version": "2.3.4",
    "operation-name" : "add-update",
    "archive-artefacts" : false,
    "debug-mode" : false,
    "entity-type-code" : "catalog_product",
    "listeners" : [
        {
            "app.set.up" : [
                "import.listener.render.ansi.art",
                "import.listener.initialize.registry"
            ]
        },
        {
            "app.tear.down" : [
                "import.listener.clear.registry"
            ]
        }
    ],
    "databases" : [],
    "loggers": [
        {
            "name": "system",
            "channel-name": "logger/system",
            "type": "Monolog\\Logger",
            "handlers": [
                {
                    "type": "Monolog\\Handler\\ErrorLogHandler",
                    "formatter": {
                        "type": "Monolog\\Formatter\\LineFormatter",
                        "params" : [
                            {
                                "format": "[%datetime%] %channel%.%level_name%: %message% %context% %extra%",
                                "date-format": "Y-m-d H:i:s",
                                "allow-inline-line-breaks": true,
                                "ignore-empty-context-and-extra": true
                            }
                        ]
                    }
                }
            ],
            "processors": [
                {
                    "type": "Monolog\\Processor\\MemoryPeakUsageProcessor"
                }
            ]
        }
    ],
    "operations" : [
        {
            "name" : "add-update",
            "plugins" : [
                {
                    "id": "import.plugin.cache.warmer"
                },
                {
                    "id": "import.plugin.global.data"
                },
                {
                    "id": "import.plugin.subject",
                    "subjects": [
                        {
                            "id": "import.subject.move.files",
                            "identifier": "move-files",
                            "file-resolver": {
                                "prefix": "product-status-update"
                            },
                            "ok-file-needed": true
                        },
                        {
                            "id": "import_product.subject.bunch",
                            "identifier": "files",
                            "file-resolver": {
                                "prefix": "product-status-update"
                            },
                            "params" : [
                                {
                                    "copy-images" : false,
                                    "clean-up-empty-columns" : [
                                        "base_image",
                                        "small_image",
                                        "swatch_image",
                                        "thumbnail_image",
                                        "special_price",
                                        "special_price_from_date",
                                        "special_price_to_date"
                                    ]
                                }
                            ],
                            "observers": [
                                {
                                    "import": [
                                        "import.observer.attribute.set",
                                        "import.observer.additional.attribute",
                                        "import_product.observer.product",
                                        "import_product.observer.product.attribute.update"
                                    ]
                                }
                            ]
                        }
                    ]
                },
                {
                    "id": "import.plugin.archive"
                }
            ]
        }
    ]
}

With this configuration, we define an import, which updates attributes for already existing products.

Please refer to the Import Framework documentation for details.

Pipeline instantiation

In the last part of this documentation, we teach the pipeline initializer to observe the import directory for our import files and to instantiate our pipeline once the file is present.

All this can be entirely done in a declarative way.

System configuration

Let’s add some configuration options to Magento’s admin UI.

We create a system.xml in the etc/adminhtml directory of our module and add the following content to this file.

app/code/MyModule/ProductStatusUpdate/etc/adminhtml/system.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Config:etc/system_file.xsd">
    <system>
        <section id="techdivision_pacemaker_import">
            <group id="product_status_update" showInDefault="1" showInStore="0" showInWebsite="0" sortOrder="1000" translate="label">
                <label>Product Status Update</label>
                <field id="enabled" translate="label" type="select" sortOrder="25" showInDefault="1" showInWebsite="0" showInStore="0" canRestore="0">
                    <label>Enable Pipeline</label>
                    <source_model>Magento\Config\Model\Config\Source\Yesno</source_model>
                </field>
                <field id="file_name_pattern" translate="label comment" type="text" sortOrder="30" showInDefault="1" showInWebsite="0" showInStore="0" canRestore="0">
                    <label>File Name Pattern</label>
                    <comment>Pattern for import file bunches</comment>
                </field>
            </group>
        </section>
    </system>
</config>

It will add a new configuration group to Stores > Configuration > Pacemaker > Import with two fields.

One to enable/disable our pipeline and a second one, which defines the file name pattern for our import files.

Now create a default configuration by adding the following file:

app/code/MyModule/ProductStatusUpdate/etc/config.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Store:etc/config.xsd">
    <default>
        <techdivision_pacemaker_import>
            <product_status_update>
                <enabled>1</enabled>
                <file_name_pattern><![CDATA[/product-status-update_(?P<identifier>[0-9a-z\-]*)([_0-9]*?).(csv|ok)/i]]></file_name_pattern>
            </product_status_update>
        </techdivision_pacemaker_import>
    </default>
</config>

This enables our pipeline by default and predefines a file name pattern.
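To get a feeling for which file names the default pattern accepts, it can be tried against a few candidates. The sketch below uses an extended-regex translation of the pattern for `grep -E`: the named group and the lazy quantifier are dropped, which does not change which names match. The sample file names are made up for illustration. Note that the dot before the extension is unescaped in the pattern and therefore matches any character.

```shell
# ERE translation of the default file_name_pattern (assumption: capture
# names are not needed for a plain match test).
pattern='product-status-update_([0-9a-z-]*)([_0-9]*).(csv|ok)'

# Succeeds if the given file name matches the pattern, ignoring case.
matches_pattern() {
    printf '%s\n' "$1" | grep -E -i -q "$pattern"
}

# Hypothetical file names, used only to exercise the pattern.
for name in \
    "product-status-update_20200312-101500_01.csv" \
    "product-status-update_20200312-101500_01.ok" \
    "product-import_20200312-101500_01.csv"
do
    if matches_pattern "$name"; then
        echo "match:    $name"
    else
        echo "no match: $name"
    fi
done
```

The first two names match, while the third one does not, because it lacks the product-status-update prefix.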

Extend Pipeline Initializer

Add the following content to the di.xml of our module.

app/code/MyModule/ProductStatusUpdate/etc/di.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    ...
    <virtualType name="MyModule\ProductStatusUpdateImport\Virtual\ConfigProvider\GetFileNamePattern" type="TechDivision\PacemakerImportBase\Model\ConfigProvider\GetFileNamePattern">
        <arguments>
            <argument name="scopeConfigPath" xsi:type="string">techdivision_pacemaker_import/product_status_update/file_name_pattern</argument>
        </arguments>
    </virtualType>
    <virtualType name="MyModule\ProductStatusUpdateImport\Virtual\ImportBunchResolver" type="TechDivision\PacemakerImportBase\Model\ImportBunchResolver">
        <arguments>
            <argument name="getFileNamePattern" xsi:type="object">MyModule\ProductStatusUpdateImport\Virtual\ConfigProvider\GetFileNamePattern</argument>
        </arguments>
    </virtualType>
    <type name="TechDivision\PacemakerImportBase\Model\ImportFilesDataFetcher">
        <arguments>
            <argument name="resolverConfig" xsi:type="array">
                <item name="my_module.product_status_update_import" xsi:type="array">
                    <item name="resolver" xsi:type="object">MyModule\ProductStatusUpdateImport\Virtual\ImportBunchResolver</item>
                    <item name="validator" xsi:type="object">TechDivision\PacemakerImportBase\Api\ImportBunchValidatorInterface</item>
                    <item name="pipeline_name" xsi:type="string">product_status_update_import</item>
                    <item name="enable_config_path" xsi:type="string">techdivision_pacemaker_import/product_status_update/enabled</item>
                </item>
            </argument>
        </arguments>
    </type>
</config>

With this configuration, we register an additional virtual type (auto-generated class) MyModule\ProductStatusUpdateImport\Virtual\ConfigProvider\GetFileNamePattern. This class provides the file name pattern to our "ImportBunchResolver". If you do not want a configurable file pattern, you could also write your own implementation of TechDivision\PacemakerImportBase\Api\GetFileNamePatternInterface and use it instead of the virtual type.

The "ImportBunchResolver" is also a virtual type and requires our previously defined FileNamePattern provider.

The third node of the XML code above extends the resolverConfig array of the class TechDivision\PacemakerImportBase\Model\ImportFilesDataFetcher.

The new item we add to this array requires at least three items:

  • resolver: This is an instance of TechDivision\PacemakerImportBase\Api\ImportBunchResolverInterface. We add here our virtual type, which we defined before.

  • validator: This is an instance of TechDivision\PacemakerImportBase\Api\ImportBunchValidatorInterface. Since we do not add any custom validation logic, we can add the interface directly, and Magento’s DI will instantiate the default preference for it.

  • pipeline_name: The name of the pipeline that should be instantiated once the resolver has found import files and the validator has approved them.

The fourth item in this array is optional. With enable_config_path we can add the configuration path to our feature toggle.

It allows us to disable the instantiation of this pipeline.
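At this point the module consists of seven files, all of which were introduced above. A quick sketch for checking that none of them is missing could look like this; the function name `missing_module_files` is illustrative, and the default path assumes the script runs from the Magento root directory.

```shell
# Print the relative path of every module file that does not exist yet.
# Prints nothing if the module is complete.
missing_module_files() {
    module_dir="${1:-app/code/MyModule/ProductStatusUpdate}"
    for file in \
        registration.php \
        etc/module.xml \
        etc/pipeline.xml \
        etc/di.xml \
        etc/m2if-config.json \
        etc/config.xml \
        etc/adminhtml/system.xml
    do
        [ -f "$module_dir/$file" ] || echo "$file"
    done
}

missing_module_files app/code/MyModule/ProductStatusUpdate
```

An empty output means all files referenced in this documentation are present; it does not validate their content.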

Examples

You will find an example module, which also provides sample CSV files to test this import.