Monday, June 16, 2014

Custom Receive Pipeline Component to break Message into Batches for Group elements

“Disassemble” is the second stage of the receive pipeline. It is primarily used to disassemble incoming messages, validate them against a schema and promote properties. To disassemble (break up) a message, the built-in disassembler uses an envelope schema and splits the message record by record. There is no provision for splitting it into sets of records (batches), which is what the custom component in this post does.


Sample Code:

1. Create a class library project.
2. Add references to Microsoft.BizTalk.Pipeline.dll and System.Drawing (the Icon property uses System.Drawing.Bitmap).
3. Rename the default Class1.cs to XMLSplitter.cs.
4. Use the following code:


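using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Resources;
using System.Xml;
using System.Xml.XPath;
using Microsoft.BizTalk.Component.Interop;
using Microsoft.BizTalk.Component.Utilities;
using Microsoft.BizTalk.Message.Interop;

    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]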
    [ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
    [System.Runtime.InteropServices.Guid("b70b07ec-3b6c-475d-a5da-eb2ad5ea5c28")]
    public class XMLSplitter : IBaseComponent, IDisassemblerComponent, IComponentUI, IPersistPropertyBag
    {
        #region Private Variables

        private ResourceManager resourceManager = null;
     
        //Used to hold disassembled messages
        private System.Collections.Queue qOutputMsgs = null;

        #endregion

        #region Constructor

        /// <summary>
        /// Initializes a new instance of the <see cref="XMLSplitter"/> class.
        /// </summary>
        public XMLSplitter()
        {
            resourceManager = new ResourceManager("PipelineComponent.XMLDebatch.XMLSplitter", Assembly.GetExecutingAssembly());
            qOutputMsgs = new System.Collections.Queue();

            BatchSize = 50;
        }

        #endregion

        #region Public Properties

        /// <summary>
        /// Gets or sets the size of the batch.
        /// </summary>
        /// <value>
        /// The size of the batch.
        /// </value>
        public int BatchSize
        {
            get;
            set;
        }

        /// <summary>
        /// Gets or sets the batch root element x path.
        /// </summary>
        /// <value>
        /// The batch root element x path.
        /// </value>
        public string BatchRootElementXPath
        {
            get;
            set;
        }

        #endregion

        #region IBaseComponent

        /// <summary>
        /// Description of the Component
        /// </summary>
        public string Description
        {
            get { return resourceManager.GetString("COMPONENTDESCRIPTION", CultureInfo.InvariantCulture); }
        }

        /// <summary>
        /// Name of the Component
        /// </summary>
        public string Name
        {
            get { return resourceManager.GetString("COMPONENTNAME", CultureInfo.InvariantCulture); }
        }

        /// <summary>
        /// Version of the Component
        /// </summary>
        public string Version
        {
            get { return resourceManager.GetString("COMPONENTVERSION", CultureInfo.InvariantCulture); }
        }

        #endregion

        #region IPersistPropertyBag

        /// <summary>
        /// GetClassID of component for usage from unmanaged code.
        /// </summary>
        /// <param name="classID"></param>
        public void GetClassID(out Guid classID)
        {
            classID = new Guid("0713faf8-748c-44d3-a1b1-8a80e3b168fc");
        }

        /// <summary>
        /// InitNew
        /// </summary>
        public void InitNew()
        {
        }

        /// <summary>
        /// Loads configuration properties for the component
        /// </summary>
        /// <param name="propertyBag"></param>
        /// <param name="errorLog"></param>
        public void Load(IPropertyBag propertyBag, int errorLog)
        {
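            try
            {
                //Keep the constructor defaults when a property has not been configured yet.
                int batchSize = ReadPropertyBag<int>(propertyBag, "BatchSize");
                if (batchSize > 0)
                {
                    BatchSize = batchSize;
                }

                string batchRootXPath = ReadPropertyBag<string>(propertyBag, "BatchRootElementXPath");
                if (!string.IsNullOrEmpty(batchRootXPath))
                {
                    BatchRootElementXPath = batchRootXPath;
                }
            }
            catch (NullReferenceException ex)
            {
                //EventLog.WriteEntry takes the event source first, then the message.
                System.Diagnostics.EventLog.WriteEntry("XMLDebatch", string.Concat("Error in reading property bag: ", ex.Message));
                throw;  //Rethrow without resetting the stack trace.
            }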
        }

        /// <summary>
        /// Saves the current component configuration into the property bag
        /// </summary>
        /// <param name="pb">Configuration property bag</param>
        /// <param name="fClearDirty">not used</param>
        /// <param name="fSaveAllProperties">not used</param>
        public virtual void Save(Microsoft.BizTalk.Component.Interop.IPropertyBag pb, bool fClearDirty, bool fSaveAllProperties)
        {
            WritePropertyBag(pb, "BatchSize", this.BatchSize);
            WritePropertyBag(pb, "BatchRootElementXPath", this.BatchRootElementXPath);
        }

        #endregion

        #region IComponentUI

        /// <summary>
        /// Component icon to use in BizTalk Editor
        /// </summary>
        [System.ComponentModel.Browsable(false)]
        public IntPtr Icon
        {
            get
            {
                return ((System.Drawing.Bitmap)(this.resourceManager.GetObject("COMPONENTICON", System.Globalization.CultureInfo.InvariantCulture))).GetHicon();
            }
        }

        /// <summary>
        /// The Validate method is called by the BizTalk Editor during the build of a BizTalk project.
        /// </summary>
        /// <param name="projectSystem"></param>
        /// <returns></returns>
        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }

        #endregion

        #region IDisassemblerComponent

        /// <summary>
        /// Splits the incoming message into batches and queues one outgoing message per batch.
        /// </summary>
        /// <param name="pContext">The pipeline context.</param>
        /// <param name="pInMsg">The incoming message.</param>
        /// <exception cref="System.ApplicationException">
        /// Thrown when the original message cannot be read
        /// or when the outgoing messages cannot be written.
        /// </exception>
        public void Disassemble(IPipelineContext pContext, Microsoft.BizTalk.Message.Interop.IBaseMessage pInMsg)
        {
            XPathNavigator xPathNavigator = null;
            XmlNamespaceManager namespaceManager = null;
            XPathNodeIterator nodIterator = null;
            List<string> operationNames = null;

            try
            {
                //fetch original message
                Stream originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
                XPathDocument document = new XPathDocument(originalMessageStream);
                xPathNavigator = document.CreateNavigator();
                namespaceManager = GetNameSpacesFromMessage(document);
                nodIterator = xPathNavigator.Select(string.Concat(BatchRootElementXPath, "/*"), namespaceManager);
                operationNames = nodIterator.Cast<XPathNavigator>().Select(node => node.LocalName).Distinct().ToList();
            }

            catch (Exception ex)
            {
                System.Diagnostics.EventLog.WriteEntry("XMLDebatch", string.Concat("Error in reading original message: ", ex.Message));
                throw new ApplicationException(string.Concat("Error in reading original message: ", ex.Message), ex);
            }


            XmlDocument chunkDoc = null;
            XmlNode batchRootElement = null;
            try
            {
                //load original message
                chunkDoc = new XmlDocument();
                chunkDoc.LoadXml(xPathNavigator.OuterXml);
                batchRootElement = chunkDoc.SelectSingleNode(BatchRootElementXPath, namespaceManager);
                batchRootElement.InnerXml = string.Empty;   //Clear children.

                //fetch namespace and root element
                string namespaceURI = chunkDoc.DocumentElement.NamespaceURI;
                string rootElement = chunkDoc.DocumentElement.LocalName;

                //start batching messages
                int counter = 0;

                foreach(string operationName in operationNames)
                {
                    //Match the element name exactly; starts-with() would also pull in
                    //siblings such as childItem10 while batching childItem1.
                    nodIterator = xPathNavigator.Select(string.Concat(BatchRootElementXPath, string.Format("/*[local-name() = '{0}']", operationName)), namespaceManager);

                    while (nodIterator.MoveNext())
                    {
                        counter = counter + 1;
                        if (counter > BatchSize)
                        {
                            //Current batch is full: queue it, then start a new batch with the current record.
                            //chunkDoc still holds the envelope, so only the batch root's children are replaced.
                            CreateOutgoingMessage(pContext, chunkDoc.OuterXml, namespaceURI, rootElement, pInMsg);
                            counter = 1;
                            batchRootElement.InnerXml = nodIterator.Current.OuterXml;
                        }

                        else
                        {
                            batchRootElement.InnerXml = string.Concat(batchRootElement.InnerXml, nodIterator.Current.OuterXml);
                        }
                    }
                    if (!string.IsNullOrEmpty(batchRootElement.InnerXml))   //Queue the last (possibly partial) batch for this element name
                    {
                        CreateOutgoingMessage(pContext, chunkDoc.OuterXml, namespaceURI, rootElement, pInMsg);
                    }

                    batchRootElement.InnerXml = string.Empty;
                    counter = 0;
                }
            }
            catch (Exception ex)
            {
                System.Diagnostics.EventLog.WriteEntry("XMLDebatch", string.Concat("Error in writing outgoing messages: ", ex.Message));
                throw new ApplicationException(string.Concat("Error in writing outgoing messages: ", ex.Message), ex);
            }
        }

        /// <summary>
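        /// Returns the next queued batch message, or null when the queue is empty.
        /// </summary>
        /// <param name="pContext">The pipeline context.</param>
        /// <returns>The next outgoing message, or null if none remain.</returns>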
        public Microsoft.BizTalk.Message.Interop.IBaseMessage GetNext(IPipelineContext pContext)
        {
            IBaseMessage returnMessage = null;

            if (qOutputMsgs.Count > 0)
            {
                returnMessage =  (IBaseMessage)qOutputMsgs.Dequeue();
            }

            return returnMessage;
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Collects the namespace declarations found in the message.
        /// </summary>
        /// <param name="xDocument">The document to scan.</param>
        /// <returns>A namespace manager holding every prefix/URI pair found.</returns>
        private XmlNamespaceManager GetNameSpacesFromMessage(XPathDocument xDocument)
        {
            XmlNamespaceManager namespaceManager = new XmlNamespaceManager(new NameTable());
            XPathNavigator nav = xDocument.CreateNavigator();
            XPathNodeIterator nodes = (XPathNodeIterator)nav.Evaluate("//namespace::*");

            while (nodes.MoveNext())
            {
                namespaceManager.AddNamespace(nodes.Current.Name, nodes.Current.Value);
            }


            return namespaceManager;
        }

        /// <summary>
        /// Reads the property bag.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="propertyBag">The property bag.</param>
        /// <param name="propName">Name of the property.</param>
        /// <returns></returns>
        private T ReadPropertyBag<T>(IPropertyBag propertyBag, string propName)
        {
            T returnValue = default(T);
            object val = null;
            try
            {
                propertyBag.Read(propName, out val, 0);
                returnValue = (T)Convert.ChangeType(val, typeof(T));
            }
            catch (System.ArgumentException)
            {
                return returnValue;
            }
            catch (System.Exception ex)
            {
                System.Diagnostics.EventLog.WriteEntry("XMLDebatch", string.Concat("Error in reading from property bag: ", ex.Message));
                throw new ApplicationException(string.Concat("Error in reading from property bag: ", ex.Message), ex);
            }
            return returnValue;
        }

        /// <summary>
        /// Writes the property bag.
        /// </summary>
        /// <param name="propertyBag">The property bag.</param>
        /// <param name="propName">Name of the property.</param>
        /// <param name="val">The value.</param>
        private void WritePropertyBag(IPropertyBag propertyBag, string propName, object val)
        {
            try
            {
                propertyBag.Write(propName, ref val);
            }
            catch (Exception ex)
            {
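                //EventLog.WriteEntry takes the event source first, then the message.
                System.Diagnostics.EventLog.WriteEntry("XMLDebatch", string.Concat("Error in writing into property bag: ", ex.Message));
                throw new ApplicationException(string.Concat("Error in writing into property bag: ", ex.Message), ex);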
            }
        }

        /// <summary>
        /// Queue outgoing messages
        /// </summary>
        private void CreateOutgoingMessage(IPipelineContext pContext, String messageString, string namespaceURI, string rootElement, IBaseMessage pInMsg)
        {

            IBaseMessage outMsg =null;
            string systemPropertiesNamespace = @"http://schemas.microsoft.com/BizTalk/2003/system-properties";
            try
            {
                //create outgoing message
                outMsg = pContext.GetMessageFactory().CreateMessage();
                outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);
                outMsg.Context = PipelineUtil.CloneMessageContext(pInMsg.Context);
                outMsg.Context.Promote("MessageType", systemPropertiesNamespace, string.Concat(namespaceURI, "#", rootElement));
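                //Encode as UTF-8; ASCII would replace every non-ASCII character with '?'.
                byte[] bufferOutgoingMessage = System.Text.Encoding.UTF8.GetBytes(messageString);
                MemoryStream outgoingStream = new MemoryStream(bufferOutgoingMessage);
                pContext.ResourceTracker.AddResource(outgoingStream);   //Let the pipeline dispose the stream.
                outMsg.BodyPart.Data = outgoingStream;
                qOutputMsgs.Enqueue(outMsg);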

            }
            catch (Exception ex)
            {
                System.Diagnostics.EventLog.WriteEntry("XMLDebatch", string.Concat("Error in queuing outgoing messages: ", ex.Message));
                throw new ApplicationException(string.Concat("Error in queuing outgoing messages: ", ex.Message), ex);
            }
        }

        #endregion
    }
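
The batching rule used by the component can be tried outside BizTalk before deploying anything. The following is only a minimal sketch under stated assumptions: `BatchSketch` and `SplitIntoBatches` are hypothetical names that are not part of the component, LINQ to XML is swapped in for `XmlDocument`, and the XPath is assumed to need no namespace prefixes. It groups the children of the batch root by local name and emits one copy of the envelope per run of at most `batchSize` records.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;
using System.Xml.XPath;

// Hypothetical helper for experimenting outside BizTalk; not part of the component.
public static class BatchSketch
{
    // Returns one XML string per batch. Records are grouped by local element name,
    // then each group is cut into runs of at most batchSize records.
    public static List<string> SplitIntoBatches(string xml, string batchRootXPath, int batchSize)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException("batchSize", "Batch size must be positive.");
        }

        XDocument source = XDocument.Parse(xml);
        XElement batchRoot = source.XPathSelectElement(batchRootXPath);
        if (batchRoot == null)
        {
            throw new ArgumentException("XPath did not match an element.", "batchRootXPath");
        }

        // Detach the records so the remaining document acts as an empty envelope.
        List<XElement> records = batchRoot.Elements().ToList();
        batchRoot.RemoveNodes();

        var batches = new List<string>();
        foreach (IGrouping<string, XElement> group in records.GroupBy(r => r.Name.LocalName))
        {
            List<XElement> sameName = group.ToList();
            for (int start = 0; start < sameName.Count; start += batchSize)
            {
                // Swap the envelope's children for the current run and snapshot the result.
                batchRoot.ReplaceNodes(sameName.Skip(start).Take(batchSize));
                batches.Add(source.ToString(SaveOptions.DisableFormatting));
            }
        }

        return batches;
    }
}
```

Calling `BatchSketch.SplitIntoBatches(xml, "//root/items", 2)` returns a list of XML strings, one per batch, which makes it easy to inspect the split in a unit test or a console application.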


5. Sign and then build project.
6. Copy the built assembly (MessageBatchPipelineCompoent.dll) to C:\Program Files\Microsoft BizTalk Server 2006\Pipeline Components.
7. The pipeline component is now ready to use. In a BizTalk pipeline project, add the component DLL to the toolbox and then drop the component into the Disassemble stage.

Usage Scenario:
Sometimes an incoming message in BizTalk orchestrations or messaging carries a very large set of records, the business process handles each record individually, and the repeating records are not all of the same type. Publishing such a record set to the MessageBox as a single message and then processing it consumes a lot of resources, because one orchestration instance takes a long time to work through every record one by one.

With this custom pipeline component, a huge message is broken into multiple smaller messages, each holding a batch of records of a single record type. Once these are published to the MessageBox, orchestrations can run in parallel and process several of the smaller messages at the same time. The result is better performance and a shorter processing time.

Example:
<root>
  <items>
    <childItem1>childItem1-1</childItem1>
    <childItem1>childItem1-2</childItem1>
    <childItem1>childItem1-3</childItem1>
    <childItem2>childItem2-1</childItem2>
    <childItem2>childItem2-2</childItem2>
    <childItem3>childItem3-1</childItem3>
    <childItem3>childItem3-2</childItem3>
    <childItem3>childItem3-3</childItem3>
    <childItem3>childItem3-4</childItem3>
    <childItem3>childItem3-5</childItem3>
  </items>
</root>

If we set the BatchSize property to 2 and BatchRootElementXPath to //root/items, the output is as follows:

For childItem1: 2 chunks (2 + 1 records)
For childItem2: 1 chunk (2 records)
For childItem3: 3 chunks (2 + 2 + 1 records)
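
These counts follow from ceiling division: each element name yields ceil(records / BatchSize) chunks. As a quick check, here is a minimal sketch that reproduces the arithmetic for the sample message. `ChunkCountSketch` and `CountChunks` are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Hypothetical helper that only reproduces the chunk arithmetic of the example.
public static class ChunkCountSketch
{
    // Maps each local element name to ceil(recordCount / batchSize).
    public static Dictionary<string, int> CountChunks(IEnumerable<XElement> records, int batchSize)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException("batchSize", "Batch size must be positive.");
        }

        return records
            .GroupBy(r => r.Name.LocalName)
            .ToDictionary(
                g => g.Key,
                g => (g.Count() + batchSize - 1) / batchSize);  // integer ceiling division
    }
}
```

For the sample message, passing `XDocument.Parse(xml).Root.Element("items").Elements()` with a batch size of 2 gives 2 for childItem1, 1 for childItem2 and 3 for childItem3, six outgoing messages in total.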

2 comments:

  1. I am facing error

    The Messaging Engine encountered an error during the processing of one or more inbound messages.

  2. My input XML is....


    --1111123456117236300300DD20150925INRRDSUMRetrn Defectiv 00017A0682.13682.130DDN20150925682.1320150925QSYSOPR00000-1111123456117236300300DD20150925INRRDSUMRetrn Defectiv 00017A0-682.13-682.130DDN20150925-682.1320150925QSYSOPR00000</ns0:Inbound_Canonical_GLJE_
