免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 1175 | 回复: 0
打印 上一主题 下一主题

solaris10 下的MQ的开发 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2006-09-08 10:18 |只看该作者 |倒序浏览
好久没有写 blog 了。最近连续奋战了 3 个星期,每天都到 11 点,眼圈黑得吓人。
还好在一定程度上有所收获!

言归正传:IBM MQ,数以万计的 PG(程序员)对它都十分了解,而且相关的文档不计其数。具体的细节不写了,简单写点比较公用的代码,在此备用!


#if !defined (__MQConfig_h)
#define __MQConfig_h
#include
using namespace std;
/**
* @file
* @author
*/
/**
 * Configuration for a single MQConnection.
 * A plain value holder: every setting is exposed through a getter/setter pair,
 * except the name, which is fixed at construction.
 */
class MQConfig {
public:
    /**
     * Constructor with configuration name.
     * @param name configuration name.
     */
    explicit MQConfig(const string& name);
    /**
     * Destructor
     */
    ~MQConfig();
    /**
     * Copy constructor
     * @param another configuration to copy from
     */
    MQConfig(const MQConfig& another);
    /**
     * Assignment
     * @param another configuration to copy from
     */
    MQConfig& operator=(const MQConfig& another);
    //@{ getters/setters
    /** @return configuration name
     */
    const string& getName() const;
    const string& getHost() const;
    void setHost(const string& host);

    int getPort() const;
    void setPort(int port);
    const string& getChannel() const;
    void setChannel(const string& channel);
    const string& getQueueManager() const;
    void setQueueManager(const string& qMan);
    const string& getReceiveQueue() const;
    void setReceiveQueue(const string& queue);
    const string& getSendQueue() const;
    void setSendQueue(const string& queue);
    int getTimeout() const;
    void setTimeout(int timeout);
    int getRetryCount() const;
    void setRetryCount(int n);
    int getRetryInterval() const;
    void setRetryInterval(int interval);
    //@}
private:
    // NOTE(review): everything after _name was reconstructed from the members
    // the inline accessors read and write; confirm order and exact types
    // against the original header.
    string _name;          ///< configuration name
    string _host;          ///< see getHost()/setHost()
    int _port;             ///< see getPort()/setPort()
    string _channel;       ///< see getChannel()/setChannel()
    string _queueManager;  ///< see getQueueManager()/setQueueManager()
    string _receiveQueue;  ///< see getReceiveQueue()/setReceiveQueue()
    string _sendQueue;     ///< see getSendQueue()/setSendQueue()
    int _timeout;          ///< see getTimeout()/setTimeout()
    int _retryCount;       ///< see getRetryCount()/setRetryCount()
    int _retryInterval;    ///< see getRetryInterval()/setRetryInterval()
};
/**
 * Creates a configuration named @p name.
 * The timeout starts at 5000 and the retry count at 0. The port and the retry
 * interval are set to 0 as well, so that getPort() and getRetryInterval()
 * never read an indeterminate value before the matching setter has run.
 * Members not listed here are default-constructed.
 * @param name configuration name.
 */
inline MQConfig::MQConfig(const string& name)
    : _name(name), _port(0), _timeout(5000), _retryCount(0), _retryInterval(0)
{
}
/** Destructor; the body is empty. */
inline MQConfig::~MQConfig() {}
/** @return the configuration name given to the constructor. */
inline const string& MQConfig::getName() const { return _name; }
/** @return the host last stored by setHost(). */
inline const string& MQConfig::getHost() const { return _host; }
inline void MQConfig::setHost(const string& host)
{
    _host = host;
}
   
inline int MQConfig::getPort() const
{
    return _port;
}
inline void MQConfig::setPort(int port)
{
    _port = port;
}
/** @return the channel last stored by setChannel(). */
inline const string& MQConfig::getChannel() const { return _channel; }
inline void MQConfig::setChannel(const string& channel)
{
    _channel = channel;
}
/** @return the queue manager name last stored by setQueueManager(). */
inline const string& MQConfig::getQueueManager() const { return _queueManager; }
inline void MQConfig::setQueueManager(const string& qMan)
{
    _queueManager = qMan;
}
/** @return the send queue name last stored by setSendQueue(). */
inline const string& MQConfig::getSendQueue() const { return _sendQueue; }
inline void MQConfig::setSendQueue(const string& queue)
{
    _sendQueue = queue;
}
/** @return the receive queue name last stored by setReceiveQueue(). */
inline const string& MQConfig::getReceiveQueue() const { return _receiveQueue; }
inline void MQConfig::setReceiveQueue(const string& queue)
{
    _receiveQueue = queue;
}
inline int MQConfig::getTimeout() const
{
    return _timeout;
}
inline void MQConfig::setTimeout(int timeout)
{
    _timeout = timeout;
}
inline int MQConfig::getRetryCount() const
{
    return _retryCount;
}
inline void MQConfig::setRetryCount(int n)
{
    _retryCount = n;
}
inline int MQConfig::getRetryInterval() const
{
    return _retryInterval;
}
inline void MQConfig::setRetryInterval(int interval)
{
    _retryInterval = interval;
}
#endif // __MQConfig_h

------------------
#if !defined (__MQConfigManager_h)
#define __MQConfigManager_h
/**
* @file
*/
#include <map>
#include <string>
using namespace std;
#include "mq/MQConfig.h"
/**
* The utility class to load MQ configuratoin information.
*/
class MQConfigManager {
public:
    /** data type for loaded configuration information.
     * a map of config name -> MQConfig
     */
    typedef map config_map_type;
    /**
     * default constructor
     */
    MQConfigManager();
   
    /**
     * destructor
     */
    ~MQConfigManager();
    /**
     * Loads configuration information from the given config file.
     * @param configFileName name of the MQ configuration file.
     * @post getConfigFileName() == configFileName
     * @throws MQException if any of the following errors occurred:
     * The given config file does not exist or could not be opened in read-only mode
     * Failed to read from the given file
     * configFileName is NULL
     * There is syntax error in the given file
     */
    void loadConfigFile(const char* configFileName);
    /**
     * Gets the loaded configuration information.
     * @return MQ configuration information
     */
    const map& getConfigInfo() const;
    /**
     * Gets the configuratoin file name
     * @return configuration file name
     */
    string getConfigFileName() const;
    /**
     * get configuration information for a given name.
     * @param name configuration name
     * @return the configuration information with the given name, or NULL if there is no such configration.
     */
    const MQConfig* getConfig(const string& name) const;
   
    /**
     * get the count of configurations
     * @return count of configurations
     */
    int getConfigCount() const;
    /**
     * Clears any loaded configuration
     * @post getConfigFileName() == ""
     */
    void clear();
    /**
     * checks whether the given MQConfig is valid
     * @param config the configuration to be checked
     * @throws ConfigException if the configuration is invalid.
     */
    static void checkConfig(const MQConfig& config);
private:
    /** set configuration data item based on the property name and value */
    static void setMQConfigInfo(const string& propName, const string& propValue, MQConfig& config);
private:
    string _configFileName;
    config_map_type _configMap;
};
/** Default constructor; initializes no member explicitly. */
inline MQConfigManager::MQConfigManager() {}
   
/** Destructor; the body is empty. */
inline MQConfigManager::~MQConfigManager() {}
/** @return read-only reference to the internal configuration map. */
inline const MQConfigManager::config_map_type& MQConfigManager::getConfigInfo() const { return _configMap; }
/** @return a copy of the stored configuration file name. */
inline string MQConfigManager::getConfigFileName() const { return _configFileName; }
/**
 * Looks up the configuration stored under @p name.
 * @param name configuration name used as the map key.
 * @return pointer to the stored MQConfig, or NULL when the map has no entry for @p name.
 */
inline const MQConfig* MQConfigManager::getConfig(const string& name) const
{
    const config_map_type::const_iterator found = _configMap.find(name);
    // Guard clause: a miss is reported as NULL rather than by throwing.
    if (found == _configMap.end()) {
        return NULL;
    }
    return &found->second;
}
   
inline int MQConfigManager::getConfigCount() const
{
    return _configMap.size();
}
inline void MQConfigManager::clear()
{
    _configMap.clear();
}
#endif // __MQConfigManager_h
-------------------

#if !defined (__MQConnection_h)
#define __MQConnection_h
#include "mq/MQMessage.h"
#include
using namespace std;
/**
* @file
* @author aloha zhang
*/
/**
 * Maximum number of times a message can be backed out.
 * NOTE(review): not referenced anywhere else in this header; presumably
 * consumed by the MQConnection implementation -- confirm.
 */
#define MAX_BACKOUT_COUNT 2
/**
* forward declaration
*/
class MQConfig;
/**
 * An MQConnection encapsulates an MQ queue manager and its request/reply queues, and provides
 * simplified interfaces for sending and receiving messages.
 * MQConnection::initialize must be called before an MQConnection can be acquired.
 */
