🪻 distributed transcription service thistle.dunkirk.sh

feat: add syncing with existing customers

dunkirk.sh 692627f4 d39cfeca

verified
Changed files
+315 -115
src
+62 -5
src/components/admin-users.ts
···
@state() error: string | null = null;
@state() currentUserEmail: string | null = null;
@state() revokingSubscriptions = new Set<number>();
+
@state() syncingSubscriptions = new Set<number>();
static override styles = css`
:host {
···
cursor: not-allowed;
}
+
.sync-btn {
+
background: transparent;
+
border: 2px solid var(--primary);
+
color: var(--primary);
+
padding: 0.5rem 1rem;
+
border-radius: 4px;
+
cursor: pointer;
+
font-size: 0.875rem;
+
font-weight: 600;
+
transition: all 0.2s;
+
}
+
+
.sync-btn:hover:not(:disabled) {
+
background: var(--primary);
+
color: var(--white);
+
}
+
+
.sync-btn:disabled {
+
opacity: 0.5;
+
cursor: not-allowed;
+
}
+
.subscription-badge {
background: var(--primary);
color: var(--white);
···
const user = await response.json();
this.currentUserEmail = user.email;
}
-
} catch (error) {
-
console.error("Failed to get current user:", error);
+
} catch {
+
// Silent fail
}
}
···
// Remove user from local array instead of reloading
this.users = this.users.filter(u => u.id !== userId);
this.dispatchEvent(new CustomEvent("user-deleted"));
-
} catch (error) {
-
console.error("Failed to delete user:", error);
+
} catch {
alert("Failed to delete user. Please try again.");
}
}
···
await this.loadUsers();
alert(`Subscription revoked for ${email}`);
} catch (error) {
-
console.error("Failed to revoke subscription:", error);
alert(`Failed to revoke subscription: ${error instanceof Error ? error.message : "Unknown error"}`);
this.revokingSubscriptions.delete(userId);
}
}
+
private async handleSyncSubscription(userId: number, event: Event) {
+
event.stopPropagation();
+
+
this.syncingSubscriptions.add(userId);
+
this.requestUpdate();
+
+
try {
+
const response = await fetch(`/api/admin/users/${userId}/subscription`, {
+
method: "PUT",
+
headers: { "Content-Type": "application/json" },
+
});
+
+
if (!response.ok) {
+
const data = await response.json();
+
// Don't alert if there's just no subscription
+
if (response.status !== 404) {
+
alert(`Failed to sync subscription: ${data.error || "Unknown error"}`);
+
}
+
return;
+
}
+
+
await this.loadUsers();
+
} finally {
+
this.syncingSubscriptions.delete(userId);
+
this.requestUpdate();
+
}
+
}
+
private getDeleteButtonText(userId: number, type: "user" | "revoke"): string {
if (
!this.deleteState ||
···
<option value="user">User</option>
<option value="admin">Admin</option>
</select>
+
<button
+
class="sync-btn"
+
?disabled=${this.syncingSubscriptions.has(u.id)}
+
@click=${(e: Event) => this.handleSyncSubscription(u.id, e)}
+
title="Sync subscription status from Polar"
+
>
+
${this.syncingSubscriptions.has(u.id) ? "Syncing..." : "🔄 Sync"}
+
</button>
<button
class="revoke-btn"
?disabled=${!u.subscription_status || !u.subscription_id || this.revokingSubscriptions.has(u.id)}
+253 -110
src/index.ts
···
try {
// Get subscription from database
-
const subscription = db.query<
-
{
-
id: string;
-
status: string;
-
current_period_start: number | null;
-
current_period_end: number | null;
-
cancel_at_period_end: number;
-
canceled_at: number | null;
-
},
-
[number]
-
>(
-
"SELECT id, status, current_period_start, current_period_end, cancel_at_period_end, canceled_at FROM subscriptions WHERE user_id = ? ORDER BY created_at DESC LIMIT 1",
-
).get(user.id);
+
const subscription = db
+
.query<
+
{
+
id: string;
+
status: string;
+
current_period_start: number | null;
+
current_period_end: number | null;
+
cancel_at_period_end: number;
+
canceled_at: number | null;
+
},
+
[number]
+
>(
+
"SELECT id, status, current_period_start, current_period_end, cancel_at_period_end, canceled_at FROM subscriptions WHERE user_id = ? ORDER BY created_at DESC LIMIT 1",
+
)
+
.get(user.id);
if (!subscription) {
return Response.json({ subscription: null });
···
const { polar } = await import("./lib/polar");
// Get subscription to find customer ID
-
const subscription = db.query<
-
{
-
customer_id: string;
-
},
-
[number]
-
>(
-
"SELECT customer_id FROM subscriptions WHERE user_id = ? ORDER BY created_at DESC LIMIT 1",
-
).get(user.id);
+
const subscription = db
+
.query<
+
{
+
customer_id: string;
+
},
+
[number]
+
>(
+
"SELECT customer_id FROM subscriptions WHERE user_id = ? ORDER BY created_at DESC LIMIT 1",
+
)
+
.get(user.id);
if (!subscription || !subscription.customer_id) {
return Response.json(
···
const webhookSecret = process.env.POLAR_WEBHOOK_SECRET;
if (!webhookSecret) {
console.error("[Webhook] POLAR_WEBHOOK_SECRET not configured");
-
return Response.json({ error: "Webhook secret not configured" }, { status: 500 });
+
return Response.json(
+
{ error: "Webhook secret not configured" },
+
{ status: 500 },
+
);
}
const event = validateEvent(rawBody, headers, webhookSecret);
···
customerId,
status,
event.data.currentPeriodStart
-
? Math.floor(new Date(event.data.currentPeriodStart).getTime() / 1000)
+
? Math.floor(
+
new Date(event.data.currentPeriodStart).getTime() /
+
1000,
+
)
: null,
event.data.currentPeriodEnd
-
? Math.floor(new Date(event.data.currentPeriodEnd).getTime() / 1000)
+
? Math.floor(
+
new Date(event.data.currentPeriodEnd).getTime() / 1000,
+
)
: null,
event.data.cancelAtPeriodEnd ? 1 : 0,
event.data.canceledAt
-
? Math.floor(new Date(event.data.canceledAt).getTime() / 1000)
+
? Math.floor(
+
new Date(event.data.canceledAt).getTime() / 1000,
+
)
: null,
],
);
-
console.log(`[Webhook] Updated subscription ${id} for user ${userId}`);
+
console.log(
+
`[Webhook] Updated subscription ${id} for user ${userId}`,
+
);
break;
}
···
// Event-driven SSE stream with reconnection support
const stream = new ReadableStream({
async start(controller) {
-
const encoder = new TextEncoder();
-
let isClosed = false;
-
let lastEventId = Math.floor(Date.now() / 1000);
+
const encoder = new TextEncoder();
+
let isClosed = false;
+
let lastEventId = Math.floor(Date.now() / 1000);
-
const sendEvent = (data: Partial<TranscriptionUpdate>) => {
-
if (isClosed) return;
-
try {
-
// Send event ID for reconnection support
-
lastEventId = Math.floor(Date.now() / 1000);
-
controller.enqueue(
-
encoder.encode(
-
`id: ${lastEventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
-
),
-
);
-
} catch {
-
// Controller already closed (client disconnected)
-
isClosed = true;
-
}
-
};
+
const sendEvent = (data: Partial<TranscriptionUpdate>) => {
+
if (isClosed) return;
+
try {
+
// Send event ID for reconnection support
+
lastEventId = Math.floor(Date.now() / 1000);
+
controller.enqueue(
+
encoder.encode(
+
`id: ${lastEventId}\nevent: update\ndata: ${JSON.stringify(data)}\n\n`,
+
),
+
);
+
} catch {
+
// Controller already closed (client disconnected)
+
isClosed = true;
+
}
+
};
-
const sendHeartbeat = () => {
-
if (isClosed) return;
-
try {
-
controller.enqueue(encoder.encode(": heartbeat\n\n"));
-
} catch {
+
const sendHeartbeat = () => {
+
if (isClosed) return;
+
try {
+
controller.enqueue(encoder.encode(": heartbeat\n\n"));
+
} catch {
+
isClosed = true;
+
}
+
};
+
// Send initial state from DB and file
+
const current = db
+
.query<
+
{
+
status: string;
+
progress: number;
+
},
+
[string]
+
>("SELECT status, progress FROM transcriptions WHERE id = ?")
+
.get(transcriptionId);
+
if (current) {
+
sendEvent({
+
status: current.status as TranscriptionUpdate["status"],
+
progress: current.progress,
+
});
+
}
+
// If already complete, close immediately
+
if (
+
current?.status === "completed" ||
+
current?.status === "failed"
+
) {
isClosed = true;
+
controller.close();
+
return;
}
-
};
-
// Send initial state from DB and file
-
const current = db
-
.query<
-
{
-
status: string;
-
progress: number;
-
},
-
[string]
-
>("SELECT status, progress FROM transcriptions WHERE id = ?")
-
.get(transcriptionId);
-
if (current) {
-
sendEvent({
-
status: current.status as TranscriptionUpdate["status"],
-
progress: current.progress,
-
});
-
}
-
// If already complete, close immediately
-
if (
-
current?.status === "completed" ||
-
current?.status === "failed"
-
) {
-
isClosed = true;
-
controller.close();
-
return;
-
}
-
// Send heartbeats every 2.5 seconds to keep connection alive
-
const heartbeatInterval = setInterval(sendHeartbeat, 2500);
+
// Send heartbeats every 2.5 seconds to keep connection alive
+
const heartbeatInterval = setInterval(sendHeartbeat, 2500);
-
// Subscribe to EventEmitter for live updates
-
const updateHandler = (data: TranscriptionUpdate) => {
-
if (isClosed) return;
+
// Subscribe to EventEmitter for live updates
+
const updateHandler = (data: TranscriptionUpdate) => {
+
if (isClosed) return;
-
// Only send changed fields to save bandwidth
-
const payload: Partial<TranscriptionUpdate> = {
-
status: data.status,
-
progress: data.progress,
-
};
+
// Only send changed fields to save bandwidth
+
const payload: Partial<TranscriptionUpdate> = {
+
status: data.status,
+
progress: data.progress,
+
};
-
if (data.transcript !== undefined) {
-
payload.transcript = data.transcript;
-
}
-
if (data.error_message !== undefined) {
-
payload.error_message = data.error_message;
-
}
+
if (data.transcript !== undefined) {
+
payload.transcript = data.transcript;
+
}
+
if (data.error_message !== undefined) {
+
payload.error_message = data.error_message;
+
}
-
sendEvent(payload);
+
sendEvent(payload);
-
// Close stream when done
-
if (data.status === "completed" || data.status === "failed") {
+
// Close stream when done
+
if (data.status === "completed" || data.status === "failed") {
+
isClosed = true;
+
clearInterval(heartbeatInterval);
+
transcriptionEvents.off(transcriptionId, updateHandler);
+
controller.close();
+
}
+
};
+
transcriptionEvents.on(transcriptionId, updateHandler);
+
// Cleanup on client disconnect
+
return () => {
isClosed = true;
clearInterval(heartbeatInterval);
transcriptionEvents.off(transcriptionId, updateHandler);
-
controller.close();
-
}
-
};
-
transcriptionEvents.on(transcriptionId, updateHandler);
-
// Cleanup on client disconnect
-
return () => {
-
isClosed = true;
-
clearInterval(heartbeatInterval);
-
transcriptionEvents.off(transcriptionId, updateHandler);
-
};
-
},
-
});
-
return new Response(stream, {
+
};
+
},
+
});
+
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
···
try {
const { polar } = await import("./lib/polar");
await polar.subscriptions.revoke({ id: subscriptionId });
-
console.log(
-
`[Admin] Revoked subscription ${subscriptionId} for user ${userId}`,
-
);
return Response.json({
success: true,
message: "Subscription revoked successfully",
···
error instanceof Error
? error.message
: "Failed to revoke subscription",
+
},
+
{ status: 500 },
+
);
+
}
+
} catch (error) {
+
return handleError(error);
+
}
+
},
+
PUT: async (req) => {
+
try {
+
requireAdmin(req);
+
const userId = Number.parseInt(req.params.id, 10);
+
if (Number.isNaN(userId)) {
+
return Response.json({ error: "Invalid user ID" }, { status: 400 });
+
}
+
+
try {
+
const { polar } = await import("./lib/polar");
+
+
// Get user email
+
const user = db
+
.query<{ email: string }, [number]>(
+
"SELECT email FROM users WHERE id = ?",
+
)
+
.get(userId);
+
+
if (!user) {
+
return Response.json(
+
{ error: "User not found" },
+
{ status: 404 },
+
);
+
}
+
+
console.log(`[Admin] Looking for Polar customer with email: ${user.email}`);
+
+
// Search for customer by email
+
const customers = await polar.customers.list({
+
organizationId: process.env.POLAR_ORGANIZATION_ID,
+
query: user.email,
+
});
+
+
console.log(
+
`[Admin] Found ${customers.result.items?.length || 0} customer(s) matching email`,
+
);
+
+
if (!customers.result.items || customers.result.items.length === 0) {
+
return Response.json(
+
{ error: "No Polar customer found with this email" },
+
{ status: 404 },
+
);
+
}
+
+
const customer = customers.result.items[0];
+
console.log(`[Admin] Customer ID: ${customer.id}`);
+
+
// Get all subscriptions for this customer
+
const subscriptions = await polar.subscriptions.list({
+
customerId: customer.id,
+
});
+
+
console.log(
+
`[Admin] Found ${subscriptions.result.items?.length || 0} subscription(s) for customer`,
+
);
+
+
if (!subscriptions.result.items || subscriptions.result.items.length === 0) {
+
return Response.json(
+
{ error: "No subscriptions found for this customer" },
+
{ status: 404 },
+
);
+
}
+
+
// Update each subscription in the database
+
for (const subscription of subscriptions.result.items) {
+
db.run(
+
`INSERT INTO subscriptions (id, user_id, customer_id, status, current_period_start, current_period_end, cancel_at_period_end, canceled_at, updated_at)
+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+
ON CONFLICT(id) DO UPDATE SET
+
user_id = excluded.user_id,
+
status = excluded.status,
+
current_period_start = excluded.current_period_start,
+
current_period_end = excluded.current_period_end,
+
cancel_at_period_end = excluded.cancel_at_period_end,
+
canceled_at = excluded.canceled_at,
+
updated_at = excluded.updated_at`,
+
[
+
subscription.id,
+
userId,
+
subscription.customerId,
+
subscription.status,
+
subscription.currentPeriodStart
+
? Math.floor(
+
new Date(subscription.currentPeriodStart).getTime() /
+
1000,
+
)
+
: null,
+
subscription.currentPeriodEnd
+
? Math.floor(
+
new Date(subscription.currentPeriodEnd).getTime() /
+
1000,
+
)
+
: null,
+
subscription.cancelAtPeriodEnd ? 1 : 0,
+
subscription.canceledAt
+
? Math.floor(
+
new Date(subscription.canceledAt).getTime() / 1000,
+
)
+
: null,
+
Math.floor(Date.now() / 1000),
+
],
+
);
+
}
+
+
console.log(
+
`[Admin] Synced ${subscriptions.result.items.length} subscription(s) for user ${userId} (${user.email})`,
+
);
+
return Response.json({
+
success: true,
+
message: "Subscription synced successfully",
+
});
+
} catch (error) {
+
console.error(
+
`[Admin] Failed to sync subscription for user ${userId}:`,
+
error,
+
);
+
return Response.json(
+
{
+
error:
+
error instanceof Error
+
? error.message
+
: "Failed to sync subscription",
},
{ status: 500 },
);