Initial version

This commit is contained in:
2026-01-02 10:53:04 -03:00
parent b5d96508c6
commit e7e05ea6ad
12 changed files with 3129 additions and 0 deletions

5
.env.example Normal file
View File

@@ -0,0 +1,5 @@
# Configuration file path (optional, defaults to config.json)
CONFIG_PATH=
# Log level (optional, defaults to 'info' Options: 'error', 'warn', 'info', 'debug')
LOG_LEVEL=info

5
.gitignore vendored
View File

@@ -137,3 +137,8 @@ dist
# Vite logs files
vite.config.js.timestamp-*
vite.config.ts.timestamp-*
# SMTP Load Balancer specific
config.json
data/
logs/

48
config.example.json Normal file
View File

@@ -0,0 +1,48 @@
{
"server": {
"port": 2525,
"auth": {
"user": "admin",
"pass": "your-secure-password"
}
},
"providers": [
{
"name": "Gmail",
"host": "smtp.gmail.com",
"port": 587,
"secure": false,
"auth": {
"user": "your-email@gmail.com",
"pass": "your-app-password"
},
"from": "your-email@gmail.com"
},
{
"name": "SendGrid",
"host": "smtp.sendgrid.net",
"port": 587,
"secure": false,
"auth": {
"user": "apikey",
"pass": "your-sendgrid-api-key"
},
"from": "noreply@yourdomain.com"
},
{
"name": "Mailgun",
"host": "smtp.mailgun.org",
"port": 587,
"secure": false,
"auth": {
"user": "postmaster@your-domain.mailgun.org",
"pass": "your-mailgun-password"
},
"from": "noreply@yourdomain.com"
}
],
"queue": {
"maxRetries": 5,
"retryDelay": 60000
}
}

2107
package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

35
package.json Normal file
View File

@@ -0,0 +1,35 @@
{
"name": "smtp-loadbalancer",
"version": "1.0.0",
"description": "SMTP load balancer that distributes emails across multiple upstream SMTP providers",
"main": "src/index.js",
"type": "module",
"scripts": {
"start": "node src/index.js",
"dev": "node --watch src/index.js"
},
"keywords": [
"smtp",
"load-balancer",
"email",
"mail-server"
],
"author": "ovosimpatico",
"license": "AGPL-3.0",
"repository": {
"type": "git",
"url": "https://github.com/ovosimpatico/smtp-loadbalancer"
},
"bugs": {
"url": "https://github.com/ovosimpatico/smtp-loadbalancer/issues"
},
"dependencies": {
"better-queue": "^3.8.12",
"better-queue-sqlite": "^1.0.4",
"dotenv": "^16.3.1",
"mailparser": "^3.6.5",
"nodemailer": "^7.0.12",
"smtp-server": "^3.13.4",
"winston": "^3.11.0"
}
}

115
src/config-loader.js Normal file
View File

@@ -0,0 +1,115 @@
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
export function loadConfig(configPath = null) {
const defaultPath = path.join(__dirname, '..', 'config.json');
const finalPath = configPath || defaultPath;
// Check for file
if (!fs.existsSync(finalPath)) {
throw new Error(`File not found: ${finalPath}`);
}
// Parse
let config;
try {
const configContent = fs.readFileSync(finalPath, 'utf8');
config = JSON.parse(configContent);
} catch (error) {
throw new Error(`Failed to parse: ${error.message}`);
}
validateConfig(config);
return config;
}
function validateConfig(config) {
// Validate server
if (!config.server) {
throw new Error('Missing "server" section');
}
if (!config.server.port || typeof config.server.port !== 'number') {
throw new Error('Invalid "port" number');
}
if (config.server.port < 1 || config.server.port > 65535) {
throw new Error('Port must be between 1 and 65535');
}
if (config.server.auth) {
if (!config.server.auth.user || !config.server.auth.pass) {
throw new Error('Missing "user" and "pass" fields');
}
}
if (!config.providers || !Array.isArray(config.providers)) {
throw new Error('Missing "providers" array');
}
if (config.providers.length === 0) {
throw new Error('At least one provider is required');
}
// Validate providers
config.providers.forEach((provider) => {
if (!provider.name) {
throw new Error(`Provider ${provider.name} missing "name" field`);
}
if (!provider.host) {
throw new Error(`Provider "${provider.name}" missing "host" field`);
}
if (!provider.port || typeof provider.port !== 'number') {
throw new Error(`Provider "${provider.name}" missing "port" field`);
}
if (typeof provider.secure !== 'boolean') {
throw new Error(`Provider "${provider.name}" missing "secure" field`);
}
if (!provider.auth || !provider.auth.user || !provider.auth.pass) {
throw new Error(`Provider "${provider.name}" missing "auth" credentials`);
}
if (!provider.from) {
throw new Error(`Provider "${provider.name}" missing "from" field`);
}
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
if (!emailRegex.test(provider.from)) {
throw new Error(`Provider "${provider.name}" has invalid "from" field`);
}
});
// Validate queue
if (!config.queue) {
throw new Error('Missing "queue" section');
}
if (typeof config.queue.maxRetries !== 'number' || config.queue.maxRetries < 0) {
throw new Error('Invalid "maxRetries" number');
}
if (typeof config.queue.retryDelay !== 'number' || config.queue.retryDelay < 0) {
throw new Error('Invalid "retryDelay" number');
}
}
export function getProvider(config, index) {
if (index < 0 || index >= config.providers.length) {
throw new Error(`Provider ${index} out of bounds`);
}
return config.providers[index];
}
export function getProviderCount(config) {
return config.providers.length;
}

193
src/index.js Normal file
View File

@@ -0,0 +1,193 @@
import dotenv from 'dotenv';
import path from 'path';
import { fileURLToPath } from 'url';
import fs from 'fs';
import { loadConfig } from './config-loader.js';
import { LoadBalancer } from './load-balancer.js';
import { QueueManager } from './queue-manager.js';
import { SMTPClient } from './smtp-client.js';
import { IncomingSMTPServer } from './smtp-server.js';
import { createLogger } from './logger.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
dotenv.config();
class SMTPLoadBalancer {
constructor() {
this.logger = null;
this.config = null;
this.loadBalancer = null;
this.smtpClient = null;
this.queueManager = null;
this.smtpServer = null;
this.isShuttingDown = false;
}
async init() {
try {
// Create logs dir
const logsDir = path.join(__dirname, '..', 'logs');
if (!fs.existsSync(logsDir)) {
fs.mkdirSync(logsDir, { recursive: true });
}
this.logger = createLogger();
this.logger.info('Starting...');
// Load config
const configPath = process.env.CONFIG_PATH || null;
this.config = loadConfig(configPath);
this.logger.info('Config OK', {
providers: this.config.providers.length,
serverPort: this.config.server.port,
authEnabled: !!this.config.server.auth,
});
// Init load balancer
this.loadBalancer = new LoadBalancer(this.config);
this.logger.info('Load balancer OK', {
strategy: 'round-robin',
providers: this.loadBalancer.getProviderCount(),
});
// Init SMTP client
this.smtpClient = new SMTPClient(this.loadBalancer, this.logger);
this.logger.info('SMTP client OK');
// Verify providers
this.logger.info('Verifying providers');
const verificationResults = await this.smtpClient.verifyAllProviders();
const failedProviders = Object.entries(verificationResults)
.filter(([_, success]) => !success)
.map(([name]) => name);
if (failedProviders.length === this.config.providers.length) {
throw new Error('All providers failed');
} else if (failedProviders.length > 0) {
this.logger.warn('Some providers failed', {
failed: failedProviders,
});
}
// Init queue manager
this.queueManager = new QueueManager(
this.config,
(emailData) => this.smtpClient.deliverEmail(emailData),
this.logger
);
this.logger.info('Queue manager OK');
// Start SMTP server
this.smtpServer = new IncomingSMTPServer(
this.config,
this.queueManager,
this.logger
);
this.smtpServer.start();
this.logger.info('SMTP server OK', {
serverPort: this.config.server.port,
providers: this.config.providers.map((p) => p.name),
});
// Shutdown handlers
this.setupGracefulShutdown();
} catch (error) {
if (this.logger) {
this.logger.error('Failed to start', {
error: error.message,
stack: error.stack,
});
} else {
console.error('Failed to start:', error);
}
process.exit(1);
}
}
setupGracefulShutdown() {
const shutdown = async (signal) => {
if (this.isShuttingDown) {
return;
}
this.isShuttingDown = true;
this.logger.info(`Received ${signal}, shutting down`);
try {
// Stop SMTP server
if (this.smtpServer) {
await this.smtpServer.stop();
}
// Pause queue
if (this.queueManager) {
this.queueManager.pause();
}
// Wait for in-flight emails
this.logger.info('Waiting for in-flight emails');
await new Promise((resolve) => setTimeout(resolve, 5000));
// Shutdown queue
if (this.queueManager) {
await this.queueManager.shutdown();
}
// Close SMTP client
if (this.smtpClient) {
await this.smtpClient.closeAllTransports();
}
this.logger.info('Shut down');
process.exit(0);
} catch (error) {
this.logger.error('Error during shutdown:', {
error: error.message,
});
process.exit(1);
}
};
// Handle signals
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
// Handle exceptions
process.on('uncaughtException', (error) => {
this.logger.error('Exception:', {
error: error.message,
stack: error.stack,
});
shutdown('uncaughtException');
});
// Handle promise rejections
process.on('unhandledRejection', (reason, promise) => {
this.logger.error('Promise rejection:', {
reason,
promise,
});
});
}
getStatus() {
return {
server: this.smtpServer?.getStatus(),
queue: this.queueManager?.getStats(),
loadBalancer: {
providers: this.loadBalancer?.getProviderCount(),
currentIndex: this.loadBalancer?.getCurrentIndex(),
},
};
}
}
// Start
const app = new SMTPLoadBalancer();
app.init().catch((error) => {
console.error('Fatal error:', error);
process.exit(1);
});

42
src/load-balancer.js Normal file
View File

@@ -0,0 +1,42 @@
import { getProvider, getProviderCount } from './config-loader.js';
export class LoadBalancer {
constructor(config) {
this.config = config;
this.currentIndex = 0;
this.providerCount = getProviderCount(config);
if (this.providerCount === 0) {
throw new Error('No providers configured');
}
}
getNextProvider() {
const provider = getProvider(this.config, this.currentIndex);
// Move to next provider
this.currentIndex = (this.currentIndex + 1) % this.providerCount;
return provider;
}
getCurrentIndex() {
return this.currentIndex;
}
reset() {
this.currentIndex = 0;
}
getProviderCount() {
return this.providerCount;
}
getProviderByName(providerName) {
return this.config.providers.find(p => p.name === providerName) || null;
}
getAllProviders() {
return this.config.providers;
}
}

46
src/logger.js Normal file
View File

@@ -0,0 +1,46 @@
import winston from 'winston';
export function createLogger() {
const logFormat = winston.format.combine(
winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
winston.format.errors({ stack: true }),
winston.format.splat(),
winston.format.json()
);
const consoleFormat = winston.format.combine(
winston.format.colorize(),
winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
winston.format.printf(({ timestamp, level, message, ...meta }) => {
let metaStr = '';
if (Object.keys(meta).length > 0) {
metaStr = '\n' + JSON.stringify(meta, null, 2);
}
return `${timestamp} [${level}]: ${message}${metaStr}`;
})
);
const logger = winston.createLogger({
level: process.env.LOG_LEVEL || 'info',
format: logFormat,
transports: [
new winston.transports.Console({
format: consoleFormat,
}),
new winston.transports.File({
filename: 'logs/combined.log',
maxsize: 10485760, // 10MB
maxFiles: 5,
}),
new winston.transports.File({
filename: 'logs/error.log',
level: 'error',
maxsize: 10485760, // 10MB
maxFiles: 5,
}),
],
});
return logger;
}

160
src/queue-manager.js Normal file
View File

@@ -0,0 +1,160 @@
import Queue from 'better-queue';
import SQLiteStore from 'better-queue-sqlite';
import path from 'path';
import { fileURLToPath } from 'url';
import fs from 'fs';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
export class QueueManager {
constructor(config, deliveryHandler, logger) {
this.config = config;
this.deliveryHandler = deliveryHandler;
this.logger = logger;
this.queue = null;
// Ensure data dir exists
const dataDir = path.join(__dirname, '..', 'data');
if (!fs.existsSync(dataDir)) {
fs.mkdirSync(dataDir, { recursive: true });
}
this.dbPath = path.join(dataDir, 'email-queue.db');
this.initQueue();
}
initQueue() {
const queueConfig = this.config.queue;
this.queue = new Queue(
async (emailData, cb) => {
try {
await this.processEmail(emailData);
cb(null, { success: true });
} catch (error) {
// Return error so better-queue retries
cb(error);
}
},
{
store: new SQLiteStore({
path: this.dbPath,
// Store email
serialize: (data) => JSON.stringify(data),
deserialize: (text) => JSON.parse(text),
}),
// Retry
maxRetries: queueConfig.maxRetries,
retryDelay: queueConfig.retryDelay,
concurrent: 1,
// Retry exponential backoff
afterProcessDelay: 100,
retryDelay: (retries) => {
const baseDelay = queueConfig.retryDelay || 60000;
return baseDelay * Math.pow(2, retries);
},
}
);
// Event handlers
this.queue.on('task_finish', (taskId) => {
this.logger.info(`Delivered successfully`, { taskId });
});
this.queue.on('task_failed', (taskId, errorMessage, stats) => {
const retries = stats.attempts - 1;
const maxRetries = queueConfig.maxRetries;
if (retries >= maxRetries) {
this.logger.error(`Delivery failed after ${maxRetries} retries`, {
taskId,
error: errorMessage,
});
} else {
this.logger.warn(`Delivery attempt ${stats.attempts} failed, will retry`, {
taskId,
error: errorMessage,
nextRetryIn: this.queue.options.retryDelay(retries),
});
}
});
this.queue.on('task_progress', (taskId, completed, total) => {
this.logger.debug(`Progress: ${completed}/${total}`, { taskId });
});
this.logger.info('Queue initialized', { dbPath: this.dbPath });
}
async processEmail(emailData) {
this.logger.info('Processing email', {
from: emailData.envelope.from,
to: emailData.envelope.to,
subject: emailData.subject,
});
// Call SMTP client
await this.deliveryHandler(emailData);
}
enqueue(emailData) {
return new Promise((resolve, reject) => {
// Add metadata to email
const queuedEmail = {
...emailData,
queuedAt: new Date().toISOString(),
attempts: 0,
};
this.queue.push(queuedEmail, (error, result) => {
if (error) {
this.logger.error('Failed to queue', { error: error.message });
reject(error);
} else {
this.logger.info('Queued successfully', {
from: emailData.envelope.from,
to: emailData.envelope.to,
});
resolve(result);
}
});
});
}
getStats() {
return {
length: this.queue.length,
running: this.queue.running,
};
}
async shutdown() {
return new Promise((resolve) => {
this.logger.info('Shutting down...');
if (this.queue) {
this.queue.destroy(() => {
this.logger.info('Shut down');
resolve();
});
} else {
resolve();
}
});
}
pause() {
if (this.queue) {
this.queue.pause();
this.logger.info('Paused');
}
}
resume() {
if (this.queue) {
this.queue.resume();
this.logger.info('Resumed');
}
}
}

151
src/smtp-client.js Normal file
View File

