财神道app下载最新版本-财神到购彩大厅(彩世界)

热门关键词: 财神道app下载最新版本,财神到购彩大厅

如何安装和使用Beanstalkd工作队列(1)财神道app下载

如何安装和使用Beanstalkd工作队列(1)

介绍

小心翼翼地宣布每一元素的职责部署应用程序栈带来很多好处,包括简单的诊断问题时发生,规模迅速的能力,以及更清晰的管理范围涉及的组件。

在当今世界web服务的工程,一个关键的组件实现上述场景涉及利用消息队列和工作(或任务)。这些通常是弹性和灵活的应用程序很容易实现和设置。他们是完美的分裂的不同部分之间的业务逻辑应用程序包时生产。

在这篇文章中,我们的应用程序级别系列通信解决方案,我们将看看Beanstalkd创建这个片段的分离。

什么是Beanstalkd

Beanstalkd首先是解决了一个流行的web应用程序的需求(Facebook上的原因)。目前,这是一个绝对可靠,易于安装的消息传递服务,是完美的开始和使用。

如前所述,Beanstalkd的主要用例是管理不同部分和工人之间的工作流应用程序的部署通过工作队列和消息堆栈,类似于其他受欢迎的解决方案,比如RabbitMQ。然而,创建Beanstalkd使它有别于其他工作。

自成立以来,与其他解决方案,Beanstalkd旨在成为一个工作队列,而不是一把雨伞工具来满足许多需求。为了实现这一目的,它作为一种轻量级的、快速有效的应用程序基于C编程语言。精益建筑还允许它是安装和使用非常简单,使它适合大多数用例。

Features(特性)

能够监控工作返回ID,在创建返回,只有一个的特性使它有别于其他的Beanstalkd。提供一些其他有趣的功能是:

1.持久性—>Beanstalkd运行使用内存,但也提供了持久性支持。

2.优先级—>与大多数选择一样,Beanstalkd提供了不同的任务的优先级来处理紧急事情时需要。

3.分布 —->不同的服务器实例可以分布类似于Memcached是如何工作的。

4.掩盖 —-> 有可能通过掩盖它无限期延期的作业(即任务)。

5.第三方工具—>Beanstalkd附带各种第三方工具包括综合领先指标和基于web的管理控制台。

6.过期 —->工作可以设置为过期,auto-queue之后(TTR – Time To Run).

Beanstalkd使用案例

一些模范的Banstalkd用例:

允许web服务器快速响应请求,而不是被迫当场曾推高程序执行

在指定的时间间隔执行某些工作(即爬行web)

分发到多个工作人员进行处理

让离线客户端(例如一个断开连接的用户)获取数据在稍后的时间,而不是让它永久失去了通过一个工人

引入完全异步功能的后端系统

订购和优先任务

应用程序负载不同员工之间保持平衡

极大地提高应用程序的可靠性和正常运行时间

处理CPU密集型工作(视频、图片等)

发送电子邮件到您的列表和更多。

Beanstalkd元素

就像大多数应用程序,Beanstalkd附带自己的术语来解释它的部分。

Tubes / Queues

Beanstalkd管翻译从其他消息传递应用程序队列。他们是通过工作(或消息)转移到消费者(即工人)。

Jobs / Messages

由于Beanstalkd是一个工作队列,通过管称为转移工作是什么——类似于所发送的消息。

Producers / Senders

生产商,类似于高级消息队列协议的定义,是应用程序创建和发送工作(或消息)。他们正在使用的消费者。

Consumers / Receivers

接收器是不同的应用程序的堆栈从管找份工作,由生产者进行处理。

在Ubuntu 13安装Beanstalkd

可以很简单获得Beanstalkd通过包管理器才能和开始。然而,在几个命令,您还可以从源下载并安装它。

注意:我们将执行安装和执行行动列在这里的新鲜和新创建的液滴由于各种原因。如果你是积极服务客户,可能会修改您的系统,不要打破任何工作和不运行在问题,强烈建议您试着在一个新系统下面的说明。

使用aptitude安装:

下载并安装Beanstalkd运行以下命令:

aptitude install -y beanstalkd 

编辑默认配置文件让随着系统启动

vim /etc/default/beanstalkd 

打开文件后,向下滚动并找到底部线#开始= yes。将其更改为:

START=yes 

下面介绍源码安装

我们需要从源代码安装过程的一个关键工具- Git。

运行以下获取Git在你系统上:

aptitude install -y git 

下载必要的开发工具软件包:

aptitude install -y build-essential 

使用Git克隆(下载)官方库:

git clone https://github.com/kr/beanstalkd 

进入到下载目录:

cd beanstalkd 

从源代码构建应用程序:

make 

安装:

make install 

再介绍一下centos下源码安装:

下载地址:   wget   http://cloud.github.com/downloads/kr/beanstalkd/beanstalkd-1.4.6.tar.gz   解压:   tar xzf beanstalkd-1.4.6.tar.gz   cd beanstalkd-1.4.6   /configure  make   make install   默认安装路径 :/usr/local/bin/   查看版本:   /usr/local/bin/beanstalkd -v   1.4.6 

财神道app下载最新版本 1


) 介绍 小心翼翼地宣布每一元素的职责部署应用程序栈带来很多好处,包括简单的诊断问题时发生,规模迅...

消息代理的必要性


<Settings>
    <Setting Key="ThisServerName" Value="halil_pc" />
    <Setting Key="StorageType" Value="MySQL-ODBC" />
    <Setting Key="ConnectionString" 
       Value="uid=root;server=localhost;driver={MySQL ODBC 3.51 Driver};database=mds" />
  </Settings>

      ØMQ是一种消息传递系统,或者乐意的话可以称它为“面向消息的中间件”。它在金融服务,游戏开发,嵌入式系统,学术研究和航空航天等多种环境中被使用。

using System;
using MDS;
using MDS.Client;
using StockCommonLib;

namespace StockServer
{
    class Program
    {
        static void Main(string[] args)
        {
            var mdsClient = new MDSClient("StockServer");
            mdsClient.MessageReceived  = MDSClient_MessageReceived;

            mdsClient.Connect();

            Console.WriteLine("Press enter to exit...");
            Console.ReadLine();

            mdsClient.Disconnect();
        }

        static void MDSClient_MessageReceived(object sender, 
                    MessageReceivedEventArgs e)
        {
            //Get message
            var stockQueryMessage = 
                GeneralHelper.DeserializeObject(e.Message.MessageData) 
                as StockQueryMessage;
            if (stockQueryMessage == null)
            {
                return;
            }

            //Write message content
            Console.WriteLine("Stock Query Message for: "   
                              stockQueryMessage.StockCode);

            //Get stock counts from a database...
            int reservedStockCount;
            int totalStockCount;
            switch (stockQueryMessage.StockCode)
            {
                case "S01":
                    reservedStockCount = 14;
                    totalStockCount = 80;
                    break;
                case "S02":
                    reservedStockCount = 0;
                    totalStockCount = 25;
                    break;
                default: //Stock does not exists!
                    reservedStockCount = -1;
                    totalStockCount = -1;
                    break;
            }

            //Create a reply message for stock query
            var stockQueryResult = new StockQueryResultMessage
                                       {
                                           StockCode = stockQueryMessage.StockCode,
                                           ReservedStockCount = reservedStockCount,
                                           TotalStockCount = totalStockCount
                                       };

            //Create a MDS response message to send to client
            var responseMessage = e.Message.CreateResponseMessage();
            responseMessage.MessageData = 
               GeneralHelper.SerializeObject(stockQueryResult);

            //Send message
            responseMessage.Send();

            //Acknowledge the original request message.
            //So, it will be deleted from queue.
            e.Message.Acknowledge();
        }
    }
}

Lock-Free Algorithms

  无锁算法最近一直流行起来。 它们是线程间通信的简单机制,它不依赖于内核提供的同步原语,例如互斥体或信号量; 相反,它们使用原子CPU操作(诸如原子compare-and-swap(CAS))来进行同步。 应当理解,它们不是字面上无锁的,而是在硬件级别的幕后进行锁定。

  ØMQ在管道对象中使用无锁队列在用户的线程和ØMQ的工作线程之间传递消息。 ØMQ如何使用无锁队列有两个有趣的方面。

  首先,每个队列只有一个写线程和一个读线程。 如果需要1对N通信,则创建多个队列(图8)。 考虑到这种方式,队列不必关心同步写入器(只有一个写入器)或读取器(只有一个读取器),它可以以额外的高效方式实现。

 财神道app下载最新版本 2

  图八:Queues

  第二,我们意识到虽然无锁算法比传统的基于互斥的算法更高效,但原子CPU操作仍然代价较高(尤其是在CPU核心之间存在争用时),并且对每个写入的消息和/或每个消息执行原子操作读的速度比我们能接受的要慢。 

  加快速度的方法是再次批量处理。 想象一下,你有10条消息要写入队列。 例如,当收到包含10条小消息的网络包时,可能会发生这种情况。 接收分组是原子事件; 所以你不会只得到一半。 这个原子事件导致需要向无锁队列写入10条消息。 对每条消息执行原子操作没有太多意义。 相反,可以在队列的“预写”部分中累积消息,该部分仅由写入程序线程访问,然后使用单个原子操作刷新它。 

  这同样适用于从队列读取。 想象上面的10个消息已经刷新到队列。 阅读器线程可以使用原子操作从队列中提取每个消息。 然而,它是超杀; 相反,它可以使用单个原子操作将所有未决消息移动到队列的“预读”部分。 之后,它可以逐个从“预读”缓冲区检索消息。 “预读”仅由读取器线程拥有和访问,因此在该阶段不需要任何同步。

  图9左侧的箭头显示了如何通过修改单个指针可以将预写缓冲区刷新到队列。 右边的箭头显示了队列的整个内容如何可以通过不做任何事情而修改另一个指针来转移到预读。 

财神道app下载最新版本 3

  图九:Lock-free queue

获得的教训:无锁算法很难发明,麻烦执行,几乎不可能调试。 如果可能,请使用现有的成熟算法,而不是发明自己的。 当需要最佳性能时,不要仅依赖无锁算法。 虽然它们速度快,但通过在它们之上进行智能批处理可以显着提高性能。 


下面是一个使用MySQL-ODBC作为存储的简单配置:

