Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.myme.so/llms.txt

Use this file to discover all available pages before exploring further.

SwiftUI reactive queries

On a pure-local or synced client, client.makeStore() returns a MarfaStore — a @MainActor @Observable factory for live queries driven by ModelContext.didSave with a short debounce window and a FetchDescriptor refetch per save.
import SwiftUI
import MarfaSDK

struct NotesView: View {
  @State private var store: MarfaStore?
  @State private var query: ItemQuery?

  var body: some View {
    List(query?.items ?? [], id: \.id) { item in
      Text(item.properties["body"]?.stringValue ?? "")
    }
    .task {
      let client = try? await MarfaClient.local(path: dbPath)
      self.store = client?.makeStore()
      self.query = store?.query(filters: ListFilters(type: "core.note"))
    }
  }
}
The query classes:
  • ItemQuery[Item] matching a ListFilters
  • TypedItemQuery<T: MarfaItem>[T] — e.g. TypedItemQuery<CoreNote>
  • SingleItemQuery — one item by id; nil when purged
  • ItemsWithMetadataQuery[ItemWithMetadata] pairs — item plus its metadata row. Re-runs when either the items or item-metadata tables change. Use when a view needs per-item tags or favorite flags alongside the item (replaces hand-rolling a cache subscribed to SyncEngine.events).
  • EdgesQuery — outbound edges for a sourceId, optionally filtered by edgeType. A second initialiser takes only an edgeType to track all edges of a type across the local store — store.queryEdges(ofType: "in-thread") for taxonomy-style “every reply” surfaces.
  • TagsQuery[TagWithCount] for the whole tenant. Re-runs the tag aggregation when items or metadata change.
  • BackrefsQuery[String: [Edge]] keyed by target id. Re-runs when edges change.
  • PendingMutationsQuery[PendingMutationSummary] for the mutation queue. See Pending mutations.
  • BlobUploadProgressQuery[String: BlobUploadProgress] keyed by content hash. See Blob upload progress.
All are @Observable @MainActor. Views observe them directly — no ObservableObject, no manual Combine plumbing. Each query exposes its data property (items, item, edges, tags, or edgesByTarget), isLoading, and error. Call stop() to tear down the underlying observation.
struct TagCloud: View {
  @State private var tags: TagsQuery?
  @State private var replies: BackrefsQuery?

  var body: some View {
    VStack {
      ForEach(tags?.tags ?? [], id: \.tag) { entry in
        Text("\(entry.tag) (\(entry.count))")
      }
    }
    .task {
      tags = store.queryTags()
      replies = store.queryBackrefs(to: messageIds, edgeType: "in-thread")
    }
  }
}
For views that render each item with its tags or favorite flag, use ItemsWithMetadataQuery:
struct NoteFeed: View {
  @State private var query: ItemsWithMetadataQuery?

  var body: some View {
    List(query?.items ?? [], id: \.item.id) { pair in
      NoteRow(
        item: pair.item,
        tags: pair.metadata.tags,
        isFavorite: pair.metadata.tags.contains("favorite")
      )
    }
    .task {
      query = store.queryItemsWithMetadata(filters: .init(type: "core.note"))
    }
  }
}
TagsQuery and ItemsWithMetadataQuery both re-run their aggregations on every relevant write. They’re tuned for UI-scale surfaces, not unbounded analytics.
Network-only clients return nil from makeStore() — the reactive layer only materialises when a local store is configured.

Pending mutations

store.queryPendingMutations() vends a PendingMutationsQuery over the local mutation queue — every write that’s been enqueued but not yet replayed to the server. Use it to render per-item “syncing” badges, queue-depth banners, or retry toasts without subscribing to SyncEngine.events and reimplementing the projection.
struct SyncStatusBar: View {
  @State private var query: PendingMutationsQuery?

  var body: some View {
    Group {
      if let q = query, !q.isEmpty {
        HStack {
          ProgressView()
          Text("\(q.mutations.count) pending")
        }
      }
    }
    .task {
      query = store?.queryPendingMutations()
    }
  }
}
Each entry is a PendingMutationSummary with id, kind (e.g. .createItem, .uploadBlob, .setMetadata), itemId, createdAt, and a projected status:
  • .pending — queued, not yet attempted.
  • .inFlight — the sync engine is issuing the transport call right now.
  • .retrying(attemptCount: Int, lastError: String) — a transient failure, will retry on the next drain cycle.
The query refreshes on every ModelContext.didSave — enqueues, in-flight transitions, retries, and removals all land on the same observation path as every other reactive query. Returns nil on clients without a mutation queue (network-only mode).

Blob upload progress

store.queryBlobUploadProgress() vends a BlobUploadProgressQuery that projects the four .blobUpload* sync events into a dict keyed by content hash. Use it for global upload HUDs or per-attachment progress bars without reading the raw event stream.
struct AttachmentProgressView: View {
  @State private var query: BlobUploadProgressQuery?
  let hash: String

  var body: some View {
    Group {
      if let entry = query?.uploads[hash] {
        switch entry.state {
        case .uploading(let sent, let total):
          ProgressView(value: Double(sent), total: Double(total))
        case .failed:
          Label("Upload failed", systemImage: "exclamationmark.triangle")
        case .pending, .completed:
          EmptyView()
        }
      }
    }
    .task {
      query = store?.queryBlobUploadProgress()
    }
  }
}
Entries are evicted on .completed — apps wanting a “recently completed” fade should snapshot the final value in their own view layer. .failed entries linger so a consumer can render a retry affordance; a subsequent transient retry overwrites the entry with a fresh .uploading(0, total). Returns nil on clients without a sync engine (network-only and pure-local modes).

Sync engine

In synced mode, client.syncEngine is non-nil. Start it when your app is ready to sync:
await client.syncEngine?.start()
The engine:
  1. Watches NWPathMonitor via ConnectionStateManager and opens an SSE stream on GET /events when the path goes online.
  2. Applies SSE events into the local store. Persists Last-Event-ID so reconnection resumes from the correct point.
  3. Drains the MutationQueue both when the stream is idle and within a short debounce window of any new enqueue, replaying local writes against the server. The mutation queue captures per-call UpdateOptions.conflict so the chosen strategy applies on replay; the .callback resolver closure is call-site-only and degrades to .auto on replay because closures aren’t serialisable.
  4. On failure, records the attempt against the queue record and retries on the next reachability cycle.
stop() tears the engine down gracefully.

Proactive drain

The engine listens on MutationQueue.drainRequests and schedules a replay within drainDebounceInterval of each enqueue while the connection is online — a device that’s online but idle no longer holds pending writes until the next SSE reconnect. Offline, connecting, and syncing states short out: the queue sits silently, the post-SSE-close drain picks up, or an in-flight drain coalesces the burst respectively. Tune via SyncEngine.init(drainDebounceInterval:) (default .milliseconds(150)).

Sync events

Subscribe to client.syncEngine?.events for typed sync activity:
for await event in client.syncEngine?.events ?? .init({ _ in }) {
  switch event {
  case .synced(let at):                          // every successful drain
  case .failed(let error):                       // last error during a drain
  case .conflictAutoMerged(let payload):         // replay auto-merged a 409
  case .itemCreated(let id):
  case .itemUpdated(let id):
  case .itemDeleted(let id):
  case .edgeCreated(let id):
  case .edgeDeleted(let id):
  case .mutationDropped(let kind, let itemId,
                        let attempt, let error): // permanent failure; queue record removed
  case .blobUploadStarted(let hash, let total):
  case .blobUploadProgress(let hash, let sent, let total):
  case .blobUploadCompleted(let hash):
  case .blobUploadFailed(let hash, let error):
  }
}
The four .blobUpload* cases are the raw signal that BlobUploadProgressQuery projects into its observable dict — most apps subscribe to the query, not the events directly. .mutationDropped fires when a queued mutation exhausts its retries and is removed permanently. Replaces the pattern of inferring sync state from ConnectionState transitions. Multi-subscriber — each access to events returns a fresh stream; past events are not replayed to late subscribers.

End-to-end subscription

Own the lifecycle from a single place — typically an @Observable service or a SwiftUI .task { } — so the subscriber task cancels when the view goes away and the sync engine stops when the app backgrounds.
import MarfaSDK
import Observation

@MainActor
@Observable
final class SyncActivity {
  private(set) var lastSyncedAt: Date?
  private(set) var lastError: Error?
  private(set) var pendingConflictedCopies: [String] = []

  private var client: MarfaClient?
  private var task: Task<Void, Never>?

  // 1. Create — on app launch / sign-in.
  func start(client: MarfaClient) async {
    self.client = client
    await client.syncEngine?.start()

    // 2. Subscribe — each access to `events` is a fresh stream.
    task = Task { [weak self] in
      guard let events = client.syncEngine?.events else { return }
      for await event in events {
        guard !Task.isCancelled else { break }
        await self?.handle(event)
      }
    }
  }

  // 3. Handle — side-effect into observable state.
  private func handle(_ event: SyncEvent) {
    switch event {
    case .synced(let at):
      lastSyncedAt = at
      lastError = nil
    case .failed(let error):
      lastError = error
    case .conflictAutoMerged(let payload):
      if let sibling = payload.conflictedCopyId {
        pendingConflictedCopies.append(sibling)
      }
    case .itemCreated, .itemUpdated, .itemDeleted,
         .edgeCreated, .edgeDeleted, .mutationDropped,
         .blobUploadStarted, .blobUploadProgress,
         .blobUploadCompleted, .blobUploadFailed:
      break   // SwiftData-backed MarfaStore queries pick these up automatically
    }
  }

  // 4. Tear down — on sign-out / app termination.
  func stop() async {
    task?.cancel()
    task = nil
    await client?.syncEngine?.stop()
    client = nil
  }
}
From SwiftUI, scope the subscription to the view that needs it:
struct RootView: View {
  @State private var activity = SyncActivity()
  let client: MarfaClient

  var body: some View {
    ContentView(activity: activity)
      .task {
        await activity.start(client: client)
      }
      // .task's implicit cancellation on disappearance ends the subscriber
      // loop via Task.isCancelled; call activity.stop() on explicit sign-out
      // to also tear the engine down.
  }
}
The SDK’s reactive queries (ItemQuery, TagsQuery, BackrefsQuery) already update when SSE writes land in the local store, so most views don’t need to observe events directly. Subscribe when you need sync-lifecycle side effects — a “last synced” footer, a conflict toast, a sign-out-on-auth-failure hook. See Realtime for the underlying SSE contract, event catalog, and Last-Event-ID semantics.

Local IDs (UUIDv7)

LocalStore.createItem mints a UUIDv7 by default — RFC 9562, timestamp-prefixed, globally unique. Pass CreateItemInput.id explicitly to override (still encouraged for callers that need to reference the new id before createItem returns; both paths produce server-compatible IDs).