Hot observable with multiple subscribers without loosing any event
I need to have a hot observable which wraps a price feed. This is subscribed in multiple areas.
The observable is created using Refcount and passed around for subscription.The first subscriber calls subscribe hence starts the stream and get all the events. The second will miss the events until it's subscription and subsequent subscriptions will do the same behaviour.
It is not the missing of events which is my issue. I want all subscribers to get the same data. That is, the stream must start only when the subscription requests are finished.
Is it possible?
Edit: Two approaches and its problems are illustrated below.
1)
public void HotObservableSubscriptionWithRefCount()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2
value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x => Console.WriteLine($@"combined
subscriber value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Problem: The combined subscriber is missing values from two observables because of the delay in subscription
2)
public void HotObservableSubscriptionWithPublish()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value
{x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2 value
{x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x =>
Console.WriteLine($@"combined subscriber value {x}"));
obs1.Connect();
obs2.Connect();
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
This will make sure combinedsubsciber will get values in line with any individual subscribers. However even the subscribers are disposed of, the observer still continue provide values.
I need full life cycle control of publisher and subscriber
c# system.reactive
|
show 2 more comments
I need to have a hot observable which wraps a price feed. This is subscribed in multiple areas.
The observable is created using Refcount and passed around for subscription.The first subscriber calls subscribe hence starts the stream and get all the events. The second will miss the events until it's subscription and subsequent subscriptions will do the same behaviour.
It is not the missing of events which is my issue. I want all subscribers to get the same data. That is, the stream must start only when the subscription requests are finished.
Is it possible?
Edit: Two approaches and its problems are illustrated below.
1)
public void HotObservableSubscriptionWithRefCount()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2
value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x => Console.WriteLine($@"combined
subscriber value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Problem: The combined subscriber is missing values from two observables because of the delay in subscription
2)
public void HotObservableSubscriptionWithPublish()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value
{x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2 value
{x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x =>
Console.WriteLine($@"combined subscriber value {x}"));
obs1.Connect();
obs2.Connect();
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
This will make sure combinedsubsciber will get values in line with any individual subscribers. However even the subscribers are disposed of, the observer still continue provide values.
I need full life cycle control of publisher and subscriber
c# system.reactive
1
"stream must start only when the subscriptions are finished" doesn't make sense. Subscriptions don't finish. At best they can be cancelled. Can you please provide a Minimal, Complete, and Verifiable example rather than a vague description?
– Enigmativity
Nov 16 '18 at 7:05
Fair point. Addded code
– Jimmy
Nov 16 '18 at 10:44
1
To second sample: I think disposing the connection should stop the observable from producing values. var conn1 = obs1.Connect() and then conn1.Dispose()
– Felix Keil
Nov 16 '18 at 11:12
1
Usage of Replay extension method suggested. No time for full answer
– Felix Keil
Nov 16 '18 at 11:26
Felix, yes the connect functions return disposable was the key
– Jimmy
Nov 16 '18 at 19:12
|
show 2 more comments
I need to have a hot observable which wraps a price feed. This is subscribed in multiple areas.
The observable is created using Refcount and passed around for subscription.The first subscriber calls subscribe hence starts the stream and get all the events. The second will miss the events until it's subscription and subsequent subscriptions will do the same behaviour.
It is not the missing of events which is my issue. I want all subscribers to get the same data. That is, the stream must start only when the subscription requests are finished.
Is it possible?
Edit: Two approaches and its problems are illustrated below.
1)
public void HotObservableSubscriptionWithRefCount()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2
value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x => Console.WriteLine($@"combined
subscriber value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Problem: The combined subscriber is missing values from two observables because of the delay in subscription
2)
public void HotObservableSubscriptionWithPublish()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value
{x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2 value
{x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x =>
Console.WriteLine($@"combined subscriber value {x}"));
obs1.Connect();
obs2.Connect();
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
This will make sure combinedsubsciber will get values in line with any individual subscribers. However even the subscribers are disposed of, the observer still continue provide values.
I need full life cycle control of publisher and subscriber
c# system.reactive
I need to have a hot observable which wraps a price feed. This is subscribed in multiple areas.
The observable is created using Refcount and passed around for subscription.The first subscriber calls subscribe hence starts the stream and get all the events. The second will miss the events until it's subscription and subsequent subscriptions will do the same behaviour.
It is not the missing of events which is my issue. I want all subscribers to get the same data. That is, the stream must start only when the subscription requests are finished.
Is it possible?
Edit: Two approaches and its problems are illustrated below.
1)
public void HotObservableSubscriptionWithRefCount()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish().RefCount();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2
value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x => Console.WriteLine($@"combined
subscriber value {x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Problem: The combined subscriber is missing values from two observables because of the delay in subscription
2)
public void HotObservableSubscriptionWithPublish()
{
var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x;
Console.WriteLine($@"observer1 publishing {publishVal}");
return publishVal;
}).Publish();
var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
{
var publishVal = x + 100;
Console.WriteLine($@"observer2 publishing {publishVal}");
return publishVal;
}).Publish();
var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value
{x}"));
var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2 value
{x}"));
Thread.Sleep(TimeSpan.FromSeconds(1));
var combinedSub = obs1.Merge(obs2).Subscribe(x =>
Console.WriteLine($@"combined subscriber value {x}"));
obs1.Connect();
obs2.Connect();
Thread.Sleep(TimeSpan.FromSeconds(1));
sub1.Dispose();
sub2.Dispose();
combinedSub.Dispose();
Thread.Sleep(TimeSpan.FromSeconds(1));
}
This will make sure combinedsubsciber will get values in line with any individual subscribers. However even the subscribers are disposed of, the observer still continue provide values.
I need full life cycle control of publisher and subscriber
c# system.reactive
c# system.reactive
edited Nov 16 '18 at 10:44
Jimmy
asked Nov 15 '18 at 19:29
JimmyJimmy
1,71721938
1,71721938
1
"stream must start only when the subscriptions are finished" doesn't make sense. Subscriptions don't finish. At best they can be cancelled. Can you please provide a Minimal, Complete, and Verifiable example rather than a vague description?
– Enigmativity
Nov 16 '18 at 7:05
Fair point. Addded code
– Jimmy
Nov 16 '18 at 10:44
1
To second sample: I think disposing the connection should stop the observable from producing values. var conn1 = obs1.Connect() and then conn1.Dispose()
– Felix Keil
Nov 16 '18 at 11:12
1
Usage of Replay extension method suggested. No time for full answer
– Felix Keil
Nov 16 '18 at 11:26
Felix, yes the connect functions return disposable was the key
– Jimmy
Nov 16 '18 at 19:12
|
show 2 more comments
1
"stream must start only when the subscriptions are finished" doesn't make sense. Subscriptions don't finish. At best they can be cancelled. Can you please provide a Minimal, Complete, and Verifiable example rather than a vague description?
– Enigmativity
Nov 16 '18 at 7:05
Fair point. Addded code
– Jimmy
Nov 16 '18 at 10:44
1
To second sample: I think disposing the connection should stop the observable from producing values. var conn1 = obs1.Connect() and then conn1.Dispose()
– Felix Keil
Nov 16 '18 at 11:12
1
Usage of Replay extension method suggested. No time for full answer
– Felix Keil
Nov 16 '18 at 11:26
Felix, yes the connect functions return disposable was the key
– Jimmy
Nov 16 '18 at 19:12
1
1
"stream must start only when the subscriptions are finished" doesn't make sense. Subscriptions don't finish. At best they can be cancelled. Can you please provide a Minimal, Complete, and Verifiable example rather than a vague description?
– Enigmativity
Nov 16 '18 at 7:05
"stream must start only when the subscriptions are finished" doesn't make sense. Subscriptions don't finish. At best they can be cancelled. Can you please provide a Minimal, Complete, and Verifiable example rather than a vague description?
– Enigmativity
Nov 16 '18 at 7:05
Fair point. Addded code
– Jimmy
Nov 16 '18 at 10:44
Fair point. Addded code
– Jimmy
Nov 16 '18 at 10:44
1
1
To second sample: I think disposing the connection should stop the observable from producing values. var conn1 = obs1.Connect() and then conn1.Dispose()
– Felix Keil
Nov 16 '18 at 11:12
To second sample: I think disposing the connection should stop the observable from producing values. var conn1 = obs1.Connect() and then conn1.Dispose()
– Felix Keil
Nov 16 '18 at 11:12
1
1
Usage of Replay extension method suggested. No time for full answer
– Felix Keil
Nov 16 '18 at 11:26
Usage of Replay extension method suggested. No time for full answer
– Felix Keil
Nov 16 '18 at 11:26
Felix, yes the connect functions return disposable was the key
– Jimmy
Nov 16 '18 at 19:12
Felix, yes the connect functions return disposable was the key
– Jimmy
Nov 16 '18 at 19:12
|
show 2 more comments
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53326679%2fhot-observable-with-multiple-subscribers-without-loosing-any-event%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53326679%2fhot-observable-with-multiple-subscribers-without-loosing-any-event%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
1
"stream must start only when the subscriptions are finished" doesn't make sense. Subscriptions don't finish. At best they can be cancelled. Can you please provide a Minimal, Complete, and Verifiable example rather than a vague description?
– Enigmativity
Nov 16 '18 at 7:05
Fair point. Addded code
– Jimmy
Nov 16 '18 at 10:44
1
To second sample: I think disposing the connection should stop the observable from producing values. var conn1 = obs1.Connect() and then conn1.Dispose()
– Felix Keil
Nov 16 '18 at 11:12
1
Usage of Replay extension method suggested. No time for full answer
– Felix Keil
Nov 16 '18 at 11:26
Felix, yes the connect functions return disposable was the key
– Jimmy
Nov 16 '18 at 19:12