提交 572fa0dcc87b72799e8ca347559cc7f9b160ea65

作者 aliyunmq
1 个父辈 9d47cd58

101 samples

... ... @@ -2,12 +2,17 @@
2 2 build.sh
3 3 *.o
4 4 *.a
  5 +include
5 6 include/
  7 +lib
6 8 lib/
7 9 consumer
8 10 producer
  11 +trans_producer
9 12 run_consumer
10 13 run_producer
  14 +run_trans_producer
11 15 run_consumer.cpp
12 16 run_producer.cpp
  17 +run_trans_producer.cpp
13 18 replace.sh
... ...
... ... @@ -57,9 +57,12 @@ int main() {
57 57 cout << "MessageId: " << iter->getMessageId()
58 58 << " PublishTime: " << iter->getPublishTime()
59 59 << " Tag: " << iter->getMessageTag()
  60 + << " Body: " << iter->getMessageBody()
60 61 << " FirstConsumeTime: " << iter->getFirstConsumeTime()
61 62 << " NextConsumeTime: " << iter->getNextConsumeTime()
62   - << " ConsumedTimes: " << iter->getConsumedTimes() << endl;
  63 + << " ConsumedTimes: " << iter->getConsumedTimes()
  64 + << " Properties: " << iter->getPropertiesAsString()
  65 + << " Key: " << iter->getMessageKey() << endl;
63 66 receiptHandles.push_back(iter->getReceiptHandle());
64 67 }
65 68  
... ... @@ -87,14 +90,14 @@ int main() {
87 90 continue;
88 91 }
89 92 cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
90   -#ifdef WIN32
  93 +#ifdef _WIN32
91 94 Sleep(2000);
92 95 #else
93 96 usleep(2000 * 1000);
94 97 #endif
95 98 } catch (MQExceptionBase& mb) {
96 99 cout << "Request Failed: " + mb.ToString() << endl;
97   -#ifdef WIN32
  100 +#ifdef _WIN32
98 101 Sleep(2000);
99 102 #else
100 103 usleep(2000 * 1000);
... ...
1 1 //#include <iostream>
2 2 #include <fstream>
  3 +#include <time.h>
3 4 #include "mq_http_sdk/mq_client.h"
4 5  
5 6 using namespace std;
... ... @@ -30,10 +31,30 @@ int main() {
30 31 }
31 32  
32 33 try {
33   - for (int i = 0; i < 100; i++)
  34 + for (int i = 0; i < 4; i++)
34 35 {
35 36 PublishMessageResponse pmResp;
36   - producer->publishMessage("Hello, mq!", pmResp);
  37 + if (i % 4 == 0) {
  38 + // publish message, only have body.
  39 + producer->publishMessage("Hello, mq!", pmResp);
  40 + } else if (i % 4 == 1) {
  41 + // publish message, only have body and tag.
  42 + producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
  43 + } else if (i % 4 == 2) {
  44 + // publish message, have body,tag,properties and key.
  45 + TopicMessage pubMsg("Hello, mq!have key!");
  46 + pubMsg.putProperty("a",std::to_string(i));
  47 + pubMsg.setMessageKey("MessageKey" + std::to_string(i));
  48 + producer->publishMessage(pubMsg, pmResp);
  49 + } else {
  50 + // publish timer message, message will be consumed after StartDeliverTime
  51 + TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
  52 + // StartDeliverTime is an absolute time in millisecond.
  53 + pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
  54 + pubMsg.putProperty("b",std::to_string(i));
  55 + pubMsg.putProperty("c",std::to_string(i));
  56 + producer->publishMessage(pubMsg, pmResp);
  57 + }
37 58 cout << "Publish mq message success. Topic is: " << topic
38 59 << ", msgId is:" << pmResp.getMessageId()
39 60 << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
... ...
  1 +//#include <iostream>
  2 +#include <fstream>
  3 +#ifdef _WIN32
  4 +#include <windows.h>
  5 +#include <process.h>
  6 +#else
  7 +#include "pthread.h"
  8 +#endif
  9 +#include "mq_http_sdk/mq_client.h"
  10 +
  11 +using namespace std;
  12 +using namespace mq::http::sdk;
  13 +
  14 +
  15 +const int32_t pubMsgCount = 4;
  16 +const int32_t halfCheckCount = 3;
  17 +
  18 +void processCommitRollError(AckMessageResponse& bdmResp, const std::string& messageId) {
  19 + if (bdmResp.isSuccess()) {
  20 + cout << "Commit/Roll Transaction Suc: " << messageId << endl;
  21 + return;
  22 + }
  23 + const std::vector<AckMessageFailedItem>& failedItems =
  24 + bdmResp.getAckMessageFailedItem();
  25 + for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
  26 + iter != failedItems.end(); ++iter)
  27 + {
  28 + cout << "Commit/Roll Transaction ERROR: " << iter->errorCode
  29 + << " " << iter->receiptHandle << endl;
  30 + }
  31 +}
  32 +
  33 +#ifdef WIN32
  34 +unsigned __stdcall consumeHalfMessageThread(void *arg)
  35 +#else
  36 +void* consumeHalfMessageThread(void *arg)
  37 +#endif
  38 +{
  39 + MQTransProducerPtr transProducer = *(MQTransProducerPtr*)(arg);
  40 + int count = 0;
  41 + do {
  42 + std::vector<Message> halfMsgs;
  43 + try {
  44 + // 长轮询消费消息
  45 + // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
  46 + transProducer->consumeHalfMessage(
  47 + 1,//一次最多消费1条(最多可设置为16条)
  48 + 3,//长轮询时间3秒(最多可设置为30秒)
  49 + halfMsgs
  50 + );
  51 + } catch (MQServerException& me) {
  52 + if (me.GetErrorCode() == "MessageNotExist") {
  53 + cout << "No half message to consume! RequestId: " + me.GetRequestId() << endl;
  54 + continue;
  55 + }
  56 + cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
  57 + }
  58 + if (halfMsgs.size() == 0) {
  59 + continue;
  60 + }
  61 +
  62 + cout << "Consume Half: " << halfMsgs.size() << " Messages!" << endl;
  63 + // 处理事务半消息
  64 + std::vector<std::string> receiptHandles;
  65 + for (std::vector<Message>::iterator iter = halfMsgs.begin();
  66 + iter != halfMsgs.end(); ++iter)
  67 + {
  68 + cout << "MessageId: " << iter->getMessageId()
  69 + << " PublishTime: " << iter->getPublishTime()
  70 + << " Tag: " << iter->getMessageTag()
  71 + << " Body: " << iter->getMessageBody()
  72 + << " FirstConsumeTime: " << iter->getFirstConsumeTime()
  73 + << " NextConsumeTime: " << iter->getNextConsumeTime()
  74 + << " ConsumedTimes: " << iter->getConsumedTimes()
  75 + << " Properties: " << iter->getPropertiesAsString()
  76 + << " Key: " << iter->getMessageKey() << endl;
  77 +
  78 + int32_t consumedTimes = iter->getConsumedTimes();
  79 + const std::string propA = iter->getProperty("a");
  80 + const std::string handle = iter->getReceiptHandle();
  81 + AckMessageResponse bdmResp;
  82 + if (propA == "1") {
  83 + cout << "Commit msg.." << endl;
  84 + transProducer->commit(handle, bdmResp);
  85 + count++;
  86 + } else if(propA == "2") {
  87 + if (consumedTimes > 1) {
  88 + cout << "Commit msg.." << endl;
  89 + transProducer->commit(handle, bdmResp);
  90 + count++;
  91 + } else {
  92 + cout << "Commit Later!!!" << endl;
  93 + }
  94 + } else if(propA == "3") {
  95 + cout << "Rollback msg.." << endl;
  96 + transProducer->rollback(handle, bdmResp);
  97 + count++;
  98 + } else {
  99 + transProducer->commit(handle, bdmResp);
  100 + cout << "Unkown msg.." << endl;
  101 + }
  102 + // 如果Commit/Rollback时超过了NextConsumeTime的时间则会失败
  103 + processCommitRollError(bdmResp, iter->getMessageId());
  104 + }
  105 +
  106 + } while(count < halfCheckCount);
  107 +
  108 +#ifdef WIN32
  109 + return 0;
  110 +#else
  111 + return NULL;
  112 +#endif
  113 +}
  114 +
  115 +int main() {
  116 +
  117 + MQClient mqClient(
  118 + // 设置HTTP接入域名(此处以公共云生产环境为例)
  119 + "${HTTP_ENDPOINT}",
  120 + // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  121 + "${ACCESS_KEY}",
  122 + // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  123 + "${SECRET_KEY}"
  124 + );
  125 +
  126 + // 所属的 Topic
  127 + string topic = "${TOPIC}";
  128 + // Topic所属实例ID,默认实例为空
  129 + string instanceId = "${INSTANCE_ID}";
  130 + // 您在控制台创建的 Consumer ID(Group ID)
  131 + string groupId = "${GROUP_ID}";
  132 +
  133 + MQTransProducerPtr transProducer;
  134 + if (instanceId == "") {
  135 + transProducer = mqClient.getTransProducerRef(topic, groupId);
  136 + } else {
  137 + transProducer = mqClient.getTransProducerRef(instanceId, topic, groupId);
  138 + }
  139 +
  140 + // 客户端需要有一个线程或者进程来消费没有确认的事务消息
  141 + // 示例这里启动一个线程来检查没有确认的事务消息
  142 +#ifdef WIN32
  143 + HANDLE thread;
  144 + unsigned int threadId;
  145 + thread = (HANDLE)_beginthreadex(NULL, 0, consumeHalfMessageThread, &transProducer, 0, &threadId);
  146 +#else
  147 + pthread_t thread;
  148 + pthread_create(&thread, NULL, consumeHalfMessageThread, static_cast<void *>(&transProducer));
  149 +#endif
  150 +
  151 + try {
  152 + for (int i = 0; i < pubMsgCount; i++)
  153 + {
  154 + PublishMessageResponse pmResp;
  155 + TopicMessage pubMsg("Hello, mq, trans_msg!");
  156 + pubMsg.putProperty("a",std::to_string(i));
  157 + pubMsg.setMessageKey("ImKey");
  158 + pubMsg.setTransCheckImmunityTime(10);
  159 + transProducer->publishMessage(pubMsg, pmResp);
  160 + cout << "Publish mq message success. Topic:" << topic
  161 + << ", msgId:" << pmResp.getMessageId()
  162 + << ", bodyMD5:" << pmResp.getMessageBodyMD5()
  163 + << ", Handle:" << pmResp.getReceiptHandle() << endl;
  164 +
  165 + if (i == 0) {
  166 + // 发送完处理了业务逻辑,可直接Commit/Rollback
  167 + // 如果Commit/Rollback时超过了TransCheckImmunityTime则会失败
  168 + AckMessageResponse bdmResp;
  169 + transProducer->commit(pmResp.getReceiptHandle(), bdmResp);
  170 + processCommitRollError(bdmResp, pmResp.getMessageId());
  171 + }
  172 + }
  173 + } catch (MQServerException& me) {
  174 + cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
  175 + } catch (MQExceptionBase& mb) {
  176 + cout << "Request Failed: " + mb.ToString() << endl;
  177 + }
  178 +
  179 +#ifdef WIN32
  180 + WaitForSingleObject(thread, INFINITE);
  181 + CloseHandle(thread);
  182 +#else
  183 + pthread_join(thread, NULL);
  184 +#endif
  185 +
  186 + return 0;
  187 +}
... ...
... ... @@ -3,3 +3,4 @@
3 3 replace.sh
4 4 run_consumer.cs
5 5 run_producer.cs
  6 +run_trans_producer.cs
... ...
... ... @@ -42,7 +42,8 @@ namespace Aliyun.MQ.Sample
42 42 3, // 一次最多消费3条(最多可设置为16条)
43 43 3 // 长轮询时间3秒(最多可设置为30秒)
44 44 );
45   - } catch (Exception exp1)
  45 + }
  46 + catch (Exception exp1)
