Skip to content

Commit

Permalink
Merge pull request #7535 from kl4w/support-multiple-outputs
Browse files Browse the repository at this point in the history
r/kinesis_analytics_application: support multiple outputs
  • Loading branch information
bflad authored Feb 13, 2019
2 parents 1fc586a + 86954c5 commit 536af19
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 20 deletions.
41 changes: 21 additions & 20 deletions aws/resource_aws_kinesis_analytics_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,13 @@ func resourceAwsKinesisAnalyticsApplicationCreate(d *schema.ResourceData, meta i
createOpts.Inputs = []*kinesisanalytics.Input{inputs}
}

if v, ok := d.GetOk("outputs"); ok {
o := v.([]interface{})[0].(map[string]interface{})
outputs := expandKinesisAnalyticsOutputs(o)
createOpts.Outputs = []*kinesisanalytics.Output{outputs}
if v := d.Get("outputs").([]interface{}); len(v) > 0 {
outputs := make([]*kinesisanalytics.Output, 0)
for _, o := range v {
output := expandKinesisAnalyticsOutputs(o.(map[string]interface{}))
outputs = append(outputs, output)
}
createOpts.Outputs = outputs
}

// Retry for IAM eventual consistency
Expand Down Expand Up @@ -1404,45 +1407,43 @@ func flattenKinesisAnalyticsInputs(inputs []*kinesisanalytics.InputDescription)
func flattenKinesisAnalyticsOutputs(outputs []*kinesisanalytics.OutputDescription) []interface{} {
s := []interface{}{}

if len(outputs) > 0 {
id := outputs[0]

for _, o := range outputs {
output := map[string]interface{}{
"id": aws.StringValue(id.OutputId),
"name": aws.StringValue(id.Name),
"id": aws.StringValue(o.OutputId),
"name": aws.StringValue(o.Name),
}

if id.KinesisFirehoseOutputDescription != nil {
if o.KinesisFirehoseOutputDescription != nil {
output["kinesis_firehose"] = []interface{}{
map[string]interface{}{
"resource_arn": aws.StringValue(id.KinesisFirehoseOutputDescription.ResourceARN),
"role_arn": aws.StringValue(id.KinesisFirehoseOutputDescription.RoleARN),
"resource_arn": aws.StringValue(o.KinesisFirehoseOutputDescription.ResourceARN),
"role_arn": aws.StringValue(o.KinesisFirehoseOutputDescription.RoleARN),
},
}
}

if id.KinesisStreamsOutputDescription != nil {
if o.KinesisStreamsOutputDescription != nil {
output["kinesis_stream"] = []interface{}{
map[string]interface{}{
"resource_arn": aws.StringValue(id.KinesisStreamsOutputDescription.ResourceARN),
"role_arn": aws.StringValue(id.KinesisStreamsOutputDescription.RoleARN),
"resource_arn": aws.StringValue(o.KinesisStreamsOutputDescription.ResourceARN),
"role_arn": aws.StringValue(o.KinesisStreamsOutputDescription.RoleARN),
},
}
}

if id.LambdaOutputDescription != nil {
if o.LambdaOutputDescription != nil {
output["lambda"] = []interface{}{
map[string]interface{}{
"resource_arn": aws.StringValue(id.LambdaOutputDescription.ResourceARN),
"role_arn": aws.StringValue(id.LambdaOutputDescription.RoleARN),
"resource_arn": aws.StringValue(o.LambdaOutputDescription.ResourceARN),
"role_arn": aws.StringValue(o.LambdaOutputDescription.RoleARN),
},
}
}

if id.DestinationSchema != nil {
if o.DestinationSchema != nil {
output["schema"] = []interface{}{
map[string]interface{}{
"record_format_type": aws.StringValue(id.DestinationSchema.RecordFormatType),
"record_format_type": aws.StringValue(o.DestinationSchema.RecordFormatType),
},
}
}
Expand Down
64 changes: 64 additions & 0 deletions aws/resource_aws_kinesis_analytics_application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,29 @@ func TestAccAWSKinesisAnalyticsApplication_outputsKinesisStream(t *testing.T) {
})
}

func TestAccAWSKinesisAnalyticsApplication_outputsMultiple(t *testing.T) {
var application kinesisanalytics.ApplicationDetail
resName := "aws_kinesis_analytics_application.test"
rInt1 := acctest.RandInt()
rInt2 := acctest.RandInt()
step := testAccKinesisAnalyticsApplication_prereq(rInt1) + testAccKinesisAnalyticsApplication_outputsMultiple(rInt1, rInt2)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisAnalyticsApplicationDestroy,
Steps: []resource.TestStep{
{
Config: step,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisAnalyticsApplicationExists(resName, &application),
resource.TestCheckResourceAttr(resName, "outputs.#", "2"),
),
},
},
})
}

func TestAccAWSKinesisAnalyticsApplication_outputsAdd(t *testing.T) {
var before, after kinesisanalytics.ApplicationDetail
resName := "aws_kinesis_analytics_application.test"
Expand Down Expand Up @@ -650,6 +673,47 @@ resource "aws_kinesis_analytics_application" "test" {
`, rInt, rInt)
}

func testAccKinesisAnalyticsApplication_outputsMultiple(rInt1, rInt2 int) string {
return fmt.Sprintf(`
resource "aws_kinesis_stream" "test1" {
name = "testAcc-%d"
shard_count = 1
}
resource "aws_kinesis_stream" "test2" {
name = "testAcc-%d"
shard_count = 1
}
resource "aws_kinesis_analytics_application" "test" {
name = "testAcc-%d"
code = "testCode\n"
outputs = {
name = "test_name1"
kinesis_stream = {
resource_arn = "${aws_kinesis_stream.test1.arn}"
role_arn = "${aws_iam_role.test.arn}"
}
schema = {
record_format_type = "JSON"
}
}
outputs = {
name = "test_name2"
kinesis_stream = {
resource_arn = "${aws_kinesis_stream.test2.arn}"
role_arn = "${aws_iam_role.test.arn}"
}
schema = {
record_format_type = "JSON"
}
}
}
`, rInt1, rInt2, rInt1)
}

func testAccKinesisAnalyticsApplicationConfigOutputsLambda(rInt int) string {
return fmt.Sprintf(`
data "aws_iam_policy_document" "kinesisanalytics_assume_role_policy" {
Expand Down

0 comments on commit 536af19

Please sign in to comment.