@@ -0,0 +1,151 @@
import nodemailer from 'nodemailer';
export class SMTPClient {
constructor(loadBalancer, logger) {
this.loadBalancer = loadBalancer;
this.logger = logger;
this.transportCache = new Map();
}
getTransport(provider) {
const cacheKey = provider.name;
// Return cached transport
if (this.transportCache.has(cacheKey)) {
return this.transportCache.get(cacheKey);
}
// Create transport
const transport = nodemailer.createTransport({
host: provider.host,
port: provider.port,
secure: provider.secure,
auth: {
user: provider.auth.user,
pass: provider.auth.pass,
},
connectionTimeout: 30000,
greetingTimeout: 30000,
socketTimeout: 60000,
});
// Cache transport
this.transportCache.set(cacheKey, transport);
this.logger.debug(`Created transport for provider: ${provider.name}`);
return transport;
}
async deliverEmail(emailData) {
// Get next provider (round-robin)
const provider = this.loadBalancer.getNextProvider();
this.logger.info(`Attempting to deliver email via provider: ${provider.name}`, {
from: emailData.envelope.from,
to: emailData.envelope.to,
subject: emailData.subject,
});
const transport = this.getTransport(provider);
try {
// Prepare email
const mailOptions = {
from: provider.from,
to: emailData.envelope.to,
subject: emailData.subject || '(No Subject)',
text: emailData.text,
html: emailData.html,
headers: emailData.headers || {},
attachments: emailData.attachments || [],
// Reply-to original sender
replyTo: emailData.envelope.from,
};
// Send email
const info = await transport.sendMail(mailOptions);
this.logger.info(`Email delivered successfully via ${provider.name}`, {
messageId: info.messageId,
response: info.response,
from: emailData.envelope.from,
to: emailData.envelope.to,
});
return {
success: true,
provider: provider.name,
messageId: info.messageId,
response: info.response,
};
} catch (error) {
this.logger.error(`Failed to deliver email via ${provider.name}`, {
error: error.message,
code: error.code,
from: emailData.envelope.from,
to: emailData.envelope.to,
});
// Re-throw error to trigger retry
throw new Error(
`Delivery failed via ${provider.name}: ${error.message}`
);
}
}
async verifyProvider(provider) {
const transport = this.getTransport(provider);
try {
await transport.verify();
this.logger.info(`Connection OK: ${provider.name}`);
return true;
} catch (error) {
this.logger.error(`Connection FAIL: ${provider.name}`, {
error: error.message,
});
return false;
}
}
async verifyAllProviders() {
const providers = this.loadBalancer.getAllProviders();
const results = {};
this.logger.info(`Verifying ${providers.length} provider(s)...`);
for (const provider of providers) {
results[provider.name] = await this.verifyProvider(provider);
}
const successCount = Object.values(results).filter((r) => r).length;
this.logger.info(
`Provider verification complete: ${successCount}/${providers.length} successful`
);
return results;
}
async closeAllTransports() {
this.logger.info('Closing all SMTP transports...');
const closePromises = [];
for (const [name, transport] of this.transportCache.entries()) {
if (transport && typeof transport.close === 'function') {
const closePromise = Promise.resolve(transport.close()).catch((error) => {
this.logger.warn(`Failed to close transport for ${name}`, {
error: error.message,
});
});
closePromises.push(closePromise);
}
}
await Promise.all(closePromises);
this.transportCache.clear();
this.logger.info('All SMTP transports closed');
}
}

222
src/smtp-server.js Normal file
View File