46 47 {
47 48 if (exp1 is MessageNotExistException)
48 49 {
... ... @@ -53,19 +54,18 @@ namespace Aliyun.MQ.Sample
53 54 Thread.Sleep(2000);
54 55 }
55 56  
56   - if (messages == null) {
  57 + if (messages == null)
  58 + {
57 59 continue;
58 60 }
59 61  
60   - List<String> handlers = new List<string>();
  62 + List<string> handlers = new List<string>();
61 63 Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
62 64 // 处理业务逻辑
63 65 foreach (Message message in messages)
64 66 {
65   - Console.WriteLine("MessageId:" + message.Id + ", PublishTime:" + message.PublishTime + ", NextConsumeTime:" + message.NextConsumeTime
66   - + "\n ConsumedTimes:" + message.ConsumedTimes + ", MessageTag:" + message.MessageTag
67   - + "\n BodyMD5:" + message.BodyMD5 + ", NextConsumeTime:" + message.NextConsumeTime
68   - + "\n Body:" + message.Body);
  67 + Console.WriteLine(message);
  68 + Console.WriteLine("Property a is:" + message.GetProperty("a"));
69 69 handlers.Add(message.ReceiptHandle);
70 70 }
71 71 // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
... ... @@ -74,12 +74,13 @@ namespace Aliyun.MQ.Sample
74 74 {
75 75 consumer.AckMessage(handlers);
76 76 Console.WriteLine("Ack message success:");
77   - foreach(string handle in handlers)
  77 + foreach (string handle in handlers)
78 78 {
79 79 Console.Write("\t" + handle);
80 80 }
81 81 Console.WriteLine();
82   - } catch (Exception exp2)
  82 + }
  83 + catch (Exception exp2)
83 84 {
84 85 // 某些消息的句柄可能超时了会导致确认不成功
85 86 if (exp2 is AckMessageException)
... ...
... ... @@ -3,7 +3,7 @@ using System.Collections.Generic;
3 3 using System.Threading;
4 4 using Aliyun.MQ.Model;
5 5 using Aliyun.MQ.Model.Exp;
6   -using Aliyun.MQ;
  6 +using Aliyun.MQ.Util;
7 7  
8 8 namespace Aliyun.MQ.Sample
9 9 {
... ... @@ -28,13 +28,28 @@ namespace Aliyun.MQ.Sample
28 28 {
29 29 try
30 30 {
31   - // 循环发送100条消息
32   - for (int i = 0; i < 50; i++)
  31 + // 循环发送4条消息
  32 + for (int i = 0; i < 4; i++)
33 33 {
34   - TopicMessage result = producer.PublishMessage(new TopicMessage("dfadfadfadf"));
35   - Console.WriteLine("publis message success: MessageId:" + result.Id + ", BodyMD5:" + result.BodyMD5);
36   - result = producer.PublishMessage(new TopicMessage("dfadfadfadf", "tag"));
37   - Console.WriteLine("publis message success: MessageId:" + result.Id + ", BodyMD5:" + result.BodyMD5);
  34 + TopicMessage sendMsg;
  35 + if (i % 2 == 0)
  36 + {
  37 + sendMsg = new TopicMessage("dfadfadfadf");
  38 + // 设置属性
  39 + sendMsg.PutProperty("a", i.ToString());
  40 + // 设置KEY
  41 + sendMsg.MessageKey = "MessageKey";
  42 + }
  43 + else
  44 + {
  45 + sendMsg = new TopicMessage("dfadfadfadf", "tag");
  46 + // 设置属性
  47 + sendMsg.PutProperty("a", i.ToString());
  48 + // 定时消息, 定时时间为10s后
  49 + sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
  50 + }
  51 + TopicMessage result = producer.PublishMessage(sendMsg);
  52 + Console.WriteLine("publis message success:" + result);
38 53 }
39 54 }
40 55 catch (Exception ex)
... ... @@ -43,4 +58,4 @@ namespace Aliyun.MQ.Sample
43 58 }
44 59 }
45 60 }
46   -}
47 61 \ No newline at end of file
  62 +}
... ...
  1 +using System;
  2 +using System.Collections.Generic;
  3 +using System.Threading;
  4 +using Aliyun.MQ.Model;
  5 +using Aliyun.MQ.Model.Exp;
  6 +using Aliyun.MQ.Util;
  7 +
  8 +namespace Aliyun.MQ.Sample
  9 +{
  10 + public class TransProducerSample
  11 + {
  12 + // 设置HTTP接入域名(此处以公共云生产环境为例)
  13 + private const string _endpoint = "${HTTP_ENDPOINT}";
  14 + // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  15 + private const string _accessKeyId = "${ACCESS_KEY}";
  16 + // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  17 + private const string _secretAccessKey = "${SECRET_KEY}";
  18 + // 所属的 Topic
  19 + private const string _topicName = "${TOPIC}";
  20 + // Topic所属实例ID,默认实例为空
  21 + private const string _instanceId = "${INSTANCE_ID}";
  22 + // 您在控制台创建的 Consumer ID(Group ID)
  23 + private const string _groupId = "${GROUP_ID}";
  24 +
  25 + private static readonly MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
  26 +
  27 + private static readonly MQTransProducer transProducer = _client.GetTransProdcuer(_instanceId, _topicName, _groupId);
  28 +
  29 + static void ProcessAckError(Exception exception)
  30 + {
  31 + // 如果Commit/Rollback时超过了TransCheckImmunityTime(针对发送事务消息的句柄)或者超过10s(针对consumeHalfMessage的句柄)则会失败
  32 + if (exception is AckMessageException)
  33 + {
  34 + AckMessageException ackExp = (AckMessageException)exception;
  35 + Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
  36 + foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
  37 + {
  38 + Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
  39 + }
  40 + }
  41 + }
  42 +
  43 + static void ConsumeHalfMessage()
  44 + {
  45 + int count = 0;
  46 + while (true)
  47 + {
  48 + if (count == 3)
  49 + break;
  50 + try
  51 + {
  52 + // 检查事务半消息,类似消费普通消息
  53 + List<Message> messages = null;
  54 + try
  55 + {
  56 + messages = transProducer.ConsumeHalfMessage(3, 3);
  57 + } catch (Exception exp1) {
  58 + if (exp1 is MessageNotExistException)
  59 + {
  60 + Console.WriteLine(Thread.CurrentThread.Name + " No half message, " + ((MessageNotExistException)exp1).RequestId);
  61 + continue;
  62 + }
  63 + Console.WriteLine(exp1);
  64 + Thread.Sleep(2000);
  65 + }
  66 +
  67 + if (messages == null)
  68 + continue;
  69 + // 处理业务逻辑
  70 + foreach (Message message in messages)
  71 + {
  72 + Console.WriteLine(message);
  73 + int a = int.Parse(message.GetProperty("a"));
  74 + uint consumeTimes = message.ConsumedTimes;
  75 + try {
  76 + if (a == 1) {
  77 + // 确认提交事务消息
  78 + transProducer.Commit(message.ReceiptHandle);
  79 + count++;
  80 + Console.WriteLine("Id:" + message.Id + ", commit");
  81 + } else if (a == 2 && consumeTimes > 1) {
  82 + // 确认提交事务消息
  83 + transProducer.Commit(message.ReceiptHandle);
  84 + count++;
  85 + Console.WriteLine("Id:" + message.Id + ", commit");
  86 + } else if (a == 3) {
  87 + // 确认回滚事务消息
  88 + transProducer.Rollback(message.ReceiptHandle);
  89 + count++;
  90 + Console.WriteLine("Id:" + message.Id + ", rollback");
  91 + } else {
  92 + // 什么都不做,下次再检查
  93 + Console.WriteLine("Id:" + message.Id + ", unkonwn");
  94 + }
  95 + } catch (Exception ackError) {
  96 + ProcessAckError(ackError);
  97 + }
  98 + }
  99 + }
  100 + catch (Exception ex)
  101 + {
  102 + Console.WriteLine(ex);
  103 + Thread.Sleep(2000);
  104 + }
  105 + }
  106 + }
  107 +
  108 + static void Main(string[] args)
  109 + {
  110 + // 客户端需要有一个线程或者进程来消费没有确认的事务消息
  111 + // 示例这里启动一个线程来检查没有确认的事务消息
  112 + Thread consumeHalfThread = new Thread(ConsumeHalfMessage);
  113 + consumeHalfThread.Start();
  114 +
  115 + try
  116 + {
  117 + // 循环发送4条事务消息, 第一条直接在发送完提交事务, 其它三条根据条件处理
  118 + for (int i = 0; i < 4; i++)
  119 + {
  120 + TopicMessage sendMsg = new TopicMessage("trans_msg");
  121 + sendMsg.MessageTag = "a";
  122 + sendMsg.MessageKey = "MessageKey";
  123 + sendMsg.PutProperty("a", i.ToString());
  124 + // 设置事务第一次回查的时间表征该条消息为事务消息,为相对时间,单位:秒,范围为10~300s之间
  125 + // 第一次事务回查后如果消息没有commit或者rollback,则之后每隔10s左右会回查一次,总共回查一天
  126 + sendMsg.TransCheckImmunityTime = 10;
  127 +
  128 + TopicMessage result = transProducer.PublishMessage(sendMsg);
  129 + Console.WriteLine("publis message success:" + result);
  130 + try {
  131 + if (!string.IsNullOrEmpty(result.ReceiptHandle) && i == 0)
  132 + {
  133 + // 发送完事务消息后能获取到半消息句柄,可以直接commit/rollback事务消息
  134 + transProducer.Commit(result.ReceiptHandle);
  135 + Console.WriteLine("Id:" + result.Id + ", commit");
  136 + }
  137 + } catch (Exception ackError) {
  138 + ProcessAckError(ackError);
  139 + }
  140 + }
  141 + } catch (Exception ex) {
  142 + Console.Write(ex);
  143 + }
  144 +
  145 + consumeHalfThread.Join();
  146 + }
  147 + }
  148 +}
0 149 \ No newline at end of file
... ...
... ... @@ -3,4 +3,5 @@
3 3 replace.sh
4 4 run_consumer.go
5 5 run_producer.go
  6 +run_trans_producer.go
