Skip to content

Commit

Permalink
fixup! chore(dependency): 升级 rxjs 6
Browse files Browse the repository at this point in the history
  • Loading branch information
chuan6 committed Sep 20, 2018
1 parent 081b9a7 commit e525fa0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 48 deletions.
50 changes: 23 additions & 27 deletions src/Net/Net.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Observable, BehaviorSubject, of, from } from 'rxjs'
import { concatAll, concatMap, filter, mapTo, mergeMap, switchMap, tap } from 'rxjs/operators'
import { Observable, BehaviorSubject, of, from, empty } from 'rxjs'
import { concatAll, concatMap, filter, mapTo, mergeMap, reduce, switchMap, tap } from 'rxjs/operators'
import { QueryToken, SelectorMeta, ProxySelector } from 'reactivedb/proxy'
import { JoinMode } from 'reactivedb/interface'
import { Database, Query, Predicate, ExecutorResult } from 'reactivedb'

import { forEach, ParsedWSMsg, WSMsgToDBHandler, GeneralSchemaDef } from '../utils'
import { forEach, identity, ParsedWSMsg, WSMsgToDBHandler, GeneralSchemaDef } from '../utils'
import { SDKLogger } from '../utils/Logger'

/**
Expand Down Expand Up @@ -140,31 +140,27 @@ export class Net {
public persistedDataBuffer: BufferObject[] = []
private msgToDB: WSMsgToDBHandler | undefined

private validate = <T>(result: ApiResult<T, CacheStrategy>) => {
const { tableName, required, padding } = result

const hasRequiredFields = Array.isArray(required)
const hasPaddingFunction = typeof padding === 'function'
private validate = <T>({ tableName, required, padding }: ApiResult<T, CacheStrategy>) => {
const pk = this.primaryKeys.get(tableName)

const fn = switchMap<T[], T[]>(data => !data.length
? of(data)
: from(data).pipe(
mergeMap(datum => {
if (!hasRequiredFields || !hasPaddingFunction || !pk ||
required!.every(k => typeof datum[k] !== 'undefined')
) {
return of(datum)
}
const patch = padding!(datum[pk])
return patch.pipe(
filter((r): r is T => r != null),
concatMap(r => this.database!.upsert(tableName, r).pipe(mapTo(r))),
tap(r => Object.assign(datum, r))
)
})
).pipe(mapTo(data))
)
const noRequiredPadding = !Array.isArray(required) || typeof padding !== 'function' || !pk

const fn = switchMap<T[], T[]>((results) => {
return !results.length
? of([])
: from(results).pipe(
mergeMap((result) => {
return noRequiredPadding || required!.every(k => typeof result[k] !== 'undefined')
? empty()
: padding!(result[pk!]).pipe(
filter((r): r is T => r != null),
concatMap((r) => this.database!.upsert(tableName, r).pipe(
tap(() => Object.assign(result, r))
))
)
}),
reduce<any, T[]>(identity, results)
)
})
fn.toString = () => 'SDK_VALIDATE'
return fn
}
Expand Down
25 changes: 4 additions & 21 deletions test/net/net.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { defer, of, Subscription, asapScheduler } from 'rxjs'
import { share, take, tap, subscribeOn } from 'rxjs/operators'
import { defer, of, asapScheduler } from 'rxjs'
import { take, tap, subscribeOn } from 'rxjs/operators'
import { describe, beforeEach, afterEach, it } from 'tman'
import { Database, DataStoreType } from 'reactivedb'
import { expect, use } from 'chai'
Expand All @@ -22,7 +22,6 @@ describe('Net test', () => {
let httpBackend: Backend
let database: Database
let version = 1
let subscription: Subscription | undefined
let spyFetch: sinon.SinonSpy

const sdkFetch = new SDKFetch()
Expand All @@ -46,9 +45,6 @@ describe('Net test', () => {
afterEach(function* () {
httpBackend.restore()
spyFetch && spyFetch.restore()
if (subscription instanceof Subscription) {
subscription.unsubscribe()
}
yield database.dispose()
})

Expand Down Expand Up @@ -114,8 +110,6 @@ describe('Net test', () => {
})
.changes()

subscription = stream$.subscribe()

yield stream$.pipe(take(1))

const newLocation = 'test_new_location'
Expand Down Expand Up @@ -201,8 +195,6 @@ describe('Net test', () => {
} as ApiResult<EventSchema, CacheStrategy.Request>)
.changes()

subscription = stream$.subscribe()

yield stream$.pipe(take(1))

const newLocation = 'new_event_location'
Expand Down Expand Up @@ -248,8 +240,6 @@ describe('Net test', () => {
})
.changes()

subscription = stream$.subscribe()

yield stream$.pipe(take(1))

httpBackend.whenGET(`${apiHost}/api/events/${partialEvent._id}`)
Expand Down Expand Up @@ -292,9 +282,7 @@ describe('Net test', () => {
required: ['startDate'],
padding: (id: string) => sdkFetch.get<any>(`api/events/${id}`)
})
.changes().pipe(share())

subscription = stream$.subscribe()
.changes()

yield stream$.pipe(take(1))

Expand Down Expand Up @@ -409,8 +397,6 @@ describe('Net test', () => {
const stream$ = getToken()
.changes()

subscription = stream$.subscribe()

yield stream$.pipe(take(1))

const newLocation = 'new_event_location'
Expand Down Expand Up @@ -464,10 +450,7 @@ describe('Net test', () => {
location: newLocation
}

const stream$ = getToken()
.changes()

subscription = stream$.subscribe()
const stream$ = getToken().changes()

yield stream$.pipe(take(1))

Expand Down

0 comments on commit e525fa0

Please sign in to comment.