Connection Examples

TypeScript/JavaScript

import { credentials } from "@grpc/grpc-js";
import { RecryptOperatorClient } from "./generated/recrypt_grpc_pb";

// Production
// Opens a TLS-secured client against the production endpoint.
const prodClient = new RecryptOperatorClient(
  "pre.gateway.tech:443",
  credentials.createSsl()
);

// Testnet
// Same construction, pointed at the testnet endpoint.
const testnetClient = new RecryptOperatorClient(
  "pre-testnet.gateway.tech:443",
  credentials.createSsl()
);

// With keepalive channel options
// The third constructor argument carries channel options; only keepalive
// settings are configured here (no per-call timeout or metadata is set).
// NOTE(review): grpc.keepalive_timeout_ms presumably only matters once
// keepalive pings are enabled via "grpc.keepalive_time_ms" — confirm against
// the @grpc/grpc-js channel-option docs and the server's ping policy.
const client = new RecryptOperatorClient(
  "pre.gateway.tech:443",
  credentials.createSsl(),
  {
    "grpc.keepalive_timeout_ms": 10000,
    "grpc.keepalive_permit_without_calls": 1,
  }
);

Python

import grpc
from generated import recrypt_pb2_grpc

# Production
channel = grpc.secure_channel('pre.gateway.tech:443', grpc.ssl_channel_credentials())
prod_client = recrypt_pb2_grpc.RecryptOperatorStub(channel)

# Testnet
testnet_channel = grpc.secure_channel('pre-testnet.gateway.tech:443', grpc.ssl_channel_credentials())
testnet_client = recrypt_pb2_grpc.RecryptOperatorStub(testnet_channel)

# With retry policy
# gRPC retries are configured through the channel's service config: a JSON
# document handed to the channel via the 'grpc.service_config' option. The
# policy below is therefore serialized and passed in `options`; a dict that is
# only defined but never given to the channel has no effect.
import json

retry_policy = {
    'maxAttempts': 4,
    'initialBackoff': '0.1s',
    'maxBackoff': '3s',
    'backoffMultiplier': 1.3,
    'retryableStatusCodes': ['UNAVAILABLE'],
}

service_config = json.dumps({
    'methodConfig': [{
        # Apply the policy to every method of the RecryptOperator service.
        'name': [{'service': 'recrypt.RecryptOperator'}],
        'retryPolicy': retry_policy,
    }]
})

channel = grpc.secure_channel(
    'pre.gateway.tech:443',
    grpc.ssl_channel_credentials(),
    options=[
        ('grpc.enable_retries', 1),
        ('grpc.service_config', service_config),
    ],
    compression=grpc.Compression.Gzip
)

Go

package main

import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    pb "generated/recrypt"
)

// newClient dials the production endpoint over TLS and returns the underlying
// connection together with a RecryptOperator client bound to it. The
// connection is returned so the caller can Close it when finished.
func newClient() (*grpc.ClientConn, pb.RecryptOperatorClient, error) {
    // Service config: round-robin balancing plus a retry policy for every
    // method of recrypt.RecryptOperator.
    const serviceConfig = `{
            "loadBalancingPolicy": "round_robin",
            "methodConfig": [{
                "name": [{"service": "recrypt.RecryptOperator"}],
                "retryPolicy": {
                    "maxAttempts": 4,
                    "initialBackoff": "0.1s",
                    "maxBackoff": "1s",
                    "backoffMultiplier": 2.0,
                    "retryableStatusCodes": ["UNAVAILABLE"]
                }
            }]
        }`

    dialOpts := []grpc.DialOption{
        // TLS credentials built with a nil cert pool and no server-name override.
        grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
        // Cap received messages at 1 MiB.
        grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024 * 1024)),
        grpc.WithDefaultServiceConfig(serviceConfig),
    }

    conn, dialErr := grpc.Dial("pre.gateway.tech:443", dialOpts...)
    if dialErr != nil {
        return nil, nil, dialErr
    }

    return conn, pb.NewRecryptOperatorClient(conn), nil
}

Complete Workflows

TypeScript: File Encryption and Sharing

import { RecryptOperatorClient } from "./generated/recrypt_grpc_pb";
import { credentials } from "@grpc/grpc-js";
import { promises as fs } from "fs";

/**
 * Encrypts a file under a freshly generated sender key pair, re-encrypts
 * (transforms) the ciphertext for `recipientPublicKey`, and writes the
 * transformed payload to `${filePath}.encrypted`.
 *
 * The gRPC client opened for the operation is always closed before returning,
 * whether the workflow succeeds or fails.
 *
 * NOTE(review): requests are passed as plain object literals while responses
 * are read through generated getters; confirm this matches how the client in
 * "./generated/recrypt_grpc_pb" was generated.
 *
 * @param filePath - Path of the file to read and encrypt.
 * @param recipientPublicKey - Base64 public key the ciphertext is transformed to.
 * @throws Re-throws any file-system or RPC error after logging it.
 */
async function encryptAndShareFile(
  filePath: string,
  recipientPublicKey: string
): Promise<void> {
  const client = new RecryptOperatorClient(
    "pre.gateway.tech:443",
    credentials.createSsl()
  );

  // Adapts one callback-style unary RPC to a promise so the four calls below
  // do not each repeat the same resolve/reject boilerplate.
  const unary = <TResponse>(
    invoke: (
      callback: (error: Error | null, response?: TResponse) => void
    ) => void
  ): Promise<TResponse> =>
    new Promise<TResponse>((resolve, reject) => {
      invoke((error, response) => {
        if (error) reject(error);
        else if (response === undefined)
          reject(new Error("RPC completed without a response"));
        else resolve(response);
      });
    });

  try {
    // Generate sender's keypair
    const senderKeys = await unary<{
      getPubkeyBase64(): string;
      getPrivkeyBase64(): string;
    }>((done) => client.generateKeyPair({}, done));

    // Read and encrypt file
    const fileData = await fs.readFile(filePath);
    const fileBase64 = fileData.toString("base64");

    const encrypted = await unary<{ getCipherBase64(): string }>((done) =>
      client.encrypt(
        {
          data: fileBase64,
          pubkey_base64: senderKeys.getPubkeyBase64(),
          is_data_base64: true,
        },
        done
      )
    );

    // Generate transform key (sender private key -> recipient public key)
    const transformKey = await unary<{ getPayloadBase64(): string }>((done) =>
      client.generateTransformKey(
        {
          to_pubkey_base64: recipientPublicKey,
          from_privkey_base64: senderKeys.getPrivkeyBase64(),
        },
        done
      )
    );

    // Transform ciphertext
    const transformed = await unary<{ getPayloadBase64(): string }>((done) =>
      client.transform(
        {
          cipher_base64: encrypted.getCipherBase64(),
          transformkey_base64: transformKey.getPayloadBase64(),
        },
        done
      )
    );

    // Save transformed ciphertext
    await fs.writeFile(`${filePath}.encrypted`, transformed.getPayloadBase64());

    console.log("File encrypted and ready for recipient");
  } catch (error) {
    console.error("Error during encryption process:", error);
    throw error;
  } finally {
    // Release the channel; without this every call leaks a connection.
    client.close();
  }
}

Python: Batch Processing with Error Handling

import grpc
import base64
from typing import List
from concurrent.futures import ThreadPoolExecutor
from generated import recrypt_pb2, recrypt_pb2_grpc

class BatchProcessor:
    """Encrypts a batch of items and transforms them for one recipient.

    Each batch uses a freshly generated sender key pair. Encrypt and Transform
    calls are fanned out over a thread pool. If the primary endpoint reports
    UNAVAILABLE, the batch is retried exactly once against `backup_endpoint`.
    """

    def __init__(
        self,
        endpoint: str = 'pre.gateway.tech:443',
        backup_endpoint: str = 'pre-2.gateway.tech:443'
    ):
        """Open a TLS channel to `endpoint`.

        Args:
            endpoint: host:port of the node to use.
            backup_endpoint: host:port tried once when `endpoint` is
                UNAVAILABLE; pass an empty string to disable failover.
        """
        self.endpoint = endpoint
        self.backup_endpoint = backup_endpoint
        self.channel = grpc.secure_channel(
            endpoint,
            grpc.ssl_channel_credentials()
        )
        self.client = recrypt_pb2_grpc.RecryptOperatorStub(self.channel)

    def close(self) -> None:
        """Close the underlying channel."""
        self.channel.close()

    def __enter__(self) -> 'BatchProcessor':
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def process_batch(
        self,
        data_items: List[bytes],
        recipient_public_key: str
    ) -> List[str]:
        """Encrypt `data_items` and transform them for `recipient_public_key`.

        Returns:
            The transformed payloads as base64 strings, in input order.

        Raises:
            grpc.RpcError: if an RPC fails and failover is not possible (or the
                backup node fails as well).
        """
        if not data_items:
            # Nothing to do; avoid generating keys for an empty batch.
            return []

        try:
            return self._process_batch_once(data_items, recipient_public_key)
        except grpc.RpcError as e:
            # Fail over only once: the backup processor is created with
            # failover disabled, so an unavailable backup surfaces the error
            # instead of recursing forever.
            can_fail_over = (
                bool(self.backup_endpoint)
                and self.backup_endpoint != self.endpoint
            )
            if e.code() != grpc.StatusCode.UNAVAILABLE or not can_fail_over:
                raise

            backup_processor = BatchProcessor(
                self.backup_endpoint,
                backup_endpoint=''
            )
            try:
                return backup_processor.process_batch(
                    data_items,
                    recipient_public_key
                )
            finally:
                backup_processor.close()

    def _process_batch_once(
        self,
        data_items: List[bytes],
        recipient_public_key: str
    ) -> List[str]:
        """Run the full encrypt -> transform pipeline against this node."""
        # Generate sender keys
        sender_keys = self.client.GenerateKeyPair(recrypt_pb2.Empty())

        # Encrypt all items (executor.map keeps input order and re-raises the
        # first worker exception when its results are consumed).
        with ThreadPoolExecutor(max_workers=10) as executor:
            encrypted_items = list(executor.map(
                lambda item: self._encrypt_item(item, sender_keys.pubkey_base64),
                data_items
            ))

        # Generate transform key
        transform_key = self.client.GenerateTransformKey(
            recrypt_pb2.GenerateTransformKeyRequest(
                to_pubkey_base64=recipient_public_key,
                from_privkey_base64=sender_keys.privkey_base64
            )
        )

        # Transform all items
        with ThreadPoolExecutor(max_workers=10) as executor:
            transformed_items = list(executor.map(
                lambda encrypted: self._transform_item(
                    encrypted.cipher_base64,
                    transform_key.payload_base64
                ),
                encrypted_items
            ))

        # Unwrap the reply messages so the result matches the declared List[str].
        return [item.payload_base64 for item in transformed_items]

    def _encrypt_item(self, data: bytes, public_key: str) -> recrypt_pb2.EncryptReply:
        """Encrypt one item; the raw bytes are sent base64-encoded."""
        return self.client.Encrypt(recrypt_pb2.EncryptRequest(
            data=base64.b64encode(data).decode(),
            pubkey_base64=public_key,
            is_data_base64=True
        ))

    def _transform_item(
        self,
        cipher_base64: str,
        transform_key_base64: str
    ) -> recrypt_pb2.EncodedPayload:
        """Transform one ciphertext with the given transform key."""
        return self.client.Transform(recrypt_pb2.TransformRequest(
            cipher_base64=cipher_base64,
            transformkey_base64=transform_key_base64
        ))

Go: Multi-Node Client with Load Balancing

package main

import (
    "context"
    "time"
    "sync"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    pb "generated/recrypt"
)

type MultiNodeClient struct {
    endpoints []string
    clients   []pb.RecryptOperatorClient
    mutex     sync.RWMutex
    current   int
}

func NewMultiNodeClient(endpoints []string) (*MultiNodeClient, error) {
    clients := make([]pb.RecryptOperatorClient, len(endpoints))

    for i, endpoint := range endpoints {
        conn, err := createConnection(endpoint)
        if err != nil {
            return nil, err
        }

        clients[i] = pb.NewRecryptOperatorClient(conn)
    }

    return &MultiNodeClient{
        endpoints: endpoints,
        clients:   clients,
        current:   0,
    }, nil
}

// getNextClient hands out the client at the current round-robin position and
// advances that position, wrapping at the end of the list. The whole step
// runs under the mutex.
func (m *MultiNodeClient) getNextClient() pb.RecryptOperatorClient {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    idx := m.current
    next := m.clients[idx]
    m.current = (idx + 1) % len(m.clients)
    return next
}

// Encrypt base64-encodes data and asks the nodes, in round-robin order, to
// encrypt it under publicKey. It returns the first successful reply. When
// every attempted node fails it returns the last error seen. It stops early
// once ctx is cancelled or expired, because remaining nodes would fail the
// same way.
func (m *MultiNodeClient) Encrypt(
    ctx context.Context,
    data []byte,
    publicKey string,
) (*pb.EncryptReply, error) {
    // The request does not depend on the node, so build (and encode) it once.
    req := &pb.EncryptRequest{
        Data:         base64.StdEncoding.EncodeToString(data),
        PubkeyBase64: publicKey,
        IsDataBase64: true,
    }

    var lastErr error

    // Try each node until success or all fail
    for i := 0; i < len(m.clients); i++ {
        reply, err := m.getNextClient().Encrypt(ctx, req)
        if err == nil {
            return reply, nil
        }
        lastErr = err

        if ctx.Err() != nil {
            break
        }
    }

    return nil, lastErr
}

// Transform asks the nodes, in round-robin order, to re-encrypt ciphertext
// with transformKey. It returns the first successful reply. When every
// attempted node fails it returns the last error seen. It stops early once
// ctx is cancelled or expired, because remaining nodes would fail the same way.
func (m *MultiNodeClient) Transform(
    ctx context.Context,
    ciphertext string,
    transformKey string,
) (*pb.EncodedPayload, error) {
    // The request does not depend on the node, so build it once.
    req := &pb.TransformRequest{
        CipherBase64:       ciphertext,
        TransformkeyBase64: transformKey,
    }

    var lastErr error

    for i := 0; i < len(m.clients); i++ {
        reply, err := m.getNextClient().Transform(ctx, req)
        if err == nil {
            return reply, nil
        }
        lastErr = err

        if ctx.Err() != nil {
            break
        }
    }

    return nil, lastErr
}

