Create Flux from messages on SQS queue
I'm trying to create a Flux
from incoming messages received from a queue.
For instance, if I'm using Amazon SQS how do I achieve to write the following code:
Flux<String> messages = connectionToSQS.receiveFromQueue(queueName);
messages.map(s -> log.info("message: {}", s).subscribe();
After experimentation, I found the following issues:
- How do I keep requesting messages from the queue (loop forever)? Do I create one thread that has a loop that keeps on requesting from the queue?
- How do I make the
Flux
cold? I don't want to request messages from SQS unless the consumer asks for it. This allows me to use backpressure.
First pass over this problem yielded something like the following code as per Reactor documentation:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
The idea being to create a single thread that keeps on requesting for messages in a loop and then using an observer pattern like above to do a next()
on each message received.
project-reactor
add a comment |
I'm trying to create a Flux
from incoming messages received from a queue.
For instance, if I'm using Amazon SQS how do I achieve to write the following code:
Flux<String> messages = connectionToSQS.receiveFromQueue(queueName);
messages.map(s -> log.info("message: {}", s).subscribe();
After experimentation, I found the following issues:
- How do I keep requesting messages from the queue (loop forever)? Do I create one thread that has a loop that keeps on requesting from the queue?
- How do I make the
Flux
cold? I don't want to request messages from SQS unless the consumer asks for it. This allows me to use backpressure.
First pass over this problem yielded something like the following code as per Reactor documentation:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
The idea being to create a single thread that keeps on requesting for messages in a loop and then using an observer pattern like above to do a next()
on each message received.
project-reactor
Typically sqs queue readers loop on some condition. In your case you might loop until no messages are available and then break out of the loop. You'll probably want to pull as many messages as you can process in a reasonable time frame to reduce cost and latency. You'll also want to set the max wait to be small if you'd prefer ending the loop to waiting for late arriving messages. Finally, don't forget to delete the messages once you've proceeded them.
– Dan Farrell
Nov 15 '18 at 2:28
add a comment |
I'm trying to create a Flux
from incoming messages received from a queue.
For instance, if I'm using Amazon SQS how do I achieve to write the following code:
Flux<String> messages = connectionToSQS.receiveFromQueue(queueName);
messages.map(s -> log.info("message: {}", s).subscribe();
After experimentation, I found the following issues:
- How do I keep requesting messages from the queue (loop forever)? Do I create one thread that has a loop that keeps on requesting from the queue?
- How do I make the
Flux
cold? I don't want to request messages from SQS unless the consumer asks for it. This allows me to use backpressure.
First pass over this problem yielded something like the following code as per Reactor documentation:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
The idea being to create a single thread that keeps on requesting for messages in a loop and then using an observer pattern like above to do a next()
on each message received.
project-reactor
I'm trying to create a Flux
from incoming messages received from a queue.
For instance, if I'm using Amazon SQS how do I achieve to write the following code:
Flux<String> messages = connectionToSQS.receiveFromQueue(queueName);
messages.map(s -> log.info("message: {}", s).subscribe();
After experimentation, I found the following issues:
- How do I keep requesting messages from the queue (loop forever)? Do I create one thread that has a loop that keeps on requesting from the queue?
- How do I make the
Flux
cold? I don't want to request messages from SQS unless the consumer asks for it. This allows me to use backpressure.
First pass over this problem yielded something like the following code as per Reactor documentation:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
The idea being to create a single thread that keeps on requesting for messages in a loop and then using an observer pattern like above to do a next()
on each message received.
project-reactor
project-reactor
asked Nov 15 '18 at 2:19
fizixxfizixx
5118
5118
Typically sqs queue readers loop on some condition. In your case you might loop until no messages are available and then break out of the loop. You'll probably want to pull as many messages as you can process in a reasonable time frame to reduce cost and latency. You'll also want to set the max wait to be small if you'd prefer ending the loop to waiting for late arriving messages. Finally, don't forget to delete the messages once you've proceeded them.
– Dan Farrell
Nov 15 '18 at 2:28
add a comment |
Typically sqs queue readers loop on some condition. In your case you might loop until no messages are available and then break out of the loop. You'll probably want to pull as many messages as you can process in a reasonable time frame to reduce cost and latency. You'll also want to set the max wait to be small if you'd prefer ending the loop to waiting for late arriving messages. Finally, don't forget to delete the messages once you've proceeded them.
– Dan Farrell
Nov 15 '18 at 2:28
Typically sqs queue readers loop on some condition. In your case you might loop until no messages are available and then break out of the loop. You'll probably want to pull as many messages as you can process in a reasonable time frame to reduce cost and latency. You'll also want to set the max wait to be small if you'd prefer ending the loop to waiting for late arriving messages. Finally, don't forget to delete the messages once you've proceeded them.
– Dan Farrell
Nov 15 '18 at 2:28
Typically sqs queue readers loop on some condition. In your case you might loop until no messages are available and then break out of the loop. You'll probably want to pull as many messages as you can process in a reasonable time frame to reduce cost and latency. You'll also want to set the max wait to be small if you'd prefer ending the loop to waiting for late arriving messages. Finally, don't forget to delete the messages once you've proceeded them.
– Dan Farrell
Nov 15 '18 at 2:28
add a comment |
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53311508%2fcreate-flux-from-messages-on-sqs-queue%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53311508%2fcreate-flux-from-messages-on-sqs-queue%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Typically sqs queue readers loop on some condition. In your case you might loop until no messages are available and then break out of the loop. You'll probably want to pull as many messages as you can process in a reasonable time frame to reduce cost and latency. You'll also want to set the max wait to be small if you'd prefer ending the loop to waiting for late arriving messages. Finally, don't forget to delete the messages once you've proceeded them.
– Dan Farrell
Nov 15 '18 at 2:28