Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
WIP

WIP

WIP

WIP

fix: marking

WIP

marking correct candidates
  • Loading branch information
AdiGajbhiye committed Jan 23, 2024
1 parent d4cb25f commit c6de241
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 89 deletions.
22 changes: 2 additions & 20 deletions new_lineage_panel/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import ExposureDetails from "./exposure/ExposureDetails";
import { Feedback } from "./Feedback";
import { Help } from "./Help";
import { Demo } from "./Demo";
import { createTableNode } from "./utils";

declare const acquireVsCodeApi: () => { postMessage: (v: unknown) => void };

Expand Down Expand Up @@ -239,26 +240,7 @@ function App() {
const addNodesEdges = async (table: string, right: boolean) => {
[nodes, edges] = await expandTableLineage(nodes, edges, table, right);
};
nodes = [
{
id: node.table,
data: {
table: node.table,
label: node.label,
url: node.url,
level: 0,
shouldExpand: [node.downstreamCount > 0, node.upstreamCount > 0],
processed: [node.downstreamCount > 0, node.upstreamCount > 0],
nodeType: node.nodeType,
upstreamCount: node.upstreamCount,
downstreamCount: node.downstreamCount,
tests: node.tests,
materialization: node.materialization,
},
position: { x: 100, y: 100 },
type: "table",
},
];
nodes.push(createTableNode(node, 0, ""));
if (node.upstreamCount > 0) await addNodesEdges(node.table, true);
if (node.downstreamCount > 0) await addNodesEdges(node.table, false);
setSelectedTable(null);
Expand Down
43 changes: 26 additions & 17 deletions new_lineage_panel/src/CustomNodes.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import styles from "./styles.module.scss";
import classNames from "classnames";
import {
bfsTraversal,
collapse,
expandTableLineage,
highlightTableConnections,
layoutElementsOnCanvas,
Expand Down Expand Up @@ -170,8 +171,6 @@ export const TableHeader: FunctionComponent<{

export const TableNode: FunctionComponent<NodeProps> = ({ data }) => {
const {
shouldExpand,
processed,
label,
table,
url,
Expand All @@ -180,6 +179,7 @@ export const TableNode: FunctionComponent<NodeProps> = ({ data }) => {
nodeType,
tests,
materialization,
mark,
} = data;
const flow = useReactFlow();

Expand Down Expand Up @@ -214,7 +214,6 @@ export const TableNode: FunctionComponent<NodeProps> = ({ data }) => {
};

const expand = async (right: boolean) => {
if (processed[right ? 1 : 0]) return;
let [nodes, edges] = await expandTableLineage(
flow.getNodes(),
flow.getEdges(),
Expand Down Expand Up @@ -256,6 +255,18 @@ export const TableNode: FunctionComponent<NodeProps> = ({ data }) => {
const expandRight = () => expand(true);
const expandLeft = () => expand(false);

const _collapse = (right: boolean) => {
const [_nodes, _edges] = collapse(
flow.getNodes(),
flow.getEdges(),
table,
right
);
flow.setNodes(_nodes);
flow.setEdges(_edges);
rerender();
};

const onDetailsClick = (e: React.MouseEvent) => {
if (!selected) return;
e.stopPropagation();
Expand All @@ -268,6 +279,10 @@ export const TableNode: FunctionComponent<NodeProps> = ({ data }) => {
};

const _edges = flow.getEdges();
const processed = [
downstreamCount === _edges.filter((e) => e.target === table).length,
upstreamCount === _edges.filter((e) => e.source === table).length,
];
return (
<div
className="position-relative"
Expand Down Expand Up @@ -305,19 +320,16 @@ export const TableNode: FunctionComponent<NodeProps> = ({ data }) => {
<div className="w-100 d-flex align-items-center gap-xs">
<div
className={classNames("nodrag", styles.table_handle, {
invisible:
!shouldExpand[0] ||
processed[0] ||
downstreamCount ===
_edges.filter((e) => e.target === table).length,
invisible: downstreamCount === 0 || (processed[0] && !mark[0]),
})}
onClick={(e) => {
e.stopPropagation();
expandLeft();
processed[0] ? _collapse(false) : expandLeft();
}}
data-testid={"expand-left-btn-" + table}
>
{processed[0] ? "-" : "+"}
{!processed[0] && "+"}
{processed[0] && mark[0] && "-"}
</div>

<div
Expand All @@ -334,19 +346,16 @@ export const TableNode: FunctionComponent<NodeProps> = ({ data }) => {

<div
className={classNames("nodrag", styles.table_handle, {
invisible:
!shouldExpand[1] ||
processed[1] ||
upstreamCount ===
_edges.filter((e) => e.source === table).length,
invisible: upstreamCount === 0 || (processed[1] && !mark[1]),
})}
onClick={(e) => {
e.stopPropagation();
expandRight();
processed[1] ? _collapse(true) : expandRight();
}}
data-testid={"expand-right-btn-" + table}
>
{processed[1] ? "-" : "+"}
{!processed[1] && "+"}
{processed[1] && mark[1] && "-"}
</div>
</div>
</div>
Expand Down
179 changes: 135 additions & 44 deletions new_lineage_panel/src/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ const createNewNodesEdges = (
const newNodes = [...prevNodes];
const newEdges = [...prevEdges];
const newLevel = right ? level + 1 : level - 1;
const _node = newNodes.find((_n) => _n.id === t);
if (_node) _node.data.processed[right ? 1 : 0] = true;
// const _node = newNodes.find((_n) => _n.id === t);
// if (_node) _node.data.processed[right ? 1 : 0] = true;

const addUniqueEdge = (to: string) => {
const toLevel = newNodes.find((n) => n.id === to)?.data?.level;
Expand Down Expand Up @@ -190,67 +190,58 @@ export const highlightTableConnections = (
return [newNodes, newEdges];
};

// TODO: fix member_profile-> expand left, expand right, collapse left, collapse right
export const removeRelatedNodesEdges = (
export const collapse = (
prevNodes: Node[],
prevEdges: Edge[],
table: string,
right: boolean,
level: number,
): [Node[], Edge[]] => {
const nodesToRemove: Record<string, boolean> = {};
const edgesToRemove: Record<string, boolean> = {};
// const nodes = [...prevNodes];
// const edges = [...prevEdges];

const src = right ? "source" : "target";
const dst = !right ? "source" : "target";
const nodesToRemove: Record<string, boolean> = {};
const edgesToRemove: Record<string, boolean> = {};

const nodesIdMap: Record<string, Node> = {};
for (const n of prevNodes) {
if (isColumn(n)) continue;
nodesIdMap[n.id] = n;
}

// TODO: check visited == nodesToRemove
const queue = [table];
const visited: Record<string, boolean> = {};
while (queue.length > 0) {
const curr = queue.shift()!;
const dfsTraversal = (curr: string): number => {
visited[curr] = true;
prevEdges.forEach((e) => {
const performVisit = (
src: "source" | "target",
dst: "source" | "target",
) => {
if (e[src] !== curr) return;
const _t = e[dst];
if (visited[_t]) return;
const _level = nodesIdMap[_t].data.level;
if ((right && _level > level) || (!right && _level < level)) {
queue.push(_t);
nodesToRemove[_t] = true;
}
};
performVisit(src, dst);
performVisit(dst, src);
});
}
const currNode = prevNodes.find((n) => n.id === curr)!;
const currLevel = currNode.data.level;
let maxHeight = 0;
for (const e of prevEdges) {
if (isColumn(e)) continue;
if (e[src] !== curr) continue;
const n = prevNodes.find((n) => n.id === e[dst])!;
if (right && n.data.level < currLevel) continue;
if (!right && n.data.level > currLevel) continue;
if (visited[e[dst]]) continue;
nodesToRemove[e[dst]] = true;
const h = dfsTraversal(e[dst]);
maxHeight = Math.max(maxHeight, h);
}
return maxHeight + 1;
};
dfsTraversal(table);

const columnNodesToRemove: Record<string, boolean> = {};
const columnEdgesToRemove: Record<string, boolean> = {};

prevNodes.forEach((n) => {
if (!nodesToRemove[n.parentNode || ""]) return;
for (const n of prevNodes) {
if (!nodesToRemove[n.parentNode || ""]) continue;
columnNodesToRemove[n.id] = true;
});
}

prevEdges.forEach((e) => {
for (const e of prevEdges) {
if (isNotColumn(e)) {
edgesToRemove[e.id] = nodesToRemove[e.source] ||
nodesToRemove[e.target] || e[src] === table;
nodesToRemove[e.target];
} else {
columnEdgesToRemove[e.id] = columnNodesToRemove[e.source] ||
columnNodesToRemove[e.target];
}
});
}

const remove =
(dict: Record<string, boolean>) => (x: { id: string | number }) =>
Expand All @@ -263,12 +254,100 @@ export const removeRelatedNodesEdges = (
.filter(remove(columnEdgesToRemove))
.filter(remove(edgesToRemove));

const _node = newNodes.find((_n) => _n.id === table);
if (_node) _node.data.processed[right ? 1 : 0] = false;

markNodesForRemoval(newNodes, newEdges);
return [newNodes, newEdges];
};

export const markLastSecondNode = (
prevNodes: Node[],
prevEdges: Edge[],
right: boolean,
) => {
const nodeHeights: Record<string, number> = {};
const src = right ? "source" : "target";
const dst = !right ? "source" : "target";
for (const n of prevNodes) {
if (isColumn(n)) continue;
nodeHeights[n.id] = -1;
}
const dfsTraversal = (curr: string) => {
if (nodeHeights[curr] !== -1) return;
const currNode = prevNodes.find((n) => n.id === curr)!;
const currLevel = currNode.data.level;
let maxHeight = 0;
for (const e of prevEdges) {
if (isColumn(e)) continue;
if (e[src] !== curr) continue;
const n = prevNodes.find((n) => n.id === e[dst])!;
if (right && n.data.level < currLevel) continue;
if (!right && n.data.level > currLevel) continue;
if (nodeHeights[e[dst]] === -1) {
dfsTraversal(e[dst]);
}
maxHeight = Math.max(maxHeight, nodeHeights[e[dst]]);
}
// as ephemeral nodes are auto expanded
nodeHeights[curr] = currNode.data.materialization === "ephemeral"
? maxHeight
: maxHeight + 1;
};

for (const n of prevNodes) {
if (isColumn(n)) continue;
dfsTraversal(n.id);
}

console.log("dfsTraversal:result:", right, nodeHeights);
const lastSecondLeafNodes = Object.entries(nodeHeights)
.filter(([, v]) => v === 2).map(([k]) => k);
const calculateAncestors = (lastSecondLeafNodes: string[]) => {
const ancestors: Record<string, string[]> = {};
const visited: Record<string, boolean> = {};
for (const n of lastSecondLeafNodes) ancestors[n] = [];
for (const candidate of lastSecondLeafNodes) {
const queue = [candidate];
while (queue.length > 0) {
const curr = queue.shift()!;
if (visited[curr]) continue; // maybe cycle, should ignore candidate??
visited[curr] = true;
for (const e of prevEdges) {
if (isColumn(e)) continue;
if (e[src] !== curr) continue;
const target = e[dst];
ancestors[target] = ancestors[target] || [];
ancestors[target].push(curr);
ancestors[target].push(...(ancestors[curr] || []));
queue.push(target);
}
}
}
return ancestors;
};
const calculateCandidates = (
lastSecondLeafNodes: string[],
ancestors: Record<string, string[]>,
) => {
const potentialCandidates: Record<string, boolean> = {};
for (const n of lastSecondLeafNodes) potentialCandidates[n] = true;
for (const k in ancestors) {
if (lastSecondLeafNodes.includes(k)) continue;
let count = 0;
for (const a of ancestors[k]) {
if (lastSecondLeafNodes.includes(a)) count++;
if (count > 1) {
for (const v of ancestors[k]) potentialCandidates[v] = false;
}
}
}
return Object.entries(potentialCandidates).filter(([, v]) => v).map((
[k],
) => k);
};
const ancestors = calculateAncestors(lastSecondLeafNodes);
const candidates = calculateCandidates(lastSecondLeafNodes, ancestors);
return candidates;
};

export const processColumnLineage = async (
levelMap: Record<string, number>,
seeMoreIdTableReverseMap: Record<string, string>,
Expand Down Expand Up @@ -437,9 +516,21 @@ export const expandTableLineage = async (
}
});
}
markNodesForRemoval(nodes, edges);
return [nodes, edges];
};

export const markNodesForRemoval = (nodes: Node[], edges: Edge[]) => {
const rightNodes = markLastSecondNode(nodes, edges, true);
const leftNodes = markLastSecondNode(nodes, edges, false);
for (const n of nodes) {
if (isColumn(n)) continue;
if (n.data.materialization === "ephemeral") continue;
n.data["mark"][1] = rightNodes.includes(n.id);
n.data["mark"][0] = leftNodes.includes(n.id);
}
};

export const bfsTraversal = async (
nodes: Node[],
edges: Edge[],
Expand Down
Loading

0 comments on commit c6de241

Please sign in to comment.