Messaging Patterns

  在任何消息系统中,最重要的设计问题是如何为用户提供一种方式来指定哪些消息被路由到哪些目的地。 有两种主要方法,我认为这种二分法是非常通用的,并且适用于基本上在软件领域遇到的任何问题。 

  一种方法是采用UNIX的“做一件事,并做好”的哲学。 这意味着,问题领域应该被人为地限制在一个小的并且易于理解的区域。 然后程序应该以正确并详尽的方式解决这个限制的问题。 消息传递领域中的这种方法的示例是MQTT。 它是一种用于向一组消费者分发消息的协议。 它不能用于任何其他用途(比如说RPC),但它很容易使用,并且用做消息分发很好。 

  另一种方法是关注通用性并提供强大且高度可配置的系统。 AMQP是这样的系统的示例。 它的队列和交换的模型为用户提供了几乎任何路由算法定义的方法。 当然,权衡,需要关心很多选项。  

  ØMQ选择前一个模型,因为它允许基本上任何人使用最终产品,而通用模型需要消息传递专家使用它。 为了演示这一点,让我们看看模型如何影响API的复杂性。 以下是在通用系统(AMQP)之上的RPC客户端的实现:

 1 connect ("192.168.0.111")
 2 exchange.declare (exchange="requests", type="direct", passive=false,
 3     durable=true, no-wait=true, arguments={})
 4 exchange.declare (exchange="replies", type="direct", passive=false,
 5     durable=true, no-wait=true, arguments={})
 6 reply-queue = queue.declare (queue="", passive=false, durable=false,
 7     exclusive=true, auto-delete=true, no-wait=false, arguments={})
 8 queue.bind (queue=reply-queue, exchange="replies",
 9     routing-key=reply-queue)
10 queue.consume (queue=reply-queue, consumer-tag="", no-local=false,
11     no-ack=false, exclusive=true, no-wait=true, arguments={})
12 request = new-message ("Hello World!")
13 request.reply-to = reply-queue
14 request.correlation-id = generate-unique-id ()
15 basic.publish (exchange="requests", routing-key="my-service",
16     mandatory=true, immediate=false)
17 reply = get-message ()

  另一方面,ØMQ将消息传递分为所谓的“消息模式”。 模式的示例是“发布/订阅”,“请求/回复”或“并行化流水线”。 每个消息模式与其他模式完全正交,并且可以被认为是一个单独的工具。

  以下是使用ØMQ的请求/回复模式重新实现上述应用程序。 注意如何将所有选项调整减少到选择正确的消息模式(“REQ”)的单一步骤:

1 s = socket (REQ)
2 s.connect ("tcp://192.168.0.111:5555")
3 s.send ("Hello World!")
4 reply = s.recv ()

  到目前为止,我们认为具体的解决方案比通用解决方案更好。我们希望我们的解决方案尽可能具体。然而,同时,我们希望为我们的客户提供尽可能广泛的功能。我们如何才能解决这个明显的矛盾?

答案包括两个步骤:

  1. 定义堆栈的层以处理特定问题区域(传输,路由,呈现等)。
  2. 提供该层的多个实现。对于每个用例应该有一个单独的不相交的实现。

  让我们来看看Internet栈中传输层的例子。它意味着在网络层(IP)的顶部上提供诸如传送数据流,应用流控制,提供可靠性等的服务。它通过定义多个不相交解决方案:TCP面向连接的可靠流传输,UDP无连接不可靠数据包传输,SCTP传输多个流,DCCP不可靠连接等。

  注意每个实现是完全正交的:UDP端点不能说TCP端点。 SCTP端点也不能与DCCP端点通信。这意味着新的实现可以在任何时候添加到堆栈,而不会影响堆栈的现有部分。相反,失败的实现可以被忘记和丢弃而不损害作为整体的传输层的可行性。

就像图 - 5显示的那样,你可以把和DotNetMQ关联的应用程序作为消息代理来添加/删除。对于这些修改是不需要重启DotNetMQ的。应用程序的配置也保存在MDSSettings.xml文件里,就像下面这样:

 Architecture Overview

  到目前为止,我们专注于使ØMQ快速的通用原则。现在,让我们看看系统的实际架构(图6)。 

财神道app下载最新版本 4

  图六:ØMQ architecture

  用户使用所谓的“sockets”与ØMQ交互。 它们非常类似于TCP套接字,主要的区别是每个套接字可以处理与多个对等体的通信,有点像未绑定的UDP套接字。

  套接字对象存在于用户线程中(参见下一节中的线程模型的讨论)。除此之外,ØMQ运行多个工作线程来处理通信的异步部分:从网络读取数据,排队消息,接受接入连接等。

  在工作线程中存在各种对象。每个对象都由一个父对象拥有(所有权由图中的简单实线表示)。父对象可以在与子对象不同的线程中。大多数对象直接由套接字拥有; 然而,有几种情况下,对象由套接字拥有的对象所拥有。 我们得到的是一个对象树,每个套接字有一个这样的树。 这种树在关闭期间使用; 没有对象可以自己关闭,直到它关闭所有的子对象。 这样我们可以确保关机过程按预期工作; 例如,等待的出站消息被推送到网络优先于结束发送过程。

  大致来说,有两种异步对象:在消息传递中不涉及的对象和另外一些对象。前者主要做连接管理。例如,TCP侦听器对象侦听传入的TCP连接,并为每个新连接创建引擎/会话对象。类似地,TCP连接器对象尝试连接到TCP对等体,并且当它成功时,它创建一个引擎/会话对象来管理连接。 当此类连接失败时,连接器对象尝试重新建立连接。 

  后者是正在处理数据传输本身的对象。 这些对象由两部分组成:会话对象负责与ØMQ套接字交互,引擎对象负责与网络通信。 只有一种会话对象,但是对于ØMQ支持的每个底层协议有不同的引擎类型。 因此,我们有TCP引擎,IPC(进程间通信)引擎,PGM引擎(可靠的多播协议,参见RFC 3208)等。引擎集是可扩展的 (在将来我们可以选择实现 WebSocket引擎或SCTP引擎)。 

  会话与套接字交换消息。 有两个方向传递消息,每个方向由管道对象处理。每个管道基本上是一个优化的无锁队列,用于在线程之间快速传递消息。 

  最后,有一个context对象(在前面的部分中讨论,但没有在图中显示),它保存全局状态,并且可以被所有的套接字和所有的异步对象访问。


图 - 2:一个糟糕的集成应用程序解决方案。

      从第三年开始,ØMQ它的代码库已经增长地过大; 所以有一个倡议来标准化其使用的有线协议,以及在Linux内核中实验性地实现一个类似ØMQ的消息系统等。这些主题在这里就不涉及了。 但是,你可以获取在线资源( online resources)以获取更多详细信息。

原文:

 API

  用户接口是任何产品的最重要的部分。 这是你的程序中唯一可以看到的外部世界。 在最终用户产品中,它是GUI或命令行界面。 在库中它是API。

  在早期版本的ØMQ中,API基于AMQP的交换和队列模型。 (参见AMQP specification。)从历史的角度看,有趣的是看看2007年的白皮书(white paper from 2007),它试图权衡AMQP与无代理的消息模型。 我花了2009年年底重写它几乎从零开始使用BSD套接字API。 这是转折点; ØMQ从那时起就被快速采用。 虽然之前它是一个被一群消息专家使用的niche产品,后来它成为任何人的一个方便的常见工具。 在一年多的时间里,社区的规模增加了十倍,实现了约20种不同语言的绑定等。

  用户接口定义产品的感知。 基本上没有改变功能 - 只是通过更改API - ØMQ从“企业消息传递系统”产品更改为“网络消息传递系统”产品。 换句话说,感觉从“大型银行的一个复杂的基础设施”改变为“嗨,这有助于我将我的10字节长的消息从应用程序A发送到应用程序B”。

获得的经验:了解您想要的项目是什么,并相应地设计用户接口。 不符合项目愿景的用户接口是100%要失败的。

  迁移到BSD Sockets API的一个重要方面是,它不是一个革命性的新发明的API,而是一个现有的和知名的。 实际上,BSD套接字API是今天仍在使用的最古老的API之一; 它可追溯到1983年和4.2BSD Unix。 它被广泛稳定了使用几十年。 

  上述事实带来了很多优点。 首先,它是一个大家都知道的API,所以学习曲线非常短。 即使你从来没有听说过ØMQ,你可以在几分钟内构建你的第一个应用程序,因为你能够重用你的BSD套接字知识。

  此外,使用广泛实现的API可以实现ØMQ与现有技术的集成。 例如,将ØMQ对象暴露为“套接字”或“文件描述符”允许在同一事件循环中处理TCP,UDP,管道,文件和ØMQ事件。 另一个例子:实验项目给Linux内核带来类似ØMQ的功能,实现起来很简单。 通过共享相同的概念框架,它可以重用许多已经到位的基础设施。

  最重要的是,BSD套接字API已经存活了近三十年,尽管多次尝试更换它意味着在设计中有一些固有的合理的地方。 BSD套接字API设计者已经(无论是故意还是偶然) 做出了正确的设计决策。 通过采用这套API,我们可以自动共享这些设计决策,甚至可以不知道他们是什么,他们要解决什么问题。 

经验教训:虽然代码重用已经从很久前得到重视并且模式重用在后来被加以考虑,但重要的是以更通用的方式考虑重用。 在设计产品时,请看看类似的产品。 检查哪些失败,哪些已成功; 从成功的项目中学习。 Don't succumb to Not Invented Here syndrome。 重用思想,API,概念框架,以及无论你觉得合适的东西。 通过这样做,可以做到允许用户重用他们现有的知识。 同时,可能会避免目前还不知道的技术陷阱。


Filters用于决定消息使用哪个路由。如果一个消息的属性和其中一个过滤器匹配,该消息就会被路由。这有5个条件(XML的5个属性)来定义一个过滤器:

Application vs. Library

      ØMQ是一个消息库,而不是一个消息服务器。我们花了几年时间研究AMQP协议(一个金融行业尝试标准化企业消息传递的有线协议),为其编写参考实现并参与了好几个大规模的基于消息传递技术的大型项目,并最终意识到意识到使用经典客户端/服务器模型的智能消息传递服务器(代理)和哑消息传递客户端的方法有问题。

      我们首要关注的是性能:如果中间有一个服务器,每个消息必须通过网络两次(从发送方到代理,从代理到接收方),这在延迟和吞吐量方面都会有一定代价。 此外,如果所有消息都通过代理传递,在某一时刻,服务器必然成为瓶颈。 

      次要关注的是大规模部署:当部署跨组织(如:公司等)时,管理整个消息流的中央授权的概念不再适用。由于商业秘密和法律责任,没有公司愿意将控制权交给不同公司的服务器。在实践中的结果是,每个公司有一个消息服务器,用桥接器连接到其他公司的消息传递系统。整个系统因此严重分散,并且为每个涉及的公司维护大量的桥接器不会使情况更好。为了解决这个问题,我们需要一个完全分布式的架构,该架构中每个组件都可能由不同的业务实体控制。考虑到基于服务器的架构中的管理单元是服务器,我们可以通过为每个组件安装单独的服务器来解决上述问题。在这种情况下,我们可以通过使服务器和组件共享相同的进程来进一步优化设计。这样我们最终得到一个消息库。 

      ØMQ开始时,我们有一个想法,即如何使消息工作没有中央服务器。 它需要将消息的整个概念颠倒过来,并且基于端到端原则,使用“智能端点,哑网络”架构来替换自主集中存储网络中心的消息的模型。 这个决定的技术将决定ØMQ从一开始就是是一个消息库,而不是一个应用程序。

      我们已经能够证明这种架构比标准方法更高效(更低的延迟,更高的吞吐量)和更灵活(很容易构建任意复杂的拓扑,而不是限定为经典的hub-and-spoke模型)。

      其中一个出乎意料的结果是,选择库模型改善了产品的可用性。 一次又一次,用户因不必安装和管理独立的消息服务器而感到开心。 事实证明,没有服务器是一个首选项,因为它降低了运营成本(不需要有一个消息服务器管理员),并加快上线时间(无需与客户协商是否运行服务器,以及管理或运营团队的问题) 。

学到的教训是,当开始一个新的项目时,如果可能的话应该选择库设计。从一个简单的程序调用库可以很容易创建一个应用程序; 然而,几乎不可能从现有的可执行文件创建库。 库模型为用户提供了更多的灵活性,同时节省了他们不必要的管理工作。


图 - 6:Application1通过DotNetMQ发送两个消息到Application2。

Concurrency Model

      ØMQ的要求之一是利用计算机的多核; 换句话说,可以根据可用CPU内核的数量线性扩展吞吐量。  

  我们以前的消息系统经验表明,以经典方式使用多个线程(临界区,信号量等)不会带来很多性能改进。 事实上,即使在多核上测量,消息系统的多线程版本可能比单线程版本慢。 单独的线程花费太多时间等待对方,同时引发了大量的上下文切换,从而使系统减速。

  考虑到这些问题,我们决定采用不同的模式。 目标是避免完全锁定,让每个线程全速运行。 线程之间的通信是通过在线程之间传递的异步消息(事件)提供的。 这正是经典的Actor模型。

  这个想法的思想是为每个CPU核心启动一个工作线程(有两个线程共享同一个核心只会意味着很多上下文切换没有特别的优势)。每个内部ØMQ对象,比如说,一个TCP引擎,将绑定到一个特定的工作线程。 这反过来意味着不需要临界区,互斥体,信号量等。 此外,这些ØMQ对象不会在CPU核心之间迁移,从而避免高速缓存污染对性能的负面影响(图7)

财神道app下载最新版本 5

  图七:Multiple worker threads

  这个设计使很多传统的多线程问题消失了。 然而,需要在许多对象之间共享工作线程,这反过来意味着需要某种协作多任务。 这意味着我们需要一个调度器; 对象需要是事件驱动的,而不是控制整个事件循环。 也就是说,我们必须处理任意事件序列,即使是非常罕见的事件,我们必须确保没有任何对象持有CPU太长时间; 等等 

  简而言之,整个系统必须完全异步。 没有对象可以做阻塞操作,因为它不仅会阻塞自身,而且会阻塞共享同一个工作线程的所有其他对象。 所有对象必须成为状态机,无论是显式还是隐式。 有数百或数千个状态机并行运行,你就必须处理它们之间的所有可能的交互,并且最重要的是关闭过程。

  事实证明,以干净的方式关闭完全异步系统是一个非常复杂的任务。 试图关闭一千个移动部件,其中一些工作,一些空闲,一些在启动过程中,其中一些已经自行关闭,容易出现各种竞态条件,资源泄漏和类似情况。 关闭子系统绝对是ØMQ中最复杂的部分。 对Bug跟踪器的快速检查表明,大约30%-50%的报告的错误与以某种方式关闭相关。

获得的经验:在努力实现最佳性能和可扩展性时,请考虑actor模型; 它几乎是这种情况下唯一的方法。 但是,如果你不使用像Erlang或ØMQ这样的专用系统,你必须手工编写和调试大量的基础设施。 此外,从一开始,想想关闭系统的过程。 它将是代码库中最复杂的部分,如果你不清楚如何实现它,你应该可以重新考虑使用actor模型。 


应用程序

 Global State

  全局变量不能很好地与库交互。 即使只有一组全局变量,库可能在进程中也会加载多次。 图1显示了一个从两个不同的独立库中使用的ØMQ库的情况。 然后应用程序使用这两个库的示例

财神道app下载最新版本 6

 

 

 

 

  图1: ØMQ 库在两个不同的独立库中被使用

  当这种情况发生时,ØMQ的两个实例访问相同的变量,导致竞态条件,奇怪的错误和未定义的行为。为了防止这个问题的出现,ØMQ库中没有全局变量。相反,库的用户负责显式地创建全局状态变量。包含全局状态的对象称为context。 虽然从用户的角度来看,context看起来或多或少像一个工作线程池,但从ØMQ的角度来看,它只是一个存储任何我们碰巧需要的全局状态的对象。在上图中,libA有自己的context,libB也有自己的context。没有办法让他们中的一个破坏或颠覆另一个。

 这里的教训很明显:不要在库中使用全局状态。如果你这样做,当它恰好在同一个进程中被实例化两次时,库很可能会被中断。


你也可以调用服务的其他方法,会得到像常规方法那样的返回值。实际上,你的方法调用被转换成了可靠的消息,比如,即使你的远程应用程序(MyMailSmsService)在方法调用时没有运行,在服务启动后也会被调用,所以你的方法调用是一定会被调用的。

  相同的原则适用于由ØMQ定义的消息模式。消息模式在传输层(TCP和朋友)之上形成层(所谓的“可伸缩性层”)。单独的消息模式是该层的实现。它们是严格正交的

发布/订阅端点不能说请求/回复端点等。模式之间的严格分离意味着可以根据需要添加新模式,并且失败的新模式的实验赢得“不利于现有模式。

获得的经验:在解决复杂和多方面的问题时,可能会发现单一通用解决方案可能不是最好的解决方法。相反,我们可以将问题区域看作一个抽象层,并提供该层的多个实现,每个集中在一个特定的定义良好的用例。在这样做时,请仔细描述用例。确保范围,什么不在范围内。太明显地限制用例,应用程序可能会受到限制。然而,如果定义的问题太宽泛,产品可能变得太复杂,模糊,并使用户产生混淆。


  • Sequential:消息依次顺序的路由到目标服务器。Destination的RouteFactor是分发因子。
  • Random:消息随机的路由到目标服务器。选择Server-A服务器的概率是:(Server-A的RouteFactor)/(Destinations里所有RouteFactor的总和)。

      本文将深入了解上述三个目标如何转化为ØMQ的内部架构,并为那些正在努力解决相同问题的人提供一些提示或技巧。

在我们的项目里添加这个代理类后,我们就可以想简单方法调用那样向服务发消息了。

Conclusion

  随着我们的世界变得充满了许多通过互联网连接的小型计算机 - 移动电话,RFID阅读器,平板电脑和笔记本电脑,GPS设备等 - 分布式计算的问题不再是学术科学的领域,并且成为常见的日常问题 为每个开发者解决。 不幸的是,解决方案主要是具体领域的hacks。 本文总结了我们系统地构建大规模分布式系统的经验。 关注从软件架构的角度来看有趣的问题,希望开源社区的设计师和程序员会发现它有用。


MartinSústrik是消息传递中间件领域的专家。 他参与了AMQP标准的创建和参考实施,并参与了金融行业的各种消息传递项目。 他是ØMQ项目的创始人,目前正在致力于将消息传递技术与操作系统和Internet栈进行集成。 本文摘自并修改自《The Architecture of Open Source Applications: Volume II》。

 原文链接:ZeroMQ: The Design of Messaging Middleware

图 - 8:DotNetMQ服务器图管理。

 Critical Path

  我们在优化过程中发现三个因素对性能有至关重要的影响:

  1. 内存分配数
  2. 系统调用数
  3. 并发模型 

  然而,不是每个内存分配或每个系统调用对性能有相同的影响。我们对消息传递系统感兴趣的性能是在给定时间内我们可以在两个端点之间传输的消息数。或者,我们可能感兴趣的是消息从一个端点到另一个端点需要多长时间。

  然而,鉴于ØMQ是为具有长连接的场景设计的,建立连接所需的时间或处理连接错误所需的时间基本上是不相关的。这些事件很少发生,因此它们对整体性能的影响可以忽略不计。 

  一个代码库的反复频繁使用的部分被称为关键路径; 优化应该关注关键路径。

  让我们看看一个例子:ØMQ并没有在内存分配方面进行极大优化。例如,当操作字符串时,它通常为转换的每个中间阶段分配一个新字符串, 但是,如果我们严格查看关键路径(实际的消息传递),我们会发现它几乎不使用内存分配。如果消息很小,则每256个消息只有一个内存分配(这些消息保存在一个大的分配的内存块中)。此外,如果消息流稳定,没有巨大的流量峰值,则关键路径上的内存分配数量将降至零(已分配的内存块不会返回到系统,而是重复使用)。

经验教训:优化产生显著差异的地方。优化不在关键路径上的代码段是是无效的。


图 - 4:自动路由消息的消息代理服务器图。

      消息传递系统基本上像应用程序的即时消息一样工作。应用程序决定将事件传送到另一个应用程序(或多个应用程序),它组装要发送的数据,点击“发送”按钮,消息传递系统负责其余的事情。然而,与即时消息传递不同,消息传递系统没有GUI,并且在出现问题时,在端点处没有人能够进行智能干预。 因此,消息系统必须是容错的并且比常见的即时消息传送快得多。

财神道app下载最新版本 7

Allocating Memory

  假设所有基础设施都已初始化,并且两个端点之间的连接已建立,则在发送消息时只需要为一个东西分配内存:消息本身。因此,为了优化关键路径,我们必须研究如何为消息分配内存并在堆栈中上下传递。

  在高性能网络领域中的常识是,通过仔细平衡消息分配内存的成本和消息复制的成本(例如,对小,中和大消息的不同处理)来实现最佳性能。对于小消息,复制比分配内存要代价小。根本不分配新的存储器块,而是在需要时将消息复制到预分配的存储器是有意义的。另一方面,对于大消息,复制比内存分配代价大。将消息分配一次,并将指针传递到分配的块,而不是复制数据是有意义的。这种方法称为“零拷贝”。

  ØMQ以透明的方式处理这两种情况。 ØMQ消息由不透明句柄表示。 非常小的消息的内容直接编码在句柄中。 因此,复制句柄实际上复制了消息数据。当消息较大时,它被分配在单独的缓冲区中,并且句柄仅包含指向缓冲区的指针。创建句柄的副本不会导致复制消息数据,这在消息是兆字节长时是有意义的(图3)。 应当注意,在后一种情况下,缓冲器被引用计数,使得其可以被多个句柄引用,而不需要复制数据。

财神道app下载最新版本 8

  图三:消息拷贝(或没有消息拷贝)

经验教训:在考虑性能时,不要假设有一个单一的最佳解决方案。可能发生的是,存在问题的多个子类(例如,小消息 vs. 大消息),每个都具有其自己的最佳算法。


当然,你可以在Web服务里连接DotNetMQ,因为把本身还是一个.Net应用程序。但是,为什么你要写一个ASP.NET Web方法为应用程序处理消息(而且可以在同一个上下文中回复消息)呢?Web服务更适合这样请求/应答式的方法调用。

 Batching

  已经提到,消息系统中的一定系统调用的数量可能导致性能瓶颈。其实,这个问题比那个更普遍。 遍历调用堆栈相关时会有不小的性能损失,因此,当创建高性能应用程序时,避免尽可能多的堆栈遍历是明智的。

  考虑图4.要发送四个消息,你必须遍历整个网络栈四次(ØMQ,glibc,用户/内核空间边界,TCP实现,IP实现,以太网层,NIC本身和重新备份栈)。

财神道app下载最新版本 9

  图四:发送四个消息

  但是,如果您决定将这些消息合并到单个批消息中,则只有一次遍历堆栈(图5)。对消息吞吐量的影响可能是非常显著的:高达两个数量级,特别是如果消息很小,并且其中几百个可以打包成一个批消息时。

财神道app下载最新版本 10

  图五:Batching messages

  另一方面,批量化会对延迟产生负面影响。让我们举个例子,知名的Nagle算法,在TCP中实现。它将出站消息延迟一定量的时间,并将所有累积的数据合并到单个数据包中。显然,分组中的第一消息的端到端等待时间比最后一个的等待时间多得多。因此,对于需要获得一致的低延迟来关闭Nagle算法的应用程序来说,这是很常见的。甚至常常在堆栈的所有层次上关闭批量化(例如,NIC的中断合并功能)。但是没有批量化意味着大量遍历堆栈并导致低消息吞吐量。我们似乎陷入了权衡吞吐量和延迟的困境。 

  ØMQ尝试使用以下策略提供一致的低延迟和高吞吐量:当消息流稀疏并且不超过网络堆栈的带宽时,ØMQ关闭所有批量化以提高延迟。这里的权衡在某种程度上是会使CPU使用率变高(我们仍然需要经常遍历堆栈)。 然而,这在大多数情况下不被认为是问题。

  当消息速率超过网络栈的带宽时,消息必须排队(存储在存储器中),直到栈准备好接受它们。排队意味着延迟将增长。如果消息在队列中花费了一秒钟,则端到端延迟将至少为1秒。 更糟糕的是,随着队列的大小增加,延迟将逐渐增加。如果队列的大小没有限制,则延迟可能会超过任何限制。

  已经观察到,即使网络堆栈被调到尽可能低的延迟(Nagle的算法被关闭,NIC中断合并被关闭,等等),由于排队效应,延迟仍然可能是令人沮丧的,如上所述。

由于默认的传送规则是StoreAndForward,让我们试试下面这些:

本文主要是探究学习比较流行的一款消息层是如何设计与实现的

很显然,在集成应用程序中消息代理是有必要的。我网上搜索,查找书籍,想找一个免费的(最好也是开源的)而且是.Net用起来很容易的消息代理。让我们看看我找到了什么:

  1. ØMQ最初被构想用于是一个针对股票交易的极速的消息传递系统,所以重点是极端优化。该项目的第一年用于设计基准方法,并尝试定义一个尽可能高效的架构。
  2. 后来,大约在第二年的发展时,重点转向了提供一个通用系统,该系统用于构建分布式应用程序和支持任意消息模式,多种传输机制,任意语言绑定等。
  3. 在第三年,重点主要是提高可用性和扁平化学习曲线。 我们采用了BSD套接字API,试图清除单个消息模式的语义,等等。 

 一个新的、独立的、开源的,完全基于C#和.NET Framework3.5的消息队列系统

Performance

  当ØMQ项目启动时,其主要目标是优化性能。 消息传递系统的性能使用两个度量来表示:吞吐量 - 在给定时间内可以传递多少消息; 延迟 - 消息从一个端点到另一个端点需要多长时间。 

  我们应该关注哪个指标? 两者之间的关系是什么? 不是很明显吗? 运行测试,将测试的总时间除以传递的消息数,得到的是延迟。 单位时间内的消息数是吞吐量。 换句话说,延迟是吞吐量的逆值。 简单,对吧?

  我们花了几个星期详细评估性能指标而不是立即开始编码,从而发现吞吐量和延迟之间的关系远没有那么简单,而且是与直觉相反的。 

  想象A发送消息到B(参见图2)。 测试的总时间为6秒。 有5个消息已通过。 因此,吞吐量为0.83个消息/秒(5/6),延迟为1.2秒(6/5),对吗?

财神道app下载最新版本 11

  图二:从A发送消息到B

  再看看图二。 每个消息从A到B需要不同的时间:2秒,2.5秒,3秒,3.5秒,4秒。 平均值是3秒,这与我们原来计算的1.2秒相差很大。 这个例子显示了人们对性能指标直观倾向的误解。

  现在来看看吞吐量。 测试的总时间为6秒。 然而,对于A而言,它只需要2秒就可以发送完所有的消息。 从A的角度来看,吞吐量为2.5 msgs / sec(5/2)。 对于B而言,接收所有消息需要4秒。 所以从B的角度来看,吞吐量为1.25 msgs / sec(5/4)。 这些数字都不符合我们原来计算的1.2 msgs / sec的结果。

  长话短说:延迟和吞吐量是两个不同的指标; 这很明显。重要的是要了解两者之间的差异及其关系。延迟只能在系统中的两个不同点之间度量; 单独在点A处没有延迟的概念。每个消息具有其自己的延迟。你可以得到多个消息的平均延迟; 而消息流是没有延迟的。

  另一方面,只能在系统的单个点处测量吞吐量。发送端有一个吞吐量,接收端有一个吞吐量,两者之间的任何中间点都有一个吞吐量,但是没有整个系统的整体吞吐量。而吞吐量只对一组消息有意义; 没有单个消息的吞吐量的概念。

  至于吞吐量和延迟之间的关系,事实证明真的有一种关系; 然而,公式涉及积分,我们不会在这里讨论它。 有关更多信息,请阅读有关排队理论的文献。 在基准化消息系统中有很多的陷阱,我们不会进一步深入。 我们应该把精力放在学到的教训上:确保你理解你正在解决的问题。 即使一个简单的问题,“让程序更快”也需要大量的工作才能正确理解。 更重要的是,如果你不理解这个问题,你可能会在你的代码中构建隐式假设和流行的神话,使得解决方案有缺陷,或者至少要复杂得多或者比可能的少。


财神道app下载最新版本 12

  在这种情况下,大量开始批量化处理是有意义的。没有什么会丢失,因为延迟已经很高。另一方面,大量的批处理提高了吞吐量,并且可以清空未完成消息的队列

这反过来意味着等待时间将随着排队延迟的减少而逐渐降低。一旦队列中没有未完成的消息,则可以关闭批量化处理,以进一步改善延迟。

  另一个观察是,批量化只应在最高层次进行。 如果消息在那里被批量化,则较低层无论如何都不需要批处理,因此下面的所有分批算法不做任何事情,除了引入附加的等待时间。

经验教训:为了在异步系统中获得最佳吞吐量和最佳响应时间,请关闭堆栈的最底层上的批量化算法并且在在最高层次进行批量化。只有当新数据的到达速度比可处理的数据快时才进行批量化处理。


还有一个设置是定义"current/this"这个名称代表哪台服务器的,这个值必须是Servers节点里的一个服务器名。如果你用DotNetMQ管理器编辑服务器图,这个值是自动设置的。


客户端属性:出错时重新连接服务器(ReConnectServerOnError)

当然,这个配置是要根据你实际的网络进行的。你必须在图中所有服务器上安装DotNetMQ。此外,还必须在所有服务器上配置相同的服务器图(你可以很容易地从XML文件复制服务器节点到其他服务器上)。

服务端

  • 持久地 10,000个方法调用大约需要25秒(约每秒400个)。
  • 非持久地 10,000个方法调用大约需要8.7秒(约每秒1150个)。

如你所见,它只是一个带有特性(Attribute)的一个常规C#类。MDSService和MDSServiceMethod两个特性是必须的,其他的特性是可选的(不过写上去是最好了,你将很快会看到什么会用这些特性)。你提供服务的方法必须有MDSServiceMehod特性,如果你不想公开一些方法,只要不加MDSServiceMethod特性就行了。

当一个消息发送给Application2是,MDSClient_MessageReceived方法就会被调用来处理消息。我们从MessageReceivedEventArgs参数对象的Message属性可以得到发送过来的消息。这个消息的类型是IIncomingMessage。IIncomingMessage对象的MessageData属性实际包含了由Application1发送的消息数据。由于它是一个字节数组,我们用UTF8编码把它转换成字符串。然后把文本消息打印到控制台上。

路由/负载均衡

现在还没有实现自动安装,不过安装DotNetMQ是非常容易的。下载并解压文章开始提供的二进制文件。只需将所有的东西复制到C:Progame FilesDotNetMQ下,然后运行INSTALL_x86.bat(如果你用的是64位系统,那么将执行INSTALL_x64)。

 ... 
  <Applications>
    <Application Name="SampleWebServiceApp">
      <Communication Type="WebService" 
        Url="http://localhost/SampleWebApplication/SampleService.asmx" />
    </Application>
  </Applications>
  ... 

