Skip to content

Commit

Permalink
matching: expect AUS_ISTFAHRT_2 stream & consumer to be created by th…
Browse files Browse the repository at this point in the history
…e user 💥📝
  • Loading branch information
derhuerst committed Nov 25, 2024
1 parent eb1c170 commit 591288b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 33 deletions.
54 changes: 21 additions & 33 deletions lib/match.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
import {createLogger} from './logger.js'
import {register} from './metrics.js'
import {
AckPolicy as NatsAckPolicy,
jsonCodec as natsJson,
} from './nats.js'
import {
Expand Down Expand Up @@ -68,13 +67,13 @@ const runGtfsMatching = async (cfg, opt = {}) => {
ok(natsJetstreamManager)

const {
natsConsumerDurableName,
natsConsumerName,
natsAckWait, // in milliseconds
matchConcurrency,
} = {
natsConsumerDurableName: process.env.MATCHING_CONSUMER_DURABLE_NAME
? process.env.MATCHING_CONSUMER_DURABLE_NAME
: NATS_JETSTREAM_AUS_ISTFAHRT_STREAM_NAME + '_' + Math.random().toString(16).slice(2, 6),
natsConsumerName: process.env.MATCHING_CONSUMER_NAME
? process.env.MATCHING_CONSUMER_NAME
: 'gtfs-rt-feed',
natsAckWait: 60 * 1000, // 60 seconds
matchConcurrency: process.env.MATCHING_CONCURRENCY
? parseInt(process.env.MATCHING_CONCURRENCY)
Expand Down Expand Up @@ -315,19 +314,13 @@ const runGtfsMatching = async (cfg, opt = {}) => {
}

{
// todo: shouldn't this be done upfront by the person deploying the service?
{
// create/update NATS JetStream stream for AUS IstFahrts
const streamInfo = await natsJetstreamManager.streams.add({
name: NATS_JETSTREAM_AUS_ISTFAHRT_STREAM_NAME,
subjects: [
AUS_ISTFAHRT_TOPIC_PREFIX + '>',
],
// todo: limits?
})
// query details of the NATS JetStream stream for AUS IstFahrts
const stream = await natsJetstreamClient.streams.get(NATS_JETSTREAM_AUS_ISTFAHRT_STREAM_NAME)
const streamInfo = await stream.info()
serviceLogger.debug({
streamInfo,
}, 'created/re-used NATS JetStream stream for AUS IstFahrts')
}, 'using NATS JetStream stream for AUS IstFahrts')
}
{
// create/update NATS JetStream stream for GTFS-RT data
Expand All @@ -343,27 +336,22 @@ const runGtfsMatching = async (cfg, opt = {}) => {
}, 'created/re-used NATS JetStream stream for GTFS-RT data')
}

// create durable NATS JetStream consumer for previously created stream
const consumerInfo = await natsJetstreamManager.consumers.add(NATS_JETSTREAM_AUS_ISTFAHRT_STREAM_NAME, {
ack_policy: NatsAckPolicy.Explicit,
durable_name: natsConsumerDurableName,
ack_wait: natsAckWait * 1000, // nats.js expects nanoseconds
// todo: configure inactive_threshold?
// todo: set max_ack_pending to 1 for strict ordering of messages?
// todo: configure ack_wait?
const istFahrtsConsumer = await natsJetstreamClient.consumers.get(
NATS_JETSTREAM_AUS_ISTFAHRT_STREAM_NAME,
natsConsumerName,
)

// todo: https://nats-io.github.io/nats.deno/interfaces/ConsumerConfig.html ?

// todo: add trip ID to topic, consume with `DeliverLastPerSubject`? – would not work for partial IstFahrts
})
serviceLogger.debug({
consumerInfo,
}, 'created/re-used NATS JetStream consumer')
{
// query details of the (externally created) NATS JetStream consumer
const consumerInfo = await istFahrtsConsumer.info()
serviceLogger.debug({
consumerInfo,
}, 'using NATS JetStream consumer')
}

const tripsConsumer = await natsJetstreamClient.consumers.get(NATS_JETSTREAM_AUS_ISTFAHRT_STREAM_NAME, consumerInfo.name)
const tripsSub = await tripsConsumer.consume()
const istFahrtsSub = await istFahrtsConsumer.consume()
execPipe(
tripsSub,
istFahrtsSub,

// asyncBuffer workaround
// see also https://github.com/iter-tools/iter-tools/issues/425#issuecomment-882875848
Expand Down
46 changes: 46 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,52 @@ To make sure that the connection works, use [`psql`](https://www.postgresql.org/

By default, `gtfs-rt-feed` will connect as `gtfs-rt-$MAJOR_VERSION` to `localhost:4222` without authentication.

#### create NATS stream & consumer

We also need to create a [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) [stream](https://docs.nats.io/nats-concepts/jetstream/streams) called `AUS_ISTFAHRT_2` that `gtfs-rt-feed` will read (unmatched) GTFS-RT messages from. This can be done using the [NATS CLI](https://github.com/nats-io/natscli):

```shell
nats stream add \
# omit this if you want to configure more details
--defaults \
# collect all messages published to these subjects
--subjects='aus.istfahrt.>' \
# acknowledge publishes
--ack \
# with limited storage, discard the oldest limits first
--retention=limits --discard=old \
--description='VDV-454 AUS IstFahrt messages' \
# name of the stream
AUS_ISTFAHRT_2
```

On the `AUS_ISTFAHRT_2` stream, we create a durable [consumer]():

```shell
nats consumer add \
# omit this if you want to configure more details
--defaults \
# create a pull-based consumer (refer to the NATS JetStream docs)
--pull \
# let gtfs-rt-feed explicitly acknowledge all received messages
--ack=explicit \
# let the newly created consumer start with the latest messages in AUS_ISTFAHRT_2 (not all)
--deliver=new \
# send gtfs-rt-feed at most 200 messages at once
--max-pending=200 \
# when & how often to re-deliver a message that hasn't been acknowledged (usually because it couldn't be processed)
--max-deliver=3 \
--backoff=linear \
--backoff-steps=2 \
--backoff-min=15s \
--backoff-max=2m \
--description 'OpenDataVBB/gtfs-rt-feed' \
# name of the stream
AUS_ISTFAHRT_2 \
# name of the consumer
gtfs-rt-feed
```

#### configure access to Redis

`gtfs-rt-feed` uses [`ioredis`](https://npmjs.com/package/ioredis) to connect to PostgreSQL; For details about supported environment variables and their defaults, refer to [its docs](https://github.com/redis/ioredis#readme).
Expand Down

0 comments on commit 591288b

Please sign in to comment.