6 7 src/
... ...
... ... @@ -41,9 +41,11 @@ func main() {
41 41 for _, v := range resp.Messages {
42 42 handles = append(handles, v.ReceiptHandle)
43 43 fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
44   - "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n\tBody: %s\n",
  44 + "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
  45 + "\tBody: %s\n"+
  46 + "\tProps: %s\n",
45 47 v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
46   - v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody)
  48 + v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
47 49 }
48 50  
49 51 // NextConsumeTime前若不确认消息消费成功,则消息会重复消费
... ...
... ... @@ -3,6 +3,7 @@ package main
3 3 import (
4 4 "fmt"
5 5 "time"
  6 + "strconv"
6 7  
7 8 "github.com/aliyunmq/mq-http-go-sdk"
8 9 )
... ... @@ -22,11 +23,29 @@ func main() {
22 23 client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
23 24  
24 25 mqProducer := client.GetProducer(instanceId, topic)
25   - // 循环发送100条消息
26   - for i := 1; i < 100; i++ {
27   - msg := mq_http_sdk.PublishMessageRequest{
28   - MessageBody: "hello mq!", //消息内容
29   - MessageTag: "", // 消息标签
  26 + // 循环发送4条消息
  27 + for i := 0; i < 4; i++ {
  28 + var msg mq_http_sdk.PublishMessageRequest
  29 + if i%2 == 0 {
  30 + msg = mq_http_sdk.PublishMessageRequest{
  31 + MessageBody: "hello mq!", //消息内容
  32 + MessageTag: "", // 消息标签
  33 + Properties: map[string]string{}, // 消息属性
  34 + }
  35 + // 设置KEY
  36 + msg.MessageKey = "MessageKey"
  37 + // 设置属性
  38 + msg.Properties["a"] = strconv.Itoa(i)
  39 + } else {
  40 + msg = mq_http_sdk.PublishMessageRequest{
  41 + MessageBody: "hello mq timer!", //消息内容
  42 + MessageTag: "", // 消息标签
  43 + Properties: map[string]string{}, // 消息属性
  44 + }
  45 + // 设置属性
  46 + msg.Properties["a"] = strconv.Itoa(i)
  47 + // 定时消息, 定时时间为10s后, 值为毫秒级别的Unix时间戳
  48 + msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
30 49 }
31 50 ret, err := mqProducer.PublishMessage(msg)
32 51  
... ... @@ -36,6 +55,6 @@ func main() {
36 55 } else {
37 56 fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
38 57 }
39   - time.Sleep(time.Duration(1) * time.Second)
  58 + time.Sleep(time.Duration(100) * time.Millisecond)
40 59 }
41 60 }
... ...
  1 +package main
  2 +
  3 +import (
  4 + "fmt"
  5 + "github.com/gogap/errors"
  6 + "strconv"
  7 + "strings"
  8 + "time"
  9 +
  10 + "github.com/aliyunmq/mq-http-go-sdk"
  11 +)
  12 +
  13 +var loopCount = 0
  14 +
  15 +func ProcessError(err error) {
  16 + // 如果Commit/Rollback时超过了TransCheckImmunityTime(针对发送事务消息的句柄)或者超过10s(针对consumeHalfMessage的句柄)则会失败
  17 + if err == nil {
  18 + return
  19 + }
  20 + fmt.Println(err)
  21 + for _, errAckItem := range err.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
  22 + fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
  23 + errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
  24 + }
  25 +}
  26 +
  27 +func ConsumeHalfMsg(mqTransProducer *mq_http_sdk.MQTransProducer) {
  28 + for {
  29 + if loopCount >= 10 {
  30 + return
  31 + }
  32 + loopCount++
  33 + endChan := make(chan int)
  34 + respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
  35 + errChan := make(chan error)
  36 + go func() {
  37 + select {
  38 + case resp := <-respChan:
  39 + {
  40 + // 处理业务逻辑
  41 + var handles []string
  42 + fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
  43 + for _, v := range resp.Messages {
  44 + handles = append(handles, v.ReceiptHandle)
  45 + fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
  46 + "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n\tBody: %s\n"+
  47 + "\tProperties:%s, Key:%s, Timer:%d, Trans:%d\n",
  48 + v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
  49 + v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody,
  50 + v.Properties, v.MessageKey, v.StartDeliverTime, v.TransCheckImmunityTime)
  51 +
  52 + a, _ := strconv.Atoi(v.Properties["a"])
  53 + var comRollErr error
  54 + if a == 1 {
  55 + // 确认提交事务消息
  56 + comRollErr = (*mqTransProducer).Commit(v.ReceiptHandle)
  57 + fmt.Println("Commit---------->")
  58 + } else if a == 2 && v.ConsumedTimes > 1 {
  59 + // 确认提交事务消息
  60 + comRollErr = (*mqTransProducer).Commit(v.ReceiptHandle)
  61 + fmt.Println("Commit---------->")
  62 + } else if a == 3 {
  63 + // 确认回滚事务消息
  64 + comRollErr = (*mqTransProducer).Rollback(v.ReceiptHandle)
  65 + fmt.Println("Rollback---------->")
  66 + } else {
  67 + // 什么都不做,下次再检查
  68 + fmt.Println("Unknown---------->")
  69 + }
  70 + ProcessError(comRollErr)
  71 + }
  72 + endChan <- 1
  73 + }
  74 + case err := <-errChan:
  75 + {
  76 + // 没有消息
  77 + if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
  78 + fmt.Println("\nNo new message, continue!")
  79 + } else {
  80 + fmt.Println(err)
  81 + time.Sleep(time.Duration(3) * time.Second)
  82 + }
  83 + endChan <- 1
  84 + }
  85 + case <-time.After(35 * time.Second):
  86 + {
  87 + fmt.Println("Timeout of consumer message ??")
  88 + return
  89 + }
  90 + }
  91 + }()
  92 +
  93 + // 长轮询检查事务半消息
  94 + // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
  95 + (*mqTransProducer).ConsumeHalfMessage(respChan, errChan,
  96 + 3, // 一次最多消费3条(最多可设置为16条)
  97 + 3, // 长轮询时间3秒(最多可设置为30秒)
  98 + )
  99 + <-endChan
  100 + }
  101 +}
  102 +
  103 +func main() {
  104 + // 设置HTTP接入域名(此处以公共云生产环境为例)
  105 + endpoint := "${HTTP_ENDPOINT}"
  106 + // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  107 + accessKey := "${ACCESS_KEY}"
  108 + // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  109 + secretKey := "${SECRET_KEY}"
  110 + // 所属的 Topic
  111 + topic := "${TOPIC}"
  112 + // Topic所属实例ID,默认实例为空
  113 + instanceId := "${INSTANCE_ID}"
  114 + // 您在控制台创建的 Consumer ID(Group ID)
  115 + groupId := "${GROUP_ID}"
  116 +
  117 + client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
  118 +
  119 + mqTransProducer := client.GetTransProducer(instanceId, topic, groupId)
  120 +
  121 + // 客户端需要有一个线程或者进程来消费没有确认的事务消息
  122 + // 示例这里启动一个Goroutines来检查没有确认的事务消息
  123 + go ConsumeHalfMsg(&mqTransProducer)
  124 +
  125 + // 发送4条事务消息,1条发送完就提交,其余3条通过检查事务半消息处理
  126 + for i := 0; i < 4; i++ {
  127 + msg := mq_http_sdk.PublishMessageRequest{
  128 + MessageBody:"I am transaction msg!",
  129 + Properties: map[string]string{"a":strconv.Itoa(i)},
  130 + }
  131 + // 设置事务第一次回查的时间,为相对时间,单位:秒,范围为10~300s之间
  132 + // 第一次事务回查后如果消息没有commit或者rollback,则之后每隔10s左右会回查一次,总共回查一天
  133 + msg.TransCheckImmunityTime = 10
  134 +
  135 + resp, pubErr := mqTransProducer.PublishMessage(msg)
  136 + if pubErr != nil {
  137 + fmt.Println(pubErr)
  138 + return
  139 + }
  140 + fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, Handle:%s\n",
  141 + resp.MessageId, resp.MessageBodyMD5, resp.ReceiptHandle)
  142 + if i == 0 {
  143 + // 发送完事务消息后能获取到半消息句柄,可以直接commit/rollback事务消息
  144 + ackErr := mqTransProducer.Commit(resp.ReceiptHandle)
  145 + fmt.Println("Commit---------->")
  146 + ProcessError(ackErr)
  147 + }
  148 + }
  149 +
  150 + for ; loopCount < 10 ; {
  151 + time.Sleep(time.Duration(1) * time.Second)
  152 + }
  153 +}
... ...
... ... @@ -3,4 +3,5 @@
3 3 replace.sh
4 4 RunConsumer.java
5 5 RunProducer.java
  6 +RunTransProducer.java
6 7 target/
... ...
... ... @@ -18,7 +18,7 @@
18 18 <dependency>
19 19 <groupId>com.aliyun.mq</groupId>
20 20 <artifactId>mq-http-sdk</artifactId>
21   - <version>1.0.0</version>
  21 + <version>1.0.1-SNAPSHOT</version>
