Hot observable with multiple subscribers without losing any event

I need a hot observable that wraps a price feed. It is subscribed to in several places.

The observable is created with RefCount and passed around for subscription. The first subscriber's call to Subscribe starts the stream, so it receives all the events. The second subscriber misses every event emitted before it subscribed, and each later subscriber behaves the same way.

Missing events is not my real issue. I want all subscribers to get the same data. That is, the stream must start only after all the subscription requests have been made.

Is it possible?

Edit: Two approaches and their problems are illustrated below.



1)



public void HotObservableSubscriptionWithRefCount()
{
    var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
    {
        var publishVal = x;
        Console.WriteLine($@"observer1 publishing {publishVal}");
        return publishVal;
    }).Publish().RefCount();

    var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
    {
        var publishVal = x + 100;
        Console.WriteLine($@"observer2 publishing {publishVal}");
        return publishVal;
    }).Publish().RefCount();

    var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
    var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2 value {x}"));

    Thread.Sleep(TimeSpan.FromSeconds(1));

    var combinedSub = obs1.Merge(obs2).Subscribe(x => Console.WriteLine($@"combined subscriber value {x}"));

    Thread.Sleep(TimeSpan.FromSeconds(1));

    sub1.Dispose();
    sub2.Dispose();
    combinedSub.Dispose();

    Thread.Sleep(TimeSpan.FromSeconds(1));
}


Problem: The combined subscriber misses values from both observables because it subscribes later than the individual subscribers.



2)



public void HotObservableSubscriptionWithPublish()
{
    var obs1 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
    {
        var publishVal = x;
        Console.WriteLine($@"observer1 publishing {publishVal}");
        return publishVal;
    }).Publish();

    var obs2 = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(x =>
    {
        var publishVal = x + 100;
        Console.WriteLine($@"observer2 publishing {publishVal}");
        return publishVal;
    }).Publish();

    var sub1 = obs1.Subscribe(x => Console.WriteLine($@"subscriber1 value {x}"));
    var sub2 = obs2.Subscribe(x => Console.WriteLine($@"subscriber2 value {x}"));

    Thread.Sleep(TimeSpan.FromSeconds(1));

    var combinedSub = obs1.Merge(obs2).Subscribe(x =>
        Console.WriteLine($@"combined subscriber value {x}"));

    obs1.Connect();
    obs2.Connect();

    Thread.Sleep(TimeSpan.FromSeconds(1));

    sub1.Dispose();
    sub2.Dispose();
    combinedSub.Dispose();

    Thread.Sleep(TimeSpan.FromSeconds(1));
}


This makes sure the combined subscriber gets values in step with the individual subscribers. However, even after all the subscribers are disposed, the source observables still continue to produce values.

I need full lifecycle control of both the publisher and the subscribers.










  • "stream must start only when the subscriptions are finished" doesn't make sense. Subscriptions don't finish. At best they can be cancelled. Can you please provide a Minimal, Complete, and Verifiable example rather than a vague description?
    – Enigmativity, Nov 16 '18 at 7:05

  • Fair point. Added code.
    – Jimmy, Nov 16 '18 at 10:44

  • On the second sample: I think disposing the connection should stop the observable from producing values. Use var conn1 = obs1.Connect() and later conn1.Dispose().
    – Felix Keil, Nov 16 '18 at 11:12

  • I'd suggest using the Replay extension method. No time for a full answer.
    – Felix Keil, Nov 16 '18 at 11:26

  • Felix, yes, the disposable returned by Connect was the key.
    – Jimmy, Nov 16 '18 at 19:12
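The suggestion from the comments can be sketched roughly as follows: keep the IDisposable that Connect() returns and dispose it when the source should stop. This is only a minimal sketch, assuming the System.Reactive package. To keep it deterministic, the two Interval-based feeds are replaced by hand-driven Subject<long> instances; feed1, feed2, ConnectLifetimeSketch and Run are hypothetical names used for illustration, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ConnectLifetimeSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical stand-ins for the two price feeds (assumption:
        // the real code would use Observable.Interval or a live feed).
        var feed1 = new Subject<long>();
        var feed2 = new Subject<long>();

        IConnectableObservable<long> obs1 = feed1.Publish();
        IConnectableObservable<long> obs2 = feed2.Select(x => x + 100).Publish();

        // All subscribers attach before the stream is started.
        var sub1 = obs1.Subscribe(x => log.Add($"subscriber1 {x}"));
        var sub2 = obs2.Subscribe(x => log.Add($"subscriber2 {x}"));
        var combinedSub = obs1.Merge(obs2).Subscribe(x => log.Add($"combined {x}"));

        // Not connected yet, so this value reaches nobody.
        feed1.OnNext(0);

        // Connect starts the stream; keep the returned handles.
        IDisposable conn1 = obs1.Connect();
        IDisposable conn2 = obs2.Connect();

        feed1.OnNext(1);
        feed2.OnNext(1);

        // Subscriber lifetime: stop listening.
        sub1.Dispose();
        sub2.Dispose();
        combinedSub.Dispose();

        // Publisher lifetime: disconnect from the underlying sources.
        conn1.Dispose();
        conn2.Dispose();

        // The published observable no longer observes the feed.
        feed1.OnNext(2);
        log.Add($"feed1 observed after dispose: {feed1.HasObservers}");

        return log;
    }
}
```

Calling ConnectLifetimeSketch.Run() returns the log lines. The same pattern should carry over to the Interval-based sample in the question: subscribe everything first, call Connect(), and dispose the connection handles once the subscribers are done.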
















c# system.reactive

asked Nov 15 '18 at 19:29 by Jimmy
edited Nov 16 '18 at 10:44 by Jimmy







