//////////////////////////////////////////////////////////////////////////// // // Copyright 2020 Realm Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // //////////////////////////////////////////////////////////////////////////// #import "RLMMongoCollection_Private.hpp" #import "RLMApp_Private.hpp" #import "RLMBSON_Private.hpp" #import "RLMFindOneAndModifyOptions_Private.hpp" #import "RLMFindOptions_Private.hpp" #import "RLMNetworkTransport_Private.hpp" #import "RLMUpdateResult_Private.hpp" #import "RLMUser_Private.hpp" #import <realm/object-store/sync/mongo_client.hpp> #import <realm/object-store/sync/mongo_collection.hpp> #import <realm/object-store/sync/mongo_database.hpp> @implementation RLMChangeStream { realm::app::WatchStream _watchStream; id<RLMChangeEventDelegate> _subscriber; __weak NSURLSession *_session; _Nonnull dispatch_queue_t _queue; } - (instancetype)initWithChangeEventSubscriber:(id<RLMChangeEventDelegate>)subscriber delegateQueue:(nullable dispatch_queue_t)queue { if (self = [super init]) { _subscriber = subscriber; _queue = queue ?: dispatch_get_main_queue(); return self; } return nil; } - (void)didCloseWithError:(NSError *)error { dispatch_async(_queue, ^{ [_subscriber changeStreamDidCloseWithError:error]; }); } - (void)didOpen { dispatch_async(_queue, ^{ [_subscriber changeStreamDidOpen:self]; }); } - (void)didReceiveError:(nonnull NSError *)error { dispatch_async(_queue, ^{ [_subscriber changeStreamDidReceiveError:error]; }); } - (void)didReceiveEvent:(nonnull NSData *)event { std::string_view str = [[NSString alloc] initWithData:event encoding:NSUTF8StringEncoding].UTF8String; if (!str.empty() && _watchStream.state() == realm::app::WatchStream::State::NEED_DATA) { _watchStream.feed_buffer(str); } while (_watchStream.state() == realm::app::WatchStream::State::HAVE_EVENT) { id<RLMBSON> event = RLMConvertBsonToRLMBSON(_watchStream.next_event()); dispatch_async(_queue, ^{ [_subscriber changeStreamDidReceiveChangeEvent:event]; }); } if (_watchStream.state() == realm::app::WatchStream::State::HAVE_ERROR) { [self didReceiveError:RLMAppErrorToNSError(_watchStream.error())]; } } - (void)attachURLSession:(NSURLSession *)urlSession { _session = urlSession; } - (void)close { [_session invalidateAndCancel]; } @end @implementation RLMMongoCollection static realm::bson::BsonDocument toBsonDocument(id<RLMBSON> bson) { return realm::bson::BsonDocument(RLMConvertRLMBSONToBson(bson)); } static realm::bson::BsonArray toBsonArray(id<RLMBSON> bson) { return realm::bson::BsonArray(RLMConvertRLMBSONToBson(bson)); } - (instancetype)initWithUser:(RLMUser *)user serviceName:(NSString *)serviceName databaseName:(NSString *)databaseName collectionName:(NSString *)collectionName { if (self = [super init]) { _user = user; _serviceName = serviceName; _databaseName = databaseName; _name = collectionName; } return self; } - (realm::app::MongoCollection)collection:(NSString *)name { return _user._syncUser->mongo_client(self.serviceName.UTF8String) .db(self.databaseName.UTF8String).collection(name.UTF8String); } - (realm::app::MongoCollection)collection { return [self collection:self.name]; } - (void)findWhere:(NSDictionary<NSString *, id<RLMBSON>> *)document options:(RLMFindOptions *)options completion:(RLMMongoFindBlock)completion { self.collection.find(toBsonDocument(document), [options _findOptions], [completion](realm::util::Optional<realm::bson::BsonArray> documents, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } completion((NSArray<NSDictionary<NSString *, id<RLMBSON>> *> *)RLMConvertBsonToRLMBSON(*documents), nil); }); } - (void)findWhere:(NSDictionary<NSString *, id<RLMBSON>> *)document completion:(RLMMongoFindBlock)completion { [self findWhere:document options:[[RLMFindOptions alloc] init] completion:completion]; } - (void)findOneDocumentWhere:(NSDictionary<NSString *, id<RLMBSON>> *)document options:(RLMFindOptions *)options completion:(RLMMongoFindOneBlock)completion { self.collection.find_one(toBsonDocument(document), [options _findOptions], [completion](realm::util::Optional<realm::bson::BsonDocument> document, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } completion((NSDictionary<NSString *, id<RLMBSON>> *)RLMConvertBsonToRLMBSON(*document), nil); }); } - (void)findOneDocumentWhere:(NSDictionary<NSString *, id<RLMBSON>> *)document completion:(RLMMongoFindOneBlock)completion { [self findOneDocumentWhere:document options:[[RLMFindOptions alloc] init] completion:completion]; } - (void)insertOneDocument:(NSDictionary<NSString *, id<RLMBSON>> *)document completion:(RLMMongoInsertBlock)completion { self.collection.insert_one(toBsonDocument(document), [completion](realm::util::Optional<realm::bson::Bson> objectId, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } completion(RLMConvertBsonToRLMBSON(*objectId), nil); }); } - (void)insertManyDocuments:(NSArray<NSDictionary<NSString *, id<RLMBSON>> *> *)documents completion:(RLMMongoInsertManyBlock)completion { self.collection.insert_many(toBsonArray(documents), [completion](std::vector<realm::bson::Bson> insertedIds, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } NSMutableArray *insertedArr = [[NSMutableArray alloc] initWithCapacity:insertedIds.size()]; for (auto& objectId : insertedIds) { [insertedArr addObject:RLMConvertBsonToRLMBSON(objectId)]; } completion(insertedArr, nil); }); } - (void)aggregateWithPipeline:(NSArray<NSDictionary<NSString *, id<RLMBSON>> *> *)pipeline completion:(RLMMongoFindBlock)completion { self.collection.aggregate(toBsonArray(pipeline), [completion](realm::util::Optional<realm::bson::BsonArray> documents, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } completion((NSArray<id> *)RLMConvertBsonToRLMBSON(*documents), nil); }); } - (void)countWhere:(NSDictionary<NSString *, id<RLMBSON>> *)document limit:(NSInteger)limit completion:(RLMMongoCountBlock)completion { self.collection.count(toBsonDocument(document), limit, [completion](uint64_t count, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(0, RLMAppErrorToNSError(*error)); } completion(static_cast<NSInteger>(count), nil); }); } - (void)countWhere:(NSDictionary<NSString *, id<RLMBSON>> *)document completion:(RLMMongoCountBlock)completion { [self countWhere:document limit:0 completion:completion]; } - (void)deleteOneDocumentWhere:(NSDictionary<NSString *, id<RLMBSON>> *)document completion:(RLMMongoCountBlock)completion { self.collection.delete_one(toBsonDocument(document), [completion](uint64_t count, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(0, RLMAppErrorToNSError(*error)); } completion(static_cast<NSInteger>(count), nil); }); } - (void)deleteManyDocumentsWhere:(NSDictionary<NSString *, id<RLMBSON>> *)document completion:(RLMMongoCountBlock)completion { self.collection.delete_many(toBsonDocument(document), [completion](uint64_t count, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(0, RLMAppErrorToNSError(*error)); } completion(static_cast<NSInteger>(count), nil); }); } - (void)updateOneDocumentWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument updateDocument:(NSDictionary<NSString *, id<RLMBSON>> *)updateDocument upsert:(BOOL)upsert completion:(RLMMongoUpdateBlock)completion { self.collection.update_one(toBsonDocument(filterDocument), toBsonDocument(updateDocument), upsert, [completion](realm::app::MongoCollection::UpdateResult result, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } completion([[RLMUpdateResult alloc] initWithUpdateResult:result], nil); }); } - (void)updateOneDocumentWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument updateDocument:(NSDictionary<NSString *, id<RLMBSON>> *)updateDocument completion:(RLMMongoUpdateBlock)completion { [self updateOneDocumentWhere:filterDocument updateDocument:updateDocument upsert:NO completion:completion]; } - (void)updateManyDocumentsWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument updateDocument:(NSDictionary<NSString *, id<RLMBSON>> *)updateDocument upsert:(BOOL)upsert completion:(RLMMongoUpdateBlock)completion { self.collection.update_many(toBsonDocument(filterDocument), toBsonDocument(updateDocument), upsert, [completion](realm::app::MongoCollection::UpdateResult result, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } completion([[RLMUpdateResult alloc] initWithUpdateResult:result], nil); }); } - (void)updateManyDocumentsWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument updateDocument:(NSDictionary<NSString *, id<RLMBSON>> *)updateDocument completion:(RLMMongoUpdateBlock)completion { [self updateManyDocumentsWhere:filterDocument updateDocument:updateDocument upsert:NO completion:completion]; } - (void)findOneAndUpdateWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument updateDocument:(NSDictionary<NSString *, id<RLMBSON>> *)updateDocument options:(RLMFindOneAndModifyOptions *)options completion:(RLMMongoFindOneBlock)completion { self.collection.find_one_and_update(toBsonDocument(filterDocument), toBsonDocument(updateDocument), [options _findOneAndModifyOptions], [completion](realm::util::Optional<realm::bson::BsonDocument> document, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } return completion((NSDictionary *)RLMConvertBsonDocumentToRLMBSON(document), nil); }); } - (void)findOneAndUpdateWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument updateDocument:(NSDictionary<NSString *, id<RLMBSON>> *)updateDocument completion:(RLMMongoFindOneBlock)completion { [self findOneAndUpdateWhere:filterDocument updateDocument:updateDocument options:[[RLMFindOneAndModifyOptions alloc] init] completion:completion]; } - (void)findOneAndReplaceWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument replacementDocument:(NSDictionary<NSString *, id<RLMBSON>> *)replacementDocument options:(RLMFindOneAndModifyOptions *)options completion:(RLMMongoFindOneBlock)completion { self.collection.find_one_and_replace(toBsonDocument(filterDocument), toBsonDocument(replacementDocument), [options _findOneAndModifyOptions], [completion](realm::util::Optional<realm::bson::BsonDocument> document, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } return completion((NSDictionary *)RLMConvertBsonDocumentToRLMBSON(document), nil); }); } - (void)findOneAndReplaceWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument replacementDocument:(NSDictionary<NSString *, id<RLMBSON>> *)replacementDocument completion:(RLMMongoFindOneBlock)completion { [self findOneAndReplaceWhere:filterDocument replacementDocument:replacementDocument options:[[RLMFindOneAndModifyOptions alloc] init] completion:completion]; } - (void)findOneAndDeleteWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument options:(RLMFindOneAndModifyOptions *)options completion:(RLMMongoDeleteBlock)completion { self.collection.find_one_and_delete(toBsonDocument(filterDocument), [options _findOneAndModifyOptions], [completion](realm::util::Optional<realm::bson::BsonDocument> document, realm::util::Optional<realm::app::AppError> error) { if (error) { return completion(nil, RLMAppErrorToNSError(*error)); } return completion((NSDictionary *)RLMConvertBsonDocumentToRLMBSON(document), nil); }); } - (void)findOneAndDeleteWhere:(NSDictionary<NSString *, id<RLMBSON>> *)filterDocument completion:(RLMMongoDeleteBlock)completion { [self findOneAndDeleteWhere:filterDocument options:[[RLMFindOneAndModifyOptions alloc] init] completion:completion]; } - (RLMChangeStream *)watchWithDelegate:(id<RLMChangeEventDelegate>)delegate delegateQueue:(nullable dispatch_queue_t)delegateQueue { return [self watchWithMatchFilter:nil idFilter:nil delegate:delegate delegateQueue:delegateQueue]; } - (RLMChangeStream *)watchWithFilterIds:(NSArray<RLMObjectId *> *)filterIds delegate:(id<RLMChangeEventDelegate>)delegate delegateQueue:(nullable dispatch_queue_t)delegateQueue { return [self watchWithMatchFilter:nil idFilter:filterIds delegate:delegate delegateQueue:delegateQueue]; } - (RLMChangeStream *)watchWithMatchFilter:(NSDictionary<NSString *, id<RLMBSON>> *)matchFilter delegate:(id<RLMChangeEventDelegate>)delegate delegateQueue:(nullable dispatch_queue_t)delegateQueue { return [self watchWithMatchFilter:matchFilter idFilter:nil delegate:delegate delegateQueue:delegateQueue]; } - (RLMChangeStream *)watchWithMatchFilter:(nullable id<RLMBSON>)matchFilter idFilter:(nullable id<RLMBSON>)idFilter delegate:(id<RLMChangeEventDelegate>)delegate delegateQueue:(nullable dispatch_queue_t)queue { realm::bson::BsonDocument baseArgs = { {"database", self.databaseName.UTF8String}, {"collection", self.name.UTF8String} }; if (matchFilter) { baseArgs["filter"] = RLMConvertRLMBSONToBson(matchFilter); } if (idFilter) { baseArgs["ids"] = RLMConvertRLMBSONToBson(idFilter); } auto args = realm::bson::BsonArray{baseArgs}; auto app = self.user.app._realmApp; auto request = app->make_streaming_request(app->current_user(), "watch", args, realm::util::Optional<std::string>(self.serviceName.UTF8String)); RLMChangeStream *changeStream = [[RLMChangeStream alloc] initWithChangeEventSubscriber:delegate delegateQueue:queue]; RLMNetworkTransport *transport = self.user.app.configuration.transport; RLMRequest *rlmRequest = [transport RLMRequestFromRequest:request]; NSURLSession *watchSession = [transport doStreamRequest:rlmRequest eventSubscriber:changeStream]; [changeStream attachURLSession:watchSession]; return changeStream; } @end