22 22 <classifier>jar-with-dependencies</classifier>
23 23 </dependency>
24 24 </dependencies>
... ...
... ... @@ -30,14 +30,33 @@ public class Producer {
30 30 }
31 31  
32 32 try {
33   - // 循环发送100条消息
34   - for (int i = 0; i < 100; i++) {
35   - TopicMessage pubMsg = new TopicMessage(
36   - // 消息内容
37   - "hello mq!".getBytes(),
38   - // 消息标签
39   - "A"
40   - );
  33 + // 循环发送4条消息
  34 + for (int i = 0; i < 4; i++) {
  35 + TopicMessage pubMsg;
  36 + if (i % 2 == 0) {
  37 + // 普通消息
  38 + pubMsg = new TopicMessage(
  39 + // 消息内容
  40 + "hello mq!".getBytes(),
  41 + // 消息标签
  42 + "A"
  43 + );
  44 + // 设置属性
  45 + pubMsg.getProperties().put("a", String.valueOf(i));
  46 + // 设置KEY
  47 + pubMsg.setMessageKey("MessageKey");
  48 + } else {
  49 + pubMsg = new TopicMessage(
  50 + // 消息内容
  51 + "hello mq!".getBytes(),
  52 + // 消息标签
  53 + "A"
  54 + );
  55 + // 设置属性
  56 + pubMsg.getProperties().put("a", String.valueOf(i));
  57 + // 定时消息, 定时时间为10s后
  58 + pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
  59 + }
41 60 // 同步发送消息,只要不抛异常就是成功
42 61 TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
43 62  
... ...
  1 +import com.aliyun.mq.http.MQClient;
  2 +import com.aliyun.mq.http.MQTransProducer;
  3 +import com.aliyun.mq.http.common.AckMessageException;
  4 +import com.aliyun.mq.http.model.Message;
  5 +import com.aliyun.mq.http.model.TopicMessage;
  6 +
  7 +import java.util.List;
  8 +
  9 +public class TransProducer {
  10 +
  11 +
  12 + static void processCommitRollError(Throwable e) {
  13 + if (e instanceof AckMessageException) {
  14 + AckMessageException errors = (AckMessageException) e;
  15 + System.out.println("Commit/Roll transaction error, requestId is:" + errors.getRequestId() + ", fail handles:");
  16 + if (errors.getErrorMessages() != null) {
  17 + for (String errorHandle :errors.getErrorMessages().keySet()) {
  18 + System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
  19 + + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
  20 + }
  21 + }
  22 + }
  23 + }
  24 +
  25 + public static void main(String[] args) throws Throwable {
  26 + MQClient mqClient = new MQClient(
  27 + // 设置HTTP接入域名(此处以公共云生产环境为例)
  28 + "${HTTP_ENDPOINT}",
  29 + // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  30 + "${ACCESS_KEY}",
  31 + // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  32 + "${SECRET_KEY}"
  33 + );
  34 +
  35 + // 所属的 Topic
  36 + final String topic = "${TOPIC}";
  37 + // Topic所属实例ID,默认实例为空
  38 + final String instanceId = "${INSTANCE_ID}";
  39 + // 您在控制台创建的 Consumer ID(Group ID)
  40 + final String groupId = "${GROUP_ID}";
  41 +
  42 + final MQTransProducer mqTransProducer = mqClient.getTransProducer(instanceId, topic, groupId);
  43 +
  44 + for (int i = 0; i < 4; i++) {
  45 + TopicMessage topicMessage = new TopicMessage();
  46 + topicMessage.setMessageBody("trans_msg");
  47 + topicMessage.setMessageTag("a");
  48 + topicMessage.setMessageKey(String.valueOf(System.currentTimeMillis()));
  49 + // 设置事务第一次回查的时间,为相对时间,单位:秒,范围为10~300s之间
  50 + // 第一次事务回查后如果消息没有commit或者rollback,则之后每隔10s左右会回查一次,总共回查一天
  51 + topicMessage.setTransCheckImmunityTime(10);
  52 + topicMessage.getProperties().put("a", String.valueOf(i));
  53 +
  54 + TopicMessage pubResultMsg = null;
  55 + pubResultMsg = mqTransProducer.publishMessage(topicMessage);
  56 + System.out.println("Send---->msgId is: " + pubResultMsg.getMessageId()
  57 + + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()
  58 + + ", Handle: " + pubResultMsg.getReceiptHandle()
  59 + );
  60 + if (pubResultMsg != null && pubResultMsg.getReceiptHandle() != null) {
  61 + if (i == 0) {
  62 + // 发送完事务消息后能获取到半消息句柄,可以直接commit/rollback事务消息
  63 + try {
  64 + mqTransProducer.commit(pubResultMsg.getReceiptHandle());
  65 + System.out.println(String.format("MessageId:%s, commit", pubResultMsg.getMessageId()));
  66 + } catch (Throwable e) {
  67 + // 如果Commit/Rollback时超过了TransCheckImmunityTime则会失败
  68 + if (e instanceof AckMessageException) {
  69 + processCommitRollError(e);
  70 + continue;
  71 + }
  72 + }
  73 + }
  74 + }
  75 + }
  76 +
  77 + // 客户端需要有一个线程或者进程来消费没有确认的事务消息
  78 + // 示例这里启动一个线程来检查没有确认的事务消息
  79 + Thread t = new Thread(new Runnable() {
  80 + public void run() {
  81 + int count = 0;
  82 + while(true) {
  83 + try {
  84 + if (count == 3) {
  85 + break;
  86 + }
  87 + List<Message> messages = mqTransProducer.consumeHalfMessage(3, 3);
  88 + if (messages == null) {
  89 + System.out.println("No Half message!");
  90 + continue;
  91 + }
  92 + System.out.println(String.format("Half---->MessageId:%s,Properties:%s,Body:%s,Latency:%d",
  93 + messages.get(0).getMessageId(),
  94 + messages.get(0).getProperties(),
  95 + messages.get(0).getMessageBodyString(),
  96 + System.currentTimeMillis() - messages.get(0).getPublishTime()));
  97 +
  98 + for (Message message : messages) {
  99 + try {
  100 + if (Integer.valueOf(message.getProperties().get("a")) == 1) {
  101 + // 确认提交事务消息
  102 + mqTransProducer.commit(message.getReceiptHandle());
  103 + count++;
  104 + System.out.println(String.format("MessageId:%s, commit", message.getMessageId()));
  105 + } else if (Integer.valueOf(message.getProperties().get("a")) == 2
  106 + && message.getConsumedTimes() > 1) {
  107 + // 确认提交事务消息
  108 + mqTransProducer.commit(message.getReceiptHandle());
  109 + count++;
  110 + System.out.println(String.format("MessageId:%s, commit", message.getMessageId()));
  111 + } else if (Integer.valueOf(message.getProperties().get("a")) == 3) {
  112 + // 确认回滚事务消息
  113 + mqTransProducer.rollback(message.getReceiptHandle());
  114 + count++;
  115 + System.out.println(String.format("MessageId:%s, rollback", message.getMessageId()));
  116 + } else {
  117 + // 什么都不做,下次再检查
  118 + System.out.println(String.format("MessageId:%s, unknown", message.getMessageId()));
  119 + }
  120 + } catch (Throwable e) {
  121 + // 如果Commit/Rollback时超过了TransCheckImmunityTime(针对发送事务消息的句柄)或者超过10s(针对consumeHalfMessage的句柄)则会失败
  122 + processCommitRollError(e);
  123 + }
  124 + }
  125 + } catch (Throwable e) {
  126 + System.out.println(e.getMessage());
  127 + }
  128 + }
  129 + }
  130 + });
  131 +
  132 + t.start();
  133 +
  134 + t.join();
  135 +
  136 + mqClient.close();
  137 + }
  138 +
  139 +}
... ...
... ... @@ -4,3 +4,4 @@ package-lock.json
4 4 replace.sh
5 5 run_consumer.js
6 6 run_producer.js
  7 +run_trans-producer.js
... ...
... ... @@ -35,9 +35,10 @@ const consumer = client.getConsumer(instanceId, topic, groupId);
35 35 // 消费消息,处理业务逻辑
36 36 console.log("Consume Messages, requestId:%s", res.requestId);
37 37 const handles = res.body.map((message) => {
38   - console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s",
  38 + console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
  39 + ",Props:%j,MessageKey:%s,Prop-A:%s",
39 40 message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
40   - message.MessageBody);
  41 + message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
41 42 return message.ReceiptHandle;
42 43 });
43 44  
... ...
1 1 {
2 2 "name": "mq-http-sdk-sample",
3 3 "dependencies": {
4   - "@aliyunmq/mq-http-sdk": "1.0.0"
  4 + "@aliyunmq/mq-http-sdk": "1.0.1"
5 5 }
6 6 }
... ...
1 1 const {
2   - MQClient
  2 + MQClient,
  3 + MessageProperties
3 4 } = require('@aliyunmq/mq-http-sdk');
4 5  
5 6 // 设置HTTP接入域名(此处以公共云生产环境为例)
... ... @@ -20,12 +21,24 @@ const producer = client.getProducer(instanceId, topic);
20 21  
21 22 (async function(){
22 23 try {
23   - // 循环发送100条消息
24   - for(var i = 0; i < 50; i++) {
  24 + // 循环发送4条消息
  25 + for(var i = 0; i < 4; i++) {
25 26 let res;
26   - res = await producer.publishMessage("hello mq.");
27   - console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
28   - res = await producer.publishMessage("hello mq.", "tag");
  27 + if (i % 2 == 0) {
  28 + msgProps = new MessageProperties();
  29 + // 设置属性
  30 + msgProps.putProperty("a", i);
  31 + // 设置KEY
  32 + msgProps.messageKey("MessageKey");
  33 + res = await producer.publishMessage("hello mq.", "", msgProps);
  34 + } else {
  35 + msgProps = new MessageProperties();
  36 + // 设置属性
  37 + msgProps.putProperty("a", i);
  38 + // 定时消息, 定时时间为10s后
  39 + msgProps.startDeliverTime(Date.now() + 10 * 1000);
  40 + res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
  41 + }
29 42 console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
30 43 }
31 44  
... ...
  1 +const {
  2 + MQClient,
  3 + MessageProperties
  4 +} = require('@aliyunmq/mq-http-sdk');
  5 +
  6 +// 设置HTTP接入域名(此处以公共云生产环境为例)
  7 +const endpoint = "${HTTP_ENDPOINT}";
  8 +// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  9 +const accessKeyId = "${ACCESS_KEY}";
  10 +// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  11 +const accessKeySecret = "${SECRET_KEY}";
  12 +
  13 +var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
  14 +
  15 +// 所属的 Topic
  16 +const topic = "${TOPIC}";
  17 +// Topic所属实例ID,默认实例为空
  18 +const instanceId = "${INSTANCE_ID}";
  19 +// 您在控制台创建的 Consumer ID(Group ID)
  20 +const groupId = "${GROUP_ID}";
  21 +
  22 +const mqTransProducer = client.getTransProducer(instanceId, topic, groupId);
  23 +
  24 +async function processTransResult(res, msgId) {
  25 + if (!res) {
  26 + return;
  27 + }
  28 + if (res.code != 204) {
  29 + // 如果Commit/Rollback时超过了TransCheckImmunityTime(针对发送事务消息的句柄)或者超过NextConsumeTime(针对consumeHalfMessage的句柄)则会失败
  30 + console.log("Commit/Rollback Message Fail:");
  31 + const failHandles = res.body.map((error) => {
  32 + console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
  33 + return error.ReceiptHandle;
  34 + });
  35 + } else {
  36 + console.log("Commit/Rollback Message suc!!! %s", msgId);
  37 + }
  38 +}
  39 +
  40 +var halfMessageCount = 0;
  41 +var halfMessageConsumeCount = 0;
  42 +
  43 +(async function(){
  44 + try {
  45 + // 循环发送4条事务消息
  46 + for(var i = 0; i < 4; i++) {
  47 + let res;
  48 + msgProps = new MessageProperties();
  49 + // 设置属性
  50 + msgProps.putProperty("a", i);
  51 + // 设置KEY
  52 + msgProps.messageKey("MessageKey");
  53 + //设置事务第一次回查的时间,为相对时间,单位:秒,范围为10~300s之间
  54 + // 第一次事务回查后如果消息没有commit或者rollback,则之后每隔10s左右会回查一次,总共回查一天
  55 + msgProps.transCheckImmunityTime(10);
  56 + res = await mqTransProducer.publishMessage("hello mq.", "", msgProps);
  57 + console.log("Publish message: MessageID:%s,BodyMD5:%s,Handle:%s", res.body.MessageId, res.body.MessageBodyMD5, res.body.ReceiptHandle);
  58 + if (res && i == 0) {
  59 + // 发送完事务消息后能获取到半消息句柄,可以直接commit/rollback事务消息
  60 + const msgId = res.body.MessageId;
  61 + res = await mqTransProducer.commit(res.body.ReceiptHandle);
  62 + console.log("Commit msg when publish, %s", msgId);
  63 + // 如果Commit/Rollback时超过了TransCheckImmunityTime则会失败
  64 + processTransResult(res, msgId);
  65 + }
  66 + }
  67 + } catch(e) {
  68 + // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
  69 + console.log(e)
  70 + }
  71 +})();
  72 +
  73 +// 这里最好有一个单独线程或者进程来消费没有确认的事务消息
  74 +// 仅示例:检查没有确认的事务消息
  75 +(async function() {
  76 + // 循环检查事务半消息,类似消费普通消息
  77 + while(halfMessageCount < 3 && halfMessageConsumeCount < 15) {
  78 + try {
  79 + halfMessageConsumeCount++;
  80 + res = await mqTransProducer.consumeHalfMessage(3, 3);
  81 + if (res.code == 200) {
  82 + // 消费消息,处理业务逻辑
  83 + console.log("Consume Messages, requestId:%s", res.requestId);
  84 + res.body.forEach(async (message) => {
  85 + console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
  86 + ",Props:%j,MessageKey:%s,Prop-A:%s",
  87 + message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
  88 + message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
  89 +
  90 + var propA = message.Properties && message.Properties.a ? parseInt(message.Properties.a) : 0;
  91 + var opResp;
  92 + if (propA == 1 || (propA == 2 && message.ConsumedTimes > 1)) {
  93 + opResp = await mqTransProducer.commit(message.ReceiptHandle);
  94 + console.log("Commit msg when check half, %s", message.MessageId);
  95 + halfMessageCount++;
  96 + } else if (propA == 3) {
  97 + opResp = await mqTransProducer.rollback(message.ReceiptHandle);
  98 + console.log("Rollback msg when check half, %s", message.MessageId);
  99 + halfMessageCount++;
  100 + }
  101 + processTransResult(opResp, message.MessageId);
  102 + });
  103 + }
  104 + } catch(e) {
  105 + if (e.Code && e.Code.indexOf("MessageNotExist") > -1) {
  106 + // 没有消息,则继续长轮询服务器
  107 + console.log("Consume Transaction Half msg: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
  108 + } else {
  109 + console.log(e);
  110 + }
  111 + }
  112 + }
  113 +})();
  114 +
... ...
... ... @@ -4,4 +4,5 @@ composer.lock
4 4 .idea/
5 5 RunConsumer.php
6 6 RunProducer.php
  7 +RunTransProducer.php
7 8 replace.sh
... ...
... ... @@ -61,9 +61,11 @@ class ConsumerTest
61 61 $receiptHandles = array();
62 62 foreach ($messages as $message) {
63 63 $receiptHandles[] = $message->getReceiptHandle();
64   - printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d\n",
  64 + printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
65 65 $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
66   - $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime());
  66 + $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
  67 + $message->getMessageKey());
  68 + print_r($message->getProperties());
67 69 }
68 70  
69 71 // $message->getNextConsumeTime()前若不确认消息消费成功,则消息会重复消费
... ...
... ... @@ -33,11 +33,19 @@ class ProducerTest
33 33 {
34 34 try
35 35 {
36   - for ($i=1; $i<=100; $i++)
  36 + for ($i=1; $i<=4; $i++)
37 37 {
38 38 $publishMessage = new TopicMessage(
39 39 "xxxxxxxx"// 消息内容
40 40 );
  41 + // 设置属性
  42 + $publishMessage->putProperty("a", $i);
  43 + // 设置消息KEY
  44 + $publishMessage->setMessageKey("MessageKey");
  45 + if ($i % 2 == 0) {
  46 + // 定时消息, 定时时间为10s后
  47 + $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
  48 + }
41 49 $result = $this->producer->publishMessage($publishMessage);
42 50  
43 51 print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
... ...
  1 +<?php
  2 +
  3 +require "vendor/autoload.php";
  4 +
  5 +use MQ\Model\TopicMessage;
  6 +use MQ\MQClient;
  7 +
  8 +class ProducerTest
  9 +{
  10 + private $client;
  11 + private $transProducer;
  12 + private $count;
  13 + private $popMsgCount;
  14 +
  15 + public function __construct()
  16 + {
  17 + $this->client = new MQClient(
  18 + // 设置HTTP接入域名(此处以公共云生产环境为例)
  19 + "${HTTP_ENDPOINT}",
  20 + // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  21 + "${ACCESS_KEY}",
  22 + // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  23 + "${SECRET_KEY}"
  24 + );
  25 +
  26 + // 所属的 Topic
  27 + $topic = "${TOPIC}";
  28 + // 您在控制台创建的 Consumer ID(Group ID)
  29 + $groupId = "${GROUP_ID}";
  30 + // Topic所属实例ID,默认实例为空NULL
  31 + $instanceId = "${INSTANCE_ID}";
  32 +
  33 + $this->transProducer = $this->client->getTransProducer($instanceId,$topic, $groupId);
  34 + $this->count = 0;
  35 + $this->popMsgCount = 0;
  36 + }
  37 +
  38 + function processAckError($e) {
  39 + if ($e instanceof MQ\Exception\AckMessageException) {
  40 + // 如果Commit/Rollback时超过了TransCheckImmunityTime(针对发送事务消息的句柄)或者超过NextConsumeTime(针对consumeHalfMessage的句柄)则会失败
  41 + printf("Commit/Rollback Error, RequestId:%s\n", $e->getRequestId());
  42 + foreach ($e->getAckMessageErrorItems() as $errorItem) {
  43 + printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
  44 + }
  45 + } else {
  46 + print_r($e);
  47 + }
  48 + }
  49 +
  50 + function consumeHalfMsg() {
  51 + while($this->count < 3 && $this->popMsgCount < 15) {
  52 + $this->popMsgCount++;
  53 +
  54 + try {
  55 + $messages = $this->transProducer->consumeHalfMessage(4, 3);
  56 + } catch (\Exception $e) {
  57 + if ($e instanceof MQ\Exception\MessageNotExistException) {
  58 + print "no half transaction message\n";
  59 + continue;
  60 + }
  61 + print_r($e->getMessage() . "\n");
  62 + sleep(3);
  63 + continue;
  64 + }
  65 +
  66 + foreach ($messages as $message) {
  67 + printf("ID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d\nConsumedTimes:%d, NextConsumeTime:%d\nPropA:%s\n",
  68 + $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
  69 + $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
  70 + $message->getProperty("a"));
  71 + print_r($message->getProperties());
  72 + $propA = $message->getProperty("a");
  73 + $consumeTimes = $message->getConsumedTimes();
  74 + try {
  75 + if ($propA == "1") {
  76 + print "\n commit transaction msg: " . $message->getMessageId() . "\n";
  77 + $this->transProducer->commit($message->getReceiptHandle());
  78 + $this->count++;
  79 + } else if ($propA == "2" && $consumeTimes > 1) {
  80 + print "\n commit transaction msg: " . $message->getMessageId() . "\n";
  81 + $this->transProducer->commit($message->getReceiptHandle());
  82 + $this->count++;
  83 + } else if ($propA == "3") {
  84 + print "\n rollback transaction msg: " . $message->getMessageId() . "\n";
  85 + $this->transProducer->rollback($message->getReceiptHandle());
  86 + $this->count++;
  87 + } else {
  88 + print "\n unknown transaction msg: " . $message->getMessageId() . "\n";
  89 + }
  90 + } catch (\Exception $e) {
  91 + processAckError($e);
  92 + }
  93 + }
  94 + }
  95 + }
  96 +
  97 + public function run()
  98 + {
  99 + for ($i = 0; $i < 4; $i++) {
  100 + $pubMsg = new TopicMessage("xxxxxxxx");
  101 + // 设置属性
  102 + $pubMsg->putProperty("a", $i);
  103 + // 设置消息KEY
  104 + $pubMsg->setMessageKey("MessageKey");
  105 + //设置事务第一次回查的时间,为相对时间,单位:秒,范围为10~300s之间
  106 + // 第一次事务回查后如果消息没有commit或者rollback,则之后每隔10s左右会回查一次,总共回查一天
  107 + $pubMsg->setTransCheckImmunityTime(10);
  108 + $topicMessage = $this->transProducer->publishMessage($pubMsg);
  109 +
  110 + print "\npublish -> \n\t" . $topicMessage->getMessageId() . " " . $topicMessage->getReceiptHandle() . "\n";
  111 +
  112 + if ($i == 0) {
  113 + try {
  114 + // 发送完事务消息后能获取到半消息句柄,可以直接commit/rollback事务消息
  115 + $this->transProducer->commit($topicMessage->getReceiptHandle());
  116 + print "\n commit transaction msg when publish: " . $topicMessage->getMessageId() . "\n";
  117 + } catch (\Exception $e) {
  118 + // 如果Commit/Rollback时超过了TransCheckImmunityTime则会失败
  119 + processAckError($e);
  120 + }
  121 + }
  122 + }
  123 +
  124 + // 这里最好有一个单独线程或者进程来消费没有确认的事务消息
  125 + // 仅示例:检查没有确认的事务消息
  126 + $this->consumeHalfMsg();
  127 + }
  128 +}
  129 +
  130 +
  131 +$instance = new ProducerTest();
  132 +$instance->run();
  133 +
  134 +?>
... ...
... ... @@ -3,3 +3,5 @@
3 3 replace.sh
4 4 run_consumer.py
5 5 run_producer.py
  6 +run_trans_producer.py
  7 +*.bak
... ...
... ... @@ -5,62 +5,66 @@ from mq_http_sdk.mq_exception import MQExceptionBase
5 5 from mq_http_sdk.mq_consumer import *
6 6 from mq_http_sdk.mq_client import *
7 7  
8   -#初始化 client
  8 +# 初始化 client
9 9 mq_client = MQClient(
10   - #设置HTTP接入域名(此处以公共云生产环境为例)
  10 + # 设置HTTP接入域名(此处以公共云生产环境为例)
11 11 "${HTTP_ENDPOINT}",
12   - #AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  12 + # AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
13 13 "${ACCESS_KEY}",
14   - #SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  14 + # SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
15 15 "${SECRET_KEY}"
16   - )
17   -#所属的 Topic
  16 +)
  17 +# 所属的 Topic
18 18 topic_name = "${TOPIC}"
19   -#您在控制台创建的 Consumer ID(Group ID)
  19 +# 您在控制台创建的 Consumer ID(Group ID)
20 20 group_id = "${GROUP_ID}"
21   -#Topic所属实例ID,默认实例为空None
  21 +# Topic所属实例ID,默认实例为空None
22 22 instance_id = "${INSTANCE_ID}"
23 23  
24 24 consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
25 25  
26   -#长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
27   -#长轮询时间3秒(最多可设置为30秒)
  26 +# 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
  27 +# 长轮询时间3秒(最多可设置为30秒)
28 28 wait_seconds = 3
29   -#一次最多消费3条(最多可设置为16条)
  29 +# 一次最多消费3条(最多可设置为16条)
30 30 batch = 3
31   -print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
  31 +print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
  32 + % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)))
32 33 while True:
33 34 try:
34   - #长轮询消费消息
  35 + # 长轮询消费消息
35 36 recv_msgs = consumer.consume_message(batch, wait_seconds)
36 37 for msg in recv_msgs:
37   - print "Receive, MessageId: %s\nMessageBodyMD5: %s \
  38 + print(("Receive, MessageId: %s\nMessageBodyMD5: %s \
38 39 \nMessageTag: %s\nConsumedTimes: %s \
39 40 \nPublishTime: %s\nBody: %s \
40 41 \nNextConsumeTime: %s \
41   - \nReceiptHandle: %s" % \
42   - (msg.message_id, msg.message_body_md5,
43   - msg.message_tag, msg.consumed_times,
44   - msg.publish_time, msg.message_body,
45   - msg.next_consume_time, msg.receipt_handle)
46   - except MQExceptionBase, e:
  42 + \nReceiptHandle: %s \
  43 + \nProperties: %s\n" % \
  44 + (msg.message_id, msg.message_body_md5,
  45 + msg.message_tag, msg.consumed_times,
  46 + msg.publish_time, msg.message_body,
  47 + msg.next_consume_time, msg.receipt_handle, msg.properties)))
  48 + print(msg.get_property("哈哈哈"))
  49 + except MQExceptionBase as e:
47 50 if e.type == "MessageNotExist":
48   - print "No new message! RequestId: %s" % e.req_id
  51 + print(("No new message! RequestId: %s" % e.req_id))
49 52 continue
50 53  
51   - print "Consume Message Fail! Exception:%s\n" % e
  54 + print(("Consume Message Fail! Exception:%s\n" % e))
52 55 time.sleep(2)
53 56 continue
54 57  
55   - #msg.next_consume_time前若不确认消息消费成功,则消息会重复消费
56   - #消息句柄有时间戳,同一条消息每次消费拿到的都不一样
  58 + # msg.next_consume_time前若不确认消息消费成功,则消息会重复消费
  59 + # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
57 60 try:
58 61 receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
59 62 consumer.ack_message(receipt_handle_list)
60   - print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list)
61   - except MQExceptionBase, e:
62   - print "\nAk Message Fail! Exception:%s" % e
63   - #某些消息的句柄可能超时了会导致确认不成功
  63 + print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))
  64 + except MQExceptionBase as e:
  65 + print(("\nAk Message Fail! Exception:%s" % e))
  66 + # 某些消息的句柄可能超时了会导致确认不成功
64 67 if e.sub_errors:
65   - for sub_error in e.sub_errors:
66   - print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])
  68 + for sub_error in e.sub_errors:
  69 + print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
  70 + (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))
