Conditional statement Python Apache Beam pipeline












0















Current situation



The porpouse of this pipeline is to read from pub/sub the payload with geodata, then this data are transformed and analyzed and finally return if a condition is true or false



 with beam.Pipeline(options=pipeline_options) as p:
raw_data = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(
subscription='projects/XXX/subscriptions/YYY'))

geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))



def GeoDataIngestion(string_input):
<...>
return True or False


Desirable situation 1



If the GeoDataIngestion result is true, then the raw_data will be stored in big query



geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
| 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
)

def Condition(condition):
if condition:
<...WriteToBigQuery...>


#The class I used before to store raw_data without depending on evaluate condition:

class WriteToBigQuery(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'Format' >> beam.ParDo(FormatBigQueryFn())
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
'XXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


Desirable situation 2



Instead of store the data in BigQuery, it would be also good to send to pub/sub



def Condition(condition):
if condition:
<...SendToPubSub(Topic1)...>
else:
<...SendToPubSub(Topic2)...>


Here, the problem is to set the Topic depending of the condition result, because i'm not able to pass the topic like parameter in the pipeline



 | beam.io.WriteStringsToPubSub(TOPIC)


Neither in a function/class



Question



How can I do that?



How/where should I call WriteToBigQuery to store the PCollection raw_data if the result of Evaluate condition is true?










share|improve this question





























    0















    Current situation



    The porpouse of this pipeline is to read from pub/sub the payload with geodata, then this data are transformed and analyzed and finally return if a condition is true or false



     with beam.Pipeline(options=pipeline_options) as p:
    raw_data = (p
    | 'Read from PubSub' >> beam.io.ReadFromPubSub(
    subscription='projects/XXX/subscriptions/YYY'))

    geo_data = (raw_data
    | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))



    def GeoDataIngestion(string_input):
    <...>
    return True or False


    Desirable situation 1



    If the GeoDataIngestion result is true, then the raw_data will be stored in big query



    geo_data = (raw_data
    | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
    | 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
    )

    def Condition(condition):
    if condition:
    <...WriteToBigQuery...>


    #The class I used before to store raw_data without depending on evaluate condition:

    class WriteToBigQuery(beam.PTransform):
    def expand(self, pcoll):
    return (
    pcoll
    | 'Format' >> beam.ParDo(FormatBigQueryFn())
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
    'XXX',
    schema=TABLE_SCHEMA,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


    Desirable situation 2



    Instead of store the data in BigQuery, it would be also good to send to pub/sub



    def Condition(condition):
    if condition:
    <...SendToPubSub(Topic1)...>
    else:
    <...SendToPubSub(Topic2)...>


    Here, the problem is to set the Topic depending of the condition result, because i'm not able to pass the topic like parameter in the pipeline



     | beam.io.WriteStringsToPubSub(TOPIC)


    Neither in a function/class



    Question



    How can I do that?



    How/where should I call WriteToBigQuery to store the PCollection raw_data if the result of Evaluate condition is true?










    share|improve this question



























      0












      0








      0








      Current situation



      The porpouse of this pipeline is to read from pub/sub the payload with geodata, then this data are transformed and analyzed and finally return if a condition is true or false



       with beam.Pipeline(options=pipeline_options) as p:
      raw_data = (p
      | 'Read from PubSub' >> beam.io.ReadFromPubSub(
      subscription='projects/XXX/subscriptions/YYY'))

      geo_data = (raw_data
      | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))



      def GeoDataIngestion(string_input):
      <...>
      return True or False


      Desirable situation 1



      If the GeoDataIngestion result is true, then the raw_data will be stored in big query



      geo_data = (raw_data
      | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
      | 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
      )

      def Condition(condition):
      if condition:
      <...WriteToBigQuery...>


      #The class I used before to store raw_data without depending on evaluate condition:

      class WriteToBigQuery(beam.PTransform):
      def expand(self, pcoll):
      return (
      pcoll
      | 'Format' >> beam.ParDo(FormatBigQueryFn())
      | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
      'XXX',
      schema=TABLE_SCHEMA,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


      Desirable situation 2



      Instead of store the data in BigQuery, it would be also good to send to pub/sub



      def Condition(condition):
      if condition:
      <...SendToPubSub(Topic1)...>
      else:
      <...SendToPubSub(Topic2)...>


      Here, the problem is to set the Topic depending of the condition result, because i'm not able to pass the topic like parameter in the pipeline



       | beam.io.WriteStringsToPubSub(TOPIC)


      Neither in a function/class



      Question



      How can I do that?



      How/where should I call WriteToBigQuery to store the PCollection raw_data if the result of Evaluate condition is true?










      share|improve this question
















      Current situation



      The porpouse of this pipeline is to read from pub/sub the payload with geodata, then this data are transformed and analyzed and finally return if a condition is true or false



       with beam.Pipeline(options=pipeline_options) as p:
      raw_data = (p
      | 'Read from PubSub' >> beam.io.ReadFromPubSub(
      subscription='projects/XXX/subscriptions/YYY'))

      geo_data = (raw_data
      | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))



      def GeoDataIngestion(string_input):
      <...>
      return True or False


      Desirable situation 1



      If the GeoDataIngestion result is true, then the raw_data will be stored in big query



      geo_data = (raw_data
      | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
      | 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
      )

      def Condition(condition):
      if condition:
      <...WriteToBigQuery...>


      #The class I used before to store raw_data without depending on evaluate condition:

      class WriteToBigQuery(beam.PTransform):
      def expand(self, pcoll):
      return (
      pcoll
      | 'Format' >> beam.ParDo(FormatBigQueryFn())
      | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
      'XXX',
      schema=TABLE_SCHEMA,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


      Desirable situation 2



      Instead of store the data in BigQuery, it would be also good to send to pub/sub



      def Condition(condition):
      if condition:
      <...SendToPubSub(Topic1)...>
      else:
      <...SendToPubSub(Topic2)...>


      Here, the problem is to set the Topic depending of the condition result, because i'm not able to pass the topic like parameter in the pipeline



       | beam.io.WriteStringsToPubSub(TOPIC)


      Neither in a function/class



      Question



      How can I do that?



      How/where should I call WriteToBigQuery to store the PCollection raw_data if the result of Evaluate condition is true?







      python google-cloud-dataflow apache-beam






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 14 '18 at 13:59







      IoT user

















      asked Nov 13 '18 at 16:44









      IoT userIoT user

      11312




      11312
























          1 Answer
          1






          active

          oldest

          votes


















          1














          I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.



          To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.



          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2'


          The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.



          import apache_beam as beam
          from apache_beam import pvalue
          import sys

          class Split(beam.DoFn):

          # These tags will be used to tag the outputs of this DoFn.
          OUTPUT_TAG_BQ = 'BigQuery'
          OUTPUT_TAG_PS1 = 'pubsub topic1'
          OUTPUT_TAG_PS2 = 'pubsub topic2'

          def process(self, element):
          """
          tags the input as it processes the orignal PCollection
          """
          print element
          if "BigQuery" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
          print 'found bq'
          elif "pubsub topic1" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
          elif "pubsub topic2" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


          if __name__ == '__main__':
          output_prefix = 'C:\pythonVirtual\Mycodes\output'
          p = beam.Pipeline(argv=sys.argv)
          lines = (p
          | beam.Create([
          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2']))

          # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
          tagged_lines_result = (lines
          | beam.ParDo(Split()).with_outputs(
          Split.OUTPUT_TAG_BQ,
          Split.OUTPUT_TAG_PS1,
          Split.OUTPUT_TAG_PS2))

          # tagged_lines_result is an object of type DoOutputsTuple. It supports
          # accessing result in alternative ways.
          bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
          ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
          ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

          p.run().wait_until_finish()


          Please let me know if that helps.






          share|improve this answer
























          • Thanks, I will try. For now, I've done with if value: yield Payload. In the begging I had return Payload in my def Condition(condition): so when the condition was false and it return none, the program crash

            – IoT user
            Nov 19 '18 at 8:27











          • Thanks. Keen to know about your findings.

            – Tanveer Uddin
            Nov 19 '18 at 10:11











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53285726%2fconditional-statement-python-apache-beam-pipeline%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          1














          I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.



          To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.



          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2'


          The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.



          import apache_beam as beam
          from apache_beam import pvalue
          import sys

          class Split(beam.DoFn):

          # These tags will be used to tag the outputs of this DoFn.
          OUTPUT_TAG_BQ = 'BigQuery'
          OUTPUT_TAG_PS1 = 'pubsub topic1'
          OUTPUT_TAG_PS2 = 'pubsub topic2'

          def process(self, element):
          """
          tags the input as it processes the orignal PCollection
          """
          print element
          if "BigQuery" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
          print 'found bq'
          elif "pubsub topic1" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
          elif "pubsub topic2" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


          if __name__ == '__main__':
          output_prefix = 'C:\pythonVirtual\Mycodes\output'
          p = beam.Pipeline(argv=sys.argv)
          lines = (p
          | beam.Create([
          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2']))

          # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
          tagged_lines_result = (lines
          | beam.ParDo(Split()).with_outputs(
          Split.OUTPUT_TAG_BQ,
          Split.OUTPUT_TAG_PS1,
          Split.OUTPUT_TAG_PS2))

          # tagged_lines_result is an object of type DoOutputsTuple. It supports
          # accessing result in alternative ways.
          bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
          ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
          ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

          p.run().wait_until_finish()


          Please let me know if that helps.






          share|improve this answer
























          • Thanks, I will try. For now, I've done with if value: yield Payload. In the begging I had return Payload in my def Condition(condition): so when the condition was false and it return none, the program crash

            – IoT user
            Nov 19 '18 at 8:27











          • Thanks. Keen to know about your findings.

            – Tanveer Uddin
            Nov 19 '18 at 10:11
















          1














          I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.



          To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.



          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2'


          The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.



          import apache_beam as beam
          from apache_beam import pvalue
          import sys

          class Split(beam.DoFn):

          # These tags will be used to tag the outputs of this DoFn.
          OUTPUT_TAG_BQ = 'BigQuery'
          OUTPUT_TAG_PS1 = 'pubsub topic1'
          OUTPUT_TAG_PS2 = 'pubsub topic2'

          def process(self, element):
          """
          tags the input as it processes the orignal PCollection
          """
          print element
          if "BigQuery" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
          print 'found bq'
          elif "pubsub topic1" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
          elif "pubsub topic2" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


          if __name__ == '__main__':
          output_prefix = 'C:\pythonVirtual\Mycodes\output'
          p = beam.Pipeline(argv=sys.argv)
          lines = (p
          | beam.Create([
          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2']))

          # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
          tagged_lines_result = (lines
          | beam.ParDo(Split()).with_outputs(
          Split.OUTPUT_TAG_BQ,
          Split.OUTPUT_TAG_PS1,
          Split.OUTPUT_TAG_PS2))

          # tagged_lines_result is an object of type DoOutputsTuple. It supports
          # accessing result in alternative ways.
          bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
          ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
          ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

          p.run().wait_until_finish()


          Please let me know if that helps.






          share|improve this answer
























          • Thanks, I will try. For now, I've done with if value: yield Payload. In the begging I had return Payload in my def Condition(condition): so when the condition was false and it return none, the program crash

            – IoT user
            Nov 19 '18 at 8:27











          • Thanks. Keen to know about your findings.

            – Tanveer Uddin
            Nov 19 '18 at 10:11














          1












          1








          1







          I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.



          To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.



          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2'


          The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.



          import apache_beam as beam
          from apache_beam import pvalue
          import sys

          class Split(beam.DoFn):

          # These tags will be used to tag the outputs of this DoFn.
          OUTPUT_TAG_BQ = 'BigQuery'
          OUTPUT_TAG_PS1 = 'pubsub topic1'
          OUTPUT_TAG_PS2 = 'pubsub topic2'

          def process(self, element):
          """
          tags the input as it processes the orignal PCollection
          """
          print element
          if "BigQuery" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
          print 'found bq'
          elif "pubsub topic1" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
          elif "pubsub topic2" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


          if __name__ == '__main__':
          output_prefix = 'C:\pythonVirtual\Mycodes\output'
          p = beam.Pipeline(argv=sys.argv)
          lines = (p
          | beam.Create([
          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2']))

          # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
          tagged_lines_result = (lines
          | beam.ParDo(Split()).with_outputs(
          Split.OUTPUT_TAG_BQ,
          Split.OUTPUT_TAG_PS1,
          Split.OUTPUT_TAG_PS2))

          # tagged_lines_result is an object of type DoOutputsTuple. It supports
          # accessing result in alternative ways.
          bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
          ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
          ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

          p.run().wait_until_finish()


          Please let me know if that helps.






          share|improve this answer













          I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.



          To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.



          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2'


          The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.



          import apache_beam as beam
          from apache_beam import pvalue
          import sys

          class Split(beam.DoFn):

          # These tags will be used to tag the outputs of this DoFn.
          OUTPUT_TAG_BQ = 'BigQuery'
          OUTPUT_TAG_PS1 = 'pubsub topic1'
          OUTPUT_TAG_PS2 = 'pubsub topic2'

          def process(self, element):
          """
          tags the input as it processes the orignal PCollection
          """
          print element
          if "BigQuery" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
          print 'found bq'
          elif "pubsub topic1" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
          elif "pubsub topic2" in element:
          yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


          if __name__ == '__main__':
          output_prefix = 'C:\pythonVirtual\Mycodes\output'
          p = beam.Pipeline(argv=sys.argv)
          lines = (p
          | beam.Create([
          'this line is for BigQuery',
          'this line for pubsub topic1',
          'this line for pubsub topic2']))

          # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
          tagged_lines_result = (lines
          | beam.ParDo(Split()).with_outputs(
          Split.OUTPUT_TAG_BQ,
          Split.OUTPUT_TAG_PS1,
          Split.OUTPUT_TAG_PS2))

          # tagged_lines_result is an object of type DoOutputsTuple. It supports
          # accessing result in alternative ways.
          bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
          ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
          ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

          p.run().wait_until_finish()


          Please let me know if that helps.







          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Nov 17 '18 at 21:54









          Tanveer UddinTanveer Uddin

          53719




          53719













          • Thanks, I will try. For now, I've done with if value: yield Payload. In the begging I had return Payload in my def Condition(condition): so when the condition was false and it return none, the program crash

            – IoT user
            Nov 19 '18 at 8:27











          • Thanks. Keen to know about your findings.

            – Tanveer Uddin
            Nov 19 '18 at 10:11



















          • Thanks, I will try. For now, I've done with if value: yield Payload. In the begging I had return Payload in my def Condition(condition): so when the condition was false and it return none, the program crash

            – IoT user
            Nov 19 '18 at 8:27











          • Thanks. Keen to know about your findings.

            – Tanveer Uddin
            Nov 19 '18 at 10:11

















          Thanks, I will try. For now, I've done with if value: yield Payload. In the begging I had return Payload in my def Condition(condition): so when the condition was false and it return none, the program crash

          – IoT user
          Nov 19 '18 at 8:27





          Thanks, I will try. For now, I've done with if value: yield Payload. In the begging I had return Payload in my def Condition(condition): so when the condition was false and it return none, the program crash

          – IoT user
          Nov 19 '18 at 8:27













          Thanks. Keen to know about your findings.

          – Tanveer Uddin
          Nov 19 '18 at 10:11





          Thanks. Keen to know about your findings.

          – Tanveer Uddin
          Nov 19 '18 at 10:11


















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53285726%2fconditional-statement-python-apache-beam-pipeline%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Xamarin.iOS Cant Deploy on Iphone

          Glorious Revolution

          Dulmage-Mendelsohn matrix decomposition in Python