Skip to content
This repository has been archived by the owner on Apr 19, 2023. It is now read-only.

Commit

Permalink
✨ Use queue for outbound emails (fixed #963)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnandChowdhary committed Feb 29, 2020
1 parent a899bab commit f68ad6d
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 3 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@staart/manager",
"version": "1.3.37",
"version": "1.3.38",
"main": "index.js",
"repository": "[email protected]:staart/api.git",
"author": "Anand Chowdhary <[email protected]>",
Expand Down Expand Up @@ -125,5 +125,5 @@
"setup"
],
"snyk": true,
"staart-version": "1.3.37"
"staart-version": "1.3.38"
}
2 changes: 2 additions & 0 deletions src/crons/minute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import { elasticSearch } from "@staart/elasticsearch";
import { IdValues } from "../helpers/utils";
import { ELASTIC_EVENTS_PREFIX, ELASTIC_LOGS_PREFIX } from "../config";
import { error } from "@staart/errors";
import { receiveEmailMessage } from "../helpers/mail";

export default () => {
new CronJob(
"* * * * *",
async () => {
await receiveEmailMessage();
await storeTrackingLogs();
await storeSecurityEvents();
},
Expand Down
53 changes: 53 additions & 0 deletions src/helpers/mail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,46 @@ import { join } from "path";
import i18n from "../i18n";
import { sendMail } from "@staart/mail";
import { render } from "@staart/mustache-markdown";
import { redisQueue } from "@staart/redis";
import { logError } from "@staart/errors";

let queueSetup = false;
const setupQueue = async () => {
if (queueSetup) return;
const queues = redisQueue.listQueuesAsync();
if ((await queues).includes("outbound-emails")) return (queueSetup = true);
redisQueue.createQueueAsync({ qname: "outbound-emails" });
queueSetup = true;
};

export const receiveEmailMessage = async () => {
await setupQueue();
const result = await redisQueue.receiveMessageAsync({
qname: "outbound-emails"
});
if ("id" in result) {
console.log("Got message", result.id);
const {
to,
template,
data
}: {
to: number | string;
template: string;
data: any;
} = JSON.parse(result.message);
try {
await safeSendEmail(to, template, data);
redisQueue.deleteMessageAsync({
qname: "outbound-emails",
id: result.id
});
} catch (error) {
logError("Mail", "Unable to send email");
}
receiveEmailMessage();
}
};

/**
* Send a new email using AWS SES or SMTP
Expand All @@ -12,6 +52,19 @@ export const mail = async (
to: number | string,
template: string,
data: any = {}
) => {
await setupQueue();
const result = await redisQueue.sendMessageAsync({
qname: "outbound-emails",
message: JSON.stringify({ to, template, data })
});
console.log("Queued email", result);
};

const safeSendEmail = async (
to: number | string,
template: string,
data: any = {}
) => {
const result = render(
(
Expand Down
7 changes: 6 additions & 1 deletion src/init-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import systemInfo from "systeminformation";
import pkg from "../package.json";
import redis from "@staart/redis";
import { query } from "./helpers/mysql";
import { receiveEmailMessage } from "./helpers/mail";

redis
.set(pkg.name, systemInfo.time().current)
.then(() => redis.del(pkg.name))
.then(() => success("Redis is up and listening"))
.then(() => success("Redis is working"))
.catch(() => logError("Redis", "Unable to connect"));

receiveEmailMessage()
.then(() => success("Redis message queue is working"))
.catch(e => console.log(e, "Redis queue", "Unable to receive message"));

query("SHOW tables")
.then(() => success("Database connection is working"))
.catch(() => logError("Database", "Unable to run query `SHOW tables`"));
Expand Down

0 comments on commit f68ad6d

Please sign in to comment.