... ...
1 1 #!/usr/bin/env python
2 2 # coding=utf8
  3 +import sys
3 4  
4 5 from mq_http_sdk.mq_exception import MQExceptionBase
5 6 from mq_http_sdk.mq_producer import *
6 7 from mq_http_sdk.mq_client import *
  8 +import time
7 9  
8 10 #初始化 client
9 11 mq_client = MQClient(
... ... @@ -22,23 +24,39 @@ instance_id = &quot;${INSTANCE_ID}&quot;
22 24 producer = mq_client.get_producer(instance_id, topic_name)
23 25  
24 26 # 循环发布多条消息
25   -msg_count = 100
26   -print "%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count)
  27 +msg_count = 4
  28 +print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))
27 29  
28 30 try:
29 31 for i in range(msg_count):
30   - msg_body = "I am test message %s." % i
31   - msg = TopicMessage(
32   - # 消息内容
33   - "I am test message %s." % i,
34   - # 消息标签
35   - ""
36   - )
37   - re_msg = producer.publish_message(msg)
38   - print "Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5)
  32 + if i % 2 == 0:
  33 + msg = TopicMessage(
  34 + # 消息内容
  35 + "I am test message %s.你好" % i,
  36 + # 消息标签
  37 + ""
  38 + )
  39 + # 设置属性
  40 + msg.put_property("a", "i")
  41 + # 设置KEY
  42 + msg.set_message_key("MessageKey")
  43 + re_msg = producer.publish_message(msg)
  44 + print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
  45 + else:
  46 + msg = TopicMessage(
  47 + # 消息内容
  48 + "I am test message %s." % i,
  49 + # 消息标签
  50 + ""
  51 + )
  52 + msg.put_property("a", i)
  53 + # 定时消息,毫秒级绝对时间
  54 + msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
  55 + re_msg = producer.publish_message(msg)
  56 + print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
