When using the reactive programming model with Spring Data MongoDB, you can execute a transaction like this:
Mono<DeleteResult> result = template.inTransaction()
.execute(action -> action.remove(query(where("id").is("step-1")), Step.class));
However, Spring Data MongoDB also supports "reactive repositories", for example:
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
    Flux<Person> findByLocationNear(Point location, Distance distance);
}
and
public interface CarRepository extends ReactiveMongoRepository<Car, String> {
    Flux<Car> findByYear(int year);
}
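My question is: given that you have a ReactiveMongoRepository, can you somehow make use of MongoDB transactions, for example to insert both a Person and a Car in the same transaction (using PersonRepository and CarRepository in this case)? If so, how do you do it?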
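I had also been struggling to find a solution for transactional support in the reactive style with MongoDB and Spring Boot, but luckily I figured it out myself. A few things I found through Google were of some use, but they all covered the non-reactive style.
You need to use ReactiveMongoTransactionManager together with ReactiveMongoDatabaseFactory. Most of the details are at the end, and I am also sharing the code repository for the same.
For MongoDB to support transactions, the database must be running as a replica set. The instructions for that are as follows. First, start the server with this docker-compose.yml: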
version: "3"
services:
  mongo:
    hostname: mongo
    container_name: localmongo_docker
    image: mongo
    expose:
      - 27017
    ports:
      - 27017:27017
    restart: always
    entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
    volumes:
      - ./mongodata:/data/db # bind-mounts ./mongodata from the host so the data persists
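Once the container is up, open a mongo shell inside it (localmongo_docker is the container name; on recent mongo images the shell binary is mongosh rather than mongo):
docker exec -it localmongo_docker mongo
Then run the following in that shell to initiate the replica set:
rs.initiate(
  {
    _id : 'rs0',
    members: [
      { _id : 0, host : "mongo:27017" }
    ]
  }
)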
Important
The important notes about the code are as follows.
The main part is this bean:
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
    return new ReactiveMongoTransactionManager(dbFactory);
}
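Applied to the repositories from the question, the same idea might look like the sketch below. It assumes the Person, Car, PersonRepository and CarRepository types from the question and a registered ReactiveMongoTransactionManager bean; the PersonCarService class and its savePersonAndCar method are hypothetical names used only for illustration.

```java
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Mono;

// Sketch only: Person, Car, PersonRepository and CarRepository come from the
// question; PersonCarService and savePersonAndCar are hypothetical names.
@Service
public class PersonCarService {

    private final PersonRepository personRepository;
    private final CarRepository carRepository;

    public PersonCarService(PersonRepository personRepository, CarRepository carRepository) {
        this.personRepository = personRepository;
        this.carRepository = carRepository;
    }

    // With a ReactiveTransactionManager bean present, @Transactional on a method
    // that returns a Mono or Flux wraps the whole pipeline in one transaction.
    @Transactional
    public Mono<Void> savePersonAndCar(Person person, Car car) {
        return personRepository.save(person)   // first insert
                .then(carRepository.save(car)) // second insert, same transaction
                .then();                       // an error from either save rolls both back
    }
}
```

The two saves have to be chained into the single publisher that the method returns; a publisher that is subscribed to separately inside the method would not take part in the transaction.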
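To check that the transactional behavior of the code works as required, you can go through the UserService class in the service package here.
In case the link does not work for someone, the code is shared below.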
The configuration, with the bean inside it
@Configuration
public class MongoConfiguration extends AbstractMongoClientConfiguration {

    @Autowired
    private MongoProperties mongoProperties;

    // Enables @Transactional for reactive MongoDB operations
    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
        return new ReactiveMongoTransactionManager(dbFactory);
    }

    @Override
    protected String getDatabaseName() {
        return mongoProperties.getDatabase();
    }

    @Override
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getUri());
    }
}
application.properties (the MongoDB-related part)
spring.data.mongodb.database=mongo
spring.data.mongodb.uri=mongodb://localhost:27017/mongo?replicaSet=rs0
Document classes
Role class
@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "roles")
@TypeAlias("role")
public class Role implements Persistable<String> {

    @Id
    private String id;

    @Field("role_name")
    @Indexed(unique = true)
    private String role;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    // The entity counts as new while no creation timestamp has been set
    @Override
    @JsonIgnore
    public boolean isNew() {
        return getCreated() == null;
    }
}
User class
@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "users")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user")
public class User implements Persistable<String> {

    @Id
    private String id;

    @Field("username")
    @Indexed(unique = true)
    @JsonProperty("username")
    private String userName;

    // Accepted in requests but never serialized back to the client
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private String password;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    @DBRef(lazy = true)
    @JsonProperty("roles")
    private List<Role> roles = new ArrayList<>();

    // The entity counts as new while no creation timestamp has been set
    @Override
    @JsonIgnore
    public boolean isNew() {
        return getCreated() == null;
    }
}
UserProfile class
@Getter
@Setter
@Accessors(chain = true)
@Document(collection = "user_profiles")
@JsonInclude(JsonInclude.Include.NON_NULL)
@TypeAlias("user_profile")
public class UserProfile implements Persistable<String> {

    @Id
    private String id;

    @Indexed(unique = true)
    private String mobile;

    @Indexed(unique = true)
    private String email;

    private String address;

    private String firstName;

    private String lastName;

    @DBRef
    private User user;

    @CreatedDate
    private ZonedDateTime created;

    @LastModifiedDate
    private ZonedDateTime updated;

    private Boolean deleted;

    private Boolean enabled;

    // The entity counts as new while no creation timestamp has been set
    @Override
    @JsonIgnore
    public boolean isNew() {
        return getCreated() == null;
    }
}
ReactiveMongoRepository interface(s)
RoleRepository
public interface RoleRepository extends ReactiveMongoRepository<Role, String> {

    Mono<Role> findByRole(String role);

    Flux<Role> findAllByRoleIn(List<String> roles);
}
UserRepository
public interface UserRepository extends ReactiveMongoRepository<User, String> {

    Mono<User> findByUserName(String userName);
}
UserProfileRepository
public interface UserProfileRepository extends ReactiveMongoRepository<UserProfile, String> {
}
The UserService class
You need to create your own RuntimeException class here; the one I have been using is AppRuntimeException.
@Slf4j
@Service
public class UserService {

    @Autowired
    private RoleRepository roleRepository;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private UserProfileRepository userProfileRepository;

    // Both saves run in one transaction; any error signal rolls both back
    @Transactional
    public Mono<UserProfile> saveUserAndItsProfile(final UserRequest userRequest) {

        Mono<Role> roleMono = roleRepository.findByRole("USER");

        // Step 1: create the user with the USER role
        Mono<User> userMono = roleMono.flatMap(r -> {
            User user = new User()
                    .setUserName(userRequest.getUsername())
                    .setPassword(userRequest.getPassword());
            user.setRoles(Arrays.asList(r));
            return userRepository.save(user);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            if (ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the username '" + userRequest.getUsername() + "' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        // Step 2: create the profile that references the saved user
        Mono<UserProfile> userProfileMono = userMono.flatMap(u -> {
            UserProfile userProfile = new UserProfile()
                    .setAddress(userRequest.getAddress())
                    .setEmail(userRequest.getEmail())
                    .setMobile(userRequest.getMobile())
                    .setUser(u);
            return userProfileRepository.save(userProfile);
        }).onErrorResume(ex -> {
            log.error(ex.getMessage());
            // An error already mapped in step 1 is passed through unchanged,
            // so a username conflict is not re-wrapped as an internal error
            if (ex instanceof AppRuntimeException) {
                return Mono.error(ex);
            }
            if (ex instanceof DuplicateKeyException) {
                String errorMessage = "The user with the profile mobile '" + userRequest.getMobile() + "' and/or - email '" + userRequest.getEmail() + "' already exists";
                log.error(errorMessage);
                return Mono.error(new AppRuntimeException(errorMessage, ErrorCodes.CONFLICT, ex));
            }
            return Mono.error(new AppRuntimeException(ex.getMessage(), ErrorCodes.INTERNAL_SERVER_ERROR, ex));
        });

        return userProfileMono;
    }
}
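If annotation-driven transactions are not wanted, the same boundary can probably be drawn programmatically with Spring's TransactionalOperator. The following is only a sketch under that assumption: it reuses the User, UserProfile, UserRepository and UserProfileRepository types shown above, while ProgrammaticUserService and saveUserAndProfile are hypothetical names that are not part of the shared repository.

```java
import org.springframework.stereotype.Service;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Mono;

// Sketch only: ProgrammaticUserService and saveUserAndProfile are hypothetical;
// the entity and repository types are the ones listed in this answer.
@Service
public class ProgrammaticUserService {

    private final UserRepository userRepository;
    private final UserProfileRepository userProfileRepository;
    private final TransactionalOperator transactionalOperator;

    public ProgrammaticUserService(UserRepository userRepository,
                                   UserProfileRepository userProfileRepository,
                                   ReactiveTransactionManager transactionManager) {
        this.userRepository = userRepository;
        this.userProfileRepository = userProfileRepository;
        // Built from the ReactiveMongoTransactionManager bean defined in the configuration
        this.transactionalOperator = TransactionalOperator.create(transactionManager);
    }

    public Mono<UserProfile> saveUserAndProfile(User user, UserProfile profile) {
        return userRepository.save(user)
                // setUser returns the profile itself because of @Accessors(chain = true)
                .flatMap(saved -> userProfileRepository.save(profile.setUser(saved)))
                // Everything upstream of this call runs inside one transaction
                .as(transactionalOperator::transactional);
    }
}
```

This variant makes the transaction boundary visible in the pipeline itself, which can be handy when only part of a longer chain should be transactional.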
The controller and the model class
UserRequest model class
@Getter
@Setter
@Accessors(chain = true)
@Slf4j
@JsonInclude(JsonInclude.Include.NON_NULL)
public class UserRequest {

    private String username;
    private String password;
    private String mobile;
    private String email;
    private String address;
    private String firstName;
    private String lastName;
}
UserProfileApisController class
@Slf4j
@RestController
@RequestMapping("/apis/user/profile")
public class UserProfileApisController {

    @Autowired
    private UserService userService;

    // Creates the user and its profile in a single transaction
    @PostMapping
    public Mono<UserProfile> saveUserProfile(final @RequestBody UserRequest userRequest) {
        return userService.saveUserAndItsProfile(userRequest);
    }
}