Proxy Re-Encryption (PRE)
Examples
Proxy Re-Encryption (PRE)
Examples
Connection Examples
TypeScript/JavaScript
import { credentials } from "@grpc/grpc-js";
import { RecryptOperatorClient } from "./generated/recrypt_grpc_pb";
// Production
const prodClient = new RecryptOperatorClient(
"pre.gateway.tech:443",
credentials.createSsl()
);
// Testnet
const testnetClient = new RecryptOperatorClient(
"pre-testnet.gateway.tech:443",
credentials.createSsl()
);
// With timeout and metadata
const client = new RecryptOperatorClient(
"pre.gateway.tech:443",
credentials.createSsl(),
{
"grpc.keepalive_timeout_ms": 10000,
"grpc.keepalive_permit_without_calls": 1,
}
);
Python
import grpc
from generated import recrypt_pb2_grpc
# Production
channel = grpc.secure_channel('pre.gateway.tech:443', grpc.ssl_channel_credentials())
prod_client = recrypt_pb2_grpc.RecryptOperatorStub(channel)
# Testnet
testnet_channel = grpc.secure_channel('pre-testnet.gateway.tech:443', grpc.ssl_channel_credentials())
testnet_client = recrypt_pb2_grpc.RecryptOperatorStub(testnet_channel)
# With retry policy
from grpc.experimental import retry
retry_policy = {
'initial_backoff_ms': 100,
'max_backoff_ms': 3000,
'backoff_multiplier': 1.3,
'retryable_status_codes': [grpc.StatusCode.UNAVAILABLE]
}
channel = grpc.secure_channel(
'pre.gateway.tech:443',
grpc.ssl_channel_credentials(),
options=[('grpc.enable_retries', 1)],
compression=grpc.Compression.Gzip
)
Go
package main
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
pb "generated/recrypt"
)
func newClient() (*grpc.ClientConn, pb.RecryptOperatorClient, error) {
// Create TLS credentials
creds := credentials.NewClientTLSFromCert(nil, "")
// Connect with options
conn, err := grpc.Dial(
"pre.gateway.tech:443",
grpc.WithTransportCredentials(creds),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024)),
grpc.WithDefaultServiceConfig(`{
"loadBalancingPolicy": "round_robin",
"methodConfig": [{
"name": [{"service": "recrypt.RecryptOperator"}],
"retryPolicy": {
"maxAttempts": 4,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2.0,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}]
}`),
)
if err != nil {
return nil, nil, err
}
client := pb.NewRecryptOperatorClient(conn)
return conn, client, nil
}
Complete Workflows
TypeScript: File Encryption and Sharing
import { RecryptOperatorClient } from "./generated/recrypt_grpc_pb";
import { credentials } from "@grpc/grpc-js";
import { promises as fs } from "fs";
async function encryptAndShareFile(
filePath: string,
recipientPublicKey: string
): Promise<void> {
const client = new RecryptOperatorClient(
"pre.gateway.tech:443",
credentials.createSsl()
);
try {
// Generate sender's keypair
const senderKeys = await new Promise((resolve, reject) => {
client.generateKeyPair({}, (error, response) => {
if (error) reject(error);
else resolve(response);
});
});
// Read and encrypt file
const fileData = await fs.readFile(filePath);
const fileBase64 = fileData.toString("base64");
const encrypted = await new Promise((resolve, reject) => {
client.encrypt(
{
data: fileBase64,
pubkey_base64: senderKeys.getPubkeyBase64(),
is_data_base64: true,
},
(error, response) => {
if (error) reject(error);
else resolve(response);
}
);
});
// Generate transform key
const transformKey = await new Promise((resolve, reject) => {
client.generateTransformKey(
{
to_pubkey_base64: recipientPublicKey,
from_privkey_base64: senderKeys.getPrivkeyBase64(),
},
(error, response) => {
if (error) reject(error);
else resolve(response);
}
);
});
// Transform ciphertext
const transformed = await new Promise((resolve, reject) => {
client.transform(
{
cipher_base64: encrypted.getCipherBase64(),
transformkey_base64: transformKey.getPayloadBase64(),
},
(error, response) => {
if (error) reject(error);
else resolve(response);
}
);
});
// Save transformed ciphertext
await fs.writeFile(`${filePath}.encrypted`, transformed.getPayloadBase64());
console.log("File encrypted and ready for recipient");
} catch (error) {
console.error("Error during encryption process:", error);
throw error;
}
}
Python: Batch Processing with Error Handling
import grpc
import base64
from typing import List
from concurrent.futures import ThreadPoolExecutor
from generated import recrypt_pb2, recrypt_pb2_grpc
class BatchProcessor:
def __init__(self, endpoint: str = 'pre.gateway.tech:443'):
self.channel = grpc.secure_channel(
endpoint,
grpc.ssl_channel_credentials()
)
self.client = recrypt_pb2_grpc.RecryptOperatorStub(self.channel)
def process_batch(
self,
data_items: List[bytes],
recipient_public_key: str
) -> List[str]:
try:
# Generate sender keys
sender_keys = self.client.GenerateKeyPair(recrypt_pb2.Empty())
# Encrypt all items
encrypted_items = []
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for data in data_items:
future = executor.submit(
self._encrypt_item,
data,
sender_keys.pubkey_base64
)
futures.append(future)
encrypted_items = [f.result() for f in futures]
# Generate transform key
transform_key = self.client.GenerateTransformKey(
recrypt_pb2.GenerateTransformKeyRequest(
to_pubkey_base64=recipient_public_key,
from_privkey_base64=sender_keys.privkey_base64
)
)
# Transform all items
transformed_items = []
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for encrypted in encrypted_items:
future = executor.submit(
self._transform_item,
encrypted.cipher_base64,
transform_key.payload_base64
)
futures.append(future)
transformed_items = [f.result() for f in futures]
return transformed_items
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
# Try backup node
backup_processor = BatchProcessor('pre-2.gateway.tech:443')
return backup_processor.process_batch(
data_items,
recipient_public_key
)
raise e
def _encrypt_item(self, data: bytes, public_key: str) -> recrypt_pb2.EncryptReply:
return self.client.Encrypt(recrypt_pb2.EncryptRequest(
data=base64.b64encode(data).decode(),
pubkey_base64=public_key,
is_data_base64=True
))
def _transform_item(
self,
cipher_base64: str,
transform_key_base64: str
) -> recrypt_pb2.EncodedPayload:
return self.client.Transform(recrypt_pb2.TransformRequest(
cipher_base64=cipher_base64,
transformkey_base64=transform_key_base64
))
Go: Multi-Node Client with Load Balancing
package main
import (
"context"
"time"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
pb "generated/recrypt"
)
type MultiNodeClient struct {
endpoints []string
clients []pb.RecryptOperatorClient
mutex sync.RWMutex
current int
}
func NewMultiNodeClient(endpoints []string) (*MultiNodeClient, error) {
clients := make([]pb.RecryptOperatorClient, len(endpoints))
for i, endpoint := range endpoints {
conn, err := createConnection(endpoint)
if err != nil {
return nil, err
}
clients[i] = pb.NewRecryptOperatorClient(conn)
}
return &MultiNodeClient{
endpoints: endpoints,
clients: clients,
current: 0,
}, nil
}
func (m *MultiNodeClient) getNextClient() pb.RecryptOperatorClient {
m.mutex.Lock()
defer m.mutex.Unlock()
client := m.clients[m.current]
m.current = (m.current + 1) % len(m.clients)
return client
}
func (m *MultiNodeClient) Encrypt(
ctx context.Context,
data []byte,
publicKey string,
) (*pb.EncryptReply, error) {
var lastErr error
// Try each node until success or all fail
for i := 0; i < len(m.clients); i++ {
client := m.getNextClient()
req := &pb.EncryptRequest{
Data: base64.StdEncoding.EncodeToString(data),
PubkeyBase64: publicKey,
IsDataBase64: true,
}
if reply, err := client.Encrypt(ctx, req); err == nil {
return reply, nil
} else {
lastErr = err
}
}
return nil, lastErr
}
func (m *MultiNodeClient) Transform(
ctx context.Context,
ciphertext string,
transformKey string,
) (*pb.EncodedPayload, error) {
var lastErr error
for i := 0; i < len(m.clients); i++ {
client := m.getNextClient()
req := &pb.TransformRequest{
CipherBase64: ciphertext,
TransformkeyBase64: transformKey,
}
if reply, err := client.Transform(ctx, req); err == nil {
return reply, nil
} else {
lastErr = err
}
}
return nil, lastErr
}
// Usage example
func main() {
client, err := NewMultiNodeClient([]string{
"pre.gateway.tech:443",
"pre-2.gateway.tech:443",
})
if err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Generate keypair
keyPair, err := client.getNextClient().GenerateKeyPair(ctx, &pb.Empty{})
if err != nil {
panic(err)
}
// Use the client for operations
encrypted, err := client.Encrypt(ctx, []byte("Hello"), keyPair.PubkeyBase64)
if err != nil {
panic(err)
}
// ... continue with other operations
}
Rust: Asynchronous Client
use tonic::{transport::Channel, Request};
use recrypt::{
recrypt_operator_client::RecryptOperatorClient,
Empty, EncryptRequest, GenerateTransformKeyRequest, TransformRequest,
};
pub struct RecryptClient {
client: RecryptOperatorClient<Channel>,
}
impl RecryptClient {
pub async fn new(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
let channel = Channel::from_static(endpoint)
.connect()
.await?;
Ok(Self {
client: RecryptOperatorClient::new(channel),
})
}
pub async fn generate_keypair(&mut self) -> Result<KeyPair, Box<dyn std::error::Error>> {
let response = self.client
.generate_key_pair(Request::new(Empty {}))
.await?;
Ok(response.into_inner())
}
pub async fn encrypt(
&mut self,
data: &[u8],
public_key: &str,
) -> Result<String, Box<dyn std::error::Error>> {
let request = Request::new(EncryptRequest {
data: base64::encode(data),
pubkey_base64: public_key.to_string(),
is_data_base64: true,
});
let response = self.client
.encrypt(request)
.await?;
Ok(response.into_inner().cipher_base64)
}
pub async fn generate_transform_key(
&mut self,
from_private: &str,
to_public: &str,
) -> Result<String, Box<dyn std::error::Error>> {
let request = Request::new(GenerateTransformKeyRequest {
to_pubkey_base64: to_public.to_string(),
from_privkey_base64: from_private.to_string(),
});
let response = self.client
.generate_transform_key(request)
.await?;
Ok(response.into_inner().payload_base64)
}
pub async fn transform(
&mut self,
ciphertext: &str,
transform_key: &str,
) -> Result<String, Box<dyn std::error::Error>> {
let request = Request::new(TransformRequest {
cipher_base64: ciphertext.to_string(),
transformkey_base64: transform_key.to_string(),
});
let response = self.client
.transform(request)
.await?;
Ok(response.into_inner().payload_base64)
}
}
// Usage example
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = RecryptClient::new("https://pre.gateway.tech:443").await?;
// Generate keys
let keys = client.generate_keypair().await?;
// Encrypt data
let encrypted = client.encrypt(
b"Hello, World!",
&keys.pubkey_base64
).await?;
// Generate recipient keys
let recipient_keys = client.generate_keypair().await?;
// Generate transform key
let transform_key = client.generate_transform_key(
&keys.privkey_base64,
&recipient_keys.pubkey_base64
).await?;
// Transform ciphertext
let transformed = client.transform(
&encrypted,
&transform_key
).await?;
println!("Transformed ciphertext: {}", transformed);
Ok(())
}
TypeScript: Streaming File Processing
import { createReadStream, createWriteStream } from "fs";
import { chunk } from "lodash";
import { RecryptOperatorClient } from "./generated/recrypt_grpc_pb";
import { promisify } from "util";
import { pipeline } from "stream";
class StreamProcessor {
private client: RecryptOperatorClient;
private chunkSize: number = 1024 * 1024; // 1MB chunks
constructor(endpoint: string) {
this.client = new RecryptOperatorClient(endpoint, credentials.createSsl());
}
async processLargeFile(
inputPath: string,
outputPath: string,
publicKey: string
): Promise<void> {
const readStream = createReadStream(inputPath);
const writeStream = createWriteStream(outputPath);
let buffer: Buffer[] = [];
let size = 0;
for await (const chunk of readStream) {
buffer.push(chunk);
size += chunk.length;
if (size >= this.chunkSize) {
await this.processBuffer(Buffer.concat(buffer), writeStream, publicKey);
buffer = [];
size = 0;
}
}
// Process remaining data
if (buffer.length > 0) {
await this.processBuffer(Buffer.concat(buffer), writeStream, publicKey);
}
await promisify(pipeline)(writeStream);
}
private async processBuffer(
buffer: Buffer,
writeStream: WriteStream,
publicKey: string
): Promise<void> {
try {
const encrypted = await new Promise((resolve, reject) => {
this.client.encrypt(
{
data: buffer.toString("base64"),
pubkey_base64: publicKey,
is_data_base64: true,
},
(error, response) => {
if (error) reject(error);
else resolve(response);
}
);
});
writeStream.write(encrypted.getCipherBase64() + "\n");
} catch (error) {
console.error("Error processing chunk:", error);
throw error;
}
}
}
Python: Async Client with Rate Limiting
import asyncio
import grpc.aio
from async_timeout import timeout
from dataclasses import dataclass
from typing import Optional
import time
from generated import recrypt_pb2, recrypt_pb2_grpc
@dataclass
class RateLimitConfig:
requests_per_second: int = 100
burst_size: int = 200
timeout_seconds: int = 10
class AsyncRateLimitedClient:
def __init__(
self,
endpoint: str = 'pre.gateway.tech:443',
config: Optional[RateLimitConfig] = None
):
self.endpoint = endpoint
self.config = config or RateLimitConfig()
self.channel = None
self.client = None
self._tokens = self.config.burst_size
self._last_update = time.monotonic()
self._lock = asyncio.Lock()
async def __aenter__(self):
self.channel = grpc.aio.secure_channel(
self.endpoint,
grpc.ssl_channel_credentials()
)
self.client = recrypt_pb2_grpc.RecryptOperatorStub(self.channel)
return self
async def __aexit__(self, exc_type, exc, tb):
if self.channel:
await self.channel.close()
async def _get_token(self):
async with self._lock:
now = time.monotonic()
time_passed = now - self._last_update
self._tokens = min(
self.config.burst_size,
self._tokens + time_passed * self.config.requests_per_second
)
if self._tokens < 1:
wait_time = (1 - self._tokens) / self.config.requests_per_second
await asyncio.sleep(wait_time)
self._tokens = 0
else:
self._tokens -= 1
self._last_update = now
async def encrypt(self, data: bytes, public_key: str) -> str:
await self._get_token()
async with timeout(self.config.timeout_seconds):
response = await self.client.Encrypt(
recrypt_pb2.EncryptRequest(
data=base64.b64encode(data).decode(),
pubkey_base64=public_key,
is_data_base64=True
)
)
return response.cipher_base64
async def transform(self, ciphertext: str, transform_key: str) -> str:
await self._get_token()
async with timeout(self.config.timeout_seconds):
response = await self.client.Transform(
recrypt_pb2.TransformRequest(
cipher_base64=ciphertext,
transformkey_base64=transform_key
)
)
return response.payload_base64
# Usage example
async def main():
async with AsyncRateLimitedClient() as client:
# Generate keys
sender_keys = await client.client.GenerateKeyPair(recrypt_pb2.Empty())
recipient_keys = await client.client.GenerateKeyPair(recrypt_pb2.Empty())
# Process multiple items concurrently with rate limiting
tasks = []
for data in data_items:
task = asyncio.create_task(client.encrypt(
data,
sender_keys.pubkey_base64
))
tasks.append(task)
encrypted_items = await asyncio.gather(*tasks)
# Generate transform key
transform_key = await client.client.GenerateTransformKey(
recrypt_pb2.GenerateTransformKeyRequest(
to_pubkey_base64=recipient_keys.pubkey_base64,
from_privkey_base64=sender_keys.privkey_base64
)
)
# Transform all items
tasks = []
for encrypted in encrypted_items:
task = asyncio.create_task(client.transform(
encrypted,
transform_key.payload_base64
))
tasks.append(task)
transformed_items = await asyncio.gather(*tasks)
Key rotation
Key rotation is a critical security practice that involves periodically changing encryption keys while maintaining access to previously encrypted data. These examples demonstrate different key rotation strategies using Gateway’s PRE system.
TypeScript: Basic Key Rotation
import { RecryptOperatorClient } from "./generated/recrypt_grpc_pb";
import { credentials } from "@grpc/grpc-js";
class KeyRotationManager {
private client: RecryptOperatorClient;
constructor(endpoint: string = "pre.gateway.tech:443") {
this.client = new RecryptOperatorClient(endpoint, credentials.createSsl());
}
async rotateUserKeys(
oldPrivateKey: string,
encryptedData: string[],
metadata?: { userId: string; timestamp: number }
): Promise<{
newKeys: { public: string; private: string };
transformedData: string[];
transformKey: string;
}> {
// Generate new keypair
const newKeys = await new Promise((resolve, reject) => {
this.client.generateKeyPair({}, (error, response) => {
if (error) reject(error);
else resolve(response);
});
});
// Generate transform key from old to new
const transformKey = await new Promise((resolve, reject) => {
this.client.generateTransformKey(
{
from_privkey_base64: oldPrivateKey,
to_pubkey_base64: newKeys.pubkey_base64,
},
(error, response) => {
if (error) reject(error);
else resolve(response);
}
);
});
// Transform all encrypted data
const transformedData = await Promise.all(
encryptedData.map(async (data) => {
return new Promise((resolve, reject) => {
this.client.transform(
{
cipher_base64: data,
transformkey_base64: transformKey.payload_base64,
},
(error, response) => {
if (error) reject(error);
else resolve(response.payload_base64);
}
);
});
})
);
return {
newKeys: {
public: newKeys.pubkey_base64,
private: newKeys.privkey_base64,
},
transformedData,
transformKey: transformKey.payload_base64,
};
}
}
Go: Scheduled Key Rotation
package main
import (
"context"
"time"
"sync"
"google.golang.org/grpc"
pb "generated/recrypt"
)
type KeyRotationScheduler struct {
client pb.RecryptOperatorClient
keyCache *sync.Map
interval time.Duration
batchSize int
rotationCh chan struct{}
doneCh chan struct{}
}
type RotationMetadata struct {
VersionID string
CreatedAt time.Time
ExpiresAt time.Time
PublicKey string
PrivateKey string
IsActive bool
}
func NewKeyRotationScheduler(
endpoint string,
interval time.Duration,
batchSize int,
) (*KeyRotationScheduler, error) {
conn, err := grpc.Dial(endpoint, grpc.WithInsecure())
if err != nil {
return nil, err
}
return &KeyRotationScheduler{
client: pb.NewRecryptOperatorClient(conn),
keyCache: &sync.Map{},
interval: interval,
batchSize: batchSize,
rotationCh: make(chan struct{}),
doneCh: make(chan struct{}),
}, nil
}
func (k *KeyRotationScheduler) Start(ctx context.Context) {
ticker := time.NewTicker(k.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
close(k.doneCh)
return
case <-ticker.C:
k.rotationCh <- struct{}{}
case <-k.rotationCh:
if err := k.performRotation(ctx); err != nil {
// Handle error
continue
}
}
}
}
func (k *KeyRotationScheduler) performRotation(ctx context.Context) error {
// Generate new keypair
newKeys, err := k.client.GenerateKeyPair(ctx, &pb.Empty{})
if err != nil {
return err
}
versionID := time.Now().Format("20060102_150405")
metadata := &RotationMetadata{
VersionID: versionID,
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(k.interval * 2),
PublicKey: newKeys.PubkeyBase64,
PrivateKey: newKeys.PrivkeyBase64,
IsActive: true,
}
// Store new key version
k.keyCache.Store(versionID, metadata)
// Get old active key
var oldKey *RotationMetadata
k.keyCache.Range(func(key, value interface{}) bool {
if md, ok := value.(*RotationMetadata); ok {
if md.IsActive && md.VersionID != versionID {
oldKey = md
return false
}
}
return true
})
if oldKey != nil {
// Generate transform key
transformKey, err := k.client.GenerateTransformKey(ctx, &pb.GenerateTransformKeyRequest{
FromPrivkeyBase64: oldKey.PrivateKey,
ToPubkeyBase64: newKeys.PubkeyBase64,
})
if err != nil {
return err
}
// Start rotating data in batches
go k.rotateBatches(ctx, oldKey, metadata, transformKey.PayloadBase64)
}
return nil
}
func (k *KeyRotationScheduler) rotateBatches(
ctx context.Context,
oldKey *RotationMetadata,
newKey *RotationMetadata,
transformKey string,
) {
// Implement batch processing logic here
// This could involve fetching data from storage and
// transforming it in batches
}
// Example usage
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
scheduler, err := NewKeyRotationScheduler(
"pre.gateway.tech:443",
24*time.Hour, // Rotate keys daily
1000, // Process in batches of 1000
)
if err != nil {
panic(err)
}
go scheduler.Start(ctx)
// Wait for shutdown signal
<-ctx.Done()
<-scheduler.doneCh
}
On this page
- Connection Examples
- TypeScript/JavaScript
- Python
- Go
- Complete Workflows
- TypeScript: File Encryption and Sharing
- Python: Batch Processing with Error Handling
- Go: Multi-Node Client with Load Balancing
- Rust: Asynchronous Client
- TypeScript: Streaming File Processing
- Python: Async Client with Rate Limiting
- Key rotation
- TypeScript: Basic Key Rotation
- Go: Scheduled Key Rotation