Skip to content

Commit

Permalink
feat(core): docs search service
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Jun 14, 2024
1 parent 74063eb commit c80be85
Show file tree
Hide file tree
Showing 10 changed files with 479 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Doc as YDoc } from 'yjs';
import { Entity } from '../../../framework';
import { AwarenessEngine, BlobEngine, DocEngine } from '../../../sync';
import { throwIfAborted } from '../../../utils';
import { WorkspaceEngineBeforeStart } from '../events';
import type { WorkspaceEngineProvider } from '../providers/flavour';
import type { WorkspaceService } from '../services/workspace';

Expand Down Expand Up @@ -33,6 +34,7 @@ export class WorkspaceEngine extends Entity<{
}

start() {
this.eventBus.emit(WorkspaceEngineBeforeStart, this);
this.doc.start();
this.awareness.connect(this.workspaceService.workspace.awareness);
this.blob.start();
Expand Down
6 changes: 6 additions & 0 deletions packages/common/infra/src/modules/workspace/events/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { createEvent } from '../../../framework';
import type { WorkspaceEngine } from '../entities/engine';

export const WorkspaceEngineBeforeStart = createEvent<WorkspaceEngine>(
'WorkspaceEngineBeforeStart'
);
1 change: 1 addition & 0 deletions packages/common/infra/src/modules/workspace/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export type { WorkspaceProfileInfo } from './entities/profile';
export { Workspace } from './entities/workspace';
export { WorkspaceEngineBeforeStart } from './events';
export { globalBlockSuiteSchema } from './global-schema';
export type { WorkspaceMetadata } from './metadata';
export type { WorkspaceOpenOptions } from './open-options';
Expand Down
7 changes: 4 additions & 3 deletions packages/common/infra/src/sync/doc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export {
} from './storage';

export class DocEngine {
clientId: string;
localPart: DocEngineLocalPart;
remotePart: DocEngineRemotePart | null;

Expand Down Expand Up @@ -67,11 +68,11 @@ export class DocEngine {
storage: DocStorage,
private readonly server?: DocServer | null
) {
const clientId = nanoid();
this.clientId = nanoid();
this.storage = new DocStorageInner(storage);
this.localPart = new DocEngineLocalPart(clientId, this.storage);
this.localPart = new DocEngineLocalPart(this.clientId, this.storage);
this.remotePart = this.server
? new DocEngineRemotePart(clientId, this.storage, this.server)
? new DocEngineRemotePart(this.clientId, this.storage, this.server)
: null;
}

Expand Down
8 changes: 8 additions & 0 deletions packages/common/infra/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,11 @@ export type { BlobStatus, BlobStorage } from './blob/blob';
export { BlobEngine, EmptyBlobStorage } from './blob/blob';
export { BlobStorageOverCapacity } from './blob/error';
export * from './doc';
export * from './indexer';
export {
IndexedDBIndex,
IndexedDBIndexStorage,
} from './indexer/impl/indexeddb';
export { MemoryIndex, MemoryIndexStorage } from './indexer/impl/memory';
export * from './job';
export { IndexedDBJobQueue } from './job/impl/indexeddb';
3 changes: 3 additions & 0 deletions packages/frontend/core/src/modules/docs-search/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# docs-search

This module is responsible for subscribing to updates from the doc engine and writing the doc content into the indexer, providing search and aggregation capabilities.
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
import { DebugLogger } from '@affine/debug';
import type { Job, WorkspaceService } from '@toeverything/infra';
import {
Document,
Entity,
IndexedDBIndexStorage,
IndexedDBJobQueue,
JobRunner,
} from '@toeverything/infra';
import { difference } from 'lodash-es';
import type { Array as YArray, Map as YMap } from 'yjs';
import { applyUpdate, Doc as YDoc } from 'yjs';

import { blockIndexSchema, docIndexSchema } from '../schema';

const logger = new DebugLogger('crawler');

interface IndexerJobPayload {
docId: string;
}

export class DocsIndexer extends Entity {
private readonly jobQueue = new IndexedDBJobQueue<IndexerJobPayload>(
'jq:' + this.workspaceService.workspace.id
);

private readonly runner = new JobRunner(
this.jobQueue,
(jobs, signal) => this.execJob(jobs, signal),
() => new Promise<void>(resolve => requestIdleCallback(() => resolve()))
);

private readonly indexStorage = new IndexedDBIndexStorage(
'idx:' + this.workspaceService.workspace.id
);

readonly docIndex = this.indexStorage.getIndex('doc', docIndexSchema);

readonly blockIndex = this.indexStorage.getIndex('block', blockIndexSchema);

private readonly workspaceEngine = this.workspaceService.workspace.engine;

private readonly workspaceId = this.workspaceService.workspace.id;

constructor(private readonly workspaceService: WorkspaceService) {
super();
}

setupListener() {
this.workspaceEngine.doc.storage.eventBus.on(event => {
if (event.clientId === this.workspaceEngine.doc.clientId) {
const docId = event.docId;

this.jobQueue
.enqueue([
{
batchKey: docId,
payload: { docId },
},
])
.catch(err => {
console.error('Error enqueueing job', err);
});
}
});
}

async execJob(jobs: Job<IndexerJobPayload>[], _signal: AbortSignal) {
if (jobs.length === 0) {
return;
}

// jobs should have the same docId, so we just pick the first one
const docId = jobs[0].payload.docId;

logger.debug('Start crawling job for docId:', docId);

if (docId) {
if (docId === this.workspaceId) {
await this.crawlingRootDocData();
} else {
await this.crawlingDocData(docId);
}
}
}

startCrawling() {
this.runner.start();
this.jobQueue
.enqueue([
{
batchKey: this.workspaceId,
payload: { docId: this.workspaceId },
},
])
.catch(err => {
console.error('Error enqueueing job', err);
});
}

async crawlingDocData(docId: string) {
const rootDocBuffer =
await this.workspaceEngine.doc.storage.loadDocFromLocal(this.workspaceId);

if (!rootDocBuffer) {
return;
}

const yRootDoc = new YDoc();
applyUpdate(yRootDoc, rootDocBuffer);

const docStoragePossibleIds = Array.from(yRootDoc.getSubdocs())
.map(doc => doc.guid)
.filter(id => id.endsWith(docId));

let docBuffer;

for (const id of docStoragePossibleIds) {
docBuffer = await this.workspaceEngine.doc.storage.loadDocFromLocal(id);
}

if (!docBuffer) {
return;
}

const ydoc = new YDoc();

applyUpdate(ydoc, docBuffer);

let docExists: boolean | null = null;

(
yRootDoc.getMap('meta').get('pages') as YArray<YMap<any>> | undefined
)?.forEach(page => {
if (page.get('id') === docId) {
docExists = !(page.get('trash') ?? false);
}
});

if (!docExists) {
const indexWriter = await this.docIndex.write();
indexWriter.delete(docId);
await indexWriter.commit();

const blockIndexWriter = await this.blockIndex.write();
const oldBlocks = await blockIndexWriter.search(
{
type: 'match',
field: 'docId',
match: docId,
},
{
pagination: {
limit: Number.MAX_SAFE_INTEGER,
},
}
);
for (const block of oldBlocks.nodes) {
blockIndexWriter.delete(block.id);
}
await blockIndexWriter.commit();
} else {
const blocks = ydoc.getMap<any>('blocks');

if (blocks.size === 0) {
return;
}

let docTitle = '';

const blockDocuments: Document<typeof blockIndexSchema>[] = [];

for (const block of blocks.values()) {
const flavour = block.get('sys:flavour')?.toString();
const blockId = block.get('sys:id')?.toString();

if (!flavour || !blockId) {
continue;
}

if (flavour === 'affine:page') {
docTitle = block.get('prop:title').toString();
blockDocuments.push(
Document.from(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: docTitle,
})
);
}

if (flavour === 'affine:paragraph') {
blockDocuments.push(
Document.from(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: block.get('prop:text')?.toString(),
})
);
}
}

const docIndexWriter = await this.docIndex.write();
docIndexWriter.put(
Document.from<typeof docIndexSchema>(docId, {
title: docTitle,
})
);
await docIndexWriter.commit();

const blockIndexWriter = await this.blockIndex.write();
const oldBlocks = await blockIndexWriter.search(
{
type: 'match',
field: 'docId',
match: docId,
},
{
pagination: {
limit: Number.MAX_SAFE_INTEGER,
},
}
);
for (const block of oldBlocks.nodes) {
blockIndexWriter.delete(block.id);
}
for (const block of blockDocuments) {
blockIndexWriter.insert(block);
}
await blockIndexWriter.commit();
}
}

async crawlingRootDocData() {
const buffer = await this.workspaceEngine.doc.storage.loadDocFromLocal(
this.workspaceId
);
if (!buffer) {
return;
}

const ydoc = new YDoc();

applyUpdate(ydoc, buffer);

const docs = ydoc.getMap('meta').get('pages') as
| YArray<YMap<any>>
| undefined;

if (!docs) {
return;
}

const availableDocs = [];

for (const page of docs) {
const docId = page.get('id');

if (typeof docId !== 'string') {
continue;
}

const inTrash = page.get('trash') ?? false;

if (!inTrash) {
availableDocs.push(docId);
}
}

// a hack to get all docs in index
const allIndexedDocs = (
await this.docIndex.search(
{
type: 'all',
},
{
pagination: {
limit: Number.MAX_SAFE_INTEGER,
skip: 0,
},
}
)
).nodes.map(n => n.id);

const needDelete = difference(allIndexedDocs, availableDocs);
const needAdd = difference(availableDocs, allIndexedDocs);

console.log(availableDocs, allIndexedDocs, needAdd, needDelete);

await this.jobQueue.enqueue(
[...needAdd, ...needDelete].map(docId => ({
batchKey: docId,
payload: { docId },
}))
);
}
}
17 changes: 17 additions & 0 deletions packages/frontend/core/src/modules/docs-search/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export { DocsSearchService } from './services/docs-search';

import {
type Framework,
WorkspaceScope,
WorkspaceService,
} from '@toeverything/infra';

import { DocsIndexer } from './entities/docs-indexer';
import { DocsSearchService } from './services/docs-search';

export function configureDocsSearchModule(framework: Framework) {
framework
.scope(WorkspaceScope)
.service(DocsSearchService)
.entity(DocsIndexer, [WorkspaceService]);
}
12 changes: 12 additions & 0 deletions packages/frontend/core/src/modules/docs-search/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { Schema } from '@toeverything/infra';

export const docIndexSchema = {
title: 'FullText',
} satisfies Schema;

export const blockIndexSchema = {
docId: 'String',
blockId: 'String',
content: 'FullText',
flavour: 'String',
} satisfies Schema;
Loading

0 comments on commit c80be85

Please sign in to comment.