首先,我将演示一个需要消息代理的简单情况。

网络传输消息

客户端

图 - 1:两个应用程序间最简单的消息传递。

运行在ServerA上的Application1想发消息到ServerC上的Application2,由于防火墙的规则,ServerA和ServerC不能直接连接。让我们修改一下在第一个DotNetMQ程序里开发的程序。

图 - 8:两个应用程序通过DotNetMQ在网络上通信。

在Visual Studio中创建一个名称为Application1的控制台应用程序,并添加MDSCommonLib.dll引用,这个dll文件里提供了连接到DotNetMQ必需的一些类。然后在Program.cs文件中写上下面的代码:

消息传送:

引用

默认情况下,一个应用程序可以通过MDSClient发送和接收消息(CommunicationWays.SendAndReceive)。如果一个应用程序不需要接收消息,可以设置MDSClient的CommunicationWay为CommunicationWays.Send。这个属性在连接DotNetMQ之前或在和DotNetMQ通信中都可以改变。

  • 持久和非持久的消息发送。
  • 即使在系统崩溃时,也会保证持久消息的传送。
  • 可在一个机器图里自动和手动设置消息的路由。
  • 支持多种数据库(MS SQL Server,MySQL,SQLite,和一些现有的基于内存的存储)
  • 支持不存储,直接发送及时消息。
  • 支持请求/应答式的消息。
  • 用客户端类库和DotNetMQ消息代理通信很方便
  • 内置的框架,可以轻松地在消息队列上构建RMI服务。
  • 支持把消息传送给ASP.NET Web服务。
  • 基于图形界面的管理和监控工具。
  • 易于安装,管理和使用。
  • 完全由C#开发(使用.NET Framework 3.5)。

MDSClient的CreateMessage方法返回一个IOutgoingMessage的对象。对象的MessageData属性是实际发送给目标应用程序的数据,它是一个字节数组。我们使用UTF8编码把用户输入的文本转换成字节数组。对象的DestinationApplicationName和DestinationServerName属性是用于设置消息的目标地址。如果我们没有指定目标服务器,默认就是本地服务器。最后,我们发送这个消息对象。

文章概要

  • 2011-05-23(DotNetMQ v0.9.1.0)
    • 添加对微软SQL Server数据库的支持。
    • 把MySQLConnectionString设置改成ConnectionString。
    • 修改源码。
    • 根据修改更新了文章。
  • 2011-05-16 (DotNetMQ v0.9.0.0)
    • 添加web服务模板的下载。
    • 对文章做了一些修改和添加。
  • 2011-05-09(DotNetMQ v0.9.0.0)
    • 第一次发布。

对于上面的服务器图,对应的MDSSettings.xml设置如下:

using System;
using System.Text;
using MDS.Client;

namespace Application2
{
    class Program
    {
        static void Main(string[] args)
        {
            //Create MDSClient object to connect to DotNetMQ
            //Name of this application: Application2
            var mdsClient = new MDSClient("Application2");

            //Register to MessageReceived event to get messages.
            mdsClient.MessageReceived  = MDSClient_MessageReceived;

            //Connect to DotNetMQ server
            mdsClient.Connect();

            //Wait user to press enter to terminate application
            Console.WriteLine("Press enter to exit...");
            Console.ReadLine();

            //Disconnect from DotNetMQ server
            mdsClient.Disconnect();
        }

        /// <summary>
        /// This method handles received messages from other applications via DotNetMQ.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e">Message parameters</param>
        static void MDSClient_MessageReceived(object sender, MessageReceivedEventArgs e)
        {
            //Get message
            var messageText = Encoding.UTF8.GetString(e.Message.MessageData);

            //Process message
            Console.WriteLine();
            Console.WriteLine("Text message received : "   messageText);
            Console.WriteLine("Source application    : "   e.Message.SourceApplicationName);

            //Acknowledge that message is properly handled
            //and processed. So, it will be deleted from queue.
            e.Message.Acknowledge();
        }
    }
}

DotNetMQ采用段路径算法发送消息(没有在XML配置文件里手动定义路由的情况下)。考虑这个情景,运行在halil_pc的Application A发送一个消息到webserver2上的Application B,路径是很简单的:Application A -> halil_pc -> emre_pc -> webserver2 -> Application B。halil_pc通过服务器图定义知道下一个要转发到的服务器(emre_pc)。

在我的业务经历中,我见到过一些非常糟糕且不寻常的异步企业应用集成解决方案。通常是运行在一台服务器上的一个程序执行一些任务,并且产生一些数据,然后将结果数据发送到另一台服务器上的另一个程序。第二个应用在数据上执行其他任务或计算结果(这台服务器在同一网络中或是通过互联网连接)。另外,消息数据必须是持久的。即使远程程序没有工作或网络不可用,消息必须第一时间发送过去。

如上所述,你不需要改变ReceiveMDSMessage方法,而且必须在ProcessMDSMessage方法里处理消息。另外,你需要向下面这样在MDSSettings.xml里定义你的web服务地址,你也可以用DotNetMQ管理工具添加web服务。

一个消息可以是一个字符串,一个字节数组,一个对象等等。通常情况下,一个发送者(生产者)程序创建一个消息,并将其推送到一个消息队列,然后一个接受者(消费者)程序从队列中获取这个消息并处理它。发送程序和接受程序不需要同时运行,因为消息传递是一个异步过程。这就是所谓的松耦合通信。

方法调用(在DotNetMQ服务里)

  • 持久地 10,000个消息大约需要25秒(约每秒400个消息)。
  • 非持久地 10,000个消息大约需要3.5秒(约每秒2850个消息)。

正文: 

消息传递是一种异步通信方式,具体就是在同一个或不同的机器上运行的多个应用程序之间可靠的消息传递。应用程序通过发送一种叫消息的数据包和其他应用程序通信。

如果在调用Web服务时或Web服务处理数据时出错,数据不能丢失,并且稍后必须重发。但是,Application -1有其他任务要做,所以它不能一次又一次的尝试重发数据。它只是将数据插入到数据表。另一个Windows服务(如果Application -1是一直运行的,也可以使里的一个线程)定期检查这个表,并尝试将数据发送到Web服务,直到数据成功发送。

安装、运行DotNetMQ

客户端属性:自动确认消息(AutoAcknowledgeMessages)

在许多情况下,一个应用发一个消息到另一个应用,然后得到一个应答消息。DotNetMQ对这种通信方式有内置的支持。考虑这样一个服务:用于查询库存的状态。这里有两种消息类型:

<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
  ...
  <Applications>
    <Application Name="Application1" />
    <Application Name="Application2" />
  </Applications>
  ...
</MDSConfiguration>

创建了正确的服务类后,我们必须创建一个应用来运行它,下面是用一个简单的控制台程序运行我们的MyMailSmsService服务:

面向服务架构的DotNetMQ

在开始创建它的时候,我更喜欢叫它为MDS(消息传送系统 Message Delivery System)。因为它不仅是一个消息队列,而且还是一个直接传送消息到应用程序的系统和一个提供了建立应用服务框架的环境。我把它叫做DotNetMQ,是因为它完全由.NET开发,而且这个名字也更好记。所以它原来的名字是MDS,以至于源码里有许多以MDS为前缀的类。

下面展示了一个简单的库存服务。

图 - 10:请求/应答式的通信应用。

其他设置

DotNetMQ的性能

一个服务器图包含两个或更多个节点,每一个节点都是一个具有IP地址和TCP端口(被DotNetMQ用的那个)的服务器。你可以用DotNetMQ管理器配置/设计一个服务器图。

客户端属性:通讯方式(CommunicationWay)

  • 介绍
  • 什么是消息传递?
  • 什么是DotNetMQ?
  • 为什么要一个新的消息代理?
    • 消息代理的必要性
    • 现有的消息代理
  • 安装、运行DotNetMQ
  • 第一个DotNetMQ程序
    • 注册应用程序到DotNetMQ
    • 开发Application1
    • 开发Application2
    • 消息属性:传送规则(Transmit Rule)
    • 客户端属性:通讯方式(CommunicationWay)
    • 客户端属性:出错时重新连接服务器(ReConnectServerOnError)
    • 客户端属性:自动确认消息(AutoAcknowledgeMessages)
  • 配置DotNetMQ
    • 服务端
    • 应用程序
    • 路由/负载均衡
    • 其他设置
  • 网络传输消息
    • 一个简单的应用程序
    • 一个真实的案例:分布式短信处理器(Distributed SMS Processor)
  • 请求/应答式通信
  • 面向服务架构的DotNetMQ
    • 简单应用程序:短息/邮件发送器
      • 服务端
      • 客户端
    • Web服务支持
  • DotNetMQ性能
  • 历史
  • 引用

服务端

为了开发一个使用DotNetMQ服务的应用,你必须创建一个服务代理(就像Web服务和WCF那样)。为了创建代理,你可以用MDSServiceProxyGenerator工具。首先,编译你的服务项目,然后运行MDSServiceProxyGenerator.exe(在DotNetMQ安装目录).

你可以通过改变服务代理的TransmitRule属性来改变消息传输的规则。如果服务方法返回void,那么他的默认传输规则是StoreAndForward。如果服务方法有个一返回值,那么方法调用将会不可靠(因为方法调用时同步的,要等待一个结果的),它的规则是DiretlySend。你可以选择任何类型作为方法的参数,如果参数类型是基元类型(string,int,byte...),就不需要附加的设置,但是如果你想用你自定义的类型作为方法参数,这个类型必须标记为Serializable,因为DotNetMQ会用二进制序列化参数。

如上所述,添加Application1和Application2到DotNetMQ。最后,你的应用程序列表应该像下面这样。

Application -1 和Application -2是可执行程序(或是Windows服务),Sender Service是一个Windows服务。Application -1执行一些任务,产生数据,并调用Server-B服务器上的Remote Web Service方法来传输数据。这个web服务将数据插入到数据表。Application -2定期检查数据表来获得新的数据行并处理它们(然后从表中删除它们,或将其标记为已处理,避免处理重复数据)。

DotNetMQ支持ASP.NET web服务并可以传递消息到web服务。这里有个web服务的模板样品(在下载文件中)来实现这一目标。它的定义如下:

Server-A没有直接和Server-D连接。因此,消息代理在服务器间转发消息(这个消息依次通过Server-A,Server-B,Server-C,Server-D),消息最后到达Server-D上的消息代理,然后传递给Application -2。注意在Server-E上也有一个Application-2在运行,但是它不会收到这个消息,因为消息的目标服务器是Server-D。

