Tuesday, April 7, 2015

Debatching(Splitting) XML Message in Orchestration using Custom Pipeline with specified BatchSize



I have used a custom pipeline component, which runs in the disassemble stage like the XML disassembler, to debatch the message.

“Disassemble” is the second stage of the receive pipeline. It is primarily used to disassemble incoming messages, validate them against a schema, and promote properties. To disassemble (break up) a message, the XML disassembler uses an envelope schema and splits the message record by record. There is no built-in provision for splitting it into sets of records (batches) of a given size.


Sample Code:

1. Create a class library project.
2. Add references to the assemblies below
         a). Microsoft.XLANGs.BaseTypes (C:\Program Files (x86)\Microsoft BizTalk Server 2010\Microsoft.XLANGs.BaseTypes.dll)
         b). Microsoft.BizTalk.Pipeline (C:\Program Files (x86)\Microsoft Visual Studio 10.0\Common7\IDE\PublicAssemblies\Microsoft.BizTalk.Pipeline.dll)
3. Rename the default Class1.cs to XMLDebatch.cs.
4. Use the following code




[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
    [System.Runtime.InteropServices.Guid("6cf0de2b-fc7a-4a8d-913b-9d48d59ca584")]
    /// <summary>
    /// Disassembler pipeline component that splits one incoming XML message into several
    /// outgoing messages, each holding at most <see cref="BatchSize"/> child elements of the
    /// node selected by <see cref="BatchRootElementXPath"/>. Everything outside that node is
    /// copied unchanged into every outgoing message.
    /// </summary>
    public class XMLDebatch : IBaseComponent, IDisassemblerComponent, IComponentUI, IPersistPropertyBag
    {
        #region Private Variables

        // Event log source used for every entry written by this component.
        private const string EventSource = "XMLDebatch";

        // Batch size used until a positive value is configured.
        private const int DefaultBatchSize = 50;

        private readonly ResourceManager resourceManager;

        //Used to hold disassembled messages until GetNext hands them out
        private readonly System.Collections.Generic.Queue<IBaseMessage> qOutputMsgs;
        private readonly string systemPropertiesNamespace;
        private readonly string filePropertiesNamespace;

        #endregion

        #region Constructor

        /// <summary>
        /// Initializes a new instance of the <see cref="XMLDebatch"/> class.
        /// </summary>
        public XMLDebatch()
        {
            // "{YourClassLibrary}" is a placeholder for the root namespace of the project that
            // embeds the XMLDebatch resources.
            resourceManager = new ResourceManager("{YourClassLibrary}.XMLDebatch", Assembly.GetExecutingAssembly());
            qOutputMsgs = new System.Collections.Generic.Queue<IBaseMessage>();
            systemPropertiesNamespace = @"http://schemas.microsoft.com/BizTalk/2003/system-properties";
            filePropertiesNamespace = "http://schemas.microsoft.com/BizTalk/2003/file-properties";

            // DefaultValueAttribute is metadata only; it does not assign the property, so the
            // default has to be applied explicitly.
            BatchSize = DefaultBatchSize;
        }

        #endregion

        #region Public Properties

        /// <summary>
        /// Gets or sets the size of the batch.
        /// </summary>
        /// <value>
        /// The maximum number of child elements placed in each outgoing message.
        /// Must be greater than zero.
        /// </value>
        [DisplayName("BatchSize")]
        [DefaultValue(DefaultBatchSize)]
        public int BatchSize
        {
            get;
            set;
        }

        /// <summary>
        /// Gets or sets the batch root element x path.
        /// </summary>
        /// <value>
        /// XPath of the element whose direct children are distributed over the outgoing messages.
        /// </value>
        [DisplayName("BatchRootElementXPath")]
        public string BatchRootElementXPath
        {
            get;
            set;
        }

        #endregion

        #region IBaseComponent

        /// <summary>
        /// Description of the Component, read from the COMPONENTDESCRIPTION resource.
        /// </summary>
        public string Description
        {
            get { return resourceManager.GetString("COMPONENTDESCRIPTION", CultureInfo.InvariantCulture); }
        }

        /// <summary>
        /// Name of the Component, read from the COMPONENTNAME resource.
        /// </summary>
        public string Name
        {
            get { return resourceManager.GetString("COMPONENTNAME", CultureInfo.InvariantCulture); }
        }

        /// <summary>
        /// Version of the Component, read from the COMPONENTVERSION resource.
        /// </summary>
        public string Version
        {
            get { return resourceManager.GetString("COMPONENTVERSION", CultureInfo.InvariantCulture); }
        }

        #endregion

        #region IPersistPropertyBag

        /// <summary>
        /// GetClassID of component for usage from unmanaged code.
        /// </summary>
        /// <param name="classID">Receives the class identifier.</param>
        public void GetClassID(out Guid classID)
        {
            // NOTE(review): this differs from the Guid attribute on the class - confirm that is intended.
            classID = new Guid("0713faf8-748c-44d3-a1b1-8a80e3b168fc");
        }

        /// <summary>
        /// InitNew - nothing to initialize.
        /// </summary>
        public void InitNew()
        {
        }

        /// <summary>
        /// Loads configuration properties for the component
        /// </summary>
        /// <param name="propertyBag">Configuration property bag.</param>
        /// <param name="errorLog">not used</param>
        /// <exception cref="System.ApplicationException">A property could not be read or converted.</exception>
        public void Load(IPropertyBag propertyBag, int errorLog)
        {
            // ReadPropertyBag yields default(T) when the bag holds no value for a property.
            // Only overwrite the current setting when a usable value was actually supplied, so a
            // bag that omits a property (presumably possible when Load is called more than once -
            // TODO confirm) cannot reset BatchSize to 0 or the XPath to null.
            int configuredBatchSize = ReadPropertyBag<int>(propertyBag, "BatchSize");
            if (configuredBatchSize > 0)
            {
                BatchSize = configuredBatchSize;
            }

            string configuredXPath = ReadPropertyBag<string>(propertyBag, "BatchRootElementXPath");
            if (configuredXPath != null)
            {
                BatchRootElementXPath = configuredXPath;
            }
        }

        /// <summary>
        /// Saves the current component configuration into the property bag
        /// </summary>
        /// <param name="pb">Configuration property bag</param>
        /// <param name="fClearDirty">not used</param>
        /// <param name="fSaveAllProperties">not used</param>
        public virtual void Save(Microsoft.BizTalk.Component.Interop.IPropertyBag pb, bool fClearDirty, bool fSaveAllProperties)
        {
            WritePropertyBag(pb, "BatchSize", this.BatchSize);
            WritePropertyBag(pb, "BatchRootElementXPath", this.BatchRootElementXPath);
        }

        #endregion

        #region IComponentUI

        /// <summary>
        /// Component icon to use in BizTalk Editor. Returns <see cref="IntPtr.Zero"/> when the
        /// COMPONENTICON resource is absent or is not a bitmap.
        /// </summary>
        [System.ComponentModel.Browsable(false)]
        public IntPtr Icon
        {
            get
            {
                System.Drawing.Bitmap icon = this.resourceManager.GetObject("COMPONENTICON", System.Globalization.CultureInfo.InvariantCulture) as System.Drawing.Bitmap;
                return icon == null ? IntPtr.Zero : icon.GetHicon();
            }
        }

        /// <summary>
        /// The Validate method is called by the BizTalk Editor during the build of a BizTalk project.
        /// </summary>
        /// <param name="projectSystem">not used</param>
        /// <returns>Always null; no validation errors are reported.</returns>
        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }

        #endregion

        #region IDisassemblerComponent

        /// <summary>
        /// Splits the incoming message into chunks of at most <see cref="BatchSize"/> children of
        /// the <see cref="BatchRootElementXPath"/> node and queues one outgoing message per chunk.
        /// At least one message is queued, even when the batch root has no children.
        /// </summary>
        /// <param name="pContext">The pipeline context.</param>
        /// <param name="pInMsg">The incoming message.</param>
        /// <exception cref="System.ApplicationException">
        /// BatchSize or BatchRootElementXPath is not usable
        /// or
        /// Error in reading original message:  + ex.Message
        /// or
        /// Error in writing outgoing messages:  + ex.Message
        /// </exception>
        public void Disassemble(IPipelineContext pContext, Microsoft.BizTalk.Message.Interop.IBaseMessage pInMsg)
        {
            // A non-positive batch size would never advance the chunk window and loop forever.
            if (BatchSize <= 0)
            {
                throw new ApplicationException("BatchSize must be greater than zero.");
            }

            if (string.IsNullOrEmpty(BatchRootElementXPath))
            {
                throw new ApplicationException("BatchRootElementXPath must be configured.");
            }

            XPathNavigator xPathNavigator = null;
            XmlNamespaceManager namespaceManager = null;
            long enrollmentsCount = 0;

            try
            {
                //fetch original message
                Stream originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
                XPathDocument document = new XPathDocument(originalMessageStream);
                xPathNavigator = document.CreateNavigator();
                namespaceManager = XmlDocumentHelper.GetNameSpacesFromMessage(document);

                // count() evaluates to a double; convert it directly instead of going through a string.
                object childCount = xPathNavigator.Evaluate(string.Concat("count(", BatchRootElementXPath, "/*)"), namespaceManager);
                enrollmentsCount = Convert.ToInt64(childCount, CultureInfo.InvariantCulture);
            }
            catch (Exception ex)
            {
                LogError(string.Concat("Error in reading original message: ", ex.Message));
                throw new ApplicationException(string.Concat("Error in reading original message: ", ex.Message), ex);
            }

            try
            {
                // The template is the whole document with the batch root emptied; it is refilled
                // and serialized once per chunk.
                XmlDocument templateDoc = new XmlDocument();
                templateDoc.LoadXml(xPathNavigator.OuterXml);

                // Outgoing bodies are written as UTF-8, so make any XML declaration agree.
                XmlDeclaration declaration = templateDoc.FirstChild as XmlDeclaration;
                if (declaration != null)
                {
                    declaration.Encoding = "utf-8";
                }

                XmlNode batchRootElement = templateDoc.SelectSingleNode(BatchRootElementXPath, namespaceManager);
                if (batchRootElement == null)
                {
                    throw new ApplicationException(string.Concat("No node matches BatchRootElementXPath '", BatchRootElementXPath, "'."));
                }

                batchRootElement.InnerXml = string.Empty;   //Clear children.

                // These do not change between chunks.
                string namespaceURI = templateDoc.DocumentElement.NamespaceURI;
                string rootElement = templateDoc.DocumentElement.LocalName;
                string batchId = Guid.NewGuid().ToString();

                int batchCount = 0;
                long startPosition = 1;

                do
                {
                    // Inclusive 1-based window [startPosition, finalPosition], clamped to the child count.
                    long finalPosition = Math.Min(startPosition + BatchSize - 1, enrollmentsCount);
                    string chunkXPath = string.Format(
                        CultureInfo.InvariantCulture,
                        "{0}/*[position() >= {1} and position() <= {2}]",
                        BatchRootElementXPath,
                        startPosition,
                        finalPosition);

                    XPathNodeIterator nodeIterator = xPathNavigator.Select(chunkXPath, namespaceManager);
                    System.Text.StringBuilder chunkXml = new System.Text.StringBuilder();
                    while (nodeIterator.MoveNext())
                    {
                        chunkXml.Append(nodeIterator.Current.OuterXml);
                    }

                    batchRootElement.InnerXml = chunkXml.ToString();

                    batchCount++;
                    CreateOutgoingMessage(pContext, templateDoc.OuterXml, namespaceURI, rootElement, pInMsg, batchCount, batchId);

                    startPosition = finalPosition + 1;
                }
                while (startPosition <= enrollmentsCount);
            }
            catch (Exception ex)
            {
                LogError(string.Concat("Error in writing outgoing messages: ", ex.Message));
                throw new ApplicationException("Error in writing outgoing messages: " + ex.Message, ex);
            }
        }

        /// <summary>
        /// Gets the next message.
        /// </summary>
        /// <param name="pContext">The pContext.</param>
        /// <returns>The next queued chunk message, or null when the queue is empty.</returns>
        public Microsoft.BizTalk.Message.Interop.IBaseMessage GetNext(IPipelineContext pContext)
        {
            return qOutputMsgs.Count > 0 ? qOutputMsgs.Dequeue() : null;
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Writes an error entry to the event log under this component's source.
        /// </summary>
        /// <param name="message">The text to log.</param>
        private static void LogError(string message)
        {
            System.Diagnostics.EventLog.WriteEntry(EventSource, message, System.Diagnostics.EventLogEntryType.Error);
        }

        /// <summary>
        /// Reads the property bag.
        /// </summary>
        /// <typeparam name="T">Type the stored value is converted to.</typeparam>
        /// <param name="propertyBag">The property bag.</param>
        /// <param name="propName">Name of the property.</param>
        /// <returns>
        /// The converted value, or default(T) when the bag holds null for the property or the
        /// read raises an <see cref="System.ArgumentException"/>.
        /// </returns>
        /// <exception cref="System.ApplicationException">Any other failure while reading or converting.</exception>
        private T ReadPropertyBag<T>(IPropertyBag propertyBag, string propName)
        {
            T returnValue = default(T);
            object val = null;
            try
            {
                propertyBag.Read(propName, out val, 0);
                val = val ?? default(T);
                returnValue = (T)Convert.ChangeType(val, typeof(T));
            }
            catch (System.ArgumentException)
            {
                // Treated as "no value stored": fall back to default(T).
                return returnValue;
            }
            catch (System.Exception ex)
            {
                LogError(string.Concat("Error in reading into property bag: ", ex.Message));
                throw new ApplicationException(string.Concat("Error in reading into property bag: ", ex.Message), ex);
            }
            return returnValue;
        }

        /// <summary>
        /// Writes the property bag.
        /// </summary>
        /// <param name="propertyBag">The property bag.</param>
        /// <param name="propName">Name of the property.</param>
        /// <param name="val">The value.</param>
        /// <exception cref="System.ApplicationException">The value could not be written.</exception>
        private void WritePropertyBag(IPropertyBag propertyBag, string propName, object val)
        {
            try
            {
                propertyBag.Write(propName, ref val);
            }
            catch (Exception ex)
            {
                LogError(string.Concat("Error in writing into property bag: ", ex.Message));
                throw new ApplicationException(string.Concat("Error in writing into property bag: ", ex.Message), ex);
            }
        }

        /// <summary>
        /// Queue outgoing messages
        /// </summary>
        /// <param name="pContext">The pipeline context used to create the message.</param>
        /// <param name="messageString">XML text that becomes the message body.</param>
        /// <param name="namespaceURI">Namespace of the document element, used for MessageType.</param>
        /// <param name="rootElement">Local name of the document element, used for MessageType.</param>
        /// <param name="pInMsg">The incoming message whose context is cloned.</param>
        /// <param name="batchIndex">1-based index of this chunk.</param>
        /// <param name="batchId">Identifier shared by all chunks of one incoming message.</param>
        /// <exception cref="System.ApplicationException">The message could not be built or queued.</exception>
        private void CreateOutgoingMessage(IPipelineContext pContext, String messageString, string namespaceURI, string rootElement, IBaseMessage pInMsg, int batchIndex, string batchId)
        {
            try
            {
                //create outgoing message
                var messageFactory = pContext.GetMessageFactory();
                IBaseMessage outMsg = messageFactory.CreateMessage();
                outMsg.AddPart("Body", messageFactory.CreateMessagePart(), true);
                outMsg.Context = PipelineUtil.CloneMessageContext(pInMsg.Context);
                outMsg.Context.Promote("MessageType", systemPropertiesNamespace, string.Concat(namespaceURI, "#", rootElement));

                // ReceivedFileName is only rewritten when the incoming context carries one;
                // a message without it is still debatched.
                object received = pInMsg.Context.Read("ReceivedFileName", filePropertiesNamespace);
                if (received != null)
                {
                    string receivedFileName = received.ToString();
                    string[] fileNameAttr = System.IO.Path.GetFileNameWithoutExtension(receivedFileName).Split(new char[] { '.' });

                    // Keep only the last dot-separated segment of the original file name.
                    string fileName = fileNameAttr[fileNameAttr.Length - 1];
                    string directory = Path.GetDirectoryName(receivedFileName);
                    outMsg.Context.Promote(
                        "ReceivedFileName",
                        filePropertiesNamespace,
                        string.Concat(directory, "\\", fileName, ".", batchId, ".Chunk.", batchIndex.ToString("00", CultureInfo.InvariantCulture), ".xml"));
                }

                // UTF-8 keeps every character; ASCII would replace non-ASCII characters with '?'.
                byte[] bufferOutgoingMessage = System.Text.Encoding.UTF8.GetBytes(messageString);
                MemoryStream bodyStream = new MemoryStream(bufferOutgoingMessage);

                // Hand the stream to the pipeline so it is disposed when the pipeline is done with it.
                pContext.ResourceTracker.AddResource(bodyStream);
                outMsg.BodyPart.Data = bodyStream;
                qOutputMsgs.Enqueue(outMsg);
            }
            catch (Exception ex)
            {
                LogError(string.Concat("Error in queueing outgoing messages: ", ex.Message));
                throw new ApplicationException(string.Concat("Error in queueing outgoing messages: ", ex.Message), ex);
            }
        }

        #endregion

    }


Create a new XmlDocumentHelper class and use the code below.

/// <summary>
    /// Static helpers for building schema instances, collecting namespace declarations and
    /// setting node values on XML documents.
    /// </summary>
public class XmlDocumentHelper
    {
        #region Constructor

        /// <summary>
        /// Initializes a new instance of the <see cref="XmlDocumentHelper"/> class.
        /// </summary>
        public XmlDocumentHelper()
        {
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Constructs the empty message.
        /// </summary>
        /// <param name="objType">Type of the schema the instance is generated from.</param>
        /// <param name="rootNodeName">
        /// Name of the root node. When supplied, it is appended to the schema type name with '+'
        /// to build the document spec name; when empty, the type name alone is used.
        /// </param>
        /// <returns>A document loaded from the XML instance the schema generates.</returns>
        /// <exception cref="System.ArgumentNullException"><paramref name="objType"/> is null.</exception>
        public static XmlDocument ConstructEmptyMessage(Type objType, string rootNodeName = "")
        {
            if (objType == null)
            {
                throw new ArgumentNullException("objType");
            }

            string docSpecName = string.IsNullOrEmpty(rootNodeName)
                ? objType.FullName
                : string.Concat(objType.FullName, "+", rootNodeName);

            DocumentSpec documentSpecification = new DocumentSpec(docSpecName, objType.Assembly.FullName);

            // Failures propagate unchanged so the original stack trace is preserved.
            XmlDocument schemaInstance = new XmlDocument();
            using (StreamReader instanceStreamReader = new StreamReader(documentSpecification.GetDocSchema().CreateXmlInstance()))
            {
                schemaInstance.Load(instanceStreamReader);
            }

            return schemaInstance;
        }

        /// <summary>
        /// Gets the name spaces from message.
        /// </summary>
        /// <param name="xDocument">The document whose namespace nodes are collected.</param>
        /// <returns>
        /// A namespace manager holding every prefix/URI pair found on any element of the
        /// document. When one prefix is bound to several URIs, the binding visited last is kept.
        /// </returns>
        /// <exception cref="System.ArgumentNullException"><paramref name="xDocument"/> is null.</exception>
        public static XmlNamespaceManager GetNameSpacesFromMessage(XPathDocument xDocument)
        {
            if (xDocument == null)
            {
                throw new ArgumentNullException("xDocument");
            }

            XmlNamespaceManager namespaceManager = new XmlNamespaceManager(new NameTable());
            XPathNodeIterator namespaceNodes = xDocument.CreateNavigator().Select("//namespace::*");

            // For a namespace node, Name is the prefix (empty for the default namespace) and
            // Value is the namespace URI.
            while (namespaceNodes.MoveNext())
            {
                namespaceManager.AddNamespace(namespaceNodes.Current.Name, namespaceNodes.Current.Value);
            }

            return namespaceManager;
        }

        /// <summary>
        /// Sets the node value.
        /// </summary>
        /// <param name="doc">The document.</param>
        /// <param name="xpath">The xpath.</param>
        /// <param name="value">The value.</param>
        /// <param name="namespaceManager">The namespace manager.</param>
        /// <remarks>
        /// Does nothing when <paramref name="doc"/> is null, <paramref name="xpath"/> is null or
        /// empty, or no node matches the XPath.
        /// </remarks>
        public static void SetNodeValue(XmlDocument doc, string xpath, string value, XmlNamespaceManager namespaceManager)
        {
            if (doc == null || string.IsNullOrEmpty(xpath))
            {
                return;
            }

            XmlNode xpathNode = doc.SelectSingleNode(xpath, namespaceManager);
            if (xpathNode != null)
            {
                xpathNode.InnerText = value;
            }
        }

        #endregion
    }