首页

通过camel-solr源码包SolrEndpoint依赖apache的CommonsHttpSolrServer实现自定义solr的组件化应用

标签:自定义solr组件,CommonsHttpSolrServer,camel-solr,solrj,component     发布时间:2017-12-26   

一、前言

通过Apachesolr实现包依赖org.apache.solr.client.solrj.impl.CommonsHttpSolrServer(2.9.4)服务拓展使用org.apache.camel.component.solr.SolrEndpoint、org.apache.solr.common.SolrInputDocument.SolrProducer自定义apache solr端应用服务。

二、源码说明

1.SolrEndpoint部分

package org.apache.camel.component.solr;@b@@b@import java.util.Map;@b@import org.apache.camel.Consumer;@b@import org.apache.camel.Processor;@b@import org.apache.camel.Producer;@b@import org.apache.camel.impl.DefaultEndpoint;@b@import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;@b@import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;@b@@b@public class SolrEndpoint extends DefaultEndpoint@b@{@b@  private CommonsHttpSolrServer solrServer;@b@  private CommonsHttpSolrServer streamingSolrServer;@b@  private String requestHandler;@b@  private int streamingThreadCount;@b@  private int streamingQueueSize;@b@@b@  public SolrEndpoint(String endpointUri, SolrComponent component, String address, Map<String, Object> parameters)@b@    throws Exception@b@  {@b@    super(endpointUri, component);@b@@b@    this.solrServer = new CommonsHttpSolrServer("http://" + address);@b@    this.streamingQueueSize = getIntFromString((String)parameters.get("streamingQueueSize"), 10);@b@    this.streamingThreadCount = getIntFromString((String)parameters.get("streamingThreadCount"), 2);@b@    this.streamingSolrServer = new StreamingUpdateSolrServer("http://" + address, this.streamingQueueSize, this.streamingThreadCount);@b@  }@b@@b@  public static int getIntFromString(String value, int defaultValue) {@b@    if ((value != null) && (value.length() > 0))@b@      return Integer.parseInt(value);@b@@b@    return defaultValue;@b@  }@b@@b@  public Producer createProducer() throws Exception@b@  {@b@    return new SolrProducer(this);@b@  }@b@@b@  public Consumer createConsumer(Processor processor) throws Exception@b@  {@b@    throw new UnsupportedOperationException("Consumer not supported for Solr endpoint.");@b@  }@b@@b@  public boolean isSingleton()@b@  {@b@    return true;@b@  }@b@@b@  public CommonsHttpSolrServer getSolrServer() {@b@    return this.solrServer;@b@  }@b@@b@  public CommonsHttpSolrServer getStreamingSolrServer() {@b@    return this.streamingSolrServer;@b@  }@b@@b@  public void setStreamingSolrServer(CommonsHttpSolrServer streamingSolrServer) {@b@    this.streamingSolrServer = streamingSolrServer;@b@  }@b@@b@  public void setMaxRetries(int maxRetries) {@b@    this.solrServer.setMaxRetries(maxRetries);@b@    this.streamingSolrServer.setMaxRetries(maxRetries);@b@  }@b@@b@  public void setSoTimeout(int soTimeout) {@b@    this.solrServer.setSoTimeout(soTimeout);@b@    this.streamingSolrServer.setSoTimeout(soTimeout);@b@  }@b@@b@  public void setConnectionTimeout(int connectionTimeout) {@b@    this.solrServer.setConnectionTimeout(connectionTimeout);@b@    this.streamingSolrServer.setConnectionTimeout(connectionTimeout);@b@  }@b@@b@  public void setDefaultMaxConnectionsPerHost(int defaultMaxConnectionsPerHost) {@b@    this.solrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);@b@    this.streamingSolrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);@b@  }@b@@b@  public void setMaxTotalConnections(int maxTotalConnections) {@b@    this.solrServer.setMaxTotalConnections(maxTotalConnections);@b@    this.streamingSolrServer.setMaxTotalConnections(maxTotalConnections);@b@  }@b@@b@  public void setFollowRedirects(boolean followRedirects) {@b@    this.solrServer.setFollowRedirects(followRedirects);@b@    this.streamingSolrServer.setFollowRedirects(followRedirects);@b@  }@b@@b@  public void setAllowCompression(boolean allowCompression) {@b@    this.solrServer.setAllowCompression(allowCompression);@b@    this.streamingSolrServer.setAllowCompression(allowCompression);@b@  }@b@@b@  public void setRequestHandler(String requestHandler) {@b@    this.requestHandler = requestHandler;@b@  }@b@@b@  public String getRequestHandler() {@b@    return this.requestHandler;@b@  }@b@@b@  public int getStreamingThreadCount() {@b@    return this.streamingThreadCount;@b@  }@b@@b@  public void setStreamingThreadCount(int streamingThreadCount) {@b@    this.streamingThreadCount = streamingThreadCount;@b@  }@b@@b@  public int getStreamingQueueSize() {@b@    return this.streamingQueueSize;@b@  }@b@@b@  public void setStreamingQueueSize(int streamingQueueSize) {@b@    this.streamingQueueSize = streamingQueueSize;@b@  }@b@}

2.SolrProducer部分

package org.apache.camel.component.solr;@b@@b@import java.io.File;@b@import java.util.Map;@b@import java.util.Map.Entry;@b@import org.apache.camel.Exchange;@b@import org.apache.camel.Message;@b@import org.apache.camel.WrappedFile;@b@import org.apache.camel.impl.DefaultProducer;@b@import org.apache.solr.client.solrj.SolrServer;@b@import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;@b@import org.apache.solr.client.solrj.request.DirectXmlRequest;@b@import org.apache.solr.client.solrj.request.UpdateRequest;@b@import org.apache.solr.common.SolrException;@b@import org.apache.solr.common.SolrException.ErrorCode;@b@import org.apache.solr.common.SolrInputDocument;@b@@b@public class SolrProducer extends DefaultProducer@b@{@b@  private SolrServer solrServer;@b@  private SolrServer streamingSolrServer;@b@@b@  public SolrProducer(SolrEndpoint endpoint)@b@  {@b@    super(endpoint);@b@    this.solrServer = endpoint.getSolrServer();@b@    this.streamingSolrServer = endpoint.getStreamingSolrServer();@b@  }@b@@b@  public void process(Exchange exchange)@b@    throws Exception@b@  {@b@    String operation = (String)exchange.getIn().getHeader("SolrOperation");@b@@b@    if (operation == null) {@b@      throw new IllegalArgumentException("SolrOperation header is missing");@b@    }@b@@b@    if (operation.equalsIgnoreCase("INSERT"))@b@      insert(exchange, false);@b@    else if (operation.equalsIgnoreCase("INSERT_STREAMING"))@b@      insert(exchange, true);@b@    else if (operation.equalsIgnoreCase("DELETE_BY_ID"))@b@      this.solrServer.deleteById((String)exchange.getIn().getBody(String.class));@b@    else if (operation.equalsIgnoreCase("DELETE_BY_QUERY"))@b@      this.solrServer.deleteByQuery((String)exchange.getIn().getBody(String.class));@b@    else if (operation.equalsIgnoreCase("ADD_BEAN"))@b@      this.solrServer.addBean(exchange.getIn().getBody());@b@    else if (operation.equalsIgnoreCase("COMMIT"))@b@      this.solrServer.commit();@b@    else if (operation.equalsIgnoreCase("ROLLBACK"))@b@      this.solrServer.rollback();@b@    else if (operation.equalsIgnoreCase("OPTIMIZE"))@b@      this.solrServer.optimize();@b@    else@b@      throw new IllegalArgumentException("SolrOperation header value '" + operation + "' is not supported");@b@  }@b@@b@  private void insert(Exchange exchange, boolean isStreaming)@b@    throws Exception@b@  {@b@    Object body = exchange.getIn().getBody();@b@    if (body instanceof WrappedFile) {@b@      body = ((WrappedFile)body).getFile();@b@    }@b@@b@    if (body instanceof File)@b@    {@b@      ContentStreamUpdateRequest updateRequest = new ContentStreamUpdateRequest(getRequestHandler());@b@      updateRequest.addFile((File)body);@b@@b@      for (Map.Entry entry : exchange.getIn().getHeaders().entrySet())@b@        if (((String)entry.getKey()).startsWith("SolrParam.")) {@b@          String paramName = ((String)entry.getKey()).substring("SolrParam.".length());@b@          updateRequest.setParam(paramName, entry.getValue().toString());@b@        }@b@@b@@b@      if (isStreaming)@b@        updateRequest.process(this.streamingSolrServer);@b@      else@b@        updateRequest.process(this.solrServer);@b@@b@    }@b@    else if (body instanceof SolrInputDocument)@b@    {@b@      UpdateRequest updateRequest = new UpdateRequest(getRequestHandler());@b@      updateRequest.add((SolrInputDocument)body);@b@@b@      if (isStreaming)@b@        updateRequest.process(this.streamingSolrServer);@b@      else@b@        updateRequest.process(this.solrServer);@b@@b@    }@b@    else@b@    {@b@      boolean hasSolrHeaders = false;@b@      Map headers = exchange.getIn().getHeaders();@b@      for (Map.Entry entry : exchange.getIn().getHeaders().entrySet())@b@        if (((String)entry.getKey()).startsWith("SolrField.")) {@b@          hasSolrHeaders = true;@b@          break;@b@        }@b@@b@@b@      if (hasSolrHeaders)@b@      {@b@        UpdateRequest updateRequest = new UpdateRequest(getRequestHandler());@b@@b@        SolrInputDocument doc = new SolrInputDocument();@b@        for (Map.Entry entry : exchange.getIn().getHeaders().entrySet())@b@          if (((String)entry.getKey()).startsWith("SolrField.")) {@b@            String fieldName = ((String)entry.getKey()).substring("SolrField.".length());@b@            doc.setField(fieldName, entry.getValue());@b@          }@b@@b@        updateRequest.add(doc);@b@@b@        if (isStreaming)@b@          updateRequest.process(this.streamingSolrServer);@b@        else@b@          updateRequest.process(this.solrServer);@b@@b@      }@b@      else if (body instanceof String)@b@      {@b@        String bodyAsString = (String)body;@b@@b@        if (!(bodyAsString.startsWith("<add"))) {@b@          bodyAsString = "<add>" + bodyAsString + "</add>";@b@        }@b@@b@        DirectXmlRequest xmlRequest = new DirectXmlRequest(getRequestHandler(), bodyAsString);@b@@b@        if (isStreaming)@b@          this.streamingSolrServer.request(xmlRequest);@b@        else@b@          this.solrServer.request(xmlRequest);@b@      }@b@      else {@b@        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "unable to find data in Exchange to update Solr");@b@      }@b@    }@b@  }@b@@b@  private String getRequestHandler() {@b@    String requestHandler = getEndpoint().getRequestHandler();@b@    return ((requestHandler == null) ? "/update" : requestHandler);@b@  }@b@@b@  public SolrEndpoint getEndpoint()@b@  {@b@    return ((SolrEndpoint)super.getEndpoint());@b@  }@b@}