Application1只是在如何发消息的地方稍微改动一点,就是设置DestinationServerName(目标服务器名)为ServerC。

 

DotNetMQ的有一个路由功能。现在路由设置只能通过MDSSettings.xml设置。你可以看到下面文件里有两种路由设置:

你可以在SetupDatabases文件夹(这个文件夹在DotNetMQ的安装目录)找到所需的文件,然后创建数据库和数据表,以供DotNetMQ使用。如果你有什么问题,可以随时问我。

简单应用程序:短息/邮件发送器

注意:你在运行这个例子前必须在DotNetMQ里注册MyMailSmsService和Application3。

图 - 5:DotNetMQ管理工具的应用程序列表界面。

Web服务支持

 

  • 目标服务器:Server-D
  • 目标应用程序:Application -2
  • 消息数据:应用程序特定的数据

如你所见,只需要3行代码就可以创建并运行服务,由于MDSService是可销毁的,所以你可以uing语句,另外,你也可以使用MDSServiceApplication的Disconnect方法手动关闭服务。你可以通过AddService方法在一个MDSServiceApplication中运行多个服务。

书籍:Enterprise Integration Patterns: Designing,Building,and Deploying Messaging Solutions .作者 Gregor Hohpe,Bobby Woolf(艾迪生韦斯利出版,2003年)。

即使在Application1发送过消息后,你停止了DotNetMQ服务,你的消息也是不会丢失的,这就叫持久化

在这篇文章中,我将介绍一个新的、独立的、开源的,完全基于C#和.NET Framework3.5的消息队列系统,DotNetMQ是一个消息代理,它包括确保传输,路由,负载均衡,服务器图等等多项功能。我将从解释消息的概念和消息代理的必要性讲起,然后,我会说明什么是DotNetMQ,以及如何使用它。

  • 运行Application1(这时Application2没有运行),输入一些消息,然后关闭程序。
  • 运行Application2,你将看到消息没有丢失,而是被Application2接收了。

现有的消息代理

请求/应答式通信

有两种方式可以配置DotNetMQ:通过XML配置文件或用DotNetMQ管理工具(一个Windows Forms程序),这里我分别演示这两种方法,有些配置是及时生效的,而有些则需要重启DotNetMQ。

  • Application1:从用户输入那里得到一个字符串消息,并将其发送到Application2.
  • Application2:在控制台上打印出传入的消息。
using System;
using System.Web.Services;
using MDS.Client.WebServices;

[WebService(Namespace = "http://www.dotnetmq.com/mds")]
[WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
public class MDSAppService : WebService
{
    /// <summary>
    /// MDS server sends messages to this method.
    /// </summary>
    /// <param name="bytesOfMessage">Byte array form of message</param>
    /// <returns>Response message to incoming message</returns>
    [WebMethod(Description = "Receives incoming messages to this web service.")]
    public byte[] ReceiveMDSMessage(byte[] bytesOfMessage)
    {
        var message = WebServiceHelper.DeserializeMessage(bytesOfMessage);
        try
        {
            var response = ProcessMDSMessage(message);
            return WebServiceHelper.SerializeMessage(response);
        }
        catch (Exception ex)
        {
            var response = message.CreateResponseMessage();
            response.Result.Success = false;
            response.Result.ResultText = 
              "Error in ProcessMDSMessage method: "   ex.Message;
            return WebServiceHelper.SerializeMessage(response);
        }
    }

    /// <summary>
    /// Processes incoming messages to this web service.
    /// </summary>
    /// <param name="message">Message to process</param>
    /// <returns>Response Message</returns>
    private IWebServiceResponseMessage 
            ProcessMDSMessage(IWebServiceIncomingMessage message)
    {
        //Process message

        //Send response/result
        var response = message.CreateResponseMessage();
        response.Result.Success = true;
        return response;
    }
}

处理传入消息之后,还需要来确认这个消息。这表示消息已经正确接收并处理。然后DotNetMQ将从消息队列中把消息删除。我们也可以用Reject方法拒绝一个消息(如果在出错的情况下我们不能处理这个消息)。在这种情况下,该消息将回到消息队列,稍后再试着发到目标应用程序(如果在同一个服务器上存在另一个Application2的实体,也可能发到另一个上)。这是DotNetMQ系统的一个强大机制。因此,可以确保消息不会丢失并绝对可以被处理。如果你不确认或拒绝一个消息,系统假设是被拒绝的。所以,即使你的应用程序崩溃了,在你的应用程序正常运行后,还是会收到消息的。

在上面的示例中,为了演示目的TransmitRule设置成了NonPersistent(非持久)。当然,你可以发送StoreAndForward(持久性)消息。这个是程序运行的截图:

财神道app下载最新版本 13

选择你的服务程序集(在这个简单的例子中是指SmsMailServer.exe)。你可以选择服务类或生成这个程序集里所有服务的代理。输入一个命名空间和一个目标文件夹,然后生成代理类。生成玩后,你就可以把它加到你的项目里了。

DotNetMQ是一个独立的Windows服务,分别运行在Server-A和Server-B服务器上。因此,你只需编写代码和DotNetMQ通信。使用DotNetMQ客户端类库,和DotNetMQ服务发送、接收信息是非常容易和快速的。Application -1准备消息,设置目标,并将消息传递给DotNetMQ代理。DotNetMQ代理将以最有效和最快的方式传递给Application -2。

我就不展示这个代理类了,但你必须了解它(你可以看源码,它是一个很简单的类)。你方法/参数上的特性用来生成这个代理类的注释。

让我们看看实际中的DotNetMQ。为了使第一个程序足够简单,我假设是同一台机器上的两个控制台应用程序(实际上,就像我们待会在文章中看到的那个,和在两台机器上的两个应用程序是没什么显著差异的,只是需要设置一下消息的目标服务器名字而已)。

向一个网络服务器的应用程序发消息是和向同一个服务器的应用程序发消息一样简单的。

假定有一个用于音乐比赛投票的短消息(MSM)服务。所有竞赛者唱过他们的歌曲后,观众给他们最喜欢的歌手投票,会发一条像"VOTE 103"这样的短信到我们的短息服务器。并假定这次投票会在短短的30分钟完成,大约有五百万人发短息到我们的服务。

一个应用程序必须在这个列表里才能和DotNetMQ连接。如果你直接修改xml文件,你必须重启DotNetMQ服务才能生效。

这是一些通过DotNetMQ传送消息的测试结果:

using System;
using MDS;
using MDS.Client;
using MDS.Communication.Messages;
using StockCommonLib;

namespace StockApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to query a stock status");
            Console.ReadLine();

            //Connect to DotNetMQ  
            var mdsClient = new MDSClient("StockClient");
            mdsClient.MessageReceived  = mdsClient_MessageReceived;
            mdsClient.Connect();
            //Create a stock request message 
            var stockQueryMessage = new StockQueryMessage { StockCode = "S01" }; 
            //Create a MDS message 
            var requestMessage = mdsClient.CreateMessage(); 
            requestMessage.DestinationApplicationName = "StockServer"; 
            requestMessage.TransmitRule = MessageTransmitRules.NonPersistent; 
            requestMessage.MessageData = GeneralHelper.SerializeObject(stockQueryMessage); 
            //Send message and get response 
            var responseMessage = requestMessage.SendAndGetResponse(); 
            //Get stock query result message from response message 
            var stockResult = (StockQueryResultMessage) GeneralHelper.DeserializeObject(responseMessage.MessageData); 
            //Write stock query result 
            Console.WriteLine("StockCode = "   stockResult.StockCode); 
            Console.WriteLine("ReservedStockCount = "   stockResult.ReservedStockCount); 
            Console.WriteLine("TotalStockCount = "   stockResult.TotalStockCount); 
            //Acknowledge received message 
            responseMessage.Acknowledge(); 
            Console.ReadLine(); 
            //Disconnect from DotNetMQ server. 
            mdsClient.Disconnect(); 
       } 
       static void mdsClient_MessageReceived(object sender, MessageReceivedEventArgs e) { 
            //Simply acknowledge other received messages 
            e.Message.Acknowledge(); 
       } 
   } 
}

下载二进制文件 - 933 KB

财神道app下载最新版本 14

using System;
using MDS.Client.MDSServices;

namespace SmsMailServer
{
    [MDSService(Description = "This service is a "   
              "sample mail/sms service.", Version = "1.0.0.0")]
    public class MyMailSmsService : MDSService
    {
        //All parameters and return values can be defined.
        [MDSServiceMethod(Description = "This method is used send an SMS.")]
        public void SendSms(
            [MDSServiceMethodParameter("Phone number to send SMS.")] string phone,
            [MDSServiceMethodParameter("SMS text to be sent.")] string smsText)
        {
            //Process SMS
            Console.WriteLine("Sending SMS to phone: "   phone);
            Console.WriteLine("Sms Text: "   smsText);

            //Acknowledge the message
            IncomingMessage.Acknowledge();
        }

        //You do not have to define any parameters
        [MDSServiceMethod]
        public void SendEmail(string emailAddress, string header, string body)
        {
            //Process email
            Console.WriteLine("Sending an email to "   emailAddress);
            Console.WriteLine("Header: "   header);
            Console.WriteLine("Body  : "   body);

            //Acknowledge the message
            IncomingMessage.Acknowledge();
        }

        // A simple method just to show return values.
        [MDSServiceMethod]
        [return: MDSServiceMethodParameter("True, if phone number is valid.")]
        public bool IsValidPhone([MDSServiceMethodParameter(
               "Phone number to send SMS.")] string phone)
        {
            //Acknowledge the message
            IncomingMessage.Acknowledge();

            //Return result
            return (phone.Length == 10);
        }
    }
}
using System;
using MDS.Client.MDSServices;

namespace SmsMailServer
{  
    class Program
    {
        static void Main(string[] args)
        {
            using (var service = new MDSServiceApplication("MyMailSmsService"))
            {
                service.AddService(new MyMailSmsService());
                service.Connect();

                Console.WriteLine("Press any key to stop service");
                Console.ReadLine();
            }
        }
    }
}

我们将会接收每一条短息,处理它(格式化短息文本,修改数据库,以便增加选手的票数),并要发送确认消息给发送者。我们从两台服务器接收消息,在四台服务器上处理消息,然后从两台服务器上发送确认消息。我们总共有八台服务器。让我们看看完整的系统示意图:

message.TransmitRule = MessageTransmitRules.NonPersistent;

