Make go routine wait until result from rabbitMQ is sent











up vote
1
down vote

favorite
1












I am fairly new to Go, I want to make a pipeline that translate every requests I receive by send it to first queue (TEST), and get the final result from the last queue (RESULT) and send it back as a response.



The problem I am facing is, the response never wait til all result back from the queue. Here is the code:



func main() {
requests := int{3, 4, 5, 6, 7}
var wg sync.WaitGroup
wg.Add(1)
resArr := string{}
go func() {
for _, r := range requests {
rabbitSend("TEST", r)
resArr = append(resArr, <-rabbitReceive("RESULT"))
}
defer wg.Done()
}()
wg.Wait()

log.Println("Result", resArr)
}


rabbitSend method:



func rabbitSend(queueName string, msg int) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

body, _ := json.Marshal(msg)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: byte(body),
})
log.Printf("[x] Sent %s to %s", body, q.Name)
failOnError(err, "Failed to publish a message")
}


rabbitReceive method:



func rabbitReceive(queueName string) <-chan string {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-waits
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

resCh := make(chan string)
go func() {
for d := range msgs {
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
}
close(resCh)
}()
return resCh
}


Here is what I get when I run the program:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 Result [ 9 15 18]


What I want is that, I receive the result exactly after I send the request, so the request will not get the wrong result as a response. Something like:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 12 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 21 from RESULT
2018/11/12 05:11:54 Result [ 9 12 15 18 21]


I believe I did not use goroutine or sync.WaitGroup correctly here. Thanks before :)










share|improve this question
























  • here, msgs, err := ch.Consume( what is the type of msgs?
    – nightfury1204
    Nov 12 at 6:08










  • @nightfury1204 it's <-chan Delivery
    – Kris MP
    Nov 12 at 8:21















up vote
1
down vote

favorite
1












I am fairly new to Go, I want to make a pipeline that translate every requests I receive by send it to first queue (TEST), and get the final result from the last queue (RESULT) and send it back as a response.



The problem I am facing is, the response never wait til all result back from the queue. Here is the code:



func main() {
requests := int{3, 4, 5, 6, 7}
var wg sync.WaitGroup
wg.Add(1)
resArr := string{}
go func() {
for _, r := range requests {
rabbitSend("TEST", r)
resArr = append(resArr, <-rabbitReceive("RESULT"))
}
defer wg.Done()
}()
wg.Wait()

log.Println("Result", resArr)
}


rabbitSend method:



func rabbitSend(queueName string, msg int) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

body, _ := json.Marshal(msg)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: byte(body),
})
log.Printf("[x] Sent %s to %s", body, q.Name)
failOnError(err, "Failed to publish a message")
}


rabbitReceive method:



func rabbitReceive(queueName string) <-chan string {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-waits
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

resCh := make(chan string)
go func() {
for d := range msgs {
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
}
close(resCh)
}()
return resCh
}


Here is what I get when I run the program:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 Result [ 9 15 18]


What I want is that, I receive the result exactly after I send the request, so the request will not get the wrong result as a response. Something like:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 12 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 21 from RESULT
2018/11/12 05:11:54 Result [ 9 12 15 18 21]


I believe I did not use goroutine or sync.WaitGroup correctly here. Thanks before :)










share|improve this question
























  • here, msgs, err := ch.Consume( what is the type of msgs?
    – nightfury1204
    Nov 12 at 6:08










  • @nightfury1204 it's <-chan Delivery
    – Kris MP
    Nov 12 at 8:21













up vote
1
down vote

favorite
1









up vote
1
down vote

favorite
1






1





I am fairly new to Go, I want to make a pipeline that translate every requests I receive by send it to first queue (TEST), and get the final result from the last queue (RESULT) and send it back as a response.



The problem I am facing is, the response never wait til all result back from the queue. Here is the code:



func main() {
requests := int{3, 4, 5, 6, 7}
var wg sync.WaitGroup
wg.Add(1)
resArr := string{}
go func() {
for _, r := range requests {
rabbitSend("TEST", r)
resArr = append(resArr, <-rabbitReceive("RESULT"))
}
defer wg.Done()
}()
wg.Wait()

log.Println("Result", resArr)
}


rabbitSend method:



func rabbitSend(queueName string, msg int) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

body, _ := json.Marshal(msg)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: byte(body),
})
log.Printf("[x] Sent %s to %s", body, q.Name)
failOnError(err, "Failed to publish a message")
}


rabbitReceive method:



