Skip to content

Commit

Permalink
remove anon functions from minimal wordcount (#29179)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Oct 28, 2023
1 parent ab2ab2f commit c2816c8
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions sdks/go/examples/minimal_wordcount/minimal_wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
//
// Concepts:
//
// 0. Registering transforms with Beam.
// 1. Reading data from text files
// 2. Specifying 'inline' transforms
// 3. Counting items in a PCollection
Expand Down Expand Up @@ -62,6 +63,7 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"

Expand All @@ -71,6 +73,26 @@ import (

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func splitWords(line string, emit func(string)) {
for _, word := range wordRE.FindAllString(line, -1) {
emit(word)
}
}

func formatCounts(w string, c int) string {
return fmt.Sprintf("%s: %v", w, c)
}

// Concept #0: Transform functions executed by Beam need to be registered
// so they can be executed by portable runners. We use the register package
// in an init block to inform Beam of the functions we will be using, so
// it can access them on workers.
func init() {
register.Function2x0(splitWords)
register.Function2x1(formatCounts)
register.Emitter1[string]()
}

func main() {
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()
Expand All @@ -91,15 +113,11 @@ func main() {
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Concept #2: Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// This ParDo invokes a DoFn (registered earlier) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
for _, word := range wordRE.FindAllString(line, -1) {
emit(word)
}
}, lines)
words := beam.ParDo(s, splitWords, lines)

// Concept #3: Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
Expand All @@ -110,9 +128,7 @@ func main() {
// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
return fmt.Sprintf("%s: %v", w, c)
}, counted)
formatted := beam.ParDo(s, formatCounts, counted)

// Concept #4: Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
Expand Down

0 comments on commit c2816c8

Please sign in to comment.