跳至主要内容
版本:v6 - 稳定

在 AWS Lambda 中使用 Sequelize

AWS Lambda 是一种无服务器计算服务,允许客户运行代码而无需担心底层服务器。如果某些概念没有得到充分理解且没有使用适当的配置,则在 AWS Lambda 中使用 sequelize 可能很棘手。本指南旨在澄清其中一些概念,以便库用户能够正确地为 AWS Lambda 配置 sequelize 并解决问题。

TL;DR

如果您只想了解如何正确配置 sequelize 连接池 以用于 AWS Lambda,那么您需要知道的是,sequelize 连接池与 AWS Lambda 的 Node.js 运行时不兼容,最终会导致更多问题而不是解决问题。因此,最合适的配置是**在同一调用中使用池**,并**避免跨调用使用池**(即在结束时关闭所有连接)。

const { Sequelize } = require("sequelize");

let sequelize = null;

async function loadSequelize() {
const sequelize = new Sequelize(/* (...) */, {
// (...)
pool: {
/*
* Lambda functions process one request at a time but your code may issue multiple queries
* concurrently. Be wary that `sequelize` has methods that issue 2 queries concurrently
* (e.g. `Model.findAndCountAll()`). Using a value higher than 1 allows concurrent queries to
* be executed in parallel rather than serialized. Careful with executing too many queries in
* parallel per Lambda function execution since that can bring down your database with an
* excessive number of connections.
*
* Ideally you want to choose a `max` number where this holds true:
* max * EXPECTED_MAX_CONCURRENT_LAMBDA_INVOCATIONS < MAX_ALLOWED_DATABASE_CONNECTIONS * 0.8
*/
max: 2,
/*
* Set this value to 0 so connection pool eviction logic eventually cleans up all connections
* in the event of a Lambda function timeout.
*/
min: 0,
/*
* Set this value to 0 so connections are eligible for cleanup immediately after they're
* returned to the pool.
*/
idle: 0,
// Choose a small enough value that fails fast if a connection takes too long to be established.
acquire: 3000,
/*
* Ensures the connection pool attempts to be cleaned up automatically on the next Lambda
* function invocation, if the previous invocation timed out.
*/
evict: CURRENT_LAMBDA_FUNCTION_TIMEOUT
}
});

// or `sequelize.sync()`
await sequelize.authenticate();

return sequelize;
}

module.exports.handler = async function (event, callback) {
// re-use the sequelize instance across invocations to improve performance
if (!sequelize) {
sequelize = await loadSequelize();
} else {
// restart connection pool to ensure connections are not re-used across invocations
sequelize.connectionManager.initPools();

// restore `getConnection()` if it has been overwritten by `close()`
if (sequelize.connectionManager.hasOwnProperty("getConnection")) {
delete sequelize.connectionManager.getConnection;
}
}

try {
return await doSomethingWithSequelize(sequelize);
} finally {
// close any opened connections during the invocation
// this will wait for any in-progress queries to finish before closing the connections
await sequelize.connectionManager.close();
}
};

使用 AWS RDS 代理

如果您使用的是 AWS RDS 并且正在使用 Aurora受支持的数据库引擎,那么请使用 AWS RDS 代理 连接到您的数据库。这将确保在每次调用时打开/关闭连接不会对您的底层数据库服务器造成昂贵的操作。


如果您想了解为什么必须在 AWS Lambda 中以这种方式使用 sequelize,请继续阅读本文档的其余部分。

Node.js 事件循环

Node.js 事件循环

允许 Node.js 执行非阻塞 I/O 操作的原因 — 尽管 JavaScript 是单线程的 —

虽然事件循环的实现是在 C++ 中,但以下是一个简化的 JavaScript 伪实现,说明了 Node.js 将如何执行名为 index.js 的脚本

// see: https://node.org.cn/en/docs/guides/event-loop-timers-and-nexttick/
// see: https://www.youtube.com/watch?v=P9csgxBgaZ8
// see: https://www.youtube.com/watch?v=PNa9OMajw9w
const process = require('process');

/*
* counter of pending events
*
* reference counter is increased for every:
*
* 1. scheduled timer: `setTimeout()`, `setInterval()`, etc.
* 2. scheduled immediate: `setImmediate()`.
* 3. syscall of non-blocking IO: `require('net').Server.listen()`, etc.
* 4. scheduled task to the thread pool: `require('fs').WriteStream.write()`, etc.
*
* reference counter is decreased for every:
*
* 1. elapsed timer
* 2. executed immediate
* 3. completed non-blocking IO
* 4. completed thread pool task
*
* references can be explicitly decreased by invoking `.unref()` on some
* objects like: `require('net').Socket.unref()`
*/
let refs = 0;

/*
* a heap of timers, sorted by next ocurrence
*
* whenever `setTimeout()` or `setInterval()` is invoked, a timer gets added here
*/
const timersHeap = /* (...) */;

/*
* a FIFO queue of immediates
*
* whenever `setImmediate()` is invoked, it gets added here
*/
const immediates = /* (...) */;

/*
* a FIFO queue of next tick callbacks
*
* whenever `require('process').nextTick()` is invoked, the callback gets added here
*/
const nextTickCallbacks = [];

/*
* a heap of Promise-related callbacks, sorted by promise constructors callbacks first,
* and then resolved/rejected callbacks
*
* whenever a new Promise instance is created via `new Promise` or a promise resolves/rejects
* the appropriate callback (if any) gets added here
*/
const promiseCallbacksHeap = /* ... */;

function execTicksAndPromises() {
while (nextTickCallbacks.length || promiseCallbacksHeap.size()) {
// execute all callbacks scheduled with `process.nextTick()`
while (nextTickCallbacks.length) {
const callback = nextTickCallbacks.shift();
callback();
}

// execute all promise-related callbacks
while (promiseCallbacksHeap.size()) {
const callback = promiseCallbacksHeap.pop();
callback();
}
}
}

try {
// execute index.js
require('./index');
execTicksAndPromises();

do {
// timers phase: executes all elapsed timers
getElapsedTimerCallbacks(timersHeap).forEach(callback => {
callback();
execTicksAndPromises();
});

// pending callbacks phase: executes some system operations (like `TCP errors`) that are not
// executed in the poll phase
getPendingCallbacks().forEach(callback => {
callback();
execTicksAndPromises();
})

// poll phase: gets completed non-blocking I/O events or thread pool tasks and invokes the
// corresponding callbacks; if there are none and there's no pending immediates,
// it blocks waiting for events/completed tasks for a maximum of `maxWait`
const maxWait = computeWhenNextTimerElapses(timersHeap);
pollForEventsFromKernelOrThreadPool(maxWait, immediates).forEach(callback => {
callback();
execTicksAndPromises();
});

// check phase: execute available immediates; if an immediate callback invokes `setImmediate()`
// it will be invoked on the next event loop iteration
getImmediateCallbacks(immediates).forEach(callback => {
callback();
execTicksAndPromises();
});

// close callbacks phase: execute special `.on('close')` callbacks
getCloseCallbacks().forEach(callback => {
callback();
execTicksAndPromises();
});

if (refs === 0) {
// listeners of this event may execute code that increments `refs`
process.emit('beforeExit');
}
} while (refs > 0);
} catch (err) {
if (!process.listenerCount('uncaughtException')) {
// default behavior: print stack and exit with status code 1
console.error(err.stack);
process.exit(1);
} else {
// there are listeners: emit the event and exit using `process.exitCode || 0`
process.emit('uncaughtException');
process.exit();
}
}

Node.js 中的 AWS Lambda 函数处理程序类型

Node.js 中的 AWS Lambda 处理程序有两种形式

非异步处理程序(即 callback

module.exports.handler = function (event, context, callback) {
try {
doSomething();
callback(null, 'Hello World!'); // Lambda returns "Hello World!"
} catch (err) {
// try/catch is not required, uncaught exceptions invoke `callback(err)` implicitly
callback(err); // Lambda fails with `err`
}
};

异步处理程序(即使用 async/awaitPromise

// async/await
module.exports.handler = async function (event, context) {
try {
await doSomethingAsync();
return 'Hello World!'; // equivalent of: callback(null, "Hello World!");
} catch (err) {
// try/cath is not required, async functions always return a Promise
throw err; // equivalent of: callback(err);
}
};

// Promise
module.exports.handler = function (event, context) {
/*
* must return a `Promise` to be considered an async handler
*
* an uncaught exception that prevents a `Promise` to be returned
* by the handler will "downgrade" the handler to non-async
*/
return Promise.resolve()
.then(() => doSomethingAsync())
.then(() => 'Hello World!');
};

乍一看,异步与非异步处理程序似乎仅仅是代码风格选择,但两者之间存在根本区别。

  • 在异步处理程序中,Lambda 函数执行在处理程序返回的 Promise 解决或拒绝时结束,而无论事件循环是否为空。
  • 在非异步处理程序中,Lambda 函数执行在以下任一条件发生时结束

为了理清 sequelize 如何受到它的影响,理解这个根本区别非常重要。以下是一些示例,说明了这种区别

// no callback invoked
module.exports.handler = function () {
// Lambda finishes AFTER `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
};

// callback invoked
module.exports.handler = function (event, context, callback) {
// Lambda finishes AFTER `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
callback(null, 'Hello World!');
};

// callback invoked, context.callbackWaitsForEmptyEventLoop = false
module.exports.handler = function (event, context, callback) {
// Lambda finishes BEFORE `doSomething()` is invoked
context.callbackWaitsForEmptyEventLoop = false;
setTimeout(() => doSomething(), 2000);
setTimeout(() => callback(null, 'Hello World!'), 1000);
};

// async/await
module.exports.handler = async function () {
// Lambda finishes BEFORE `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
return 'Hello World!';
};

// Promise
module.exports.handler = function () {
// Lambda finishes BEFORE `doSomething()` is invoked
setTimeout(() => doSomething(), 1000);
return Promise.resolve('Hello World!');
};

AWS Lambda 执行环境(即容器)

AWS Lambda 函数处理程序由内置或自定义 运行时 调用,这些运行时在执行环境(即容器)中运行,这些环境 可能会或可能不会跨调用重用。容器只能处理 一次一个请求。Lambda 函数的并发调用意味着将为每个并发请求创建一个容器实例。

在实践中,这意味着 Lambda 函数应该设计为无状态的,但开发人员可以使用状态进行缓存。

let sequelize = null;

module.exports.handler = async function () {
/*
* sequelize will already be loaded if the container is re-used
*
* containers are never re-used when a Lambda function's code change
*
* while the time elapsed between Lambda invocations is used as a factor to determine whether
* a container is re-used, no assumptions should be made of when a container is actually re-used
*
* AWS does not publicly document the rules of container re-use "by design" since containers
* can be recycled in response to internal AWS Lambda events (e.g. a Lambda function container
* may be recycled even if the function is constanly invoked)
*/
if (!sequelize) {
sequelize = await loadSequelize();
}

return await doSomethingWithSequelize(sequelize);
};

当 Lambda 函数不等待事件循环为空并且容器被重用时,事件循环将被“暂停”,直到下一个调用发生。例如

let counter = 0;

module.exports.handler = function (event, context, callback) {
/*
* The first invocation (i.e. container initialized) will:
* - log:
* - Fast timeout invoked. Request id: 00000000-0000-0000-0000-000000000000 | Elapsed ms: 5XX
* - return: 1
*
* Wait 3 seconds and invoke the Lambda again. The invocation (i.e. container re-used) will:
* - log:
* - Slow timeout invoked. Request id: 00000000-0000-0000-0000-000000000000 | Elapsed ms: 3XXX
* - Fast timeout invoked. Request id: 11111111-1111-1111-1111-111111111111 | Elapsed ms: 5XX
* - return: 3
*/
const now = Date.now();

context.callbackWaitsForEmptyEventLoop = false;

setTimeout(() => {
console.log(
'Slow timeout invoked. Request id:',
context.awsRequestId,
'| Elapsed ms:',
Date.now() - now,
);
counter++;
}, 1000);

setTimeout(() => {
console.log(
'Fast timeout invoked. Request id:',
context.awsRequestId,
'| Elapsed ms:',
Date.now() - now,
);
counter++;
callback(null, counter);
}, 500);
};

AWS Lambda 中的 Sequelize 连接池

sequelize 使用连接池来优化数据库连接的使用。sequelize 使用的连接池使用 setTimeout() 回调(由 Node.js 事件循环处理)实现。

鉴于 AWS Lambda 容器一次处理一个请求的事实,人们可能会倾向于将 sequelize 配置如下

const { Sequelize } = require('sequelize');

const sequelize = new Sequelize(/* (...) */, {
// (...)
pool: { min: 1, max: 1 }
});

此配置可防止 Lambda 容器因过多的连接而压垮数据库服务器(因为每个容器最多占用 1 个连接)。它还确保容器的连接在空闲时不会被垃圾回收,因此在 Lambda 容器被重用时不需要重新建立连接。不幸的是,此配置会带来一系列问题

  1. 等待事件循环为空的 Lambda 将始终超时。sequelize 连接池每 options.pool.evict 毫秒安排一个 setTimeout,直到**所有空闲连接都被关闭**。但是,由于 min 设置为 1,因此池中始终至少有一个空闲连接,导致无限的事件循环。
  2. 一些操作,如 Model.findAndCountAll() 会异步执行多个查询(例如 Model.count()Model.findAll())。使用最多一个连接会强制查询以串行方式执行(而不是使用两个连接并行执行)。虽然这可能是为了保持可管理的数据库连接数量而可以接受的性能折衷方案,但如果查询执行时间超过默认的或配置的 options.pool.acquire 超时时间,长时间运行的查询可能会导致 ConnectionAcquireTimeoutError。这是因为序列化查询将卡在等待池中,直到另一个查询使用的连接被释放。
  3. 如果 AWS Lambda 函数超时(即配置的 AWS Lambda 超时时间已过),则 Node.js 事件循环将被“暂停”,无论其状态如何。这会导致竞争条件,从而导致连接错误。例如,您可能会遇到以下情况:非常昂贵的查询导致 Lambda 函数超时,在昂贵的查询完成并连接返回到池之前,事件循环被“暂停”,并且后续的 Lambda 调用会因 ConnectionAcquireTimeoutError 而失败,如果容器被重复使用,并且连接在 options.pool.acquire 毫秒后没有返回。

您可以尝试使用 { min: 1, max: 2 } 来缓解问题 #2。但是,这仍然会受到问题 #1#3 的困扰,同时还会引入其他问题。

  1. 在事件循环“暂停”之前,连接池驱逐回调执行或两次 Lambda 调用之间的时间超过 options.pool.evict 时间,可能会发生竞争条件。这会导致超时错误、握手错误和其他与连接相关的错误。
  2. 如果您使用诸如 Model.findAndCountAll() 之类的操作,并且底层的 Model.count()Model.findAll() 查询失败,您将无法确保另一个查询在 Lambda 函数执行完成之前已完成执行(并且连接已放回池中)并且事件循环被“暂停”。这会导致连接处于陈旧状态,从而导致 TCP 连接过早关闭和其他与连接相关的错误。

使用 { min: 2, max: 2 } 可以缓解其他问题 #1。但是,该配置仍然受到所有其他问题(原始 #1#3 和新增的 #2)的影响。

详细的竞争条件示例

为了理解这个例子,你需要更多地了解 Lambda 和 sequelize 的某些部分是如何实现的。

nodejs.12x 的内置 AWS Lambda 运行时是在 Node.js 中实现的。您可以通过读取 Node.js Lambda 函数内部 /var/runtime/ 的内容来访问运行时的全部源代码。相关的代码子集如下所示

runtime/Runtime.js

class Runtime {
// (...)

// each iteration is executed in the event loop `check` phase
scheduleIteration() {
setImmediate(() => this.handleOnce().then(/* (...) */));
}

async handleOnce() {
// get next invocation. see: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-next
let { bodyJson, headers } = await this.client.nextInvocation();

// prepare `context` handler parameter
let invokeContext = new InvokeContext(headers);
invokeContext.updateLoggingContext();

// prepare `callback` handler parameter
let [callback, callbackContext] = CallbackContext.build(
this.client,
invokeContext.invokeId,
this.scheduleIteration.bind(this),
);

try {
// this listener is subscribed to process.on('beforeExit')
// so that when when `context.callbackWaitsForEmptyEventLoop === true`
// the Lambda execution finishes after the event loop is empty
this._setDefaultExitListener(invokeContext.invokeId);

// execute handler
const result = this.handler(
JSON.parse(bodyJson),
invokeContext.attachEnvironmentData(callbackContext),
callback,
);

// finish the execution if the handler is async
if (_isPromise(result)) {
result.then(callbackContext.succeed, callbackContext.fail).catch(callbackContext.fail);
}
} catch (err) {
callback(err);
}
}
}

运行时在初始化代码的末尾安排一个迭代

runtime/index.js

// (...)

new Runtime(client, handler, errorCallbacks).scheduleIteration();

由使用 sequelize 的 Lambda 处理程序调用的所有 SQL 查询最终都使用 Sequelize.prototype.query() 执行。此方法负责从池中获取连接、执行查询以及在查询完成后将连接释放回池。以下代码片段展示了该方法在没有事务的情况下查询逻辑的简化版本

sequelize.js

class Sequelize {
// (...)

query(sql, options) {
// (...)

const connection = await this.connectionManager.getConnection(options);
const query = new this.dialect.Query(connection, this, options);

try {
return await query.run(sql, bindParameters);
} finally {
await this.connectionManager.releaseConnection(connection);
}
}
}

字段 this.connectionManager 是特定于方言的 ConnectionManager 类的实例。所有特定于方言的管理器都继承自抽象的 ConnectionManager 类,该类初始化连接池并将其配置为在每次需要创建新连接时调用特定于方言的类的 connect() 方法。以下代码片段展示了 mysql 方言 connect() 方法的简化版本

mysql/connection-manager.js

class ConnectionManager {
// (...)

async connect(config) {
// (...)
return await new Promise((resolve, reject) => {
// uses mysql2's `new Connection()`
const connection = this.lib.createConnection(connectionConfig);

const errorHandler = e => {
connection.removeListener('connect', connectHandler);
connection.removeListener('error', connectHandler);
reject(e);
};

const connectHandler = () => {
connection.removeListener('error', errorHandler);
resolve(connection);
};

connection.on('error', errorHandler);
connection.once('connect', connectHandler);
});
}
}

字段 this.lib 指向 mysql2,函数 createConnection() 通过创建 Connection 类的实例来创建连接。该类的相关子集如下所示

mysql2/connection.js

class Connection extends EventEmitter {
constructor(opts) {
// (...)

// create Socket
this.stream = /* (...) */;

// when data is received, clear timeout
this.stream.on('data', data => {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.packetParser.execute(data);
});

// (...)

// when handshake is completed, emit the 'connect' event
handshakeCommand.on('end', () => {
this.emit('connect', handshakeCommand.handshake);
});

// set a timeout to trigger if no data is received on the socket
if (this.config.connectTimeout) {
const timeoutHandler = this._handleTimeoutError.bind(this);
this.connectTimeout = Timers.setTimeout(
timeoutHandler,
this.config.connectTimeout
);
}
}

// (...)

_handleTimeoutError() {
if (this.connectTimeout) {
Timers.clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
this.stream.destroy && this.stream.destroy();
const err = new Error('connect ETIMEDOUT');
err.errorno = 'ETIMEDOUT';
err.code = 'ETIMEDOUT';
err.syscall = 'connect';

// this will emit the 'error' event
this._handleNetworkError(err);
}
}

根据前面的代码,以下事件序列展示了如何使用 { min: 1, max: 1 } 的连接池竞争条件会导致 ETIMEDOUT 错误

  1. 接收到 Lambda 调用(新的容器)
    1. 事件循环进入 check 阶段,并调用 runtime/Runtime.jshandleOnce() 方法。
      1. handleOnce() 方法调用 await this.client.nextInvocation() 并等待。
    2. 事件循环跳过 timers 阶段,因为没有待处理的计时器。
    3. 事件循环进入 poll 阶段,handleOnce() 方法继续执行。
    4. 调用 Lambda 处理程序。
    5. Lambda 处理程序调用 Model.count(),它调用 sequelize.jsquery(),它调用 connectionManager.getConnection()
    6. 连接池为 Model.count() 初始化 setTimeout(..., config.pool.acquire) 并调用 mysql/connection-manager.jsconnect() 来创建新的连接。
    7. mysql2/connection.js 创建 TCP 套接字并为使用 ETIMEDOUT 失败连接初始化 setTimeout()
    8. 处理程序返回的 Promise 被拒绝(原因在此处没有详细说明),因此 Lambda 函数执行完成,Node.js 事件循环被“暂停”。
  2. 在调用之间经过足够的时间,以便
    1. config.pool.acquire 计时器到期。
    2. mysql2 连接计时器尚未到期,但几乎到期了(即竞争条件)。
  3. 接收到第二个 Lambda 调用(容器被重复使用)
    1. 事件循环被“恢复”。
    2. 事件循环进入 check 阶段,并调用 runtime/Runtime.jshandleOnce() 方法。
    3. 事件循环进入 timers 阶段,config.pool.acquire 计时器到期,导致上一次调用的 Model.count() Promise 拒绝并出现 ConnectionAcquireTimeoutError
    4. 事件循环进入 poll 阶段,handleOnce() 方法继续执行。
    5. 调用 Lambda 处理程序。
    6. Lambda 处理程序调用 Model.count(),它调用 sequelize.jsquery(),它调用 connectionManager.getConnection()
    7. 连接池为 Model.count() 初始化 setTimeout(..., config.pool.acquire),由于 { max : 1 },它等待挂起的 connect() Promise 完成。
    8. 事件循环跳过 check 阶段,因为没有待处理的立即操作。
    9. 竞争条件:事件循环进入 timers 阶段,mysql2 连接超时到期,导致 ETIMEDOUT 错误,该错误使用 connection.emit('error') 发出。
    10. 发出的事件拒绝了 mysql/connection-manager.jsconnect() 中的 Promise,进而将拒绝的 Promise 转发到 Model.count() 查询的 Promise。
    11. Lambda 函数因 ETIMEDOUT 错误而失败。