// Usage example
//
// main connects to two nodes, generates a key pair on one of them and
// encrypts a short message, all under a shared 10-second deadline.
func main() {
    client, err := NewMultiNodeClient([]string{
        "pre.gateway.tech:443",
        "pre-2.gateway.tech:443",
    })
    if err != nil {
        panic(err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Generate keypair
    keyPair, err := client.getNextClient().GenerateKeyPair(ctx, &pb.Empty{})
    if err != nil {
        panic(err)
    }

    // Use the client for operations
    encrypted, err := client.Encrypt(ctx, []byte("Hello"), keyPair.PubkeyBase64)
    if err != nil {
        panic(err)
    }

    // ... continue with other operations
    // Placeholder use: Go refuses to compile a declared-but-unused local.
    _ = encrypted
}

Rust: Asynchronous Client

use tonic::{transport::Channel, Request};
use recrypt::{
    recrypt_operator_client::RecryptOperatorClient,
    Empty, EncryptRequest, GenerateTransformKeyRequest, TransformRequest,
};

pub struct RecryptClient {
    client: RecryptOperatorClient<Channel>,
}

impl RecryptClient {
    pub async fn new(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let channel = Channel::from_static(endpoint)
            .connect()
            .await?;

        Ok(Self {
            client: RecryptOperatorClient::new(channel),
        })
    }

    pub async fn generate_keypair(&mut self) -> Result<KeyPair, Box<dyn std::error::Error>> {
        let response = self.client
            .generate_key_pair(Request::new(Empty {}))
            .await?;

        Ok(response.into_inner())
    }

    pub async fn encrypt(
        &mut self,
        data: &[u8],
        public_key: &str,
    ) -> Result<String, Box<dyn std::error::Error>> {
        let request = Request::new(EncryptRequest {
            data: base64::encode(data),
            pubkey_base64: public_key.to_string(),
            is_data_base64: true,
        });

        let response = self.client
            .encrypt(request)
            .await?;

        Ok(response.into_inner().cipher_base64)
    }

    pub async fn generate_transform_key(
        &mut self,
        from_private: &str,
        to_public: &str,
    ) -> Result<String, Box<dyn std::error::Error>> {
        let request = Request::new(GenerateTransformKeyRequest {
            to_pubkey_base64: to_public.to_string(),
            from_privkey_base64: from_private.to_string(),
        });

        let response = self.client
            .generate_transform_key(request)
            .await?;

        Ok(response.into_inner().payload_base64)
    }

    pub async fn transform(
        &mut self,
        ciphertext: &str,
        transform_key: &str,
    ) -> Result<String, Box<dyn std::error::Error>> {
        let request = Request::new(TransformRequest {
            cipher_base64: ciphertext.to_string(),
            transformkey_base64: transform_key.to_string(),
        });

        let response = self.client
            .transform(request)
            .await?;

        Ok(response.into_inner().payload_base64)
    }
}

// Usage example: encrypt a message for one key pair, then transform the
// ciphertext so a second key pair can read it.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut recrypt = RecryptClient::new("https://pre.gateway.tech:443").await?;

    // Sender key pair and the ciphertext produced under its public key.
    let sender = recrypt.generate_keypair().await?;
    let ciphertext = recrypt
        .encrypt(b"Hello, World!", &sender.pubkey_base64)
        .await?;

    // Recipient key pair and the sender -> recipient transform key.
    let recipient = recrypt.generate_keypair().await?;
    let rekey = recrypt
        .generate_transform_key(&sender.privkey_base64, &recipient.pubkey_base64)
        .await?;

    // Re-encrypt the ciphertext for the recipient.
    let transformed = recrypt.transform(&ciphertext, &rekey).await?;

    println!("Transformed ciphertext: {}", transformed);
    Ok(())
}

TypeScript: Streaming File Processing

import { createReadStream, createWriteStream } from "fs";
import type { WriteStream } from "fs";
import { once } from "events";
import { chunk } from "lodash";
import { credentials } from "@grpc/grpc-js";
import { RecryptOperatorClient } from "./generated/recrypt_grpc_pb";
import { promisify } from "util";
import { finished, pipeline } from "stream";

/**
 * Encrypts a large file piece by piece: input is buffered to roughly
 * `chunkSize` bytes, each buffer is encrypted with one RPC, and every
 * ciphertext is written to the output file on its own line.
 *
 * NOTE(review): the request is passed as a plain object literal while the
 * response is read through a generated getter; confirm this matches how the
 * client in "./generated/recrypt_grpc_pb" was generated.
 */
class StreamProcessor {
  private client: RecryptOperatorClient;
  private chunkSize: number = 1024 * 1024; // 1MB chunks

  constructor(endpoint: string) {
    this.client = new RecryptOperatorClient(endpoint, credentials.createSsl());
  }

  /** Releases the underlying gRPC channel. */
  close(): void {
    this.client.close();
  }

  /**
   * Reads `inputPath`, encrypts it in buffered pieces under `publicKey`, and
   * writes one base64 ciphertext per line to `outputPath`.
   *
   * Resolves only after the output stream has been ended and flushed. On any
   * failure both streams are destroyed and the error is re-thrown.
   */
  async processLargeFile(
    inputPath: string,
    outputPath: string,
    publicKey: string
  ): Promise<void> {
    const readStream = createReadStream(inputPath);
    const writeStream = createWriteStream(outputPath);

    try {
      let pending: Buffer[] = [];
      let pendingBytes = 0;

      // Named `piece` so it does not shadow the lodash `chunk` import.
      for await (const piece of readStream) {
        const data = piece as Buffer;
        pending.push(data);
        pendingBytes += data.length;

        if (pendingBytes >= this.chunkSize) {
          await this.processBuffer(
            Buffer.concat(pending),
            writeStream,
            publicKey
          );
          pending = [];
          pendingBytes = 0;
        }
      }

      // Process remaining data
      if (pending.length > 0) {
        await this.processBuffer(Buffer.concat(pending), writeStream, publicKey);
      }

      // Subscribe before ending so completion cannot be missed, then end the
      // stream and wait until everything has been flushed.
      const flushed = promisify(finished)(writeStream);
      writeStream.end();
      await flushed;
    } catch (error) {
      readStream.destroy();
      writeStream.destroy();
      throw error;
    }
  }

  /**
   * Encrypts one buffer and appends its ciphertext (plus a newline) to
   * `writeStream`, waiting for "drain" when the stream signals backpressure.
   */
  private async processBuffer(
    buffer: Buffer,
    writeStream: WriteStream,
    publicKey: string
  ): Promise<void> {
    try {
      const encrypted = await new Promise<{ getCipherBase64(): string }>(
        (resolve, reject) => {
          this.client.encrypt(
            {
              data: buffer.toString("base64"),
              pubkey_base64: publicKey,
              is_data_base64: true,
            },
            (
              error: Error | null,
              response?: { getCipherBase64(): string }
            ) => {
              if (error) reject(error);
              else if (response === undefined)
                reject(new Error("Encrypt completed without a response"));
              else resolve(response);
            }
          );
        }
      );

      if (!writeStream.write(encrypted.getCipherBase64() + "\n")) {
        await once(writeStream, "drain");
      }
    } catch (error) {
      console.error("Error processing chunk:", error);
      throw error;
    }
  }
}

Python: Async Client with Rate Limiting

import asyncio
import grpc.aio
from async_timeout import timeout
from dataclasses import dataclass
from typing import Optional
import time
from generated import recrypt_pb2, recrypt_pb2_grpc

@dataclass
class RateLimitConfig:
    """Settings for AsyncRateLimitedClient."""
    # Sustained refill rate of the token bucket.
    requests_per_second: int = 100
    # Bucket capacity; also the number of tokens available at start-up.
    burst_size: int = 200
    # Per-call timeout applied around each RPC.
    timeout_seconds: int = 10

class AsyncRateLimitedClient:
    """Async RecryptOperator client throttled by a token bucket.

    Use it as an async context manager: the channel is opened in
    ``__aenter__`` and closed in ``__aexit__``.
    """

    def __init__(
        self,
        endpoint: str = 'pre.gateway.tech:443',
        config: Optional[RateLimitConfig] = None
    ):
        self.endpoint = endpoint
        self.config = config or RateLimitConfig()
        self.channel = None
        self.client = None
        # Token-bucket state: start with a full bucket.
        self._tokens = self.config.burst_size
        self._last_update = time.monotonic()
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        self.channel = grpc.aio.secure_channel(
            self.endpoint,
            grpc.ssl_channel_credentials()
        )
        self.client = recrypt_pb2_grpc.RecryptOperatorStub(self.channel)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if self.channel:
            await self.channel.close()

    async def _get_token(self):
        """Take one token, sleeping until one is available if necessary."""
        async with self._lock:
            now = time.monotonic()
            time_passed = now - self._last_update
            self._tokens = min(
                self.config.burst_size,
                self._tokens + time_passed * self.config.requests_per_second
            )
            self._last_update = now

            if self._tokens < 1:
                wait_time = (1 - self._tokens) / self.config.requests_per_second
                await asyncio.sleep(wait_time)
                # The sleep accrued exactly the token being consumed. Move the
                # refill clock past the sleep so that time is not credited a
                # second time on the next call.
                self._tokens = 0
                self._last_update = time.monotonic()
            else:
                self._tokens -= 1

    async def encrypt(self, data: bytes, public_key: str) -> str:
        """Encrypt `data` under `public_key`; returns the base64 ciphertext."""
        import base64  # local import: used only for request encoding

        await self._get_token()

        async with timeout(self.config.timeout_seconds):
            response = await self.client.Encrypt(
                recrypt_pb2.EncryptRequest(
                    data=base64.b64encode(data).decode(),
                    pubkey_base64=public_key,
                    is_data_base64=True
                )
            )
            return response.cipher_base64

    async def transform(self, ciphertext: str, transform_key: str) -> str:
        """Transform `ciphertext` with `transform_key`; returns the base64 payload."""
        await self._get_token()

        async with timeout(self.config.timeout_seconds):
            response = await self.client.Transform(
                recrypt_pb2.TransformRequest(
                    cipher_base64=ciphertext,
                    transformkey_base64=transform_key
                )
            )
            return response.payload_base64

# Usage example
async def main():
    """Encrypt a few sample items and transform them for a second key pair."""
    # Sample payloads to process; replace with real data.
    data_items = [b"first item", b"second item", b"third item"]

    async with AsyncRateLimitedClient() as client:
        # Generate keys
        sender_keys = await client.client.GenerateKeyPair(recrypt_pb2.Empty())
        recipient_keys = await client.client.GenerateKeyPair(recrypt_pb2.Empty())

        # Process multiple items concurrently with rate limiting
        encrypted_items = await asyncio.gather(*(
            client.encrypt(data, sender_keys.pubkey_base64)
            for data in data_items
        ))

        # Generate transform key
        transform_key = await client.client.GenerateTransformKey(
            recrypt_pb2.GenerateTransformKeyRequest(
                to_pubkey_base64=recipient_keys.pubkey_base64,
                from_privkey_base64=sender_keys.privkey_base64
            )
        )

        # Transform all items
        transformed_items = await asyncio.gather(*(
            client.transform(encrypted, transform_key.payload_base64)
            for encrypted in encrypted_items
        ))

Key Rotation

Key rotation is a critical security practice that involves periodically changing encryption keys while maintaining access to previously encrypted data. These examples demonstrate different key rotation strategies using Gateway’s PRE system.

TypeScript: Basic Key Rotation

import { RecryptOperatorClient } from "./generated/recrypt_grpc_pb";
import { credentials } from "@grpc/grpc-js";

/**
 * Rotates a user's key pair: generates a new pair, derives a transform key
 * from the old private key to the new public key, and re-encrypts existing
 * ciphertexts so they can be read with the new key.
 *
 * Responses are read through the generated getters (`getPubkeyBase64()`
 * etc.), matching the other TypeScript examples that use this client.
 *
 * NOTE(review): requests are passed as plain object literals; confirm this
 * matches how the client in "./generated/recrypt_grpc_pb" was generated.
 */
class KeyRotationManager {
  private client: RecryptOperatorClient;

  constructor(endpoint: string = "pre.gateway.tech:443") {
    this.client = new RecryptOperatorClient(endpoint, credentials.createSsl());
  }

  /** Adapts one callback-style unary RPC to a promise. */
  private unary<TResponse>(
    invoke: (
      callback: (error: Error | null, response?: TResponse) => void
    ) => void
  ): Promise<TResponse> {
    return new Promise<TResponse>((resolve, reject) => {
      invoke((error, response) => {
        if (error) reject(error);
        else if (response === undefined)
          reject(new Error("RPC completed without a response"));
        else resolve(response);
      });
    });
  }

  /**
   * @param oldPrivateKey - Base64 private key the data is currently readable with.
   * @param encryptedData - Base64 ciphertexts to re-encrypt for the new key.
   * @param metadata - Accepted for callers' bookkeeping; not used by this method.
   * @returns The new key pair, the transformed ciphertexts (same order as
   *   `encryptedData`), and the transform key that was applied.
   */
  async rotateUserKeys(
    oldPrivateKey: string,
    encryptedData: string[],
    metadata?: { userId: string; timestamp: number }
  ): Promise<{
    newKeys: { public: string; private: string };
    transformedData: string[];
    transformKey: string;
  }> {
    // Generate new keypair
    const newKeys = await this.unary<{
      getPubkeyBase64(): string;
      getPrivkeyBase64(): string;
    }>((done) => this.client.generateKeyPair({}, done));

    // Generate transform key from old to new
    const transformKey = await this.unary<{ getPayloadBase64(): string }>(
      (done) =>
        this.client.generateTransformKey(
          {
            from_privkey_base64: oldPrivateKey,
            to_pubkey_base64: newKeys.getPubkeyBase64(),
          },
          done
        )
    );
    const transformKeyBase64 = transformKey.getPayloadBase64();

    // Transform all encrypted data (requests run concurrently)
    const transformedData = await Promise.all(
      encryptedData.map((data) =>
        this.unary<{ getPayloadBase64(): string }>((done) =>
          this.client.transform(
            {
              cipher_base64: data,
              transformkey_base64: transformKeyBase64,
            },
            done
          )
        ).then((payload) => payload.getPayloadBase64())
      )
    );

    return {
      newKeys: {
        public: newKeys.getPubkeyBase64(),
        private: newKeys.getPrivkeyBase64(),
      },
      transformedData,
      transformKey: transformKeyBase64,
    };
  }
}

Go: Scheduled Key Rotation

package main

import (
    "context"
    "time"
    "sync"

    "google.golang.org/grpc"
    pb "generated/recrypt"
)

type KeyRotationScheduler struct {
    client     pb.RecryptOperatorClient
    keyCache   *sync.Map
    interval   time.Duration
    batchSize  int
    rotationCh chan struct{}
    doneCh     chan struct{}
}

type RotationMetadata struct {
    VersionID    string
    CreatedAt    time.Time
    ExpiresAt    time.Time
    PublicKey    string
    PrivateKey   string
    IsActive     bool
}

func NewKeyRotationScheduler(
    endpoint string,
    interval time.Duration,
    batchSize int,
) (*KeyRotationScheduler, error) {
    conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
    if err != nil {
        return nil, err
    }

    return &KeyRotationScheduler{
        client:     pb.NewRecryptOperatorClient(conn),
        keyCache:   &sync.Map{},
        interval:   interval,
        batchSize:  batchSize,
        rotationCh: make(chan struct{}),
        doneCh:     make(chan struct{}),
    }, nil
}

// Start runs the rotation loop until ctx is cancelled, then closes doneCh.
// A rotation is performed on every ticker tick and whenever a value arrives
// on rotationCh (a manual trigger).
//
// The tick is handled directly rather than forwarded to rotationCh: that
// channel is unbuffered and this loop is its only receiver, so sending to it
// from here would block forever.
func (k *KeyRotationScheduler) Start(ctx context.Context) {
    ticker := time.NewTicker(k.interval)
    defer ticker.Stop()
    defer close(k.doneCh)

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Scheduled rotation.
        case <-k.rotationCh:
            // Manually requested rotation.
        }

        if err := k.performRotation(ctx); err != nil {
            // Handle error: keep the scheduler alive and retry on the next
            // trigger.
            continue
        }
    }
}

// performRotation generates a new key pair, registers it as the active key
// version and retires the previously active one. When a previous key exists,
// a transform key from old to new is generated first and batch re-encryption
// is started in the background.
//
// Nothing is written to the key cache until every RPC has succeeded, so a
// failed rotation leaves the previous key active and can simply be retried.
func (k *KeyRotationScheduler) performRotation(ctx context.Context) error {
    // Generate new keypair
    newKeys, err := k.client.GenerateKeyPair(ctx, &pb.Empty{})
    if err != nil {
        return err
    }

    now := time.Now()
    versionID := now.Format("20060102_150405")
    metadata := &RotationMetadata{
        VersionID:  versionID,
        CreatedAt:  now,
        ExpiresAt:  now.Add(k.interval * 2),
        PublicKey:  newKeys.PubkeyBase64,
        PrivateKey: newKeys.PrivkeyBase64,
        IsActive:   true,
    }

    // Get old active key. Retired versions are stored with IsActive == false,
    // so at most one entry matches.
    var oldKey *RotationMetadata
    k.keyCache.Range(func(_, value interface{}) bool {
        if md, ok := value.(*RotationMetadata); ok && md.IsActive {
            oldKey = md
            return false
        }
        return true
    })

    var transformKeyBase64 string
    if oldKey != nil {
        // Generate transform key
        transformKey, err := k.client.GenerateTransformKey(ctx, &pb.GenerateTransformKeyRequest{
            FromPrivkeyBase64: oldKey.PrivateKey,
            ToPubkeyBase64:    newKeys.PubkeyBase64,
        })
        if err != nil {
            return err
        }
        transformKeyBase64 = transformKey.PayloadBase64

        // Retire the old version by storing a modified copy, so readers that
        // still hold the old pointer never see it change underneath them.
        retired := *oldKey
        retired.IsActive = false
        k.keyCache.Store(oldKey.VersionID, &retired)
    }

    // Store new key version (after retiring the old one, so the new entry
    // wins if both versions share an ID).
    k.keyCache.Store(versionID, metadata)

    if oldKey != nil {
        // Start rotating data in batches
        go k.rotateBatches(ctx, oldKey, metadata, transformKeyBase64)
    }

    return nil
}

// rotateBatches is the hook for re-encrypting stored data from oldKey to
// newKey using transformKey. It is a placeholder: the body below contains no
// statements, so calling it currently does nothing.
func (k *KeyRotationScheduler) rotateBatches(
    ctx context.Context,
    oldKey *RotationMetadata,
    newKey *RotationMetadata,
    transformKey string,
) {
    // Implement batch processing logic here
    // This could involve fetching data from storage and
    // transforming it in batches
}

// Example usage: run a daily key-rotation scheduler until the context ends.
func main() {
    const (
        rotationInterval = 24 * time.Hour // Rotate keys daily
        rotationBatch    = 1000           // Process in batches of 1000
    )

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    scheduler, err := NewKeyRotationScheduler(
        "pre.gateway.tech:443",
        rotationInterval,
        rotationBatch,
    )
    if err != nil {
        panic(err)
    }

    // Run the rotation loop in the background.
    go scheduler.Start(ctx)

    // Wait for shutdown signal, then for the scheduler to finish.
    <-ctx.Done()
    <-scheduler.doneCh
}