Overview
This tutorial shows you how to build an automated system that monitors file sources (local directories, S3 buckets, web feeds) and automatically ingests new documents into your Rayrift knowledge base, organizing them into folders for easy retrieval.
Use Cases
- Document Management: Automatically index reports, invoices, or contracts as they arrive
- Content Monitoring: Watch RSS feeds or websites for new content
- Batch Processing: Process large document collections from cloud storage
- Data Pipeline Integration: Integrate Rayrift into your existing data workflows
Architecture
Each integration below follows the same three-stage pipeline: a source monitor (directory watcher, S3 event handler, or URL poller) detects new content, a routing function maps it to a folder, and an upload call sends it to the Rayrift documents API.
Step 1: Set Up Folder Structure
Create folders to organize your documents:
const folders = {
  reports: null,
  invoices: null,
  contracts: null
};

// Create folders
for (const name of Object.keys(folders)) {
  const response = await fetch('https://api.Rayrift.com/v1/folders', {
    method: 'POST',
    headers: {
      'Authorization': 'sk_live_...',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      name: name.charAt(0).toUpperCase() + name.slice(1),
      description: `Automated ${name} collection`
    })
  });
  if (!response.ok) {
    throw new Error(`Failed to create folder "${name}": ${response.statusText}`);
  }
  folders[name] = (await response.json()).id;
}

console.log('Folders created:', folders);
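The watcher service at the end of this tutorial reads these IDs from environment variables, so persist them after creation rather than recreating folders on every run. A minimal sketch (writing to a local .env file is just one convention, not anything Rayrift requires):

const fs = require('fs');

// Persist the new folder IDs so later runs and the watcher service
// can reuse them instead of creating duplicates.
const envLines = [
  `FOLDER_REPORTS_ID=${folders.reports}`,
  `FOLDER_INVOICES_ID=${folders.invoices}`,
  `FOLDER_CONTRACTS_ID=${folders.contracts}`
];
fs.appendFileSync('.env', envLines.join('\n') + '\n');
console.log('Folder IDs saved to .env');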
Step 2: Monitor Local Directory
Watch a local directory for new files:
const fs = require('fs');
const path = require('path');
const chokidar = require('chokidar'); // npm install chokidar
const FormData = require('form-data'); // npm install form-data
const fetch = require('node-fetch'); // npm install node-fetch@2

const WATCH_DIR = './documents';
const Rayrift_API_KEY = 'sk_live_...';

// Determine folder based on file name
function getFolderId(filePath) {
  const name = path.basename(filePath).toLowerCase();
  if (name.includes('invoice') || name.includes('bill')) {
    return folders.invoices;
  }
  if (name.includes('contract') || name.includes('agreement')) {
    return folders.contracts;
  }
  return folders.reports; // default: PDFs, DOCX, and everything else
}

// Upload file to Rayrift
async function uploadToRayrift(filePath, folderId) {
  const fileStream = fs.createReadStream(filePath);
  const formData = new FormData();
  formData.append('file', fileStream);
  formData.append('payload', JSON.stringify({
    folder_id: folderId
  }));
  const response = await fetch('https://api.Rayrift.com/v1/documents', {
    method: 'POST',
    headers: {
      'Authorization': Rayrift_API_KEY
    },
    body: formData
  });
  if (!response.ok) {
    throw new Error(`Upload failed: ${response.statusText}`);
  }
  return await response.json();
}

// Watch directory for new files
const watcher = chokidar.watch(WATCH_DIR, {
  ignored: /(^|[\/\\])\../, // ignore dotfiles
  persistent: true,
  ignoreInitial: true // don't process existing files on startup
});

watcher
  .on('add', async (filePath) => {
    console.log(`New file detected: ${filePath}`);
    try {
      const folderId = getFolderId(filePath);
      const result = await uploadToRayrift(filePath, folderId);
      console.log(`✓ Uploaded: ${result.id}`);
    } catch (error) {
      console.error(`✗ Failed to upload ${filePath}:`, error.message);
    }
  })
  .on('error', error => console.error('Watcher error:', error));

console.log(`Watching ${WATCH_DIR} for new files...`);
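One caveat with directory watching: chokidar emits add as soon as a file appears, which for large files can be before the copy has finished. chokidar's awaitWriteFinish option holds the event until the file size has been stable. A sketch of the same watcher with it enabled (the thresholds are arbitrary starting points):

const watcher = chokidar.watch(WATCH_DIR, {
  ignored: /(^|[\/\\])\../,
  persistent: true,
  ignoreInitial: true,
  awaitWriteFinish: {
    stabilityThreshold: 2000, // file size must be stable for 2s
    pollInterval: 100         // check size every 100ms
  }
});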
Step 3: Monitor S3 Bucket
Watch an S3 bucket for new objects:
const AWS = require('aws-sdk');
const path = require('path');
const FormData = require('form-data');
const fetch = require('node-fetch');

const s3 = new AWS.S3();

// Note: in Lambda, supply Rayrift_API_KEY and the folder IDs via environment
// variables rather than reusing the constants from the previous step.

// Set up S3 event notification or poll for new objects
async function processS3Object(bucket, key) {
  console.log(`Processing S3 object: s3://${bucket}/${key}`);

  // Download file from S3
  const s3Object = await s3.getObject({ Bucket: bucket, Key: key }).promise();

  // Determine folder based on S3 key/path
  const folderId = getFolderIdFromS3Key(key);

  // Upload to Rayrift
  const formData = new FormData();
  formData.append('file', s3Object.Body, {
    filename: path.basename(key),
    contentType: s3Object.ContentType
  });
  formData.append('payload', JSON.stringify({
    folder_id: folderId
  }));
  const response = await fetch('https://api.Rayrift.com/v1/documents', {
    method: 'POST',
    headers: {
      'Authorization': Rayrift_API_KEY
    },
    body: formData
  });
  if (!response.ok) {
    throw new Error(`Upload failed: ${response.statusText}`);
  }
  return await response.json();
}

function getFolderIdFromS3Key(key) {
  // Organize by S3 prefix
  if (key.startsWith('invoices/')) return folders.invoices;
  if (key.startsWith('contracts/')) return folders.contracts;
  if (key.startsWith('reports/')) return folders.reports;
  return folders.reports; // default
}

// AWS Lambda handler for S3 events
exports.handler = async (event) => {
  for (const record of event.Records) {
    if (record.eventSource === 'aws:s3') {
      const bucket = record.s3.bucket.name;
      const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
      try {
        await processS3Object(bucket, key);
      } catch (error) {
        console.error(`Failed to process ${key}:`, error);
      }
    }
  }
};
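If you can't attach event notifications to the bucket, polling is an alternative. A minimal sketch using the same aws-sdk v2 client; the bucket name and prefix are placeholders, and the in-memory seen set is for illustration only (it resets on restart, and the first poll will process every existing object under the prefix), so persist this state in production:

const seen = new Set();

async function pollBucket(bucket, prefix) {
  let ContinuationToken;
  do {
    // List objects one page at a time
    const page = await s3.listObjectsV2({
      Bucket: bucket,
      Prefix: prefix,
      ContinuationToken
    }).promise();
    for (const obj of page.Contents || []) {
      if (!seen.has(obj.Key)) {
        seen.add(obj.Key);
        await processS3Object(bucket, obj.Key);
      }
    }
    ContinuationToken = page.NextContinuationToken;
  } while (ContinuationToken);
}

// Poll every 5 minutes
setInterval(() => pollBucket('my-bucket', 'invoices/').catch(console.error), 5 * 60 * 1000);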
Step 4: Monitor Web Content
Automatically ingest content from URLs:
// Monitor RSS feed or website for new content
async function monitorWebContent(urls, folderId) {
  for (const url of urls) {
    try {
      // Check if URL was already processed (store in database)
      if (await isAlreadyProcessed(url)) {
        continue;
      }

      // Ingest URL into Rayrift
      const response = await fetch('https://api.Rayrift.com/v1/documents', {
        method: 'POST',
        headers: {
          'Authorization': Rayrift_API_KEY,
          'Content-Type': 'application/json'
        },
        body: JSON.stringify({
          url: url,
          folder_id: folderId
        })
      });

      if (response.ok) {
        const result = await response.json();
        await markAsProcessed(url);
        console.log(`✓ Ingested: ${url} -> ${result.id}`);
      } else {
        console.error(`✗ Ingest failed for ${url}: ${response.statusText}`);
      }
    } catch (error) {
      console.error(`✗ Failed to ingest ${url}:`, error.message);
    }
  }
}

// Run periodically (e.g., every hour)
setInterval(async () => {
  const blogUrls = [
    'https://blog.example.com/post-1',
    'https://blog.example.com/post-2'
  ];
  await monitorWebContent(blogUrls, folders.reports);
}, 3600000); // 1 hour
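The snippet above calls isAlreadyProcessed and markAsProcessed without defining them. A minimal file-backed sketch; as the comment in the code suggests, a real deployment would use a database:

const fs = require('fs');

const PROCESSED_FILE = './processed-urls.json';

// Load previously processed URLs (empty set on first run)
function loadProcessed() {
  try {
    return new Set(JSON.parse(fs.readFileSync(PROCESSED_FILE, 'utf8')));
  } catch {
    return new Set();
  }
}

const processed = loadProcessed();

async function isAlreadyProcessed(url) {
  return processed.has(url);
}

async function markAsProcessed(url) {
  processed.add(url);
  fs.writeFileSync(PROCESSED_FILE, JSON.stringify([...processed]));
}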
Step 5: Batch Processing
Process existing document collections:
async function batchProcessDirectory(dirPath, folderId) {
  const files = fs.readdirSync(dirPath);
  const results = {
    success: 0,
    failed: 0,
    errors: []
  };

  for (const file of files) {
    const filePath = path.join(dirPath, file);
    const stats = fs.statSync(filePath);
    if (stats.isFile()) {
      try {
        await uploadToRayrift(filePath, folderId);
        results.success++;
        console.log(`✓ Processed: ${file}`);
      } catch (error) {
        results.failed++;
        results.errors.push({ file, error: error.message });
        console.error(`✗ Failed: ${file} - ${error.message}`);
      }
    }
  }
  return results;
}

// Process all files in a directory
const results = await batchProcessDirectory('./legacy-docs', folders.reports);
console.log(`Batch complete: ${results.success} succeeded, ${results.failed} failed`);
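For large collections, a short pause between uploads helps you stay under API rate limits (see Best Practices below). A variant of the batch loop with a crude throttle; the 250 ms delay is an arbitrary placeholder to tune against your actual limits:

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function throttledBatchProcess(dirPath, folderId, delayMs = 250) {
  for (const file of fs.readdirSync(dirPath)) {
    const filePath = path.join(dirPath, file);
    if (!fs.statSync(filePath).isFile()) continue;
    await uploadToRayrift(filePath, folderId);
    await sleep(delayMs); // crude throttle between uploads
  }
}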
Complete Example: File Watcher Service
Here’s a complete Node.js service that monitors multiple sources:
const fs = require('fs');
const path = require('path');
const chokidar = require('chokidar');
const fetch = require('node-fetch');
const FormData = require('form-data');

const Rayrift_API_KEY = process.env.Rayrift_API_KEY;
const Rayrift_API_URL = 'https://api.Rayrift.com/v1';

// Folder configuration
const FOLDERS = {
  reports: process.env.FOLDER_REPORTS_ID,
  invoices: process.env.FOLDER_INVOICES_ID,
  contracts: process.env.FOLDER_CONTRACTS_ID
};

class DocumentProcessor {
  async uploadFile(filePath, folderId) {
    const fileStream = fs.createReadStream(filePath);
    const formData = new FormData();
    formData.append('file', fileStream);
    formData.append('payload', JSON.stringify({ folder_id: folderId }));

    const response = await fetch(`${Rayrift_API_URL}/documents`, {
      method: 'POST',
      headers: { 'Authorization': Rayrift_API_KEY },
      body: formData
    });
    if (!response.ok) {
      throw new Error(`Upload failed: ${response.statusText}`);
    }
    return await response.json();
  }

  determineFolder(filePath) {
    const name = path.basename(filePath).toLowerCase();
    if (name.includes('invoice') || name.includes('bill')) {
      return FOLDERS.invoices;
    }
    if (name.includes('contract') || name.includes('agreement')) {
      return FOLDERS.contracts;
    }
    return FOLDERS.reports;
  }

  async processFile(filePath) {
    try {
      const folderId = this.determineFolder(filePath);
      const result = await this.uploadFile(filePath, folderId);
      console.log(`✓ Processed: ${filePath} -> ${result.id}`);
      return { success: true, documentId: result.id };
    } catch (error) {
      console.error(`✗ Failed: ${filePath} - ${error.message}`);
      return { success: false, error: error.message };
    }
  }
}

// Initialize processor
const processor = new DocumentProcessor();

// Watch directories
const watchDirs = process.env.WATCH_DIRS?.split(',') || ['./documents'];

watchDirs.forEach(dir => {
  const watcher = chokidar.watch(dir, {
    ignored: /(^|[\/\\])\../,
    persistent: true,
    ignoreInitial: true
  });
  watcher.on('add', filePath => processor.processFile(filePath));
  watcher.on('error', error => console.error('Watcher error:', error));
  console.log(`Watching: ${dir}`);
});

console.log('Document processor started');
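To stop the service cleanly under a process manager, close the watchers on SIGINT/SIGTERM before exiting. A sketch, assuming the watchers created above are collected into an array (e.g., watchers.push(watcher) inside the forEach):

const watchers = []; // push each watcher created in the forEach above

async function shutdown() {
  console.log('Shutting down...');
  await Promise.all(watchers.map((w) => w.close())); // chokidar close() returns a promise
  process.exit(0);
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);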
Best Practices
- Error Handling: Implement retry logic for failed uploads and log errors for debugging (see the retry sketch below).
- Deduplication: Track processed files to avoid re-uploading the same document.
- Rate Limiting: Respect Rayrift API rate limits when processing large batches.
- Metadata: Consider adding metadata tags when uploading to help with organization and search.
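A minimal retry wrapper for the first point above: it retries a failed upload with exponential backoff (the attempt count and delays are arbitrary starting points, not values Rayrift prescribes):

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Usage: await uploadWithRetry(() => uploadToRayrift(filePath, folderId));
async function uploadWithRetry(uploadFn, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await uploadFn();
    } catch (error) {
      if (attempt === maxAttempts) throw error;
      const delay = 1000 * 2 ** (attempt - 1); // 1s, 2s, 4s, ...
      console.warn(`Attempt ${attempt} failed (${error.message}); retrying in ${delay}ms`);
      await sleep(delay);
    }
  }
}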
Next Steps
- Learn how to build a RAG-Powered Chatbot to query your organized documents
- Explore the Document API for advanced ingestion options
- Check out Folder Management to retrieve and manage your organized content

