Skip to content

Instantly share code, notes, and snippets.

@shimondoodkin
Last active February 5, 2024 15:58
Show Gist options
  • Save shimondoodkin/285e3a55a1181ee59c583b780b8d2296 to your computer and use it in GitHub Desktop.
Save shimondoodkin/285e3a55a1181ee59c583b780b8d2296 to your computer and use it in GitHub Desktop.
This queue lets unrelated tasks run in parallel,
/**
 * A unit of work that takes no arguments and either completes synchronously
 * or returns a promise that settles when the work is done.
 */
type Task = () => void | Promise<void>;
/**
 * In-memory registry of string keys, each with an expiry timestamp.
 *
 * Entries are not removed automatically when they expire; call `cleanup()`
 * to drop them. A key that is absent is reported as expired.
 */
export class TTLCache {
  /** Maps each key to the epoch-millisecond timestamp after which it is expired. */
  cache: Map<string, number> = new Map();

  /**
   * Stores `key` so that it expires `ttl` seconds from now, overwriting any
   * existing entry for the same key.
   *
   * @param key Key to register.
   * @param ttl Time to live, in seconds.
   */
  set(key: string, ttl: number): void {
    const expiresAt = Date.now() + ttl * 1000; // Convert TTL to milliseconds
    this.cache.set(key, expiresAt);
  }

  /**
   * Reports whether `key` is missing or past its expiry time.
   *
   * @param key Key to look up.
   * @returns `true` when the key has no entry or the current time is strictly
   *   later than its expiry timestamp; `false` otherwise.
   */
  isExpired(key: string): boolean {
    // Single lookup: `get` returns undefined for unknown keys, which must be
    // handled explicitly before the numeric comparison.
    const expiresAt = this.cache.get(key);
    if (expiresAt === undefined) return true;
    return Date.now() > expiresAt;
  }

  /** Deletes every entry whose expiry timestamp is strictly in the past. */
  cleanup(): void {
    const now = Date.now();
    for (const [key, expiresAt] of this.cache.entries()) {
      if (now > expiresAt) {
        this.cache.delete(key);
      }
    }
  }
}
/**
 * Service with an in-process task queue.
 *
 * Each queued task carries a list of "blocking values". A task is started only
 * when none of its blocking values is currently held in `queueTtlCache`; when
 * it starts, its values are placed in the cache with a fixed TTL. Tasks whose
 * values do not collide are started in the same pass and awaited together.
 */
export class SomeService {
  /** Seconds a blocking value stays held after a task that uses it is started. */
  private static readonly blockingTtlSeconds = 60;

  /** Milliseconds to wait before re-scanning the queue when tasks remain after a pass. */
  private static readonly retryDelayMs = 10000;

  /** Blocking values currently held, each with its own expiry. */
  queueTtlCache: TTLCache;

  /** Queued and in-flight tasks; an entry is removed only after its task has settled. */
  private queueTasks: Array<{ task: () => Promise<void>; blockingValues: string[] }> = [];

  /** True while a `processQueue` loop is active, so that only one loop runs at a time. */
  private queueIsRunning: boolean = false;

  constructor() {
    this.queueTtlCache = new TTLCache();
  }

  /**
   * Picks the most useful thing to log for a caught value: its `stack` when it
   * is an object with a truthy `stack` property, otherwise the value itself.
   */
  private static describeError(error: unknown): unknown {
    if (typeof error === 'object' && error !== null) {
      const { stack } = error as { stack?: unknown };
      if (stack) return stack;
    }
    return error;
  }

  /**
   * Appends a task to the queue and starts the processing loop if it is idle.
   *
   * @param task Work to run; errors it throws are logged by the queue, not rethrown.
   * @param blockingValues Values that must not be held in the cache for the task to start.
   */
  addTaskToQueue(task: () => Promise<void>, blockingValues: string[]): void {
    // Add task to queue
    this.queueTasks.push({ task, blockingValues });
    console.info('zoho service: added to queue, ítems in queue ' + this.queueTasks.length + ', items in cache ' + this.queueTtlCache.cache.size)
    // Start processing if not already running
    if (!this.queueIsRunning) {
      // The loop is intentionally not awaited; a handler is attached so that a
      // failure in the loop is logged instead of becoming an unhandled rejection.
      this.processQueue().catch((error: unknown) => {
        console.error(SomeService.describeError(error));
      });
    }
  }

  /**
   * Runs passes over the queue until it is empty.
   *
   * Each pass drops expired cache entries, starts every task whose blocking
   * values are all free, and awaits the tasks it started. If tasks remain
   * afterwards, it waits `retryDelayMs` before the next pass.
   *
   * NOTE(review): tasks added while a pass is in flight also wait for that
   * delay, even when they are not blocked — confirm whether that is intended.
   */
  private async processQueue(): Promise<void> {
    this.queueIsRunning = true;
    try {
      while (this.queueTasks.length > 0) {
        // Cleanup expired values from the cache
        this.queueTtlCache.cleanup();

        const started: Promise<void>[] = [];
        // Iterate over a snapshot: finished tasks remove themselves from the live array.
        for (const queueItem of this.queueTasks.slice()) {
          const { blockingValues } = queueItem;
          const isFree = blockingValues.every(value => this.queueTtlCache.isExpired(value));
          if (!isFree) continue;

          // Hold the values synchronously, before the next item is examined, so a
          // later item in this pass that shares a value is left in the queue.
          // The re-check avoids resetting a value listed twice by the same task.
          for (const value of blockingValues) {
            if (this.queueTtlCache.isExpired(value)) {
              this.queueTtlCache.set(value, SomeService.blockingTtlSeconds);
            }
          }
          started.push(this.runQueueItem(queueItem));
        }

        // Wait until every task started in this pass has settled.
        await Promise.all(started);
        if (started.length > 0) {
          console.info('zoho service: processed queue items, ítems in queue ' + this.queueTasks.length + ', items in cache ' + this.queueTtlCache.cache.size)
        }

        if (this.queueTasks.length > 0) {
          // Wait before trying again if there are tasks still delayed by blocking values
          await new Promise<void>(resolve => setTimeout(resolve, SomeService.retryDelayMs));
        }
      }
    } finally {
      // Always release the flag, even if a pass throws; otherwise
      // addTaskToQueue would never start the loop again.
      this.queueIsRunning = false;
    }
  }

  /**
   * Executes one queued task, logs any error it throws, and removes the item
   * from the queue once the task has settled. Never rejects because of the task.
   */
  private async runQueueItem(
    queueItem: { task: () => Promise<void>; blockingValues: string[] }
  ): Promise<void> {
    try {
      await queueItem.task();
    } catch (error) {
      console.log(SomeService.describeError(error));
    } finally {
      // Guard the index: splice(-1, 1) would remove the last queued item.
      const index = this.queueTasks.indexOf(queueItem);
      if (index !== -1) {
        this.queueTasks.splice(index, 1);
      }
    }
  }

  /**
   * Queues work for an order. The blocking values are built from the order's
   * `Order_ID` and `Email` when those fields are truthy.
   *
   * @param addOrderDto Order data; read for `Order_ID` and `Email`.
   */
  addOrder(
    addOrderDto: AddOrderDto
  ): void {
    const blockingValues: string[] = [];
    if (addOrderDto.Order_ID) blockingValues.push('Order_ID:' + addOrderDto.Order_ID);
    if (addOrderDto.Email) blockingValues.push('Email:' + addOrderDto.Email);

    this.addTaskToQueue(async () => {
      try {
        // do something with addOrderDto
      } catch (error) {
        console.error(SomeService.describeError(error));
      }
    }, blockingValues);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment