mirror of
https://github.com/Unleash/unleash.git
synced 2025-01-25 00:07:47 +01:00
feat: make edge use token's cache (#6893)
## About the changes This PR removes the feature flag `queryMissingTokens` that was fully rolled out. It introduces a new way of checking edgeValidTokens controlled by the flag `checkEdgeValidTokensFromCache` that relies in the cached data but hits the DB if needed. The assumption is that most of the times edge will find tokens in the cache, except for a few cases in which a new token is queried. From all tokens we expect at most one to hit the DB and in this case querying a single token should be better than querying all the tokens.
This commit is contained in:
parent
ff6297d338
commit
126b78896e
@ -194,10 +194,12 @@ export class ApiTokenStore implements IApiTokenStore {
|
||||
}
|
||||
|
||||
async get(key: string): Promise<IApiToken> {
|
||||
const stopTimer = this.timer('get-by-secret');
|
||||
const row = await this.makeTokenProjectQuery().where(
|
||||
'tokens.secret',
|
||||
key,
|
||||
);
|
||||
stopTimer();
|
||||
return toTokens(row)[0];
|
||||
}
|
||||
|
||||
|
@ -49,7 +49,7 @@ export const scheduleServices = async (
|
||||
apiTokenService.fetchActiveTokens.bind(apiTokenService),
|
||||
minutesToMilliseconds(1),
|
||||
'fetchActiveTokens',
|
||||
0,
|
||||
0, // no jitter, we need tokens at startup
|
||||
);
|
||||
|
||||
schedulerService.schedule(
|
||||
|
@ -2,7 +2,6 @@ import { ApiTokenType } from '../types/models/api-token';
|
||||
import type { IUnleashConfig } from '../types/option';
|
||||
import type { IApiRequest, IAuthRequest } from '../routes/unleash-types';
|
||||
import type { IUnleashServices } from '../server-impl';
|
||||
import type { IFlagContext } from '../types';
|
||||
|
||||
const isClientApi = ({ path }) => {
|
||||
return path && path.indexOf('/api/client') > -1;
|
||||
@ -27,20 +26,6 @@ const isProxyApi = ({ path }) => {
|
||||
);
|
||||
};
|
||||
|
||||
const contextFrom = (
|
||||
req: IAuthRequest<any, any, any, any> | IApiRequest<any, any, any, any>,
|
||||
): IFlagContext | undefined => {
|
||||
// this is what we'd get from edge:
|
||||
// req_path: '/api/client/features',
|
||||
// req_user_agent: 'unleash-edge-16.0.4'
|
||||
return {
|
||||
reqPath: req.path,
|
||||
reqUserAgent: req.get ? req.get('User-Agent') ?? '' : '',
|
||||
reqAppName:
|
||||
req.headers?.['unleash-appname'] ?? req.query?.appName ?? '',
|
||||
};
|
||||
};
|
||||
|
||||
export const TOKEN_TYPE_ERROR_MESSAGE =
|
||||
'invalid token: expected a different token type for this endpoint';
|
||||
|
||||
@ -70,10 +55,7 @@ const apiAccessMiddleware = (
|
||||
const apiToken = req.header('authorization');
|
||||
if (!apiToken?.startsWith('user:')) {
|
||||
const apiUser = apiToken
|
||||
? await apiTokenService.getUserForToken(
|
||||
apiToken,
|
||||
contextFrom(req),
|
||||
)
|
||||
? await apiTokenService.getUserForToken(apiToken)
|
||||
: undefined;
|
||||
const { CLIENT, FRONTEND } = ApiTokenType;
|
||||
|
||||
|
@ -197,17 +197,17 @@ test('getUserForToken should get a user with admin token user id and token name'
|
||||
expect(user!.internalAdminTokenUserId).toBe(ADMIN_TOKEN_USER.id);
|
||||
});
|
||||
|
||||
describe('When token is added by another instance', () => {
|
||||
const setup = (options?: IUnleashOptions) => {
|
||||
const token: IApiTokenCreate = {
|
||||
environment: 'default',
|
||||
projects: ['*'],
|
||||
secret: '*:*:some-random-string',
|
||||
type: ApiTokenType.CLIENT,
|
||||
tokenName: 'new-token-by-another-instance',
|
||||
expiresAt: undefined,
|
||||
};
|
||||
describe('API token getTokenWithCache', () => {
|
||||
const token: IApiTokenCreate = {
|
||||
environment: 'default',
|
||||
projects: ['*'],
|
||||
secret: '*:*:some-random-string',
|
||||
type: ApiTokenType.CLIENT,
|
||||
tokenName: 'new-token-by-another-instance',
|
||||
expiresAt: undefined,
|
||||
};
|
||||
|
||||
const setup = (options?: IUnleashOptions) => {
|
||||
const config: IUnleashConfig = createTestConfig(options);
|
||||
const apiTokenStore = new FakeApiTokenStore();
|
||||
const environmentStore = new FakeEnvironmentStore();
|
||||
@ -220,60 +220,43 @@ describe('When token is added by another instance', () => {
|
||||
return {
|
||||
apiTokenService,
|
||||
apiTokenStore,
|
||||
token,
|
||||
};
|
||||
};
|
||||
test('should not return the token when query db flag is disabled', async () => {
|
||||
const { apiTokenService, apiTokenStore, token } = setup();
|
||||
|
||||
// simulate this token being inserted by another instance (apiTokenService does not know about it)
|
||||
test('should return the token and perform only one db query', async () => {
|
||||
const { apiTokenService, apiTokenStore } = setup();
|
||||
const apiTokenStoreGet = jest.spyOn(apiTokenStore, 'get');
|
||||
|
||||
// valid token not present in cache (could be inserted by another instance)
|
||||
apiTokenStore.insert(token);
|
||||
|
||||
const found = await apiTokenService.getUserForToken(token.secret);
|
||||
expect(found).toBeUndefined();
|
||||
});
|
||||
|
||||
test('should return the token when query db flag is enabled', async () => {
|
||||
const { apiTokenService, apiTokenStore, token } = setup({
|
||||
experimental: {
|
||||
flags: {
|
||||
queryMissingTokens: true,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
// simulate this token being inserted by another instance (apiTokenService does not know about it)
|
||||
apiTokenStore.insert(token);
|
||||
|
||||
const found = await apiTokenService.getUserForToken(token.secret);
|
||||
expect(found).toBeDefined();
|
||||
expect(found?.username).toBe(token.tokenName);
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const found = await apiTokenService.getTokenWithCache(token.secret);
|
||||
expect(found).toBeDefined();
|
||||
expect(found?.tokenName).toBe(token.tokenName);
|
||||
expect(found?.createdAt).toBeDefined();
|
||||
}
|
||||
expect(apiTokenStoreGet).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
test('should query the db only once for invalid tokens', async () => {
|
||||
jest.useFakeTimers();
|
||||
const { apiTokenService, apiTokenStore } = setup({
|
||||
experimental: {
|
||||
flags: {
|
||||
queryMissingTokens: true,
|
||||
},
|
||||
},
|
||||
});
|
||||
const { apiTokenService, apiTokenStore } = setup();
|
||||
const apiTokenStoreGet = jest.spyOn(apiTokenStore, 'get');
|
||||
|
||||
const invalidToken = 'invalid-token';
|
||||
for (let i = 0; i < 10; i++) {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
expect(
|
||||
await apiTokenService.getUserForToken(invalidToken),
|
||||
await apiTokenService.getTokenWithCache(invalidToken),
|
||||
).toBeUndefined();
|
||||
}
|
||||
expect(apiTokenStoreGet).toHaveBeenCalledTimes(1);
|
||||
|
||||
// after more than 5 minutes we should be able to query again
|
||||
jest.advanceTimersByTime(minutesToMilliseconds(6));
|
||||
for (let i = 0; i < 10; i++) {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
expect(
|
||||
await apiTokenService.getUserForToken(invalidToken),
|
||||
await apiTokenService.getTokenWithCache(invalidToken),
|
||||
).toBeUndefined();
|
||||
}
|
||||
expect(apiTokenStoreGet).toHaveBeenCalledTimes(2);
|
||||
|
@ -25,13 +25,14 @@ import {
|
||||
ApiTokenDeletedEvent,
|
||||
ApiTokenUpdatedEvent,
|
||||
type IAuditUser,
|
||||
type IFlagContext,
|
||||
type IFlagResolver,
|
||||
SYSTEM_USER_AUDIT,
|
||||
} from '../types';
|
||||
import { omitKeys } from '../util';
|
||||
import type EventService from '../features/events/event-service';
|
||||
import { addMinutes, isPast } from 'date-fns';
|
||||
import metricsHelper from '../util/metrics-helper';
|
||||
import { FUNCTION_TIME } from '../metric-events';
|
||||
|
||||
const resolveTokenPermissions = (tokenType: string) => {
|
||||
if (tokenType === ApiTokenType.ADMIN) {
|
||||
@ -60,14 +61,14 @@ export class ApiTokenService {
|
||||
|
||||
private queryAfter = new Map<string, Date>();
|
||||
|
||||
private initialized = false;
|
||||
|
||||
private eventService: EventService;
|
||||
|
||||
private lastSeenSecrets: Set<string> = new Set<string>();
|
||||
|
||||
private flagResolver: IFlagResolver;
|
||||
|
||||
private timer: Function;
|
||||
|
||||
constructor(
|
||||
{
|
||||
apiTokenStore,
|
||||
@ -75,7 +76,7 @@ export class ApiTokenService {
|
||||
}: Pick<IUnleashStores, 'apiTokenStore' | 'environmentStore'>,
|
||||
config: Pick<
|
||||
IUnleashConfig,
|
||||
'getLogger' | 'authentication' | 'flagResolver'
|
||||
'getLogger' | 'authentication' | 'flagResolver' | 'eventBus'
|
||||
>,
|
||||
eventService: EventService,
|
||||
) {
|
||||
@ -94,18 +95,21 @@ export class ApiTokenService {
|
||||
this.initApiTokens(config.authentication.initApiTokens),
|
||||
);
|
||||
}
|
||||
this.timer = (functionName: string) =>
|
||||
metricsHelper.wrapTimer(config.eventBus, FUNCTION_TIME, {
|
||||
className: 'ApiTokenService',
|
||||
functionName,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executed by a scheduler to refresh all active tokens
|
||||
* Called by a scheduler without jitter to refresh all active tokens
|
||||
*/
|
||||
async fetchActiveTokens(): Promise<void> {
|
||||
try {
|
||||
this.activeTokens = await this.store.getAllActive();
|
||||
this.initialized = true;
|
||||
} finally {
|
||||
// biome-ignore lint/correctness/noUnsafeFinally: We ignored this for eslint. Leaving this here for now, server-impl test fails without it
|
||||
return;
|
||||
} catch (e) {
|
||||
this.logger.warn('Failed to fetch active tokens', e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -113,50 +117,7 @@ export class ApiTokenService {
|
||||
return this.store.get(secret);
|
||||
}
|
||||
|
||||
async updateLastSeen(): Promise<void> {
|
||||
if (this.lastSeenSecrets.size > 0) {
|
||||
const toStore = [...this.lastSeenSecrets];
|
||||
this.lastSeenSecrets = new Set<string>();
|
||||
await this.store.markSeenAt(toStore);
|
||||
}
|
||||
}
|
||||
|
||||
public async getAllTokens(): Promise<IApiToken[]> {
|
||||
return this.store.getAll();
|
||||
}
|
||||
|
||||
public async getAllActiveTokens(): Promise<IApiToken[]> {
|
||||
if (this.flagResolver.isEnabled('useMemoizedActiveTokens')) {
|
||||
if (!this.initialized) {
|
||||
// unlikely this will happen but nice to have a fail safe
|
||||
this.logger.info('Fetching active tokens before initialized');
|
||||
await this.fetchActiveTokens();
|
||||
}
|
||||
return this.activeTokens;
|
||||
} else {
|
||||
return this.store.getAllActive();
|
||||
}
|
||||
}
|
||||
|
||||
private async initApiTokens(tokens: ILegacyApiTokenCreate[]) {
|
||||
const tokenCount = await this.store.count();
|
||||
if (tokenCount > 0) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const createAll = tokens
|
||||
.map(mapLegacyTokenWithSecret)
|
||||
.map((t) => this.insertNewApiToken(t, SYSTEM_USER_AUDIT));
|
||||
await Promise.all(createAll);
|
||||
} catch (e) {
|
||||
this.logger.error('Unable to create initial Admin API tokens');
|
||||
}
|
||||
}
|
||||
|
||||
public async getUserForToken(
|
||||
secret: string,
|
||||
flagContext?: IFlagContext, // temporarily added, expected from the middleware
|
||||
): Promise<IApiUser | undefined> {
|
||||
async getTokenWithCache(secret: string): Promise<IApiToken | undefined> {
|
||||
if (!secret) {
|
||||
return undefined;
|
||||
}
|
||||
@ -178,11 +139,7 @@ export class ApiTokenService {
|
||||
}
|
||||
|
||||
const nextAllowedQuery = this.queryAfter.get(secret) ?? 0;
|
||||
if (
|
||||
!token &&
|
||||
isPast(nextAllowedQuery) &&
|
||||
this.flagResolver.isEnabled('queryMissingTokens', flagContext)
|
||||
) {
|
||||
if (!token && isPast(nextAllowedQuery)) {
|
||||
if (this.queryAfter.size > 1000) {
|
||||
// establish a max limit for queryAfter size to prevent memory leak
|
||||
this.queryAfter.clear();
|
||||
@ -190,12 +147,52 @@ export class ApiTokenService {
|
||||
// prevent querying the same invalid secret multiple times. Expire after 5 minutes
|
||||
this.queryAfter.set(secret, addMinutes(new Date(), 5));
|
||||
|
||||
const stopCacheTimer = this.timer('getTokenWithCache.query');
|
||||
token = await this.store.get(secret);
|
||||
if (token) {
|
||||
this.activeTokens.push(token);
|
||||
}
|
||||
stopCacheTimer();
|
||||
}
|
||||
|
||||
return token;
|
||||
}
|
||||
|
||||
async updateLastSeen(): Promise<void> {
|
||||
if (this.lastSeenSecrets.size > 0) {
|
||||
const toStore = [...this.lastSeenSecrets];
|
||||
this.lastSeenSecrets = new Set<string>();
|
||||
await this.store.markSeenAt(toStore);
|
||||
}
|
||||
}
|
||||
|
||||
public async getAllTokens(): Promise<IApiToken[]> {
|
||||
return this.store.getAll();
|
||||
}
|
||||
|
||||
public async getAllActiveTokens(): Promise<IApiToken[]> {
|
||||
return this.store.getAllActive();
|
||||
}
|
||||
|
||||
private async initApiTokens(tokens: ILegacyApiTokenCreate[]) {
|
||||
const tokenCount = await this.store.count();
|
||||
if (tokenCount > 0) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const createAll = tokens
|
||||
.map(mapLegacyTokenWithSecret)
|
||||
.map((t) => this.insertNewApiToken(t, SYSTEM_USER_AUDIT));
|
||||
await Promise.all(createAll);
|
||||
} catch (e) {
|
||||
this.logger.error('Unable to create initial Admin API tokens');
|
||||
}
|
||||
}
|
||||
|
||||
public async getUserForToken(
|
||||
secret: string,
|
||||
): Promise<IApiUser | undefined> {
|
||||
const token = await this.getTokenWithCache(secret);
|
||||
if (token) {
|
||||
this.lastSeenSecrets.add(token.secret);
|
||||
const apiUser: IApiUser = new ApiUser({
|
||||
|
@ -1,39 +1,83 @@
|
||||
import type { IUnleashConfig } from '../types';
|
||||
import type { IFlagResolver, IUnleashConfig } from '../types';
|
||||
import type { Logger } from '../logger';
|
||||
import type { EdgeTokenSchema } from '../openapi/spec/edge-token-schema';
|
||||
import { constantTimeCompare } from '../util/constantTimeCompare';
|
||||
import type { ValidatedEdgeTokensSchema } from '../openapi/spec/validated-edge-tokens-schema';
|
||||
import type { ApiTokenService } from './api-token-service';
|
||||
import metricsHelper from '../util/metrics-helper';
|
||||
import { FUNCTION_TIME } from '../metric-events';
|
||||
|
||||
export default class EdgeService {
|
||||
private logger: Logger;
|
||||
|
||||
private apiTokenService: ApiTokenService;
|
||||
|
||||
private flagResolver: IFlagResolver;
|
||||
|
||||
private timer: Function;
|
||||
|
||||
constructor(
|
||||
{ apiTokenService }: { apiTokenService: ApiTokenService },
|
||||
{ getLogger }: Pick<IUnleashConfig, 'getLogger'>,
|
||||
{
|
||||
getLogger,
|
||||
flagResolver,
|
||||
eventBus,
|
||||
}: Pick<IUnleashConfig, 'getLogger' | 'flagResolver' | 'eventBus'>,
|
||||
) {
|
||||
this.logger = getLogger('lib/services/edge-service.ts');
|
||||
this.apiTokenService = apiTokenService;
|
||||
this.flagResolver = flagResolver;
|
||||
this.timer = (functionName: string) =>
|
||||
metricsHelper.wrapTimer(eventBus, FUNCTION_TIME, {
|
||||
className: 'EdgeService',
|
||||
functionName,
|
||||
});
|
||||
}
|
||||
|
||||
async getValidTokens(tokens: string[]): Promise<ValidatedEdgeTokensSchema> {
|
||||
const activeTokens = await this.apiTokenService.getAllActiveTokens();
|
||||
const edgeTokens = tokens.reduce((result: EdgeTokenSchema[], token) => {
|
||||
const dbToken = activeTokens.find((activeToken) =>
|
||||
constantTimeCompare(activeToken.secret, token),
|
||||
);
|
||||
if (dbToken) {
|
||||
result.push({
|
||||
token: token,
|
||||
type: dbToken.type,
|
||||
projects: dbToken.projects,
|
||||
});
|
||||
if (this.flagResolver.isEnabled('checkEdgeValidTokensFromCache')) {
|
||||
const stopTimer = this.timer('validateTokensWithCache');
|
||||
// new behavior: use cached tokens when possible
|
||||
// use the db to fetch the missing ones
|
||||
// cache stores both missing and active so we don't hammer the db
|
||||
const validatedTokens: EdgeTokenSchema[] = [];
|
||||
for (const token of tokens) {
|
||||
const found =
|
||||
await this.apiTokenService.getTokenWithCache(token);
|
||||
if (found) {
|
||||
validatedTokens.push({
|
||||
token: token,
|
||||
type: found.type,
|
||||
projects: found.projects,
|
||||
});
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}, []);
|
||||
return { tokens: edgeTokens };
|
||||
stopTimer();
|
||||
return { tokens: validatedTokens };
|
||||
} else {
|
||||
// old behavior: go to the db to fetch all tokens and then filter in memory
|
||||
const stopTimer = this.timer('validateTokensWithoutCache');
|
||||
const activeTokens =
|
||||
await this.apiTokenService.getAllActiveTokens();
|
||||
const edgeTokens = tokens.reduce(
|
||||
(result: EdgeTokenSchema[], token) => {
|
||||
const dbToken = activeTokens.find((activeToken) =>
|
||||
constantTimeCompare(activeToken.secret, token),
|
||||
);
|
||||
if (dbToken) {
|
||||
result.push({
|
||||
token: token,
|
||||
type: dbToken.type,
|
||||
projects: dbToken.projects,
|
||||
});
|
||||
}
|
||||
return result;
|
||||
},
|
||||
[],
|
||||
);
|
||||
stopTimer();
|
||||
return { tokens: edgeTokens };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,6 +43,7 @@ export type IFlagKey =
|
||||
| 'displayTrafficDataUsage'
|
||||
| 'useMemoizedActiveTokens'
|
||||
| 'queryMissingTokens'
|
||||
| 'checkEdgeValidTokensFromCache'
|
||||
| 'userAccessUIEnabled'
|
||||
| 'disableUpdateMaxRevisionId'
|
||||
| 'disablePublishUnannouncedEvents'
|
||||
|
Loading…
Reference in New Issue
Block a user