Skip to content

Commit f722007

Browse files
author
Ahmet Halaç
authored
Merge pull request #2 from atlassian/add-channel-id-support-to-owner
check channelId also for job's ownerId equality
2 parents f0ab6fb + 3d84b69 commit f722007

File tree

5 files changed

+9
-4
lines changed

5 files changed

+9
-4
lines changed

queue/job.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ func (j *job) Execute() error {
7474
messageAttr := j.sqsMessage().MessageAttributes
7575

7676
if messageAttr == nil ||
77-
*messageAttr[ownerId].StringValue != j.ownerId {
77+
*messageAttr[ownerId].StringValue != j.ownerId &&
78+
*messageAttr[channelId].StringValue != j.ownerId {
7879
j.state = jobError
7980
return errors.Errorf("Message[%s] is invalid, will not be processed.", messageId)
8081
}

queue/job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func TestExecuteWithInvalidQueueMessage(t *testing.T) {
163163
sqsJob := newJobTest()
164164

165165
falseIntegrationId := "falseIntegrationId"
166-
messageAttr := map[string]*sqs.MessageAttributeValue{ownerId: {StringValue: &falseIntegrationId}}
166+
messageAttr := map[string]*sqs.MessageAttributeValue{ownerId: {StringValue: &falseIntegrationId}, channelId: {StringValue: &falseIntegrationId}}
167167
sqsJob.message = sqs.Message{MessageAttributes: messageAttr, MessageId: &mockMessageId}
168168

169169
err := sqsJob.Execute()

queue/message_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ var (
1919
mockApiKey = "mockApiKey"
2020
mockBaseUrl = "mockBaseUrl"
2121
mockOwnerId = "mockOwnerId"
22+
mockChannelId = "mockChannelId"
2223
)
2324

2425
var mockActionSpecs = conf.ActionSpecifications{

queue/sqs_provider.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
)
1212

1313
const ownerId = "ownerId"
14+
const channelId = "channelId"
1415

1516
type SQSClient interface {
1617
ChangeMessageVisibility(input *sqs.ChangeMessageVisibilityInput) (*sqs.ChangeMessageVisibilityOutput, error)
@@ -111,6 +112,7 @@ func (qp *sqsProvider) ReceiveMessage(maxNumOfMessage int64, visibilityTimeout i
111112
request := &sqs.ReceiveMessageInput{
112113
MessageAttributeNames: []*string{
113114
aws.String(ownerId),
115+
aws.String(channelId),
114116
},
115117
QueueUrl: &queueUrl,
116118
MaxNumberOfMessages: aws.Int64(maxNumOfMessage),

queue/sqs_provider_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,9 @@ func TestReceiveMessage(t *testing.T) {
109109
assert.Equal(t, mockQueueUrl1, *capturedInput.QueueUrl)
110110
assert.Equal(t, int64(0), *capturedInput.WaitTimeSeconds) // because of short polling
111111
assert.Equal(t, int64(10), *capturedInput.MaxNumberOfMessages)
112-
assert.Equal(t, 1, len(capturedInput.MessageAttributeNames))
112+
assert.Equal(t, 2, len(capturedInput.MessageAttributeNames))
113113
assert.Equal(t, "ownerId", *capturedInput.MessageAttributeNames[0])
114+
assert.Equal(t, "channelId", *capturedInput.MessageAttributeNames[1])
114115
}
115116

116117
func TestReceiveMessageWithError(t *testing.T) {
@@ -233,7 +234,7 @@ var mockSuccessReceiveFunc = func(numOfMessage int64, visibilityTimeout int64) (
233234
messages := make([]*sqs.Message, 0)
234235
for i := int64(0); i < numOfMessage; i++ {
235236
id := strconv.FormatInt(i, 10)
236-
messageAttr := map[string]*sqs.MessageAttributeValue{"ownerId": {StringValue: &mockOwnerId}}
237+
messageAttr := map[string]*sqs.MessageAttributeValue{"ownerId": {StringValue: &mockOwnerId}, "channelId": {StringValue: &mockChannelId}}
237238
messages = append(messages, &sqs.Message{MessageId: &id, MessageAttributes: messageAttr, Body: &body})
238239
}
239240

0 commit comments

Comments
 (0)