Conditional statement Python Apache Beam pipeline
Current situation
The porpouse of this pipeline is to read from pub/sub the payload with geodata, then this data are transformed and analyzed and finally return if a condition is true or false
with beam.Pipeline(options=pipeline_options) as p:
raw_data = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(
subscription='projects/XXX/subscriptions/YYY'))
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))
def GeoDataIngestion(string_input):
<...>
return True or False
Desirable situation 1
If the GeoDataIngestion result is true, then the raw_data will be stored in big query
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
| 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
)
def Condition(condition):
if condition:
<...WriteToBigQuery...>
#The class I used before to store raw_data without depending on evaluate condition:
class WriteToBigQuery(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'Format' >> beam.ParDo(FormatBigQueryFn())
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
'XXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Desirable situation 2
Instead of store the data in BigQuery, it would be also good to send to pub/sub
def Condition(condition):
if condition:
<...SendToPubSub(Topic1)...>
else:
<...SendToPubSub(Topic2)...>
Here, the problem is to set the Topic depending of the condition result, because i'm not able to pass the topic like parameter in the pipeline
| beam.io.WriteStringsToPubSub(TOPIC)
Neither in a function/class
Question
How can I do that?
How/where should I call WriteToBigQuery to store the PCollection raw_data if the result of Evaluate condition is true?
python google-cloud-dataflow apache-beam
add a comment |
Current situation
The porpouse of this pipeline is to read from pub/sub the payload with geodata, then this data are transformed and analyzed and finally return if a condition is true or false
with beam.Pipeline(options=pipeline_options) as p:
raw_data = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(
subscription='projects/XXX/subscriptions/YYY'))
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))
def GeoDataIngestion(string_input):
<...>
return True or False
Desirable situation 1
If the GeoDataIngestion result is true, then the raw_data will be stored in big query
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
| 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
)
def Condition(condition):
if condition:
<...WriteToBigQuery...>
#The class I used before to store raw_data without depending on evaluate condition:
class WriteToBigQuery(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'Format' >> beam.ParDo(FormatBigQueryFn())
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
'XXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Desirable situation 2
Instead of store the data in BigQuery, it would be also good to send to pub/sub
def Condition(condition):
if condition:
<...SendToPubSub(Topic1)...>
else:
<...SendToPubSub(Topic2)...>
Here, the problem is to set the Topic depending of the condition result, because i'm not able to pass the topic like parameter in the pipeline
| beam.io.WriteStringsToPubSub(TOPIC)
Neither in a function/class
Question
How can I do that?
How/where should I call WriteToBigQuery to store the PCollection raw_data if the result of Evaluate condition is true?
python google-cloud-dataflow apache-beam
add a comment |
Current situation
The porpouse of this pipeline is to read from pub/sub the payload with geodata, then this data are transformed and analyzed and finally return if a condition is true or false
with beam.Pipeline(options=pipeline_options) as p:
raw_data = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(
subscription='projects/XXX/subscriptions/YYY'))
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))
def GeoDataIngestion(string_input):
<...>
return True or False
Desirable situation 1
If the GeoDataIngestion result is true, then the raw_data will be stored in big query
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
| 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
)
def Condition(condition):
if condition:
<...WriteToBigQuery...>
#The class I used before to store raw_data without depending on evaluate condition:
class WriteToBigQuery(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'Format' >> beam.ParDo(FormatBigQueryFn())
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
'XXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Desirable situation 2
Instead of store the data in BigQuery, it would be also good to send to pub/sub
def Condition(condition):
if condition:
<...SendToPubSub(Topic1)...>
else:
<...SendToPubSub(Topic2)...>
Here, the problem is to set the Topic depending of the condition result, because i'm not able to pass the topic like parameter in the pipeline
| beam.io.WriteStringsToPubSub(TOPIC)
Neither in a function/class
Question
How can I do that?
How/where should I call WriteToBigQuery to store the PCollection raw_data if the result of Evaluate condition is true?
python google-cloud-dataflow apache-beam
Current situation
The porpouse of this pipeline is to read from pub/sub the payload with geodata, then this data are transformed and analyzed and finally return if a condition is true or false
with beam.Pipeline(options=pipeline_options) as p:
raw_data = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(
subscription='projects/XXX/subscriptions/YYY'))
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))
def GeoDataIngestion(string_input):
<...>
return True or False
Desirable situation 1
If the GeoDataIngestion result is true, then the raw_data will be stored in big query
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
| 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
)
def Condition(condition):
if condition:
<...WriteToBigQuery...>
#The class I used before to store raw_data without depending on evaluate condition:
class WriteToBigQuery(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'Format' >> beam.ParDo(FormatBigQueryFn())
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
'XXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Desirable situation 2
Instead of store the data in BigQuery, it would be also good to send to pub/sub
def Condition(condition):
if condition:
<...SendToPubSub(Topic1)...>
else:
<...SendToPubSub(Topic2)...>
Here, the problem is to set the Topic depending of the condition result, because i'm not able to pass the topic like parameter in the pipeline
| beam.io.WriteStringsToPubSub(TOPIC)
Neither in a function/class
Question
How can I do that?
How/where should I call WriteToBigQuery to store the PCollection raw_data if the result of Evaluate condition is true?
python google-cloud-dataflow apache-beam
python google-cloud-dataflow apache-beam
edited Nov 14 '18 at 13:59
IoT user
asked Nov 13 '18 at 16:44
IoT userIoT user
11312
11312
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.
To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'
The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.
import apache_beam as beam
from apache_beam import pvalue
import sys
class Split(beam.DoFn):
# These tags will be used to tag the outputs of this DoFn.
OUTPUT_TAG_BQ = 'BigQuery'
OUTPUT_TAG_PS1 = 'pubsub topic1'
OUTPUT_TAG_PS2 = 'pubsub topic2'
def process(self, element):
"""
tags the input as it processes the orignal PCollection
"""
print element
if "BigQuery" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
print 'found bq'
elif "pubsub topic1" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
elif "pubsub topic2" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)
if __name__ == '__main__':
output_prefix = 'C:\pythonVirtual\Mycodes\output'
p = beam.Pipeline(argv=sys.argv)
lines = (p
| beam.Create([
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2']))
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
tagged_lines_result = (lines
| beam.ParDo(Split()).with_outputs(
Split.OUTPUT_TAG_BQ,
Split.OUTPUT_TAG_PS1,
Split.OUTPUT_TAG_PS2))
# tagged_lines_result is an object of type DoOutputsTuple. It supports
# accessing result in alternative ways.
bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')
p.run().wait_until_finish()
Please let me know if that helps.
Thanks, I will try. For now, I've done withif value: yield Payload
. In the begging I hadreturn Payload
in mydef Condition(condition):
so when the condition was false and it return none, the program crash
– IoT user
Nov 19 '18 at 8:27
Thanks. Keen to know about your findings.
– Tanveer Uddin
Nov 19 '18 at 10:11
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53285726%2fconditional-statement-python-apache-beam-pipeline%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.
To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'
The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.
import apache_beam as beam
from apache_beam import pvalue
import sys
class Split(beam.DoFn):
# These tags will be used to tag the outputs of this DoFn.
OUTPUT_TAG_BQ = 'BigQuery'
OUTPUT_TAG_PS1 = 'pubsub topic1'
OUTPUT_TAG_PS2 = 'pubsub topic2'
def process(self, element):
"""
tags the input as it processes the orignal PCollection
"""
print element
if "BigQuery" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
print 'found bq'
elif "pubsub topic1" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
elif "pubsub topic2" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)
if __name__ == '__main__':
output_prefix = 'C:\pythonVirtual\Mycodes\output'
p = beam.Pipeline(argv=sys.argv)
lines = (p
| beam.Create([
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2']))
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
tagged_lines_result = (lines
| beam.ParDo(Split()).with_outputs(
Split.OUTPUT_TAG_BQ,
Split.OUTPUT_TAG_PS1,
Split.OUTPUT_TAG_PS2))
# tagged_lines_result is an object of type DoOutputsTuple. It supports
# accessing result in alternative ways.
bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')
p.run().wait_until_finish()
Please let me know if that helps.
Thanks, I will try. For now, I've done withif value: yield Payload
. In the begging I hadreturn Payload
in mydef Condition(condition):
so when the condition was false and it return none, the program crash
– IoT user
Nov 19 '18 at 8:27
Thanks. Keen to know about your findings.
– Tanveer Uddin
Nov 19 '18 at 10:11
add a comment |
I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.
To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'
The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.
import apache_beam as beam
from apache_beam import pvalue
import sys
class Split(beam.DoFn):
# These tags will be used to tag the outputs of this DoFn.
OUTPUT_TAG_BQ = 'BigQuery'
OUTPUT_TAG_PS1 = 'pubsub topic1'
OUTPUT_TAG_PS2 = 'pubsub topic2'
def process(self, element):
"""
tags the input as it processes the orignal PCollection
"""
print element
if "BigQuery" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
print 'found bq'
elif "pubsub topic1" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
elif "pubsub topic2" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)
if __name__ == '__main__':
output_prefix = 'C:\pythonVirtual\Mycodes\output'
p = beam.Pipeline(argv=sys.argv)
lines = (p
| beam.Create([
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2']))
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
tagged_lines_result = (lines
| beam.ParDo(Split()).with_outputs(
Split.OUTPUT_TAG_BQ,
Split.OUTPUT_TAG_PS1,
Split.OUTPUT_TAG_PS2))
# tagged_lines_result is an object of type DoOutputsTuple. It supports
# accessing result in alternative ways.
bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')
p.run().wait_until_finish()
Please let me know if that helps.
Thanks, I will try. For now, I've done withif value: yield Payload
. In the begging I hadreturn Payload
in mydef Condition(condition):
so when the condition was false and it return none, the program crash
– IoT user
Nov 19 '18 at 8:27
Thanks. Keen to know about your findings.
– Tanveer Uddin
Nov 19 '18 at 10:11
add a comment |
I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.
To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'
The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.
import apache_beam as beam
from apache_beam import pvalue
import sys
class Split(beam.DoFn):
# These tags will be used to tag the outputs of this DoFn.
OUTPUT_TAG_BQ = 'BigQuery'
OUTPUT_TAG_PS1 = 'pubsub topic1'
OUTPUT_TAG_PS2 = 'pubsub topic2'
def process(self, element):
"""
tags the input as it processes the orignal PCollection
"""
print element
if "BigQuery" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
print 'found bq'
elif "pubsub topic1" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
elif "pubsub topic2" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)
if __name__ == '__main__':
output_prefix = 'C:\pythonVirtual\Mycodes\output'
p = beam.Pipeline(argv=sys.argv)
lines = (p
| beam.Create([
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2']))
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
tagged_lines_result = (lines
| beam.ParDo(Split()).with_outputs(
Split.OUTPUT_TAG_BQ,
Split.OUTPUT_TAG_PS1,
Split.OUTPUT_TAG_PS2))
# tagged_lines_result is an object of type DoOutputsTuple. It supports
# accessing result in alternative ways.
bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')
p.run().wait_until_finish()
Please let me know if that helps.
I think branching collections based on the evaluation condition result might be helpful for your scenario. Please see the documentation here.
To illustrate the branching, suppose I have a collection below, where you want to do different action based on the content of the string.
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'
The code below will create tag the collection and you can get three different PCollections based on the tag. Then you can decide what further actions you want to perform on the individual collections.
import apache_beam as beam
from apache_beam import pvalue
import sys
class Split(beam.DoFn):
# These tags will be used to tag the outputs of this DoFn.
OUTPUT_TAG_BQ = 'BigQuery'
OUTPUT_TAG_PS1 = 'pubsub topic1'
OUTPUT_TAG_PS2 = 'pubsub topic2'
def process(self, element):
"""
tags the input as it processes the orignal PCollection
"""
print element
if "BigQuery" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
print 'found bq'
elif "pubsub topic1" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
elif "pubsub topic2" in element:
yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)
if __name__ == '__main__':
output_prefix = 'C:\pythonVirtual\Mycodes\output'
p = beam.Pipeline(argv=sys.argv)
lines = (p
| beam.Create([
'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2']))
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
tagged_lines_result = (lines
| beam.ParDo(Split()).with_outputs(
Split.OUTPUT_TAG_BQ,
Split.OUTPUT_TAG_PS1,
Split.OUTPUT_TAG_PS2))
# tagged_lines_result is an object of type DoOutputsTuple. It supports
# accessing result in alternative ways.
bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')
p.run().wait_until_finish()
Please let me know if that helps.
answered Nov 17 '18 at 21:54
Tanveer UddinTanveer Uddin
53719
53719
Thanks, I will try. For now, I've done withif value: yield Payload
. In the begging I hadreturn Payload
in mydef Condition(condition):
so when the condition was false and it return none, the program crash
– IoT user
Nov 19 '18 at 8:27
Thanks. Keen to know about your findings.
– Tanveer Uddin
Nov 19 '18 at 10:11
add a comment |
Thanks, I will try. For now, I've done withif value: yield Payload
. In the begging I hadreturn Payload
in mydef Condition(condition):
so when the condition was false and it return none, the program crash
– IoT user
Nov 19 '18 at 8:27
Thanks. Keen to know about your findings.
– Tanveer Uddin
Nov 19 '18 at 10:11
Thanks, I will try. For now, I've done with
if value: yield Payload
. In the begging I had return Payload
in my def Condition(condition):
so when the condition was false and it return none, the program crash– IoT user
Nov 19 '18 at 8:27
Thanks, I will try. For now, I've done with
if value: yield Payload
. In the begging I had return Payload
in my def Condition(condition):
so when the condition was false and it return none, the program crash– IoT user
Nov 19 '18 at 8:27
Thanks. Keen to know about your findings.
– Tanveer Uddin
Nov 19 '18 at 10:11
Thanks. Keen to know about your findings.
– Tanveer Uddin
Nov 19 '18 at 10:11
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53285726%2fconditional-statement-python-apache-beam-pipeline%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown