
Overview

This tutorial shows you how to build an automated pipeline that monitors file sources (local directories, S3 buckets, web feeds), ingests new documents into your Rayrift knowledge base, and organizes them into folders for easy retrieval.

Use Cases

  • Document Management: Automatically index reports, invoices, or contracts as they arrive
  • Content Monitoring: Watch RSS feeds or websites for new content
  • Batch Processing: Process large document collections from cloud storage
  • Data Pipeline Integration: Integrate Rayrift into your existing data workflows

Architecture

The pipeline has three parts: source monitors (a directory watcher, an S3 event handler, and a web-content poller) detect new documents, a small routing function picks the destination folder from the file name or path, and an uploader sends each document to the Rayrift API.

Step 1: Set Up Folder Structure

Create folders to organize your documents:
const folders = {
  reports: null,
  invoices: null,
  contracts: null
};

// Create each folder and record its ID
for (const name of Object.keys(folders)) {
  const response = await fetch('https://api.Rayrift.com/v1/folders', {
    method: 'POST',
    headers: {
      'Authorization': 'sk_live_...',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      name: name.charAt(0).toUpperCase() + name.slice(1),
      description: `Automated ${name} collection`
    })
  });
  
  if (!response.ok) {
    throw new Error(`Failed to create folder "${name}": ${response.statusText}`);
  }
  
  folders[name] = (await response.json()).id;
}

console.log('Folders created:', folders);
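
Folder creation only needs to happen once. One way to avoid re-creating folders on every run is to persist the IDs to disk and reuse them — a minimal sketch (the folders.json path is just an illustrative choice):

const fs = require('fs');

const FOLDER_CACHE = './folders.json'; // hypothetical location for cached IDs

// Load previously created folder IDs, if any
function loadFolderIds() {
  try {
    return JSON.parse(fs.readFileSync(FOLDER_CACHE, 'utf8'));
  } catch {
    return null; // no cache yet — folders still need to be created
  }
}

// Save folder IDs after creating them so later runs can skip creation
function saveFolderIds(folders) {
  fs.writeFileSync(FOLDER_CACHE, JSON.stringify(folders, null, 2));
}

const cached = loadFolderIds();
if (cached) {
  Object.assign(folders, cached);
} else {
  // ...run the folder-creation loop above, then:
  saveFolderIds(folders);
}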

Step 2: Monitor Local Directory

Watch a local directory for new files:
const fs = require('fs');
const path = require('path');
const chokidar = require('chokidar'); // npm install chokidar
const fetch = require('node-fetch'); // npm install node-fetch@2
const FormData = require('form-data'); // npm install form-data (Node's built-in FormData can't take fs streams)

const WATCH_DIR = './documents';
const Rayrift_API_KEY = 'sk_live_...';

// Determine the destination folder from the file name
function getFolderId(filePath) {
  const name = path.basename(filePath).toLowerCase();
  
  if (name.includes('invoice') || name.includes('bill')) {
    return folders.invoices;
  }
  if (name.includes('contract') || name.includes('agreement')) {
    return folders.contracts;
  }
  return folders.reports; // default, including PDF and DOCX reports
}

// Upload file to Rayrift
async function uploadToRayrift(filePath, folderId) {
  const fileStream = fs.createReadStream(filePath);
  const formData = new FormData();
  
  formData.append('file', fileStream);
  formData.append('payload', JSON.stringify({
    folder_id: folderId
  }));
  
  const response = await fetch('https://api.Rayrift.com/v1/documents', {
    method: 'POST',
    headers: {
      'Authorization': Rayrift_API_KEY
    },
    body: formData
  });
  
  if (!response.ok) {
    throw new Error(`Upload failed: ${response.statusText}`);
  }
  
  return await response.json();
}

// Watch directory for new files
const watcher = chokidar.watch(WATCH_DIR, {
  ignored: /(^|[\/\\])\../, // ignore dotfiles
  persistent: true,
  ignoreInitial: true // don't process existing files on startup
});

watcher
  .on('add', async (filePath) => {
    console.log(`New file detected: ${filePath}`);
    
    try {
      const folderId = getFolderId(filePath);
      const result = await uploadToRayrift(filePath, folderId);
      console.log(`✓ Uploaded: ${result.id}`);
    } catch (error) {
      console.error(`✗ Failed to upload ${filePath}:`, error.message);
    }
  })
  .on('error', error => console.error('Watcher error:', error));

console.log(`Watching ${WATCH_DIR} for new files...`);
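
Large files are often detected before the writing process has finished, which can lead to truncated uploads. chokidar's awaitWriteFinish option delays the add event until the file size has been stable for a while:

const watcher = chokidar.watch(WATCH_DIR, {
  ignored: /(^|[\/\\])\../,
  persistent: true,
  ignoreInitial: true,
  // Wait until the file size has stopped changing before emitting 'add'
  awaitWriteFinish: {
    stabilityThreshold: 2000, // ms of unchanged size before the file counts as done
    pollInterval: 100         // how often to re-check the size
  }
});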

Step 3: Monitor S3 Bucket

Watch an S3 bucket for new objects:
const AWS = require('aws-sdk'); // v2 SDK; for v3 use @aws-sdk/client-s3 instead
const path = require('path');
const fetch = require('node-fetch');
const FormData = require('form-data');

const s3 = new AWS.S3();
const Rayrift_API_KEY = process.env.Rayrift_API_KEY;
// `folders` must hold your Rayrift folder IDs — in Lambda, load them from
// environment variables as in the complete example below.

// Set up S3 event notification or poll for new objects
async function processS3Object(bucket, key) {
  console.log(`Processing S3 object: s3://${bucket}/${key}`);
  
  // Download file from S3
  const s3Object = await s3.getObject({ Bucket: bucket, Key: key }).promise();
  
  // Determine folder based on S3 key/path
  const folderId = getFolderIdFromS3Key(key);
  
  // Upload to Rayrift
  const formData = new FormData();
  formData.append('file', s3Object.Body, {
    filename: path.basename(key),
    contentType: s3Object.ContentType
  });
  formData.append('payload', JSON.stringify({
    folder_id: folderId
  }));
  
  const response = await fetch('https://api.Rayrift.com/v1/documents', {
    method: 'POST',
    headers: {
      'Authorization': Rayrift_API_KEY
    },
    body: formData
  });
  
  return await response.json();
}

function getFolderIdFromS3Key(key) {
  // Organize by S3 prefix
  if (key.startsWith('invoices/')) return folders.invoices;
  if (key.startsWith('contracts/')) return folders.contracts;
  if (key.startsWith('reports/')) return folders.reports;
  return folders.reports; // default
}

// AWS Lambda handler for S3 events
exports.handler = async (event) => {
  for (const record of event.Records) {
    if (record.eventSource === 'aws:s3') {
      const bucket = record.s3.bucket.name;
      const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
      
      try {
        await processS3Object(bucket, key);
      } catch (error) {
        console.error(`Failed to process ${key}:`, error);
      }
    }
  }
};
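
If you'd rather not wire up S3 event notifications, you can poll the bucket on a schedule instead. A minimal sketch that tracks the last-seen timestamp in memory (the bucket name and prefix are illustrative, and in production you'd persist the timestamp):

let lastChecked = new Date(0); // persist this in real deployments

// List the bucket and process any objects modified since the last check
async function pollBucket(bucket, prefix) {
  const now = new Date();
  let ContinuationToken;

  do {
    const page = await s3.listObjectsV2({
      Bucket: bucket,
      Prefix: prefix,
      ContinuationToken
    }).promise();

    for (const obj of page.Contents || []) {
      if (obj.LastModified > lastChecked) {
        await processS3Object(bucket, obj.Key);
      }
    }
    ContinuationToken = page.NextContinuationToken;
  } while (ContinuationToken);

  lastChecked = now;
}

// Check every five minutes
setInterval(() => pollBucket('my-bucket', 'reports/').catch(console.error), 5 * 60 * 1000);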

Step 4: Monitor Web Content

Automatically ingest content from URLs:
// Monitor RSS feed or website for new content
async function monitorWebContent(urls, folderId) {
  for (const url of urls) {
    try {
      // Skip URLs that were already ingested. isAlreadyProcessed and
      // markAsProcessed are placeholders for your own persistence layer
      // (a simple implementation follows this example).
      if (await isAlreadyProcessed(url)) {
        continue;
      }
      
      // Ingest URL into Rayrift
      const response = await fetch('https://api.Rayrift.com/v1/documents', {
        method: 'POST',
        headers: {
          'Authorization': Rayrift_API_KEY,
          'Content-Type': 'application/json'
        },
        body: JSON.stringify({
          url: url,
          folder_id: folderId
        })
      });
      
      if (response.ok) {
        const result = await response.json();
        await markAsProcessed(url);
        console.log(`✓ Ingested: ${url} -> ${result.id}`);
      }
    } catch (error) {
      console.error(`✗ Failed to ingest ${url}:`, error.message);
    }
  }
}

// Run periodically (e.g., every hour)
setInterval(async () => {
  const blogUrls = [
    'https://blog.example.com/post-1',
    'https://blog.example.com/post-2'
  ];
  
  await monitorWebContent(blogUrls, folders.reports);
}, 3600000); // 1 hour
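
One simple way to fill in the isAlreadyProcessed / markAsProcessed placeholders is a JSON file on disk — fine for a single process; swap in a real database for anything larger (the file path is just an illustrative choice):

const fs = require('fs');

const PROCESSED_FILE = './processed-urls.json'; // illustrative location

// Load the set of URLs ingested in previous runs
function loadProcessed() {
  try {
    return new Set(JSON.parse(fs.readFileSync(PROCESSED_FILE, 'utf8')));
  } catch {
    return new Set(); // first run: nothing processed yet
  }
}

const processed = loadProcessed();

async function isAlreadyProcessed(url) {
  return processed.has(url);
}

async function markAsProcessed(url) {
  processed.add(url);
  fs.writeFileSync(PROCESSED_FILE, JSON.stringify([...processed], null, 2));
}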

Step 5: Batch Processing

Process existing document collections:
async function batchProcessDirectory(dirPath, folderId) {
  const files = fs.readdirSync(dirPath);
  const results = {
    success: 0,
    failed: 0,
    errors: []
  };
  
  for (const file of files) {
    const filePath = path.join(dirPath, file);
    const stats = fs.statSync(filePath);
    
    if (stats.isFile()) {
      try {
        await uploadToRayrift(filePath, folderId);
        results.success++;
        console.log(`✓ Processed: ${file}`);
      } catch (error) {
        results.failed++;
        results.errors.push({ file, error: error.message });
        console.error(`✗ Failed: ${file} - ${error.message}`);
      }
    }
  }
  
  return results;
}

// Process every file in a directory
const results = await batchProcessDirectory('./legacy-docs', folders.reports);
console.log(`Batch complete: ${results.success} succeeded, ${results.failed} failed`);
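
readdirSync as used above only looks at the top level of the directory. If your collection has subdirectories, a recursive variant using withFileTypes walks the whole tree:

// Recursively collect file paths under a directory
function collectFiles(dirPath) {
  const files = [];
  for (const entry of fs.readdirSync(dirPath, { withFileTypes: true })) {
    const fullPath = path.join(dirPath, entry.name);
    if (entry.isDirectory()) {
      files.push(...collectFiles(fullPath)); // descend into subdirectory
    } else if (entry.isFile()) {
      files.push(fullPath);
    }
  }
  return files;
}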

Complete Example: File Watcher Service

Here’s a complete Node.js service that monitors multiple sources:
const fs = require('fs');
const path = require('path');
const chokidar = require('chokidar');
const fetch = require('node-fetch'); // npm install node-fetch@2
const FormData = require('form-data'); // npm install form-data

const Rayrift_API_KEY = process.env.Rayrift_API_KEY;
const Rayrift_API_URL = 'https://api.Rayrift.com/v1';

// Folder configuration
const FOLDERS = {
  reports: process.env.FOLDER_REPORTS_ID,
  invoices: process.env.FOLDER_INVOICES_ID,
  contracts: process.env.FOLDER_CONTRACTS_ID
};

class DocumentProcessor {
  async uploadFile(filePath, folderId) {
    const fileStream = fs.createReadStream(filePath);
    const formData = new FormData();
    
    formData.append('file', fileStream);
    formData.append('payload', JSON.stringify({ folder_id: folderId }));
    
    const response = await fetch(`${Rayrift_API_URL}/documents`, {
      method: 'POST',
      headers: { 'Authorization': Rayrift_API_KEY },
      body: formData
    });
    
    if (!response.ok) {
      throw new Error(`Upload failed: ${response.statusText}`);
    }
    
    return await response.json();
  }
  
  determineFolder(filePath) {
    const name = path.basename(filePath).toLowerCase();
    
    if (name.includes('invoice') || name.includes('bill')) {
      return FOLDERS.invoices;
    }
    if (name.includes('contract') || name.includes('agreement')) {
      return FOLDERS.contracts;
    }
    return FOLDERS.reports;
  }
  
  async processFile(filePath) {
    try {
      const folderId = this.determineFolder(filePath);
      const result = await this.uploadFile(filePath, folderId);
      
      console.log(`✓ Processed: ${filePath} -> ${result.id}`);
      return { success: true, documentId: result.id };
    } catch (error) {
      console.error(`✗ Failed: ${filePath} - ${error.message}`);
      return { success: false, error: error.message };
    }
  }
}

// Initialize processor
const processor = new DocumentProcessor();

// Watch directories
const watchDirs = process.env.WATCH_DIRS?.split(',') || ['./documents'];

watchDirs.forEach(dir => {
  const watcher = chokidar.watch(dir, {
    ignored: /(^|[\/\\])\../,
    persistent: true,
    ignoreInitial: true
  });
  
  watcher.on('add', filePath => processor.processFile(filePath));
  watcher.on('error', error => console.error('Watcher error:', error));
  
  console.log(`Watching: ${dir}`);
});

console.log('Document processor started');
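
Save the service as a script (here assumed to be watcher.js), set the environment variables it reads, and start it:

Rayrift_API_KEY=sk_live_... FOLDER_REPORTS_ID=... FOLDER_INVOICES_ID=... FOLDER_CONTRACTS_ID=... WATCH_DIRS=./documents node watcher.js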

Best Practices

  • Error Handling: Implement retry logic for failed uploads and log errors for debugging (a retry sketch follows this list).
  • Deduplication: Track processed files to avoid re-uploading the same document.
  • Rate Limiting: Respect Rayrift API rate limits when processing large batches.
  • Metadata: Consider adding metadata tags when uploading to help with organization and search.
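
A minimal retry wrapper with exponential backoff that you can drop around uploadToRayrift (the attempt count and delays are just illustrative defaults):

// Retry an async operation with exponential backoff
async function withRetry(fn, { attempts = 3, baseDelayMs = 1000 } = {}) {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt === attempts) throw error; // out of retries
      const delay = baseDelayMs * 2 ** (attempt - 1);
      console.warn(`Attempt ${attempt} failed (${error.message}), retrying in ${delay}ms`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}

// Usage:
// await withRetry(() => uploadToRayrift(filePath, folderId));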

Next Steps