Skip to content

Commit

Permalink
chore(dependency): 升级 rxjs 6
Browse files Browse the repository at this point in the history
  • Loading branch information
chuan6 committed Sep 22, 2018
1 parent a2f6651 commit 63d304a
Show file tree
Hide file tree
Showing 70 changed files with 838 additions and 926 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@
"moment": "^2.18.1",
"node-watch": "^0.5.8",
"nyc": "^11.2.1",
"reactivedb": "~0.10.2",
"reactivedb": "~0.11.0",
"rollup": "^0.60.5",
"rollup-plugin-alias": "^1.3.1",
"rollup-plugin-commonjs": "^8.4.1",
"rollup-plugin-node-resolve": "^3.0.0",
"rrule": "2.2.0",
"rxjs": "^5.4.3",
"rxjs-marbles": "^2.4.1",
"rxjs": "6",
"rxjs-marbles": "^4.3.1",
"semver": "^5.5.0",
"sinon": "^4.0.0",
"sinon-chai": "^2.14.0",
Expand Down
68 changes: 32 additions & 36 deletions src/Net/Http.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import 'rxjs/add/observable/throw'
import 'rxjs/add/observable/dom/ajax'
import 'rxjs/add/observable/empty'
import 'rxjs/add/operator/catch'
import 'rxjs/add/operator/map'
import { AjaxError } from 'rxjs/observable/dom/AjaxObservable'
import { Observable } from 'rxjs/Observable'
import { Observer } from 'rxjs/Observer'
import { Subject } from 'rxjs/Subject'
import { empty, throwError, Observable, Observer, Subject } from 'rxjs'
import { catchError, map, publishReplay, refCount } from 'rxjs/operators'
import { ajax, AjaxError } from 'rxjs/ajax'
import { parseHeaders } from '../utils/index'
import { testable } from '../testable'
import { forEach } from '../utils'
Expand Down Expand Up @@ -40,9 +34,9 @@ const rxAjaxDefaultHeaderKey2NormKey = {
}