func rabbitReceive(queueName string) <-chan string {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-waits
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

resCh := make(chan string)
go func() {
for d := range msgs {
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
}
close(resCh)
}()
return resCh
}


Here is what I get when I run the program:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 Result [ 9 15 18]


What I want is that, I receive the result exactly after I send the request, so the request will not get the wrong result as a response. Something like:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 12 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 21 from RESULT
2018/11/12 05:11:54 Result [ 9 12 15 18 21]


I believe I did not use goroutine or sync.WaitGroup correctly here. Thanks before :)










share|improve this question















I am fairly new to Go, I want to make a pipeline that translate every requests I receive by send it to first queue (TEST), and get the final result from the last queue (RESULT) and send it back as a response.



The problem I am facing is, the response never wait til all result back from the queue. Here is the code:



func main() {
requests := int{3, 4, 5, 6, 7}
var wg sync.WaitGroup
wg.Add(1)
resArr := string{}
go func() {
for _, r := range requests {
rabbitSend("TEST", r)
resArr = append(resArr, <-rabbitReceive("RESULT"))
}
defer wg.Done()
}()
wg.Wait()

log.Println("Result", resArr)
}


rabbitSend method:



func rabbitSend(queueName string, msg int) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

body, _ := json.Marshal(msg)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: byte(body),
})
log.Printf("[x] Sent %s to %s", body, q.Name)
failOnError(err, "Failed to publish a message")
}


rabbitReceive method:



func rabbitReceive(queueName string) <-chan string {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-waits
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

resCh := make(chan string)
go func() {
for d := range msgs {
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
}
close(resCh)
}()
return resCh
}


Here is what I get when I run the program:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 Result [ 9 15 18]


What I want is that, I receive the result exactly after I send the request, so the request will not get the wrong result as a response. Something like:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 12 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 21 from RESULT
2018/11/12 05:11:54 Result [ 9 12 15 18 21]


I believe I did not use goroutine or sync.WaitGroup correctly here. Thanks before :)







go rabbitmq synchronization goroutine






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Nov 11 at 23:39

























asked Nov 11 at 22:17









Kris MP

513920




