Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions orchestrator/src/config/network.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import type { SuiNetwork } from "@/types";
import { getFullnodeUrl } from "@mysten/sui/client";

export const networkConfig = {
localnet: {
url: getFullnodeUrl("localnet"),
},
devnet: {
url: getFullnodeUrl("devnet"),
},
testnet: {
url: getFullnodeUrl("testnet"),
},
mainnet: {
url: getFullnodeUrl("mainnet"),
},
} as const;

export function getNetworkConfig(network: SuiNetwork) {
return networkConfig[network];
}
30 changes: 30 additions & 0 deletions orchestrator/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ const baseConfig = {
// Integrations
xBearerToken: process.env.X_BEARER_TOKEN ?? "",
openAIToken: process.env.OPEN_AI_TOKEN ?? "",
// Add Sui configuration
sui: process.env.CHAINS?.includes("SUI")
? {
chainId: process.env.SUI_CHAIN_ID as "mainnet" | "testnet" | "devnet",
oracleAddress: process.env.SUI_ORACLE_ADDRESS,
indexerCron: process.env.SUI_INDEXER_CRON,
privateKey: process.env.SUI_PRIVATE_KEY,
}
: undefined,
};

interface IEnvVars {
Expand All @@ -43,6 +52,12 @@ interface IEnvVars {
batchSize: number;
xBearerToken: string;
openAIToken: string;
sui?: {
chainId: "mainnet" | "testnet" | "devnet";
oracleAddress: string;
indexerCron: string;
privateKey: string;
};
}

const envVarsSchema = Joi.object({
Expand Down Expand Up @@ -98,6 +113,20 @@ const envVarsSchema = Joi.object({
sentryDSN: Joi.string().allow("", null),
ecdsaPrivateKey: Joi.string().allow("", null),
batchSize: Joi.number().default(1000),

// Add Sui validation
sui: Joi.object({
chainId: Joi.string().valid("mainnet", "testnet", "devnet").insensitive(),
oracleAddress: Joi.string().custom((value, helper) => addressValidator(value, helper)),
indexerCron: Joi.string().default("*/5 * * * * *"),
privateKey: Joi.string().custom((value, helper) => {
return privateKeyValidator(value, helper);
}),
}).when("chains", {
is: Joi.array().items(Joi.string().valid(SupportedChain.SUI)),
then: Joi.required(),
otherwise: Joi.optional(),
}),
});

const { value, error } = envVarsSchema.validate({
Expand Down Expand Up @@ -132,4 +161,5 @@ export default {
privateKey: envVars.aptosPrivateKey,
noditKey: envVars.aptosNoditKey,
},
sui: envVars.sui,
};
35 changes: 35 additions & 0 deletions orchestrator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import "dotenv/config";
import env from "./env";
import AptosIndexer from "./indexer/aptos";
import RoochIndexer from "./indexer/rooch";
import SuiIndexer from "./indexer/sui";
import { log } from "./logger";

(async () => {
Expand Down Expand Up @@ -45,4 +46,38 @@ import { log } from "./logger";
} else {
log.info(`Skipping Aptos Indexer initialization...`);
}

// Add debug logs before the Sui check
console.log("Debug Sui values:", {
privateKey: !!env.sui?.privateKey, // just log if it exists
chainId: env.sui?.chainId,
oracleAddress: env.sui?.oracleAddress,
chains: env.chains,
includesSUI: env.chains.includes("SUI"),
});

if (env.sui?.privateKey && env.sui.chainId && env.sui.oracleAddress && env.chains.includes("SUI")) {
const suiIndexer = new SuiIndexer(env.sui.oracleAddress, env.sui.chainId, env.sui.privateKey);

// Add immediate execution for testing
log.info("Running Sui indexer immediately...");
await suiIndexer.run();

new CronJob(
env.sui.indexerCron,
async () => {
log.info("Running Sui indexer from cron...");
await suiIndexer.run();
},
null,
true,
);
} else {
log.info(`Skipping Sui Indexer initialization...`, {
hasPrivateKey: !!env.sui?.privateKey,
chainId: env.sui?.chainId,
oracleAddress: env.sui?.oracleAddress,
includesSUI: env.chains.includes("SUI"),
});
}
})();
109 changes: 55 additions & 54 deletions orchestrator/src/indexer/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import { instance as xTwitterInstance } from "@/integrations/xtwitter";
import type { BasicBearerAPIHandler } from "@/integrations/base";
import prismaClient from "../../prisma";

type ChainEventData = {
id?: { txDigest: string };
event_id?: { event_handle_id: string };
};

// Abstract base class
export abstract class Indexer {
constructor(
Expand Down Expand Up @@ -75,15 +80,13 @@ export abstract class Indexer {
async processRequestAddedEvent<T>(
data: ProcessedRequestAdded<T>,
): Promise<{ status: number; message: string } | null> {
log.debug("processing request:", data.request_id);

if (data.oracle.toLowerCase() !== this.getOrchestratorAddress().toLowerCase()) {
log.debug(
"skipping request as it's not for this Oracle:",
data.request_id,
this.getOrchestratorAddress().toLowerCase(),
data.oracle.toLowerCase(),
);
log.debug(`processing request: ${data.request_id}`);

const eventOracle = data.oracle.toLowerCase();
const oracleAddress = this.getOracleAddress().toLowerCase();

if (eventOracle !== oracleAddress) {
log.debug(`skipping request as it's not for this Oracle: ${data.request_id} ${eventOracle} ${oracleAddress}`);
return null;
}
try {
Expand All @@ -95,67 +98,65 @@ export abstract class Indexer {
return handler.submitRequest(data);
}
return { status: 406, message: "URL Not supported" };
} catch {
} catch (error: any) {
return { status: 406, message: "Invalid URL" };
}
}

async run() {
log.info(`${this.getChainId()} indexer running...`, Date.now());

const latestCommit = await prismaClient.events.findFirst({
const latestSuccessfulEvent = await prismaClient.events.findFirst({
where: {
chain: this.getChainId(),
status: RequestStatus.SUCCESS,
},
orderBy: {
eventSeq: "desc",
// indexedAt: "desc", // Order by date in descending order
},
});

// Fetch the latest events from the Aptos Oracles Contract
const newRequestsEvents = await this.fetchRequestAddedEvents(Number(latestCommit?.eventSeq ?? 0) ?? 0);
for (let i = 0; i < newRequestsEvents.length; i++) {
const cursor = latestSuccessfulEvent?.eventHandleId || null;
const newRequestsEvents = await this.fetchRequestAddedEvents(cursor);

for (const event of newRequestsEvents) {
try {
const event = newRequestsEvents[i];

if (!(await this.isPreviouslyExecuted(event))) {
const data = await this.processRequestAddedEvent(event);

log.info({ data });

if (data) {
try {
await this.sendFulfillment(event, data.status, JSON.stringify(data.message));
await this.save(event, data, RequestStatus.SUCCESS);
} catch (err: any) {
log.error({ err: err.message });
await this.save(event, data, RequestStatus.FAILED);
}
}
} else {
log.debug({ message: `Request: ${event.request_id} as already been processed` });
await this.save(event, {}, RequestStatus.SUCCESS);
// First check if request is already executed on-chain
if (await this.isPreviouslyExecuted(event)) {
log.debug(`Skipping already executed request: ${event.request_id}`);
continue;
}
} catch (error) {
console.error(`Error processing event ${i}:`, error);

// Then check our database for any previous processing attempts
const existingEvent = await prismaClient.events.findFirst({
where: {
AND: [
{ chain: this.getChainId() },
{
OR: [
{ eventHandleId: event.request_id },
{ eventHandleId: (event.fullData as ChainEventData)?.id?.txDigest },
],
},
],
},
});

if (existingEvent) {
log.debug(`Skipping previously processed event: ${event.request_id}`);
continue;
}

const data = await this.processRequestAddedEvent(event);
if (data && data.status === 200) {
await this.sendFulfillment(event, data.status, JSON.stringify(data.message));
await this.save(event, data, RequestStatus.SUCCESS);
}
// Don't save failed events at all
} catch (error: any) {
log.error(`Error processing event ${event.request_id}:`, {
error: error instanceof Error ? error.message : String(error),
});
// Don't save error events
}
}

// await Promise.all(
// newRequestsEvents.map(async (event) => {
// const data = await this.processRequestAddedEvent(event);
// if (data) {
// try {
// await this.sendFulfillment(event, data.status, JSON.stringify(data.message));
// // TODO: Use the notify parameter to send transaction to the contract and function to marked in the request event
// await this.save(event, data, RequestStatus.SUCCESS);
// } catch (err: any) {
// log.error({ err: err.message });
// await this.save(event, data, RequestStatus.FAILED);
// }
// }
// }),
// );
}
}
Loading