你可以检查Windows服务,看看DotNetMQ是否已经安装并正常工作。

让我们考虑下面这个网络:

就这样,就完事儿了。你不需要知道ServerC在哪里,也不需要直接连接ServerC...这些全部定义在DotNetMQ设置里。注意:如果你不给一个消息设置DestinationServerName,系统假设目标服务器就是"current/this"指定的那台服务器,DotNetMQ也将把消息发送到同一台服务器上的应用程序。另外,如果你定义了必要的路由,你就不必设置目标服务器了,DotNetMQ会自动地路由消息。

让我们来看看下面的设计图:

SOA(面向服务的架构)是以个流行多年的概念了。Web服务和WCF是两个主要的SOA解决方案。一般情况下,一个消息队列系统是不会预期支持SOA的。同时,消息通信是异步的,松耦合的过程,而Web服务方法调用则通常是同步的,紧耦合的。即使(正如你在前面示例程序中看到的那样)消息通信并不如调用一个远程方法一样简单,但是当你的消息数增加,你的应用变复杂以至于难以维护时就不一样了。DotNetMQ支持持久性和非持久性的远程调用机制,所有你可以异步地调用一个远程方法,DotNetMQ会确保调用成功。

  • Apache ActiveMQ():它是开源的,并且实现了JMS(Java Message Service,java消息服务在java世界里是一个标准的消息传输API)。它也有一个.NET客户端类库。我为了更加了解,读完了“ActiveMQ in Action”整本书,并且开发了一些简单的应用。即使我通读了这本书,我没有看到一个简单可靠的方式来构建一个共同合作和路有消息的ActiveMQ服务图。我也没有看到如何给一个消息设置目标服务器。它自动路由消息,但我不能有效的控制路由的路径。我的理解是,它通常和Apache Camel()一起使用来实现常见的应用集成模式。Apache Camel也是另一个需要去了解的领域,更糟的是,它只使用Java。最后,我认为它不够简单易用,特别是配置,监控和管理。于是我放弃了对ActiveMQ的研究。
  • MSMQ():这是来自微软的解决方案,是.NET应用程序最合适的框架。它很容易使用和学习,而且它有工具看检测队列和消息。它尤其适用于那些运行在同一台机器上,或可以直接连接到同一台机器的应用程序间的异步通信。但我无法找到一个内置的解决方案,构建一个MSMQ服务器图来路由消息。因为路由是我的出发点,所以我只好淘汰掉这个消息代理。
  • RabbitMQ():它是由Erlang(有爱立信开发的一种编程语言)开发的。你需要先安装Erlang。我花了很多时间来安装,配置,并写了一个示例程序。它有一个.NET客户端,但当我试图开发并运行一个简单的程序是,出现很多错误。很难安装,很难使不同服务器上的两个RabbitMQ协同工作。过了几天,我就放弃了,因为我觉得学习并开始开发程序不应该那么难。
  • OpenAMQ(),ZeroMQ():我总体研究了这两个消息代理,但我发现我不能轻易做我想用.NET想做的事。
  • 其他:我还发现了一些其他的项目,但它们缺失一些重要的功能如路由,持久消息传递,请求/应答消息...等。

第一个DotNetMQ程序

财神道app下载最新版本 15

目前DotNetMQ支持3中存储类型:SQLite(默认),MySQL内存(译者注:根据下面内容,还支持MSSQL)。你可以在MDSSettings.xml修改存储类型。

<?xml version="1.0" encoding="utf-8" ?>
<MDSConfiguration>
  ...
  <Routes>

    <Route Name="Route-App2" DistributionType="Sequential" >
      <Filters>
        <Filter DestinationServer="this" DestinationApplication="Application1" />
      </Filters>
      <Destinations>
        <Destination Server="Server-A" Application="Application1" RouteFactor="1" />
        <Destination Server="Server-B" Application="Application1" RouteFactor="1" />
        <Destination Server="Server-C" Application="Application1" RouteFactor="1" />
    </Destinations>
    </Route>

    <Route Name="Route-App2" DistributionType="Random" >
      <Filters>
        <Filter DestinationServer="this" DestinationApplication="Application2" /> 
        <Filter SourceApplication="Application2" TransmitRule="StoreAndForward" /> 
    </Filters>
      <Destinations>
        <Destination Server="Server-A" Application="Application2" RouteFactor="1" />
        <Destination Server="Server-B" Application="Application2" RouteFactor="3" />
      </Destinations>
    </Route>

  </Routes>
  ...
</MDSConfiguration>
var message = mdsClient.CreateMessage();
message.DestinationServerName = "ServerC"; //Set destination server name here!
message.DestinationApplicationName = "Application2";
message.MessageData = Encoding.UTF8.GetBytes(messageText);
message.Send();

你还必须在你的服务方法中确认消息,否则,这个消息(引起这个服务方法调用的那个)就不会从消息队列中删除,而我们的服务方法将会被再次调用。如果我们不能处理这个消息(比如,如果邮件服务没有工作,我们没办法发送时)我们也可以拒绝它。如果我们拒绝了这个消息,它稍后还会发送给我们(很可靠)。你可以通过MDSService类的IncomingMessage属性得到原消息,另外,你也可以通过RemoteApplication属性得到远程应用程序的信息。

在发送一个消息之前,你可以像这样设置一个消息的Transmit Rule属性:

Application2甚至一点有不用修改,只要把Application2上ServerC上运行并等待传入的消息即可。

我们用和Application1相似的方法创建一个MDSClient对象,不同的就是连接应用程序的名称是Application2。为了接收消息,需要给MDSClient对象注册MessageReceived事件。然后我们连接DotNetMQ,直到用户输入Enter才断开。

注:原作者用windows服务启动消息队列服务,但是本人在win10上测试出错,可自行改成控制台启动消息队列服务,然后用第三方工具注册服务(如:SrvanyUI)

传送规则有三种类型:

DotNetMQ提供了这种功能和便利。它在服务器图上找到最佳的(最短的)路径把消息从原服务器转发到目标服务器。

财神道app下载最新版本 16

历史

一个真实的案例:分布式短信处理器(Distributed SMS Processor)

这个解决方案的确是可靠的(消息确保传送了),但它不是两个应用程序之间通信的有效方式。该解决方案有一些非常关键的问题:

如果你在同一台服务器上运行多个Application2的实例,哪一个会收到消息呢?在这种情况下,DotNetMQ会把消息顺序地发给这多个实例。所以你可以创建多发送/接收的系统。一个消息只能被一个实例接收(实例接收相互不同的消息)。DotNetMQ提供这所有功能和同步。

开发Application1

如你所见,在上面的列表中没有哪一个消息代理是完全由.NET开发的。

配置DotNetMQ

为什么要一个新的消息代理?

在上图中,你看到了一个包含5个节点的服务器图。红色节点表示当前服务器(当前服务器就是你用DotNetMQ管理器连接的那个)。直线表示两个节点(它们互为相邻节点)是可连接的(它们可以发送/接收消息)。服务器/节点图形中的名称是很重要的,它被用来向该服务器发送消息。

图 - 11:为DotNetMQ服务生成代理类。

在这种情况下,你可以开发一个单独的邮件/短信服务,它将尝试发送直到成功。你可以通过DotNetMQ开发一个邮件服务,仅当邮件发送成功时确认请求,如果发送失败,只要不确认(或拒绝)消息就行了,它稍后会重试。

这个库存服务监听进来的StockQueryMessage消息对象,然后把StockQueryResultMessage消息对象发送给查询者。为了简单起见,我没有从数据库查询库存。应答消息对象是由传入消息对象的CreateResponseMessage()方法创建的。最后,发出回应消息后要确认进入的消息。现在,我展示一个简单的库存客户端从服务器查询库存的示例:

  • Server:目标服务器,可以用this表示当前服务器。
  • Application:目标应用程序,目标应用程序通常和消息的原目标程序是一样的,不过这里你可以重定向到另一个应用程序。
  • RouteFactor:这个属性用于表明一个目标服务器被选中的相对比率,可以用来做负载均衡。如果你想把消息平均分发到所有服务器上,你可以把所有目标服务器的FouteFactor属性都设为1。但是如果你有两台服务器,其中一台比另一台性能强大的多,你可以通过设置这个路由因子来达到选择第一台服务器的概率是第二台的两倍以上。

在上图中,两个应用程序通过消息队列进行松散耦合方式通信。如果接受者处理消息的速度慢于发送者产生消息的速度,那么队列里的消息数就会增加。此外,在发送者发送消息的过程中,接受者可能是离线的。在这种情况下,当接收者上线后,它会从队列中得到消息(当它开始并加入这个队列时)。

下载例子 - 534 KB

  • StoreAndForward:这个是默认传送规则,消息是持久的,不会丢失的,并且使确保传送的。如果Send方法没有抛出异常,就表明消息已被DotNetMQ接收,而且存储到了数据库。直到目标应用程序接收并确认了它,这个消息会一直存储在数据库里。
  • NonPersistent:消息不会存储到数据库,这是发送消息最快的方式。仅在DotNetMQ服务停止工作,消息才会丢失。
  • DirectlySend:这个是DotNetMQ独有的功能。这种类型的消息直接发送给目标应用程序。在接收者确认一个消息之前,发送者程序是一直被阻塞的。所以,如果发送者在调用Send方法的过程中没有发生异常,就意味着该消息被接受者正确接收并确认。如果在传送消息时发生错误,或接受者处于脱机状态,或者接受者拒绝了消息,发送者在调用Send方法时都会得到一个异常。即使应用程序是在不同的服务器上(更即使在应用程序之间有许多服务器要路由),这个规则依然能正常工作。
<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
  <Settings>
    ...
  </Settings>
  <Servers>
    <Server Name="halil_pc" IpAddress="192.168.10.105" 
       Port="10099" Adjacents="emre_pc" />
    <Server Name="emre_pc" IpAddress="192.168.10.244" Port="10099" 
       Adjacents="halil_pc,out_server,webserver1,webserver2" />
    <Server Name="out_server" IpAddress="85.19.100.185" 
       Port="10099" Adjacents="emre_pc" />
    <Server Name="webserver1" IpAddress="192.168.10.263" 
       Port="10099" Adjacents="emre_pc,webserver2" />
    <Server Name="webserver2" IpAddress="192.168.10.44" 
       Port="10099" Adjacents="emre_pc,webserver1" />
  </Servers>
  <Applications>
    ...
  </Applications>
  <Routes>
    ...
  </Routes>
</MDSConfiguration>