/**
* Observable.ajax 目前的实现,对请求头字段的已有设置检查没有遵循
* rxjs ajax 目前的实现,对请求头字段的已有设置检查没有遵循
* 头字段 key 不区分大小写的原则,比如:如果用户已經设置 `content-type`,
* Observable.ajax 内部会发现 `Content-Type` 没有设置,结果会
* rxjs ajax 内部会发现 `Content-Type` 没有设置,结果会
* 额外添加一个 `Content-Type` 字段,结果导致浏览器发现请求头字段里
* 既有 `content-type` 又有 `Content-Type`,出现问题。
*/
Expand All @@ -63,36 +57,38 @@ export const createMethod = (method: AllowedHttpMethod) => (params: MethodParams
/* istanbul ignore if */
if (testable.UseXMLHTTPRequest && typeof window !== 'undefined') {
coverRxAjaxHeadersBug(_opts.headers)
return Observable.ajax({
return ajax({
url, body, method,
headers: _opts.headers,
withCredentials: _opts.credentials === 'include',
responseType: _opts.responseType || 'json',
crossDomain: typeof _opts.crossDomain !== 'undefined' ? !!_opts.crossDomain : true
})
.map(value => {
const respBody = value.response
if (!includeHeaders) {
return respBody
}
const respHeaders = parseHeaders(value.xhr.getAllResponseHeaders())
return { headers: respHeaders, body: respBody }
})
.catch((e: AjaxError) => {
const headers = e.xhr.getAllResponseHeaders()
const errorResponse = new Response(new Blob([JSON.stringify(e.xhr.response)]), {
status: e.xhr.status,
statusText: e.xhr.statusText,
headers: headers.length ? parseHeaders(headers) : new Headers()
.pipe(
map(value => {
const respBody = value.response
if (!includeHeaders) {
return respBody
}
const respHeaders = parseHeaders(value.xhr.getAllResponseHeaders())
return { headers: respHeaders, body: respBody }
}),
catchError((e: AjaxError) => {
const headers = e.xhr.getAllResponseHeaders()
const errorResponse = new Response(new Blob([JSON.stringify(e.xhr.response)]), {
status: e.xhr.status,
statusText: e.xhr.statusText,
headers: headers.length ? parseHeaders(headers) : new Headers()
})
const requestInfo = { method, url, body }
const errorResponseClone = errorResponse.clone()

setTimeout(() => {
errorAdapter$.next({ ...requestInfo, error: errorResponseClone })
}, 10)
return throwError({ ...requestInfo, error: errorResponse })
})
const requestInfo = { method, url, body }
const errorResponseClone = errorResponse.clone()

setTimeout(() => {
errorAdapter$.next({ ...requestInfo, error: errorResponseClone })
}, 10)
return Observable.throw({ ...requestInfo, error: errorResponse })
})
)
} else { // 测试用分支
return Observable.create((observer: Observer<any>) => {
const _options = {
Expand Down Expand Up @@ -226,13 +222,13 @@ export class Http<T> {
}

send(): Observable<T> {
return this.request ? this.mapFn(this.request) : Observable.empty()
return this.request ? this.mapFn(this.request) : empty()
}

clone() {
const result = new Http<T>(this.url, this.errorAdapter$)
if (!this.cloned && this.request) {
this.request = this.request.publishReplay(1).refCount()
this.request = this.request.pipe(publishReplay(1), refCount())
this.cloned = true
result.cloned = true
}
Expand Down
143 changes: 66 additions & 77 deletions src/Net/Net.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
import 'rxjs/add/observable/forkJoin'
import 'rxjs/add/observable/of'
import 'rxjs/add/operator/concatAll'
import 'rxjs/add/operator/do'
import 'rxjs/add/operator/mapTo'
import 'rxjs/add/operator/mergeMap'
import 'rxjs/add/operator/switchMap'
import 'rxjs/add/operator/filter'
import { Observable } from 'rxjs/Observable'
import { BehaviorSubject } from 'rxjs/BehaviorSubject'
import { empty, from, of, BehaviorSubject, Observable } from 'rxjs'
import { concatAll, concatMap, filter, mapTo, mergeMap, reduce, switchMap, tap } from 'rxjs/operators'
import { QueryToken, SelectorMeta, ProxySelector } from 'reactivedb/proxy'
import { JoinMode } from 'reactivedb/interface'
import { Database, Query, Predicate, ExecutorResult } from 'reactivedb'

import { forEach, ParsedWSMsg, WSMsgToDBHandler, GeneralSchemaDef } from '../utils'
import { forEach, identity, ParsedWSMsg, WSMsgToDBHandler, GeneralSchemaDef } from '../utils'
import { SDKLogger } from '../utils/Logger'

/**
Expand Down Expand Up @@ -148,32 +140,27 @@ export class Net {
public persistedDataBuffer: BufferObject[] = []
private msgToDB: WSMsgToDBHandler | undefined

private validate = <T>(result: ApiResult<T, CacheStrategy>) => {
const { tableName, required, padding } = result

const hasRequiredFields = Array.isArray(required)
const hasPaddingFunction = typeof padding === 'function'
private validate = <T>({ tableName, required, padding }: ApiResult<T, CacheStrategy>) => {
const pk = this.primaryKeys.get(tableName)

const fn = (stream$: Observable<T[]>) =>
stream$.switchMap(data => !data.length
? Observable.of(data)
: Observable.forkJoin(
Observable.from(data)
.mergeMap(datum => {
if (!hasRequiredFields || !hasPaddingFunction || !pk ||
required!.every(k => typeof datum[k] !== 'undefined')
) {
return Observable.of(datum)
}
const patch = padding!(datum[pk]).filter(r => r != null) as Observable<T>
return patch
.concatMap(r => this.database!.upsert(tableName, r).mapTo(r))
.do(r => Object.assign(datum, r))
})
const noRequiredPadding = !Array.isArray(required) || typeof padding !== 'function' || !pk

const fn = switchMap<T[], T[]>((results) => {
return !results.length
? of([])
: from(results).pipe(
mergeMap((result) => {
return noRequiredPadding || required!.every(k => typeof result[k] !== 'undefined')
? empty()
: padding!(result[pk!]).pipe(
filter((r): r is T => r != null),
concatMap((r) => this.database!.upsert(tableName, r).pipe(
tap(() => Object.assign(result, r))
))
)
}),
reduce<any, T[]>(identity, results)
)
.mapTo(data)
)
})
fn.toString = () => 'SDK_VALIDATE'
return fn
}
Expand Down Expand Up @@ -232,25 +219,24 @@ export class Net {

const database = this.database!

const { request, method, tableName } = result as CUDApiResult<T>
let destination: Observable<ExecutorResult> | Observable<T | T[]>
return request
.concatMap(v => {
switch (method) {
case 'create':
destination = database.upsert<T>(tableName, v)
break
case 'update':
destination = database.upsert(tableName, v)
break
case 'delete':
destination = database.delete<T>(tableName, (result as UDResult<T>).clause)
break
default:
throw new Error()
}
return destination.mapTo<ExecutorResult | T | T[], T>(v)
})
const { request, method, tableName } = result
let destination: Observable<ExecutorResult>
return request.pipe(concatMap(v => {
switch (method) {
case 'create':
destination = database.upsert<T>(tableName, v)
break
case 'update':
destination = database.upsert(tableName, v)
break
case 'delete':
destination = database.delete<T>(tableName, (result as UDResult<T>).clause)
break
default:
throw new Error()
}
return destination.pipe(mapTo(v))
}))
}

persist(database: Database) {
Expand All @@ -277,12 +263,11 @@ export class Net {
const token = this.handleRequestCache(v.realSelectorInfo)
const selector$ = token.selector$

p = selector$
.do({
next(selector) {
cacheControl$.next(selector)
}
})
p = selector$.pipe(tap({
next(selector) {
cacheControl$.next(selector)
}
}))
break
default:
break
Expand All @@ -295,12 +280,15 @@ export class Net {

this.persistedDataBuffer.length = 0

return Observable.from(asyncQueue).concatAll().do({
error: async (err: Observable<Error>) => {
const errObj = await err.toPromise()
SDKLogger.error(errObj.message)
}
})
return from(asyncQueue).pipe(
concatAll(),
tap({
error: async (err: Observable<Error>) => {
const errObj = await err.toPromise()
SDKLogger.error(errObj.message)
}
})
)
}

bufferResponse<T>(result: ApiResult<T, CacheStrategy>) {
Expand All @@ -323,23 +311,22 @@ export class Net {

bufferCUDResponse<T>(result: CUDApiResult<T>) {
const { request, method, tableName } = result as CUDApiResult<T>
return request
.do((v: T | T[]) => {
return request.pipe(tap((v: T | T[]) => {
this.persistedDataBuffer.push({
kind: 'CUD',
tableName,
method: (method === 'create' || method === 'update') ? 'upsert' : method,
value: method === 'delete' ? (result as UDResult<T>).clause : v
})
})
}))
}

bufferSocketPush(socketMessage: ParsedWSMsg) {
this.persistedDataBuffer.push({
kind: 'SocketCUD',
socketMessage
})
return Observable.of(null)
return of(null)
}

private genCacheKey<T>(tableName: string, q: Readonly<Query<T>>) {
Expand All @@ -366,20 +353,22 @@ export class Net {
case CacheStrategy.Request:
if (!requestCache) {
/*tslint:disable no-shadowed-variable*/
const selector$ = response$
.concatMap(v => database.upsert(tableName, v))
.do(() => this.requestMap.set(cacheKey, true))
.concatMap(() => dbGetWithSelfJoinEnabled<T>(database, tableName, q).selector$)
const selector$ = response$.pipe(
concatMap(v => database.upsert(tableName, v)),
tap(() => this.requestMap.set(cacheKey, true)),
concatMap(() => dbGetWithSelfJoinEnabled<T>(database, tableName, q).selector$)
)
token = new QueryToken(selector$)
} else {
token = dbGetWithSelfJoinEnabled<T>(database, tableName, q)
}
token.map(this.validate(result))
break
case CacheStrategy.Cache:
const selector$ = response$
.concatMap(v => database.upsert(tableName, v))
.concatMap(() => dbGetWithSelfJoinEnabled<T>(database, tableName, q).selector$)
const selector$ = response$.pipe(
concatMap(v => database.upsert(tableName, v)),
concatMap(() => dbGetWithSelfJoinEnabled<T>(database, tableName, q).selector$)
)
return new QueryToken(selector$)
default:
throw new TypeError('unreachable code path')
Expand Down
29 changes: 15 additions & 14 deletions src/Net/Pagination.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import 'rxjs/add/observable/throw'
import 'rxjs/add/operator/startWith'
import 'rxjs/add/operator/withLatestFrom'
import { Observable } from 'rxjs/Observable'
import { Observer } from 'rxjs/Observer'
import { OperatorFunction } from 'rxjs/interfaces'
import { empty, throwError, Observable, Observer, OperatorFunction } from 'rxjs'
import { catchError, finalize, map, mergeAll, startWith, tap } from 'rxjs/operators'

export type PageToken = string & { kind: 'PageToken' }

Expand Down Expand Up @@ -72,11 +68,14 @@ export const accumulateResultByConcat = <T>(state: State<T>, resp: OriginalRespo
export const loadAndExpand = <T>(
step: (curr: State<T>) => Observable<OriginalResponse<T>>,
initState: State<T>,
loadMore$: Observable<{}> = Observable.empty()
loadMore$: Observable<{}> = empty()
): Observable<State<T>> => {
return loadMore$.startWith({})
.pipe(expand(step, accumulateResultByConcat, initState))
.mergeAll()
return loadMore$
.pipe(
startWith({}),
expand(step, accumulateResultByConcat, initState),
mergeAll()
)
}

export const expand = <T>(
Expand All @@ -99,10 +98,12 @@ export const expand = <T>(
if (!isLoading) {
isLoading = true
observer.next(step(state)
.map((stepResult) => accumulator(state, stepResult))
.do((expanded) => Object.assign(state, expanded))
.catch((err) => Observable.throw(err))
.finally(() => { isLoading = false })
.pipe(
map((stepResult) => accumulator(state, stepResult)),
tap((expanded) => Object.assign(state, expanded)),
catchError((err) => throwError(err)),
finalize(() => { isLoading = false })
)
)
}
},
Expand Down
2 changes: 1 addition & 1 deletion src/SDK.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Observable } from 'rxjs/Observable'
import { Observable } from 'rxjs'
import { Database } from 'reactivedb'
import { Net } from './Net'
import { forEach } from './utils'
Expand Down
Loading

0 comments on commit 63d304a

Please sign in to comment.