LiveDataReactiveStreams: converting Flowable to LiveData doesn't work












I'm trying to convert a Flowable into LiveData and observe it in the activity. My Flowable emits values at a constant interval,
but the LiveData I convert it to never receives any values in its observer. I have created sample code to
demonstrate the problem.



Activity



    public class MyActivity extends AppCompatActivity {

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_my);
            MyViewModel myViewModel = ViewModelProviders.of(this).get(MyViewModel.class);
            myViewModel.init();
            myViewModel.getListLiveData().observe(this, new Observer<List<String>>() {
                @Override
                public void onChanged(@Nullable List<String> strings) {
                    // This callback never gets called
                    Timber.d("value received in live data observer: %s", strings);
                    for (String string : strings) {
                        Timber.d(string);
                    }
                }
            });
        }
    }


ViewModel class



    static class MyViewModel extends ViewModel {
        LiveData<List<String>> mListLiveData;
        PublishProcessor<String> mStringPublishProcessor = PublishProcessor.create();

        public void init() {
            mListLiveData = LiveDataReactiveStreams.fromPublisher(mStringPublishProcessor.toList().toFlowable());

            // Trigger mStringPublishProcessor at constant intervals
            Observable.interval(0, 5, TimeUnit.SECONDS)
                    .map(aLong -> {
                        Timber.d("value emitted: "); // this log shows up as expected
                        mStringPublishProcessor.onNext("Value " + aLong);
                        return aLong;
                    }).subscribe();
        }

        public LiveData<List<String>> getListLiveData() {
            return mListLiveData;
        }
    }


Now, in my activity, I can see only the logs from Observable.interval:



    com.example.app D/MyActivity$MyViewModel: value emitted:
    com.example.app D/MyActivity$MyViewModel: value emitted:
    com.example.app D/MyActivity$MyViewModel: value emitted:
    com.example.app D/MyActivity$MyViewModel: value emitted:
    com.example.app D/MyActivity$MyViewModel: value emitted:
    com.example.app D/MyActivity$MyViewModel: value emitted:


Why does the LiveData observer never receive any values from the Flowable?

According to the documentation of LiveDataReactiveStreams.fromPublisher:




Creates an Observable stream from a ReactiveStreams publisher.
When the LiveData becomes active, it subscribes to the emissions from the Publisher.








android rx-java rx-java2 android-architecture-components android-livedata














edited Nov 16 '18 at 7:48







Sarath Kn

















asked Nov 16 '18 at 7:40









Sarath Kn













  • If this is an API call, why do we have to use this instead of simply calling setValue when onNext is called?

    – Nguyen Hoà
    Mar 18 at 3:05











  • LiveDataReactiveStreams.fromPublisher will do the same thing for you. You don't have to subscribe to each observable and manually convert it into a LiveData using setValue (provided you handle errors properly, since LiveData does NOT handle errors and expects errors to be treated as states in the data it holds).

    – Sarath Kn
    Mar 18 at 9:02











  • Do you have an example? I am still confused by this, thanks.

    – Nguyen Hoà
    Mar 21 at 9:35











  • I think you have misunderstood it. It's not a replacement for Rx. It's just a utility class that helps convert a Reactive Streams Publisher (such as an Rx Flowable) to LiveData and vice versa.

    – Sarath Kn
    Mar 21 at 9:37











  • Please refer to the documentation for more details.

    – Sarath Kn
    Mar 21 at 9:38
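
The remark above about treating errors as states in the data can be illustrated with a small, framework-free sketch. `Resource` and `capture` are hypothetical names invented for this example; they are not part of LiveData or RxJava. The assumption is that the stream carries a value that is either data or an error, so the observer never has to deal with a terminal error signal.

```java
import java.util.function.Supplier;

/** Hypothetical wrapper: holds either data or an error, never both. */
final class Resource<T> {
    final T data;          // set on success, null on failure
    final Throwable error; // set on failure, null on success

    private Resource(T data, Throwable error) {
        this.data = data;
        this.error = error;
    }

    static <T> Resource<T> success(T data) {
        return new Resource<>(data, null);
    }

    static <T> Resource<T> failure(Throwable error) {
        return new Resource<>(null, error);
    }

    boolean isSuccess() {
        return error == null;
    }

    /** Runs the call and turns a thrown runtime exception into a failure state. */
    static <T> Resource<T> capture(Supplier<T> call) {
        try {
            return success(call.get());
        } catch (RuntimeException e) {
            return failure(e);
        }
    }
}
```

With RxJava, a comparable mapping could presumably be built from `map` and `onErrorReturn` before the Flowable is handed to `fromPublisher`; treat that as an untested suggestion rather than a recipe from this thread.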



































1 Answer













.toList() collects all items and emits the resulting list only after the source signals onComplete(). In your example the source never completes, so the list is never emitted.

















answered Nov 16 '18 at 8:51









haroldolivieri








  • Oh yes... My source was a hot observable that never calls onComplete and stays active all the time. I didn't notice that. Thank you!

    – Sarath Kn
    Nov 16 '18 at 9:04













  • Can we use LiveDataReactiveStreams when calling an API? Do you have an example of this?

    – Nguyen Hoà
    Mar 18 at 2:46
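
The answer's point can be reproduced without RxJava or Android. The sketch below is a plain-Java model, not library code: `ToListModel`, `CompleteOnlyCollector` and `RunningCollector` are hypothetical names made up for illustration. The first collector behaves like toList() as described in the answer, delivering nothing until completion; the second delivers a snapshot after every item, which is one possible way to get updates from a source that never completes.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of two collectors; this is not RxJava code. */
final class ToListModel {

    /** Buffers items and delivers one list only when onComplete() arrives. */
    static final class CompleteOnlyCollector {
        private final List<String> buffer = new ArrayList<>();
        final List<List<String>> delivered = new ArrayList<>();

        void onNext(String item) {
            buffer.add(item); // nothing reaches the observer yet
        }

        void onComplete() {
            delivered.add(new ArrayList<>(buffer));
        }
    }

    /** Delivers a snapshot of everything seen so far after every item. */
    static final class RunningCollector {
        private final List<String> buffer = new ArrayList<>();
        final List<List<String>> delivered = new ArrayList<>();

        void onNext(String item) {
            buffer.add(item);
            delivered.add(new ArrayList<>(buffer));
        }
    }

    /** Feeds count items, optionally completes, and returns what was delivered. */
    static List<List<String>> deliveredOnComplete(int count, boolean complete) {
        CompleteOnlyCollector collector = new CompleteOnlyCollector();
        for (int i = 0; i < count; i++) {
            collector.onNext("Value " + i);
        }
        if (complete) {
            collector.onComplete();
        }
        return collector.delivered;
    }

    /** Feeds count items without completing and returns what was delivered. */
    static List<List<String>> deliveredRunning(int count) {
        RunningCollector collector = new RunningCollector();
        for (int i = 0; i < count; i++) {
            collector.onNext("Value " + i);
        }
        return collector.delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveredOnComplete(3, false)); // prints []
        System.out.println(deliveredOnComplete(3, true));  // prints [[Value 0, Value 1, Value 2]]
        System.out.println(deliveredRunning(2));           // prints [[Value 0], [Value 0, Value 1]]
    }
}
```

In the question's code the PublishProcessor is never completed, which corresponds to the first call above. In RxJava, operators such as scan or buffer emit without waiting for completion and might serve as replacements for toList(); that suggestion is an assumption and does not come from the answer.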