513920












  • here, msgs, err := ch.Consume( what is the type of msgs?
    – nightfury1204
    Nov 12 at 6:08










  • @nightfury1204 it's <-chan Delivery
    – Kris MP
    Nov 12 at 8:21


















  • here, msgs, err := ch.Consume( what is the type of msgs?
    – nightfury1204
    Nov 12 at 6:08










  • @nightfury1204 it's <-chan Delivery
    – Kris MP
    Nov 12 at 8:21
















here, msgs, err := ch.Consume( what is the type of msgs?
– nightfury1204
Nov 12 at 6:08




here, msgs, err := ch.Consume( what is the type of msgs?
– nightfury1204
Nov 12 at 6:08












@nightfury1204 it's <-chan Delivery
– Kris MP
Nov 12 at 8:21




@nightfury1204 it's <-chan Delivery
– Kris MP
Nov 12 at 8:21












2 Answers
2






active

oldest

votes

















up vote
2
down vote



accepted










Modify your func rabbitReceive(queueName string) <-chan string as below:



 func rabbitReceive(queueName string) <-chan string {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-waits
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

resCh := make(chan string)
go func() {
d := <-msgs
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
conn.Close()
ch.Close()
close(resCh)
}()
return resCh
}


The reason previous code cause you problem was defer ch.Close(). ch closes before response was written to resCh.






share|improve this answer





















  • thank you for your help, it works now :)
    – Kris MP
    Nov 13 at 4:18


















up vote
0
down vote













following up on @nightfury1204 great answer, you are indeed closing ch before writing to resCh. just one thing, in the go routine you want to go over all the messages so a better way to do it will be:



go func() {
for d := range msgs {
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
}
conn.Close()
ch.Close()
close(resCh)
}()





share|improve this answer





















  • Hi thanks, somehow if I try your solution it stuck on the second loop
    – Kris MP
    Nov 13 at 4:21






  • 1




    msgs, err := ch.Consume( here msgs is derived from ch, so if ch doesn't close it, then for d := range msgs this will be blocking. if msgs is not closed, then ` ch.Close()` will not be executed. So it will cause deadlock
    – nightfury1204
    Nov 13 at 4:22












  • I c, thank you for the explanation
    – Kris MP
    Nov 13 at 4:27










  • @nightfury1204 it's ok that the goroutine will be blocking, it supposed to wait on RESULT for an incoming message. We can add another goroutine to act as a signal handler and close ch in case of receiving a SIGKILL or some other signal.
    – eladm26
    Nov 13 at 13:29











Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53253797%2fmake-go-routine-wait-until-result-from-rabbitmq-is-sent%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown

























2 Answers
2






active

oldest

votes








2 Answers
2






active

oldest

votes









active

oldest

votes






active

oldest

votes








up vote
2
down vote



accepted










Modify your func rabbitReceive(queueName string) <-chan string as below:



 func rabbitReceive(queueName string) <-chan string {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-waits
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

resCh := make(chan string)
go func() {
d := <-msgs
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
conn.Close()
ch.Close()
close(resCh)
}()
return resCh
}


The reason previous code cause you problem was defer ch.Close(). ch closes before response was written to resCh.






share|improve this answer





















  • thank you for your help, it works now :)
    – Kris MP
    Nov 13 at 4:18















up vote
2
down vote



accepted










Modify your func rabbitReceive(queueName string) <-chan string as below:



 func rabbitReceive(queueName string) <-chan string {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-waits
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

resCh := make(chan string)
go func() {
d := <-msgs
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
conn.Close()
ch.Close()
close(resCh)
}()
return resCh
}


The reason previous code cause you problem was defer ch.Close(). ch closes before response was written to resCh.






share|improve this answer





















  • thank you for your help, it works now :)
    – Kris MP
    Nov 13 at 4:18













up vote
2
down vote



accepted







up vote
2
down vote



accepted






Modify your func rabbitReceive(queueName string) <-chan string as below:



 func rabbitReceive(queueName string) <-chan string {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-waits
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

resCh := make(chan string)
go func() {
d := <-msgs
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
conn.Close()
ch.Close()
close(resCh)
}()
return resCh
}


The reason previous code cause you problem was defer ch.Close(). ch closes before response was written to resCh.






share|improve this answer












Modify your func rabbitReceive(queueName string) <-chan string as below:



 func rabbitReceive(queueName string) <-chan string {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")

q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-waits
nil, // arguments
)
failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

resCh := make(chan string)
go func() {
d := <-msgs
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
conn.Close()
ch.Close()
close(resCh)
}()
return resCh
}


The reason previous code cause you problem was defer ch.Close(). ch closes before response was written to resCh.







share|improve this answer












share|improve this answer



share|improve this answer










answered Nov 12 at 9:25









nightfury1204

1,41248




1,41248












  • thank you for your help, it works now :)
    – Kris MP
    Nov 13 at 4:18


















  • thank you for your help, it works now :)
    – Kris MP
    Nov 13 at 4:18
















thank you for your help, it works now :)
– Kris MP
Nov 13 at 4:18




thank you for your help, it works now :)
– Kris MP
Nov 13 at 4:18












up vote
0
down vote













following up on @nightfury1204 great answer, you are indeed closing ch before writing to resCh. just one thing, in the go routine you want to go over all the messages so a better way to do it will be:



go func() {
for d := range msgs {
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
}
conn.Close()
ch.Close()
close(resCh)
}()