消息队列通常由消息代理提供。消息代理是一个独立的应用程序(一个服务),其他应用程序通过连接它发送、接收消息。在消息被接收者接收之前,消息代理负责存储消息。消息代理可以通过路由多台机器把消息传送给目标应用程序,在消息被接收者正确处理之前,消息代理会一直尝试传送它。有时候消息代理也被称为面向消息的中间件(Message-Oriented-Middleware MOM)或者简单的叫消息队列(Message Queue MQ).

[Serializable]
public class StockQueryMessage
{
    public string StockCode { get; set; }
}

[Serializable]
public class StockQueryResultMessage
{
    public string StockCode { get; set; }
    public int ReservedStockCount { get; set; }
    public int TotalStockCount { get; set; }
}

修改路由配置,必须重启DotNetMQ才会生效。

财神道app下载最新版本 17

using System;
using System.Text;
using MDS.Client;

namespace Application1
{
    class Program
    {
        static void Main(string[] args)
        {
            //Create MDSClient object to connect to DotNetMQ
            //Name of this application: Application1
            var mdsClient = new MDSClient("Application1");

            //Connect to DotNetMQ server
            mdsClient.Connect();

            Console.WriteLine("Write a text and press enter to send "   
               "to Application2. Write 'exit' to stop application.");

            while (true)
            {
                //Get a message from user
                var messageText = Console.ReadLine();
                if (string.IsNullOrEmpty(messageText) || messageText == "exit")
                {
                    break;
                }

                //Create a DotNetMQ Message to send to Application2
                var message = mdsClient.CreateMessage();
                //Set destination application name
                message.DestinationApplicationName = "Application2";
                //Set message data
                message.MessageData = Encoding.UTF8.GetBytes(messageText);

                //Send message
                message.Send();
            }

            //Disconnect from DotNetMQ server
            mdsClient.Disconnect();
        }
    }
}

消息属性:传送规则(Transmit Rule)

当你设计好服务器图之后,你必须点击Save & Update Graph按钮来保存这些修改。这些修改将保存在DotNetMQ安装目录的MDSSettings.xml文件里。你必须重启DotNetMQ才能应用这些修改。

  • 需要很长的开发时间(去编码)。
  • 要定制所有的消息类型(或远程方法调用),对于一个新的Web服务方法调用,你必须改变所有的服务、应用程序和数据表。
  • 对每一个相似的服务,必须开发基本上一样的软件和结构(或复制,然后修改)。
  • 编码后需要对服务、程序、数据库做太多的测试和维护。
  • 一些程序和服务在没有新消息的时候,还是会定期检查数据库(如果数据库没有很好的索引和优化,这可能会严重消耗系统资源)。

默认情况下,MDSClient由于某种原因断开DotNetMQ时会自动重连。所以,即使你重启DotNetMQ服务,也不用重启你的应用程序。你可以把ReconnectServerOnError设置为false来禁用自动重连。

在这里我们将开发一个简单的服务,可用于发送短信和邮件。也许没有必要专门写一个服务来发送短信和邮件,这些功能都可以在应用自身实现,但是想象一下你有很多应用都要发邮件,在发送时如果邮件服务出问题了怎么办?在可以成功发送邮件之前,应用程序必须一直尝试。所以你必须在你的应用程序中建立一个邮件队列机制,用于一次又一次的尝试发送。在最坏的情况下,你的应用程序可能只运行很短的时间(如Web服务)或者必须在发送完邮件前关闭。但是在邮件服务器上线后,你还必须发送,不允许邮件丢失。

财神道app下载最新版本 18

  • SQLite:使用SQLite数据库系统。这个是默认存储类型,使用(DotNetMQ安装目录SqliteDBMDS.s3db)文件作为数据库。
  • MSSQL:使用微软SQL Server数据库,你需要提供ConnectionString属性作为连接字符串(下面会说到)。
  • MySQL-ODBC:通过ODBC使用MySQL数据库,你需要提供ConnectionString数据作为连接字符串。
  • MySQL-Net:通过.NET Adapter(.NET适配器)使用MySQL数据库,你需要提供ConnectionString数据作为连接字符串。
  • Memory:使用内存作为存储设备。在这种情况下,如果DotNetMQ停止了,持久性消息会丢失。

介绍

最后,MDSSettings.design.xml包含了服务器图的设计信息(节点在屏幕上的位置)。这个文件只是用于DotNetMQ管理器的服务器图窗体,运行时的DotNetMQ服务是不需要的。

在一个大型的分布式系统中,消息队列是不可缺少的中间件,能很好的解决异步消息、应用解耦、均衡并发等问题。在.net中,偶然发现一个效率不错、安全可靠、功能齐全的消息组件,忍不住翻译过来,供大家快速预览。

下载源代码 - 1.28 MB

在创建MDSClient对象时,我们把要连接的应用程序名称传给构造函数,用这个构造函数,我们将用默认端口(10905)连接本地服务器(127.0.0.1)上的DotNetMQ。重载的构造函数可以用于连接其他服务器和端口。

测试平台:Intel Core 2 Duo 3,00 GHZ CPU.2 GB RAM PC。消息传送和方法调用是在同一台电脑上的两个应用程序之间进行的。

现在用消息代理来做这所有的事情,用最有效的方式负责将消息传送给远程应用。同一应用程序集成用上DotNetMQ展示于下图。

Destinations是用来将消息路由到其他服务器用的。一个目标服务器被选中是根据Route节点的DistributionType属性(前面解释过)决定的。一个destination节点必须定义三个属性

using System;
using MDS.Client;
using MDS.Client.MDSServices;
using SampleService;

namespace SmsMailClient
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Press enter to test SendSms method");
            Console.ReadLine();

            //Application3 is name of an application that sends sms/email.
            using (var serviceConsumer = new MDSServiceConsumer("Application3"))
            {
                //Connect to DotNetMQ server
                serviceConsumer.Connect();

                //Create service proxy to call remote methods
                var service = new MyMailSmsServiceProxy(serviceConsumer, 
                    new MDSRemoteAppEndPoint("MyMailSmsService"));

                //Call SendSms method
                service.SendSms("3221234567", "Hello service!");
            }
        }
    }
}

当然,DotNetMQ的设置必须根据服务器间的连接(服务器图)来设置,并且Application1和Application2必须像配置DotNetMQ部分说的那样注册到DotNetMQ服务器。

这里有三种类型的应用:接受者,处理器,和发送者。在这种情况下,你就可以使用DotNetMQ作为消息队列和负载均衡器,通过配置服务器图和路由(就像配置DotNetMQ小节中描述的那样),来构建一个分布式的,可扩展的消息处理系统。

什么是DotNetMQ?

我们的应用程序为了使用DotNetMQ,要先注册一下,只需操作一次,是一个非常简单的过程。运行DotNetMQ管理器(DotNETMQ文件夹下的MDSManager.exe,如上所诉,默认是在C:Programe FilesDotNetMQ文件夹下),并在Applications菜单中打开Application类表。点击Add New Appliction按钮,输入应用程序名称。

在Visual Studio里创建一个新的控制台应用程序,命名为Application2,添加MDSCommonLib.dll并写下以下代码:

默认情况下,你必须在MessageReceived事件中显式的确认消息。否则,系统将认为消息是被拒绝了。如果你想改变这种行为,你需要把AutoAcknowledgeMessages属性设为true。在这种情况下,如果你的MessageReceived事件处理程序没有抛出异常,你也没有显式确认和拒绝一个消息,系统将自动确认该消息(如果抛出异常,该消息将被拒绝)。

你可以双击图形中的一个服务器来编辑它的属性。为了连接两个服务器,你要按住Ctrl键,点击第一个再点击第二个(断开连接也是相同的操作)。你可以通过点击右键,选择Set as this server来设置管理器连接该服务器。你可以从图中删除一个服务器或通过右键菜单添加一个新的服务器。最后,你可以通过拖拽添加或移除服务器。

什么是消息传递

一个简单的应用程序

  • SourceServer:消息的第一个源服务器,可以用this表示当前服务器。
  • SourceApplication:发现消息的应用程序。
  • DestinationServer:消息的最终目标服务器,可以用this表示当前服务器。
  • DestinationApplication:接收消息的应用程序。
  • TransmitRule:消息传送规则的一种(StoreAndForward,DirectlySend,NonPersistent)。

图 - 9:分布式短信处理系统

正如你已看到的那样,DotNetMQ可以用于构建分布式负载均衡应用系统。在本节中,我将讨论一个生活中真实的场景:一个分布式消息处理系统。

财神道app下载最新版本 19

财神道app下载最新版本 20

从用户角度来看,我只是想通过“消息数据,目标服务器和应用程序名称”来定位我的代理。其他的我都不关心。他将会根据需要在网络上多次路由一个消息,最后发送到目标服务器的目标程序上。我的消息传送系统必须为我提供这个便利。这是我的出发点。我根据这一点大概设计了消息代理的结构。下图显示了我想要的。

图 - 3:使用DotNetMQ的简单消息传递。

注册应用程序到DotNetMQ

Application -1 传递一个消息到本地服务器(Server-A)上的消息代理:

首先,我们开发短信/邮件的服务部分。为了实现这个,我们必须定义一个派生自MDSService的类型:

你可以只在一台服务器上运行DotNetMQ,在这种情况下,是不需要为服务器配置任何东西的。但如果你想在多台服务器上运行DotNetMQ并使它们相互通信,你就需要定义服务器图了。

开发Application2

另一方面,Web服务方法调用(远程方法调用)是一种紧耦合同步通信(这两个应用程序在整个通信的过程中都必须是运行着并且可用,如果Web服务脱机或在方法调用期间发生错误,那么客户端应用程序将得到一个异常)。

每个路由节点有两个属性:Name属性是对用户友好的显示(不影响路由功能),DistributionType是路由的策略。这里有两种类型的路由策略:

过滤消息时,不会考虑没有定义的条件。所以,如果所有的条件都是空的(或直接没定义),那么所有的消息都适合这个过滤器。只有所有的条件都匹配时,一个过滤器才适合这个消息。如果一个消息正确匹配(至少是过滤器定义的都匹配)一个路由中的一个过滤器,那么这个路由将被选择并使用。

 

财神道app下载最新版本 21


经过这种全面的介绍会,让我们看看如果在实践中使用DotNetMQ。

DotNetMQ是一个开源的消息代理,它有以下几个特点:

本文由财神道app下载最新版本发布于服务器运维,转载请注明出处:如何安装和使用Beanstalkd工作队列(1)财神道app下载