39 57 time.sleep(1)
40   -except MQExceptionBase, e:
  58 +except MQExceptionBase as e:
41 59 if e.type == "TopicNotExist":
42   - print "Topic not exist, please create it."
  60 + print("Topic not exist, please create it.")
43 61 sys.exit(1)
44   - print "Publish Message Fail. Exception:%s" % e
  62 + print("Publish Message Fail. Exception:%s" % e)
... ...
  1 +#!/usr/bin/env python
  2 +# coding=utf8
  3 +import sys
  4 +
  5 +from mq_http_sdk.mq_exception import MQExceptionBase
  6 +from mq_http_sdk.mq_producer import *
  7 +from mq_http_sdk.mq_client import *
  8 +import time
  9 +import threading
  10 +
  11 +# 初始化 client
  12 +mq_client = MQClient(
  13 + # 设置HTTP接入域名(此处以公共云生产环境为例)
  14 + "${HTTP_ENDPOINT}",
  15 + # AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  16 + "${ACCESS_KEY}",
  17 + # SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  18 + "${SECRET_KEY}"
  19 +)
  20 +# 所属的 Topic
  21 +topic_name = "${TOPIC}"
  22 +# Topic所属实例ID,默认实例为空None
  23 +instance_id = "${INSTANCE_ID}"
  24 +# 您在控制台创建的 Consumer ID(Group ID)
  25 +group_id = "${GROUP_ID}"
  26 +
  27 +# 循环发布多条事务消息
  28 +msg_count = 4
  29 +print("%sPublish Transaction Message To %s\nTopicName:%s\nMessageCount:%s\n" \
  30 + % (10 * "=", 10 * "=", topic_name, msg_count))
  31 +
  32 +
  33 +def process_trans_error(exp):
  34 + print("\nCommit/Roll Transaction Message Fail! Exception:%s" % exp)
  35 + # 如果Commit/Rollback时超过了TransCheckImmunityTime(针对发送事务消息的句柄)
  36 + # 或者超过10s(针对consumeHalfMessage的句柄)则会失败
  37 + if exp.sub_errors:
  38 + for sub_error in exp.sub_errors:
  39 + print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
  40 + (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"]))
  41 +
  42 +
  43 +# 客户端需要有一个线程或者进程来消费没有确认的事务消息
  44 +# 示例这里启动一个线程来检查没有确认的事务消息
  45 +class ConsumeHalfMessageThread(threading.Thread):
  46 + def __init__(self):
  47 + threading.Thread.__init__(self)
  48 + self.count = 0
  49 + # 重新New一个Client
  50 + self.mq_client = MQClient(
  51 + # 设置HTTP接入域名(此处以公共云生产环境为例)
  52 + "${HTTP_ENDPOINT}",
  53 + # AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  54 + "${ACCESS_KEY}",
  55 + # SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  56 + "${SECRET_KEY}"
  57 + )
  58 + self.trans_producer = self.mq_client.get_trans_producer(instance_id, topic_name, group_id)
  59 +
  60 + def run(self):
  61 + while 1:
  62 + if self.count == 3:
  63 + break;
  64 + try:
  65 + half_msgs = self.trans_producer.consume_half_message(1, 3)
  66 + for half_msg in half_msgs:
  67 + print("Receive Half Message, MessageId: %s\nMessageBodyMD5: %s \
  68 + \nMessageTag: %s\nConsumedTimes: %s \
  69 + \nPublishTime: %s\nBody: %s \
  70 + \nNextConsumeTime: %s \
  71 + \nReceiptHandle: %s \
  72 + \nProperties: %s" % \
  73 + (half_msg.message_id, half_msg.message_body_md5,
  74 + half_msg.message_tag, half_msg.consumed_times,
  75 + half_msg.publish_time, half_msg.message_body,
  76 + half_msg.next_consume_time, half_msg.receipt_handle, half_msg.properties))
  77 +
  78 + a = int(half_msg.get_property("a"))
  79 + try:
  80 + if a == 1:
  81 + # 确认提交事务消息
  82 + self.trans_producer.commit(half_msg.receipt_handle)
  83 + self.count += 1
  84 + print("------>commit")
  85 + elif a == 2 and half_msg.consumed_times > 1:
  86 + # 确认提交事务消息
  87 + self.trans_producer.commit(half_msg.receipt_handle)
  88 + self.count += 1
  89 + print("------>commit")
  90 + elif a == 3:
  91 + # 确认回滚事务消息
  92 + self.trans_producer.rollback(half_msg.receipt_handle)
  93 + self.count += 1
  94 + print("------>rollback")
  95 + else:
  96 + # 什么都不做,下次再检查
  97 + print("------>unknown")
  98 + except MQExceptionBase as rec_commit_roll_e:
  99 + process_trans_error(rec_commit_roll_e)
  100 + except MQExceptionBase as half_e:
  101 + if half_e.type == "MessageNotExist":
  102 + print("No half message! RequestId: %s" % half_e.req_id)
  103 + continue
  104 +
  105 + print("Consume half message Fail! Exception:%s\n" % half_e)
  106 + break
  107 +
  108 +
  109 +consume_half_thread = ConsumeHalfMessageThread()
  110 +consume_half_thread.setDaemon(True)
  111 +consume_half_thread.start()
  112 +
  113 +try:
  114 + trans_producer = mq_client.get_trans_producer(instance_id, topic_name, group_id)
  115 + for i in range(msg_count):
  116 + msg = TopicMessage(
  117 + # 消息内容
  118 + "I am test message %s." % i,
  119 + # 消息标签
  120 + ""
  121 + )
  122 + # 设置属性
  123 + msg.put_property("a", i)
  124 + # 设置KEY
  125 + msg.set_message_key("MessageKey")
  126 + # 设置事务第一次回查的时间,为相对时间,单位:秒,范围为10~300s之间
  127 + # 第一次事务回查后如果消息没有commit或者rollback,则之后每隔10s左右会回查一次,总共回查一天
  128 + msg.set_trans_check_immunity_time(10)
  129 + re_msg = trans_producer.publish_message(msg)
  130 + print("Publish Transaction Message Succeed. MessageID:%s, BodyMD5:%s, Handle:%s" \
  131 + % (re_msg.message_id, re_msg.message_body_md5, re_msg.receipt_handle))
  132 + time.sleep(1)
  133 + if i == 0:
  134 + # 发送完事务消息后能获取到半消息句柄,可以直接commit / rollback事务消息
  135 + try:
  136 + trans_producer.commit(re_msg.receipt_handle)
  137 + except MQExceptionBase as pub_commit_roll_e:
  138 + process_trans_error(pub_commit_roll_e)
  139 +
  140 +except MQExceptionBase as pub_e:
  141 + if pub_e.type == "TopicNotExist":
  142 + print("Topic not exist, please create it.")
  143 + sys.exit(1)
  144 + print("Publish Message Fail. Exception:%s" % pub_e)
  145 +
  146 +while 1:
  147 + if not consume_half_thread.isAlive():
  148 + break
  149 + time.sleep(1)
... ...