Aniket
Feb 27, 2023

Azure Service Bus Messaging (Topics/Queues) for transferring data between an external PIM/ERP and Optimizely

Optimizely provides a PIM solution as a part of the DXP package. More information here: https://www.optimizely.com/product-information-management/

More often than not, clients have existing PIM and/or ERP systems that feed other systems in their organization. For example, their PIM/ERP system may serve physical stores, run reports, feed invoicing details, and act as the source of truth. There are numerous blog posts on importing a catalog into Optimizely one time using the out-of-the-box Optimizely APIs.

Needless to say, as updates are made to pricing, inventory, assets, delivery charges, taxes, etc. in the ERP/PIM/DAM, we need to keep that data synchronized in the Optimizely catalog so that customers see the most up-to-date information on the website as quickly as possible.

This requires a strategy for moving content between the two systems on a regular, fault-tolerant basis. A quick solution is to use an Optimizely scheduled job to fetch the data and update it in the database, though a scheduled job has some limitations: timeouts, low fault tolerance, logging, speed, resource constraints, alerting, etc.

An alternative is to use Azure Service Bus Messaging to queue up the product updates from the source system (the client's PIM/ERP) and synchronize them to the Optimizely catalog on a configurable schedule. Azure Service Bus has a number of advantages, listed below; you can also read more about them online.

Advantages:

  • Message Sessions
  • Auto-forwarding
  • Dead-lettering
  • Scheduled Delivery
  • Message deferral
  • Transactions
  • Auto-delete on idle
  • Duplicate detection
  • Geo-disaster recovery

You can use the Azure Service Bus .NET SDK for integration: https://learn.microsoft.com/en-us/dotnet/api/overview/azure/service-bus?preserve-view=true&view=azure-dotnet
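
Note that the link above points to the current Azure.Messaging.ServiceBus package, while the sample code later in this post uses the older Microsoft.Azure.ServiceBus types (Message, TopicClient). As a rough sketch of how a few of the advantages listed above (duplicate detection, message sessions, scheduled delivery) look with the current package: the topic name, the app setting name, and the idea of passing in an update id are assumptions made for illustration, not part of the original project, and the topic is assumed to have duplicate detection enabled.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class ProductUpdateSender
{
    // Hypothetical names, used for illustration only.
    private const string TopicName = "product-extract";
    private const string ConnectionSetting = "ServiceBusConnection";

    // Builds a message that exercises three of the features listed above.
    // updateId should identify one specific update, for example product code plus last-modified timestamp.
    public static ServiceBusMessage BuildMessage(string productCode, string updateId, string json, TimeSpan delay)
    {
        return new ServiceBusMessage(json)
        {
            ContentType = "application/json",
            // Duplicate detection: resending the same update id within the topic's
            // detection window is dropped by the broker.
            MessageId = updateId,
            // Message sessions: updates that share a SessionId are delivered in order to one receiver.
            SessionId = productCode,
            // Scheduled delivery: the message becomes available to subscribers only after this time.
            ScheduledEnqueueTime = DateTimeOffset.UtcNow.Add(delay),
        };
    }

    public static async Task SendAsync(ServiceBusMessage message)
    {
        string connectionString = Environment.GetEnvironmentVariable(ConnectionSetting);

        // Both the client and the sender hold connections, so dispose them when done.
        await using var client = new ServiceBusClient(connectionString);
        await using ServiceBusSender sender = client.CreateSender(TopicName);
        await sender.SendMessageAsync(message);
    }
}
```

In a long-running app you would normally create the ServiceBusClient once and reuse it rather than creating one per send; it is created inline here only to keep the sketch short.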

Strategy:

We have used the following strategy for a large B2C retail client, and it works really well.

  1. A custom C# function/console app (the extract job) deployed on Azure gets all products that were updated in the last 'x' minutes/hours by calling a custom endpoint provided by the client.
  2. This function app runs on a 'TimerTrigger' whose schedule is configurable in the Azure function app configuration. More info on function apps: https://learn.microsoft.com/en-us/azure/azure-functions/functions-create-your-first-function-visual-studio?tabs=in-process
  3. The function app gets the data from the endpoint, serializes each message as JSON, and sends it to the ASB topic (the product extract topic).
  4. A second custom C# function app (the transload job) subscribes to that topic using a 'ServiceBusTrigger', which executes every time there is a new message.
  5. This function app reads the message from the topic, deserializes it, and updates the product item using the Optimizely Service API.
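
To make step 2 a little more concrete, here is a minimal sketch of a timer-triggered function (in-process model) whose schedule comes from the function app configuration. The class names, the function name, and the 'ProductExtractSchedule' app setting are hypothetical; the sample code below refers to a ScheduleExpressions class, but its actual contents are not shown in this post.

```csharp
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class ScheduleExpressions
{
    // Timer triggers use six-field NCRONTAB expressions:
    // {second} {minute} {hour} {day} {month} {day-of-week}
    public const string EveryFifteenMinutes = "0 */15 * * * *";

    // Wrapping a name in %...% tells the Functions runtime to read the expression
    // from the app setting with that name, so it can be changed without redeploying.
    public const string FromAppSetting = "%ProductExtractSchedule%";
}

public static class ProductExtractFunction
{
    [FunctionName("ProductExtract")]
    public static void Run(
        [TimerTrigger(ScheduleExpressions.FromAppSetting, RunOnStartup = false)] TimerInfo timer,
        ILogger log)
    {
        // The extract logic would go here: ask the source system for everything
        // that changed since the last run and send it to the topic.
        log.LogInformation(
            "Product extract triggered at {time}; past due: {pastDue}",
            DateTime.UtcNow,
            timer.IsPastDue);
    }
}
```

With this in place, setting ProductExtractSchedule to a value such as 0 */15 * * * * in the function app configuration would run the extract every 15 minutes.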

Diagram:

Sample code (export job):

namespace ClientNamespace.Export.Features.CartCheckout.TaxSync
{
    using System;
    using System.Linq;
    using System.Net.Http;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.WebJobs;
    using Newtonsoft.Json;
    using ClientNamespace.Export.Core.Features.CartCheckout.TaxRateSync.Models;
    using ClientNamespace.Export.Core.Features.Infrastructure.Azure.Constants;
    using ClientNamespace.Export.Core.Features.Infrastructure.Azure.Services;
    using ClientNamespace.Export.Core.Features.Infrastructure.Logging;
    using ClientNamespace.Export.Features.Infrastructure.Azure.Constants;
    using ClientNamespace.Export.Features.Infrastructure.Azure.Extensions;
    using ClientNamespace.Export.Features.Infrastructure.Azure.Services;
    using ClientNamespace.Export.Features.Infrastructure.Rfapi.Clients;
    using Serilog;
    using Serilog.Core;
    using ConnectionStringNames = ClientNamespace.Export.Core.Features.Infrastructure.Azure.Constants.ConnectionStringNames;
    using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext;

    public class TaxRatesExportFunction
    {
        private const int ShortCircuit = 100_000;

        private readonly IHttpClientFactory _clientFactory;

        public TaxRatesExportFunction(IHttpClientFactory clientFactory)
        {
            _clientFactory = clientFactory;
        }

       #if !DEBUG // remove this line to run locally as a console app
        [FunctionName("TaxRatesExport")]
       #endif
        public async Task Run(
            [TimerTrigger(
                ScheduleExpressions.TaxRatesExport,
                RunOnStartup = false)]
            TimerInfo myTimer)
        {
            var log = LoglevelWrapper.WrapLogger(Log.Logger);

            try
            {
                log.Information("Starting TaxRatesExportFunction: {starttime}", DateTime.UtcNow);

                using (var topicMessageSender = new TopicMessageSender(ConnectionStringNames.ServiceBusTaxRates, TopicNames.TaxRates, log))
                {
                    // apiClient is the client's custom API client; its construction is omitted from this sample
                    var taxRates = await apiClient.TaxesAllAsync(); // custom endpoint from the client

                    var export = new TaxRateExport
                    {
                        TaxRates = taxRates
                            .Select(x => new TaxRate
                            {
                               Percentage = x.TaxRate ?? 0.000,
                               PostalCode = x.PostalCode,
                               TaxCode = x.TaxCode,
                               TaxableDelivery = x.TaxableDelivery,
                               TaxablePlatinum = x.TaxablePlatinum,
                            })
                            .ToList(),
                    };

                    // Send the message to the topic to be consumed by the transload function app
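                    try
                    {
                        var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(export)))
                        {
                            MessageId = Guid.NewGuid().ToString(),
                            SessionId = "sessionid", // placeholder; the import subscription uses sessions, so each message carries a SessionId
                        };

                        // Azure App Service exposes custom connection strings as CUSTOMCONNSTR_<name> environment variables
                        string connectionString = Environment.GetEnvironmentVariable("connectionStringName");
                        if (string.IsNullOrEmpty(connectionString))
                        {
                            connectionString = Environment.GetEnvironmentVariable("CUSTOMCONNSTR_connectionStringName");
                        }

                        var topicClient = new TopicClient(connectionString, "topicName", RetryPolicy.Default);
                        try
                        {
                            await topicClient.SendAsync(message);
                        }
                        finally
                        {
                            await topicClient.CloseAsync(); // release the underlying connection
                        }
                    }
                    catch (Exception ex)
                    {
                        log.Error(ex, "Failed to send the tax rates message to the topic {exception}", ex);
                    }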
                }
            }
            catch (Exception ex)
            {
                log.Error(ex, "Unhandled exception in TaxRatesExportFunction {exception}", ex);
            }
            finally
            {
                log.Information("TaxRatesExportFunction Complete: {endtime}", DateTime.UtcNow);
            }
        }
    }
}
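
The TaxRateExport and TaxRate models used above are not shown in this post. A minimal sketch of what they might look like, inferred only from how the properties are used in the export job; the property types (in particular bool for the two taxable flags) are guesses, and the TaxRateExportDemo helper is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

// Assumed shape of a single tax rate, based on the properties set in the export job.
public class TaxRate
{
    public double Percentage { get; set; }
    public string PostalCode { get; set; }
    public string TaxCode { get; set; }
    public bool TaxableDelivery { get; set; }   // type is an assumption
    public bool TaxablePlatinum { get; set; }   // type is an assumption
}

// The envelope that is serialized to JSON and placed on the topic.
public class TaxRateExport
{
    public List<TaxRate> TaxRates { get; set; }
}

public static class TaxRateExportDemo
{
    // Serializes and deserializes the envelope the same way the export and import jobs do.
    public static TaxRateExport RoundTrip(TaxRateExport export)
    {
        string json = JsonConvert.SerializeObject(export);
        return JsonConvert.DeserializeObject<TaxRateExport>(json);
    }
}
```

Keeping these models in a shared project (as the ClientNamespace.Export.Core namespace suggests) means the export and import jobs always agree on the message shape.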

Sample code (import job):

namespace ClientNamespace.Website.Import.Features.CartCheckout.TaxSync
{
    using System;
    using System.Net.Http;
    using System.Threading.Tasks;
    using Infrastructure.Azure.Constants;
    using Microsoft.Azure.WebJobs;
    using Newtonsoft.Json;
    using ClientNamespace.Export.Core.Features.CartCheckout.TaxRateSync.Models;
    using ClientNamespace.Export.Core.Features.Infrastructure.Azure.Constants;
    using ClientNamespace.Export.Core.Features.Infrastructure.Logging;
    using ClientNamespace.Website.Core.Features.Infrastructure.Episerver.Clients;
    using Serilog;
    using Serilog.Context;
    using ConnectionStringNames = ClientNamespace.Export.Core.Features.Infrastructure.Azure.Constants.ConnectionStringNames;

    public class TaxRatesImportFunction
    {
        private readonly IHttpClientFactory _clientFactory;

        public TaxRatesImportFunction(IHttpClientFactory clientFactory)
        {
            _clientFactory = clientFactory;
        }

        #if !DEBUG // Remove this to run locally (will be triggered when it sees a message on the topic it's subscribed to)
        [FunctionName(FunctionNames.TaxRatesImport)]
        #endif
        public async Task Run(
            [ServiceBusTrigger(
                TopicNames.TaxRates,
                SubscriptionNames.TaxRates,
                Connection = ConnectionStringNames.ServiceBusTaxRates,
                IsSessionsEnabled = true)]
            string mySbMsg)
        {
            var log = LoglevelWrapper.WrapLogger(Log.Logger);

            try
            {
                log.Information("Starting TaxRatesImportFunction: {starttime}", DateTime.UtcNow);
                log.Debug("Tax Rates Import Message: {message}", mySbMsg);

                TaxRateExport export = null;

                try
                {
                    // Deserialize the tax rates from the topic message
                    export = JsonConvert.DeserializeObject<TaxRateExport>(mySbMsg);
                }
                catch (Exception ex)
                {
                    log.Error(ex, "Could not JSON deserialize tax rates message {message} with exception {exception}", mySbMsg, ex);
                }

                if (export?.TaxRates == null)
                {
                    log.Warning("Tax rates deserialized, but data was null");
                    return;
                }

                // Load taxes into Episerver
                var serviceApiClient = EpiserverApiClientFactory.Create(log, _clientFactory);
                foreach (var taxRate in export.TaxRates)
                {
                    try
                    {
                        using (LogContext.PushProperty("importtaxrate", taxRate.TaxCode))
                        {
                            // Update the taxes table (either a custom endpoint or the Service API)
                            await serviceApiClient.SaveTaxRateAsync(taxRate);
                        }
                    }
                    catch (Exception ex)
                    {
                        // Don't fail the group: log the failure and continue with the next tax rate.
                        // Add custom handling here for errors when updating the Optimizely database.
                        log.Error(ex, "Could not save tax rate {taxcode} with exception {exception}", taxRate.TaxCode, ex);
                    }
                }
            }
            catch (Exception ex)
            {
                log.Error(ex, "Unhandled exception in TaxRatesImportFunction {exception}", ex);
            }
            finally
            {
                log.Information("TaxRatesImportFunction Complete: {endtime}", DateTime.UtcNow);
            }
        }
    }
}
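
One thing to keep in mind with the import job above: because it catches and logs its exceptions, the function completes normally and the message is removed from the subscription. If you instead let an exception propagate, the message is retried and, once the subscription's maximum delivery count is reached, moved to the dead-letter queue (the dead-lettering feature from the list of advantages). As a rough sketch, not part of the original project, this is how the dead-letter queue could be inspected with the current Azure.Messaging.ServiceBus package; DeadLetterInspector and its method names are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class DeadLetterInspector
{
    // Formats one dead-lettered message for logging.
    public static string Describe(string messageId, string reason, string description)
    {
        return $"{messageId}: {reason} ({description})";
    }

    // Peeks at up to maxMessages messages in the subscription's dead-letter queue
    // and writes a line for each; returns how many were found.
    public static async Task<int> LogDeadLettersAsync(
        string connectionString, string topicName, string subscriptionName, int maxMessages = 20)
    {
        await using var client = new ServiceBusClient(connectionString);
        await using ServiceBusReceiver receiver = client.CreateReceiver(
            topicName,
            subscriptionName,
            new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

        // Peeking does not lock or remove messages, so this is safe to run repeatedly.
        IReadOnlyList<ServiceBusReceivedMessage> messages = await receiver.PeekMessagesAsync(maxMessages);

        foreach (ServiceBusReceivedMessage message in messages)
        {
            Console.WriteLine(Describe(
                message.MessageId,
                message.DeadLetterReason,
                message.DeadLetterErrorDescription));
        }

        return messages.Count;
    }
}
```

A small scheduled job or an alert built on something like this gives you visibility into updates that never made it into Optimizely, instead of relying on log searches alone.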

As you can see, with minimal code you can create a more fault-tolerant synchronization to the Optimizely database. The same pattern scales to other areas of your website. For example, we have extended this system to automate order processing: as orders come in, the serialized order object is placed on the Azure Service Bus for automated processing all the way through to order completion. Yes, the client's IT team needs to write some code to automate it on their side, but it has saved them hundreds of thousands of dollars in the cost of having a keying team member update each order manually.

Can you think of other ways to scale the Optimizely system to use Azure Service Bus Messaging? 

Happy coding!