share|improve this answer





















  • Hi thanks, somehow if I try your solution it stuck on the second loop
    – Kris MP
    Nov 13 at 4:21






  • 1




    msgs, err := ch.Consume( here msgs is derived from ch, so if ch doesn't close it, then for d := range msgs this will be blocking. if msgs is not closed, then ` ch.Close()` will not be executed. So it will cause deadlock
    – nightfury1204
    Nov 13 at 4:22












  • I c, thank you for the explanation
    – Kris MP
    Nov 13 at 4:27










  • @nightfury1204 it's ok that the goroutine will be blocking, it supposed to wait on RESULT for an incoming message. We can add another goroutine to act as a signal handler and close ch in case of receiving a SIGKILL or some other signal.
    – eladm26
    Nov 13 at 13:29















up vote
0
down vote













following up on @nightfury1204 great answer, you are indeed closing ch before writing to resCh. just one thing, in the go routine you want to go over all the messages so a better way to do it will be:



go func() {
for d := range msgs {
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
}
conn.Close()
ch.Close()
close(resCh)
}()





share|improve this answer





















  • Hi thanks, somehow if I try your solution it stuck on the second loop
    – Kris MP
    Nov 13 at 4:21






  • 1




    msgs, err := ch.Consume( here msgs is derived from ch, so if ch doesn't close it, then for d := range msgs this will be blocking. if msgs is not closed, then ` ch.Close()` will not be executed. So it will cause deadlock
    – nightfury1204
    Nov 13 at 4:22












  • I c, thank you for the explanation
    – Kris MP
    Nov 13 at 4:27










  • @nightfury1204 it's ok that the goroutine will be blocking, it supposed to wait on RESULT for an incoming message. We can add another goroutine to act as a signal handler and close ch in case of receiving a SIGKILL or some other signal.
    – eladm26
    Nov 13 at 13:29













up vote
0
down vote










up vote
0
down vote









following up on @nightfury1204 great answer, you are indeed closing ch before writing to resCh. just one thing, in the go routine you want to go over all the messages so a better way to do it will be:



go func() {
for d := range msgs {
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
}
conn.Close()
ch.Close()
close(resCh)
}()





share|improve this answer












following up on @nightfury1204 great answer, you are indeed closing ch before writing to resCh. just one thing, in the go routine you want to go over all the messages so a better way to do it will be:



go func() {
for d := range msgs {
log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
resCh <- string(d.Body)
}
conn.Close()
ch.Close()
close(resCh)
}()






share|improve this answer












share|improve this answer



share|improve this answer










answered Nov 12 at 10:22









eladm26

387314




387314












  • Hi thanks, somehow if I try your solution it stuck on the second loop
    – Kris MP
    Nov 13 at 4:21






  • 1




    msgs, err := ch.Consume( here msgs is derived from ch, so if ch doesn't close it, then for d := range msgs this will be blocking. if msgs is not closed, then ` ch.Close()` will not be executed. So it will cause deadlock
    – nightfury1204
    Nov 13 at 4:22












  • I c, thank you for the explanation
    – Kris MP
    Nov 13 at 4:27










  • @nightfury1204 it's ok that the goroutine will be blocking, it supposed to wait on RESULT for an incoming message. We can add another goroutine to act as a signal handler and close ch in case of receiving a SIGKILL or some other signal.
    – eladm26
    Nov 13 at 13:29


















  • Hi thanks, somehow if I try your solution it stuck on the second loop
    – Kris MP
    Nov 13 at 4:21






  • 1




    msgs, err := ch.Consume( here msgs is derived from ch, so if ch doesn't close it, then for d := range msgs this will be blocking. if msgs is not closed, then ` ch.Close()` will not be executed. So it will cause deadlock
    – nightfury1204
    Nov 13 at 4:22












  • I c, thank you for the explanation
    – Kris MP
    Nov 13 at 4:27










  • @nightfury1204 it's ok that the goroutine will be blocking, it supposed to wait on RESULT for an incoming message. We can add another goroutine to act as a signal handler and close ch in case of receiving a SIGKILL or some other signal.
    – eladm26
    Nov 13 at 13:29
















Hi thanks, somehow if I try your solution it stuck on the second loop
– Kris MP
Nov 13 at 4:21




Hi thanks, somehow if I try your solution it stuck on the second loop
– Kris MP
Nov 13 at 4:21




1




1




msgs, err := ch.Consume( here msgs is derived from ch, so if ch doesn't close it, then for d := range msgs this will be blocking. if msgs is not closed, then ` ch.Close()` will not be executed. So it will cause deadlock
– nightfury1204
Nov 13 at 4:22






msgs, err := ch.Consume( here msgs is derived from ch, so if ch doesn't close it, then for d := range msgs this will be blocking. if msgs is not closed, then ` ch.Close()` will not be executed. So it will cause deadlock
– nightfury1204
Nov 13 at 4:22














I c, thank you for the explanation
– Kris MP
Nov 13 at 4:27




I c, thank you for the explanation
– Kris MP
Nov 13 at 4:27












@nightfury1204 it's ok that the goroutine will be blocking, it supposed to wait on RESULT for an incoming message. We can add another goroutine to act as a signal handler and close ch in case of receiving a SIGKILL or some other signal.
– eladm26
Nov 13 at 13:29




@nightfury1204 it's ok that the goroutine will be blocking, it supposed to wait on RESULT for an incoming message. We can add another goroutine to act as a signal handler and close ch in case of receiving a SIGKILL or some other signal.
– eladm26
Nov 13 at 13:29


















draft saved

draft discarded




















































Thanks for contributing an answer to Stack Overflow!


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.





Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


Please pay close attention to the following guidance:


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.




draft saved


draft discarded














StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53253797%2fmake-go-routine-wait-until-result-from-rabbitmq-is-sent%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







Popular posts from this blog

Bressuire

Vorschmack

Quarantine