class MQConnection {
public:
    struct ConnectionInfo;
    /**
     * Initializes the library from the specified config file.
     * Loads the configuration from the given file and throws ConfigException whenever an error occurs.
     * @param configFile the file from which the configuration is loaded.
     * @throws ConfigException
     * @pre configFile != NULL
     */
    static void initialize(const char* configFile);
    /**
     * Checks whether the initialization has been done.
     * @return true when the MQConnection library has been initialized,
     *    false otherwise.
     */
    static bool isInitialized();
    /**
     * Cleans up the MQConnection library.
     * All the MQConnection instances are deleted and all the configuration information is freed.
     */
    static void cleanup();
    /**
     * Acquires the MQConnection which has the given name.
     * If an MQConnection instance with the given name already exists, its reference count is
     * increased and that instance is returned.
     * If there is no such instance, the configuration is checked for an MQConnection definition
     * with the given name. If one is found, an MQConnection is created from it, registered in
     * the internal registry and returned.
     * If there is neither an instance nor a definition with the given name (connName) in the
     * config file, NULL is returned.
     * The returned MQConnection object can be freed by calling the MQConnection::close(MQConnection* ) method.
     * @param connName MQConnection name.  The name shall be defined in the config file.
     * @return an MQConnection instance with the given name, or NULL as described above.
     * @pre connName != NULL
     */
    static MQConnection* getConnection(const char* connName);
    /**
     * Gets the connection name.
     */
    string getName() const;
    /**
     * Checks whether the MQConnection is connected to the queue manager.
     * @return true if connected to the queue manager, false otherwise.
     */
    bool isConnected() const;
    /**
     * Closes an MQConnection previously returned by getConnection(const char* ).
     * The connection is really closed and destroyed only after its reference count reaches 0, and
     * a connection cannot be used after it is closed.
     * @param conn the connection to close.
     * NOTE(review): declared as a non-static member although it takes the connection as a
     * parameter -- confirm whether it was meant to be static.
     */
    void close(MQConnection* conn);
    /**
     * @return internal reference count.  Used for debugging purposes.
     */
    int getOpenCount() const;
    /**
     * Tries to send a message to the request queue and waits for a reply message from the reply queue.
     * The message ID of the request message is set by the queue manager, and the CorrelId of the
     * reply message is expected to be the same as the message ID of the request message.
     * @param msg request message to be sent.
     * @param sz number of bytes of the request message.
     * @return the reply message, or a message that compares equal to NULL if timed out.
     * The returned message object shall be deleted by #deleteMessage(const MQMessage&).
     * @throws MQException if an error occurred.
     */
    MQMessage sendRequest(const void* msg, size_t sz);
    /**
     * Receives a single message from the queue, waiting until a message is received or the wait times out.
     * @return the first message in the queue,
     * or a message that compares equal to NULL if there is no message available.
     * The returned message object shall be deleted by #deleteMessage(const MQMessage&).
     * @throws MQException if an error occurred
     */
    MQMessage receive();
    /**
     * Receives a single message from the queue, returning immediately if there is no message available.
     * @return the received message, or a message that compares equal to NULL if there is no message available.
     * The returned message object shall be deleted by #deleteMessage(const MQMessage&).
     * @throws MQException if an error occurred
     */
    MQMessage receiveNoWait();
    /**
     * Commits the current unit of work.
     * @throws MQException if the commit failed.
     */
    void commit();
    /**
     * Rolls back the current unit of work.
     * @throws MQException if the rollback failed.
     */
    void rollback();
    /**
     * Frees the memory of the given MQMessage.
     * After this method is called, the msg shall not be used in any way.
     * @param msg An MQMessage returned by any one of the following methods:
     * sendRequest
     * receive
     * receiveNoWait
     */
    static void deleteMessage(const MQMessage& msg);
private:
    /**
     * Tries to connect to the queue manager defined in the configuration.
     * @return the internally maintained information for the MQ connection
     * @throws MQException  when failed to connect to the queue manager.
     */
    ConnectionInfo* connect();
    /**
     * If a connection to the queue manager is established, tries to disconnect from the queue
     * manager and releases any MQ object returned by MQI.
     * If the MQConnection is not connected, returns immediately.
     */
    void disconnect();
    /** Opens the outgoing queue.
     * @param mqInfo internally maintained MQ connection information.
     */
    void openSendQueue(ConnectionInfo* mqInfo);
    /** Opens the receiving queue.
     * @param mqInfo internally maintained MQ connection information.
     */
    void openReceiveQueue(ConnectionInfo* mqInfo);
    /**
     * Ensures the MQ connection is established and the necessary queue (or queues) is opened.
     * @param send ensure the outgoing queue is open.
     * @param receive ensure the receiving queue is open.
     */
    void ensureConnection(bool send, bool receive);
    /**
     * Tries to receive a message from the receiving queue with the given parameters.
     * @param correId correlation Id for MQGET.  Only a message that has the given correId will be received.
     * @param timeout timeout in milliseconds for MQGET. 0 means no timeout.
     * @param syncPoint whether to start a sync point.  If true, MQGMO_SYNCPOINT will be used in the MQGET options.
     */
    MQMessage receiveMessage(const char* correId, int timeout, bool syncPoint);
    /**
     * Creates a connection with the given configuration.
     * Private: instances are handed out through getConnection(const char*).
     * @param config configuration
     */
    MQConnection(const MQConfig& config);
    /**
     * Internal implementation; TImpl is only forward-declared here and held by reference.
     */
    struct TImpl;
    TImpl& _impl;
private:
    /**
     * Copy and assignment are not allowed: both are declared private.
     */
    MQConnection(const MQConnection& );
    MQConnection& operator=(const MQConnection& );
};
#endif // __MQConnection
-----------------------
#if !defined (__MQException_h)
#define __MQException_h
#include "base/Exception.h"
#include "base/ExceptionCategoryEnum.h"
#include
/**
* @file
*/
/**
* Exception thrown when a MQ operation failed.
*/
class MQException: public Exception {
public:
    /**
     * Constructor with error information.
     * @param completionCode completion code of the failed MQ operation
     * @param reasonCode reason code of the failed MQ operation
     * @param msg error message error message.
     * @param cause when the MQException is caused by another exception, the source exception can be passed by this parameter
     */
    MQException(MQLONG completionCode, MQLONG reasonCode, const char* msg, const Exception* cause = NULL);
    /**
     * Constructor with error information.
     * @param completionCode completion code of the failed MQ operation
     * @param reasonCode reason code of the failed MQ operation
     * @param msg error message error message.
     * @param cause when the MQException is caused by another exception, the source exception can be passed by this parameter
     */
    MQException(MQLONG completionCode, MQLONG reasonCode, const string& msg, const Exception* cause = NULL);
    /**
     * copy constructor
     */
    MQException(const MQException& another);
    /** assignment */
    MQException& operator=(const MQException&);
    /** destructor */
    virtual ~MQException();
    /** @see Exception#clone()
     */
    virtual Exception* clone() const;
    /** @return completion code
     */
    MQLONG getCompletionCode() const;
    /**
     * @return reason code
     */
    MQLONG getReasonCode() const;
    /**
     * Checks whether the Exception was thrown due to a broken MQ connection.
     * @return true if the Exception was thrown due to a broken MQ connection, false otherwise
     * @see #isConnectionBroken(MQLONG, MQLONG)
     */
    bool isConnectionBroken() const;
    /** Checks whether the Exception was thrown due to MQ queue manager is terminating.
     * @return true if Exception was thrown due MQ queue manager is terminating.
     */
    bool isQueueManagerTerminating() const;
   
    /**
     * Utility method to check whether the given completionCode and reasonCode describes a connection broken error
     * @param completionCode completion code of a MQ operation
     * @param reasonCode reason code of a MQ operation
     */
    static bool isConnectionBroken(MQLONG completionCode, MQLONG reasonCode);
    /**
     * Utility method to check whether the given completionCode and reasonCode implies the queue manager is terminating.
     * @param completionCode completion code of a MQ operation
     * @param reasonCode reason code of a MQ operation
     */
    static bool isQueueManagerTerminating(MQLONG completionCode, MQLONG reasonCode);
private:
    MQLONG _completionCode; ///
/**
 * Builds the exception from a C-string message.
 * The base class is constructed with the MQ_ERROR category, @p msg and @p cause;
 * the two codes are stored as given.
 */
inline MQException::MQException(MQLONG completionCode,
                                MQLONG reasonCode,
                                const char* msg,
                                const Exception* cause)
    : Exception(MQ_ERROR, msg, cause),
      _completionCode(completionCode),
      _reasonCode(reasonCode)
{}
/**
 * Builds the exception from a string message.
 * The base class is constructed with the MQ_ERROR category, @p msg and @p cause;
 * the two codes are stored as given.
 */
inline MQException::MQException(MQLONG completionCode,
                                MQLONG reasonCode,
                                const string& msg,
                                const Exception* cause)
    : Exception(MQ_ERROR, msg, cause),
      _completionCode(completionCode),
      _reasonCode(reasonCode)
{}
/** Copy constructor: copies the base-class part and both codes from @p another. */
inline MQException::MQException(const MQException& another)
    : Exception(another),
      _completionCode(another._completionCode),
      _reasonCode(another._reasonCode)
{}
/**
 * Assignment: assigns the base-class part, then copies both codes.
 * @param another exception to copy from.
 * @return *this
 */
inline MQException& MQException::operator=(const MQException& another)
{
    Exception::operator=(another);  // base-class state first
    _reasonCode = another._reasonCode;
    _completionCode = another._completionCode;
    return *this;
}
/** @return the reason code stored at construction or by the last assignment. */
inline MQLONG MQException::getReasonCode() const { return _reasonCode; }
/** @return the completion code stored at construction or by the last assignment. */
inline MQLONG MQException::getCompletionCode() const { return _completionCode; }
inline bool MQException::isConnectionBroken() const
{
    return isConnectionBroken(_completionCode, _reasonCode);
}
inline bool MQException::isQueueManagerTerminating() const
{
    return isQueueManagerTerminating(_completionCode, _reasonCode);
}
#endif // __MQException_h

-----------

#if !defined (__MQMessage_h)
#define __MQMessage_h
#include
/**
* @file
* @author aloha zhang
*/
/**
 * Represents a received MQ message as a (size, buffer pointer) pair.
 * The inline copy constructor copies the pointer, not the bytes, so copies
 * share one buffer. No destructor is declared in the visible part of the class.
 */
class MQMessage {
public:
    /** default constructor.
     * initializes size to 0, and message buffer to NULL.
     */
    MQMessage();
    /**
     * Specifies message size and buffer.
     * @param sz message size
     * @param buf message data buffer.
     */
    MQMessage(size_t sz, char* buf);
    /**
     * copy constructor
     */
    MQMessage(const MQMessage& msg);
    /**
     * assignment
     */
    MQMessage& operator=(const MQMessage& msg);
    /**
     * @return message data buffer.
     */
    char* data() const;
    /**
     * @return message size in bytes.
     */
    size_t getSize() const;
    /**
     * equality comparator
     * @return data() == ptr
     */
    bool operator==(const void* ptr) const;
    /**
     * non-equality comparator
     * @return data() != ptr
     */
    bool operator!=(const void* ptr) const;
private:
    // NOTE(review): _buf and the closing brace were reconstructed from the
    // inline constructors and accessors; confirm against the original header.
    size_t _size; ///< message size in bytes
    char* _buf;   ///< message data buffer
};
/** Default constructor: size 0 and a NULL buffer. */
inline MQMessage::MQMessage()
    : _size(0),
      _buf(NULL)
{}
/**
 * Stores the given size and buffer pointer as-is; the bytes are not copied.
 * @param sz message size
 * @param buf message data buffer
 */
inline MQMessage::MQMessage(size_t sz, char* buf)
    : _size(sz),
      _buf(buf)
{}
/** Copy constructor: copies the size and the buffer pointer, so both objects refer to the same buffer. */
inline MQMessage::MQMessage(const MQMessage& msg)
    : _size(msg._size),
      _buf(msg._buf)
{}
inline char* MQMessage::data() const
{
    return _buf;
}
inline size_t MQMessage::getSize() const
{
    return _size;
}
/**
 * Compares the buffer pointer with @p ptr.
 * @return true when the stored buffer pointer equals @p ptr.
 */
inline bool MQMessage::operator==(const void* ptr) const
{
    return ptr == _buf;
}
/**
 * Compares the buffer pointer with @p ptr.
 * @return true when the stored buffer pointer differs from @p ptr.
 */
inline bool MQMessage::operator!=(const void* ptr) const
{
    return ptr != _buf;
}
/**
 * Equality comparator with the pointer on the left-hand side.
 * @param ptr pointer to compare against the message buffer.
 * @param msg message whose data() pointer is compared.
 * @return ptr == msg.data()
 */
inline bool operator==(const void* ptr, const MQMessage& msg)
{
    return ptr == msg.data();
}
/**
 * Non-equality comparator with the pointer on the left-hand side.
 * @param ptr pointer to compare against the message buffer.
 * @param msg message whose data() pointer is compared.
 * @return ptr != msg.data()
 */
inline bool operator!=(const void* ptr, const MQMessage& msg)
{
    return ptr != msg.data();
}
#endif // __MQMessage_h


本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u/22900/showart_166893.html
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP