Skip to content

Commit

Permalink
feat: scheduler lock for schedule jobs (#214)
Browse files Browse the repository at this point in the history
* feat: scheduler lock for schedule jobs

* fix: tests

* fix: cron job time
  • Loading branch information
h4l-yup committed Mar 12, 2024
1 parent fb14c67 commit 4a2bac5
Show file tree
Hide file tree
Showing 23 changed files with 478 additions and 139 deletions.
2 changes: 2 additions & 0 deletions apps/api/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import { UserModule } from './domains/admin/user/user.module';
import { APIModule } from './domains/api/api.module';
import { HealthModule } from './domains/operation/health/health.module';
import { MigrationModule } from './domains/operation/migration/migration.module';
import { SchedulerLockModule } from './domains/operation/scheduler-lock/scheduler-lock.module';

export const domainModules = [
AuthModule,
Expand All @@ -78,6 +79,7 @@ export const domainModules = [
IssueStatisticsModule,
FeedbackIssueStatisticsModule,
APIModule,
SchedulerLockModule,
] as any[];

@Module({
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/configs/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import { registerAs } from '@nestjs/config';
import Joi from 'joi';
import { v4 as uuidv4 } from 'uuid';

export const appConfigSchema = Joi.object({
APP_PORT: Joi.number().default(4000),
Expand All @@ -25,4 +26,5 @@ export const appConfigSchema = Joi.object({
export const appConfig = registerAs('app', () => ({
port: process.env.APP_PORT,
address: process.env.APP_ADDRESS,
serverId: uuidv4(),
}));
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
import type { MigrationInterface, QueryRunner } from 'typeorm';

export class SchedulerLock1709803978757 implements MigrationInterface {
name = 'SchedulerLock1709803978757';

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE TABLE \`scheduler_locks\` (\`lock_type\` enum ('FEEDBACK_STATISTICS', 'ISSUE_STATISTICS', 'FEEDBACK_ISSUE_STATISTICS', 'FEEDBACK_COUNT') NOT NULL, \`server_id\` varchar(255) NOT NULL, \`timestamp\` datetime NOT NULL, PRIMARY KEY (\`lock_type\`)) ENGINE=InnoDB`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE \`scheduler_locks\``);
}
}
2 changes: 2 additions & 0 deletions apps/api/src/domains/admin/project/issue/issue.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';

import { IssueStatisticsModule } from '@/domains/admin/statistics/issue/issue-statistics.module';
import { SchedulerLockModule } from '@/domains/operation/scheduler-lock/scheduler-lock.module';
import { ProjectEntity } from '../project/project.entity';
import { IssueController } from './issue.controller';
import { IssueEntity } from './issue.entity';
Expand All @@ -26,6 +27,7 @@ import { IssueService } from './issue.service';
imports: [
TypeOrmModule.forFeature([IssueEntity, ProjectEntity]),
IssueStatisticsModule,
SchedulerLockModule,
],
providers: [IssueService],
controllers: [IssueController],
Expand Down
25 changes: 23 additions & 2 deletions apps/api/src/domains/admin/project/issue/issue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { SchedulerRegistry } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
Expand All @@ -28,6 +28,8 @@ import type { TimeRange } from '@/common/dtos';
import { EventTypeEnum } from '@/common/enums';
import type { CountByProjectIdDto } from '@/domains/admin/feedback/dtos';
import { IssueStatisticsService } from '@/domains/admin/statistics/issue/issue-statistics.service';
import { LockTypeEnum } from '@/domains/operation/scheduler-lock/lock-type.enum';
import { SchedulerLockService } from '@/domains/operation/scheduler-lock/scheduler-lock.service';
import { ProjectEntity } from '../project/project.entity';
import type { FindByIssueIdDto, FindIssuesByProjectIdDto } from './dtos';
import { CreateIssueDto, UpdateIssueDto } from './dtos';
Expand All @@ -40,13 +42,15 @@ import { IssueEntity } from './issue.entity';

@Injectable()
export class IssueService {
private logger = new Logger(IssueService.name);
constructor(
@InjectRepository(IssueEntity)
private readonly repository: Repository<IssueEntity>,
@InjectRepository(ProjectEntity)
private readonly projectRepository: Repository<ProjectEntity>,
private readonly issueStatisticsService: IssueStatisticsService,
private readonly schedulerRegistry: SchedulerRegistry,
private readonly schedulerLockService: SchedulerLockService,
private readonly eventEmitter: EventEmitter2,
) {}

Expand Down Expand Up @@ -275,7 +279,24 @@ export class IssueService {
const cronHour = (24 - Number(timezoneOffset.split(':')[0])) % 24;

const job = new CronJob(`30 ${cronHour} * * *`, async () => {
await this.calculateFeedbackCount(id);
if (
await this.schedulerLockService.acquireLock(
LockTypeEnum.FEEDBACK_COUNT,
1000 * 60 * 5,
)
) {
try {
await this.calculateFeedbackCount(id);
} finally {
await this.schedulerLockService.releaseLock(
LockTypeEnum.FEEDBACK_COUNT,
);
}
} else {
this.logger.log({
message: 'Failed to acquire lock for feedback count calculation',
});
}
});
this.schedulerRegistry.addCronJob(`feedback-count-by-issue-${id}`, job);
job.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Repository } from 'typeorm';
import { FeedbackEntity } from '@/domains/admin/feedback/feedback.entity';
import { IssueEntity } from '@/domains/admin/project/issue/issue.entity';
import { ProjectEntity } from '@/domains/admin/project/project/project.entity';
import { SchedulerLockModule } from '@/domains/operation/scheduler-lock/scheduler-lock.module';
import { FeedbackIssueStatisticsController } from './feedback-issue-statistics.controller';
import { FeedbackIssueStatisticsEntity } from './feedback-issue-statistics.entity';
import { FeedbackIssueStatisticsService } from './feedback-issue-statistics.service';
Expand All @@ -32,6 +33,7 @@ import { FeedbackIssueStatisticsService } from './feedback-issue-statistics.serv
IssueEntity,
ProjectEntity,
]),
SchedulerLockModule,
],
exports: [FeedbackIssueStatisticsService],
providers: [FeedbackIssueStatisticsService],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,15 @@ describe('FeedbackIssueStatisticsService suite', () => {
jest.spyOn(issueRepo, 'find').mockResolvedValue(issues as IssueEntity[]);
jest.spyOn(feedbackRepo, 'count').mockResolvedValueOnce(0);
jest.spyOn(feedbackRepo, 'count').mockResolvedValue(1);
jest
.spyOn(feedbackIssueStatsRepo, 'createQueryBuilder')
.mockImplementation(() => createQueryBuilder);
jest.spyOn(feedbackIssueStatsRepo.manager, 'transaction');

await feedbackIssueStatsService.createFeedbackIssueStatistics(
projectId,
dayToCreate,
);

expect(feedbackRepo.count).toBeCalledTimes(dayToCreate * issueCount);
expect(feedbackIssueStatsRepo.createQueryBuilder).toBeCalledTimes(
dayToCreate * issueCount - 1,
expect(feedbackIssueStatsRepo.manager.transaction).toBeCalledTimes(
dayToCreate * issueCount,
);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import { Transactional } from 'typeorm-transactional';
import { FeedbackEntity } from '@/domains/admin/feedback/feedback.entity';
import { IssueEntity } from '@/domains/admin/project/issue/issue.entity';
import { ProjectEntity } from '@/domains/admin/project/project/project.entity';
import { LockTypeEnum } from '@/domains/operation/scheduler-lock/lock-type.enum';
import { SchedulerLockService } from '@/domains/operation/scheduler-lock/scheduler-lock.service';
import { getIntervalDatesInFormat } from '../utils/util-functions';
import { UpdateFeedbackCountDto } from './dtos';
import type { GetCountByDateByIssueDto } from './dtos';
Expand All @@ -46,6 +48,7 @@ export class FeedbackIssueStatisticsService {
@InjectRepository(ProjectEntity)
private readonly projectRepository: Repository<ProjectEntity>,
private readonly schedulerRegistry: SchedulerRegistry,
private readonly schedulerLockService: SchedulerLockService,
) {}

async getCountByDateByIssue(dto: GetCountByDateByIssueDto) {
Expand Down Expand Up @@ -115,7 +118,24 @@ export class FeedbackIssueStatisticsService {
const timezoneOffset = timezone.offset;
const cronHour = (24 - Number(timezoneOffset.split(':')[0])) % 24;
const job = new CronJob(`2 ${cronHour} * * *`, async () => {
await this.createFeedbackIssueStatistics(projectId, 365);
if (
await this.schedulerLockService.acquireLock(
LockTypeEnum.FEEDBACK_ISSUE_STATISTICS,
1000 * 60 * 5,
)
) {
try {
await this.createFeedbackIssueStatistics(projectId, 365);
} finally {
await this.schedulerLockService.releaseLock(
LockTypeEnum.FEEDBACK_ISSUE_STATISTICS,
);
}
} else {
this.logger.log({
message: 'Failed to acquire lock for feedback count calculation',
});
}
});
this.schedulerRegistry.addCronJob(
`feedback-issue-statistics-${projectId}`,
Expand All @@ -126,7 +146,6 @@ export class FeedbackIssueStatisticsService {
this.logger.log(`feedback-issue-statistics-${projectId} cron job started`);
}

@Transactional()
async createFeedbackIssueStatistics(
projectId: number,
dayToCreate: number = 1,
Expand All @@ -144,48 +163,58 @@ export class FeedbackIssueStatisticsService {

for (let day = 1; day <= dayToCreate; day++) {
for (const issue of issues) {
const feedbackCount = await this.feedbackRepository.count({
where: {
issues: { id: issue.id },
createdAt: Between(
DateTime.utc()
.minus({ days: day })
.startOf('day')
.minus({ hours: offset })
.toJSDate(),
DateTime.utc()
.minus({ days: day })
.endOf('day')
.minus({ hours: offset })
.toJSDate(),
),
},
});

if (feedbackCount === 0) continue;

await this.repository
.createQueryBuilder()
.insert()
.values({
date:
offset >= 0
? DateTime.utc()
await this.repository.manager
.transaction(async (transactionalEntityManager) => {
const feedbackCount = await this.feedbackRepository.count({
where: {
issues: { id: issue.id },
createdAt: Between(
DateTime.utc()
.minus({ days: day })
.endOf('day')
.startOf('day')
.minus({ hours: offset })
.toFormat('yyyy-MM-dd')
: DateTime.utc()
.toJSDate(),
DateTime.utc()
.minus({ days: day })
.startOf('day')
.endOf('day')
.minus({ hours: offset })
.toFormat('yyyy-MM-dd'),
issue: { id: issue.id },
feedbackCount,
.toJSDate(),
),
},
});

if (feedbackCount === 0) return;

await transactionalEntityManager
.createQueryBuilder()
.insert()
.into(FeedbackIssueStatisticsEntity)
.values({
date:
offset >= 0
? DateTime.utc()
.minus({ days: day })
.endOf('day')
.minus({ hours: offset })
.toFormat('yyyy-MM-dd')
: DateTime.utc()
.minus({ days: day })
.startOf('day')
.minus({ hours: offset })
.toFormat('yyyy-MM-dd'),
issue: { id: issue.id },
feedbackCount,
})
.orUpdate(['feedback_count'], ['date', 'issue'])
.updateEntity(false)
.execute();
})
.orUpdate(['feedback_count'], ['date', 'issue'])
.updateEntity(false)
.execute();
.catch((error) => {
this.logger.error({
message: 'Failed to create feedback issue statistics',
error,
});
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { ChannelEntity } from '@/domains/admin/channel/channel/channel.entity';
import { FeedbackEntity } from '@/domains/admin/feedback/feedback.entity';
import { IssueEntity } from '@/domains/admin/project/issue/issue.entity';
import { ProjectEntity } from '@/domains/admin/project/project/project.entity';
import { SchedulerLockModule } from '@/domains/operation/scheduler-lock/scheduler-lock.module';
import { FeedbackStatisticsController } from './feedback-statistics.controller';
import { FeedbackStatisticsEntity } from './feedback-statistics.entity';
import { FeedbackStatisticsService } from './feedback-statistics.service';
Expand All @@ -34,6 +35,7 @@ import { FeedbackStatisticsService } from './feedback-statistics.service';
ChannelEntity,
ProjectEntity,
]),
SchedulerLockModule,
],
exports: [FeedbackStatisticsService],
providers: [FeedbackStatisticsService],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,18 +320,15 @@ describe('FeedbackStatisticsService suite', () => {
.mockResolvedValue(channels as ChannelEntity[]);
jest.spyOn(feedbackRepo, 'count').mockResolvedValueOnce(0);
jest.spyOn(feedbackRepo, 'count').mockResolvedValue(1);
jest
.spyOn(feedbackStatsRepo, 'createQueryBuilder')
.mockImplementation(() => createQueryBuilder);
jest.spyOn(feedbackStatsRepo.manager, 'transaction');

await feedbackStatsService.createFeedbackStatistics(
projectId,
dayToCreate,
);

expect(feedbackRepo.count).toBeCalledTimes(dayToCreate * channelCount);
expect(feedbackStatsRepo.createQueryBuilder).toBeCalledTimes(
dayToCreate * channelCount - 1,
expect(feedbackStatsRepo.manager.transaction).toBeCalledTimes(
dayToCreate * channelCount,
);
});
});
Expand Down
Loading

0 comments on commit 4a2bac5

Please sign in to comment.