@@ -0,0 +1,222 @@
import { SMTPServer } from 'smtp-server';
import { simpleParser } from 'mailparser';
export class IncomingSMTPServer {
constructor(config, queueManager, logger) {
this.config = config;
this.queueManager = queueManager;
this.logger = logger;
this.server = null;
}
start() {
const serverConfig = this.config.server;
this.server = new SMTPServer({
// Authentication
onAuth: (auth, session, callback) => {
this.handleAuth(auth, session, callback);
},
// Email data
onData: (stream, session, callback) => {
this.handleData(stream, session, callback);
},
// Allow plain auth without TLS
authOptional: !serverConfig.auth,
secure: false,
disabledCommands: ['STARTTLS'],
allowInsecureAuth: true,
size: 25 * 1024 * 1024, // Max message size: 25MB
useXClient: true,
useXForward: true,
banner: 'SMTP Ready',
logger: false,
// Error
onError: (error) => {
this.logger.error('SMTP server error', {
error: error.message,
code: error.code,
});
},
});
this.server.on('error', (error) => {
this.logger.error('SMTP server error event', {
error: error.message,
});
});
this.server.listen(serverConfig.port, () => {
this.logger.info(`SMTP server listening on port ${serverConfig.port}`, {
authRequired: !!serverConfig.auth,
});
});
}
handleAuth(auth, session, callback) {
const serverConfig = this.config.server;
// accept if no auth shouldn't happen - authOptional
if (!serverConfig.auth) {
return callback(null, { user: 'anonymous' });
}
const { username, password } = auth;
this.logger.debug('Authentication attempt', {
username,
remoteAddress: session.remoteAddress,
});
if (
username === serverConfig.auth.user &&
password === serverConfig.auth.pass
) {
this.logger.info('Authentication successful', {
username,
remoteAddress: session.remoteAddress,
});
callback(null, { user: username });
} else {
this.logger.warn('Authentication failed', {
username,
remoteAddress: session.remoteAddress,
});
callback(new Error('Invalid username or password'));
}
}
async handleData(stream, session, callback) {
this.logger.info('Receiving email', {
from: session.envelope.mailFrom?.address,
to: session.envelope.rcptTo?.map((r) => r.address),
remoteAddress: session.remoteAddress,
});
try {
// Parse email stream
const parsed = await simpleParser(stream);
// Extract email
const emailData = {
envelope: {
from: session.envelope.mailFrom?.address || parsed.from?.value[0]?.address,
to: session.envelope.rcptTo?.map((r) => r.address) ||
parsed.to?.value.map((t) => t.address) || [],
},
subject: parsed.subject,
text: parsed.text,
html: parsed.html,
headers: this.extractHeaders(parsed.headers),
attachments: parsed.attachments?.map((att) => ({
filename: att.filename,
content: att.content,
contentType: att.contentType,
contentDisposition: att.contentDisposition,
size: att.size,
})) || [],
messageId: parsed.messageId,
date: parsed.date,
receivedAt: new Date().toISOString(),
};
// Validate email
if (!emailData.envelope.from) {
throw new Error('Missing sender address');
}
if (!emailData.envelope.to || emailData.envelope.to.length === 0) {
throw new Error('Missing recipient address');
}
this.logger.debug('Email parsed successfully', {
from: emailData.envelope.from,
to: emailData.envelope.to,
subject: emailData.subject,
hasAttachments: emailData.attachments.length > 0,
});
// Queue email
await this.queueManager.enqueue(emailData);
// Accept email
callback(null, 'Message queued for delivery');
this.logger.info('Email accepted and queued', {
from: emailData.envelope.from,
to: emailData.envelope.to,
});
} catch (error) {
this.logger.error('Failed to process email', {
error: error.message,
from: session.envelope.mailFrom?.address,
});
// Reject email
callback(new Error(`Failed to process email: ${error.message}`));
}
}
extractHeaders(headers) {
const extracted = {};
if (!headers) return extracted;
// Convert headers to object
for (const [key, value] of headers) {
// Skip headers set by upstream
if (
['received', 'x-received', 'return-path', 'dkim-signature'].includes(
key.toLowerCase()
)
) {
continue;
}
if (Array.isArray(extracted[key])) {
extracted[key].push(value);
} else if (extracted[key]) {
extracted[key] = [extracted[key], value];
} else {
extracted[key] = value;
}
}
return extracted;
}
async stop() {
return new Promise((resolve, reject) => {
if (!this.server) {
return resolve();
}
this.logger.info('Stopping SMTP server...');
this.server.close((error) => {
if (error) {
this.logger.error('Error stopping SMTP server', {
error: error.message,
});
reject(error);
} else {
this.logger.info('SMTP server stopped');
resolve();
}
});
});
}
getStatus() {
return {
listening: this.server?.listening || false,
port: this.config.server.port,
};
}
}